Previous page was numeric claims without explanations — 'sub-100ms SQL',
'500K vectors in 341ms' etc. Accurate but undefendable without math,
code paths, and ADR references. Expanded to 8 chapters:
Ch1 — Live receipts (unchanged: real gateway tests, pass/fail, timing)
Ch2 — Architecture. 13-crate diagram with per-crate responsibility
table and file paths. gateway → catalogd/queryd/vectord/ingestd
+ aibridge → object_store. References ADRs 1-20.
Ch3 — Dual-agent recursive consensus loop (NEW)
- Role specialization (executor=optimist, reviewer=pessimist)
- Parallel orchestration via Promise.all
- Recursive: sealed playbooks feed playbook_memory → next query
- Termination math: sealed | tool-error abort | drift abort |
turn-cap abort — every path dumps forensic log
- File refs: tests/multi-agent/agent.ts, orchestrator.ts,
scenario.ts, run_e2e_rated.ts
Ch4 — Playbook memory feedback loop (NEW)
- PlaybookEntry shape with embedding
- Full boost math: similarity * base_weight * decay * penalty
/ n_workers, capped at MAX_BOOST_PER_WORKER
- Temporal decay (e^-age/30, 30d half-life)
- Negative signal (0.5^failures)
- Why k=200: narrow cosine discrimination in nomic-embed-text
- Evidence: compounding test 0 → 0.250 cap in 3 seeds
- persist_sql write-through
- Pattern discovery (Path 2 meta-index)
- File: crates/vectord/src/playbook_memory.rs
Ch5 — ADR citations for each key choice
ADR-001, 008, 012, 015, 019, 020 + Phase 19 design note
Ch6 — Live scale data (unchanged: pulled from /proof.json)
Ch7 — Reproduction recipes: curl for health, sql, hybrid with boost,
patterns, pm stats, and the full dual-agent scenario run
Ch8 — Honest limits (unchanged: synthetic workers_500k, 1K candidates
misaligned to call_log, 7B model imperfection, no rate/margin)
Every architectural claim now cites either the code path
(crates/.../src/file.rs::fn_name) or the ADR (docs/DECISIONS.md).
Someone disputing the system has specific targets to attack.
Mechanism unchanged: /proof serves mcp-server/proof.html via
Bun.file. /proof.json still returns the live test data the page
consumes client-side.
1567 lines
84 KiB
TypeScript
/**
 * Lakehouse MCP Server — bridges local LLMs to the data substrate.
 *
 * Tools:
 * - search_workers: hybrid SQL+vector (the core fix)
 * - query_sql: analytical SQL on any dataset
 * - match_contract: find workers for a job order
 * - get_worker: single worker by ID
 * - rag_question: full RAG pipeline
 * - log_success: record what worked → playbook DB
 * - get_playbooks: retrieve past successes
 * - swap_profile: hot-swap model + data context
 * - vram_status: GPU introspection
 */
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
|
|
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
|
|
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
|
|
import { z } from "zod";
|
|
import { startTrace, logSpan, logGeneration, scoreTrace, flush as flushTraces } from "./tracing.js";
|
|
|
|
const BASE = process.env.LAKEHOUSE_URL || "http://localhost:3100";
|
|
const PORT = parseInt(process.env.MCP_PORT || "3700");
|
|
const MODE = process.env.MCP_TRANSPORT || "http"; // "stdio" or "http"
|
|
|
|
// Active trace for the current request — set per-request in the HTTP handler
|
|
let activeTrace: ReturnType<typeof startTrace> | null = null;
|
|
|
|
async function api(method: string, path: string, body?: any) {
|
|
const t0 = Date.now();
|
|
const resp = await fetch(`${BASE}${path}`, {
|
|
method,
|
|
headers: body ? { "Content-Type": "application/json" } : {},
|
|
body: body ? JSON.stringify(body) : undefined,
|
|
});
|
|
const text = await resp.text();
|
|
const ms = Date.now() - t0;
|
|
let parsed: any;
|
|
try { parsed = JSON.parse(text); } catch { parsed = { raw: text, status: resp.status }; }
|
|
|
|
// Trace the call if we have an active trace
|
|
if (activeTrace) {
|
|
const isGen = path.includes("/generate");
|
|
if (isGen) {
|
|
logGeneration(activeTrace, `lakehouse${path}`, {
|
|
model: body?.model || "unknown",
|
|
prompt: typeof body?.prompt === "string" ? body.prompt.slice(0, 500) : JSON.stringify(body).slice(0, 300),
|
|
completion: typeof parsed?.text === "string" ? parsed.text.slice(0, 500) : JSON.stringify(parsed).slice(0, 300),
|
|
duration_ms: ms,
|
|
tokens_in: parsed?.prompt_eval_count,
|
|
tokens_out: parsed?.eval_count,
|
|
});
|
|
} else {
|
|
logSpan(activeTrace, `lakehouse${path}`, body, {
|
|
rows: parsed?.row_count, sources: parsed?.sources?.length,
|
|
sql_matches: parsed?.sql_matches, method: parsed?.method,
|
|
}, ms);
|
|
}
|
|
}
|
|
|
|
return parsed;
|
|
}
|
|
|
|
const server = new McpServer({ name: "lakehouse", version: "1.0.0" });
|
|
|
|
server.tool(
|
|
"search_workers",
|
|
"Hybrid SQL+vector search. SQL ensures structural accuracy (role, state, reliability), vector ranks by semantic relevance. Every result is verified against the golden dataset.",
|
|
{
|
|
question: z.string().describe("Natural language question about workers"),
|
|
sql_filter: z.string().optional().describe("SQL WHERE clause, e.g. \"role = 'Forklift Operator' AND state = 'IL' AND reliability > 0.8\""),
|
|
dataset: z.string().default("ethereal_workers"),
|
|
id_column: z.string().default("worker_id"),
|
|
top_k: z.number().default(5),
|
|
},
|
|
async ({ question, sql_filter, dataset, id_column, top_k }) => {
|
|
const body: any = {
|
|
question, index_name: "workers_500k_v1", filter_dataset: dataset, id_column, top_k, generate: true,
|
|
use_playbook_memory: true,
|
|
};
|
|
if (sql_filter) body.sql_filter = sql_filter;
|
|
const r = await api("POST", "/vectors/hybrid", body);
|
|
return { content: [{ type: "text" as const, text: JSON.stringify(r, null, 2) }] };
|
|
},
|
|
);
|
|
|
|
server.tool(
|
|
"query_sql",
|
|
"Run SQL against any lakehouse dataset. Tables: ethereal_workers (10K), candidates (100K), timesheets (1M), call_log (800K), email_log (500K), placements (50K), job_orders (15K), clients (2K).",
|
|
{ sql: z.string().describe("SQL query") },
|
|
async ({ sql }) => {
|
|
const r = await api("POST", "/query/sql", { sql });
|
|
if (r.error) return { content: [{ type: "text" as const, text: `SQL Error: ${r.error}` }] };
|
|
return { content: [{ type: "text" as const, text: `${r.row_count} rows:\n${JSON.stringify(r.rows?.slice(0, 20), null, 2)}` }] };
|
|
},
|
|
);
|
|
|
|
server.tool(
|
|
"match_contract",
|
|
"Find qualified workers for a staffing contract. SQL-verified matches ranked by semantic fit.",
|
|
{
|
|
role: z.string(), state: z.string(), city: z.string().optional(),
|
|
min_reliability: z.number().default(0.7),
|
|
required_certs: z.array(z.string()).default([]),
|
|
headcount: z.number().default(5),
|
|
},
|
|
async ({ role, state, city, min_reliability, required_certs, headcount }) => {
|
|
let filter = `role = '${role}' AND state = '${state}' AND reliability >= ${min_reliability}`;
|
|
if (city) filter += ` AND city = '${city}'`;
|
|
const r = await api("POST", "/vectors/hybrid", {
|
|
question: `Find the best ${role} workers with relevant skills and certifications`,
|
|
index_name: "workers_500k_v1", sql_filter: filter,
|
|
filter_dataset: "ethereal_workers", id_column: "worker_id",
|
|
top_k: headcount * 2, generate: false,
|
|
use_playbook_memory: true,
|
|
});
|
|
let matches = r.sources || [];
|
|
if (required_certs.length > 0) {
|
|
const req = new Set(required_certs.map((c: string) => c.toLowerCase()));
|
|
matches = matches.filter((m: any) => {
|
|
const certs = (m.chunk_text || "").toLowerCase();
|
|
return [...req].every(c => certs.includes(c));
|
|
});
|
|
}
|
|
return { content: [{ type: "text" as const, text: JSON.stringify({
|
|
contract: { role, state, city, min_reliability, required_certs },
|
|
matches: matches.slice(0, headcount), total_sql: r.sql_matches, method: r.method,
|
|
}, null, 2) }] };
|
|
},
|
|
);
|
|
|
|
server.tool(
|
|
"get_worker",
|
|
"Fetch one worker profile by ID — all fields including scores and comms.",
|
|
{ worker_id: z.number() },
|
|
async ({ worker_id }) => {
|
|
const r = await api("POST", "/query/sql", { sql: `SELECT * FROM ethereal_workers WHERE worker_id = ${worker_id}` });
|
|
if (!r.rows?.length) return { content: [{ type: "text" as const, text: `Worker ${worker_id} not found` }] };
|
|
return { content: [{ type: "text" as const, text: JSON.stringify(r.rows[0], null, 2) }] };
|
|
},
|
|
);
|
|
|
|
server.tool(
|
|
"rag_question",
|
|
"Natural language question answered via RAG (embed → search → retrieve → generate). For open-ended questions where SQL alone isn't enough.",
|
|
{ question: z.string(), index: z.string().default("workers_500k_v1"), top_k: z.number().default(5) },
|
|
async ({ question, index, top_k }) => {
|
|
const r = await api("POST", "/vectors/rag", { index_name: index, question, top_k });
|
|
return { content: [{ type: "text" as const, text: r.error ? `RAG Error: ${r.error}` : `Answer: ${r.answer}\n\nSources: ${r.sources?.length || 0}` }] };
|
|
},
|
|
);
|
|
|
|
server.tool(
|
|
"log_success",
|
|
"Record a successful operation to the playbook database. Small models query this later to learn what worked.",
|
|
{
|
|
operation: z.string().describe("What was done"),
|
|
approach: z.string().describe("How it was done"),
|
|
result: z.string().describe("Outcome"),
|
|
context: z.string().optional(),
|
|
},
|
|
async ({ operation, approach, result, context }) => {
|
|
const csv = `timestamp,operation,approach,result,context\n"${new Date().toISOString()}","${operation.replace(/"/g, '""')}","${approach.replace(/"/g, '""')}","${result.replace(/"/g, '""')}","${(context||"").replace(/"/g, '""')}"`;
|
|
const form = new FormData();
|
|
form.append("file", new Blob([csv], { type: "text/csv" }), "playbook.csv");
|
|
const resp = await fetch(`${BASE}/ingest/file?name=successful_playbooks`, { method: "POST", body: form });
|
|
return { content: [{ type: "text" as const, text: `Logged: ${await resp.text()}` }] };
|
|
},
|
|
);
|
|
|
|
server.tool(
|
|
"get_playbooks",
|
|
"Retrieve past successful operations. Small models use this to learn what approaches worked.",
|
|
{ keyword: z.string().optional(), limit: z.number().default(10) },
|
|
async ({ keyword, limit }) => {
|
|
let sql = `SELECT * FROM successful_playbooks ORDER BY timestamp DESC LIMIT ${limit}`;
|
|
if (keyword) sql = `SELECT * FROM successful_playbooks WHERE operation LIKE '%${keyword}%' OR approach LIKE '%${keyword}%' ORDER BY timestamp DESC LIMIT ${limit}`;
|
|
const r = await api("POST", "/query/sql", { sql });
|
|
if (r.error) return { content: [{ type: "text" as const, text: "No playbooks yet — log some successful operations first!" }] };
|
|
return { content: [{ type: "text" as const, text: JSON.stringify(r.rows, null, 2) }] };
|
|
},
|
|
);
|
|
|
|
server.tool(
|
|
"swap_profile",
|
|
"Hot-swap model profile. Changes Ollama model in VRAM + bound datasets. 'agent-parquet' = HNSW (fast), 'agent-lance' = IVF_PQ (scalable).",
|
|
{ profile_id: z.string() },
|
|
async ({ profile_id }) => {
|
|
const r = await api("POST", `/vectors/profile/${profile_id}/activate`);
|
|
return { content: [{ type: "text" as const, text: JSON.stringify({
|
|
profile: r.profile_id, model: r.ollama_name,
|
|
indexes: r.indexes_warmed?.length, vectors: r.total_vectors,
|
|
previous: r.previous_profile, duration: r.duration_secs,
|
|
}, null, 2) }] };
|
|
},
|
|
);
|
|
|
|
server.tool(
|
|
"vram_status",
|
|
"GPU VRAM usage + loaded Ollama models. Check before swapping profiles.",
|
|
{},
|
|
async () => {
|
|
const r = await api("GET", "/ai/vram");
|
|
return { content: [{ type: "text" as const, text: JSON.stringify(r, null, 2) }] };
|
|
},
|
|
);
|
|
|
|
// Resources — these give any MCP client full context about the system
|
|
|
|
server.resource("lakehouse://system", "lakehouse://system", async (uri) => {
|
|
const health = await api("GET", "/health");
|
|
const datasets = await api("GET", "/catalog/datasets") as any[];
|
|
const indexes = await api("GET", "/vectors/indexes") as any[];
|
|
const vram = await api("GET", "/ai/vram");
|
|
const agent = await api("GET", "/vectors/agent/status");
|
|
const buckets = await api("GET", "/storage/buckets");
|
|
|
|
const text = `# Lakehouse System Status
|
|
|
|
## Health: ${health === "lakehouse ok" ? "OK" : JSON.stringify(health)}
|
|
|
|
## Datasets (${datasets.length})
|
|
${datasets.map((d: any) => `- ${d.name}: ${d.row_count || "?"} rows`).join("\n")}
|
|
|
|
## Vector Indexes (${indexes.length})
|
|
${(indexes as any[]).map((i: any) => `- ${i.index_name}: ${i.chunk_count} chunks (${i.vector_backend || "parquet"})`).join("\n")}
|
|
|
|
## GPU
|
|
- Used: ${vram?.gpu?.used_mib || "?"}/${vram?.gpu?.total_mib || "?"} MiB
|
|
- Models loaded: ${(vram?.ollama_loaded || []).map((m: any) => m.name).join(", ") || "none"}
|
|
|
|
## Autotune Agent
|
|
- Running: ${agent?.running}, Trials: ${agent?.trials_run}, Promotions: ${agent?.promotions}
|
|
|
|
## Buckets (${(buckets as any[])?.length || 0})
|
|
${(buckets as any[] || []).map((b: any) => `- ${b.name}: ${b.backend} (${b.reachable ? "reachable" : "DOWN"})`).join("\n")}
|
|
|
|
## Services
|
|
- Lakehouse Gateway: :3100
|
|
- AI Sidecar: :3200
|
|
- Agent Gateway: :3700
|
|
- Langfuse: :3001
|
|
- MinIO S3: :9000
|
|
- Ollama: :11434
|
|
|
|
## Available Models
|
|
- qwen3: 8.2B, 40K context, thinking+tools (best for reasoning)
|
|
- qwen2.5: 7B, 8K context (best for fast SQL generation)
|
|
- mistral: 7B, 8K context (general generation)
|
|
- nomic-embed-text: 137M (embedding, automatic)
|
|
`;
|
|
return { contents: [{ uri: uri.href, mimeType: "text/plain", text }] };
|
|
});
|
|
|
|
server.resource("lakehouse://architecture", "lakehouse://architecture", async (uri) => {
|
|
// Read the PRD directly
|
|
const prd = await Bun.file("/home/profit/lakehouse/docs/PRD.md").text().catch(() => "PRD not found");
|
|
return { contents: [{ uri: uri.href, mimeType: "text/markdown", text: prd }] };
|
|
});
|
|
|
|
server.resource("lakehouse://instructions", "lakehouse://instructions", async (uri) => {
|
|
const instructions = await Bun.file("/home/profit/lakehouse/mcp-server/AGENT_INSTRUCTIONS.md").text().catch(() => "Instructions not found");
|
|
return { contents: [{ uri: uri.href, mimeType: "text/markdown", text: instructions }] };
|
|
});
|
|
|
|
server.resource("lakehouse://playbooks", "lakehouse://playbooks", async (uri) => {
|
|
const r = await api("POST", "/query/sql", {
|
|
sql: "SELECT * FROM successful_playbooks ORDER BY timestamp DESC LIMIT 20"
|
|
});
|
|
const rows = r?.rows || [];
|
|
const text = rows.length === 0
|
|
? "No playbooks yet. Log successful operations with the log_success tool."
|
|
: rows.map((p: any) => `## ${p.operation}\n- Approach: ${p.approach}\n- Result: ${p.result}\n- Context: ${p.context || "—"}\n`).join("\n");
|
|
return { contents: [{ uri: uri.href, mimeType: "text/markdown", text: `# Successful Playbooks\n\n${text}` }] };
|
|
});
|
|
|
|
server.resource("lakehouse://datasets", "lakehouse://datasets", async (uri) => {
|
|
const r = await api("GET", "/catalog/datasets") as any[];
|
|
const text = r.map(d => `${d.name}: ${d.row_count || "?"} rows`).join("\n");
|
|
return { contents: [{ uri: uri.href, mimeType: "text/plain", text }] };
|
|
});
|
|
|
|
// ─── Dual mode: stdio (Claude Code) or HTTP (internal agents) ───
|
|
|
|
async function main() {
|
|
if (MODE === "stdio") {
|
|
const transport = new StdioServerTransport();
|
|
await server.connect(transport);
|
|
console.error(`Lakehouse MCP (stdio) → ${BASE}`);
|
|
return;
|
|
}
|
|
|
|
// HTTP mode — a REST gateway that internal agents call directly.
|
|
// No MCP protocol complexity for consumers — just POST JSON, get JSON.
|
|
// The MCP tool definitions above are reused for the stdio path; this
|
|
// HTTP path wraps the same lakehouse API with agent-friendly routing.
|
|
|
|
Bun.serve({
|
|
port: PORT,
|
|
async fetch(req) {
|
|
const url = new URL(req.url);
|
|
const json = async () => req.method === "POST" ? await req.json() : {};
|
|
|
|
// CORS — dashboard runs in the browser, gateway is a different origin
|
|
const cors = {
|
|
"Access-Control-Allow-Origin": "*",
|
|
"Access-Control-Allow-Methods": "GET, POST, OPTIONS",
|
|
"Access-Control-Allow-Headers": "Content-Type",
|
|
};
|
|
if (req.method === "OPTIONS") return new Response(null, { status: 204, headers: cors });
|
|
|
|
const ok = (data: any) => Response.json(data, { headers: cors });
|
|
const err = (msg: string, status = 400) => Response.json({ error: msg }, { status, headers: cors });
|
|
|
|
try {
|
|
// Health — no trace needed
|
|
if (url.pathname === "/health") return ok({ status: "ok", lakehouse: BASE, tools: 11 });
|
|
|
|
// Start a Langfuse trace for every non-static request
|
|
if (req.method === "POST" || !["/","/dashboard","/dashboard.css","/dashboard.ts","/dashboard.js"].includes(url.pathname)) {
|
|
activeTrace = startTrace(`gw:${url.pathname}`, { method: req.method, path: url.pathname });
|
|
}
|
|
|
|
// Self-orientation: any agent calls this first to understand the system
|
|
if (url.pathname === "/context") {
|
|
const instructions = await Bun.file("/home/profit/lakehouse/mcp-server/AGENT_INSTRUCTIONS.md").text().catch(() => "");
|
|
const datasets = await api("GET", "/catalog/datasets") as any[];
|
|
const indexes = await api("GET", "/vectors/indexes") as any[];
|
|
const vram = await api("GET", "/ai/vram");
|
|
return ok({
|
|
system: "Lakehouse Staffing Co-Pilot",
|
|
purpose: "AI anticipates staffing coordinator needs — pre-matches workers to contracts, surfaces alerts, builds playbooks from successful operations",
|
|
instructions: instructions.slice(0, 3000),
|
|
datasets: (datasets || []).map((d: any) => ({ name: d.name, rows: d.row_count })),
|
|
indexes: (indexes || []).map((i: any) => ({ name: i.index_name, chunks: i.chunk_count, backend: i.vector_backend })),
|
|
models: { qwen3: "8.2B reasoning+tools", qwen2_5: "7B fast SQL", mistral: "7B generation", nomic: "137M embedding" },
|
|
vram: vram?.gpu,
|
|
tools: ["/search","/sql","/match","/worker/:id","/ask","/log","/playbooks","/profile/:id","/vram","/context","/verify"],
|
|
rules: [
|
|
"Never hallucinate — only state facts from tool responses",
|
|
"SQL for counts/aggregations, hybrid /search for matching",
|
|
"Log every successful operation to /log",
|
|
"Check /playbooks before complex tasks",
|
|
"Verify worker details via /worker/:id before communicating",
|
|
],
|
|
});
|
|
}
|
|
|
|
// Verification endpoint — agent can check any claim against SQL
|
|
if (url.pathname === "/verify") {
|
|
const b = await json();
|
|
// b.claim: "worker 4925 is a Forklift Operator in IL with reliability 0.82"
|
|
// b.worker_id: 4925
|
|
// b.checks: { role: "Forklift Operator", state: "IL", reliability: 0.82 }
|
|
if (!b.worker_id) return err("worker_id required");
|
|
const r = await api("POST", "/query/sql", {
|
|
sql: `SELECT * FROM ethereal_workers WHERE worker_id = ${b.worker_id}`
|
|
});
|
|
const worker = r?.rows?.[0];
|
|
if (!worker) return ok({ verified: false, reason: `worker ${b.worker_id} not found` });
|
|
|
|
const checks = b.checks || {};
|
|
const failures: string[] = [];
|
|
for (const [field, expected] of Object.entries(checks)) {
|
|
const actual = worker[field];
|
|
if (actual === undefined) continue;
|
|
if (typeof expected === "number") {
|
|
if (Math.abs(Number(actual) - expected) > 0.05) {
|
|
failures.push(`${field}: claimed=${expected} actual=${actual}`);
|
|
}
|
|
} else if (String(actual).toLowerCase() !== String(expected).toLowerCase()) {
|
|
failures.push(`${field}: claimed=${expected} actual=${actual}`);
|
|
}
|
|
}
|
|
return ok({
|
|
verified: failures.length === 0,
|
|
worker_id: b.worker_id,
|
|
worker_name: worker.name,
|
|
failures,
|
|
actual: worker,
|
|
});
|
|
}
|
|
|
|
// Tool: hybrid search
|
|
// ─── Client blacklists (feature #2) ───────────────────────────
|
|
// Per-client worker exclusion list. A worker blacklisted for
|
|
// client X is hidden from /search and /match when the caller
|
|
// passes `client: "X"`. Persisted to local JSON so it survives
|
|
// Bun restarts. This is a trust-critical feature — if the
|
|
// system recommends a worker the client already flagged, the
|
|
// system's credibility is gone.
|
|
if (url.pathname.startsWith("/clients/") && url.pathname.includes("/blacklist")) {
|
|
const m = url.pathname.match(/^\/clients\/([^\/]+)\/blacklist\/?(.*)$/);
|
|
if (m) {
|
|
const client = decodeURIComponent(m[1]);
|
|
const suffix = m[2]; // empty, or a worker_id to delete
|
|
|
|
if (req.method === "GET") {
|
|
const list = await loadClientBlacklist(client);
|
|
return ok({ client, entries: list });
|
|
}
|
|
if (req.method === "POST" && !suffix) {
|
|
const b = await json();
|
|
if (!b.worker_id) return err("worker_id required", 400);
|
|
const entry = {
|
|
worker_id: String(b.worker_id),
|
|
name: b.name || "",
|
|
reason: b.reason || "",
|
|
added_at: new Date().toISOString(),
|
|
};
|
|
const list = await addToClientBlacklist(client, entry);
|
|
return ok({ client, added: entry, total: list.length });
|
|
}
|
|
if (req.method === "DELETE" && suffix) {
|
|
const worker_id = decodeURIComponent(suffix);
|
|
const { removed, total } = await removeFromClientBlacklist(client, worker_id);
|
|
return ok({ client, removed, total });
|
|
}
|
|
return err(`unsupported method ${req.method} for blacklist`, 405);
|
|
}
|
|
}
|
|
|
|
if (url.pathname === "/search") {
|
|
const b = await json();
|
|
// Availability soft-filter: if the caller didn't constrain
|
|
// availability and isn't explicitly opting out, auto-append
|
|
// `availability > 0.5`. Recruiters calling this route expect
|
|
// "available workers" by default; surfacing someone who's on
|
|
// an active placement breaks trust on the first call.
|
|
let filter = b.sql_filter as (string | undefined);
|
|
const optOut = b.include_unavailable === true;
|
|
if (!optOut && filter && !/availability/i.test(filter)) {
|
|
filter = `(${filter}) AND CAST(availability AS DOUBLE) > 0.5`;
|
|
}
|
|
// Client blacklist filter: if caller passes `client`, exclude
|
|
// worker_ids that client has flagged. One SQL expression
|
|
// added, no extra round-trip needed by the caller.
|
|
if (b.client && filter) {
|
|
const bl = await loadClientBlacklist(String(b.client));
|
|
const ids = bl.map(e => e.worker_id).filter(x => /^\d+$/.test(x));
|
|
if (ids.length > 0) {
|
|
filter = `(${filter}) AND worker_id NOT IN (${ids.join(",")})`;
|
|
}
|
|
}
|
|
return ok(await api("POST", "/vectors/hybrid", {
|
|
question: b.question, index_name: b.index || "workers_500k_v1",
|
|
sql_filter: filter, filter_dataset: b.dataset || "ethereal_workers",
|
|
id_column: b.id_column || "worker_id", top_k: b.top_k || 5, generate: b.generate !== false,
|
|
use_playbook_memory: b.use_playbook_memory !== false,
|
|
playbook_memory_k: b.playbook_memory_k ?? 200,
|
|
}));
|
|
}
|
|
|
|
// Tool: SQL
|
|
if (url.pathname === "/sql") {
|
|
const b = await json();
|
|
return ok(await api("POST", "/query/sql", { sql: b.sql }));
|
|
}
|
|
|
|
// Tool: match contract
|
|
if (url.pathname === "/match") {
|
|
const b = await json();
|
|
let filter = `role = '${b.role}' AND state = '${b.state}' AND reliability >= ${b.min_reliability || 0.7}`;
|
|
if (b.city) filter += ` AND city = '${b.city}'`;
|
|
return ok(await api("POST", "/vectors/hybrid", {
|
|
question: `Best ${b.role} workers with relevant skills`,
|
|
index_name: b.index || "workers_500k_v1", sql_filter: filter,
|
|
filter_dataset: b.dataset || "ethereal_workers",
|
|
id_column: "worker_id", top_k: (b.headcount || 5) * 2, generate: false,
|
|
use_playbook_memory: true,
|
|
playbook_memory_k: 200,
|
|
}));
|
|
}
|
|
|
|
// Tool: get worker
|
|
if (url.pathname.startsWith("/worker/")) {
|
|
const id = url.pathname.split("/")[2];
|
|
return ok(await api("POST", "/query/sql", { sql: `SELECT * FROM ethereal_workers WHERE worker_id = ${id}` }));
|
|
}
|
|
|
|
// Tool: RAG
|
|
if (url.pathname === "/ask") {
|
|
const b = await json();
|
|
return ok(await api("POST", "/vectors/rag", { index_name: b.index || "workers_500k_v1", question: b.question, top_k: b.top_k || 5 }));
|
|
}
|
|
|
|
// Tool: log success.
|
|
//
|
|
// BUG FIX 2026-04-20: previously this also POSTed a 1-row CSV to
|
|
// /ingest/file?name=successful_playbooks. That endpoint REPLACES
|
|
// the dataset's object list rather than appending — so every /log
|
|
// call destroyed all prior rows in the SQL-queryable
|
|
// successful_playbooks table. Chain-of-custody trace caught it:
|
|
// sp_rows went 33 → 1 in a single /log call.
|
|
//
|
|
// Until a proper append endpoint exists (Phase 8 delta write
|
|
// surface for the SQL table), /log writes ONLY to playbook_memory
|
|
// (in-memory append-only store, works correctly for boost). The
|
|
// SQL successful_playbooks table is now treated as derived state
|
|
// that gets rebuilt explicitly via /vectors/playbook_memory/rebuild
|
|
// — never written to by the recruiter path.
|
|
if (url.pathname === "/log") {
|
|
const b = await json();
|
|
// Result format expected: "{filled}/{needed} filled → Name1, Name2, Name3"
|
|
const result = String(b.result || "");
|
|
const arrowIdx = result.indexOf("→");
|
|
const namesPart = arrowIdx >= 0 ? result.slice(arrowIdx + 1) : "";
|
|
const rawEndorsed = namesPart.split(",").map(s => s.trim()).filter(Boolean);
|
|
|
|
// Parse the contract's (city, state) from operation. Seed is
|
|
// keyed by (city, state, name) so validation must match those
|
|
// coordinates, not just the name.
|
|
const opMatch = String(b.operation || "").match(/ in ([^,]+),\s*([A-Za-z]+)/);
|
|
const city = opMatch ? opMatch[1].trim() : "";
|
|
const state = opMatch ? opMatch[2].trim() : "";
|
|
|
|
// Ghost-name guard — /log previously accepted any endorsed
|
|
// names without verification. Those ghosts landed in
|
|
// playbook_memory, grew the entry count, but boost silently
|
|
// never fired because no real worker chunk ever matched the
|
|
// stored (city, state, name) tuple. Real-test on 2026-04-20
|
|
// surfaced this. Validate against workers_500k before seeding.
|
|
let endorsed: string[] = rawEndorsed;
|
|
let rejected: string[] = [];
|
|
if (rawEndorsed.length && city && state) {
|
|
const quoted = rawEndorsed.map(n => `'${n.replace(/'/g, "''")}'`).join(",");
|
|
const sql = `SELECT DISTINCT name FROM workers_500k `
|
|
+ `WHERE name IN (${quoted}) AND city = '${city.replace(/'/g,"''")}' `
|
|
+ `AND state = '${state.replace(/'/g,"''")}'`;
|
|
const vr = await api("POST", "/query/sql", { sql }).catch(() => ({ rows: [] as any[] })) as any;
|
|
const found = new Set((vr.rows ?? []).map((r: any) => r.name));
|
|
endorsed = rawEndorsed.filter(n => found.has(n));
|
|
rejected = rawEndorsed.filter(n => !found.has(n));
|
|
}
|
|
|
|
let seeded = 0;
|
|
let persisted_rows = 0;
|
|
if (endorsed.length && /fill:.+ in .+,.+/i.test(String(b.operation || ""))) {
|
|
const canonicalApproach = `${(b.approach || "manual log").split(/[\.\n]/)[0]}`.slice(0, 80);
|
|
const canonicalContext = `${(b.context || "").split(/[\.\n]/)[0]}`.slice(0, 80);
|
|
const seedRes = await api("POST", "/vectors/playbook_memory/seed", {
|
|
operation: b.operation,
|
|
approach: canonicalApproach,
|
|
context: canonicalContext,
|
|
endorsed_names: endorsed,
|
|
append: true,
|
|
}).catch(() => null) as any;
|
|
if (seedRes && seedRes.playbook_id) {
|
|
seeded = endorsed.length;
|
|
const pr = await api("POST", "/vectors/playbook_memory/persist_sql", {}).catch(() => null) as any;
|
|
if (pr && typeof pr.rows_persisted === "number") persisted_rows = pr.rows_persisted;
|
|
}
|
|
}
|
|
return ok({
|
|
logged: true,
|
|
seeded,
|
|
persisted_to_sql: persisted_rows,
|
|
rejected_ghost_names: rejected,
|
|
note: rejected.length
|
|
? `${rejected.length} endorsed name(s) not found in workers_500k for ${city}, ${state} — skipped seeding to prevent silent boost failure.`
|
|
: "successful_playbooks_live is the SQL surface for live operator activity. /log is non-destructive and name-validated.",
|
|
});
|
|
}
|
|
|
|
// Tool: log FAILED fill — negative signal for Phase 19 boost.
|
|
// Workers named here get a 0.5^n penalty on future positive
|
|
// boosts in the same (city, state). Three failures effectively
|
|
// zero the boost; five make the worker invisible to the re-rank.
|
|
// Names are validated against workers_500k same as /log.
|
|
if (url.pathname === "/log_failure") {
|
|
const b = await json();
|
|
const opMatch = String(b.operation || "").match(/ in ([^,]+),\s*([A-Za-z]+)/);
|
|
const city = opMatch ? opMatch[1].trim() : "";
|
|
const state = opMatch ? opMatch[2].trim() : "";
|
|
const rawNames: string[] = Array.isArray(b.failed_names) ? b.failed_names : [];
|
|
if (!city || !state) {
|
|
return err("operation must be 'fill: Role xN in City, ST'", 400);
|
|
}
|
|
if (rawNames.length === 0) return err("failed_names must be a non-empty array", 400);
|
|
|
|
const quoted = rawNames.map((n: string) => `'${n.replace(/'/g, "''")}'`).join(",");
|
|
const sql = `SELECT DISTINCT name FROM workers_500k `
|
|
+ `WHERE name IN (${quoted}) AND city = '${city.replace(/'/g,"''")}' `
|
|
+ `AND state = '${state.replace(/'/g,"''")}'`;
|
|
const vr = await api("POST", "/query/sql", { sql }).catch(() => ({ rows: [] as any[] })) as any;
|
|
const found = new Set((vr.rows ?? []).map((r: any) => r.name));
|
|
const failed_names = rawNames.filter((n: string) => found.has(n));
|
|
const rejected = rawNames.filter((n: string) => !found.has(n));
|
|
|
|
if (failed_names.length === 0) {
|
|
return ok({ marked: 0, rejected_ghost_names: rejected,
|
|
note: "no failed_names matched workers_500k for this geo" });
|
|
}
|
|
const mr = await api("POST", "/vectors/playbook_memory/mark_failed", {
|
|
operation: b.operation,
|
|
failed_names,
|
|
reason: b.reason || "",
|
|
});
|
|
return ok({
|
|
marked: mr?.added ?? 0,
|
|
rejected_ghost_names: rejected,
|
|
city, state,
|
|
note: `Each marked worker's positive boost in ${city}, ${state} is halved per recorded failure.`,
|
|
});
|
|
}
|
|
|
|
// Tool: get playbooks
|
|
if (url.pathname === "/playbooks") {
|
|
const kw = url.searchParams.get("keyword");
|
|
const limit = url.searchParams.get("limit") || "10";
|
|
let sql = `SELECT * FROM successful_playbooks ORDER BY timestamp DESC LIMIT ${limit}`;
|
|
if (kw) sql = `SELECT * FROM successful_playbooks WHERE operation LIKE '%${kw}%' OR approach LIKE '%${kw}%' ORDER BY timestamp DESC LIMIT ${limit}`;
|
|
const r = await api("POST", "/query/sql", { sql });
|
|
return ok(r.error ? { playbooks: [], note: "No playbooks yet" } : { playbooks: r.rows });
|
|
}
|
|
|
|
// Tool: swap profile
|
|
if (url.pathname.startsWith("/profile/")) {
|
|
const id = url.pathname.split("/")[2];
|
|
return ok(await api("POST", `/vectors/profile/${id}/activate`));
|
|
}
|
|
|
|
// Tool: VRAM
|
|
if (url.pathname === "/vram") return ok(await api("GET", "/ai/vram"));
|
|
|
|
// Pass-through to lakehouse for anything else
|
|
if (url.pathname.startsWith("/api/")) {
|
|
const path = url.pathname.replace("/api", "");
|
|
const body = req.method !== "GET" ? await req.text() : undefined;
|
|
const r = await fetch(`${BASE}${path}`, { method: req.method, headers: { "Content-Type": "application/json" }, body });
|
|
return new Response(await r.text(), { status: r.status, headers: { "Content-Type": "application/json" } });
|
|
}
|
|
|
|
// Proof — narrative HTML served from mcp-server/proof.html.
|
|
// Live tests consumed client-side via /proof.json.
|
|
if (url.pathname === "/proof") {
|
|
return new Response(Bun.file(import.meta.dir + "/proof.html"), {
|
|
headers: { ...cors, "Content-Type": "text/html" },
|
|
});
|
|
}
|
|
|
|
|
|
// Proof JSON API (same data, no HTML)
|
|
if (url.pathname === "/proof.json") {
|
|
const ds = await api("GET", "/catalog/datasets") as any[];
|
|
const indexes = await api("GET", "/vectors/indexes") as any[];
|
|
const vram = await api("GET", "/ai/vram");
|
|
const totalRows = (ds || []).reduce((s: number, d: any) => s + (d.row_count || 0), 0);
|
|
const totalChunks = (indexes || []).reduce((s: number, i: any) => s + i.chunk_count, 0);
|
|
|
|
// Run live SQL tests
|
|
const tests: any[] = [];
|
|
const sqls = [
|
|
["COUNT 500K workers", "SELECT COUNT(*) FROM workers_500k"],
|
|
["COUNT 1M timesheets", "SELECT COUNT(*) FROM timesheets"],
|
|
["Filter+aggregate 500K", "SELECT role, COUNT(*) cnt FROM workers_500k WHERE state='IL' AND CAST(reliability AS DOUBLE)>0.8 GROUP BY role ORDER BY cnt DESC LIMIT 3"],
|
|
["Cross-table JOIN", "SELECT COUNT(*) FROM candidates c JOIN (SELECT candidate_id, COUNT(*) calls FROM call_log GROUP BY candidate_id HAVING COUNT(*)>=5) cl ON c.candidate_id=cl.candidate_id WHERE c.city='Chicago'"],
|
|
];
|
|
for (const [name, sql] of sqls) {
|
|
const t0 = Date.now();
|
|
const r = await api("POST", "/query/sql", { sql });
|
|
const ms = Date.now() - t0;
|
|
tests.push({ name, ms, result: r.rows?.[0] || r.error, pass: !r.error });
|
|
}
|
|
|
|
// Hybrid test
|
|
const ht0 = Date.now();
|
|
const hybrid = await api("POST", "/vectors/hybrid", {
|
|
question: "reliable forklift operator", index_name: "workers_500k_v1",
|
|
sql_filter: "role = 'Forklift Operator' AND state = 'IL' AND CAST(reliability AS DOUBLE) > 0.8",
|
|
filter_dataset: "workers_500k", id_column: "worker_id", top_k: 5, generate: false,
|
|
use_playbook_memory: true,
|
|
});
|
|
tests.push({
|
|
name: "Hybrid SQL+Vector", ms: Date.now() - ht0,
|
|
result: `sql=${hybrid.sql_matches} → ${hybrid.vector_reranked} verified results`,
|
|
pass: (hybrid.vector_reranked || 0) > 0,
|
|
sources: hybrid.sources?.slice(0, 3),
|
|
});
|
|
|
|
return ok({
|
|
title: "Lakehouse Proof of Work",
|
|
generated: new Date().toISOString(),
|
|
server: "192.168.1.177 (i9 + 128GB RAM + A4000 16GB)",
|
|
scale: { datasets: ds?.length, total_rows: totalRows, indexes: indexes?.length, total_chunks: totalChunks },
|
|
gpu: vram?.gpu,
|
|
tests,
|
|
recall: { hnsw: 0.98, lance: 0.94, note: "Measured on 50K real nomic-embed-text embeddings, 30 queries" },
|
|
lance_10m: { vectors: 10_000_000, disk_gb: 32.9, search_p50_ms: 5, note: "Past HNSW RAM ceiling" },
|
|
verify: "SSH into server, run: curl http://localhost:3100/health — or open http://192.168.1.177:3700/proof",
|
|
});
|
|
}
|
|
|
|
// Dashboard — calls lakehouse /vectors/hybrid directly (no gateway hop)
|
|
if (url.pathname === "/" || url.pathname === "/dashboard") {
|
|
return new Response(Bun.file(import.meta.dir + "/search.html"), {
|
|
headers: { ...cors, "Content-Type": "text/html" },
|
|
});
|
|
}
|
|
if (url.pathname === "/dashboard.css") {
|
|
return new Response(Bun.file(import.meta.dir + "/dashboard.css"), { headers: { "Content-Type": "text/css" } });
|
|
}
|
|
if (url.pathname === "/dashboard.ts" || url.pathname === "/dashboard.js") {
|
|
// Bun transpiles TS on the fly
|
|
const built = await Bun.build({ entrypoints: [import.meta.dir + "/dashboard.ts"], target: "browser" });
|
|
const js = await built.outputs[0].text();
|
|
return new Response(js, { headers: { "Content-Type": "application/javascript" } });
|
|
}
|
|
|
|
// Week simulation endpoint
|
|
if (url.pathname === "/simulation/run" && req.method === "POST") {
|
|
return ok(await runWeekSimulation());
|
|
}
|
|
|
|
// ─── Staffing Intelligence Console ───
|
|
if (url.pathname === "/console") {
|
|
return new Response(Bun.file(import.meta.dir + "/console.html"));
|
|
}
|
|
|
|
// Intelligence: Market data — public building permits → staffing demand forecast.
// Pulls live Chicago permit data from the public Socrata API, maps permit
// work types to staffing roles, sizes demand with a $150K-per-worker
// heuristic, and cross-references against our IL bench in workers_500k to
// compute coverage gaps.
if (url.pathname === "/intelligence/market" && req.method === "POST") {
  const start = Date.now();
  try {
    // Fetch Chicago building permits (public Socrata API — real data)
    const permitUrl = "https://data.cityofchicago.org/resource/ydr8-5enu.json";
    // Four independent reads run in parallel: three Socrata queries plus
    // one lakehouse SQL query for our own bench.
    const [bigR, byTypeR, recentR, benchR] = await Promise.all([
      // Largest permits by cost (>$1M since 2025-06-01, up to 50 rows)
      fetch(`${permitUrl}?$select=permit_type,work_type,work_description,reported_cost,street_number,street_direction,street_name,community_area,issue_date,latitude,longitude&$where=reported_cost>1000000 AND issue_date>'2025-06-01'&$order=reported_cost DESC&$limit=50`).then(r => r.json()),
      // Permits grouped by work type (count + total cost, top 10 by cost)
      fetch(`${permitUrl}?$select=work_type,count(*) as cnt,sum(reported_cost) as total_cost&$where=reported_cost>10000 AND issue_date>'2025-06-01'&$group=work_type&$order=total_cost DESC&$limit=10`).then(r => r.json()),
      // Most recent permits over $50K (5 rows)
      fetch(`${permitUrl}?$select=work_type,work_description,reported_cost,street_name,issue_date&$where=reported_cost>50000&$order=issue_date DESC&$limit=5`).then(r => r.json()),
      // Our worker bench in IL (cross-reference)
      api("POST", "/query/sql", { sql: "SELECT role, COUNT(*) supply, SUM(CASE WHEN CAST(reliability AS DOUBLE)>0.8 THEN 1 ELSE 0 END) reliable, SUM(CASE WHEN CAST(availability AS DOUBLE)>0.5 THEN 1 ELSE 0 END) available FROM workers_500k WHERE state='IL' GROUP BY role ORDER BY supply DESC" }),
    ]);

    // Map construction types to staffing roles. The "" key is the fallback
    // for unmapped or blank work types (general construction).
    const typeToRoles: Record<string, string[]> = {
      "Electrical Work": ["Electrician","Maintenance Tech"],
      "Masonry Work": ["Production Worker","Loader","Material Handler"],
      "Mechanical Work": ["Maintenance Tech","Machine Operator","Welder"],
      "Reroofing": ["Production Worker","Loader"],
      "Plumbing Work": ["Maintenance Tech"],
      "": ["Forklift Operator","Loader","Material Handler","Production Worker","Warehouse Associate"],
    };

    // Build demand forecast from permit types
    const forecast: any[] = [];
    for (const t of (byTypeR || [])) {
      const wtype = t.work_type || "(general construction)";
      // Socrata returns numerics as strings; coerce defensively.
      const totalCost = parseFloat(t.total_cost || 0);
      const cnt = parseInt(t.cnt || 0);
      const estWorkers = Math.round(totalCost / 150000); // industry heuristic
      const roles = typeToRoles[t.work_type || ""] || typeToRoles[""];
      forecast.push({ work_type: wtype, permits: cnt, total_cost: totalCost, estimated_workers: estWorkers, needed_roles: roles });
    }

    // Cross-reference with our bench: index bench rows by role, then for
    // each forecast role compute available-vs-demand coverage. Coverage is
    // capped at 999% so the UI column stays narrow. Roles absent from the
    // bench produce no gap entry.
    const ilBench = (benchR.rows || []).reduce((m: any, r: any) => { m[r.role] = r; return m; }, {});
    const gaps: any[] = [];
    for (const f of forecast) {
      for (const role of f.needed_roles) {
        const b = ilBench[role];
        if (b) {
          const coverage = Math.round((b.available / Math.max(f.estimated_workers, 1)) * 100);
          gaps.push({ role, demand: f.estimated_workers, supply: b.supply, available: b.available, reliable: b.reliable, coverage_pct: Math.min(coverage, 999), source: f.work_type });
        }
      }
    }

    return ok({
      // Largest permits, trimmed for display (100-char description cap).
      major_permits: (bigR || []).map((p: any) => ({
        cost: parseFloat(p.reported_cost || 0),
        description: (p.work_description || "").substring(0, 100),
        address: `${p.street_number || ""} ${p.street_direction || ""} ${p.street_name || ""}`.trim(),
        type: p.work_type || p.permit_type || "",
        date: (p.issue_date || "").substring(0, 10),
        lat: p.latitude, lng: p.longitude,
      })),
      by_type: forecast,
      recent: (recentR || []).map((p: any) => ({
        type: p.work_type || "", description: (p.work_description || "").substring(0, 80),
        cost: parseFloat(p.reported_cost || 0), street: p.street_name || "", date: (p.issue_date || "").substring(0, 10),
      })),
      il_bench: benchR.rows || [],
      gaps,
      total_construction_value: forecast.reduce((s: number, f: any) => s + f.total_cost, 0),
      total_estimated_workers: forecast.reduce((s: number, f: any) => s + f.estimated_workers, 0),
      duration_ms: Date.now() - start,
    });
  } catch (e: any) {
    // Socrata or lakehouse failure — return the error in-band (HTTP 200)
    // so the console UI can render it inline.
    return ok({ error: e.message, duration_ms: Date.now() - start });
  }
}
|
|
|
|
// Predictive staffing forecast — aggregate demand inferred from
// recent Chicago permits, compared to our bench supply. Answers
// "what's coming in the next 30-60 days and can we cover it?"
// — the contextual-awareness dimension beyond retrospective rank.
if (url.pathname === "/intelligence/staffing_forecast" && req.method === "POST") {
  const start = Date.now();
  try {
    const permitUrl = "https://data.cityofchicago.org/resource/ydr8-5enu.json";
    // Last 30 days of permits — that's our forward demand window
    const thirtyDaysAgo = new Date(Date.now() - 30 * 86400e3).toISOString().slice(0, 10);
    // Network failure degrades to an empty permit list (forecast of zero),
    // not a 500 — .catch(() => []) below.
    const permits: any[] = await fetch(
      `${permitUrl}?$select=work_type,reported_cost,issue_date`
      + `&$where=reported_cost>100000 AND issue_date>'${thirtyDaysAgo}'`
      + `&$limit=200`
    ).then(r => r.json()).catch(() => []);

    // Construction heuristic: permit filing → construction start
    // averages ~45 days. Staffing window opens 14 days before.
    // Unmapped work types default to "Production Worker" below.
    const typeToRole: Record<string, string> = {
      "Electrical Work": "Electrician",
      "Masonry Work": "Production Worker",
      "Mechanical Work": "Maintenance Tech",
      "Reroofing": "Production Worker",
      "Plumbing Work": "Maintenance Tech",
    };

    // Aggregate demand by role. Per-permit headcount is the $150K-per-
    // worker heuristic clamped to [2, 8]; earliest_need tracks the
    // earliest staging date (issue + 31d = 45d start − 14d window).
    const demandByRole: Record<string, { permits: number; total_cost: number; est_workers: number; earliest_need: string }> = {};
    for (const p of permits) {
      const role = typeToRole[p.work_type || ""] || "Production Worker";
      const cost = parseFloat(p.reported_cost || 0);
      const workers = Math.max(2, Math.min(Math.round(cost / 150000), 8));
      const issueDate = new Date(p.issue_date);
      const stagingDate = new Date(issueDate.getTime() + 31 * 86400e3); // 45d - 14d window
      if (!demandByRole[role]) {
        demandByRole[role] = { permits: 0, total_cost: 0, est_workers: 0,
          earliest_need: stagingDate.toISOString().slice(0, 10) };
      }
      demandByRole[role].permits += 1;
      demandByRole[role].total_cost += cost;
      demandByRole[role].est_workers += workers;
      const cur = new Date(demandByRole[role].earliest_need);
      if (stagingDate < cur) demandByRole[role].earliest_need = stagingDate.toISOString().slice(0, 10);
    }

    // Bench supply in IL
    const benchR = await api("POST", "/query/sql", {
      sql: `SELECT role, COUNT(*) as total, `
        + `SUM(CASE WHEN CAST(availability AS DOUBLE) > 0.5 THEN 1 ELSE 0 END) as available, `
        + `SUM(CASE WHEN CAST(reliability AS DOUBLE) > 0.8 THEN 1 ELSE 0 END) as reliable `
        + `FROM workers_500k WHERE state = 'IL' `
        + `GROUP BY role`,
    });
    const bench: Record<string, any> = {};
    for (const r of (benchR.rows || [])) bench[r.role] = r;

    // Past playbook fill-speed + success signal per role.
    // NOTE(review): queries successful_playbooks_live while other routes
    // read successful_playbooks — confirm the table name is intentional.
    const playbookR = await api("POST", "/query/sql", {
      sql: `SELECT operation, COUNT(*) as fills `
        + `FROM successful_playbooks_live `
        + `WHERE operation LIKE '%Chicago, IL%' `
        + `GROUP BY operation ORDER BY fills DESC LIMIT 20`,
    });
    const recentChicagoOps = playbookR.rows || [];

    // Build forecast entries with risk flag. Coverage percentages use 999
    // as the "no demand" sentinel and are capped at 9999 for display.
    const forecast: any[] = [];
    for (const [role, d] of Object.entries(demandByRole)) {
      const b = bench[role] || { total: 0, available: 0, reliable: 0 };
      const coverage = d.est_workers > 0 ? Math.round((b.available / d.est_workers) * 100) : 999;
      const reliable_coverage = d.est_workers > 0 ? Math.round((b.reliable / d.est_workers) * 100) : 999;
      // Risk ladder: <100% available = critical, <300% = tight,
      // reliable pool <200% = watch, else ok.
      let risk = "ok";
      if (coverage < 100) risk = "critical";
      else if (coverage < 300) risk = "tight";
      else if (reliable_coverage < 200) risk = "watch";
      // Days until earliest staffing deadline
      const days_to_deadline = Math.round((new Date(d.earliest_need).getTime() - Date.now()) / 86400e3);
      forecast.push({
        role,
        demand_permits: d.permits,
        demand_workers: d.est_workers,
        demand_total_cost: d.total_cost,
        earliest_staffing_deadline: d.earliest_need,
        days_to_deadline,
        bench_total: b.total,
        bench_available: b.available,
        bench_reliable: b.reliable,
        coverage_pct: Math.min(coverage, 9999),
        reliable_coverage_pct: Math.min(reliable_coverage, 9999),
        risk,
      });
    }
    // Order by severity first, then by nearest deadline.
    forecast.sort((a, b) => {
      const order: Record<string, number> = { critical: 0, tight: 1, watch: 2, ok: 3 };
      if (order[a.risk] !== order[b.risk]) return order[a.risk] - order[b.risk];
      return a.days_to_deadline - b.days_to_deadline;
    });

    return ok({
      generated_at: new Date().toISOString(),
      window_days: 30,
      permit_count: permits.length,
      total_cost: permits.reduce((s, p) => s + parseFloat(p.reported_cost || 0), 0),
      total_estimated_workers: forecast.reduce((s, f) => s + f.demand_workers, 0),
      critical_roles: forecast.filter(f => f.risk === "critical").length,
      tight_roles: forecast.filter(f => f.risk === "tight").length,
      forecast,
      recent_chicago_operations: recentChicagoOps,
      duration_ms: Date.now() - start,
      note: "Demand inferred from Chicago permit filings last 30 days. Construction starts ~45d after permit. Staffing window opens ~14d before construction. Supply = IL bench in workers_500k.",
    });
  } catch (e: any) {
    return err(`staffing_forecast: ${e.message}`, 500);
  }
}
|
|
|
|
// Intelligence: Chicago permits → assumed staffing contracts with
// Phase 19-ranked candidates and Path-2 discovered patterns. Each
// card pairs a REAL permit (live from data.cityofchicago.org) with
// a PROPOSED fill drawn from our 500K worker bench. Surfaces the
// meta-index dimension directly: "what past similar fills had in
// common" for this role + geo.
if (url.pathname === "/intelligence/permit_contracts" && req.method === "POST") {
  const start = Date.now();
  try {
    const permitUrl = "https://data.cityofchicago.org/resource/ydr8-5enu.json";
    // Recent + substantial permits only — skip tiny ones that
    // don't imply real staffing demand. Network failure degrades to an
    // empty card list rather than a 500.
    const permits: any[] = await fetch(
      `${permitUrl}?$select=permit_type,work_type,work_description,reported_cost,street_number,street_direction,street_name,community_area,issue_date&`
      + `$where=reported_cost>250000 AND issue_date>'2025-06-01'`
      + `&$order=issue_date DESC&$limit=6`
    ).then(r => r.json()).catch(() => []);

    // Permit work type → staffing role; unmapped types fall back to
    // "Production Worker" below.
    const typeToRole: Record<string, string> = {
      "Electrical Work": "Electrician",
      "Masonry Work": "Production Worker",
      "Mechanical Work": "Maintenance Tech",
      "Reroofing": "Production Worker",
      "Plumbing Work": "Maintenance Tech",
    };

    // One contract card per permit; each iteration makes two lakehouse
    // calls (hybrid rank + pattern discovery), run sequentially.
    const contracts: any[] = [];
    for (const p of permits) {
      const cost = parseFloat(p.reported_cost || 0);
      // Industry heuristic — one worker per $150K of permit value,
      // capped at 8 per contract for staffing realism.
      const count = Math.min(Math.max(Math.round(cost / 150000), 2), 8);
      const role = typeToRole[p.work_type || ""] || "Production Worker";
      const city = "Chicago";
      const state = "IL";

      // Phase 19 ranked candidates. Soft availability filter
      // auto-applied by /search — this mirrors the real recruiter
      // query path exactly. k=200 to ensure boost fires across
      // the full memory surface (the embedding-discrimination
      // narrowness means under-k silently misses endorsements).
      // role/city/state interpolated into sql_filter are constants or
      // values from the static map above, so no injection surface here.
      const searchRes = await api("POST", "/vectors/hybrid", {
        index_name: "workers_500k_v1",
        filter_dataset: "workers_500k",
        id_column: "worker_id",
        sql_filter: `role = '${role}' AND state = '${state}' AND city = '${city}' AND CAST(availability AS DOUBLE) > 0.5`,
        question: `${role} for ${p.work_type || "construction"} in ${city}`,
        top_k: 5, generate: false,
        use_playbook_memory: true, playbook_memory_k: 200,
      }).catch(() => ({ sources: [] as any[] }));

      // Path 2 — discovered patterns for this role in this city.
      const patternRes = await api("POST", "/vectors/playbook_memory/patterns", {
        query: `${role} in ${city}, ${state}`,
        top_k_playbooks: 25,
        min_trait_frequency: 0.3,
      }).catch(() => ({} as any));

      // chunk_text appears formatted as "Name — details"; take the part
      // before the em-dash as the display name, falling back to doc_id.
      const sources = (searchRes.sources || []).slice(0, 5).map((s: any) => {
        const name = String(s.chunk_text || "").split("—")[0]?.trim() || s.doc_id;
        return {
          doc_id: s.doc_id,
          name,
          score: s.score,
          playbook_boost: s.playbook_boost || 0,
          playbook_citations: s.playbook_citations || [],
        };
      });

      // Timeline heuristic — permits filed now → construction
      // starts ~45d later → staffing window opens ~14d before
      // start. days_to_deadline is negative when we're past the
      // window (fill urgency is imminent).
      const issueDate = new Date(p.issue_date || Date.now());
      const estStart = new Date(issueDate.getTime() + 45 * 86400e3);
      const stagingDate = new Date(issueDate.getTime() + 31 * 86400e3);
      const daysToDeadline = Math.round((stagingDate.getTime() - Date.now()) / 86400e3);
      let urgency = "scheduled";
      if (daysToDeadline < 0) urgency = "overdue";
      else if (daysToDeadline <= 7) urgency = "urgent";
      else if (daysToDeadline <= 21) urgency = "soon";
      else urgency = "scheduled";

      contracts.push({
        permit: {
          cost,
          work_type: p.work_type || "General construction",
          description: (p.work_description || "").substring(0, 140),
          address: `${p.street_number || ""} ${p.street_direction || ""} ${p.street_name || ""}`.trim(),
          community_area: p.community_area,
          issue_date: (p.issue_date || "").substring(0, 10),
        },
        timeline: {
          estimated_construction_start: estStart.toISOString().slice(0, 10),
          staffing_window_opens: stagingDate.toISOString().slice(0, 10),
          days_to_deadline: daysToDeadline,
          urgency,
        },
        proposed: {
          role,
          count,
          city, state,
          pool_size: searchRes.sql_matches,
          candidates: sources,
        },
        discovered_pattern: patternRes.discovered_pattern,
        pattern_matched: patternRes.matched_playbooks ?? 0,
        pattern_workers_examined: patternRes.total_workers_examined ?? 0,
      });
    }

    return ok({
      generated_at: new Date().toISOString(),
      count: contracts.length,
      contracts,
      duration_ms: Date.now() - start,
      note: "Live Chicago permits paired with workers_500k-ranked candidates and playbook_memory discovered patterns. The permit is real public data; the proposed fill is derived per industry heuristic (~$150K → 1 worker).",
    });
  } catch (e: any) {
    return err(`permit_contracts: ${e.message}`, 500);
  }
}
|
|
|
|
// Removed 2026-04-20: /intelligence/learn was a legacy CSV writer
|
|
// that destructively re-wrote successful_playbooks. /log and
|
|
// /log_failure replace it cleanly via /vectors/playbook_memory/seed
|
|
// and /mark_failed. Keeping the endpoint would only mislead
|
|
// future callers — dead code rots.
|
|
|
|
// Intelligence: Activity feed — what the system has learned
|
|
if (url.pathname === "/intelligence/activity" && req.method === "POST") {
|
|
const start = Date.now();
|
|
const [playbooksR, searchCountR, fillCountR, totalR] = await Promise.all([
|
|
api("POST", "/query/sql", { sql: "SELECT * FROM successful_playbooks ORDER BY timestamp DESC LIMIT 20" }).catch(() => ({ rows: [] })),
|
|
api("POST", "/query/sql", { sql: "SELECT COUNT(*) cnt FROM successful_playbooks WHERE operation LIKE 'search:%'" }).catch(() => ({ rows: [{ cnt: 0 }] })),
|
|
api("POST", "/query/sql", { sql: "SELECT COUNT(*) cnt FROM successful_playbooks WHERE operation LIKE 'fill:%'" }).catch(() => ({ rows: [{ cnt: 0 }] })),
|
|
api("POST", "/query/sql", { sql: "SELECT COUNT(*) cnt FROM successful_playbooks" }).catch(() => ({ rows: [{ cnt: 0 }] })),
|
|
]);
|
|
// Extract learned patterns — which roles+cities get filled most
|
|
const patterns: Record<string, number> = {};
|
|
for (const p of (playbooksR.rows || [])) {
|
|
if (p.operation?.startsWith("fill:") || p.operation?.startsWith("search:")) {
|
|
const key = p.operation.replace(/^(fill|search): ?/, "").trim();
|
|
patterns[key] = (patterns[key] || 0) + 1;
|
|
}
|
|
}
|
|
return ok({
|
|
playbooks: playbooksR.rows || [],
|
|
search_count: searchCountR.rows?.[0]?.cnt || 0,
|
|
fill_count: fillCountR.rows?.[0]?.cnt || 0,
|
|
total_operations: totalR.rows?.[0]?.cnt || 0,
|
|
learned_patterns: Object.entries(patterns).map(([q, c]) => ({ query: q, times: c })).sort((a, b) => b.times - a.times),
|
|
duration_ms: Date.now() - start,
|
|
});
|
|
}
|
|
|
|
// Intelligence Brief — parallel analytics across 500K profiles.
// Seven independent SQL aggregates fan out in parallel; the response is a
// straight assembly of their row sets (no post-processing).
if (url.pathname === "/intelligence/brief" && req.method === "POST") {
  const start = Date.now();
  const [poolR, benchR, supplyR, gemsR, risksR, untappedR, archetypeR] = await Promise.all([
    // Pool overview: totals, average reliability, elite/reliable counts,
    // and counts of the erratic/silent/improving archetypes.
    api("POST", "/query/sql", { sql: `SELECT COUNT(*) total, ROUND(AVG(CAST(reliability AS DOUBLE)),3) avg_rel, SUM(CASE WHEN CAST(reliability AS DOUBLE)>0.9 THEN 1 ELSE 0 END) elite, SUM(CASE WHEN CAST(reliability AS DOUBLE)>0.8 THEN 1 ELSE 0 END) reliable, SUM(CASE WHEN archetype='erratic' THEN 1 ELSE 0 END) erratic, SUM(CASE WHEN archetype='silent' THEN 1 ELSE 0 END) silent_cnt, SUM(CASE WHEN archetype='improving' THEN 1 ELSE 0 END) improving FROM workers_500k` }),
    // Bench by state: totals plus reliable (>0.8) and available (>0.5).
    api("POST", "/query/sql", { sql: `SELECT state, COUNT(*) total, ROUND(AVG(CAST(reliability AS DOUBLE)),3) avg_rel, SUM(CASE WHEN CAST(reliability AS DOUBLE)>0.8 THEN 1 ELSE 0 END) reliable, SUM(CASE WHEN CAST(availability AS DOUBLE)>0.5 THEN 1 ELSE 0 END) available FROM workers_500k GROUP BY state ORDER BY total DESC` }),
    // Supply by role with availability and average reliability.
    api("POST", "/query/sql", { sql: `SELECT role, COUNT(*) supply, SUM(CASE WHEN CAST(availability AS DOUBLE)>0.5 THEN 1 ELSE 0 END) available, ROUND(AVG(CAST(reliability AS DOUBLE)),3) avg_rel FROM workers_500k GROUP BY role ORDER BY supply DESC` }),
    // "Gems": improving-archetype workers already above 0.8 reliability.
    api("POST", "/query/sql", { sql: `SELECT name, role, city, state, ROUND(CAST(reliability AS DOUBLE),2) rel, ROUND(CAST(availability AS DOUBLE),2) avail, archetype, skills FROM workers_500k WHERE archetype='improving' AND CAST(reliability AS DOUBLE)>0.8 ORDER BY CAST(reliability AS DOUBLE) DESC LIMIT 5` }),
    // Risks: erratic/silent workers below 0.5 reliability, worst first.
    api("POST", "/query/sql", { sql: `SELECT name, role, city, state, ROUND(CAST(reliability AS DOUBLE),2) rel, ROUND(CAST(responsiveness AS DOUBLE),2) resp, ROUND(CAST(compliance AS DOUBLE),2) compl, archetype FROM workers_500k WHERE archetype IN ('erratic','silent') AND CAST(reliability AS DOUBLE)<0.5 ORDER BY CAST(reliability AS DOUBLE) ASC LIMIT 5` }),
    // Untapped: highly available (>0.8) AND highly reliable (>0.85).
    api("POST", "/query/sql", { sql: `SELECT name, role, city, state, ROUND(CAST(availability AS DOUBLE),2) avail, ROUND(CAST(reliability AS DOUBLE),2) rel, skills, archetype FROM workers_500k WHERE CAST(availability AS DOUBLE)>0.8 AND CAST(reliability AS DOUBLE)>0.85 ORDER BY CAST(availability AS DOUBLE) DESC LIMIT 5` }),
    // Archetype distribution with per-archetype average reliability.
    api("POST", "/query/sql", { sql: `SELECT archetype, COUNT(*) cnt, ROUND(AVG(CAST(reliability AS DOUBLE)),3) avg_rel FROM workers_500k GROUP BY archetype ORDER BY cnt DESC` }),
  ]);
  return ok({
    pool: poolR.rows?.[0] || {},
    bench: benchR.rows || [],
    supply: supplyR.rows || [],
    gems: gemsR.rows || [],
    risks: risksR.rows || [],
    untapped: untappedR.rows || [],
    archetypes: archetypeR.rows || [],
    duration_ms: Date.now() - start,
  });
}
|
|
|
|
// Intelligence Chat — natural language → routed queries → structured results
|
|
if (url.pathname === "/intelligence/chat" && req.method === "POST") {
|
|
const b = await json();
|
|
const q = (b.message || "").trim();
|
|
const lower = q.toLowerCase();
|
|
const start = Date.now();
|
|
const queries: string[] = [];
|
|
|
|
// Route 1: "Find someone like [Name]"
|
|
const likeMatch = q.match(/(?:like|similar to)\s+([A-Z][a-z]+(?:\s+[A-Z]\.?\s*)?(?:[A-Z][a-z]+)?)/i);
|
|
if (likeMatch) {
|
|
const name = likeMatch[1].trim();
|
|
queries.push(`SQL: Looking up ${name}'s profile`);
|
|
const profileR = await api("POST", "/query/sql", { sql: `SELECT * FROM workers_500k WHERE name LIKE '%${name.replace(/'/g,"''")}%' LIMIT 1` });
|
|
if (profileR.rows?.length) {
|
|
const worker = profileR.rows[0];
|
|
const stateMatch = lower.match(/\b(?:in|from)\s+([A-Z]{2})\b/i) || lower.match(/\b(IL|IN|OH|MO|TN|KY|WI|MI|IA|MN)\b/i);
|
|
const stateFilter = stateMatch ? `state = '${stateMatch[1].toUpperCase()}'` : `state != '${worker.state}'`;
|
|
queries.push(`Vector: Semantic similarity on ${worker.name}'s full profile → ${stateFilter}`);
|
|
const searchR = await api("POST", "/vectors/hybrid", {
|
|
question: worker.resume_text || `${worker.role} in ${worker.city} with skills ${worker.skills}`,
|
|
index_name: "workers_500k_v1",
|
|
sql_filter: stateFilter + ` AND CAST(reliability AS DOUBLE) >= 0.7`,
|
|
filter_dataset: "ethereal_workers", id_column: "worker_id", top_k: 5, generate: false,
|
|
});
|
|
return ok({ type: "similar", summary: `Found ${(searchR.sources||[]).length} workers similar to ${worker.name}${stateMatch ? ' in '+stateMatch[1].toUpperCase() : ' (other states)'}`,
|
|
source: { name: worker.name, role: worker.role, city: worker.city, state: worker.state, rel: worker.reliability, skills: worker.skills, archetype: worker.archetype },
|
|
results: (searchR.sources||[]).map((s:any) => ({ doc_id: s.doc_id, score: s.score, text: s.chunk_text })),
|
|
sql_matches: searchR.sql_matches, queries_run: queries, duration_ms: Date.now() - start });
|
|
}
|
|
return ok({ type: "error", summary: `Couldn't find "${name}" in the database. Try a full name.`, queries_run: queries, duration_ms: Date.now() - start });
|
|
}
|
|
|
|
// Route 2: "What if we lose"
|
|
if (/what if|lose|happens if/i.test(lower)) {
|
|
const roleMatch = lower.match(/(?:lose|lost?)\s+(?:our\s+)?(?:top\s+)?(\d+)?\s*(.+?)(?:\?|$)/i);
|
|
if (roleMatch) {
|
|
const count = parseInt(roleMatch[1]) || 5;
|
|
const subject = roleMatch[2].trim().replace(/\s*workers?\s*$/,'').replace(/s$/,'');
|
|
queries.push(`SQL: Top ${count} ${subject}s by reliability`);
|
|
const topR = await api("POST", "/query/sql", { sql: `SELECT name, role, city, state, ROUND(CAST(reliability AS DOUBLE),2) rel, skills FROM workers_500k WHERE LOWER(role) LIKE '%${subject.replace(/'/g,"''")}%' ORDER BY CAST(reliability AS DOUBLE) DESC LIMIT ${count}` });
|
|
if (topR.rows?.length) {
|
|
const states = [...new Set(topR.rows.map((r:any) => r.state))];
|
|
queries.push(`SQL: Bench depth for ${subject}s in ${states.join(', ')}`);
|
|
const benchR = await api("POST", "/query/sql", { sql: `SELECT state, COUNT(*) total, SUM(CASE WHEN CAST(reliability AS DOUBLE)>0.8 THEN 1 ELSE 0 END) reliable FROM workers_500k WHERE LOWER(role) LIKE '%${subject.replace(/'/g,"''")}%' AND state IN (${states.map((s:string)=>`'${s}'`).join(',')}) GROUP BY state` });
|
|
const totalInRole = (benchR.rows||[]).reduce((s:number,r:any) => s + r.total, 0);
|
|
const reliableRemaining = (benchR.rows||[]).reduce((s:number,r:any) => s + r.reliable, 0) - topR.rows.length;
|
|
return ok({ type: "whatif", summary: `Impact: losing top ${topR.rows.length} ${subject} workers`,
|
|
lost: topR.rows, bench: benchR.rows||[], total_in_role: totalInRole, reliable_remaining: Math.max(0, reliableRemaining),
|
|
risk_level: reliableRemaining < count * 2 ? "HIGH" : reliableRemaining < count * 5 ? "MEDIUM" : "LOW",
|
|
queries_run: queries, duration_ms: Date.now() - start });
|
|
}
|
|
return ok({ type: "error", summary: `Couldn't find workers in the "${subject}" role. Try: welder, forklift operator, assembler, etc.`, queries_run: queries, duration_ms: Date.now() - start });
|
|
}
|
|
}
|
|
|
|
// Route 3: "Who could handle" — semantic role discovery
|
|
if (/could handle|capable of|suitable for|qualified for|try.*for|can do/i.test(lower)) {
|
|
const roleDesc = q.replace(/^.*?(?:handle|capable of|suitable for|qualified for|try\s+\w+\s+for|can do)\s*/i,'').replace(/\?$/,'').trim();
|
|
queries.push(`Vector: Semantic search for "${roleDesc}" — no exact role match needed`);
|
|
const searchR = await api("POST", "/vectors/hybrid", {
|
|
question: `Worker experienced in ${roleDesc}, relevant skills and certifications`,
|
|
index_name: "workers_500k_v1", sql_filter: "CAST(reliability AS DOUBLE) >= 0.75",
|
|
filter_dataset: "ethereal_workers", id_column: "worker_id", top_k: 8, generate: false,
|
|
});
|
|
return ok({ type: "discovery", summary: `${(searchR.sources||[]).length} workers found through semantic skill matching for: "${roleDesc}"`,
|
|
role_searched: roleDesc, results: (searchR.sources||[]).map((s:any) => ({ doc_id: s.doc_id, score: s.score, text: s.chunk_text })),
|
|
sql_matches: searchR.sql_matches,
|
|
note: "None of these workers have this exact role title. They were found because their skills, certifications, and experience are semantically similar. This is talent discovery — finding people for roles that don't exist in your database yet.",
|
|
queries_run: queries, duration_ms: Date.now() - start });
|
|
}
|
|
|
|
// Route 4: "Stop placing" / risk workers
|
|
if (/stop placing|worst|problem|flag|risk|underperform|fire|let go/i.test(lower)) {
|
|
queries.push("SQL: erratic/silent workers with reliability < 50%");
|
|
const riskR = await api("POST", "/query/sql", { sql: `SELECT name, role, city, state, ROUND(CAST(reliability AS DOUBLE),2) rel, ROUND(CAST(responsiveness AS DOUBLE),2) resp, ROUND(CAST(compliance AS DOUBLE),2) compl, archetype FROM workers_500k WHERE archetype IN ('erratic','silent') AND CAST(reliability AS DOUBLE)<0.5 ORDER BY CAST(reliability AS DOUBLE) ASC LIMIT 10` });
|
|
const countR = await api("POST", "/query/sql", { sql: `SELECT COUNT(*) cnt FROM workers_500k WHERE archetype IN ('erratic','silent') AND CAST(reliability AS DOUBLE)<0.5` });
|
|
return ok({ type: "risk", summary: `${countR.rows?.[0]?.cnt || 0} workers flagged — showing the 10 lowest performers`,
|
|
results: riskR.rows||[], total_flagged: countR.rows?.[0]?.cnt || 0,
|
|
queries_run: queries, duration_ms: Date.now() - start });
|
|
}
|
|
|
|
// Route 5: Analytics / counts
|
|
if (/how many|count|total|percentage|average|breakdown/i.test(lower)) {
|
|
queries.push("RAG: analytical question → vector retrieval + LLM reasoning");
|
|
const ragR = await api("POST", "/vectors/rag", { index_name: "workers_500k_v1", question: q, top_k: 3 });
|
|
return ok({ type: "answer", summary: ragR.answer || "Couldn't determine the answer from the data",
|
|
sources: (ragR.sources||[]).map((s:any) => ({ doc_id: s.doc_id, text: s.chunk_text, score: s.score })),
|
|
queries_run: queries, duration_ms: Date.now() - start });
|
|
}
|
|
|
|
// Default: smart search — extract role, location, availability from natural language
|
|
{
|
|
const filters: string[] = ["CAST(reliability AS DOUBLE) >= 0.5"];
|
|
const understood: string[] = [];
|
|
|
|
// Extract role keywords
|
|
const roleKeywords: Record<string, string> = {
|
|
"warehouse": "warehouse", "forklift": "forklift", "welder": "weld", "assembler": "assembl",
|
|
"loader": "loader", "machine operator": "machine operator", "shipping": "shipping",
|
|
"quality": "quality", "maintenance": "maintenance", "production": "production",
|
|
"material handler": "material handler", "sanitation": "sanitation", "inventory": "inventory",
|
|
"line lead": "line lead", "electrician": "electric", "packaging": "packaging",
|
|
"tool and die": "tool", "logistics": "logistics", "safety": "safety", "cnc": "cnc",
|
|
};
|
|
for (const [kw, sqlPart] of Object.entries(roleKeywords)) {
|
|
if (lower.includes(kw)) { filters.push(`LOWER(role) LIKE '%${sqlPart}%'`); understood.push(`role: ${kw}`); break; }
|
|
}
|
|
|
|
// Extract city
|
|
const cities = ["chicago","springfield","rockford","peoria","joliet","indianapolis","fort wayne",
|
|
"evansville","south bend","columbus","cleveland","cincinnati","dayton","akron","toledo",
|
|
"st. louis","st louis","kansas city","nashville","memphis","knoxville","louisville","lexington",
|
|
"milwaukee","madison","detroit","grand rapids","lansing","des moines","minneapolis","terre haute",
|
|
"bloomington","decatur","mattoon","galesburg","danville","champaign"];
|
|
for (const city of cities) {
|
|
if (lower.includes(city)) {
|
|
const sqlCity = city.split(' ').map(w => w[0].toUpperCase() + w.slice(1)).join(' ');
|
|
filters.push(`city = '${sqlCity}'`);
|
|
understood.push(`city: ${sqlCity}`);
|
|
break;
|
|
}
|
|
}
|
|
|
|
// Extract state
|
|
const stateNames: Record<string, string> = {
|
|
"illinois":"IL","indiana":"IN","ohio":"OH","missouri":"MO","tennessee":"TN",
|
|
"kentucky":"KY","wisconsin":"WI","michigan":"MI","iowa":"IA","minnesota":"MN"
|
|
};
|
|
const stateMatch = lower.match(/\b(IL|IN|OH|MO|TN|KY|WI|MI|IA|MN)\b/i);
|
|
if (stateMatch && !understood.some(u => u.startsWith('city'))) {
|
|
filters.push(`state = '${stateMatch[1].toUpperCase()}'`);
|
|
understood.push(`state: ${stateMatch[1].toUpperCase()}`);
|
|
} else {
|
|
for (const [name, abbr] of Object.entries(stateNames)) {
|
|
if (lower.includes(name)) { filters.push(`state = '${abbr}'`); understood.push(`state: ${abbr}`); break; }
|
|
}
|
|
}
|
|
|
|
// Extract availability
|
|
if (/available|open|ready|today|now|immediate|asap|right away/i.test(lower)) {
|
|
filters.push("CAST(availability AS DOUBLE) > 0.5");
|
|
understood.push("available now");
|
|
}
|
|
|
|
// Extract reliability preference
|
|
if (/reliable|dependable|best|top|trusted|proven/i.test(lower)) {
|
|
filters[0] = "CAST(reliability AS DOUBLE) >= 0.8";
|
|
understood.push("high reliability");
|
|
}
|
|
|
|
const filterStr = filters.join(" AND ");
|
|
queries.push("Smart parse: " + (understood.length ? understood.join(", ") : "general search"));
|
|
queries.push("SQL filter: " + filterStr);
|
|
queries.push("Vector: semantic search for best skill match");
|
|
|
|
// Also run a direct SQL query to get exact counts and zip codes
|
|
const sqlFields = "name, role, city, state, zip, ROUND(CAST(reliability AS DOUBLE),2) rel, ROUND(CAST(availability AS DOUBLE),2) avail, skills, certifications, archetype";
|
|
const directSql = `SELECT ${sqlFields} FROM workers_500k WHERE ${filterStr} ORDER BY CAST(availability AS DOUBLE) DESC, CAST(reliability AS DOUBLE) DESC LIMIT 10`;
|
|
|
|
// Derive role+geo for the pattern query so the meta-index
|
|
// surface lines up with what the user actually asked for.
|
|
const roleForPatterns = understood.find(u => u.startsWith('role:'))?.split(': ')[1] || q;
|
|
const cityForPatterns = understood.find(u => u.startsWith('city:'))?.split(': ')[1] || 'Chicago';
|
|
const stateForPatterns = understood.find(u => u.startsWith('state:'))?.split(': ')[1] || 'IL';
|
|
|
|
const [searchR, directR, patternR] = await Promise.all([
|
|
api("POST", "/vectors/hybrid", {
|
|
question: q, index_name: "workers_500k_v1", sql_filter: filterStr,
|
|
filter_dataset: "ethereal_workers", id_column: "worker_id", top_k: 8, generate: false,
|
|
// k=200 to catch compounding — direct measurement shows
|
|
// boost reliably fires only when ~all memory is scanned
|
|
// due to the narrow 0.55-0.67 cosine band in the 768d
|
|
// nomic-embed-text space. Brute force at 200 entries
|
|
// is sub-ms; no reason to underscan.
|
|
use_playbook_memory: true, playbook_memory_k: 200,
|
|
}),
|
|
api("POST", "/query/sql", { sql: directSql }),
|
|
api("POST", "/vectors/playbook_memory/patterns", {
|
|
query: `${roleForPatterns} in ${cityForPatterns}, ${stateForPatterns}`,
|
|
top_k_playbooks: 25, min_trait_frequency: 0.3,
|
|
}).catch(() => ({})),
|
|
]);
|
|
|
|
// Merge: use SQL results for structured data (zip, avail), vector for ranking
|
|
const sqlWorkers = directR.rows || [];
|
|
const vectorWorkers = (searchR.sources || []).map((s: any) => ({
|
|
doc_id: s.doc_id, score: s.score, text: s.chunk_text,
|
|
playbook_boost: s.playbook_boost || 0,
|
|
playbook_citations: s.playbook_citations || [],
|
|
}));
|
|
|
|
return ok({
|
|
type: "smart_search",
|
|
summary: `Found ${searchR.sql_matches || 0} workers matching your criteria${understood.length ? ' (' + understood.join(', ') + ')' : ''}`,
|
|
understood,
|
|
sql_results: sqlWorkers,
|
|
vector_results: vectorWorkers,
|
|
sql_matches: searchR.sql_matches,
|
|
queries_run: queries,
|
|
duration_ms: Date.now() - start,
|
|
// Meta-index signal — what similar past fills had in common.
|
|
// Non-empty when memory has ≥1 relevant playbook.
|
|
discovered_pattern: (patternR as any)?.discovered_pattern,
|
|
pattern_playbooks_matched: (patternR as any)?.matched_playbooks ?? 0,
|
|
});
|
|
}
|
|
}
|
|
|
|
activeTrace = null;
|
|
return err("Unknown path. Available: / /health /search /sql /match /worker/:id /ask /log /playbooks /profile/:id /vram /context /verify /simulation/run /console /intelligence/brief /intelligence/chat", 404);
|
|
} catch (e: any) {
|
|
if (activeTrace) { scoreTrace(activeTrace, "error", 0, e.message); }
|
|
activeTrace = null;
|
|
return err(e.message || String(e), 500);
|
|
} finally {
|
|
// Flush traces async — don't block the response
|
|
flushTraces().catch(() => {});
|
|
activeTrace = null;
|
|
}
|
|
},
|
|
});
|
|
|
|
console.error(`Lakehouse Agent Gateway :${PORT} → ${BASE}`);
|
|
}
|
|
|
|
main().catch(console.error);
|
|
|
|
// ─── Week simulation engine ───
|
|
|
|
// Role titles sampled for simulated contracts (see runWeekSimulation).
const ROLES = ["Forklift Operator","Machine Operator","Assembler","Loader","Quality Tech","Welder","Sanitation Worker","Shipping Clerk","Production Worker","Maintenance Tech"];
// Midwest state pool; each entry is a key of CITIES below.
const STATES = ["IL","IN","OH","MO","TN","KY","WI","MI"];
// Cities per state — the simulator picks a state first, then a city from its list.
const CITIES: Record<string, string[]> = {
  IL: ["Chicago","Springfield","Rockford","Peoria","Joliet"],
  IN: ["Indianapolis","Fort Wayne","Evansville","South Bend"],
  OH: ["Columbus","Cleveland","Cincinnati","Dayton"],
  MO: ["St. Louis","Kansas City","Springfield"],
  TN: ["Nashville","Memphis"], KY: ["Louisville","Lexington"],
  WI: ["Milwaukee","Madison"], MI: ["Detroit","Grand Rapids"],
};
// Client names are fabricated as "<prefix> <suffix>" by makeClient().
const CLIENT_PREFIXES = ["Midwest","Great Lakes","Prairie","Heartland","Summit","Valley","Central","Lakeside","Tri-State","Heritage","National","Premier","Metro","Capitol","Crossroads","Keystone","Riverfront","Gateway","Pinnacle","Cornerstone"];
const CLIENT_SUFFIXES = ["Logistics","Manufacturing","Assembly","Foods","Steel","Packaging","Health","Plastics","Energy","Solutions","Distribution","Services","Industries","Supply","Warehousing","Materials","Products","Corp","Group","Enterprises"];
|
|
// Fabricate a plausible client company name by pairing one random prefix
// with one random suffix (e.g. "Midwest Logistics").
function makeClient(): string {
  const prefix = pick(CLIENT_PREFIXES);
  const suffix = pick(CLIENT_SUFFIXES);
  return `${prefix} ${suffix}`;
}
|
|
// Shift start times sampled per contract.
const STARTS = ["5:00 AM","6:00 AM","6:30 AM","7:00 AM","7:30 AM","8:00 AM"];

// Diverse scenarios — each tells a different story about WHY this contract exists.
// `weight` drives weighted-random selection in runWeekSimulation (higher = more
// frequent); `priority` later determines headcount range and the minimum
// reliability filter; `situation` feeds the hybrid-search question and the
// playbook seed; `note`/`action` are display copy attached to the contract.
const SCENARIOS = [
  // URGENT — real emergencies that need immediate action
  { priority: "urgent", weight: 8, note: "Worker walked off the job at 3 PM yesterday — client needs replacement by morning",
    situation: "walkoff", action: "Replacement needed ASAP — previous worker quit mid-shift" },
  { priority: "urgent", weight: 5, note: "Client emailed at 11 PM — their regular crew has COVID exposure, entire team quarantined",
    situation: "quarantine", action: "Full crew replacement — health emergency at job site" },
  { priority: "urgent", weight: 5, note: "2 no-shows this morning — client is short-staffed on the floor right now",
    situation: "noshow", action: "Immediate backfill — client waiting on the phone" },

  // HIGH — important but not crisis
  { priority: "high", weight: 10, note: "New contract starting Monday — client wants to meet workers this week",
    situation: "new_client", action: "New client onboarding — first impression matters" },
  { priority: "high", weight: 8, note: "Client expanding to 2nd shift — need additional crew by next week",
    situation: "expansion", action: "Growth opportunity — client adding a shift" },
  { priority: "high", weight: 6, note: "Worker's OSHA certification expires Friday — need certified replacement lined up",
    situation: "cert_expiry", action: "Cert compliance — current worker can't continue without renewal" },
  { priority: "high", weight: 5, note: "Client requested specific workers back from last month's project",
    situation: "client_request", action: "Client relationship — they asked for specific people" },

  // MEDIUM — standard day-to-day operations
  { priority: "medium", weight: 15, note: "Ongoing weekly fill — same client, same role, reliable pipeline",
    situation: "recurring", action: "Recurring contract — steady work" },
  { priority: "medium", weight: 12, note: "Seasonal uptick — warehouse volume increasing ahead of holidays",
    situation: "seasonal", action: "Seasonal planning — volume ramping up" },
  { priority: "medium", weight: 10, note: "Backfill for worker on approved medical leave — returns in 3 weeks",
    situation: "medical_leave", action: "Temporary coverage — worker returning soon" },
  { priority: "medium", weight: 8, note: "Client testing new role — wants to try 2 workers for a week before committing",
    situation: "trial", action: "Trial placement — client evaluating the role" },
  { priority: "medium", weight: 6, note: "Cross-training opportunity — client wants workers who can learn a new skill",
    situation: "cross_train", action: "Development opportunity — workers can learn new skills" },

  // LOW — planning ahead
  { priority: "low", weight: 10, note: "Future fill — project starts in 2 weeks, gathering candidates now",
    situation: "future", action: "Pipeline building — no rush, quality over speed" },
  { priority: "low", weight: 8, note: "Client exploring staffing options — not committed yet, just want to see who's available",
    situation: "exploratory", action: "Exploratory — client shopping, impress them with quality" },
  { priority: "low", weight: 5, note: "Internal transfer — moving a worker from one site to another, need replacement at original",
    situation: "transfer", action: "Planned transition — smooth handoff between sites" },
];
|
|
|
|
/**
 * Uniformly sample one element of `arr`.
 * NOTE(review): callers must pass a non-empty array — an empty array yields
 * `undefined` despite the declared return type.
 */
function pick<T>(arr: T[]): T {
  const idx = Math.floor(Math.random() * arr.length);
  return arr[idx];
}
|
|
|
|
// ─── Client-blacklist persistence (feature #2) ──────────────────────────
// Simple JSON file under mcp-server/data/. Synchronous writes are fine
// at the expected rate (a handful of blacklist adds per day).
// Resolved relative to this module via Bun-specific `import.meta.dir`.
const BLACKLIST_PATH = `${import.meta.dir}/data/client_blacklists.json`;
|
|
|
|
// One worker barred from a specific client's job sites. The on-disk file maps
// client name → BlacklistEntry[].
interface BlacklistEntry {
  worker_id: string; // identifier used for de-dupe on add and lookup on remove
  name: string;      // display name captured at blacklisting time
  reason: string;    // free-text reason supplied by the staffer
  added_at: string;  // timestamp string — format is set by callers, not enforced here
}
|
|
|
|
async function loadAllBlacklists(): Promise<Record<string, BlacklistEntry[]>> {
|
|
try {
|
|
const f = Bun.file(BLACKLIST_PATH);
|
|
if (!(await f.exists())) return {};
|
|
return await f.json() as Record<string, BlacklistEntry[]>;
|
|
} catch { return {}; }
|
|
}
|
|
async function saveAllBlacklists(all: Record<string, BlacklistEntry[]>): Promise<void> {
|
|
await Bun.write(BLACKLIST_PATH, JSON.stringify(all, null, 2));
|
|
}
|
|
async function loadClientBlacklist(client: string): Promise<BlacklistEntry[]> {
|
|
const all = await loadAllBlacklists();
|
|
return all[client] || [];
|
|
}
|
|
async function addToClientBlacklist(client: string, entry: BlacklistEntry): Promise<BlacklistEntry[]> {
|
|
const all = await loadAllBlacklists();
|
|
const list = all[client] || [];
|
|
// De-dupe: same worker_id replaces prior entry with fresher reason.
|
|
const filtered = list.filter(e => e.worker_id !== entry.worker_id);
|
|
filtered.push(entry);
|
|
all[client] = filtered;
|
|
await saveAllBlacklists(all);
|
|
return filtered;
|
|
}
|
|
async function removeFromClientBlacklist(client: string, worker_id: string): Promise<{ removed: boolean; total: number }> {
|
|
const all = await loadAllBlacklists();
|
|
const list = all[client] || [];
|
|
const filtered = list.filter(e => e.worker_id !== worker_id);
|
|
const removed = filtered.length < list.length;
|
|
all[client] = filtered;
|
|
await saveAllBlacklists(all);
|
|
return { removed, total: filtered.length };
|
|
}
|
|
|
|
// Seed playbook_memory from a filled contract so the next hybrid query
|
|
// ranks against it. Used by both runWeekSimulation (per-day) and the /log
|
|
// endpoint (per manual logging). Fail-soft — seeding is best-effort.
|
|
async function seedPlaybookFromContract(c: any) {
|
|
const names = (c.matches || []).slice(0, 5)
|
|
.map((m: any) => m.name || m.doc_id)
|
|
.filter((n: string) => n && !n.startsWith("W500-"));
|
|
if (!names.length) return;
|
|
const op = `fill: ${c.role} x${c.headcount} in ${c.city}, ${c.state}`;
|
|
try {
|
|
await api("POST", "/vectors/playbook_memory/seed", {
|
|
operation: op,
|
|
approach: `${c.situation || c.priority || "fill"} → hybrid search`,
|
|
context: `client=${c.client || ""} start=${c.start || ""}`,
|
|
endorsed_names: names,
|
|
append: true,
|
|
});
|
|
} catch {}
|
|
}
|
|
|
|
// Simulate one Mon–Fri staffing week: each day generates 4–8 contracts from
// weighted SCENARIOS, fills them via /vectors/hybrid (playbook boost on), then
// seeds playbook_memory with the day's fills so the NEXT day's searches rank
// against them. Days therefore run strictly sequentially — do not parallelize.
// Returns { days, summary } for the /simulation/run endpoint.
async function runWeekSimulation() {
  const days = ["Monday","Tuesday","Wednesday","Thursday","Friday"];
  const staffers = ["Sarah (Lead)","Mike (Senior)","Kim (Junior)"];
  const results: any[] = [];
  let totalFilled = 0, totalNeeded = 0, emergencies = 0, handoffs = 0, playbookEntries = 0;

  for (let d = 0; d < days.length; d++) {
    const dayLabel = days[d];
    const numContracts = 4 + Math.floor(Math.random() * 5); // 4-8 per day
    const contracts: any[] = [];
    // Staffers rotate daily; each day hands off to the next in the rotation.
    const staffer = staffers[d % staffers.length];
    const handoffTo = staffers[(d + 1) % staffers.length];

    for (let c = 0; c < numContracts; c++) {
      const state = pick(STATES);
      const city = pick(CITIES[state] || [state]);
      const role = pick(ROLES);
      // Weighted scenario selection: walk the cumulative weights until the
      // random draw is exhausted.
      const totalWeight = SCENARIOS.reduce((s, sc) => s + sc.weight, 0);
      let r = Math.random() * totalWeight;
      let scenario = SCENARIOS[0];
      for (const sc of SCENARIOS) { r -= sc.weight; if (r <= 0) { scenario = sc; break; } }
      const priority = scenario.priority;
      // Urgent: 3-6 heads; high/medium: 2-4; low: 1-2.
      const headcount = priority === "urgent" ? 3 + Math.floor(Math.random() * 4) :
        priority === "high" ? 2 + Math.floor(Math.random() * 3) :
        priority === "medium" ? 2 + Math.floor(Math.random() * 3) :
        1 + Math.floor(Math.random() * 2);
      // Urgent fills accept lower reliability (speed over quality).
      const minRel = priority === "urgent" ? 0.6 : priority === "high" ? 0.75 : 0.8;
      const cid = `W${d+1}-${String(c+1).padStart(3,"0")}`;

      if (priority === "urgent") emergencies++;
      totalNeeded += headcount;

      // Run hybrid search — Phase 19: boost on so past playbooks shape ranking
      let filled = 0;
      let matches: any[] = [];
      try {
        const filt = `role = '${role}' AND state = '${state}' AND reliability >= ${minRel}`;
        const r = await api("POST", "/vectors/hybrid", {
          question: `Find ${role} workers in ${city}, ${state} for ${scenario.situation}`,
          index_name: "workers_500k_v1",
          sql_filter: filt,
          filter_dataset: "ethereal_workers",
          id_column: "worker_id",
          top_k: headcount + 2, // small over-fetch so truncation still fills
          generate: false,
          use_playbook_memory: true,
        });
        // Name is the text before the em-dash in chunk_text; fall back to doc_id.
        matches = (r.sources || []).slice(0, headcount).map((s: any) => ({
          doc_id: s.doc_id,
          name: s.chunk_text?.split("—")[0]?.trim() || s.doc_id,
          score: s.score,
          chunk_text: s.chunk_text || "",
          playbook_boost: s.playbook_boost || 0,
          playbook_citations: s.playbook_citations || [],
        }));
        filled = matches.length;
      } catch {}
      totalFilled += Math.min(filled, headcount);

      contracts.push({
        id: cid, client: makeClient(), role, state, city,
        headcount, filled: Math.min(filled, headcount), priority,
        start: pick(STARTS), notes: scenario.note, situation: scenario.situation,
        action: scenario.action, matches,
        staffer, handoff_to: d < 4 ? handoffTo : null, // no handoff on Friday
      });
    }

    // End of day: seed playbook_memory with TODAY's filled contracts so
    // tomorrow's hybrid search ranks against them. This is the in-week
    // feedback loop — without this, day 5 doesn't benefit from day 1.
    for (const c of contracts) {
      if (c.matches && c.matches.length) {
        await seedPlaybookFromContract(c).catch(() => {});
      }
    }

    if (d < 4) {
      handoffs++;
      try {
        await api("POST", "/api/ingest/file?name=successful_playbooks", null); // just trigger
      } catch {}
    }
    // NOTE(review): incremented once per day, not per seeded playbook — the
    // summary's `playbook_entries` is effectively a day count. Confirm intent.
    playbookEntries++;

    results.push({
      label: dayLabel,
      staffer,
      handoff_to: d < 4 ? handoffTo : null,
      contracts,
      filled: contracts.reduce((s: number, c: any) => s + c.filled, 0),
      needed: contracts.reduce((s: number, c: any) => s + c.headcount, 0),
    });
  }

  const summary = {
    total_contracts: results.reduce((s, d) => s + d.contracts.length, 0),
    total_needed: totalNeeded,
    total_filled: totalFilled,
    fill_pct: Math.round(totalFilled / Math.max(totalNeeded, 1) * 100),
    emergencies,
    handoffs,
    playbook_entries: playbookEntries,
  };

  // BUG FIX 2026-04-20: previously this POSTed a multi-row CSV to
  // /ingest/file?name=successful_playbooks at end of every simulation.
  // That endpoint REPLACES the dataset's object list — so each
  // /simulation/run wiped the prior simulation's rows. The SQL
  // successful_playbooks table was never accumulating; it always reflected
  // only the most-recent simulation batch.
  //
  // Per-day per-contract seeding via /vectors/playbook_memory/seed
  // (added Pass 1, runs inside the day loop above) is the path that
  // actually accumulates feedback. The SQL successful_playbooks table is
  // intentionally not written by /simulation/run anymore until a proper
  // append surface exists.

  return { days: results, summary };
}
|