root a001a21902 MCP self-orientation: /context + /verify + architecture resources
Any agent (Claude Code via MCP stdio, or sub-agents via HTTP :3700)
can now self-orient without human explanation:

GET /context returns:
  - System purpose and name
  - All datasets with row counts
  - All vector indexes with backends
  - Available models and their strengths
  - Complete tool list with rules
  - Current VRAM state

POST /verify fact-checks any claim about a worker against the golden
data. Agent says "worker 1313 is a Forklift Operator in IL with
reliability 0.82" → endpoint returns verified=true/false with exact
discrepancies.

MCP resources (stdio path for Claude Code):
  - lakehouse://system — live system status
  - lakehouse://architecture — full PRD
  - lakehouse://instructions — agent operating manual
  - lakehouse://playbooks — successful operations database
  - lakehouse://datasets — dataset listing

This is the "command and control" layer J asked for: any agent
connecting to this system gets the context it needs to operate
independently. No human intermediary required.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-17 00:41:46 -05:00

426 lines
20 KiB
TypeScript

/**
* Lakehouse MCP Server — bridges local LLMs to the data substrate.
*
* Tools:
* - search_workers: hybrid SQL+vector (the core fix)
* - query_sql: analytical SQL on any dataset
* - match_contract: find workers for a job order
* - get_worker: single worker by ID
* - rag_question: full RAG pipeline
* - log_success: record what worked → playbook DB
* - get_playbooks: retrieve past successes
* - swap_profile: hot-swap model + data context
* - vram_status: GPU introspection
*/
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import { z } from "zod";
const BASE = process.env.LAKEHOUSE_URL || "http://localhost:3100";
const PORT = parseInt(process.env.MCP_PORT || "3700");
const MODE = process.env.MCP_TRANSPORT || "http"; // "stdio" or "http"
async function api(method: string, path: string, body?: any) {
const resp = await fetch(`${BASE}${path}`, {
method,
headers: body ? { "Content-Type": "application/json" } : {},
body: body ? JSON.stringify(body) : undefined,
});
const text = await resp.text();
try { return JSON.parse(text); } catch { return { raw: text, status: resp.status }; }
}
const server = new McpServer({ name: "lakehouse", version: "1.0.0" });
server.tool(
"search_workers",
"Hybrid SQL+vector search. SQL ensures structural accuracy (role, state, reliability), vector ranks by semantic relevance. Every result is verified against the golden dataset.",
{
question: z.string().describe("Natural language question about workers"),
sql_filter: z.string().optional().describe("SQL WHERE clause, e.g. \"role = 'Forklift Operator' AND state = 'IL' AND reliability > 0.8\""),
dataset: z.string().default("ethereal_workers"),
id_column: z.string().default("worker_id"),
top_k: z.number().default(5),
},
async ({ question, sql_filter, dataset, id_column, top_k }) => {
const body: any = { question, index_name: "ethereal_workers_v1", filter_dataset: dataset, id_column, top_k, generate: true };
if (sql_filter) body.sql_filter = sql_filter;
const r = await api("POST", "/vectors/hybrid", body);
return { content: [{ type: "text" as const, text: JSON.stringify(r, null, 2) }] };
},
);
server.tool(
"query_sql",
"Run SQL against any lakehouse dataset. Tables: ethereal_workers (10K), candidates (100K), timesheets (1M), call_log (800K), email_log (500K), placements (50K), job_orders (15K), clients (2K).",
{ sql: z.string().describe("SQL query") },
async ({ sql }) => {
const r = await api("POST", "/query/sql", { sql });
if (r.error) return { content: [{ type: "text" as const, text: `SQL Error: ${r.error}` }] };
return { content: [{ type: "text" as const, text: `${r.row_count} rows:\n${JSON.stringify(r.rows?.slice(0, 20), null, 2)}` }] };
},
);
server.tool(
"match_contract",
"Find qualified workers for a staffing contract. SQL-verified matches ranked by semantic fit.",
{
role: z.string(), state: z.string(), city: z.string().optional(),
min_reliability: z.number().default(0.7),
required_certs: z.array(z.string()).default([]),
headcount: z.number().default(5),
},
async ({ role, state, city, min_reliability, required_certs, headcount }) => {
let filter = `role = '${role}' AND state = '${state}' AND reliability >= ${min_reliability}`;
if (city) filter += ` AND city = '${city}'`;
const r = await api("POST", "/vectors/hybrid", {
question: `Find the best ${role} workers with relevant skills and certifications`,
index_name: "ethereal_workers_v1", sql_filter: filter,
filter_dataset: "ethereal_workers", id_column: "worker_id",
top_k: headcount * 2, generate: false,
});
let matches = r.sources || [];
if (required_certs.length > 0) {
const req = new Set(required_certs.map((c: string) => c.toLowerCase()));
matches = matches.filter((m: any) => {
const certs = (m.chunk_text || "").toLowerCase();
return [...req].every(c => certs.includes(c));
});
}
return { content: [{ type: "text" as const, text: JSON.stringify({
contract: { role, state, city, min_reliability, required_certs },
matches: matches.slice(0, headcount), total_sql: r.sql_matches, method: r.method,
}, null, 2) }] };
},
);
server.tool(
"get_worker",
"Fetch one worker profile by ID — all fields including scores and comms.",
{ worker_id: z.number() },
async ({ worker_id }) => {
const r = await api("POST", "/query/sql", { sql: `SELECT * FROM ethereal_workers WHERE worker_id = ${worker_id}` });
if (!r.rows?.length) return { content: [{ type: "text" as const, text: `Worker ${worker_id} not found` }] };
return { content: [{ type: "text" as const, text: JSON.stringify(r.rows[0], null, 2) }] };
},
);
server.tool(
"rag_question",
"Natural language question answered via RAG (embed → search → retrieve → generate). For open-ended questions where SQL alone isn't enough.",
{ question: z.string(), index: z.string().default("ethereal_workers_v1"), top_k: z.number().default(5) },
async ({ question, index, top_k }) => {
const r = await api("POST", "/vectors/rag", { index_name: index, question, top_k });
return { content: [{ type: "text" as const, text: r.error ? `RAG Error: ${r.error}` : `Answer: ${r.answer}\n\nSources: ${r.sources?.length || 0}` }] };
},
);
server.tool(
"log_success",
"Record a successful operation to the playbook database. Small models query this later to learn what worked.",
{
operation: z.string().describe("What was done"),
approach: z.string().describe("How it was done"),
result: z.string().describe("Outcome"),
context: z.string().optional(),
},
async ({ operation, approach, result, context }) => {
const csv = `timestamp,operation,approach,result,context\n"${new Date().toISOString()}","${operation.replace(/"/g, '""')}","${approach.replace(/"/g, '""')}","${result.replace(/"/g, '""')}","${(context||"").replace(/"/g, '""')}"`;
const form = new FormData();
form.append("file", new Blob([csv], { type: "text/csv" }), "playbook.csv");
const resp = await fetch(`${BASE}/ingest/file?name=successful_playbooks`, { method: "POST", body: form });
return { content: [{ type: "text" as const, text: `Logged: ${await resp.text()}` }] };
},
);
server.tool(
"get_playbooks",
"Retrieve past successful operations. Small models use this to learn what approaches worked.",
{ keyword: z.string().optional(), limit: z.number().default(10) },
async ({ keyword, limit }) => {
let sql = `SELECT * FROM successful_playbooks ORDER BY timestamp DESC LIMIT ${limit}`;
if (keyword) sql = `SELECT * FROM successful_playbooks WHERE operation LIKE '%${keyword}%' OR approach LIKE '%${keyword}%' ORDER BY timestamp DESC LIMIT ${limit}`;
const r = await api("POST", "/query/sql", { sql });
if (r.error) return { content: [{ type: "text" as const, text: "No playbooks yet — log some successful operations first!" }] };
return { content: [{ type: "text" as const, text: JSON.stringify(r.rows, null, 2) }] };
},
);
server.tool(
"swap_profile",
"Hot-swap model profile. Changes Ollama model in VRAM + bound datasets. 'agent-parquet' = HNSW (fast), 'agent-lance' = IVF_PQ (scalable).",
{ profile_id: z.string() },
async ({ profile_id }) => {
const r = await api("POST", `/vectors/profile/${profile_id}/activate`);
return { content: [{ type: "text" as const, text: JSON.stringify({
profile: r.profile_id, model: r.ollama_name,
indexes: r.indexes_warmed?.length, vectors: r.total_vectors,
previous: r.previous_profile, duration: r.duration_secs,
}, null, 2) }] };
},
);
server.tool(
"vram_status",
"GPU VRAM usage + loaded Ollama models. Check before swapping profiles.",
{},
async () => {
const r = await api("GET", "/ai/vram");
return { content: [{ type: "text" as const, text: JSON.stringify(r, null, 2) }] };
},
);
// Resources — these give any MCP client full context about the system
server.resource("lakehouse://system", "lakehouse://system", async (uri) => {
const health = await api("GET", "/health");
const datasets = await api("GET", "/catalog/datasets") as any[];
const indexes = await api("GET", "/vectors/indexes") as any[];
const vram = await api("GET", "/ai/vram");
const agent = await api("GET", "/vectors/agent/status");
const buckets = await api("GET", "/storage/buckets");
const text = `# Lakehouse System Status
## Health: ${health === "lakehouse ok" ? "OK" : JSON.stringify(health)}
## Datasets (${datasets.length})
${datasets.map((d: any) => `- ${d.name}: ${d.row_count || "?"} rows`).join("\n")}
## Vector Indexes (${indexes.length})
${(indexes as any[]).map((i: any) => `- ${i.index_name}: ${i.chunk_count} chunks (${i.vector_backend || "parquet"})`).join("\n")}
## GPU
- Used: ${vram?.gpu?.used_mib || "?"}/${vram?.gpu?.total_mib || "?"} MiB
- Models loaded: ${(vram?.ollama_loaded || []).map((m: any) => m.name).join(", ") || "none"}
## Autotune Agent
- Running: ${agent?.running}, Trials: ${agent?.trials_run}, Promotions: ${agent?.promotions}
## Buckets (${(buckets as any[])?.length || 0})
${(buckets as any[] || []).map((b: any) => `- ${b.name}: ${b.backend} (${b.reachable ? "reachable" : "DOWN"})`).join("\n")}
## Services
- Lakehouse Gateway: :3100
- AI Sidecar: :3200
- Agent Gateway: :3700
- Langfuse: :3001
- MinIO S3: :9000
- Ollama: :11434
## Available Models
- qwen3: 8.2B, 40K context, thinking+tools (best for reasoning)
- qwen2.5: 7B, 8K context (best for fast SQL generation)
- mistral: 7B, 8K context (general generation)
- nomic-embed-text: 137M (embedding, automatic)
`;
return { contents: [{ uri: uri.href, mimeType: "text/plain", text }] };
});
server.resource("lakehouse://architecture", "lakehouse://architecture", async (uri) => {
// Read the PRD directly
const prd = await Bun.file("/home/profit/lakehouse/docs/PRD.md").text().catch(() => "PRD not found");
return { contents: [{ uri: uri.href, mimeType: "text/markdown", text: prd }] };
});
server.resource("lakehouse://instructions", "lakehouse://instructions", async (uri) => {
const instructions = await Bun.file("/home/profit/lakehouse/mcp-server/AGENT_INSTRUCTIONS.md").text().catch(() => "Instructions not found");
return { contents: [{ uri: uri.href, mimeType: "text/markdown", text: instructions }] };
});
server.resource("lakehouse://playbooks", "lakehouse://playbooks", async (uri) => {
const r = await api("POST", "/query/sql", {
sql: "SELECT * FROM successful_playbooks ORDER BY timestamp DESC LIMIT 20"
});
const rows = r?.rows || [];
const text = rows.length === 0
? "No playbooks yet. Log successful operations with the log_success tool."
: rows.map((p: any) => `## ${p.operation}\n- Approach: ${p.approach}\n- Result: ${p.result}\n- Context: ${p.context || "—"}\n`).join("\n");
return { contents: [{ uri: uri.href, mimeType: "text/markdown", text: `# Successful Playbooks\n\n${text}` }] };
});
server.resource("lakehouse://datasets", "lakehouse://datasets", async (uri) => {
const r = await api("GET", "/catalog/datasets") as any[];
const text = r.map(d => `${d.name}: ${d.row_count || "?"} rows`).join("\n");
return { contents: [{ uri: uri.href, mimeType: "text/plain", text }] };
});
// ─── Dual mode: stdio (Claude Code) or HTTP (internal agents) ───
async function main() {
if (MODE === "stdio") {
const transport = new StdioServerTransport();
await server.connect(transport);
console.error(`Lakehouse MCP (stdio) → ${BASE}`);
return;
}
// HTTP mode — a REST gateway that internal agents call directly.
// No MCP protocol complexity for consumers — just POST JSON, get JSON.
// The MCP tool definitions above are reused for the stdio path; this
// HTTP path wraps the same lakehouse API with agent-friendly routing.
Bun.serve({
port: PORT,
async fetch(req) {
const url = new URL(req.url);
const json = async () => req.method === "POST" ? await req.json() : {};
const ok = (data: any) => Response.json(data);
const err = (msg: string, status = 400) => Response.json({ error: msg }, { status });
try {
// Health
if (url.pathname === "/health") return ok({ status: "ok", lakehouse: BASE, tools: 11 });
// Self-orientation: any agent calls this first to understand the system
if (url.pathname === "/context") {
const instructions = await Bun.file("/home/profit/lakehouse/mcp-server/AGENT_INSTRUCTIONS.md").text().catch(() => "");
const datasets = await api("GET", "/catalog/datasets") as any[];
const indexes = await api("GET", "/vectors/indexes") as any[];
const vram = await api("GET", "/ai/vram");
return ok({
system: "Lakehouse Staffing Co-Pilot",
purpose: "AI anticipates staffing coordinator needs — pre-matches workers to contracts, surfaces alerts, builds playbooks from successful operations",
instructions: instructions.slice(0, 3000),
datasets: (datasets || []).map((d: any) => ({ name: d.name, rows: d.row_count })),
indexes: (indexes || []).map((i: any) => ({ name: i.index_name, chunks: i.chunk_count, backend: i.vector_backend })),
models: { qwen3: "8.2B reasoning+tools", qwen2_5: "7B fast SQL", mistral: "7B generation", nomic: "137M embedding" },
vram: vram?.gpu,
tools: ["/search","/sql","/match","/worker/:id","/ask","/log","/playbooks","/profile/:id","/vram","/context","/verify"],
rules: [
"Never hallucinate — only state facts from tool responses",
"SQL for counts/aggregations, hybrid /search for matching",
"Log every successful operation to /log",
"Check /playbooks before complex tasks",
"Verify worker details via /worker/:id before communicating",
],
});
}
// Verification endpoint — agent can check any claim against SQL
if (url.pathname === "/verify") {
const b = await json();
// b.claim: "worker 4925 is a Forklift Operator in IL with reliability 0.82"
// b.worker_id: 4925
// b.checks: { role: "Forklift Operator", state: "IL", reliability: 0.82 }
if (!b.worker_id) return err("worker_id required");
const r = await api("POST", "/query/sql", {
sql: `SELECT * FROM ethereal_workers WHERE worker_id = ${b.worker_id}`
});
const worker = r?.rows?.[0];
if (!worker) return ok({ verified: false, reason: `worker ${b.worker_id} not found` });
const checks = b.checks || {};
const failures: string[] = [];
for (const [field, expected] of Object.entries(checks)) {
const actual = worker[field];
if (actual === undefined) continue;
if (typeof expected === "number") {
if (Math.abs(Number(actual) - expected) > 0.05) {
failures.push(`${field}: claimed=${expected} actual=${actual}`);
}
} else if (String(actual).toLowerCase() !== String(expected).toLowerCase()) {
failures.push(`${field}: claimed=${expected} actual=${actual}`);
}
}
return ok({
verified: failures.length === 0,
worker_id: b.worker_id,
worker_name: worker.name,
failures,
actual: worker,
});
}
// Tool: hybrid search
if (url.pathname === "/search") {
const b = await json();
return ok(await api("POST", "/vectors/hybrid", {
question: b.question, index_name: b.index || "ethereal_workers_v1",
sql_filter: b.sql_filter, filter_dataset: b.dataset || "ethereal_workers",
id_column: b.id_column || "worker_id", top_k: b.top_k || 5, generate: b.generate !== false,
}));
}
// Tool: SQL
if (url.pathname === "/sql") {
const b = await json();
return ok(await api("POST", "/query/sql", { sql: b.sql }));
}
// Tool: match contract
if (url.pathname === "/match") {
const b = await json();
let filter = `role = '${b.role}' AND state = '${b.state}' AND reliability >= ${b.min_reliability || 0.7}`;
if (b.city) filter += ` AND city = '${b.city}'`;
return ok(await api("POST", "/vectors/hybrid", {
question: `Best ${b.role} workers with relevant skills`,
index_name: b.index || "ethereal_workers_v1", sql_filter: filter,
filter_dataset: b.dataset || "ethereal_workers",
id_column: "worker_id", top_k: (b.headcount || 5) * 2, generate: false,
}));
}
// Tool: get worker
if (url.pathname.startsWith("/worker/")) {
const id = url.pathname.split("/")[2];
return ok(await api("POST", "/query/sql", { sql: `SELECT * FROM ethereal_workers WHERE worker_id = ${id}` }));
}
// Tool: RAG
if (url.pathname === "/ask") {
const b = await json();
return ok(await api("POST", "/vectors/rag", { index_name: b.index || "ethereal_workers_v1", question: b.question, top_k: b.top_k || 5 }));
}
// Tool: log success
if (url.pathname === "/log") {
const b = await json();
const csv = `timestamp,operation,approach,result,context\n"${new Date().toISOString()}","${(b.operation||"").replace(/"/g,'""')}","${(b.approach||"").replace(/"/g,'""')}","${(b.result||"").replace(/"/g,'""')}","${(b.context||"").replace(/"/g,'""')}"`;
const form = new FormData();
form.append("file", new Blob([csv], { type: "text/csv" }), "playbook.csv");
const r = await fetch(`${BASE}/ingest/file?name=successful_playbooks`, { method: "POST", body: form });
return ok({ logged: true, response: await r.text() });
}
// Tool: get playbooks
if (url.pathname === "/playbooks") {
const kw = url.searchParams.get("keyword");
const limit = url.searchParams.get("limit") || "10";
let sql = `SELECT * FROM successful_playbooks ORDER BY timestamp DESC LIMIT ${limit}`;
if (kw) sql = `SELECT * FROM successful_playbooks WHERE operation LIKE '%${kw}%' OR approach LIKE '%${kw}%' ORDER BY timestamp DESC LIMIT ${limit}`;
const r = await api("POST", "/query/sql", { sql });
return ok(r.error ? { playbooks: [], note: "No playbooks yet" } : { playbooks: r.rows });
}
// Tool: swap profile
if (url.pathname.startsWith("/profile/")) {
const id = url.pathname.split("/")[2];
return ok(await api("POST", `/vectors/profile/${id}/activate`));
}
// Tool: VRAM
if (url.pathname === "/vram") return ok(await api("GET", "/ai/vram"));
// Pass-through to lakehouse for anything else
if (url.pathname.startsWith("/api/")) {
const path = url.pathname.replace("/api", "");
const body = req.method !== "GET" ? await req.text() : undefined;
const r = await fetch(`${BASE}${path}`, { method: req.method, headers: { "Content-Type": "application/json" }, body });
return new Response(await r.text(), { status: r.status, headers: { "Content-Type": "application/json" } });
}
return err("Unknown path. Available: /health /search /sql /match /worker/:id /ask /log /playbooks /profile/:id /vram /api/*", 404);
} catch (e: any) {
return err(e.message || String(e), 500);
}
},
});
console.error(`Lakehouse Agent Gateway :${PORT}${BASE}`);
}
main().catch(console.error);