Three fixes: 1. CORS headers on all gateway responses (browser dashboard was blocked by same-origin policy) 2. Dashboard JS uses window.location.origin instead of hardcoded localhost:3700 (LAN browsers couldn't reach it) 3. Langfuse tracing wired into every gateway request — api() wrapper creates spans for each lakehouse call, logGeneration for LLM calls. Week simulation now produces 34 observations per run visible in Langfuse UI. 7 traces confirmed in Langfuse after restart. Every /sql, /search, /vram, /simulation call is tracked with timing + inputs + outputs. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
623 lines · 28 KiB · TypeScript
/**
 * Lakehouse MCP Server — bridges local LLMs to the data substrate.
 *
 * Tools:
 * - search_workers: hybrid SQL+vector (the core fix)
 * - query_sql: analytical SQL on any dataset
 * - match_contract: find workers for a job order
 * - get_worker: single worker by ID
 * - rag_question: full RAG pipeline
 * - log_success: record what worked → playbook DB
 * - get_playbooks: retrieve past successes
 * - swap_profile: hot-swap model + data context
 * - vram_status: GPU introspection
 */
|
|
|
|
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
|
|
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
|
|
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
|
|
import { z } from "zod";
|
|
import { startTrace, logSpan, logGeneration, scoreTrace, flush as flushTraces } from "./tracing.js";
|
|
|
|
const BASE = process.env.LAKEHOUSE_URL || "http://localhost:3100";
|
|
const PORT = parseInt(process.env.MCP_PORT || "3700");
|
|
const MODE = process.env.MCP_TRANSPORT || "http"; // "stdio" or "http"
|
|
|
|
// Active trace for the current request — set per-request in the HTTP handler
|
|
let activeTrace: ReturnType<typeof startTrace> | null = null;
|
|
|
|
async function api(method: string, path: string, body?: any) {
|
|
const t0 = Date.now();
|
|
const resp = await fetch(`${BASE}${path}`, {
|
|
method,
|
|
headers: body ? { "Content-Type": "application/json" } : {},
|
|
body: body ? JSON.stringify(body) : undefined,
|
|
});
|
|
const text = await resp.text();
|
|
const ms = Date.now() - t0;
|
|
let parsed: any;
|
|
try { parsed = JSON.parse(text); } catch { parsed = { raw: text, status: resp.status }; }
|
|
|
|
// Trace the call if we have an active trace
|
|
if (activeTrace) {
|
|
const isGen = path.includes("/generate");
|
|
if (isGen) {
|
|
logGeneration(activeTrace, `lakehouse${path}`, {
|
|
model: body?.model || "unknown",
|
|
prompt: typeof body?.prompt === "string" ? body.prompt.slice(0, 500) : JSON.stringify(body).slice(0, 300),
|
|
completion: typeof parsed?.text === "string" ? parsed.text.slice(0, 500) : JSON.stringify(parsed).slice(0, 300),
|
|
duration_ms: ms,
|
|
tokens_in: parsed?.prompt_eval_count,
|
|
tokens_out: parsed?.eval_count,
|
|
});
|
|
} else {
|
|
logSpan(activeTrace, `lakehouse${path}`, body, {
|
|
rows: parsed?.row_count, sources: parsed?.sources?.length,
|
|
sql_matches: parsed?.sql_matches, method: parsed?.method,
|
|
}, ms);
|
|
}
|
|
}
|
|
|
|
return parsed;
|
|
}
|
|
|
|
const server = new McpServer({ name: "lakehouse", version: "1.0.0" });
|
|
|
|
server.tool(
|
|
"search_workers",
|
|
"Hybrid SQL+vector search. SQL ensures structural accuracy (role, state, reliability), vector ranks by semantic relevance. Every result is verified against the golden dataset.",
|
|
{
|
|
question: z.string().describe("Natural language question about workers"),
|
|
sql_filter: z.string().optional().describe("SQL WHERE clause, e.g. \"role = 'Forklift Operator' AND state = 'IL' AND reliability > 0.8\""),
|
|
dataset: z.string().default("ethereal_workers"),
|
|
id_column: z.string().default("worker_id"),
|
|
top_k: z.number().default(5),
|
|
},
|
|
async ({ question, sql_filter, dataset, id_column, top_k }) => {
|
|
const body: any = { question, index_name: "ethereal_workers_v1", filter_dataset: dataset, id_column, top_k, generate: true };
|
|
if (sql_filter) body.sql_filter = sql_filter;
|
|
const r = await api("POST", "/vectors/hybrid", body);
|
|
return { content: [{ type: "text" as const, text: JSON.stringify(r, null, 2) }] };
|
|
},
|
|
);
|
|
|
|
server.tool(
|
|
"query_sql",
|
|
"Run SQL against any lakehouse dataset. Tables: ethereal_workers (10K), candidates (100K), timesheets (1M), call_log (800K), email_log (500K), placements (50K), job_orders (15K), clients (2K).",
|
|
{ sql: z.string().describe("SQL query") },
|
|
async ({ sql }) => {
|
|
const r = await api("POST", "/query/sql", { sql });
|
|
if (r.error) return { content: [{ type: "text" as const, text: `SQL Error: ${r.error}` }] };
|
|
return { content: [{ type: "text" as const, text: `${r.row_count} rows:\n${JSON.stringify(r.rows?.slice(0, 20), null, 2)}` }] };
|
|
},
|
|
);
|
|
|
|
server.tool(
|
|
"match_contract",
|
|
"Find qualified workers for a staffing contract. SQL-verified matches ranked by semantic fit.",
|
|
{
|
|
role: z.string(), state: z.string(), city: z.string().optional(),
|
|
min_reliability: z.number().default(0.7),
|
|
required_certs: z.array(z.string()).default([]),
|
|
headcount: z.number().default(5),
|
|
},
|
|
async ({ role, state, city, min_reliability, required_certs, headcount }) => {
|
|
let filter = `role = '${role}' AND state = '${state}' AND reliability >= ${min_reliability}`;
|
|
if (city) filter += ` AND city = '${city}'`;
|
|
const r = await api("POST", "/vectors/hybrid", {
|
|
question: `Find the best ${role} workers with relevant skills and certifications`,
|
|
index_name: "ethereal_workers_v1", sql_filter: filter,
|
|
filter_dataset: "ethereal_workers", id_column: "worker_id",
|
|
top_k: headcount * 2, generate: false,
|
|
});
|
|
let matches = r.sources || [];
|
|
if (required_certs.length > 0) {
|
|
const req = new Set(required_certs.map((c: string) => c.toLowerCase()));
|
|
matches = matches.filter((m: any) => {
|
|
const certs = (m.chunk_text || "").toLowerCase();
|
|
return [...req].every(c => certs.includes(c));
|
|
});
|
|
}
|
|
return { content: [{ type: "text" as const, text: JSON.stringify({
|
|
contract: { role, state, city, min_reliability, required_certs },
|
|
matches: matches.slice(0, headcount), total_sql: r.sql_matches, method: r.method,
|
|
}, null, 2) }] };
|
|
},
|
|
);
|
|
|
|
server.tool(
|
|
"get_worker",
|
|
"Fetch one worker profile by ID — all fields including scores and comms.",
|
|
{ worker_id: z.number() },
|
|
async ({ worker_id }) => {
|
|
const r = await api("POST", "/query/sql", { sql: `SELECT * FROM ethereal_workers WHERE worker_id = ${worker_id}` });
|
|
if (!r.rows?.length) return { content: [{ type: "text" as const, text: `Worker ${worker_id} not found` }] };
|
|
return { content: [{ type: "text" as const, text: JSON.stringify(r.rows[0], null, 2) }] };
|
|
},
|
|
);
|
|
|
|
server.tool(
|
|
"rag_question",
|
|
"Natural language question answered via RAG (embed → search → retrieve → generate). For open-ended questions where SQL alone isn't enough.",
|
|
{ question: z.string(), index: z.string().default("ethereal_workers_v1"), top_k: z.number().default(5) },
|
|
async ({ question, index, top_k }) => {
|
|
const r = await api("POST", "/vectors/rag", { index_name: index, question, top_k });
|
|
return { content: [{ type: "text" as const, text: r.error ? `RAG Error: ${r.error}` : `Answer: ${r.answer}\n\nSources: ${r.sources?.length || 0}` }] };
|
|
},
|
|
);
|
|
|
|
server.tool(
|
|
"log_success",
|
|
"Record a successful operation to the playbook database. Small models query this later to learn what worked.",
|
|
{
|
|
operation: z.string().describe("What was done"),
|
|
approach: z.string().describe("How it was done"),
|
|
result: z.string().describe("Outcome"),
|
|
context: z.string().optional(),
|
|
},
|
|
async ({ operation, approach, result, context }) => {
|
|
const csv = `timestamp,operation,approach,result,context\n"${new Date().toISOString()}","${operation.replace(/"/g, '""')}","${approach.replace(/"/g, '""')}","${result.replace(/"/g, '""')}","${(context||"").replace(/"/g, '""')}"`;
|
|
const form = new FormData();
|
|
form.append("file", new Blob([csv], { type: "text/csv" }), "playbook.csv");
|
|
const resp = await fetch(`${BASE}/ingest/file?name=successful_playbooks`, { method: "POST", body: form });
|
|
return { content: [{ type: "text" as const, text: `Logged: ${await resp.text()}` }] };
|
|
},
|
|
);
|
|
|
|
server.tool(
|
|
"get_playbooks",
|
|
"Retrieve past successful operations. Small models use this to learn what approaches worked.",
|
|
{ keyword: z.string().optional(), limit: z.number().default(10) },
|
|
async ({ keyword, limit }) => {
|
|
let sql = `SELECT * FROM successful_playbooks ORDER BY timestamp DESC LIMIT ${limit}`;
|
|
if (keyword) sql = `SELECT * FROM successful_playbooks WHERE operation LIKE '%${keyword}%' OR approach LIKE '%${keyword}%' ORDER BY timestamp DESC LIMIT ${limit}`;
|
|
const r = await api("POST", "/query/sql", { sql });
|
|
if (r.error) return { content: [{ type: "text" as const, text: "No playbooks yet — log some successful operations first!" }] };
|
|
return { content: [{ type: "text" as const, text: JSON.stringify(r.rows, null, 2) }] };
|
|
},
|
|
);
|
|
|
|
server.tool(
|
|
"swap_profile",
|
|
"Hot-swap model profile. Changes Ollama model in VRAM + bound datasets. 'agent-parquet' = HNSW (fast), 'agent-lance' = IVF_PQ (scalable).",
|
|
{ profile_id: z.string() },
|
|
async ({ profile_id }) => {
|
|
const r = await api("POST", `/vectors/profile/${profile_id}/activate`);
|
|
return { content: [{ type: "text" as const, text: JSON.stringify({
|
|
profile: r.profile_id, model: r.ollama_name,
|
|
indexes: r.indexes_warmed?.length, vectors: r.total_vectors,
|
|
previous: r.previous_profile, duration: r.duration_secs,
|
|
}, null, 2) }] };
|
|
},
|
|
);
|
|
|
|
server.tool(
|
|
"vram_status",
|
|
"GPU VRAM usage + loaded Ollama models. Check before swapping profiles.",
|
|
{},
|
|
async () => {
|
|
const r = await api("GET", "/ai/vram");
|
|
return { content: [{ type: "text" as const, text: JSON.stringify(r, null, 2) }] };
|
|
},
|
|
);
|
|
|
|
// Resources — these give any MCP client full context about the system
|
|
|
|
server.resource("lakehouse://system", "lakehouse://system", async (uri) => {
|
|
const health = await api("GET", "/health");
|
|
const datasets = await api("GET", "/catalog/datasets") as any[];
|
|
const indexes = await api("GET", "/vectors/indexes") as any[];
|
|
const vram = await api("GET", "/ai/vram");
|
|
const agent = await api("GET", "/vectors/agent/status");
|
|
const buckets = await api("GET", "/storage/buckets");
|
|
|
|
const text = `# Lakehouse System Status
|
|
|
|
## Health: ${health === "lakehouse ok" ? "OK" : JSON.stringify(health)}
|
|
|
|
## Datasets (${datasets.length})
|
|
${datasets.map((d: any) => `- ${d.name}: ${d.row_count || "?"} rows`).join("\n")}
|
|
|
|
## Vector Indexes (${indexes.length})
|
|
${(indexes as any[]).map((i: any) => `- ${i.index_name}: ${i.chunk_count} chunks (${i.vector_backend || "parquet"})`).join("\n")}
|
|
|
|
## GPU
|
|
- Used: ${vram?.gpu?.used_mib || "?"}/${vram?.gpu?.total_mib || "?"} MiB
|
|
- Models loaded: ${(vram?.ollama_loaded || []).map((m: any) => m.name).join(", ") || "none"}
|
|
|
|
## Autotune Agent
|
|
- Running: ${agent?.running}, Trials: ${agent?.trials_run}, Promotions: ${agent?.promotions}
|
|
|
|
## Buckets (${(buckets as any[])?.length || 0})
|
|
${(buckets as any[] || []).map((b: any) => `- ${b.name}: ${b.backend} (${b.reachable ? "reachable" : "DOWN"})`).join("\n")}
|
|
|
|
## Services
|
|
- Lakehouse Gateway: :3100
|
|
- AI Sidecar: :3200
|
|
- Agent Gateway: :3700
|
|
- Langfuse: :3001
|
|
- MinIO S3: :9000
|
|
- Ollama: :11434
|
|
|
|
## Available Models
|
|
- qwen3: 8.2B, 40K context, thinking+tools (best for reasoning)
|
|
- qwen2.5: 7B, 8K context (best for fast SQL generation)
|
|
- mistral: 7B, 8K context (general generation)
|
|
- nomic-embed-text: 137M (embedding, automatic)
|
|
`;
|
|
return { contents: [{ uri: uri.href, mimeType: "text/plain", text }] };
|
|
});
|
|
|
|
server.resource("lakehouse://architecture", "lakehouse://architecture", async (uri) => {
|
|
// Read the PRD directly
|
|
const prd = await Bun.file("/home/profit/lakehouse/docs/PRD.md").text().catch(() => "PRD not found");
|
|
return { contents: [{ uri: uri.href, mimeType: "text/markdown", text: prd }] };
|
|
});
|
|
|
|
server.resource("lakehouse://instructions", "lakehouse://instructions", async (uri) => {
|
|
const instructions = await Bun.file("/home/profit/lakehouse/mcp-server/AGENT_INSTRUCTIONS.md").text().catch(() => "Instructions not found");
|
|
return { contents: [{ uri: uri.href, mimeType: "text/markdown", text: instructions }] };
|
|
});
|
|
|
|
server.resource("lakehouse://playbooks", "lakehouse://playbooks", async (uri) => {
|
|
const r = await api("POST", "/query/sql", {
|
|
sql: "SELECT * FROM successful_playbooks ORDER BY timestamp DESC LIMIT 20"
|
|
});
|
|
const rows = r?.rows || [];
|
|
const text = rows.length === 0
|
|
? "No playbooks yet. Log successful operations with the log_success tool."
|
|
: rows.map((p: any) => `## ${p.operation}\n- Approach: ${p.approach}\n- Result: ${p.result}\n- Context: ${p.context || "—"}\n`).join("\n");
|
|
return { contents: [{ uri: uri.href, mimeType: "text/markdown", text: `# Successful Playbooks\n\n${text}` }] };
|
|
});
|
|
|
|
server.resource("lakehouse://datasets", "lakehouse://datasets", async (uri) => {
|
|
const r = await api("GET", "/catalog/datasets") as any[];
|
|
const text = r.map(d => `${d.name}: ${d.row_count || "?"} rows`).join("\n");
|
|
return { contents: [{ uri: uri.href, mimeType: "text/plain", text }] };
|
|
});
|
|
|
|
// ─── Dual mode: stdio (Claude Code) or HTTP (internal agents) ───
|
|
|
|
async function main() {
|
|
if (MODE === "stdio") {
|
|
const transport = new StdioServerTransport();
|
|
await server.connect(transport);
|
|
console.error(`Lakehouse MCP (stdio) → ${BASE}`);
|
|
return;
|
|
}
|
|
|
|
// HTTP mode — a REST gateway that internal agents call directly.
|
|
// No MCP protocol complexity for consumers — just POST JSON, get JSON.
|
|
// The MCP tool definitions above are reused for the stdio path; this
|
|
// HTTP path wraps the same lakehouse API with agent-friendly routing.
|
|
|
|
Bun.serve({
|
|
port: PORT,
|
|
async fetch(req) {
|
|
const url = new URL(req.url);
|
|
const json = async () => req.method === "POST" ? await req.json() : {};
|
|
|
|
// CORS — dashboard runs in the browser, gateway is a different origin
|
|
const cors = {
|
|
"Access-Control-Allow-Origin": "*",
|
|
"Access-Control-Allow-Methods": "GET, POST, OPTIONS",
|
|
"Access-Control-Allow-Headers": "Content-Type",
|
|
};
|
|
if (req.method === "OPTIONS") return new Response(null, { status: 204, headers: cors });
|
|
|
|
const ok = (data: any) => Response.json(data, { headers: cors });
|
|
const err = (msg: string, status = 400) => Response.json({ error: msg }, { status, headers: cors });
|
|
|
|
try {
|
|
// Health — no trace needed
|
|
if (url.pathname === "/health") return ok({ status: "ok", lakehouse: BASE, tools: 11 });
|
|
|
|
// Start a Langfuse trace for every non-static request
|
|
if (req.method === "POST" || !["/","/dashboard","/dashboard.css","/dashboard.ts","/dashboard.js"].includes(url.pathname)) {
|
|
activeTrace = startTrace(`gw:${url.pathname}`, { method: req.method, path: url.pathname });
|
|
}
|
|
|
|
// Self-orientation: any agent calls this first to understand the system
|
|
if (url.pathname === "/context") {
|
|
const instructions = await Bun.file("/home/profit/lakehouse/mcp-server/AGENT_INSTRUCTIONS.md").text().catch(() => "");
|
|
const datasets = await api("GET", "/catalog/datasets") as any[];
|
|
const indexes = await api("GET", "/vectors/indexes") as any[];
|
|
const vram = await api("GET", "/ai/vram");
|
|
return ok({
|
|
system: "Lakehouse Staffing Co-Pilot",
|
|
purpose: "AI anticipates staffing coordinator needs — pre-matches workers to contracts, surfaces alerts, builds playbooks from successful operations",
|
|
instructions: instructions.slice(0, 3000),
|
|
datasets: (datasets || []).map((d: any) => ({ name: d.name, rows: d.row_count })),
|
|
indexes: (indexes || []).map((i: any) => ({ name: i.index_name, chunks: i.chunk_count, backend: i.vector_backend })),
|
|
models: { qwen3: "8.2B reasoning+tools", qwen2_5: "7B fast SQL", mistral: "7B generation", nomic: "137M embedding" },
|
|
vram: vram?.gpu,
|
|
tools: ["/search","/sql","/match","/worker/:id","/ask","/log","/playbooks","/profile/:id","/vram","/context","/verify"],
|
|
rules: [
|
|
"Never hallucinate — only state facts from tool responses",
|
|
"SQL for counts/aggregations, hybrid /search for matching",
|
|
"Log every successful operation to /log",
|
|
"Check /playbooks before complex tasks",
|
|
"Verify worker details via /worker/:id before communicating",
|
|
],
|
|
});
|
|
}
|
|
|
|
// Verification endpoint — agent can check any claim against SQL
|
|
if (url.pathname === "/verify") {
|
|
const b = await json();
|
|
// b.claim: "worker 4925 is a Forklift Operator in IL with reliability 0.82"
|
|
// b.worker_id: 4925
|
|
// b.checks: { role: "Forklift Operator", state: "IL", reliability: 0.82 }
|
|
if (!b.worker_id) return err("worker_id required");
|
|
const r = await api("POST", "/query/sql", {
|
|
sql: `SELECT * FROM ethereal_workers WHERE worker_id = ${b.worker_id}`
|
|
});
|
|
const worker = r?.rows?.[0];
|
|
if (!worker) return ok({ verified: false, reason: `worker ${b.worker_id} not found` });
|
|
|
|
const checks = b.checks || {};
|
|
const failures: string[] = [];
|
|
for (const [field, expected] of Object.entries(checks)) {
|
|
const actual = worker[field];
|
|
if (actual === undefined) continue;
|
|
if (typeof expected === "number") {
|
|
if (Math.abs(Number(actual) - expected) > 0.05) {
|
|
failures.push(`${field}: claimed=${expected} actual=${actual}`);
|
|
}
|
|
} else if (String(actual).toLowerCase() !== String(expected).toLowerCase()) {
|
|
failures.push(`${field}: claimed=${expected} actual=${actual}`);
|
|
}
|
|
}
|
|
return ok({
|
|
verified: failures.length === 0,
|
|
worker_id: b.worker_id,
|
|
worker_name: worker.name,
|
|
failures,
|
|
actual: worker,
|
|
});
|
|
}
|
|
|
|
// Tool: hybrid search
|
|
if (url.pathname === "/search") {
|
|
const b = await json();
|
|
return ok(await api("POST", "/vectors/hybrid", {
|
|
question: b.question, index_name: b.index || "ethereal_workers_v1",
|
|
sql_filter: b.sql_filter, filter_dataset: b.dataset || "ethereal_workers",
|
|
id_column: b.id_column || "worker_id", top_k: b.top_k || 5, generate: b.generate !== false,
|
|
}));
|
|
}
|
|
|
|
// Tool: SQL
|
|
if (url.pathname === "/sql") {
|
|
const b = await json();
|
|
return ok(await api("POST", "/query/sql", { sql: b.sql }));
|
|
}
|
|
|
|
// Tool: match contract
|
|
if (url.pathname === "/match") {
|
|
const b = await json();
|
|
let filter = `role = '${b.role}' AND state = '${b.state}' AND reliability >= ${b.min_reliability || 0.7}`;
|
|
if (b.city) filter += ` AND city = '${b.city}'`;
|
|
return ok(await api("POST", "/vectors/hybrid", {
|
|
question: `Best ${b.role} workers with relevant skills`,
|
|
index_name: b.index || "ethereal_workers_v1", sql_filter: filter,
|
|
filter_dataset: b.dataset || "ethereal_workers",
|
|
id_column: "worker_id", top_k: (b.headcount || 5) * 2, generate: false,
|
|
}));
|
|
}
|
|
|
|
// Tool: get worker
|
|
if (url.pathname.startsWith("/worker/")) {
|
|
const id = url.pathname.split("/")[2];
|
|
return ok(await api("POST", "/query/sql", { sql: `SELECT * FROM ethereal_workers WHERE worker_id = ${id}` }));
|
|
}
|
|
|
|
// Tool: RAG
|
|
if (url.pathname === "/ask") {
|
|
const b = await json();
|
|
return ok(await api("POST", "/vectors/rag", { index_name: b.index || "ethereal_workers_v1", question: b.question, top_k: b.top_k || 5 }));
|
|
}
|
|
|
|
// Tool: log success
|
|
if (url.pathname === "/log") {
|
|
const b = await json();
|
|
const csv = `timestamp,operation,approach,result,context\n"${new Date().toISOString()}","${(b.operation||"").replace(/"/g,'""')}","${(b.approach||"").replace(/"/g,'""')}","${(b.result||"").replace(/"/g,'""')}","${(b.context||"").replace(/"/g,'""')}"`;
|
|
const form = new FormData();
|
|
form.append("file", new Blob([csv], { type: "text/csv" }), "playbook.csv");
|
|
const r = await fetch(`${BASE}/ingest/file?name=successful_playbooks`, { method: "POST", body: form });
|
|
return ok({ logged: true, response: await r.text() });
|
|
}
|
|
|
|
// Tool: get playbooks
|
|
if (url.pathname === "/playbooks") {
|
|
const kw = url.searchParams.get("keyword");
|
|
const limit = url.searchParams.get("limit") || "10";
|
|
let sql = `SELECT * FROM successful_playbooks ORDER BY timestamp DESC LIMIT ${limit}`;
|
|
if (kw) sql = `SELECT * FROM successful_playbooks WHERE operation LIKE '%${kw}%' OR approach LIKE '%${kw}%' ORDER BY timestamp DESC LIMIT ${limit}`;
|
|
const r = await api("POST", "/query/sql", { sql });
|
|
return ok(r.error ? { playbooks: [], note: "No playbooks yet" } : { playbooks: r.rows });
|
|
}
|
|
|
|
// Tool: swap profile
|
|
if (url.pathname.startsWith("/profile/")) {
|
|
const id = url.pathname.split("/")[2];
|
|
return ok(await api("POST", `/vectors/profile/${id}/activate`));
|
|
}
|
|
|
|
// Tool: VRAM
|
|
if (url.pathname === "/vram") return ok(await api("GET", "/ai/vram"));
|
|
|
|
// Pass-through to lakehouse for anything else
|
|
if (url.pathname.startsWith("/api/")) {
|
|
const path = url.pathname.replace("/api", "");
|
|
const body = req.method !== "GET" ? await req.text() : undefined;
|
|
const r = await fetch(`${BASE}${path}`, { method: req.method, headers: { "Content-Type": "application/json" }, body });
|
|
return new Response(await r.text(), { status: r.status, headers: { "Content-Type": "application/json" } });
|
|
}
|
|
|
|
// Dashboard UI
|
|
if (url.pathname === "/" || url.pathname === "/dashboard") {
|
|
return new Response(Bun.file(import.meta.dir + "/dashboard.html"));
|
|
}
|
|
if (url.pathname === "/dashboard.css") {
|
|
return new Response(Bun.file(import.meta.dir + "/dashboard.css"), { headers: { "Content-Type": "text/css" } });
|
|
}
|
|
if (url.pathname === "/dashboard.ts" || url.pathname === "/dashboard.js") {
|
|
// Bun transpiles TS on the fly
|
|
const built = await Bun.build({ entrypoints: [import.meta.dir + "/dashboard.ts"], target: "browser" });
|
|
const js = await built.outputs[0].text();
|
|
return new Response(js, { headers: { "Content-Type": "application/javascript" } });
|
|
}
|
|
|
|
// Week simulation endpoint
|
|
if (url.pathname === "/simulation/run" && req.method === "POST") {
|
|
return ok(await runWeekSimulation());
|
|
}
|
|
|
|
activeTrace = null;
|
|
return err("Unknown path. Available: / /health /search /sql /match /worker/:id /ask /log /playbooks /profile/:id /vram /context /verify /simulation/run", 404);
|
|
} catch (e: any) {
|
|
if (activeTrace) { scoreTrace(activeTrace, "error", 0, e.message); }
|
|
activeTrace = null;
|
|
return err(e.message || String(e), 500);
|
|
} finally {
|
|
// Flush traces async — don't block the response
|
|
flushTraces().catch(() => {});
|
|
activeTrace = null;
|
|
}
|
|
},
|
|
});
|
|
|
|
console.error(`Lakehouse Agent Gateway :${PORT} → ${BASE}`);
|
|
}
|
|
|
|
main().catch(console.error);
|
|
|
|
// ─── Week simulation engine ───
|
|
|
|
const ROLES = ["Forklift Operator","Machine Operator","Assembler","Loader","Quality Tech","Welder","Sanitation Worker","Shipping Clerk","Production Worker","Maintenance Tech"];
|
|
const STATES = ["IL","IN","OH","MO","TN","KY","WI","MI"];
|
|
const CITIES: Record<string, string[]> = {
|
|
IL: ["Chicago","Springfield","Rockford","Peoria","Joliet"],
|
|
IN: ["Indianapolis","Fort Wayne","Evansville","South Bend"],
|
|
OH: ["Columbus","Cleveland","Cincinnati","Dayton"],
|
|
MO: ["St. Louis","Kansas City","Springfield"],
|
|
TN: ["Nashville","Memphis"], KY: ["Louisville","Lexington"],
|
|
WI: ["Milwaukee","Madison"], MI: ["Detroit","Grand Rapids"],
|
|
};
|
|
const CLIENTS = ["Midwest Logistics","Precision Mfg","Amazon DSP","CleanSpace","AutoParts Direct","Great Lakes Steel","Heartland Foods","Summit Packaging","Cardinal Health","TechFlow Assembly","River City Plastics","Prairie Wind Energy"];
|
|
const PRIORITIES = ["urgent","high","medium","medium","medium","low"];
|
|
const STARTS = ["5:00 AM","6:00 AM","6:30 AM","7:00 AM","7:30 AM","8:00 AM"];
|
|
const NOTES = [
|
|
"Warehouse expansion — need experienced workers",
|
|
"Peak season surge — client called last night",
|
|
"2nd shift, CNC preferred",
|
|
"Chemical plant — hazmat cert MANDATORY",
|
|
"ISO audit next week — need detail-oriented workers",
|
|
"Structural welding — experienced only",
|
|
"Regular fill — ongoing contract",
|
|
"Client doubled their order",
|
|
"Night shift coverage needed",
|
|
"Replacing 2 no-shows from yesterday",
|
|
];
|
|
|
|
function pick<T>(arr: T[]): T { return arr[Math.floor(Math.random() * arr.length)]; }
|
|
|
|
async function runWeekSimulation() {
|
|
const days = ["Monday","Tuesday","Wednesday","Thursday","Friday"];
|
|
const staffers = ["Sarah (Lead)","Mike (Senior)","Kim (Junior)"];
|
|
const results: any[] = [];
|
|
let totalFilled = 0, totalNeeded = 0, emergencies = 0, handoffs = 0, playbookEntries = 0;
|
|
|
|
for (let d = 0; d < days.length; d++) {
|
|
const dayLabel = days[d];
|
|
const numContracts = 4 + Math.floor(Math.random() * 5); // 4-8 per day
|
|
const contracts: any[] = [];
|
|
const staffer = staffers[d % staffers.length];
|
|
const handoffTo = staffers[(d + 1) % staffers.length];
|
|
|
|
for (let c = 0; c < numContracts; c++) {
|
|
const state = pick(STATES);
|
|
const city = pick(CITIES[state] || [state]);
|
|
const role = pick(ROLES);
|
|
const priority = pick(PRIORITIES) as string;
|
|
const headcount = priority === "urgent" ? 4 + Math.floor(Math.random() * 5) :
|
|
priority === "high" ? 3 + Math.floor(Math.random() * 3) :
|
|
2 + Math.floor(Math.random() * 3);
|
|
const minRel = priority === "urgent" ? 0.6 : priority === "high" ? 0.75 : 0.8;
|
|
const cid = `W${d+1}-${String(c+1).padStart(3,"0")}`;
|
|
|
|
if (priority === "urgent") emergencies++;
|
|
totalNeeded += headcount;
|
|
|
|
// Run hybrid search
|
|
let filled = 0;
|
|
let matches: any[] = [];
|
|
try {
|
|
const filt = `role = '${role}' AND state = '${state}' AND reliability >= ${minRel}`;
|
|
const r = await api("POST", "/vectors/hybrid", {
|
|
question: `Find ${role} workers for ${pick(NOTES)}`,
|
|
index_name: "ethereal_workers_v1",
|
|
sql_filter: filt,
|
|
filter_dataset: "ethereal_workers",
|
|
id_column: "worker_id",
|
|
top_k: headcount + 2,
|
|
generate: false,
|
|
});
|
|
matches = (r.sources || []).slice(0, headcount).map((s: any) => ({
|
|
doc_id: s.doc_id,
|
|
name: s.chunk_text?.split("—")[0]?.trim() || s.doc_id,
|
|
score: s.score,
|
|
}));
|
|
filled = matches.length;
|
|
} catch {}
|
|
totalFilled += Math.min(filled, headcount);
|
|
|
|
contracts.push({
|
|
id: cid, client: pick(CLIENTS), role, state, city,
|
|
headcount, filled: Math.min(filled, headcount), priority,
|
|
start: pick(STARTS), notes: pick(NOTES), matches,
|
|
staffer, handoff_to: d < 4 ? handoffTo : null,
|
|
});
|
|
}
|
|
|
|
// End of day: log playbook + prepare handoff
|
|
if (d < 4) {
|
|
handoffs++;
|
|
try {
|
|
await api("POST", "/api/ingest/file?name=successful_playbooks", null); // just trigger
|
|
} catch {}
|
|
}
|
|
playbookEntries++;
|
|
|
|
results.push({
|
|
label: dayLabel,
|
|
staffer,
|
|
handoff_to: d < 4 ? handoffTo : null,
|
|
contracts,
|
|
filled: contracts.reduce((s: number, c: any) => s + c.filled, 0),
|
|
needed: contracts.reduce((s: number, c: any) => s + c.headcount, 0),
|
|
});
|
|
}
|
|
|
|
const summary = {
|
|
total_contracts: results.reduce((s, d) => s + d.contracts.length, 0),
|
|
total_needed: totalNeeded,
|
|
total_filled: totalFilled,
|
|
fill_pct: Math.round(totalFilled / Math.max(totalNeeded, 1) * 100),
|
|
emergencies,
|
|
handoffs,
|
|
playbook_entries: playbookEntries,
|
|
};
|
|
|
|
// Log the week to playbooks
|
|
try {
|
|
const form = new FormData();
|
|
const csv = `timestamp,operation,approach,result,context\n"${new Date().toISOString()}","week_simulation: ${summary.total_contracts} contracts over 5 days","hybrid SQL+vector with multi-model routing","${summary.total_filled}/${summary.total_needed} filled (${summary.fill_pct}%)","${summary.emergencies} emergencies, ${summary.handoffs} handoffs"`;
|
|
form.append("file", new Blob([csv], { type: "text/csv" }), "playbook.csv");
|
|
await fetch(`${BASE}/ingest/file?name=successful_playbooks`, { method: "POST", body: form });
|
|
} catch {}
|
|
|
|
return { days: results, summary };
|
|
}
|