/**
 * Lakehouse MCP Server — bridges local LLMs to the data substrate.
 *
 * Tools:
 * - search_workers: hybrid SQL+vector (the core fix)
 * - query_sql: analytical SQL on any dataset
 * - match_contract: find workers for a job order
 * - get_worker: single worker by ID
 * - rag_question: full RAG pipeline
 * - log_success: record what worked → playbook DB
 * - get_playbooks: retrieve past successes
 * - swap_profile: hot-swap model + data context
 * - vram_status: GPU introspection
 *
 * Resources:
 * - lakehouse://datasets: one line per catalog dataset with its row count
 */

import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { z } from "zod";

/** Base URL of the lakehouse HTTP API; override with the LAKEHOUSE_URL environment variable. */
const BASE = process.env.LAKEHOUSE_URL || "http://localhost:3100";

/**
 * Sends one HTTP request to the lakehouse API and returns the decoded response.
 *
 * @param method HTTP method, e.g. "GET" or "POST".
 * @param path   Path appended verbatim to BASE (must start with "/").
 * @param body   Optional payload; when truthy it is sent as JSON with a JSON content type.
 * @returns The parsed JSON body, or `{ raw, status }` when the body is not valid JSON.
 *          The HTTP status is NOT inspected here — callers look for an `error` field instead.
 */
async function api(method: string, path: string, body?: any): Promise<any> {
  const resp = await fetch(`${BASE}${path}`, {
    method,
    headers: body ? { "Content-Type": "application/json" } : {},
    body: body ? JSON.stringify(body) : undefined,
  });
  const text = await resp.text();
  try {
    return JSON.parse(text);
  } catch {
    // Non-JSON reply (plain-text error page, empty body, ...): hand back what we got.
    return { raw: text, status: resp.status };
  }
}

/**
 * Escapes a value for use inside a single-quoted SQL string literal by doubling
 * every single quote. The surrounding quotes are added by the caller.
 *
 * NOTE(review): this is the standard-SQL escape; if the backend dialect also treats
 * backslash as an escape character, backslashes need handling too — confirm.
 */
function escapeSqlString(value: string): string {
  return value.replace(/'/g, "''");
}

/** Wraps a string in the single-text-item result shape every tool here returns. */
function textResult(text: string) {
  return { content: [{ type: "text" as const, text }] };
}

/** Pretty-prints a value as JSON (2-space indent) and wraps it as a tool result. */
function jsonResult(value: unknown) {
  return textResult(JSON.stringify(value, null, 2));
}

const server = new McpServer({ name: "lakehouse", version: "1.0.0" });

// search_workers: forwards the question (and optional raw SQL filter) to /vectors/hybrid
// with generation enabled. The vector index name is fixed to "ethereal_workers_v1".
// sql_filter is passed through verbatim by design — it is a caller-authored WHERE clause.
server.tool(
  "search_workers",
  "Hybrid SQL+vector search. SQL ensures structural accuracy (role, state, reliability), vector ranks by semantic relevance. Every result is verified against the golden dataset.",
  {
    question: z.string().describe("Natural language question about workers"),
    sql_filter: z.string().optional().describe("SQL WHERE clause, e.g. \"role = 'Forklift Operator' AND state = 'IL' AND reliability > 0.8\""),
    dataset: z.string().default("ethereal_workers"),
    id_column: z.string().default("worker_id"),
    top_k: z.number().default(5),
  },
  async ({ question, sql_filter, dataset, id_column, top_k }) => {
    const body: any = {
      question,
      index_name: "ethereal_workers_v1",
      filter_dataset: dataset,
      id_column,
      top_k,
      generate: true,
    };
    // Only include the filter key when a non-empty filter was supplied.
    if (sql_filter) body.sql_filter = sql_filter;
    const r = await api("POST", "/vectors/hybrid", body);
    return jsonResult(r);
  },
);

// query_sql: runs caller-authored SQL as-is and returns at most the first 20 rows.
server.tool(
  "query_sql",
  "Run SQL against any lakehouse dataset. Tables: ethereal_workers (10K), candidates (100K), timesheets (1M), call_log (800K), email_log (500K), placements (50K), job_orders (15K), clients (2K).",
  { sql: z.string().describe("SQL query") },
  async ({ sql }) => {
    const r = await api("POST", "/query/sql", { sql });
    if (r.error) return textResult(`SQL Error: ${r.error}`);
    // The row count reported is the backend's; the preview is capped at 20 rows.
    return textResult(`${r.row_count} rows:\n${JSON.stringify(r.rows?.slice(0, 20), null, 2)}`);
  },
);

// match_contract: builds a SQL filter from the contract fields, asks /vectors/hybrid for
// twice the headcount (no generation), then optionally keeps only matches whose chunk
// text mentions every required certification, and finally trims to the headcount.
server.tool(
  "match_contract",
  "Find qualified workers for a staffing contract. SQL-verified matches ranked by semantic fit.",
  {
    role: z.string(),
    state: z.string(),
    city: z.string().optional(),
    min_reliability: z.number().default(0.7),
    required_certs: z.array(z.string()).default([]),
    headcount: z.number().default(5),
  },
  async ({ role, state, city, min_reliability, required_certs, headcount }) => {
    // String inputs are escaped before being embedded in quoted SQL literals so that
    // values containing a single quote (e.g. "O'Fallon") neither break nor inject SQL.
    let filter = `role = '${escapeSqlString(role)}' AND state = '${escapeSqlString(state)}' AND reliability >= ${min_reliability}`;
    if (city) filter += ` AND city = '${escapeSqlString(city)}'`;

    const r = await api("POST", "/vectors/hybrid", {
      question: `Find the best ${role} workers with relevant skills and certifications`,
      index_name: "ethereal_workers_v1",
      sql_filter: filter,
      filter_dataset: "ethereal_workers",
      id_column: "worker_id",
      // Over-fetch so the certification filter below still has candidates to choose from.
      top_k: headcount * 2,
      generate: false,
    });

    let matches = r.sources || [];
    if (required_certs.length > 0) {
      // Case-insensitive substring match of each required cert against the chunk text.
      const wanted = required_certs.map((c: string) => c.toLowerCase());
      matches = matches.filter((m: any) => {
        const haystack = (m.chunk_text || "").toLowerCase();
        return wanted.every((c) => haystack.includes(c));
      });
    }

    return jsonResult({
      contract: { role, state, city, min_reliability, required_certs },
      matches: matches.slice(0, headcount),
      total_sql: r.sql_matches,
      method: r.method,
      // Surfaces a backend failure instead of presenting it as "no matches";
      // JSON.stringify drops this key when there is no error.
      error: r.error,
    });
  },
);

// get_worker: looks up a single row of ethereal_workers by numeric worker_id.
server.tool(
  "get_worker",
  "Fetch one worker profile by ID — all fields including scores and comms.",
  { worker_id: z.number() },
  async ({ worker_id }) => {
    const r = await api("POST", "/query/sql", {
      sql: `SELECT * FROM ethereal_workers WHERE worker_id = ${worker_id}`,
    });
    // Distinguish a failed query from a genuinely missing worker (same wording as query_sql).
    if (r.error) return textResult(`SQL Error: ${r.error}`);
    if (!r.rows?.length) return textResult(`Worker ${worker_id} not found`);
    return jsonResult(r.rows[0]);
  },
);

// rag_question: posts the question to /vectors/rag and returns the answer text plus
// the number of sources (not the sources themselves).
server.tool(
  "rag_question",
  "Natural language question answered via RAG (embed → search → retrieve → generate). For open-ended questions where SQL alone isn't enough.",
  {
    question: z.string(),
    index: z.string().default("ethereal_workers_v1"),
    top_k: z.number().default(5),
  },
  async ({ question, index, top_k }) => {
    const r = await api("POST", "/vectors/rag", { index_name: index, question, top_k });
    const text = r.error
      ? `RAG Error: ${r.error}`
      : `Answer: ${r.answer}\n\nSources: ${r.sources?.length || 0}`;
    return textResult(text);
  },
);

// log_success: uploads a header + one-row CSV to /ingest/file under the dataset name
// "successful_playbooks". NOTE(review): presumably the ingest endpoint appends to an
// existing dataset of that name — confirm against the ingest service.
server.tool(
  "log_success",
  "Record a successful operation to the playbook database. Small models query this later to learn what worked.",
  {
    operation: z.string().describe("What was done"),
    approach: z.string().describe("How it was done"),
    result: z.string().describe("Outcome"),
    context: z.string().optional(),
  },
  async ({ operation, approach, result, context }) => {
    // CSV field quoting: wrap in double quotes and double any embedded double quote.
    const quote = (field: string) => `"${field.replace(/"/g, '""')}"`;
    const row = [new Date().toISOString(), operation, approach, result, context || ""]
      .map(quote)
      .join(",");
    const csv = `timestamp,operation,approach,result,context\n${row}`;

    const form = new FormData();
    form.append("file", new Blob([csv], { type: "text/csv" }), "playbook.csv");
    const resp = await fetch(`${BASE}/ingest/file?name=successful_playbooks`, {
      method: "POST",
      body: form,
    });
    const reply = await resp.text();
    // Do not claim success when the ingest endpoint rejected the upload.
    if (!resp.ok) return textResult(`Log failed (HTTP ${resp.status}): ${reply}`);
    return textResult(`Logged: ${reply}`);
  },
);

// get_playbooks: newest-first rows from successful_playbooks, optionally restricted to
// rows whose operation or approach contains the keyword.
server.tool(
  "get_playbooks",
  "Retrieve past successful operations. Small models use this to learn what approaches worked.",
  { keyword: z.string().optional(), limit: z.number().default(10) },
  async ({ keyword, limit }) => {
    let sql = `SELECT * FROM successful_playbooks ORDER BY timestamp DESC LIMIT ${limit}`;
    if (keyword) {
      // Escape quotes so the keyword stays inside the LIKE string literal.
      const kw = escapeSqlString(keyword);
      sql = `SELECT * FROM successful_playbooks WHERE operation LIKE '%${kw}%' OR approach LIKE '%${kw}%' ORDER BY timestamp DESC LIMIT ${limit}`;
    }
    const r = await api("POST", "/query/sql", { sql });
    // Any backend error is reported as "no playbooks yet".
    // NOTE(review): presumably this targets the missing-table case before the first
    // log_success call; it also masks unrelated query failures.
    if (r.error) return textResult("No playbooks yet — log some successful operations first!");
    return jsonResult(r.rows);
  },
);

// swap_profile: activates a profile and returns a trimmed summary of the response.
server.tool(
  "swap_profile",
  "Hot-swap model profile. Changes Ollama model in VRAM + bound datasets. 'agent-parquet' = HNSW (fast), 'agent-lance' = IVF_PQ (scalable).",
  { profile_id: z.string() },
  async ({ profile_id }) => {
    // Encode the ID so characters such as "/", "?" or "#" cannot alter the request path.
    const r = await api("POST", `/vectors/profile/${encodeURIComponent(profile_id)}/activate`);
    return jsonResult({
      profile: r.profile_id,
      model: r.ollama_name,
      indexes: r.indexes_warmed?.length,
      vectors: r.total_vectors,
      previous: r.previous_profile,
      duration: r.duration_secs,
    });
  },
);

// vram_status: returns the /ai/vram response unmodified.
server.tool(
  "vram_status",
  "GPU VRAM usage + loaded Ollama models. Check before swapping profiles.",
  {},
  async () => {
    const r = await api("GET", "/ai/vram");
    return jsonResult(r);
  },
);

// Resources
server.resource("lakehouse://datasets", "lakehouse://datasets", async (uri) => {
  const r = await api("GET", "/catalog/datasets");
  // api() returns an object (not an array) for error payloads and non-JSON replies;
  // fail with a readable message instead of "r.map is not a function".
  if (!Array.isArray(r)) {
    throw new Error(`Unexpected /catalog/datasets response: ${JSON.stringify(r)}`);
  }
  // `??` keeps a legitimate row count of 0 instead of replacing it with "?".
  const text = r.map((d: any) => `${d.name}: ${d.row_count ?? "?"} rows`).join("\n");
  return { contents: [{ uri: uri.href, mimeType: "text/plain", text }] };
});

// Start
/**
 * Connects the server to a stdio transport and announces startup.
 * Diagnostics go to stderr via console.error; the stdio transport owns stdout.
 */
async function main() {
  const transport = new StdioServerTransport();
  await server.connect(transport);
  console.error(`Lakehouse MCP server started → ${BASE}`);
  console.error("Tools: search_workers, query_sql, match_contract, get_worker, rag_question, log_success, get_playbooks, swap_profile, vram_status");
}

main().catch(console.error);