/** * Lakehouse MCP Server — bridges local LLMs to the data substrate. * * Tools: * - search_workers: hybrid SQL+vector (the core fix) * - query_sql: analytical SQL on any dataset * - match_contract: find workers for a job order * - get_worker: single worker by ID * - rag_question: full RAG pipeline * - log_success: record what worked → playbook DB * - get_playbooks: retrieve past successes * - swap_profile: hot-swap model + data context * - vram_status: GPU introspection */ import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"; import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js"; import { z } from "zod"; import { startTrace, logSpan, logGeneration, scoreTrace, flush as flushTraces } from "./tracing.js"; const BASE = process.env.LAKEHOUSE_URL || "http://localhost:3100"; const PORT = parseInt(process.env.MCP_PORT || "3700"); const MODE = process.env.MCP_TRANSPORT || "http"; // "stdio" or "http" // Active trace for the current request — set per-request in the HTTP handler let activeTrace: ReturnType | null = null; async function api(method: string, path: string, body?: any) { const t0 = Date.now(); const resp = await fetch(`${BASE}${path}`, { method, headers: body ? { "Content-Type": "application/json" } : {}, body: body ? JSON.stringify(body) : undefined, }); const text = await resp.text(); const ms = Date.now() - t0; let parsed: any; try { parsed = JSON.parse(text); } catch { parsed = { raw: text, status: resp.status }; } // Trace the call if we have an active trace if (activeTrace) { const isGen = path.includes("/generate"); if (isGen) { logGeneration(activeTrace, `lakehouse${path}`, { model: body?.model || "unknown", prompt: typeof body?.prompt === "string" ? body.prompt.slice(0, 500) : JSON.stringify(body).slice(0, 300), completion: typeof parsed?.text === "string" ? 
parsed.text.slice(0, 500) : JSON.stringify(parsed).slice(0, 300), duration_ms: ms, tokens_in: parsed?.prompt_eval_count, tokens_out: parsed?.eval_count, }); } else { logSpan(activeTrace, `lakehouse${path}`, body, { rows: parsed?.row_count, sources: parsed?.sources?.length, sql_matches: parsed?.sql_matches, method: parsed?.method, }, ms); } } return parsed; } const server = new McpServer({ name: "lakehouse", version: "1.0.0" }); server.tool( "search_workers", "Hybrid SQL+vector search. SQL ensures structural accuracy (role, state, reliability), vector ranks by semantic relevance. Every result is verified against the golden dataset.", { question: z.string().describe("Natural language question about workers"), sql_filter: z.string().optional().describe("SQL WHERE clause, e.g. \"role = 'Forklift Operator' AND state = 'IL' AND reliability > 0.8\""), dataset: z.string().default("ethereal_workers"), id_column: z.string().default("worker_id"), top_k: z.number().default(5), }, async ({ question, sql_filter, dataset, id_column, top_k }) => { const body: any = { question, index_name: "workers_500k_v1", filter_dataset: dataset, id_column, top_k, generate: true, use_playbook_memory: true, }; if (sql_filter) body.sql_filter = sql_filter; const r = await api("POST", "/vectors/hybrid", body); return { content: [{ type: "text" as const, text: JSON.stringify(r, null, 2) }] }; }, ); server.tool( "query_sql", "Run SQL against any lakehouse dataset. 
Tables: ethereal_workers (10K), candidates (1K), workers_500k (500K), timesheets (1M), call_log (800K), email_log (500K), placements (50K), job_orders (15K), clients (2K).", { sql: z.string().describe("SQL query") }, async ({ sql }) => { const r = await api("POST", "/query/sql", { sql }); if (r.error) return { content: [{ type: "text" as const, text: `SQL Error: ${r.error}` }] }; return { content: [{ type: "text" as const, text: `${r.row_count} rows:\n${JSON.stringify(r.rows?.slice(0, 20), null, 2)}` }] }; }, ); server.tool( "match_contract", "Find qualified workers for a staffing contract. SQL-verified matches ranked by semantic fit.", { role: z.string(), state: z.string(), city: z.string().optional(), min_reliability: z.number().default(0.7), required_certs: z.array(z.string()).default([]), headcount: z.number().default(5), }, async ({ role, state, city, min_reliability, required_certs, headcount }) => { let filter = `role = '${role}' AND state = '${state}' AND reliability >= ${min_reliability}`; if (city) filter += ` AND city = '${city}'`; const r = await api("POST", "/vectors/hybrid", { question: `Find the best ${role} workers with relevant skills and certifications`, index_name: "workers_500k_v1", sql_filter: filter, filter_dataset: "ethereal_workers", id_column: "worker_id", top_k: headcount * 2, generate: false, use_playbook_memory: true, }); let matches = r.sources || []; if (required_certs.length > 0) { const req = new Set(required_certs.map((c: string) => c.toLowerCase())); matches = matches.filter((m: any) => { const certs = (m.chunk_text || "").toLowerCase(); return [...req].every(c => certs.includes(c)); }); } return { content: [{ type: "text" as const, text: JSON.stringify({ contract: { role, state, city, min_reliability, required_certs }, matches: matches.slice(0, headcount), total_sql: r.sql_matches, method: r.method, }, null, 2) }] }; }, ); server.tool( "get_worker", "Fetch one worker profile by ID — all fields including scores and comms.", { 
worker_id: z.number() }, async ({ worker_id }) => { const r = await api("POST", "/query/sql", { sql: `SELECT * FROM ethereal_workers WHERE worker_id = ${worker_id}` }); if (!r.rows?.length) return { content: [{ type: "text" as const, text: `Worker ${worker_id} not found` }] }; return { content: [{ type: "text" as const, text: JSON.stringify(r.rows[0], null, 2) }] }; }, ); server.tool( "rag_question", "Natural language question answered via RAG (embed → search → retrieve → generate). For open-ended questions where SQL alone isn't enough.", { question: z.string(), index: z.string().default("workers_500k_v1"), top_k: z.number().default(5) }, async ({ question, index, top_k }) => { const r = await api("POST", "/vectors/rag", { index_name: index, question, top_k }); return { content: [{ type: "text" as const, text: r.error ? `RAG Error: ${r.error}` : `Answer: ${r.answer}\n\nSources: ${r.sources?.length || 0}` }] }; }, ); server.tool( "log_success", "Record a successful operation to the playbook database. Small models query this later to learn what worked.", { operation: z.string().describe("What was done"), approach: z.string().describe("How it was done"), result: z.string().describe("Outcome"), context: z.string().optional(), }, async ({ operation, approach, result, context }) => { const csv = `timestamp,operation,approach,result,context\n"${new Date().toISOString()}","${operation.replace(/"/g, '""')}","${approach.replace(/"/g, '""')}","${result.replace(/"/g, '""')}","${(context||"").replace(/"/g, '""')}"`; const form = new FormData(); form.append("file", new Blob([csv], { type: "text/csv" }), "playbook.csv"); const resp = await fetch(`${BASE}/ingest/file?name=successful_playbooks`, { method: "POST", body: form }); return { content: [{ type: "text" as const, text: `Logged: ${await resp.text()}` }] }; }, ); server.tool( "get_playbooks", "Retrieve past successful operations. 
Small models use this to learn what approaches worked.", { keyword: z.string().optional(), limit: z.number().default(10) }, async ({ keyword, limit }) => { let sql = `SELECT * FROM successful_playbooks ORDER BY timestamp DESC LIMIT ${limit}`; if (keyword) sql = `SELECT * FROM successful_playbooks WHERE operation LIKE '%${keyword}%' OR approach LIKE '%${keyword}%' ORDER BY timestamp DESC LIMIT ${limit}`; const r = await api("POST", "/query/sql", { sql }); if (r.error) return { content: [{ type: "text" as const, text: "No playbooks yet — log some successful operations first!" }] }; return { content: [{ type: "text" as const, text: JSON.stringify(r.rows, null, 2) }] }; }, ); server.tool( "swap_profile", "Hot-swap model profile. Changes Ollama model in VRAM + bound datasets. 'agent-parquet' = HNSW (fast), 'agent-lance' = IVF_PQ (scalable).", { profile_id: z.string() }, async ({ profile_id }) => { const r = await api("POST", `/vectors/profile/${profile_id}/activate`); return { content: [{ type: "text" as const, text: JSON.stringify({ profile: r.profile_id, model: r.ollama_name, indexes: r.indexes_warmed?.length, vectors: r.total_vectors, previous: r.previous_profile, duration: r.duration_secs, }, null, 2) }] }; }, ); server.tool( "vram_status", "GPU VRAM usage + loaded Ollama models. 
Check before swapping profiles.", {}, async () => { const r = await api("GET", "/ai/vram"); return { content: [{ type: "text" as const, text: JSON.stringify(r, null, 2) }] }; }, ); // Resources — these give any MCP client full context about the system server.resource("lakehouse://system", "lakehouse://system", async (uri) => { const health = await api("GET", "/health"); const datasets = await api("GET", "/catalog/datasets") as any[]; const indexes = await api("GET", "/vectors/indexes") as any[]; const vram = await api("GET", "/ai/vram"); const agent = await api("GET", "/vectors/agent/status"); const buckets = await api("GET", "/storage/buckets"); const text = `# Lakehouse System Status ## Health: ${health === "lakehouse ok" ? "OK" : JSON.stringify(health)} ## Datasets (${datasets.length}) ${datasets.map((d: any) => `- ${d.name}: ${d.row_count || "?"} rows`).join("\n")} ## Vector Indexes (${indexes.length}) ${(indexes as any[]).map((i: any) => `- ${i.index_name}: ${i.chunk_count} chunks (${i.vector_backend || "parquet"})`).join("\n")} ## GPU - Used: ${vram?.gpu?.used_mib || "?"}/${vram?.gpu?.total_mib || "?"} MiB - Models loaded: ${(vram?.ollama_loaded || []).map((m: any) => m.name).join(", ") || "none"} ## Autotune Agent - Running: ${agent?.running}, Trials: ${agent?.trials_run}, Promotions: ${agent?.promotions} ## Buckets (${(buckets as any[])?.length || 0}) ${(buckets as any[] || []).map((b: any) => `- ${b.name}: ${b.backend} (${b.reachable ? 
"reachable" : "DOWN"})`).join("\n")} ## Services - Lakehouse Gateway: :3100 - AI Sidecar: :3200 - Agent Gateway: :3700 - Langfuse: :3001 - MinIO S3: :9000 - Ollama: :11434 ## Available Models - qwen3: 8.2B, 40K context, thinking+tools (best for reasoning) - qwen2.5: 7B, 8K context (best for fast SQL generation) - mistral: 7B, 8K context (general generation) - nomic-embed-text: 137M (embedding, automatic) `; return { contents: [{ uri: uri.href, mimeType: "text/plain", text }] }; }); server.resource("lakehouse://architecture", "lakehouse://architecture", async (uri) => { // Read the PRD directly const prd = await Bun.file("/home/profit/lakehouse/docs/PRD.md").text().catch(() => "PRD not found"); return { contents: [{ uri: uri.href, mimeType: "text/markdown", text: prd }] }; }); server.resource("lakehouse://instructions", "lakehouse://instructions", async (uri) => { const instructions = await Bun.file("/home/profit/lakehouse/mcp-server/AGENT_INSTRUCTIONS.md").text().catch(() => "Instructions not found"); return { contents: [{ uri: uri.href, mimeType: "text/markdown", text: instructions }] }; }); server.resource("lakehouse://playbooks", "lakehouse://playbooks", async (uri) => { const r = await api("POST", "/query/sql", { sql: "SELECT * FROM successful_playbooks ORDER BY timestamp DESC LIMIT 20" }); const rows = r?.rows || []; const text = rows.length === 0 ? "No playbooks yet. Log successful operations with the log_success tool." 
: rows.map((p: any) => `## ${p.operation}\n- Approach: ${p.approach}\n- Result: ${p.result}\n- Context: ${p.context || "—"}\n`).join("\n"); return { contents: [{ uri: uri.href, mimeType: "text/markdown", text: `# Successful Playbooks\n\n${text}` }] }; }); server.resource("lakehouse://datasets", "lakehouse://datasets", async (uri) => { const r = await api("GET", "/catalog/datasets") as any[]; const text = r.map(d => `${d.name}: ${d.row_count || "?"} rows`).join("\n"); return { contents: [{ uri: uri.href, mimeType: "text/plain", text }] }; }); // ─── Dual mode: stdio (Claude Code) or HTTP (internal agents) ─── async function main() { if (MODE === "stdio") { const transport = new StdioServerTransport(); await server.connect(transport); console.error(`Lakehouse MCP (stdio) → ${BASE}`); return; } // HTTP mode — a REST gateway that internal agents call directly. // No MCP protocol complexity for consumers — just POST JSON, get JSON. // The MCP tool definitions above are reused for the stdio path; this // HTTP path wraps the same lakehouse API with agent-friendly routing. Bun.serve({ port: PORT, async fetch(req) { const url = new URL(req.url); const json = async () => req.method === "POST" ? 
await req.json() : {}; // CORS — dashboard runs in the browser, gateway is a different origin const cors = { "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Methods": "GET, POST, OPTIONS", "Access-Control-Allow-Headers": "Content-Type", }; if (req.method === "OPTIONS") return new Response(null, { status: 204, headers: cors }); const ok = (data: any) => Response.json(data, { headers: cors }); const err = (msg: string, status = 400) => Response.json({ error: msg }, { status, headers: cors }); try { // Health — no trace needed if (url.pathname === "/health") return ok({ status: "ok", lakehouse: BASE, tools: 11 }); // Start a Langfuse trace for every non-static request if (req.method === "POST" || !["/","/dashboard","/dashboard.css","/dashboard.ts","/dashboard.js"].includes(url.pathname)) { activeTrace = startTrace(`gw:${url.pathname}`, { method: req.method, path: url.pathname }); } // Self-orientation: any agent calls this first to understand the system if (url.pathname === "/context") { const instructions = await Bun.file("/home/profit/lakehouse/mcp-server/AGENT_INSTRUCTIONS.md").text().catch(() => ""); const datasets = await api("GET", "/catalog/datasets") as any[]; const indexes = await api("GET", "/vectors/indexes") as any[]; const vram = await api("GET", "/ai/vram"); return ok({ system: "Lakehouse Staffing Co-Pilot", purpose: "AI anticipates staffing coordinator needs — pre-matches workers to contracts, surfaces alerts, builds playbooks from successful operations", instructions: instructions.slice(0, 3000), datasets: (datasets || []).map((d: any) => ({ name: d.name, rows: d.row_count })), indexes: (indexes || []).map((i: any) => ({ name: i.index_name, chunks: i.chunk_count, backend: i.vector_backend })), models: { qwen3: "8.2B reasoning+tools", qwen2_5: "7B fast SQL", mistral: "7B generation", nomic: "137M embedding" }, vram: vram?.gpu, tools: 
["/search","/sql","/match","/worker/:id","/ask","/log","/playbooks","/profile/:id","/vram","/context","/verify"], rules: [ "Never hallucinate — only state facts from tool responses", "SQL for counts/aggregations, hybrid /search for matching", "Log every successful operation to /log", "Check /playbooks before complex tasks", "Verify worker details via /worker/:id before communicating", ], }); } // Verification endpoint — agent can check any claim against SQL if (url.pathname === "/verify") { const b = await json(); // b.claim: "worker 4925 is a Forklift Operator in IL with reliability 0.82" // b.worker_id: 4925 // b.checks: { role: "Forklift Operator", state: "IL", reliability: 0.82 } if (!b.worker_id) return err("worker_id required"); const r = await api("POST", "/query/sql", { sql: `SELECT * FROM ethereal_workers WHERE worker_id = ${b.worker_id}` }); const worker = r?.rows?.[0]; if (!worker) return ok({ verified: false, reason: `worker ${b.worker_id} not found` }); const checks = b.checks || {}; const failures: string[] = []; for (const [field, expected] of Object.entries(checks)) { const actual = worker[field]; if (actual === undefined) continue; if (typeof expected === "number") { if (Math.abs(Number(actual) - expected) > 0.05) { failures.push(`${field}: claimed=${expected} actual=${actual}`); } } else if (String(actual).toLowerCase() !== String(expected).toLowerCase()) { failures.push(`${field}: claimed=${expected} actual=${actual}`); } } return ok({ verified: failures.length === 0, worker_id: b.worker_id, worker_name: worker.name, failures, actual: worker, }); } // Tool: hybrid search // ─── Client blacklists (feature #2) ─────────────────────────── // Per-client worker exclusion list. A worker blacklisted for // client X is hidden from /search and /match when the caller // passes `client: "X"`. Persisted to local JSON so it survives // Bun restarts. 
This is a trust-critical feature — if the // system recommends a worker the client already flagged, the // system's credibility is gone. if (url.pathname.startsWith("/clients/") && url.pathname.includes("/blacklist")) { const m = url.pathname.match(/^\/clients\/([^\/]+)\/blacklist\/?(.*)$/); if (m) { const client = decodeURIComponent(m[1]); const suffix = m[2]; // empty, or a worker_id to delete if (req.method === "GET") { const list = await loadClientBlacklist(client); return ok({ client, entries: list }); } if (req.method === "POST" && !suffix) { const b = await json(); if (!b.worker_id) return err("worker_id required", 400); const entry = { worker_id: String(b.worker_id), name: b.name || "", reason: b.reason || "", added_at: new Date().toISOString(), }; const list = await addToClientBlacklist(client, entry); return ok({ client, added: entry, total: list.length }); } if (req.method === "DELETE" && suffix) { const worker_id = decodeURIComponent(suffix); const { removed, total } = await removeFromClientBlacklist(client, worker_id); return ok({ client, removed, total }); } return err(`unsupported method ${req.method} for blacklist`, 405); } } if (url.pathname === "/search") { const b = await json(); // Availability soft-filter: if the caller didn't constrain // availability and isn't explicitly opting out, auto-append // `availability > 0.5`. Recruiters calling this route expect // "available workers" by default; surfacing someone who's on // an active placement breaks trust on the first call. let filter = b.sql_filter as (string | undefined); const optOut = b.include_unavailable === true; if (!optOut && filter && !/availability/i.test(filter)) { filter = `(${filter}) AND CAST(availability AS DOUBLE) > 0.5`; } // Client blacklist filter: if caller passes `client`, exclude // worker_ids that client has flagged. One SQL expression // added, no extra round-trip needed by the caller. 
if (b.client && filter) { const bl = await loadClientBlacklist(String(b.client)); const ids = bl.map(e => e.worker_id).filter(x => /^\d+$/.test(x)); if (ids.length > 0) { filter = `(${filter}) AND worker_id NOT IN (${ids.join(",")})`; } } const hybridRes = await api("POST", "/vectors/hybrid", { question: b.question, index_name: b.index || "workers_500k_v1", sql_filter: filter, filter_dataset: b.dataset || "ethereal_workers", id_column: b.id_column || "worker_id", top_k: b.top_k || 5, generate: b.generate !== false, use_playbook_memory: b.use_playbook_memory !== false, playbook_memory_k: b.playbook_memory_k ?? 200, }); // Rate enrichment + optional max_pay_rate filter (soft filter, // preserves result shape). Operator can opt out by omitting. if (hybridRes && Array.isArray(hybridRes.sources)) { enrichWithRates(hybridRes.sources); if (typeof b.max_pay_rate === "number" && b.max_pay_rate > 0) { const before = hybridRes.sources.length; hybridRes.sources = hybridRes.sources.filter((s: any) => s.implied_pay_rate <= b.max_pay_rate); (hybridRes as any).pay_rate_filtered_out = before - hybridRes.sources.length; } } return ok(hybridRes); } // Tool: SQL if (url.pathname === "/sql") { const b = await json(); return ok(await api("POST", "/query/sql", { sql: b.sql })); } // Tool: match contract if (url.pathname === "/match") { const b = await json(); let filter = `role = '${b.role}' AND state = '${b.state}' AND reliability >= ${b.min_reliability || 0.7}`; if (b.city) filter += ` AND city = '${b.city}'`; return ok(await api("POST", "/vectors/hybrid", { question: `Best ${b.role} workers with relevant skills`, index_name: b.index || "workers_500k_v1", sql_filter: filter, filter_dataset: b.dataset || "ethereal_workers", id_column: "worker_id", top_k: (b.headcount || 5) * 2, generate: false, use_playbook_memory: true, playbook_memory_k: 200, })); } // Tool: get worker if (url.pathname.startsWith("/worker/")) { const id = url.pathname.split("/")[2]; return ok(await api("POST", 
"/query/sql", { sql: `SELECT * FROM ethereal_workers WHERE worker_id = ${id}` })); } // Tool: RAG if (url.pathname === "/ask") { const b = await json(); return ok(await api("POST", "/vectors/rag", { index_name: b.index || "workers_500k_v1", question: b.question, top_k: b.top_k || 5 })); } // Tool: log success. // // BUG FIX 2026-04-20: previously this also POSTed a 1-row CSV to // /ingest/file?name=successful_playbooks. That endpoint REPLACES // the dataset's object list rather than appending — so every /log // call destroyed all prior rows in the SQL-queryable // successful_playbooks table. Chain-of-custody trace caught it: // sp_rows went 33 → 1 in a single /log call. // // Until a proper append endpoint exists (Phase 8 delta write // surface for the SQL table), /log writes ONLY to playbook_memory // (in-memory append-only store, works correctly for boost). The // SQL successful_playbooks table is now treated as derived state // that gets rebuilt explicitly via /vectors/playbook_memory/rebuild // — never written to by the recruiter path. if (url.pathname === "/log") { const b = await json(); // Result format expected: "{filled}/{needed} filled → Name1, Name2, Name3" const result = String(b.result || ""); const arrowIdx = result.indexOf("→"); const namesPart = arrowIdx >= 0 ? result.slice(arrowIdx + 1) : ""; const rawEndorsed = namesPart.split(",").map(s => s.trim()).filter(Boolean); // Parse the contract's (city, state) from operation. Seed is // keyed by (city, state, name) so validation must match those // coordinates, not just the name. const opMatch = String(b.operation || "").match(/ in ([^,]+),\s*([A-Za-z]+)/); const city = opMatch ? opMatch[1].trim() : ""; const state = opMatch ? opMatch[2].trim() : ""; // Ghost-name guard — /log previously accepted any endorsed // names without verification. 
Those ghosts landed in // playbook_memory, grew the entry count, but boost silently // never fired because no real worker chunk ever matched the // stored (city, state, name) tuple. Real-test on 2026-04-20 // surfaced this. Validate against workers_500k before seeding. let endorsed: string[] = rawEndorsed; let rejected: string[] = []; if (rawEndorsed.length && city && state) { const quoted = rawEndorsed.map(n => `'${n.replace(/'/g, "''")}'`).join(","); const sql = `SELECT DISTINCT name FROM workers_500k ` + `WHERE name IN (${quoted}) AND city = '${city.replace(/'/g,"''")}' ` + `AND state = '${state.replace(/'/g,"''")}'`; const vr = await api("POST", "/query/sql", { sql }).catch(() => ({ rows: [] as any[] })) as any; const found = new Set((vr.rows ?? []).map((r: any) => r.name)); endorsed = rawEndorsed.filter(n => found.has(n)); rejected = rawEndorsed.filter(n => !found.has(n)); } let seeded = 0; let persisted_rows = 0; if (endorsed.length && /fill:.+ in .+,.+/i.test(String(b.operation || ""))) { const canonicalApproach = `${(b.approach || "manual log").split(/[\.\n]/)[0]}`.slice(0, 80); const canonicalContext = `${(b.context || "").split(/[\.\n]/)[0]}`.slice(0, 80); const seedRes = await api("POST", "/vectors/playbook_memory/seed", { operation: b.operation, approach: canonicalApproach, context: canonicalContext, endorsed_names: endorsed, append: true, }).catch(() => null) as any; if (seedRes && seedRes.playbook_id) { seeded = endorsed.length; const pr = await api("POST", "/vectors/playbook_memory/persist_sql", {}).catch(() => null) as any; if (pr && typeof pr.rows_persisted === "number") persisted_rows = pr.rows_persisted; } } return ok({ logged: true, seeded, persisted_to_sql: persisted_rows, rejected_ghost_names: rejected, note: rejected.length ? `${rejected.length} endorsed name(s) not found in workers_500k for ${city}, ${state} — skipped seeding to prevent silent boost failure.` : "successful_playbooks_live is the SQL surface for live operator activity. 
/log is non-destructive and name-validated.", }); } // Tool: log FAILED fill — negative signal for Phase 19 boost. // Workers named here get a 0.5^n penalty on future positive // boosts in the same (city, state). Three failures effectively // zero the boost; five make the worker invisible to the re-rank. // Names are validated against workers_500k same as /log. if (url.pathname === "/log_failure") { const b = await json(); const opMatch = String(b.operation || "").match(/ in ([^,]+),\s*([A-Za-z]+)/); const city = opMatch ? opMatch[1].trim() : ""; const state = opMatch ? opMatch[2].trim() : ""; const rawNames: string[] = Array.isArray(b.failed_names) ? b.failed_names : []; if (!city || !state) { return err("operation must be 'fill: Role xN in City, ST'", 400); } if (rawNames.length === 0) return err("failed_names must be a non-empty array", 400); const quoted = rawNames.map((n: string) => `'${n.replace(/'/g, "''")}'`).join(","); const sql = `SELECT DISTINCT name FROM workers_500k ` + `WHERE name IN (${quoted}) AND city = '${city.replace(/'/g,"''")}' ` + `AND state = '${state.replace(/'/g,"''")}'`; const vr = await api("POST", "/query/sql", { sql }).catch(() => ({ rows: [] as any[] })) as any; const found = new Set((vr.rows ?? []).map((r: any) => r.name)); const failed_names = rawNames.filter((n: string) => found.has(n)); const rejected = rawNames.filter((n: string) => !found.has(n)); if (failed_names.length === 0) { return ok({ marked: 0, rejected_ghost_names: rejected, note: "no failed_names matched workers_500k for this geo" }); } const mr = await api("POST", "/vectors/playbook_memory/mark_failed", { operation: b.operation, failed_names, reason: b.reason || "", }); return ok({ marked: mr?.added ?? 
0, rejected_ghost_names: rejected, city, state, note: `Each marked worker's positive boost in ${city}, ${state} is halved per recorded failure.`, }); } // Tool: get playbooks if (url.pathname === "/playbooks") { const kw = url.searchParams.get("keyword"); const limit = url.searchParams.get("limit") || "10"; let sql = `SELECT * FROM successful_playbooks ORDER BY timestamp DESC LIMIT ${limit}`; if (kw) sql = `SELECT * FROM successful_playbooks WHERE operation LIKE '%${kw}%' OR approach LIKE '%${kw}%' ORDER BY timestamp DESC LIMIT ${limit}`; const r = await api("POST", "/query/sql", { sql }); return ok(r.error ? { playbooks: [], note: "No playbooks yet" } : { playbooks: r.rows }); } // Tool: swap profile if (url.pathname.startsWith("/profile/")) { const id = url.pathname.split("/")[2]; return ok(await api("POST", `/vectors/profile/${id}/activate`)); } // Tool: VRAM if (url.pathname === "/vram") return ok(await api("GET", "/ai/vram")); // Pass-through to lakehouse for anything else if (url.pathname.startsWith("/api/")) { const path = url.pathname.replace("/api", ""); const body = req.method !== "GET" ? await req.text() : undefined; const r = await fetch(`${BASE}${path}`, { method: req.method, headers: { "Content-Type": "application/json" }, body }); return new Response(await r.text(), { status: r.status, headers: { "Content-Type": "application/json" } }); } // Proof — narrative HTML served from mcp-server/proof.html. // Live tests consumed client-side via /proof.json. if (url.pathname === "/proof") { return new Response(Bun.file(import.meta.dir + "/proof.html"), { headers: { ...cors, "Content-Type": "text/html" }, }); } // Spec — technical specification / README-equivalent document. // Long-form architecture doc: folder layout, ingest pipeline, // scale story, error surfaces, per-staffer context, a day in // the life. Intended for a skeptical reader who needs to // dispute or reproduce what the system claims to do. 
if (url.pathname === "/spec") { return new Response(Bun.file(import.meta.dir + "/spec.html"), { headers: { ...cors, "Content-Type": "text/html" }, }); } // Onboard — client-facing ingest wizard. Upload any CSV, preview // columns + PII + sample rows, commit via /ingest/file. Works // with a shipped sample roster so anyone can trial the flow // without real client data. if (url.pathname === "/onboard") { return new Response(Bun.file(import.meta.dir + "/onboard.html"), { headers: { ...cors, "Content-Type": "text/html" }, }); } // Workspaces — per-contract state (Phase 8.5). UI layer over the // gateway's /workspaces/* routes: list, create, detail, handoff, // save-search, shortlist, log-activity. All persisted on the // Rust side; this page is a pure viewer + editor. if (url.pathname === "/workspaces") { return new Response(Bun.file(import.meta.dir + "/workspaces.html"), { headers: { ...cors, "Content-Type": "text/html" }, }); } // Alerts — push/daemon settings page + config API + test-fire. if (url.pathname === "/alerts") { return new Response(Bun.file(import.meta.dir + "/alerts.html"), { headers: { ...cors, "Content-Type": "text/html" }, }); } if (url.pathname === "/alerts/config") { if (req.method === "GET") { const cfg = await loadAlertsConfig(); const state = await loadAlertsState(); return ok({ config: cfg, state: { last_run_at: state.last_run_at } }); } if (req.method === "POST") { const b = await json(); const prev = await loadAlertsConfig(); const next: AlertsConfig = { enabled: b.enabled ?? prev.enabled, interval_minutes: Math.max(1, Number(b.interval_minutes ?? prev.interval_minutes)), webhook_url: typeof b.webhook_url === "string" ? b.webhook_url.trim() || undefined : prev.webhook_url, webhook_label: typeof b.webhook_label === "string" ? b.webhook_label : prev.webhook_label, deadline_warn_days: Math.max(1, Number(b.deadline_warn_days ?? 
prev.deadline_warn_days)), }; await saveAlertsConfig(next); return ok({ saved: true, config: next, note: "Interval change requires server restart to apply. Current running interval unchanged this cycle." }); } } if (url.pathname === "/alerts/fire" && req.method === "POST") { const cfg = await loadAlertsConfig(); const d = await buildDigest(); if (!d) return ok({ fired: false, reason: "no events since last run" }); const res = await dispatchDigest(d, cfg); return ok({ fired: true, channels: res.channels, errors: res.errors, digest: d }); } if (url.pathname === "/alerts/recent" && req.method === "GET") { const f = Bun.file(ALERTS_LOG_PATH); if (!(await f.exists())) return ok({ entries: [] }); const text = await f.text(); const lines = text.split("\n").filter(l => l.trim()); const last = lines.slice(-10).reverse(); const entries: any[] = []; for (const l of last) { try { entries.push(JSON.parse(l)); } catch {} } return ok({ entries }); } // Onboard ingest — forwards multipart/form-data correctly to // the Rust gateway /ingest/file. The generic /api/* passthrough // can't handle multipart because it reads as text and forwards // as JSON, losing the boundary. This route preserves the body // and Content-Type. 
if (url.pathname === "/onboard/ingest" && req.method === "POST") { const name = url.searchParams.get("name"); if (!name || !/^[a-z][a-z0-9_]*$/.test(name)) { return err("dataset name required (lowercase+underscores)", 400); } const contentType = req.headers.get("content-type") || ""; const upstream = await fetch(`${BASE}/ingest/file?name=${encodeURIComponent(name)}`, { method: "POST", headers: { "Content-Type": contentType }, body: await req.arrayBuffer(), }); const body = await upstream.text(); return new Response(body, { status: upstream.status, headers: { ...cors, "Content-Type": upstream.headers.get("content-type") || "application/json" }, }); } // Sample CSV — generated fresh on every request so content-hash // dedup on the ingest side always sees a new payload (two uploads // in a row would otherwise be a no-op). Each generation has // unique worker_ids (timestamp-prefixed), randomized names + roles // + geos from realistic pools, and a random size (~120-180 rows) // so the demo looks different every time and numbers actually // update visibly in the dashboard after onboarding. if (url.pathname.startsWith("/samples/")) { const name = url.pathname.slice("/samples/".length); if (!/^[a-zA-Z0-9_\-\.]+\.csv$/.test(name)) { return err("invalid sample filename", 400); } if (name === "staffing_roster_sample.csv") { const csv = generateSampleRosterCSV(); return new Response(csv, { headers: { ...cors, "Content-Type": "text/csv", "Content-Disposition": `attachment; filename="${name}"`, "Cache-Control": "no-store", }, }); } // Other sample filenames fall through to the static dir const path = `${import.meta.dir}/samples/${name}`; const file = Bun.file(path); if (!(await file.exists())) return err("sample not found", 404); return new Response(file, { headers: { ...cors, "Content-Type": "text/csv", "Content-Disposition": `attachment; filename="${name}"` }, }); } // System-wide scale summary — truthful numbers for the UI. 
// Pulls row counts via SQL (COUNT(*) from parquet footers) for
// the key datasets rather than trusting catalog manifests, which
// can go stale when data changes without re-registering. The
// workers_500k manifest is correct (500K); candidates manifest
// lied (said 100K, actual 1K) — the audit caught it.
// Everything else uses manifest row_count since it's O(1).
if (url.pathname === "/system/summary") {
  // All four upstream calls run in parallel; each degrades to a neutral
  // value on failure so one outage can't take down the summary.
  const [ds, indexes, workersCount, candsCount] = await Promise.all([
    api("GET", "/catalog/datasets").catch(() => [] as any),
    api("GET", "/vectors/indexes").catch(() => [] as any),
    api("POST", "/query/sql", { sql: "SELECT COUNT(*) AS c FROM workers_500k" })
      .catch(() => null as any),
    api("POST", "/query/sql", { sql: "SELECT COUNT(*) AS c FROM candidates" })
      .catch(() => null as any),
  ]);
  const datasets = Array.isArray(ds) ? ds : [];
  const idxs = Array.isArray(indexes) ? indexes : [];
  const workers = Number(workersCount?.rows?.[0]?.c ?? 0);
  const candidates = Number(candsCount?.rows?.[0]?.c ?? 0);
  // Sum manifest row_counts EXCLUDING workers_500k + candidates,
  // then add the truthful SQL counts. This gives a total that
  // reflects live state for the two most-quoted tables.
  const otherManifest = datasets
    .filter((d: any) => d?.name !== "workers_500k" && d?.name !== "candidates")
    .reduce((s: number, d: any) => s + (d?.row_count || 0), 0);
  const totalRows = otherManifest + workers + candidates;
  const totalChunks = idxs.reduce((s: number, i: any) => s + (i?.chunk_count || 0), 0);
  // Manifest drift audit — surface any cases where manifest
  // disagrees with SQL for the two spot-checked tables so the UI
  // can note it if ever meaningful.
  const drift: any[] = [];
  const workersManifest = datasets.find((d: any) => d?.name === "workers_500k")?.row_count;
  const candidatesManifest = datasets.find((d: any) => d?.name === "candidates")?.row_count;
  if (workersManifest !== undefined && workersManifest !== workers) {
    drift.push({ dataset: "workers_500k", manifest: workersManifest, actual: workers });
  }
  if (candidatesManifest !== undefined && candidatesManifest !== candidates) {
    drift.push({ dataset: "candidates", manifest: candidatesManifest, actual: candidates });
  }
  return ok({ datasets: datasets.length, total_rows: totalRows, total_chunks: totalChunks, workers_500k_rows: workers, candidates_rows: candidates, indexes: idxs.length, manifest_drift: drift, });
}
// Model matrix — read config/models.json and expose read-only.
// Strips internal notes that could drift; the source of truth is
// the file itself. UI can render tiers, rate budgets, and the
// experimental rotation list from this endpoint.
if (url.pathname === "/models/matrix") {
  try {
    // NOTE(review): "../config/models.json" is resolved relative to the
    // process CWD, not import.meta.dir like the static routes below —
    // breaks if the server is launched from another directory; confirm.
    const raw = await Bun.file("../config/models.json").text();
    return ok(JSON.parse(raw));
  } catch (e) {
    // NOTE(review): this 404 omits the `cors` headers and uses lowercase
    // "content-type", unlike every other response in this file — verify
    // whether browser callers need CORS here.
    return new Response(JSON.stringify({ error: `models.json not found: ${(e as Error).message}` }), {
      status: 404,
      headers: { "content-type": "application/json" },
    });
  }
}
// Proof JSON API (same data, no HTML)
if (url.pathname === "/proof.json") {
  const ds = await api("GET", "/catalog/datasets") as any[];
  const indexes = await api("GET", "/vectors/indexes") as any[];
  const vram = await api("GET", "/ai/vram");
  const totalRows = (ds || []).reduce((s: number, d: any) => s + (d.row_count || 0), 0);
  const totalChunks = (indexes || []).reduce((s: number, i: any) => s + i.chunk_count, 0);
  // Run live SQL tests — each entry is [label, query]; pass = no error.
  const tests: any[] = [];
  const sqls = [
    ["COUNT 500K workers", "SELECT COUNT(*) FROM workers_500k"],
    ["COUNT 1M timesheets", "SELECT COUNT(*) FROM timesheets"],
    ["Filter+aggregate 500K", "SELECT role, COUNT(*) cnt FROM workers_500k WHERE state='IL' AND CAST(reliability AS DOUBLE)>0.8 GROUP BY role ORDER BY cnt DESC LIMIT 3"],
    ["Cross-table JOIN", "SELECT COUNT(*) FROM candidates c JOIN (SELECT candidate_id, COUNT(*) calls FROM call_log GROUP BY candidate_id HAVING COUNT(*)>=5) cl ON c.candidate_id=cl.candidate_id WHERE c.city='Chicago'"],
  ];
  for (const [name, sql] of sqls) {
    const t0 = Date.now();
    const r = await api("POST", "/query/sql", { sql });
    const ms = Date.now() - t0;
    tests.push({ name, ms, result: r.rows?.[0] || r.error, pass: !r.error });
  }
  // Hybrid test — exercises the SQL-filter + vector-rerank path end to end.
  const ht0 = Date.now();
  const hybrid = await api("POST", "/vectors/hybrid", { question: "reliable forklift operator", index_name: "workers_500k_v1", sql_filter: "role = 'Forklift Operator' AND state = 'IL' AND CAST(reliability AS DOUBLE) > 0.8", filter_dataset: "workers_500k", id_column: "worker_id", top_k: 5, generate: false, use_playbook_memory: true, });
  tests.push({ name: "Hybrid SQL+Vector", ms: Date.now() - ht0, result: `sql=${hybrid.sql_matches} → ${hybrid.vector_reranked} verified results`, pass: (hybrid.vector_reranked || 0) > 0, sources: hybrid.sources?.slice(0, 3), });
  return ok({ title: "Lakehouse Proof of Work", generated: new Date().toISOString(), server: "192.168.1.177 (i9 + 128GB RAM + A4000 16GB)", scale: { datasets: ds?.length, total_rows: totalRows, indexes: indexes?.length, total_chunks: totalChunks }, gpu: vram?.gpu, tests, recall: { hnsw: 0.98, lance: 0.94, note: "Measured on 50K real nomic-embed-text embeddings, 30 queries" }, lance_10m: { vectors: 10_000_000, disk_gb: 32.9, search_p50_ms: 5, note: "Past HNSW RAM ceiling" }, verify: "SSH into server, run: curl http://localhost:3100/health — or open http://192.168.1.177:3700/proof", });
}
// Dashboard — calls lakehouse /vectors/hybrid directly (no gateway hop)
if (url.pathname === "/" || url.pathname === "/dashboard") {
  return new Response(Bun.file(import.meta.dir + "/search.html"), {
    headers: { ...cors, "Content-Type": "text/html" },
  });
}
// NOTE(review): unlike the HTML route above, the CSS/JS responses below
// do not spread `cors` — confirm whether that asymmetry is intentional.
if (url.pathname === "/dashboard.css") {
  return new Response(Bun.file(import.meta.dir + "/dashboard.css"), { headers: { "Content-Type": "text/css" } });
}
if (url.pathname === "/dashboard.ts" || url.pathname === "/dashboard.js") {
  // Bun transpiles TS on the fly
  const built = await Bun.build({ entrypoints: [import.meta.dir + "/dashboard.ts"], target: "browser" });
  const js = await built.outputs[0].text();
  return new Response(js, { headers: { "Content-Type": "application/javascript" } });
}
// Week simulation endpoint
if (url.pathname === "/simulation/run" && req.method === "POST") {
  return ok(await runWeekSimulation());
}
// ─── Staffing Intelligence Console ───
if (url.pathname === "/console") {
  return new Response(Bun.file(import.meta.dir + "/console.html"));
}
// Intelligence: Market data — public building permits → staffing demand forecast
if (url.pathname === "/intelligence/market" && req.method === "POST") {
  const start = Date.now();
  try {
    // Fetch Chicago building permits (public Socrata API — real data)
    const permitUrl = "https://data.cityofchicago.org/resource/ydr8-5enu.json";
    // Three Socrata queries + one local bench query, all in parallel.
    const [bigR, byTypeR, recentR, benchR] = await Promise.all([
      // Top 8 largest permits by cost
      fetch(`${permitUrl}?$select=permit_type,work_type,work_description,reported_cost,street_number,street_direction,street_name,community_area,issue_date,latitude,longitude&$where=reported_cost>1000000 AND issue_date>'2025-06-01'&$order=reported_cost DESC&$limit=50`).then(r => r.json()),
      // Permits grouped by work type
      fetch(`${permitUrl}?$select=work_type,count(*) as cnt,sum(reported_cost) as total_cost&$where=reported_cost>10000 AND issue_date>'2025-06-01'&$group=work_type&$order=total_cost DESC&$limit=10`).then(r => r.json()),
      // Most recent permits
      fetch(`${permitUrl}?$select=work_type,work_description,reported_cost,street_name,issue_date&$where=reported_cost>50000&$order=issue_date DESC&$limit=5`).then(r => r.json()),
      // Our worker bench in IL (cross-reference)
      api("POST", "/query/sql", { sql: "SELECT role, COUNT(*) supply, SUM(CASE WHEN CAST(reliability AS DOUBLE)>0.8 THEN 1 ELSE 0 END) reliable, SUM(CASE WHEN CAST(availability AS DOUBLE)>0.5 THEN 1 ELSE 0 END) available FROM workers_500k WHERE state='IL' GROUP BY role ORDER BY supply DESC" }),
    ]);
    // Map construction types to staffing roles. "" is the catch-all for
    // permit types not listed explicitly.
    const typeToRoles: Record = { "Electrical Work": ["Electrician","Maintenance Tech"], "Masonry Work": ["Production Worker","Loader","Material Handler"], "Mechanical Work": ["Maintenance Tech","Machine Operator","Welder"], "Reroofing": ["Production Worker","Loader"], "Plumbing Work": ["Maintenance Tech"], "": ["Forklift Operator","Loader","Material Handler","Production Worker","Warehouse Associate"], };
    // Build demand forecast from permit types
    const forecast: any[] = [];
    for (const t of (byTypeR || [])) {
      const wtype = t.work_type || "(general construction)";
      const totalCost = parseFloat(t.total_cost || 0);
      const cnt = parseInt(t.cnt || 0);
      const estWorkers = Math.round(totalCost / 150000); // industry heuristic
      const roles = typeToRoles[t.work_type || ""] || typeToRoles[""];
      forecast.push({ work_type: wtype, permits: cnt, total_cost: totalCost, estimated_workers: estWorkers, needed_roles: roles });
    }
    // Cross-reference with our bench — index bench rows by role.
    const ilBench = (benchR.rows || []).reduce((m: any, r: any) => { m[r.role] = r; return m; }, {});
    const gaps: any[] = [];
    for (const f of forecast) {
      for (const role of f.needed_roles) {
        const b = ilBench[role];
        if (b) {
          // Math.max(…, 1) guards divide-by-zero when estimated demand is 0.
          const coverage = Math.round((b.available / Math.max(f.estimated_workers, 1)) * 100);
          gaps.push({ role, demand: f.estimated_workers, supply: b.supply, available: b.available, reliable: b.reliable, coverage_pct: Math.min(coverage, 999), source: f.work_type });
        }
      }
    }
    return ok({
      major_permits: (bigR || []).map((p: any) => ({ cost: parseFloat(p.reported_cost || 0), description: (p.work_description || "").substring(0, 100), address: `${p.street_number || ""} ${p.street_direction || ""} ${p.street_name || ""}`.trim(), type: p.work_type || p.permit_type || "", date: (p.issue_date || "").substring(0, 10), lat: p.latitude, lng: p.longitude, })),
      by_type: forecast,
      recent: (recentR || []).map((p: any) => ({ type: p.work_type || "", description: (p.work_description || "").substring(0, 80), cost: parseFloat(p.reported_cost || 0), street: p.street_name || "", date: (p.issue_date || "").substring(0, 10), })),
      il_bench: benchR.rows || [],
      gaps,
      total_construction_value: forecast.reduce((s: number, f: any) => s + f.total_cost, 0),
      total_estimated_workers: forecast.reduce((s: number, f: any) => s + f.estimated_workers, 0),
      duration_ms: Date.now() - start,
    });
  } catch (e: any) {
    // NOTE(review): errors are returned via ok() with 200 here, while the
    // sibling forecast endpoint uses err(…, 500) — confirm which the UI expects.
    return ok({ error: e.message, duration_ms: Date.now() - start });
  }
}
// Predictive staffing forecast — aggregate demand inferred from
// recent Chicago permits, compared to our bench supply. Answers
// "what's coming in the next 30-60 days and can we cover it?"
// — the contextual-awareness dimension beyond retrospective rank.
if (url.pathname === "/intelligence/staffing_forecast" && req.method === "POST") {
  const start = Date.now();
  try {
    const permitUrl = "https://data.cityofchicago.org/resource/ydr8-5enu.json";
    // Last 30 days of permits — that's our forward demand window
    const thirtyDaysAgo = new Date(Date.now() - 30 * 86400e3).toISOString().slice(0, 10);
    const permits: any[] = await fetch(
      `${permitUrl}?$select=work_type,reported_cost,issue_date` +
      `&$where=reported_cost>100000 AND issue_date>'${thirtyDaysAgo}'` +
      `&$limit=200`
    ).then(r => r.json()).catch(() => []);
    // Construction heuristic: permit filing → construction start
    // averages ~45 days. Staffing window opens 14 days before.
    // Permit work_type → single representative staffing role; anything
    // unmapped defaults to "Production Worker" in the loop below.
    const typeToRole: Record = { "Electrical Work": "Electrician", "Masonry Work": "Production Worker", "Mechanical Work": "Maintenance Tech", "Reroofing": "Production Worker", "Plumbing Work": "Maintenance Tech", };
    // Aggregate demand by role
    const demandByRole: Record = {};
    for (const p of permits) {
      const role = typeToRole[p.work_type || ""] || "Production Worker";
      const cost = parseFloat(p.reported_cost || 0);
      // Headcount heuristic: ~1 worker per $150K, clamped to [2, 8].
      const workers = Math.max(2, Math.min(Math.round(cost / 150000), 8));
      const issueDate = new Date(p.issue_date);
      const stagingDate = new Date(issueDate.getTime() + 31 * 86400e3); // 45d - 14d window
      if (!demandByRole[role]) {
        demandByRole[role] = { permits: 0, total_cost: 0, est_workers: 0, earliest_need: stagingDate.toISOString().slice(0, 10) };
      }
      demandByRole[role].permits += 1;
      demandByRole[role].total_cost += cost;
      demandByRole[role].est_workers += workers;
      // Track the earliest staffing-window date across this role's permits.
      const cur = new Date(demandByRole[role].earliest_need);
      if (stagingDate < cur) demandByRole[role].earliest_need = stagingDate.toISOString().slice(0, 10);
    }
    // Bench supply in IL
    const benchR = await api("POST", "/query/sql", { sql: `SELECT role, COUNT(*) as total, ` + `SUM(CASE WHEN CAST(availability AS DOUBLE) > 0.5 THEN 1 ELSE 0 END) as available, ` + `SUM(CASE WHEN CAST(reliability AS DOUBLE) > 0.8 THEN 1 ELSE 0 END) as reliable ` + `FROM workers_500k WHERE state = 'IL' ` + `GROUP BY role`, });
    const bench: Record = {};
    for (const r of (benchR.rows || [])) bench[r.role] = r;
    // Past playbook fill-speed + success signal per role
    const playbookR = await api("POST", "/query/sql", { sql: `SELECT operation, COUNT(*) as fills ` + `FROM successful_playbooks_live ` + `WHERE operation LIKE '%Chicago, IL%' ` + `GROUP BY operation ORDER BY fills DESC LIMIT 20`, });
    const recentChicagoOps = playbookR.rows || [];
    // Build forecast entries with risk flag
    const forecast: any[] = [];
    for (const [role, d] of Object.entries(demandByRole)) {
      const b = bench[role] || { total: 0, available: 0, reliable: 0 };
      // 999 sentinel = effectively unlimited coverage when demand is zero.
      const coverage = d.est_workers > 0 ? Math.round((b.available / d.est_workers) * 100) : 999;
      const reliable_coverage = d.est_workers > 0 ? Math.round((b.reliable / d.est_workers) * 100) : 999;
      // Risk ladder: <100% available = critical; <300% = tight; reliable
      // bench <200% = watch; otherwise ok.
      let risk = "ok";
      if (coverage < 100) risk = "critical";
      else if (coverage < 300) risk = "tight";
      else if (reliable_coverage < 200) risk = "watch";
      // Days until earliest staffing deadline
      const days_to_deadline = Math.round((new Date(d.earliest_need).getTime() - Date.now()) / 86400e3);
      forecast.push({ role, demand_permits: d.permits, demand_workers: d.est_workers, demand_total_cost: d.total_cost, earliest_staffing_deadline: d.earliest_need, days_to_deadline, bench_total: b.total, bench_available: b.available, bench_reliable: b.reliable, coverage_pct: Math.min(coverage, 9999), reliable_coverage_pct: Math.min(reliable_coverage, 9999), risk, });
    }
    // Sort: worst risk first, then soonest deadline.
    forecast.sort((a, b) => {
      const order: Record = { critical: 0, tight: 1, watch: 2, ok: 3 };
      if (order[a.risk] !== order[b.risk]) return order[a.risk] - order[b.risk];
      return a.days_to_deadline - b.days_to_deadline;
    });
    return ok({ generated_at: new Date().toISOString(), window_days: 30, permit_count: permits.length, total_cost: permits.reduce((s, p) => s + parseFloat(p.reported_cost || 0), 0), total_estimated_workers: forecast.reduce((s, f) => s + f.demand_workers, 0), critical_roles: forecast.filter(f => f.risk === "critical").length, tight_roles: forecast.filter(f => f.risk === "tight").length, forecast, recent_chicago_operations: recentChicagoOps, duration_ms: Date.now() - start, note: "Demand inferred from Chicago permit filings last 30 days. Construction starts ~45d after permit. Staffing window opens ~14d before construction. Supply = IL bench in workers_500k.", });
  } catch (e: any) {
    return err(`staffing_forecast: ${e.message}`, 500);
  }
}
// Intelligence: Chicago permits → assumed staffing contracts with
// Phase 19-ranked candidates and Path-2 discovered patterns. Each
// card pairs a REAL permit (live from data.cityofchicago.org) with
// a PROPOSED fill drawn from our 500K worker bench. Surfaces the
// meta-index dimension directly: "what past similar fills had in
// common" for this role + geo.
if (url.pathname === "/intelligence/permit_contracts" && req.method === "POST") {
  const start = Date.now();
  try {
    const permitUrl = "https://data.cityofchicago.org/resource/ydr8-5enu.json";
    // Recent + substantial permits only — skip tiny ones that
    // don't imply real staffing demand.
    const permits: any[] = await fetch(
      `${permitUrl}?$select=permit_type,work_type,work_description,reported_cost,street_number,street_direction,street_name,community_area,issue_date&` +
      `$where=reported_cost>250000 AND issue_date>'2025-06-01'` +
      `&$order=issue_date DESC&$limit=6`
    ).then(r => r.json()).catch(() => []);
    const typeToRole: Record = { "Electrical Work": "Electrician", "Masonry Work": "Production Worker", "Mechanical Work": "Maintenance Tech", "Reroofing": "Production Worker", "Plumbing Work": "Maintenance Tech", };
    const contracts: any[] = [];
    for (const p of permits) {
      const cost = parseFloat(p.reported_cost || 0);
      // Industry heuristic — one worker per $150K of permit value,
      // capped at 8 per contract for staffing realism.
      const count = Math.min(Math.max(Math.round(cost / 150000), 2), 8);
      const role = typeToRole[p.work_type || ""] || "Production Worker";
      const city = "Chicago";
      const state = "IL";
      // Phase 19 ranked candidates. Soft availability filter
      // auto-applied by /search — this mirrors the real recruiter
      // query path exactly. k=200 to ensure boost fires across
      // the full memory surface (the embedding-discrimination
      // narrowness means under-k silently misses endorsements).
      // Hybrid candidate search for this permit. role/state/city are all
      // fixed server-side values (map lookup + constants), so the
      // interpolated sql_filter cannot carry user input here.
      const searchRes = await api("POST", "/vectors/hybrid", { index_name: "workers_500k_v1", filter_dataset: "workers_500k", id_column: "worker_id", sql_filter: `role = '${role}' AND state = '${state}' AND city = '${city}' AND CAST(availability AS DOUBLE) > 0.5`, question: `${role} for ${p.work_type || "construction"} in ${city}`, top_k: 5, generate: false, use_playbook_memory: true, playbook_memory_k: 200, }).catch(() => ({ sources: [] as any[] }));
      // Path 2 — discovered patterns for this role in this city.
      const patternRes = await api("POST", "/vectors/playbook_memory/patterns", { query: `${role} in ${city}, ${state}`, top_k_playbooks: 25, min_trait_frequency: 0.3, }).catch(() => ({} as any));
      // Enrich with implied pay rate before taking the top-5
      enrichWithRates(searchRes.sources || []);
      const contractBillRate = impliedBillRate(role);
      const sources = (searchRes.sources || []).slice(0, 5).map((s: any) => {
        // Candidate display name is the chunk text up to the first em-dash;
        // falls back to the doc id when the chunk is empty.
        const name = String(s.chunk_text || "").split("—")[0]?.trim() || s.doc_id;
        return { doc_id: s.doc_id, name, score: s.score, playbook_boost: s.playbook_boost || 0, playbook_citations: s.playbook_citations || [], implied_pay_rate: s.implied_pay_rate ?? null, over_bill_rate: (s.implied_pay_rate ?? 0) > contractBillRate, };
      });
      // Timeline heuristic — permits filed now → construction
      // starts ~45d later → staffing window opens ~14d before
      // start. days_to_deadline is negative when we're past the
      // window (fill urgency is imminent).
      const issueDate = new Date(p.issue_date || Date.now());
      const estStart = new Date(issueDate.getTime() + 45 * 86400e3);   // permit + 45d
      const stagingDate = new Date(issueDate.getTime() + 31 * 86400e3); // 45d - 14d
      const daysToDeadline = Math.round((stagingDate.getTime() - Date.now()) / 86400e3);
      // Urgency buckets: past window → overdue; ≤7d → urgent; ≤21d → soon.
      let urgency = "scheduled";
      if (daysToDeadline < 0) urgency = "overdue";
      else if (daysToDeadline <= 7) urgency = "urgent";
      else if (daysToDeadline <= 21) urgency = "soon";
      else urgency = "scheduled";
      contracts.push({
        permit: { cost, work_type: p.work_type || "General construction", description: (p.work_description || "").substring(0, 140), address: `${p.street_number || ""} ${p.street_direction || ""} ${p.street_name || ""}`.trim(), community_area: p.community_area, issue_date: (p.issue_date || "").substring(0, 10), },
        implied_bill_rate: contractBillRate,
        timeline: { estimated_construction_start: estStart.toISOString().slice(0, 10), staffing_window_opens: stagingDate.toISOString().slice(0, 10), days_to_deadline: daysToDeadline, urgency, },
        proposed: { role, count, city, state, pool_size: searchRes.sql_matches, candidates: sources, },
        discovered_pattern: patternRes.discovered_pattern,
        pattern_matched: patternRes.matched_playbooks ?? 0,
        pattern_workers_examined: patternRes.total_workers_examined ?? 0,
      });
    }
    return ok({ generated_at: new Date().toISOString(), count: contracts.length, contracts, duration_ms: Date.now() - start, note: "Live Chicago permits paired with workers_500k-ranked candidates and playbook_memory discovered patterns. The permit is real public data; the proposed fill is derived per industry heuristic (~$150K → 1 worker).", });
  } catch (e: any) {
    return err(`permit_contracts: ${e.message}`, 500);
  }
}
// Removed 2026-04-20: /intelligence/learn was a legacy CSV writer
// that destructively re-wrote successful_playbooks. /log and
// /log_failure replace it cleanly via /vectors/playbook_memory/seed
// and /mark_failed. Keeping the endpoint would only mislead
// future callers — dead code rots.
// Intelligence: Activity feed — what the system has learned
if (url.pathname === "/intelligence/activity" && req.method === "POST") {
  const start = Date.now();
  // Four independent counts/reads in parallel; each degrades to empty/zero.
  const [playbooksR, searchCountR, fillCountR, totalR] = await Promise.all([
    api("POST", "/query/sql", { sql: "SELECT * FROM successful_playbooks ORDER BY timestamp DESC LIMIT 20" }).catch(() => ({ rows: [] })),
    api("POST", "/query/sql", { sql: "SELECT COUNT(*) cnt FROM successful_playbooks WHERE operation LIKE 'search:%'" }).catch(() => ({ rows: [{ cnt: 0 }] })),
    api("POST", "/query/sql", { sql: "SELECT COUNT(*) cnt FROM successful_playbooks WHERE operation LIKE 'fill:%'" }).catch(() => ({ rows: [{ cnt: 0 }] })),
    api("POST", "/query/sql", { sql: "SELECT COUNT(*) cnt FROM successful_playbooks" }).catch(() => ({ rows: [{ cnt: 0 }] })),
  ]);
  // Extract learned patterns — which roles+cities get filled most
  const patterns: Record = {};
  for (const p of (playbooksR.rows || [])) {
    if (p.operation?.startsWith("fill:") || p.operation?.startsWith("search:")) {
      const key = p.operation.replace(/^(fill|search): ?/, "").trim();
      patterns[key] = (patterns[key] || 0) + 1;
    }
  }
  return ok({ playbooks: playbooksR.rows || [], search_count: searchCountR.rows?.[0]?.cnt || 0, fill_count: fillCountR.rows?.[0]?.cnt || 0, total_operations: totalR.rows?.[0]?.cnt || 0, learned_patterns: Object.entries(patterns).map(([q, c]) => ({ query: q, times: c })).sort((a, b) => b.times - a.times), duration_ms: Date.now() - start, });
}
// Intelligence Brief — parallel analytics across 500K profiles
if (url.pathname === "/intelligence/brief" && req.method === "POST") {
  const start = Date.now();
  // Seven aggregate queries in parallel: pool stats, per-state bench,
  // per-role supply, "gems" (improving + reliable), risks (erratic/silent),
  // untapped (highly available + reliable), archetype distribution.
  const [poolR, benchR, supplyR, gemsR, risksR, untappedR, archetypeR] = await Promise.all([
    api("POST", "/query/sql", { sql: `SELECT COUNT(*) total, ROUND(AVG(CAST(reliability AS DOUBLE)),3) avg_rel, SUM(CASE WHEN CAST(reliability AS DOUBLE)>0.9 THEN 1 ELSE 0 END) elite, SUM(CASE WHEN CAST(reliability AS DOUBLE)>0.8 THEN 1 ELSE 0 END) reliable, SUM(CASE WHEN archetype='erratic' THEN 1 ELSE 0 END) erratic, SUM(CASE WHEN archetype='silent' THEN 1 ELSE 0 END) silent_cnt, SUM(CASE WHEN archetype='improving' THEN 1 ELSE 0 END) improving FROM workers_500k` }),
    api("POST", "/query/sql", { sql: `SELECT state, COUNT(*) total, ROUND(AVG(CAST(reliability AS DOUBLE)),3) avg_rel, SUM(CASE WHEN CAST(reliability AS DOUBLE)>0.8 THEN 1 ELSE 0 END) reliable, SUM(CASE WHEN CAST(availability AS DOUBLE)>0.5 THEN 1 ELSE 0 END) available FROM workers_500k GROUP BY state ORDER BY total DESC` }),
    api("POST", "/query/sql", { sql: `SELECT role, COUNT(*) supply, SUM(CASE WHEN CAST(availability AS DOUBLE)>0.5 THEN 1 ELSE 0 END) available, ROUND(AVG(CAST(reliability AS DOUBLE)),3) avg_rel FROM workers_500k GROUP BY role ORDER BY supply DESC` }),
    api("POST", "/query/sql", { sql: `SELECT name, role, city, state, ROUND(CAST(reliability AS DOUBLE),2) rel, ROUND(CAST(availability AS DOUBLE),2) avail, archetype, skills FROM workers_500k WHERE archetype='improving' AND CAST(reliability AS DOUBLE)>0.8 ORDER BY CAST(reliability AS DOUBLE) DESC LIMIT 5` }),
    api("POST", "/query/sql", { sql: `SELECT name, role, city, state, ROUND(CAST(reliability AS DOUBLE),2) rel, ROUND(CAST(responsiveness AS DOUBLE),2) resp, ROUND(CAST(compliance AS DOUBLE),2) compl, archetype FROM workers_500k WHERE archetype IN ('erratic','silent') AND CAST(reliability AS DOUBLE)<0.5 ORDER BY CAST(reliability AS DOUBLE) ASC LIMIT 5` }),
    api("POST", "/query/sql", { sql: `SELECT name, role, city, state, ROUND(CAST(availability AS DOUBLE),2) avail, ROUND(CAST(reliability AS DOUBLE),2) rel, skills, archetype FROM workers_500k WHERE CAST(availability AS DOUBLE)>0.8 AND CAST(reliability AS DOUBLE)>0.85 ORDER BY CAST(availability AS DOUBLE) DESC LIMIT 5` }),
    api("POST", "/query/sql", { sql: `SELECT archetype, COUNT(*) cnt, ROUND(AVG(CAST(reliability AS DOUBLE)),3) avg_rel FROM workers_500k GROUP BY archetype ORDER BY cnt DESC` }),
  ]);
  return ok({ pool: poolR.rows?.[0] || {}, bench: benchR.rows || [], supply: supplyR.rows || [], gems: gemsR.rows || [], risks: risksR.rows || [], untapped: untappedR.rows || [], archetypes: archetypeR.rows || [], duration_ms: Date.now() - start, });
}
// Intelligence Chat — natural language → routed queries → structured results
if (url.pathname === "/intelligence/chat" && req.method === "POST") {
  const b = await json();
  const q = (b.message || "").trim();
  const lower = q.toLowerCase();
  const start = Date.now();
  // Human-readable trace of which queries this route ran, echoed back.
  const queries: string[] = [];
  // Route 1: "Find someone like [Name]"
  const likeMatch = q.match(/(?:like|similar to)\s+([A-Z][a-z]+(?:\s+[A-Z]\.?\s*)?(?:[A-Z][a-z]+)?)/i);
  if (likeMatch) {
    const name = likeMatch[1].trim();
    queries.push(`SQL: Looking up ${name}'s profile`);
    // User-supplied name is interpolated into SQL with single quotes
    // doubled — mitigates quote injection; NOTE(review): LIKE wildcards
    // (% and _) in the name are not escaped, which only widens the match.
    const profileR = await api("POST", "/query/sql", { sql: `SELECT * FROM workers_500k WHERE name LIKE '%${name.replace(/'/g,"''")}%' LIMIT 1` });
    if (profileR.rows?.length) {
      const worker = profileR.rows[0];
      const stateMatch = lower.match(/\b(?:in|from)\s+([A-Z]{2})\b/i) || lower.match(/\b(IL|IN|OH|MO|TN|KY|WI|MI|IA|MN)\b/i);
      // Explicit state → search there; otherwise search everywhere EXCEPT
      // the source worker's own state.
      const stateFilter = stateMatch ? `state = '${stateMatch[1].toUpperCase()}'` : `state != '${worker.state}'`;
      queries.push(`Vector: Semantic similarity on ${worker.name}'s full profile → ${stateFilter}`);
      // NOTE(review): filter_dataset is "ethereal_workers" here while other
      // routes use "workers_500k" — confirm which name the vector service expects.
      const searchR = await api("POST", "/vectors/hybrid", { question: worker.resume_text || `${worker.role} in ${worker.city} with skills ${worker.skills}`, index_name: "workers_500k_v1", sql_filter: stateFilter + ` AND CAST(reliability AS DOUBLE) >= 0.7`, filter_dataset: "ethereal_workers", id_column: "worker_id", top_k: 5, generate: false, });
      return ok({ type: "similar", summary: `Found ${(searchR.sources||[]).length} workers similar to ${worker.name}${stateMatch ? ' in '+stateMatch[1].toUpperCase() : ' (other states)'}`, source: { name: worker.name, role: worker.role, city: worker.city, state: worker.state, rel: worker.reliability, skills: worker.skills, archetype: worker.archetype }, results: (searchR.sources||[]).map((s:any) => ({ doc_id: s.doc_id, score: s.score, text: s.chunk_text })), sql_matches: searchR.sql_matches, queries_run: queries, duration_ms: Date.now() - start });
    }
    return ok({ type: "error", summary: `Couldn't find "${name}" in the database. Try a full name.`, queries_run: queries, duration_ms: Date.now() - start });
  }
  // Route 2: "What if we lose"
  if (/what if|lose|happens if/i.test(lower)) {
    const roleMatch = lower.match(/(?:lose|lost?)\s+(?:our\s+)?(?:top\s+)?(\d+)?\s*(.+?)(?:\?|$)/i);
    if (roleMatch) {
      // Count defaults to 5 when the user gives no number.
      const count = parseInt(roleMatch[1]) || 5;
      // Strip trailing "worker(s)" and a plural s to get the bare role term.
      const subject = roleMatch[2].trim().replace(/\s*workers?\s*$/,'').replace(/s$/,'');
      queries.push(`SQL: Top ${count} ${subject}s by reliability`);
      const topR = await api("POST", "/query/sql", { sql: `SELECT name, role, city, state, ROUND(CAST(reliability AS DOUBLE),2) rel, skills FROM workers_500k WHERE LOWER(role) LIKE '%${subject.replace(/'/g,"''")}%' ORDER BY CAST(reliability AS DOUBLE) DESC LIMIT ${count}` });
      if (topR.rows?.length) {
        const states = [...new Set(topR.rows.map((r:any) => r.state))];
        queries.push(`SQL: Bench depth for ${subject}s in ${states.join(', ')}`);
        const benchR = await api("POST", "/query/sql", { sql: `SELECT state, COUNT(*) total, SUM(CASE WHEN CAST(reliability AS DOUBLE)>0.8 THEN 1 ELSE 0 END) reliable FROM workers_500k WHERE LOWER(role) LIKE '%${subject.replace(/'/g,"''")}%' AND state IN (${states.map((s:string)=>`'${s}'`).join(',')}) GROUP BY state` });
        const totalInRole = (benchR.rows||[]).reduce((s:number,r:any) => s + r.total, 0);
        // Remaining reliable bench after removing the hypothetically-lost top N.
        const reliableRemaining = (benchR.rows||[]).reduce((s:number,r:any) => s + r.reliable, 0) - topR.rows.length;
        return ok({ type: "whatif", summary: `Impact: losing top ${topR.rows.length} ${subject} workers`, lost: topR.rows, bench: benchR.rows||[], total_in_role: totalInRole, reliable_remaining: Math.max(0, reliableRemaining), risk_level: reliableRemaining < count * 2 ? "HIGH" : reliableRemaining < count * 5 ? "MEDIUM" : "LOW", queries_run: queries, duration_ms: Date.now() - start });
      }
      return ok({ type: "error", summary: `Couldn't find workers in the "${subject}" role. Try: welder, forklift operator, assembler, etc.`, queries_run: queries, duration_ms: Date.now() - start });
    }
  }
  // Route 3: "Who could handle" — semantic role discovery
  if (/could handle|capable of|suitable for|qualified for|try.*for|can do/i.test(lower)) {
    const roleDesc = q.replace(/^.*?(?:handle|capable of|suitable for|qualified for|try\s+\w+\s+for|can do)\s*/i,'').replace(/\?$/,'').trim();
    queries.push(`Vector: Semantic search for "${roleDesc}" — no exact role match needed`);
    const searchR = await api("POST", "/vectors/hybrid", { question: `Worker experienced in ${roleDesc}, relevant skills and certifications`, index_name: "workers_500k_v1", sql_filter: "CAST(reliability AS DOUBLE) >= 0.75", filter_dataset: "ethereal_workers", id_column: "worker_id", top_k: 8, generate: false, });
    return ok({ type: "discovery", summary: `${(searchR.sources||[]).length} workers found through semantic skill matching for: "${roleDesc}"`, role_searched: roleDesc, results: (searchR.sources||[]).map((s:any) => ({ doc_id: s.doc_id, score: s.score, text: s.chunk_text })), sql_matches: searchR.sql_matches, note: "None of these workers have this exact role title. They were found because their skills, certifications, and experience are semantically similar. This is talent discovery — finding people for roles that don't exist in your database yet.", queries_run: queries, duration_ms: Date.now() - start });
  }
  // Route 4: "Stop placing" / risk workers
  if (/stop placing|worst|problem|flag|risk|underperform|fire|let go/i.test(lower)) {
    queries.push("SQL: erratic/silent workers with reliability < 50%");
    const riskR = await api("POST", "/query/sql", { sql: `SELECT name, role, city, state, ROUND(CAST(reliability AS DOUBLE),2) rel, ROUND(CAST(responsiveness AS DOUBLE),2) resp, ROUND(CAST(compliance AS DOUBLE),2) compl, archetype FROM workers_500k WHERE archetype IN ('erratic','silent') AND CAST(reliability AS DOUBLE)<0.5 ORDER BY CAST(reliability AS DOUBLE) ASC LIMIT 10` });
    const countR = await api("POST", "/query/sql", { sql: `SELECT COUNT(*) cnt FROM workers_500k WHERE archetype IN ('erratic','silent') AND CAST(reliability AS DOUBLE)<0.5` });
    return ok({ type: "risk", summary: `${countR.rows?.[0]?.cnt || 0} workers flagged — showing the 10 lowest performers`, results: riskR.rows||[], total_flagged: countR.rows?.[0]?.cnt || 0, queries_run: queries, duration_ms: Date.now() - start });
  }
  // Route 5: Analytics / counts
  if (/how many|count|total|percentage|average|breakdown/i.test(lower)) {
    queries.push("RAG: analytical question → vector retrieval + LLM reasoning");
    const ragR = await api("POST", "/vectors/rag", { index_name: "workers_500k_v1", question: q, top_k: 3 });
    return ok({ type: "answer", summary: ragR.answer || "Couldn't determine the answer from the data", sources: (ragR.sources||[]).map((s:any) => ({ doc_id: s.doc_id, text: s.chunk_text, score: s.score })), queries_run: queries, duration_ms: Date.now() - start });
  }
  // Default: smart search — extract role, location, availability from natural language
  {
    // filters[0] is the reliability floor; the "reliable/best" branch below
    // overwrites it in place rather than appending.
    const filters: string[] = ["CAST(reliability AS DOUBLE) >= 0.5"];
    const understood: string[] = [];
    // Extract role keywords — keyword in the user's text → LIKE fragment.
    const roleKeywords: Record = { "warehouse": "warehouse", "forklift": "forklift", "welder": "weld", "assembler": "assembl", "loader": "loader", "machine operator": "machine operator", "shipping": "shipping", "quality": "quality", "maintenance": "maintenance", "production": "production", "material handler": "material handler", "sanitation": "sanitation", "inventory": "inventory", "line lead": "line lead", "electrician": "electric", "packaging": "packaging", "tool and die": "tool", "logistics": "logistics", "safety": "safety", "cnc": "cnc", };
    for (const [kw, sqlPart] of Object.entries(roleKeywords)) {
      // First keyword hit wins (break) — insertion order of the map matters.
      if (lower.includes(kw)) { filters.push(`LOWER(role) LIKE '%${sqlPart}%'`); understood.push(`role: ${kw}`); break; }
    }
    // Extract city
    const cities = ["chicago","springfield","rockford","peoria","joliet","indianapolis","fort wayne", "evansville","south bend","columbus","cleveland","cincinnati","dayton","akron","toledo", "st. louis","st louis","kansas city","nashville","memphis","knoxville","louisville","lexington", "milwaukee","madison","detroit","grand rapids","lansing","des moines","minneapolis","terre haute", "bloomington","decatur","mattoon","galesburg","danville","champaign"];
    for (const city of cities) {
      // Title-case each word of the matched city to align with stored values.
      if (lower.includes(city)) { const sqlCity = city.split(' ').map(w => w[0].toUpperCase() + w.slice(1)).join(' '); filters.push(`city = '${sqlCity}'`); understood.push(`city: ${sqlCity}`); break; }
    }
    // Extract state — abbreviation first; full state name as fallback. A
    // detected city suppresses the abbreviation path (city implies state
    // precision already).
    const stateNames: Record = { "illinois":"IL","indiana":"IN","ohio":"OH","missouri":"MO","tennessee":"TN", "kentucky":"KY","wisconsin":"WI","michigan":"MI","iowa":"IA","minnesota":"MN" };
    const stateMatch = lower.match(/\b(IL|IN|OH|MO|TN|KY|WI|MI|IA|MN)\b/i);
    if (stateMatch && !understood.some(u => u.startsWith('city'))) {
      filters.push(`state = '${stateMatch[1].toUpperCase()}'`); understood.push(`state: ${stateMatch[1].toUpperCase()}`);
    } else {
      for (const [name, abbr] of Object.entries(stateNames)) {
        if (lower.includes(name)) { filters.push(`state = '${abbr}'`); understood.push(`state: ${abbr}`); break; }
      }
    }
    // Extract availability
    if (/available|open|ready|today|now|immediate|asap|right away/i.test(lower)) { filters.push("CAST(availability AS DOUBLE) > 0.5"); understood.push("available now"); }
    // Extract reliability preference
    if (/reliable|dependable|best|top|trusted|proven/i.test(lower)) { filters[0] = "CAST(reliability AS DOUBLE) >= 0.8"; understood.push("high reliability"); }
    const filterStr = filters.join(" AND ");
    queries.push("Smart parse: " + (understood.length ? understood.join(", ") : "general search"));
    queries.push("SQL filter: " + filterStr);
    queries.push("Vector: semantic search for best skill match");
    // Also run a direct SQL query to get exact counts and zip codes
    const sqlFields = "name, role, city, state, zip, ROUND(CAST(reliability AS DOUBLE),2) rel, ROUND(CAST(availability AS DOUBLE),2) avail, skills, certifications, archetype";
    const directSql = `SELECT ${sqlFields} FROM workers_500k WHERE ${filterStr} ORDER BY CAST(availability AS DOUBLE) DESC, CAST(reliability AS DOUBLE) DESC LIMIT 10`;
    // Derive role+geo for the pattern query so the meta-index
    // surface lines up with what the user actually asked for.
    const roleForPatterns = understood.find(u => u.startsWith('role:'))?.split(': ')[1] || q;
    const cityForPatterns = understood.find(u => u.startsWith('city:'))?.split(': ')[1] || 'Chicago';
    const stateForPatterns = understood.find(u => u.startsWith('state:'))?.split(': ')[1] || 'IL';
    // Hybrid search, direct SQL, and pattern lookup all run in parallel;
    // only the pattern call is allowed to fail silently.
    const [searchR, directR, patternR] = await Promise.all([
      api("POST", "/vectors/hybrid", {
        question: q, index_name: "workers_500k_v1", sql_filter: filterStr, filter_dataset: "ethereal_workers", id_column: "worker_id", top_k: 8, generate: false,
        // k=200 to catch compounding — direct measurement shows
        // boost reliably fires only when ~all memory is scanned
        // due to the narrow 0.55-0.67 cosine band in the 768d
        // nomic-embed-text space. Brute force at 200 entries
        // is sub-ms; no reason to underscan.
use_playbook_memory: true, playbook_memory_k: 200, }), api("POST", "/query/sql", { sql: directSql }), api("POST", "/vectors/playbook_memory/patterns", { query: `${roleForPatterns} in ${cityForPatterns}, ${stateForPatterns}`, top_k_playbooks: 25, min_trait_frequency: 0.3, }).catch(() => ({})), ]); // Merge: use SQL results for structured data (zip, avail), vector for ranking const sqlWorkers = directR.rows || []; const vectorWorkers = (searchR.sources || []).map((s: any) => ({ doc_id: s.doc_id, score: s.score, text: s.chunk_text, playbook_boost: s.playbook_boost || 0, playbook_citations: s.playbook_citations || [], })); return ok({ type: "smart_search", summary: `Found ${searchR.sql_matches || 0} workers matching your criteria${understood.length ? ' (' + understood.join(', ') + ')' : ''}`, understood, sql_results: sqlWorkers, vector_results: vectorWorkers, sql_matches: searchR.sql_matches, queries_run: queries, duration_ms: Date.now() - start, // Meta-index signal — what similar past fills had in common. // Non-empty when memory has ≥1 relevant playbook. discovered_pattern: (patternR as any)?.discovered_pattern, pattern_playbooks_matched: (patternR as any)?.matched_playbooks ?? 0, }); } } activeTrace = null; return err("Unknown path. 
Available: / /health /search /sql /match /worker/:id /ask /log /playbooks /profile/:id /vram /context /verify /simulation/run /console /intelligence/brief /intelligence/chat", 404); } catch (e: any) { if (activeTrace) { scoreTrace(activeTrace, "error", 0, e.message); } activeTrace = null; return err(e.message || String(e), 500); } finally { // Flush traces async — don't block the response flushTraces().catch(() => {}); activeTrace = null; } }, }); console.error(`Lakehouse Agent Gateway :${PORT} → ${BASE}`); } main().catch(console.error); // ─── Week simulation engine ─── const ROLES = ["Forklift Operator","Machine Operator","Assembler","Loader","Quality Tech","Welder","Sanitation Worker","Shipping Clerk","Production Worker","Maintenance Tech"]; const STATES = ["IL","IN","OH","MO","TN","KY","WI","MI"]; const CITIES: Record = { IL: ["Chicago","Springfield","Rockford","Peoria","Joliet"], IN: ["Indianapolis","Fort Wayne","Evansville","South Bend"], OH: ["Columbus","Cleveland","Cincinnati","Dayton"], MO: ["St. 
Louis","Kansas City","Springfield"], TN: ["Nashville","Memphis"], KY: ["Louisville","Lexington"], WI: ["Milwaukee","Madison"], MI: ["Detroit","Grand Rapids"], };
// Name pools for synthetic client companies — makeClient() combines one
// prefix and one suffix, e.g. "Midwest Logistics".
const CLIENT_PREFIXES = ["Midwest","Great Lakes","Prairie","Heartland","Summit","Valley","Central","Lakeside","Tri-State","Heritage","National","Premier","Metro","Capitol","Crossroads","Keystone","Riverfront","Gateway","Pinnacle","Cornerstone"];
const CLIENT_SUFFIXES = ["Logistics","Manufacturing","Assembly","Foods","Steel","Packaging","Health","Plastics","Energy","Solutions","Distribution","Services","Industries","Supply","Warehousing","Materials","Products","Corp","Group","Enterprises"];
// Random synthetic client name for simulated contracts.
function makeClient(): string { return pick(CLIENT_PREFIXES) + " " + pick(CLIENT_SUFFIXES); }
// Shift start times assigned to simulated contracts.
const STARTS = ["5:00 AM","6:00 AM","6:30 AM","7:00 AM","7:30 AM","8:00 AM"];
// Diverse scenarios — each tells a different story about WHY this contract exists
// `weight` drives the weighted-random draw in runWeekSimulation; `priority`
// in turn drives headcount and the minimum-reliability filter there.
const SCENARIOS = [
  // URGENT — real emergencies that need immediate action
  { priority: "urgent", weight: 8, note: "Worker walked off the job at 3 PM yesterday — client needs replacement by morning", situation: "walkoff", action: "Replacement needed ASAP — previous worker quit mid-shift" },
  { priority: "urgent", weight: 5, note: "Client emailed at 11 PM — their regular crew has COVID exposure, entire team quarantined", situation: "quarantine", action: "Full crew replacement — health emergency at job site" },
  { priority: "urgent", weight: 5, note: "2 no-shows this morning — client is short-staffed on the floor right now", situation: "noshow", action: "Immediate backfill — client waiting on the phone" },
  // HIGH — important but not crisis
  { priority: "high", weight: 10, note: "New contract starting Monday — client wants to meet workers this week", situation: "new_client", action: "New client onboarding — first impression matters" },
  { priority: "high", weight: 8, note: "Client expanding to 2nd shift — need additional crew by next week", situation: "expansion", action: "Growth opportunity — client adding a shift" },
  { priority: "high", weight: 6, note: "Worker's OSHA certification expires Friday — need certified replacement lined up", situation: "cert_expiry", action: "Cert compliance — current worker can't continue without renewal" },
  { priority: "high", weight: 5, note: "Client requested specific workers back from last month's project", situation: "client_request", action: "Client relationship — they asked for specific people" },
  // MEDIUM — standard day-to-day operations
  { priority: "medium", weight: 15, note: "Ongoing weekly fill — same client, same role, reliable pipeline", situation: "recurring", action: "Recurring contract — steady work" },
  { priority: "medium", weight: 12, note: "Seasonal uptick — warehouse volume increasing ahead of holidays", situation: "seasonal", action: "Seasonal planning — volume ramping up" },
  { priority: "medium", weight: 10, note: "Backfill for worker on approved medical leave — returns in 3 weeks", situation: "medical_leave", action: "Temporary coverage — worker returning soon" },
  { priority: "medium", weight: 8, note: "Client testing new role — wants to try 2 workers for a week before committing", situation: "trial", action: "Trial placement — client evaluating the role" },
  { priority: "medium", weight: 6, note: "Cross-training opportunity — client wants workers who can learn a new skill", situation: "cross_train", action: "Development opportunity — workers can learn new skills" },
  // LOW — planning ahead
  { priority: "low", weight: 10, note: "Future fill — project starts in 2 weeks, gathering candidates now", situation: "future", action: "Pipeline building — no rush, quality over speed" },
  { priority: "low", weight: 8, note: "Client exploring staffing options — not committed yet, just want to see who's available", situation: "exploratory", action: "Exploratory — client shopping, impress them with quality" },
  { priority: "low", weight: 5, note: "Internal transfer — moving a worker from one site to another, need replacement at original", situation: "transfer", action: "Planned transition — smooth handoff between sites" },
];
// Uniform-random element. NOTE(review): the generic parameter appears
// stripped by extraction (likely `pick<T>`) — confirm against VCS.
function pick(arr: T[]): T { return arr[Math.floor(Math.random() * arr.length)]; }
// ─── Client-blacklist persistence (feature #2) ──────────────────────────
// Simple JSON file under mcp-server/data/. Synchronous writes are fine
// at the expected rate (a handful of blacklist adds per day).
const BLACKLIST_PATH = `${import.meta.dir}/data/client_blacklists.json`;
// One blacklisted worker for one client, with the operator's reason.
interface BlacklistEntry { worker_id: string; name: string; reason: string; added_at: string; }
// Read the whole client→entries map; any read/parse failure yields {}.
// NOTE(review): return type annotations below look extraction-garbled
// (`Promise>` / bare `Record` — likely Record<string, BlacklistEntry[]>);
// confirm against VCS before relying on them.
async function loadAllBlacklists(): Promise> { try { const f = Bun.file(BLACKLIST_PATH); if (!(await f.exists())) return {}; return await f.json() as Record; } catch { return {}; } }
// Persist the full map, pretty-printed for hand inspection.
async function saveAllBlacklists(all: Record): Promise { await Bun.write(BLACKLIST_PATH, JSON.stringify(all, null, 2)); }
// Entries for a single client ([] when the client has none).
async function loadClientBlacklist(client: string): Promise { const all = await loadAllBlacklists(); return all[client] || []; }
// Add (or refresh) one entry on a client's blacklist.
async function addToClientBlacklist(client: string, entry: BlacklistEntry): Promise { const all = await loadAllBlacklists(); const list = all[client] || []; // De-dupe: same worker_id replaces prior entry with fresher reason.
const filtered = list.filter(e => e.worker_id !== entry.worker_id); filtered.push(entry); all[client] = filtered; await saveAllBlacklists(all); return filtered; } async function removeFromClientBlacklist(client: string, worker_id: string): Promise<{ removed: boolean; total: number }> { const all = await loadAllBlacklists(); const list = all[client] || []; const filtered = list.filter(e => e.worker_id !== worker_id); const removed = filtered.length < list.length; all[client] = filtered; await saveAllBlacklists(all); return { removed, total: filtered.length }; } // ─── Push daemon (alerts) ─────────────────────────────────────────────── // Background interval that detects notification-worthy events, assembles // a digest, and dispatches to configured channels. Converts the app from // "dashboard you visit" to "system that finds you" — essential for the // phone-first shop that won't remember to open a URL. const ALERTS_CFG_PATH = `${import.meta.dir}/data/notification_config.json`; const ALERTS_STATE_PATH = `${import.meta.dir}/data/notification_state.json`; const ALERTS_LOG_PATH = `${import.meta.dir}/data/notifications.jsonl`; interface AlertsConfig { enabled: boolean; interval_minutes: number; webhook_url?: string; webhook_label?: string; deadline_warn_days: number; } interface AlertsState { last_run_at?: string; last_forecast_by_role?: Record; last_playbook_entries?: number; last_digest?: any; } async function loadAlertsConfig(): Promise { const f = Bun.file(ALERTS_CFG_PATH); if (!(await f.exists())) { return { enabled: true, interval_minutes: 15, deadline_warn_days: 7 }; } try { return await f.json() as AlertsConfig; } catch { return { enabled: true, interval_minutes: 15, deadline_warn_days: 7 }; } } async function saveAlertsConfig(c: AlertsConfig): Promise { await Bun.write(ALERTS_CFG_PATH, JSON.stringify(c, null, 2)); } async function loadAlertsState(): Promise { const f = Bun.file(ALERTS_STATE_PATH); if (!(await f.exists())) return {}; try { return await 
f.json() as AlertsState; } catch { return {}; } }
// Persist the daemon's last-observed snapshot for the next diff cycle.
async function saveAlertsState(s: AlertsState): Promise<void> { await Bun.write(ALERTS_STATE_PATH, JSON.stringify(s, null, 2)); }
// Build a digest by diffing current state against last-observed state.
// Returns null if there's nothing worth sending.
async function buildDigest(): Promise<any> {
  const cfg = await loadAlertsConfig();
  const state = await loadAlertsState();
  // Pull current snapshots in parallel. /intelligence/staffing_forecast
  // is a BUN route (our localhost), not on the Rust gateway — reach it
  // via in-process fetch. /vectors/playbook_memory/stats is on the
  // gateway and gets there via api().
  // FIX: derive the local port from MCP_PORT — the same variable the
  // top-of-file PORT const uses. This previously read process.env.PORT,
  // so a configured MCP_PORT made the daemon poll the wrong port.
  const bunPort = process.env.MCP_PORT || "3700";
  const [forecast, memStats] = await Promise.all([
    fetch(`http://localhost:${bunPort}/intelligence/staffing_forecast`, { method: "POST", headers: { "Content-Type": "application/json" }, body: "{}" }).then(r => r.json()).catch(() => null as any),
    api("GET", "/vectors/playbook_memory/stats").catch(() => null as any),
  ]);
  const events: any[] = [];
  // Event: role risk status changed (new critical/tight)
  const currentByRole: Record<string, any> = {};
  const priorByRole = state.last_forecast_by_role || {};
  // Severity ordering for "got worse" comparison — hoisted out of the
  // loop (it is loop-invariant).
  const rank: Record<string, number> = { ok: 0, watch: 1, tight: 2, critical: 3 };
  if (forecast && Array.isArray(forecast.forecast)) {
    for (const f of forecast.forecast) {
      currentByRole[f.role] = {
        risk: f.risk,
        coverage_pct: f.coverage_pct,
        earliest_staffing_deadline: f.earliest_staffing_deadline,
      };
      const prior = priorByRole[f.role];
      if (!prior || (rank[f.risk] ?? 0) > (rank[prior.risk] ?? 0)) {
        // Risk got worse (or new role we haven't seen)
        if (f.risk === "critical" || f.risk === "tight") {
          events.push({ kind: "risk_escalation", role: f.role, risk: f.risk, coverage_pct: f.coverage_pct, demand: f.demand_workers, available: f.bench_available, prior_risk: prior?.risk ?? null });
        }
      }
      // Event: staffing deadline within N days that wasn't there before
      const d = f.days_to_deadline;
      if (d !== undefined && d >= 0 && d <= cfg.deadline_warn_days) {
        const priorD = prior?.earliest_staffing_deadline;
        if (priorD !== f.earliest_staffing_deadline) {
          events.push({ kind: "deadline_approaching", role: f.role, days_to_deadline: d, date: f.earliest_staffing_deadline, demand: f.demand_workers });
        }
      }
    }
  }
  // Event: playbook memory grew significantly since last check
  const nowEntries = memStats?.entries ?? 0;
  const priorEntries = state.last_playbook_entries ?? 0;
  const grewBy = nowEntries - priorEntries;
  if (grewBy >= 5) {
    events.push({ kind: "memory_growth", new_entries: grewBy, total_entries: nowEntries, total_endorsed_names: memStats?.total_names_endorsed ?? 0 });
  }
  // Only return a digest if there's something to say. First-ever run is
  // a special case: surface the snapshot as a "welcome" digest.
  const isFirstRun = !state.last_run_at;
  if (events.length === 0 && !isFirstRun) return null;
  const digest = {
    generated_at: new Date().toISOString(),
    is_first_run: isFirstRun,
    events,
    snapshot: {
      forecast_roles: Object.keys(currentByRole).length,
      critical: forecast?.critical_roles ?? 0,
      tight: forecast?.tight_roles ?? 0,
      playbook_entries: nowEntries,
      permits_30d: forecast?.permit_count ?? 0,
      construction_pipeline_usd: forecast?.total_cost ?? 0,
    },
  };
  // Persist the updated state for next diff
  await saveAlertsState({ last_run_at: digest.generated_at, last_forecast_by_role: currentByRole, last_playbook_entries: nowEntries, last_digest: digest });
  return digest;
}
function formatDigestText(d: any): string {
  const lines: string[] = [];
  lines.push(`LAKEHOUSE DIGEST — ${d.generated_at.slice(0, 16).replace("T", " ")}`);
  lines.push("");
  if (d.is_first_run) {
    lines.push(`[initial snapshot] · ${d.snapshot.forecast_roles} roles tracked · ` + `${d.snapshot.playbook_entries} playbooks in memory · ` + `${d.snapshot.permits_30d} permits last 30d`);
    lines.push("");
  }
  const risk = d.events.filter((e: any) => e.kind === "risk_escalation");
  if (risk.length) {
    lines.push(`${risk.length} role${risk.length !== 1 ? "s" : ""} escalated to ${risk.map((r: any) => r.risk).filter((v: string, i: number, a: string[]) => a.indexOf(v) === i).join("/")}:`);
    for (const e of risk.slice(0, 5)) {
      lines.push(` • ${e.role} — coverage ${e.coverage_pct}% (${e.available}/${e.demand})${e.prior_risk ? ` · was ${e.prior_risk}` : " · new"}`);
    }
    lines.push("");
  }
  const dead = d.events.filter((e: any) => e.kind === "deadline_approaching");
  if (dead.length) {
    lines.push(`${dead.length} staffing deadline${dead.length !== 1 ?
"s" : ""} within window:`); for (const e of dead.slice(0, 5)) { lines.push(` • ${e.role} — ${e.days_to_deadline}d to ${e.date} · demand ${e.demand}`); } lines.push(""); } const mem = d.events.filter((e: any) => e.kind === "memory_growth"); for (const e of mem) { lines.push(`+${e.new_entries} new playbooks (total ${e.total_entries}, ${e.total_endorsed_names} endorsed names)`); } lines.push(`snapshot: ${d.snapshot.critical} critical · ${d.snapshot.tight} tight · ` + `$${(d.snapshot.construction_pipeline_usd || 0).toLocaleString("en-US", { maximumFractionDigits: 0 })} pipeline`); return lines.join("\n"); } async function dispatchDigest(d: any, cfg: AlertsConfig): Promise<{ channels: string[]; errors: string[] }> { const channels: string[] = []; const errors: string[] = []; const text = formatDigestText(d); // Channel 1: console console.log(`[alerts] ${text.split("\n").join(" | ")}`); channels.push("console"); // Channel 2: JSONL file (always-on audit) try { await Bun.write(ALERTS_LOG_PATH, (await Bun.file(ALERTS_LOG_PATH).exists() ? await Bun.file(ALERTS_LOG_PATH).text() : "") + JSON.stringify({ at: d.generated_at, text, digest: d }) + "\n" ); channels.push("file"); } catch (e: any) { errors.push(`file: ${e.message}`); } // Channel 3: webhook (opt-in) if (cfg.webhook_url) { try { const r = await fetch(cfg.webhook_url, { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ text, digest: d }), }); if (r.ok) channels.push("webhook"); else errors.push(`webhook ${r.status}: ${(await r.text()).slice(0, 200)}`); } catch (e: any) { errors.push(`webhook: ${e.message}`); } } return { channels, errors }; } // Background daemon — kicked off once on module init. Guard via a // globalThis sentinel so the startAlertsDaemon() call from near the // top of the file (before this block evaluates) doesn't hit a temporal // dead zone on a let/const binding. 
async function startAlertsDaemon() { const g = globalThis as any; if (g.__lakehouse_alerts_armed) return; g.__lakehouse_alerts_armed = true; const cfg = await loadAlertsConfig(); if (!cfg.enabled) { console.log("[alerts] daemon disabled via config"); return; } const ms = Math.max(60, cfg.interval_minutes * 60) * 1000; console.log(`[alerts] daemon armed · interval ${cfg.interval_minutes}min · webhook ${cfg.webhook_url ? "configured" : "disabled"}`); // Fire once shortly after startup, then on interval. setTimeout(runAlertsOnce, 10_000); setInterval(runAlertsOnce, ms); } async function runAlertsOnce() { try { const cfg = await loadAlertsConfig(); if (!cfg.enabled) return; const d = await buildDigest(); if (!d) return; await dispatchDigest(d, cfg); } catch (e: any) { console.error(`[alerts] cycle error: ${e.message}`); } } // Seed playbook_memory from a filled contract so the next hybrid query // ranks against it. Used by both runWeekSimulation (per-day) and the /log // endpoint (per manual logging). Fail-soft — seeding is best-effort. // ─── Sample CSV generator ─────────────────────────────────────────────── // Fresh randomized staffing roster per request. Prevents the "upload // same file twice and it's a no-op" problem from the static sample, // and makes the dashboard numbers visibly update after onboarding. 
const SAMPLE_FIRST_NAMES = [ "Sarah","Michael","Maria","David","Jennifer","Robert","Amanda","Carlos", "Kim","James","Priya","Thomas","Lisa","Brandon","Emily","Marcus","Anita", "Dmitri","Rachel","Samuel","Jordan","Natalia","Henry","Ava","Tyler", "Hannah","Luis","Aisha","Victor","Monica","Derek","Yuki","Fatima","Kwame", "Isabel","Rafael","Elena","Hiroshi","Nadia","Oscar","Sofia","Anders", "Leila","Jamal","Chioma","Pavel","Bianca","Tariq","Inez","Reuben","Mira", ]; const SAMPLE_LAST_NAMES = [ "Johnson","Chen","Rodriguez","Park","Lopez","Williams","Taylor","Mendoza", "Nguyen","O'Brien","Patel","Anderson","Nakamura","Moore","Zhang","Brooks", "Volkov","Kim","Thompson","Martinez","Soto","Robinson","Clark","Hayes", "Reyes","Brown","Wright","Diaz","Powell","Green","Castillo","Iwu", "Kowalski","Lindström","Oyelaran","Saitō","Abebe","Mehta","Blanchard", ]; const SAMPLE_ROLES = [ "Forklift Operator","Welder","Warehouse Associate","Machine Operator", "Loader","Maintenance Tech","Quality Tech","Electrician","Line Lead", "Material Handler","Production Worker","Assembler","Shipping Clerk", ]; const SAMPLE_CITY_STATE: Array<[string, string]> = [ ["Chicago","IL"],["Springfield","IL"],["Rockford","IL"],["Peoria","IL"], ["Indianapolis","IN"],["Fort Wayne","IN"],["Evansville","IN"],["South Bend","IN"], ["Columbus","OH"],["Cleveland","OH"],["Cincinnati","OH"],["Toledo","OH"], ["St. 
Louis","MO"],["Kansas City","MO"],["Springfield","MO"], ["Nashville","TN"],["Memphis","TN"],["Knoxville","TN"], ["Louisville","KY"],["Lexington","KY"], ["Milwaukee","WI"],["Madison","WI"],["Green Bay","WI"], ["Detroit","MI"],["Grand Rapids","MI"],["Lansing","MI"], ]; const SAMPLE_SKILL_POOLS: Record = { "Forklift Operator": ["pallet jack","hazmat","loading dock","overhead crane","cold storage","shipping","team lead"], "Welder": ["TIG","MIG","pipe welding","blueprint reading","grinder","confined space"], "Warehouse Associate": ["inventory","RF scanner","pick-to-light","Excel","packaging","team lead"], "Machine Operator": ["CNC","SPC","gauge R&R","lean manufacturing","conveyor ops","first article"], "Loader": ["loading dock","team lead","cold storage","first aid","bilingual"], "Maintenance Tech": ["electrical","PLC","hydraulics","CMMS","LOTO","troubleshooting"], "Quality Tech": ["ISO 9001","calibration","root cause analysis","SPC","Six Sigma"], "Electrician": ["conduit","motor controls","troubleshooting","PLC","NEC"], "Line Lead": ["team lead","training","SPC","scheduling"], "Material Handler": ["RF scanner","pallet jack","receiving","packaging"], "Production Worker": ["line work","first article","labeling","packaging","quality inspection"], "Assembler": ["assembly","gauge R&R","line lead","first article"], "Shipping Clerk": ["shipping","receiving","RF scanner","bilingual"], }; const SAMPLE_CERT_POOL = ["OSHA-10","OSHA-30","Forklift","Hazmat","First Aid","LOTO","Confined Space","AWS D1.1","ServSafe","Six Sigma Green"]; const SAMPLE_ARCHETYPES = ["reliable","specialist","leader","communicator","flexible"]; function pick(arr: T[]): T { return arr[Math.floor(Math.random() * arr.length)]; } function pickN(arr: T[], n: number): T[] { const copy = arr.slice(); const out: T[] = []; for (let i = 0; i < n && copy.length > 0; i++) { out.push(copy.splice(Math.floor(Math.random() * copy.length), 1)[0]); } return out; } function csvEscape(s: string): string { if (s.indexOf(",") 
>= 0 || s.indexOf('"') >= 0 || s.indexOf("\n") >= 0) { return `"${s.replace(/"/g, '""')}"`; } return s; } function generateSampleRosterCSV(): string { const count = 120 + Math.floor(Math.random() * 61); // 120-180 const ts = Date.now(); const lines: string[] = [ "worker_id,name,role,city,state,email,phone,skills,certifications,availability,reliability,archetype", ]; for (let i = 0; i < count; i++) { const first = pick(SAMPLE_FIRST_NAMES); const last = pick(SAMPLE_LAST_NAMES); const name = `${first} ${last}`; const role = pick(SAMPLE_ROLES); const [city, state] = pick(SAMPLE_CITY_STATE); const handle = `${first}.${last}`.toLowerCase().replace(/[^a-z\.]/g, ""); const email = `${handle}${Math.floor(Math.random() * 1000)}@example.com`; const area = ["312","773","630","708","331","815","217","219","260","614","216","513","419","314","816","615","901","502","414","608","313","616"][Math.floor(Math.random() * 22)]; const phone = `(${area}) 555-${String(1000 + Math.floor(Math.random() * 9000))}`; const skillPool = SAMPLE_SKILL_POOLS[role] || ["general"]; const skills = pickN(skillPool, 2 + Math.floor(Math.random() * 3)).join("|"); const certs = pickN(SAMPLE_CERT_POOL, 1 + Math.floor(Math.random() * 3)).join("|"); const availability = (0.3 + Math.random() * 0.69).toFixed(2); const reliability = (0.55 + Math.random() * 0.44).toFixed(2); const archetype = pick(SAMPLE_ARCHETYPES); lines.push([ `W-${ts}-${String(i).padStart(4, "0")}`, csvEscape(name), csvEscape(role), csvEscape(city), state, email, phone, csvEscape(skills), csvEscape(certs), availability, reliability, archetype, ].join(",")); } return lines.join("\n") + "\n"; } // ─── Rate/margin awareness ────────────────────────────────────────────── // Derive implied pay and bill rates per worker / per contract without // schema changes. Numbers are industry heuristics — a real deployment // would replace these with the client's actual ATS pay_rate column and // contract bill_rate. 
The shape stays the same; only the source changes. const ROLE_BASE_PAY_RATE: Record = { "Electrician": 28, "Welder": 26, "Machine Operator": 24, "Maintenance Tech": 26, "Forklift Operator": 20, "Loader": 17, "Warehouse Associate": 17, "Material Handler": 18, "Production Worker": 18, "Quality Tech": 23, "Line Lead": 22, "Assembler": 18, "Shipping Clerk": 19, }; const DEFAULT_BASE_PAY = 19; // Staffing firm typically marks up pay to bill by 35-45% to cover // overhead, insurance, and margin. Using 40% as the midpoint. const BILL_MARKUP = 1.4; function impliedPayRate(w: { role?: string | null; reliability?: number | string | null; archetype?: string | null }): number { const role = w.role || ""; const base = ROLE_BASE_PAY_RATE[role] ?? DEFAULT_BASE_PAY; const rel = typeof w.reliability === "string" ? parseFloat(w.reliability) : (w.reliability ?? 0.5); const relBump = (isFinite(rel) ? rel : 0.5) * 4; const arch = (w.archetype || "").toLowerCase(); const archBump = arch === "specialist" ? 4 : arch === "leader" ? 3 : arch === "reliable" ? 1 : 0; return Math.round((base + relBump + archBump) * 100) / 100; } function impliedBillRate(role: string | null | undefined): number { const base = ROLE_BASE_PAY_RATE[role || ""] ?? DEFAULT_BASE_PAY; // Contract bill rate = base pay × markup. This is what a staffing firm // would typically quote for this role — the worker's rate has to be // below this to keep margin. return Math.round((base * BILL_MARKUP) * 100) / 100; } // Parse a worker's role / reliability / archetype from a vector chunk // shaped like "Name — Role in City, ST. Skills: ... . Certs: ... . // Archetype: reliable. 
Reliability: 0.93, Availability: 0.73" function parseWorkerChunk(chunk: string): { role?: string; reliability?: number; archetype?: string } { if (!chunk) return {}; const out: any = {}; const roleMatch = chunk.match(/—\s*([^\.]+?)\s+in\s+/); if (roleMatch) out.role = roleMatch[1].trim(); const relMatch = chunk.match(/Reliability:\s*([\d\.]+)/i); if (relMatch) out.reliability = parseFloat(relMatch[1]); const archMatch = chunk.match(/Archetype:\s*([A-Za-z]+)/i); if (archMatch) out.archetype = archMatch[1]; return out; } // Attach implied_pay_rate to each hybrid source in place, using either // the row's native fields (from sql_results) or parsed from chunk_text. function enrichWithRates(sources: any[]): void { for (const s of sources || []) { const parsed = parseWorkerChunk(s.chunk_text || ""); const w = { role: s.role ?? parsed.role, reliability: s.reliability ?? s.rel ?? parsed.reliability, archetype: s.archetype ?? s.arch ?? parsed.archetype, }; s.implied_pay_rate = impliedPayRate(w); } } async function seedPlaybookFromContract(c: any) { const names = (c.matches || []).slice(0, 5) .map((m: any) => m.name || m.doc_id) .filter((n: string) => n && !n.startsWith("W500-")); if (!names.length) return; const op = `fill: ${c.role} x${c.headcount} in ${c.city}, ${c.state}`; try { await api("POST", "/vectors/playbook_memory/seed", { operation: op, approach: `${c.situation || c.priority || "fill"} → hybrid search`, context: `client=${c.client || ""} start=${c.start || ""}`, endorsed_names: names, append: true, }); } catch {} } async function runWeekSimulation() { const days = ["Monday","Tuesday","Wednesday","Thursday","Friday"]; const staffers = ["Sarah (Lead)","Mike (Senior)","Kim (Junior)"]; const results: any[] = []; let totalFilled = 0, totalNeeded = 0, emergencies = 0, handoffs = 0, playbookEntries = 0; for (let d = 0; d < days.length; d++) { const dayLabel = days[d]; const numContracts = 4 + Math.floor(Math.random() * 5); // 4-8 per day const contracts: any[] = []; 
const staffer = staffers[d % staffers.length]; const handoffTo = staffers[(d + 1) % staffers.length]; for (let c = 0; c < numContracts; c++) { const state = pick(STATES); const city = pick(CITIES[state] || [state]); const role = pick(ROLES); // Weighted scenario selection const totalWeight = SCENARIOS.reduce((s, sc) => s + sc.weight, 0); let r = Math.random() * totalWeight; let scenario = SCENARIOS[0]; for (const sc of SCENARIOS) { r -= sc.weight; if (r <= 0) { scenario = sc; break; } } const priority = scenario.priority; const headcount = priority === "urgent" ? 3 + Math.floor(Math.random() * 4) : priority === "high" ? 2 + Math.floor(Math.random() * 3) : priority === "medium" ? 2 + Math.floor(Math.random() * 3) : 1 + Math.floor(Math.random() * 2); const minRel = priority === "urgent" ? 0.6 : priority === "high" ? 0.75 : 0.8; const cid = `W${d+1}-${String(c+1).padStart(3,"0")}`; if (priority === "urgent") emergencies++; totalNeeded += headcount; // Run hybrid search — Phase 19: boost on so past playbooks shape ranking let filled = 0; let matches: any[] = []; try { const filt = `role = '${role}' AND state = '${state}' AND reliability >= ${minRel}`; const r = await api("POST", "/vectors/hybrid", { question: `Find ${role} workers in ${city}, ${state} for ${scenario.situation}`, index_name: "workers_500k_v1", sql_filter: filt, filter_dataset: "ethereal_workers", id_column: "worker_id", top_k: headcount + 2, generate: false, use_playbook_memory: true, }); matches = (r.sources || []).slice(0, headcount).map((s: any) => ({ doc_id: s.doc_id, name: s.chunk_text?.split("—")[0]?.trim() || s.doc_id, score: s.score, chunk_text: s.chunk_text || "", playbook_boost: s.playbook_boost || 0, playbook_citations: s.playbook_citations || [], })); filled = matches.length; } catch {} totalFilled += Math.min(filled, headcount); contracts.push({ id: cid, client: makeClient(), role, state, city, headcount, filled: Math.min(filled, headcount), priority, start: pick(STARTS), notes: 
scenario.note, situation: scenario.situation, action: scenario.action, matches, staffer, handoff_to: d < 4 ? handoffTo : null, }); } // End of day: seed playbook_memory with TODAY's filled contracts so // tomorrow's hybrid search ranks against them. This is the in-week // feedback loop — without this, day 5 doesn't benefit from day 1. for (const c of contracts) { if (c.matches && c.matches.length) { await seedPlaybookFromContract(c).catch(() => {}); } } if (d < 4) { handoffs++; try { await api("POST", "/api/ingest/file?name=successful_playbooks", null); // just trigger } catch {} } playbookEntries++; results.push({ label: dayLabel, staffer, handoff_to: d < 4 ? handoffTo : null, contracts, filled: contracts.reduce((s: number, c: any) => s + c.filled, 0), needed: contracts.reduce((s: number, c: any) => s + c.headcount, 0), }); } const summary = { total_contracts: results.reduce((s, d) => s + d.contracts.length, 0), total_needed: totalNeeded, total_filled: totalFilled, fill_pct: Math.round(totalFilled / Math.max(totalNeeded, 1) * 100), emergencies, handoffs, playbook_entries: playbookEntries, }; // BUG FIX 2026-04-20: previously this POSTed a multi-row CSV to // /ingest/file?name=successful_playbooks at end of every simulation. // That endpoint REPLACES the dataset's object list — so each // /simulation/run wiped the prior simulation's rows. The SQL // successful_playbooks table was never accumulating; it always reflected // only the most-recent simulation batch. // // Per-day per-contract seeding via /vectors/playbook_memory/seed // (added Pass 1, runs inside the day loop above) is the path that // actually accumulates feedback. The SQL successful_playbooks table is // intentionally not written by /simulation/run anymore until a proper // append surface exists. return { days: results, summary }; } // Kick off the push/alerts daemon once per process. Placed at the END of // the module so all const/let declarations in the alerts block (paths, // helpers, etc.) 
have evaluated before the daemon reads them. Calling // from earlier in the file would hit a temporal dead zone on these // bindings. startAlertsDaemon().catch(e => console.error(`[alerts] startup error: ${e.message}`));