root 9b8befaa94 demo: profiler — scrolling ticker basket with live prices + click-to-filter
J asked: "kind of like a scrolling ticker that has all of the companies
and their stock prices and where they fit in the map." Implemented as
a horizontal-scroll strip at the top of /profiler:

  9 public issuers in this view · quotes via Stooq · 669ms
  ┌────┬────┬────┬────┬────┬────┬────┐
  │TGT │JPM │BALY│ACRE│FCBC│NREF│LSBK│ ← live price + day-change per
  │129 │311 │... │... │... │... │... │   ticker, color-banded by
  │+.17│+1.5│... │... │... │... │... │   attribution kind
  └────┴────┴────┴────┴────┴────┴────┘

Each card carries:
  - ticker + live price + day-change % (red/green)
  - attribution count + kind (exact / direct / parent / associated)
  - left bar color = strongest attribution kind (green for direct
    issuer, amber for parent, blue for co-permit associated, gradient
    when both direct and associated apply)
  - tooltip on hover lists the contractors attributed to this ticker
  - click toggles a filter on the table below — clicking TGT cuts the
    200-row list down to just TARGET CORPORATION + TORNOW, KYLE F
    (Target's primary co-permit contractor)

Server-side:
- entity.ts exports fetchStooqQuote (was internal)
- new POST /intelligence/ticker_quotes — accepts {tickers: [...]},
  fans out to Stooq.us in parallel, returns
  {ticker, price, price_date, open, high, low, day_change_pct,
   stooq_url} per symbol or null for non-US listings (HOC.DE, SKA-B.ST,
   LLC.AX). Capped at 50 symbols per call.

Front-end:
- mcp-server/profiler.html — new .basket-wrap section above the
  controls. buildBasket() runs after profiler_index loads:
    1. Aggregates unique tickers from .tickers.direct + .associated
       across all surfaced contractors
    2. Renders shells immediately (ticker symbol + "—" placeholder)
    3. Batch-fetches quotes via /intelligence/ticker_quotes
    4. Updates each card with price + day-change in place
  Click on a card sets a tickerFilter; render() skips rows whose
  attributions don't include that ticker. "clear filter" button on
  the basket strip resets it.

Verified end-to-end on devop.live/lakehouse/profiler:
  Default load → 9 issuers, live prices populated in 669ms
  TGT click   → table filters to TARGET CORPORATION + TORNOW, KYLE F
                (the contractor who runs 3 of Target's recent permits
                gets the TGT correlation indicator)
  JPM card    → $311.63, +1.55% — JPMorgan-adjacent contractors
  Tooltip     → list of contractors attributed to the ticker
2026-04-27 22:19:26 -05:00

3113 lines
161 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* Lakehouse MCP Server — bridges local LLMs to the data substrate.
*
* Tools:
* - search_workers: hybrid SQL+vector (the core fix)
* - query_sql: analytical SQL on any dataset
* - match_contract: find workers for a job order
* - get_worker: single worker by ID
* - rag_question: full RAG pipeline
* - log_success: record what worked → playbook DB
* - get_playbooks: retrieve past successes
* - swap_profile: hot-swap model + data context
* - vram_status: GPU introspection
*/
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import { z } from "zod";
import { startTrace, logSpan, logGeneration, scoreTrace, flush as flushTraces } from "./tracing.js";
import { buildPermitBrief } from "./entity.js";
const BASE = process.env.LAKEHOUSE_URL || "http://localhost:3100";
const PORT = parseInt(process.env.MCP_PORT || "3700");
// ─── Staffer roster — used by the per-staffer hot-swap index (G). ────
//
// J's vision: each staffer has their own molded view of the corpus.
// When Maria searches, the system surfaces *Maria's* prior fills and
// her territory's playbooks first. When Aisha searches, the same
// corpus gets re-shaped to her geo and recent activity. This is what
// generic CRM fast-search can't do: a relevance gradient that
// compounds with each staffer's own signal.
//
// First implementation is geography-based — each staffer has a primary
// state and a list of cities they recruit for. Playbook queries get
// scoped to that territory when staffer_id is provided. As the system
// accumulates per-staffer signal (call_log assignments, email threads,
// SMS history), the scope expands beyond geography.
//
// Adding a staffer: append to this list. The /api/staffers endpoint
// exposes the public-safe fields to the UI dropdown.
const STAFFERS: Array<{
id: string;
name: string;
display: string;
territory: { state: string; cities: string[] };
greeting: string;
}> = [
{
id: "maria",
name: "Maria",
display: "Maria · Chicago coordinator",
territory: { state: "IL", cities: ["Chicago", "Joliet", "Rockford", "Peoria", "Springfield", "Decatur"] },
greeting: "Maria's territory: Illinois warehouse + manufacturing fills",
},
{
id: "devon",
name: "Devon",
display: "Devon · Indiana coordinator",
territory: { state: "IN", cities: ["Indianapolis", "Fort Wayne", "South Bend", "Evansville", "Bloomington", "Terre Haute"] },
greeting: "Devon's territory: Indiana production + assembly fills",
},
{
id: "aisha",
name: "Aisha",
display: "Aisha · Wisconsin/Michigan coordinator",
territory: { state: "WI", cities: ["Milwaukee", "Madison", "Green Bay", "Detroit", "Grand Rapids", "Lansing"] },
greeting: "Aisha's territory: Wisconsin + Michigan logistics",
},
];
function lookupStaffer(id: string | undefined): typeof STAFFERS[number] | null {
if (!id) return null;
return STAFFERS.find((s) => s.id === id) || null;
}
const MODE = process.env.MCP_TRANSPORT || "http"; // "stdio" or "http"
// Active trace for the current request — set per-request in the HTTP handler
let activeTrace: ReturnType<typeof startTrace> | null = null;
async function api(method: string, path: string, body?: any, retries = 2) {
for (let attempt = 0; attempt <= retries; attempt++) {
try {
const t0 = Date.now();
const resp = await fetch(`${BASE}${path}`, {
method,
headers: body ? { "Content-Type": "application/json" } : {},
body: body ? JSON.stringify(body) : undefined,
});
const text = await resp.text();
const ms = Date.now() - t0;
let parsed: any;
try { parsed = JSON.parse(text); } catch { parsed = { raw: text, status: resp.status }; }
// Trace the call if we have an active trace. Pre-existing edit had
// this block at module scope, dangling after the closing brace of
// api() — parsed broken until fixed 2026-04-24.
if (activeTrace) {
const isGen = path.includes("/generate");
if (isGen) {
logGeneration(activeTrace, `lakehouse${path}`, {
model: body?.model || "unknown",
prompt: typeof body?.prompt === "string" ? body.prompt.slice(0, 500) : JSON.stringify(body).slice(0, 300),
completion: typeof parsed?.text === "string" ? parsed.text.slice(0, 500) : JSON.stringify(parsed).slice(0, 300),
duration_ms: ms,
tokens_in: parsed?.prompt_eval_count,
tokens_out: parsed?.eval_count,
});
} else {
logSpan(activeTrace, `lakehouse${path}`, body, {
rows: parsed?.row_count, sources: parsed?.sources?.length,
sql_matches: parsed?.sql_matches, method: parsed?.method,
}, ms);
}
}
return parsed;
} catch (e: any) {
if (attempt === retries) throw e;
if (e.message?.includes("socket connection was closed") || e.message?.includes("ECONNREFUSED")) {
await Bun.sleep(500 * (attempt + 1));
continue;
}
throw e;
}
}
throw new Error("unreachable");
}
const server = new McpServer({ name: "lakehouse", version: "1.0.0" });
server.tool(
"search_workers",
"Hybrid SQL+vector search. SQL ensures structural accuracy (role, state, reliability), vector ranks by semantic relevance. Every result is verified against the golden dataset.",
{
question: z.string().describe("Natural language question about workers"),
sql_filter: z.string().optional().describe("SQL WHERE clause, e.g. \"role = 'Forklift Operator' AND state = 'IL' AND reliability > 0.8\""),
dataset: z.string().default("ethereal_workers"),
id_column: z.string().default("worker_id"),
top_k: z.number().default(5),
},
async ({ question, sql_filter, dataset, id_column, top_k }) => {
const body: any = {
question, index_name: "workers_500k_v1", filter_dataset: dataset, id_column, top_k, generate: true,
use_playbook_memory: true,
};
if (sql_filter) body.sql_filter = sql_filter;
const r = await api("POST", "/vectors/hybrid", body);
return { content: [{ type: "text" as const, text: JSON.stringify(r, null, 2) }] };
},
);
server.tool(
"query_sql",
"Run SQL against any lakehouse dataset. Tables: ethereal_workers (10K), candidates (1K), workers_500k (500K), timesheets (1M), call_log (800K), email_log (500K), placements (50K), job_orders (15K), clients (2K).",
{ sql: z.string().describe("SQL query") },
async ({ sql }) => {
const r = await api("POST", "/query/sql", { sql });
if (r.error) return { content: [{ type: "text" as const, text: `SQL Error: ${r.error}` }] };
return { content: [{ type: "text" as const, text: `${r.row_count} rows:\n${JSON.stringify(r.rows?.slice(0, 20), null, 2)}` }] };
},
);
server.tool(
"match_contract",
"Find qualified workers for a staffing contract. SQL-verified matches ranked by semantic fit.",
{
role: z.string(), state: z.string(), city: z.string().optional(),
min_reliability: z.number().default(0.7),
required_certs: z.array(z.string()).default([]),
headcount: z.number().default(5),
},
async ({ role, state, city, min_reliability, required_certs, headcount }) => {
let filter = `role = '${role}' AND state = '${state}' AND reliability >= ${min_reliability}`;
if (city) filter += ` AND city = '${city}'`;
const r = await api("POST", "/vectors/hybrid", {
question: `Find the best ${role} workers with relevant skills and certifications`,
index_name: "workers_500k_v1", sql_filter: filter,
filter_dataset: "ethereal_workers", id_column: "worker_id",
top_k: headcount * 2, generate: false,
use_playbook_memory: true,
});
let matches = r.sources || [];
if (required_certs.length > 0) {
const req = new Set(required_certs.map((c: string) => c.toLowerCase()));
matches = matches.filter((m: any) => {
const certs = (m.chunk_text || "").toLowerCase();
return [...req].every(c => certs.includes(c));
});
}
return { content: [{ type: "text" as const, text: JSON.stringify({
contract: { role, state, city, min_reliability, required_certs },
matches: matches.slice(0, headcount), total_sql: r.sql_matches, method: r.method,
}, null, 2) }] };
},
);
server.tool(
"get_worker",
"Fetch one worker profile by ID — all fields including scores and comms.",
{ worker_id: z.number() },
async ({ worker_id }) => {
const r = await api("POST", "/query/sql", { sql: `SELECT * FROM ethereal_workers WHERE worker_id = ${worker_id}` });
if (!r.rows?.length) return { content: [{ type: "text" as const, text: `Worker ${worker_id} not found` }] };
return { content: [{ type: "text" as const, text: JSON.stringify(r.rows[0], null, 2) }] };
},
);
server.tool(
"rag_question",
"Natural language question answered via RAG (embed → search → retrieve → generate). For open-ended questions where SQL alone isn't enough.",
{ question: z.string(), index: z.string().default("workers_500k_v1"), top_k: z.number().default(5) },
async ({ question, index, top_k }) => {
const r = await api("POST", "/vectors/rag", { index_name: index, question, top_k });
return { content: [{ type: "text" as const, text: r.error ? `RAG Error: ${r.error}` : `Answer: ${r.answer}\n\nSources: ${r.sources?.length || 0}` }] };
},
);
server.tool(
"log_success",
"Record a successful operation to the playbook database. Small models query this later to learn what worked.",
{
operation: z.string().describe("What was done"),
approach: z.string().describe("How it was done"),
result: z.string().describe("Outcome"),
context: z.string().optional(),
},
async ({ operation, approach, result, context }) => {
const csv = `timestamp,operation,approach,result,context\n"${new Date().toISOString()}","${operation.replace(/"/g, '""')}","${approach.replace(/"/g, '""')}","${result.replace(/"/g, '""')}","${(context||"").replace(/"/g, '""')}"`;
const form = new FormData();
form.append("file", new Blob([csv], { type: "text/csv" }), "playbook.csv");
const resp = await fetch(`${BASE}/ingest/file?name=successful_playbooks`, { method: "POST", body: form });
return { content: [{ type: "text" as const, text: `Logged: ${await resp.text()}` }] };
},
);
server.tool(
"get_playbooks",
"Retrieve past successful operations. Small models use this to learn what approaches worked.",
{ keyword: z.string().optional(), limit: z.number().default(10) },
async ({ keyword, limit }) => {
let sql = `SELECT * FROM successful_playbooks ORDER BY timestamp DESC LIMIT ${limit}`;
if (keyword) sql = `SELECT * FROM successful_playbooks WHERE operation LIKE '%${keyword}%' OR approach LIKE '%${keyword}%' ORDER BY timestamp DESC LIMIT ${limit}`;
const r = await api("POST", "/query/sql", { sql });
if (r.error) return { content: [{ type: "text" as const, text: "No playbooks yet — log some successful operations first!" }] };
return { content: [{ type: "text" as const, text: JSON.stringify(r.rows, null, 2) }] };
},
);
server.tool(
"swap_profile",
"Hot-swap model profile. Changes Ollama model in VRAM + bound datasets. 'agent-parquet' = HNSW (fast), 'agent-lance' = IVF_PQ (scalable).",
{ profile_id: z.string() },
async ({ profile_id }) => {
const r = await api("POST", `/vectors/profile/${profile_id}/activate`);
return { content: [{ type: "text" as const, text: JSON.stringify({
profile: r.profile_id, model: r.ollama_name,
indexes: r.indexes_warmed?.length, vectors: r.total_vectors,
previous: r.previous_profile, duration: r.duration_secs,
}, null, 2) }] };
},
);
server.tool(
"vram_status",
"GPU VRAM usage + loaded Ollama models. Check before swapping profiles.",
{},
async () => {
const r = await api("GET", "/ai/vram");
return { content: [{ type: "text" as const, text: JSON.stringify(r, null, 2) }] };
},
);
// Resources — these give any MCP client full context about the system
server.resource("lakehouse://system", "lakehouse://system", async (uri) => {
const health = await api("GET", "/health");
const datasets = await api("GET", "/catalog/datasets") as any[];
const indexes = await api("GET", "/vectors/indexes") as any[];
const vram = await api("GET", "/ai/vram");
const agent = await api("GET", "/vectors/agent/status");
const buckets = await api("GET", "/storage/buckets");
const text = `# Lakehouse System Status
## Health: ${health === "lakehouse ok" ? "OK" : JSON.stringify(health)}
## Datasets (${datasets.length})
${datasets.map((d: any) => `- ${d.name}: ${d.row_count || "?"} rows`).join("\n")}
## Vector Indexes (${indexes.length})
${(indexes as any[]).map((i: any) => `- ${i.index_name}: ${i.chunk_count} chunks (${i.vector_backend || "parquet"})`).join("\n")}
## GPU
- Used: ${vram?.gpu?.used_mib || "?"}/${vram?.gpu?.total_mib || "?"} MiB
- Models loaded: ${(vram?.ollama_loaded || []).map((m: any) => m.name).join(", ") || "none"}
## Autotune Agent
- Running: ${agent?.running}, Trials: ${agent?.trials_run}, Promotions: ${agent?.promotions}
## Buckets (${(buckets as any[])?.length || 0})
${(buckets as any[] || []).map((b: any) => `- ${b.name}: ${b.backend} (${b.reachable ? "reachable" : "DOWN"})`).join("\n")}
## Services
- Lakehouse Gateway: :3100
- AI Sidecar: :3200
- Agent Gateway: :3700
- Langfuse: :3001
- MinIO S3: :9000
- Ollama: :11434
## Available Models
- qwen3: 8.2B, 40K context, thinking+tools (best for reasoning)
- qwen2.5: 7B, 8K context (best for fast SQL generation)
- mistral: 7B, 8K context (general generation)
- nomic-embed-text: 137M (embedding, automatic)
`;
return { contents: [{ uri: uri.href, mimeType: "text/plain", text }] };
});
server.resource("lakehouse://architecture", "lakehouse://architecture", async (uri) => {
// Read the PRD directly
const prd = await Bun.file("/home/profit/lakehouse/docs/PRD.md").text().catch(() => "PRD not found");
return { contents: [{ uri: uri.href, mimeType: "text/markdown", text: prd }] };
});
server.resource("lakehouse://instructions", "lakehouse://instructions", async (uri) => {
const instructions = await Bun.file("/home/profit/lakehouse/mcp-server/AGENT_INSTRUCTIONS.md").text().catch(() => "Instructions not found");
return { contents: [{ uri: uri.href, mimeType: "text/markdown", text: instructions }] };
});
server.resource("lakehouse://playbooks", "lakehouse://playbooks", async (uri) => {
const r = await api("POST", "/query/sql", {
sql: "SELECT * FROM successful_playbooks ORDER BY timestamp DESC LIMIT 20"
});
const rows = r?.rows || [];
const text = rows.length === 0
? "No playbooks yet. Log successful operations with the log_success tool."
: rows.map((p: any) => `## ${p.operation}\n- Approach: ${p.approach}\n- Result: ${p.result}\n- Context: ${p.context || "—"}\n`).join("\n");
return { contents: [{ uri: uri.href, mimeType: "text/markdown", text: `# Successful Playbooks\n\n${text}` }] };
});
server.resource("lakehouse://datasets", "lakehouse://datasets", async (uri) => {
const r = await api("GET", "/catalog/datasets") as any[];
const text = r.map(d => `${d.name}: ${d.row_count || "?"} rows`).join("\n");
return { contents: [{ uri: uri.href, mimeType: "text/plain", text }] };
});
// ─── Dual mode: stdio (Claude Code) or HTTP (internal agents) ───
async function main() {
if (MODE === "stdio") {
const transport = new StdioServerTransport();
await server.connect(transport);
console.error(`Lakehouse MCP (stdio) → ${BASE}`);
return;
}
// HTTP mode — a REST gateway that internal agents call directly.
// No MCP protocol complexity for consumers — just POST JSON, get JSON.
// The MCP tool definitions above are reused for the stdio path; this
// HTTP path wraps the same lakehouse API with agent-friendly routing.
Bun.serve({
port: PORT,
async fetch(req) {
const url = new URL(req.url);
const json = async () => req.method === "POST" ? await req.json() : {};
// CORS — dashboard runs in the browser, gateway is a different origin
const cors = {
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, POST, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type",
};
if (req.method === "OPTIONS") return new Response(null, { status: 204, headers: cors });
const ok = (data: any) => Response.json(data, { headers: cors });
const err = (msg: string, status = 400) => Response.json({ error: msg }, { status, headers: cors });
try {
// Health — no trace needed
if (url.pathname === "/health") return ok({ status: "ok", lakehouse: BASE, tools: 11 });
// Start a Langfuse trace for every non-static request
if (req.method === "POST" || !["/","/dashboard","/dashboard.css","/dashboard.ts","/dashboard.js"].includes(url.pathname)) {
activeTrace = startTrace(`gw:${url.pathname}`, { method: req.method, path: url.pathname });
}
// Self-orientation: any agent calls this first to understand the system
if (url.pathname === "/context") {
const instructions = await Bun.file("/home/profit/lakehouse/mcp-server/AGENT_INSTRUCTIONS.md").text().catch(() => "");
const datasets = await api("GET", "/catalog/datasets") as any[];
const indexes = await api("GET", "/vectors/indexes") as any[];
const vram = await api("GET", "/ai/vram");
return ok({
system: "Lakehouse Staffing Co-Pilot",
purpose: "AI anticipates staffing coordinator needs — pre-matches workers to contracts, surfaces alerts, builds playbooks from successful operations",
instructions: instructions.slice(0, 3000),
datasets: (datasets || []).map((d: any) => ({ name: d.name, rows: d.row_count })),
indexes: (indexes || []).map((i: any) => ({ name: i.index_name, chunks: i.chunk_count, backend: i.vector_backend })),
models: { qwen3: "8.2B reasoning+tools", qwen2_5: "7B fast SQL", mistral: "7B generation", nomic: "137M embedding" },
vram: vram?.gpu,
tools: ["/search","/sql","/match","/worker/:id","/ask","/log","/playbooks","/profile/:id","/vram","/context","/verify"],
rules: [
"Never hallucinate — only state facts from tool responses",
"SQL for counts/aggregations, hybrid /search for matching",
"Log every successful operation to /log",
"Check /playbooks before complex tasks",
"Verify worker details via /worker/:id before communicating",
],
});
}
// Verification endpoint — agent can check any claim against SQL
if (url.pathname === "/verify") {
const b = await json();
// b.claim: "worker 4925 is a Forklift Operator in IL with reliability 0.82"
// b.worker_id: 4925
// b.checks: { role: "Forklift Operator", state: "IL", reliability: 0.82 }
if (!b.worker_id) return err("worker_id required");
const r = await api("POST", "/query/sql", {
sql: `SELECT * FROM ethereal_workers WHERE worker_id = ${b.worker_id}`
});
const worker = r?.rows?.[0];
if (!worker) return ok({ verified: false, reason: `worker ${b.worker_id} not found` });
const checks = b.checks || {};
const failures: string[] = [];
for (const [field, expected] of Object.entries(checks)) {
const actual = worker[field];
if (actual === undefined) continue;
if (typeof expected === "number") {
if (Math.abs(Number(actual) - expected) > 0.05) {
failures.push(`${field}: claimed=${expected} actual=${actual}`);
}
} else if (String(actual).toLowerCase() !== String(expected).toLowerCase()) {
failures.push(`${field}: claimed=${expected} actual=${actual}`);
}
}
return ok({
verified: failures.length === 0,
worker_id: b.worker_id,
worker_name: worker.name,
failures,
actual: worker,
});
}
// Tool: hybrid search
// ─── Client blacklists (feature #2) ───────────────────────────
// Per-client worker exclusion list. A worker blacklisted for
// client X is hidden from /search and /match when the caller
// passes `client: "X"`. Persisted to local JSON so it survives
// Bun restarts. This is a trust-critical feature — if the
// system recommends a worker the client already flagged, the
// system's credibility is gone.
if (url.pathname.startsWith("/clients/") && url.pathname.includes("/blacklist")) {
const m = url.pathname.match(/^\/clients\/([^\/]+)\/blacklist\/?(.*)$/);
if (m) {
const client = decodeURIComponent(m[1]);
const suffix = m[2]; // empty, or a worker_id to delete
if (req.method === "GET") {
const list = await loadClientBlacklist(client);
return ok({ client, entries: list });
}
if (req.method === "POST" && !suffix) {
const b = await json();
if (!b.worker_id) return err("worker_id required", 400);
const entry = {
worker_id: String(b.worker_id),
name: b.name || "",
reason: b.reason || "",
added_at: new Date().toISOString(),
};
const list = await addToClientBlacklist(client, entry);
return ok({ client, added: entry, total: list.length });
}
if (req.method === "DELETE" && suffix) {
const worker_id = decodeURIComponent(suffix);
const { removed, total } = await removeFromClientBlacklist(client, worker_id);
return ok({ client, removed, total });
}
return err(`unsupported method ${req.method} for blacklist`, 405);
}
}
if (url.pathname === "/search") {
const b = await json();
// Availability soft-filter: if the caller didn't constrain
// availability and isn't explicitly opting out, auto-append
// `availability > 0.5`. Recruiters calling this route expect
// "available workers" by default; surfacing someone who's on
// an active placement breaks trust on the first call.
let filter = b.sql_filter as (string | undefined);
const optOut = b.include_unavailable === true;
if (!optOut && filter && !/availability/i.test(filter)) {
filter = `(${filter}) AND CAST(availability AS DOUBLE) > 0.5`;
}
// Client blacklist filter: if caller passes `client`, exclude
// worker_ids that client has flagged. One SQL expression
// added, no extra round-trip needed by the caller.
if (b.client && filter) {
const bl = await loadClientBlacklist(String(b.client));
const ids = bl.map(e => e.worker_id).filter(x => /^\d+$/.test(x));
if (ids.length > 0) {
filter = `(${filter}) AND worker_id NOT IN (${ids.join(",")})`;
}
}
const hybridRes = await api("POST", "/vectors/hybrid", {
question: b.question, index_name: b.index || "workers_500k_v1",
sql_filter: filter, filter_dataset: b.dataset || "ethereal_workers",
id_column: b.id_column || "worker_id", top_k: b.top_k || 5, generate: b.generate !== false,
use_playbook_memory: b.use_playbook_memory !== false,
playbook_memory_k: b.playbook_memory_k ?? 200,
});
// Rate enrichment + optional max_pay_rate filter (soft filter,
// preserves result shape). Operator can opt out by omitting.
if (hybridRes && Array.isArray(hybridRes.sources)) {
enrichWithRates(hybridRes.sources);
if (typeof b.max_pay_rate === "number" && b.max_pay_rate > 0) {
const before = hybridRes.sources.length;
hybridRes.sources = hybridRes.sources.filter((s: any) => s.implied_pay_rate <= b.max_pay_rate);
(hybridRes as any).pay_rate_filtered_out = before - hybridRes.sources.length;
}
}
return ok(hybridRes);
}
// Tool: SQL
if (url.pathname === "/sql") {
const b = await json();
return ok(await api("POST", "/query/sql", { sql: b.sql }));
}
// Tool: match contract
if (url.pathname === "/match") {
const b = await json();
let filter = `role = '${b.role}' AND state = '${b.state}' AND reliability >= ${b.min_reliability || 0.7}`;
if (b.city) filter += ` AND city = '${b.city}'`;
return ok(await api("POST", "/vectors/hybrid", {
question: `Best ${b.role} workers with relevant skills`,
index_name: b.index || "workers_500k_v1", sql_filter: filter,
filter_dataset: b.dataset || "ethereal_workers",
id_column: "worker_id", top_k: (b.headcount || 5) * 2, generate: false,
use_playbook_memory: true,
playbook_memory_k: 200,
}));
}
// Tool: get worker
if (url.pathname.startsWith("/worker/")) {
const id = url.pathname.split("/")[2];
return ok(await api("POST", "/query/sql", { sql: `SELECT * FROM ethereal_workers WHERE worker_id = ${id}` }));
}
// Tool: RAG
if (url.pathname === "/ask") {
const b = await json();
return ok(await api("POST", "/vectors/rag", { index_name: b.index || "workers_500k_v1", question: b.question, top_k: b.top_k || 5 }));
}
// Tool: log success.
//
// BUG FIX 2026-04-20: previously this also POSTed a 1-row CSV to
// /ingest/file?name=successful_playbooks. That endpoint REPLACES
// the dataset's object list rather than appending — so every /log
// call destroyed all prior rows in the SQL-queryable
// successful_playbooks table. Chain-of-custody trace caught it:
// sp_rows went 33 → 1 in a single /log call.
//
// Until a proper append endpoint exists (Phase 8 delta write
// surface for the SQL table), /log writes ONLY to playbook_memory
// (in-memory append-only store, works correctly for boost). The
// SQL successful_playbooks table is now treated as derived state
// that gets rebuilt explicitly via /vectors/playbook_memory/rebuild
// — never written to by the recruiter path.
if (url.pathname === "/log") {
const b = await json();
// Result format expected: "{filled}/{needed} filled → Name1, Name2, Name3"
const result = String(b.result || "");
const arrowIdx = result.indexOf("→");
const namesPart = arrowIdx >= 0 ? result.slice(arrowIdx + 1) : "";
const rawEndorsed = namesPart.split(",").map(s => s.trim()).filter(Boolean);
// Parse the contract's (city, state) from operation. Seed is
// keyed by (city, state, name) so validation must match those
// coordinates, not just the name.
const opMatch = String(b.operation || "").match(/ in ([^,]+),\s*([A-Za-z]+)/);
const city = opMatch ? opMatch[1].trim() : "";
const state = opMatch ? opMatch[2].trim() : "";
// Ghost-name guard — /log previously accepted any endorsed
// names without verification. Those ghosts landed in
// playbook_memory, grew the entry count, but boost silently
// never fired because no real worker chunk ever matched the
// stored (city, state, name) tuple. Real-test on 2026-04-20
// surfaced this. Validate against workers_500k before seeding.
let endorsed: string[] = rawEndorsed;
let rejected: string[] = [];
if (rawEndorsed.length && city && state) {
const quoted = rawEndorsed.map(n => `'${n.replace(/'/g, "''")}'`).join(",");
const sql = `SELECT DISTINCT name FROM workers_500k `
+ `WHERE name IN (${quoted}) AND city = '${city.replace(/'/g,"''")}' `
+ `AND state = '${state.replace(/'/g,"''")}'`;
const vr = await api("POST", "/query/sql", { sql }).catch(() => ({ rows: [] as any[] })) as any;
const found = new Set((vr.rows ?? []).map((r: any) => r.name));
endorsed = rawEndorsed.filter(n => found.has(n));
rejected = rawEndorsed.filter(n => !found.has(n));
}
let seeded = 0;
let persisted_rows = 0;
if (endorsed.length && /fill:.+ in .+,.+/i.test(String(b.operation || ""))) {
const canonicalApproach = `${(b.approach || "manual log").split(/[\.\n]/)[0]}`.slice(0, 80);
const canonicalContext = `${(b.context || "").split(/[\.\n]/)[0]}`.slice(0, 80);
const seedRes = await api("POST", "/vectors/playbook_memory/seed", {
operation: b.operation,
approach: canonicalApproach,
context: canonicalContext,
endorsed_names: endorsed,
append: true,
}).catch(() => null) as any;
if (seedRes && seedRes.playbook_id) {
seeded = endorsed.length;
const pr = await api("POST", "/vectors/playbook_memory/persist_sql", {}).catch(() => null) as any;
if (pr && typeof pr.rows_persisted === "number") persisted_rows = pr.rows_persisted;
}
}
return ok({
logged: true,
seeded,
persisted_to_sql: persisted_rows,
rejected_ghost_names: rejected,
note: rejected.length
? `${rejected.length} endorsed name(s) not found in workers_500k for ${city}, ${state} — skipped seeding to prevent silent boost failure.`
: "successful_playbooks_live is the SQL surface for live operator activity. /log is non-destructive and name-validated.",
});
}
// Tool: log FAILED fill — negative signal for Phase 19 boost.
// Workers named here get a 0.5^n penalty on future positive
// boosts in the same (city, state). Three failures effectively
// zero the boost; five make the worker invisible to the re-rank.
// Names are validated against workers_500k same as /log.
if (url.pathname === "/log_failure") {
const b = await json();
const opMatch = String(b.operation || "").match(/ in ([^,]+),\s*([A-Za-z]+)/);
const city = opMatch ? opMatch[1].trim() : "";
const state = opMatch ? opMatch[2].trim() : "";
const rawNames: string[] = Array.isArray(b.failed_names) ? b.failed_names : [];
if (!city || !state) {
return err("operation must be 'fill: Role xN in City, ST'", 400);
}
if (rawNames.length === 0) return err("failed_names must be a non-empty array", 400);
const quoted = rawNames.map((n: string) => `'${n.replace(/'/g, "''")}'`).join(",");
const sql = `SELECT DISTINCT name FROM workers_500k `
+ `WHERE name IN (${quoted}) AND city = '${city.replace(/'/g,"''")}' `
+ `AND state = '${state.replace(/'/g,"''")}'`;
const vr = await api("POST", "/query/sql", { sql }).catch(() => ({ rows: [] as any[] })) as any;
const found = new Set((vr.rows ?? []).map((r: any) => r.name));
const failed_names = rawNames.filter((n: string) => found.has(n));
const rejected = rawNames.filter((n: string) => !found.has(n));
if (failed_names.length === 0) {
return ok({ marked: 0, rejected_ghost_names: rejected,
note: "no failed_names matched workers_500k for this geo" });
}
const mr = await api("POST", "/vectors/playbook_memory/mark_failed", {
operation: b.operation,
failed_names,
reason: b.reason || "",
});
return ok({
marked: mr?.added ?? 0,
rejected_ghost_names: rejected,
city, state,
note: `Each marked worker's positive boost in ${city}, ${state} is halved per recorded failure.`,
});
}
// Tool: get playbooks
if (url.pathname === "/playbooks") {
const kw = url.searchParams.get("keyword");
const limit = url.searchParams.get("limit") || "10";
let sql = `SELECT * FROM successful_playbooks ORDER BY timestamp DESC LIMIT ${limit}`;
if (kw) sql = `SELECT * FROM successful_playbooks WHERE operation LIKE '%${kw}%' OR approach LIKE '%${kw}%' ORDER BY timestamp DESC LIMIT ${limit}`;
const r = await api("POST", "/query/sql", { sql });
return ok(r.error ? { playbooks: [], note: "No playbooks yet" } : { playbooks: r.rows });
}
// Tool: swap profile
if (url.pathname.startsWith("/profile/")) {
const id = url.pathname.split("/")[2];
return ok(await api("POST", `/vectors/profile/${id}/activate`));
}
// Tool: VRAM
if (url.pathname === "/vram") return ok(await api("GET", "/ai/vram"));
// Pass-through to lakehouse for anything else
if (url.pathname.startsWith("/api/")) {
const path = url.pathname.replace("/api", "");
const body = req.method !== "GET" ? await req.text() : undefined;
const r = await fetch(`${BASE}${path}`, { method: req.method, headers: { "Content-Type": "application/json" }, body });
return new Response(await r.text(), { status: r.status, headers: { "Content-Type": "application/json" } });
}
// Proof — narrative HTML served from mcp-server/proof.html.
// Live tests consumed client-side via /proof.json.
if (url.pathname === "/proof") {
return new Response(Bun.file(import.meta.dir + "/proof.html"), {
headers: { ...cors, "Content-Type": "text/html" },
});
}
// Spec — technical specification / README-equivalent document.
// Long-form architecture doc: folder layout, ingest pipeline,
// scale story, error surfaces, per-staffer context, a day in
// the life. Intended for a skeptical reader who needs to
// dispute or reproduce what the system claims to do.
if (url.pathname === "/spec") {
return new Response(Bun.file(import.meta.dir + "/spec.html"), {
headers: { ...cors, "Content-Type": "text/html" },
});
}
// Onboard — client-facing ingest wizard. Upload any CSV, preview
// columns + PII + sample rows, commit via /ingest/file. Works
// with a shipped sample roster so anyone can trial the flow
// without real client data.
if (url.pathname === "/onboard") {
return new Response(Bun.file(import.meta.dir + "/onboard.html"), {
headers: { ...cors, "Content-Type": "text/html" },
});
}
// Workspaces — per-contract state (Phase 8.5). UI layer over the
// gateway's /workspaces/* routes: list, create, detail, handoff,
// save-search, shortlist, log-activity. All persisted on the
// Rust side; this page is a pure viewer + editor.
if (url.pathname === "/workspaces") {
return new Response(Bun.file(import.meta.dir + "/workspaces.html"), {
headers: { ...cors, "Content-Type": "text/html" },
});
}
// Alerts — push/daemon settings page + config API + test-fire.
if (url.pathname === "/alerts") {
return new Response(Bun.file(import.meta.dir + "/alerts.html"), {
headers: { ...cors, "Content-Type": "text/html" },
});
}
if (url.pathname === "/alerts/config") {
if (req.method === "GET") {
const cfg = await loadAlertsConfig();
const state = await loadAlertsState();
return ok({ config: cfg, state: { last_run_at: state.last_run_at } });
}
if (req.method === "POST") {
const b = await json();
const prev = await loadAlertsConfig();
const next: AlertsConfig = {
enabled: b.enabled ?? prev.enabled,
interval_minutes: Math.max(1, Number(b.interval_minutes ?? prev.interval_minutes)),
webhook_url: typeof b.webhook_url === "string" ? b.webhook_url.trim() || undefined : prev.webhook_url,
webhook_label: typeof b.webhook_label === "string" ? b.webhook_label : prev.webhook_label,
deadline_warn_days: Math.max(1, Number(b.deadline_warn_days ?? prev.deadline_warn_days)),
};
await saveAlertsConfig(next);
return ok({ saved: true, config: next,
note: "Interval change requires server restart to apply. Current running interval unchanged this cycle." });
}
}
if (url.pathname === "/alerts/fire" && req.method === "POST") {
const cfg = await loadAlertsConfig();
const d = await buildDigest();
if (!d) return ok({ fired: false, reason: "no events since last run" });
const res = await dispatchDigest(d, cfg);
return ok({ fired: true, channels: res.channels, errors: res.errors, digest: d });
}
if (url.pathname === "/alerts/recent" && req.method === "GET") {
const f = Bun.file(ALERTS_LOG_PATH);
if (!(await f.exists())) return ok({ entries: [] });
const text = await f.text();
const lines = text.split("\n").filter(l => l.trim());
const last = lines.slice(-10).reverse();
const entries: any[] = [];
for (const l of last) { try { entries.push(JSON.parse(l)); } catch {} }
return ok({ entries });
}
// Onboard ingest — forwards multipart/form-data correctly to
// the Rust gateway /ingest/file. The generic /api/* passthrough
// can't handle multipart because it reads as text and forwards
// as JSON, losing the boundary. This route preserves the body
// and Content-Type.
if (url.pathname === "/onboard/ingest" && req.method === "POST") {
const name = url.searchParams.get("name");
if (!name || !/^[a-z][a-z0-9_]*$/.test(name)) {
return err("dataset name required (lowercase+underscores)", 400);
}
const contentType = req.headers.get("content-type") || "";
const upstream = await fetch(`${BASE}/ingest/file?name=${encodeURIComponent(name)}`, {
method: "POST",
headers: { "Content-Type": contentType },
body: await req.arrayBuffer(),
});
const body = await upstream.text();
return new Response(body, {
status: upstream.status,
headers: { ...cors, "Content-Type": upstream.headers.get("content-type") || "application/json" },
});
}
// Sample CSV — generated fresh on every request so content-hash
// dedup on the ingest side always sees a new payload (two uploads
// in a row would otherwise be a no-op). Each generation has
// unique worker_ids (timestamp-prefixed), randomized names + roles
// + geos from realistic pools, and a random size (~120-180 rows)
// so the demo looks different every time and numbers actually
// update visibly in the dashboard after onboarding.
if (url.pathname.startsWith("/samples/")) {
const name = url.pathname.slice("/samples/".length);
if (!/^[a-zA-Z0-9_\-\.]+\.csv$/.test(name)) {
return err("invalid sample filename", 400);
}
if (name === "staffing_roster_sample.csv") {
const csv = generateSampleRosterCSV();
return new Response(csv, {
headers: {
...cors,
"Content-Type": "text/csv",
"Content-Disposition": `attachment; filename="${name}"`,
"Cache-Control": "no-store",
},
});
}
// Other sample filenames fall through to the static dir
const path = `${import.meta.dir}/samples/${name}`;
const file = Bun.file(path);
if (!(await file.exists())) return err("sample not found", 404);
return new Response(file, {
headers: { ...cors, "Content-Type": "text/csv",
"Content-Disposition": `attachment; filename="${name}"` },
});
}
// System-wide scale summary — truthful numbers for the UI.
// Pulls row counts via SQL (COUNT(*) from parquet footers) for
// the key datasets rather than trusting catalog manifests, which
// can go stale when data changes without re-registering. The
// workers_500k manifest is correct (500K); candidates manifest
// lied (said 100K, actual 1K) — the audit caught it.
// Everything else uses manifest row_count since it's O(1).
// Phase 24 refinement — unified memory query endpoint. Accepts
// any input (natural language, structured JSON, mixed) via
// POST body {input: <anything>}. Normalizer handles the shape.
// Returns a single bundle with every memory surface relevant:
// playbook workers, KB recommendation, neighbor signatures,
// prior lessons, top staffers, discovered patterns.
if (url.pathname === "/memory/query" && req.method === "POST") {
try {
const body: any = await req.json();
const { queryMemory } = await import("../tests/multi-agent/memory_query.ts");
const result = await queryMemory(body.input ?? body);
return ok(result);
} catch (e) {
return new Response(JSON.stringify({ error: (e as Error).message }), {
status: 500,
headers: { "content-type": "application/json" },
});
}
}
// Batch ticker quotes — used by the profiler page's scrolling
// ticker basket. Stooq's HTTP CSV API is single-symbol per call,
// so this fans out N tickers in parallel and returns a flat
// map. Non-US tickers (HOC.DE, SKA-B.ST, LLC.AX) won't have a
// Stooq.us entry; we surface that as null so the UI can render
// them with a "—" placeholder.
if (url.pathname === "/intelligence/ticker_quotes" && req.method === "POST") {
const start = Date.now();
const b = await json();
const tickers: string[] = Array.isArray(b.tickers) ? b.tickers.map((t: any) => String(t).toUpperCase()).filter(Boolean) : [];
if (!tickers.length) return ok({ quotes: {}, duration_ms: 0 });
const { fetchStooqQuote } = await import("./entity.js");
const dedup = Array.from(new Set(tickers)).slice(0, 50);
const results = await Promise.all(dedup.map(async (t) => {
// Skip non-US suffixes — Stooq.us won't have them
if (t.includes(".")) return [t, null] as const;
try {
const q = await fetchStooqQuote(t);
if (!q || !q.price) return [t, null] as const;
const change_pct = q.open && q.open > 0 ? ((q.price - q.open) / q.open) * 100 : null;
return [t, {
ticker: t,
price: q.price,
price_date: q.price_date,
open: q.open,
high: q.high,
low: q.low,
day_change_pct: change_pct,
stooq_url: `https://stooq.com/q/?s=${t.toLowerCase()}.us`,
}] as const;
} catch {
return [t, null] as const;
}
}));
const quotes: Record<string, any> = {};
for (const [t, q] of results) quotes[t] = q;
return ok({ quotes, count: dedup.length, duration_ms: Date.now() - start });
}
// Profiler index — directory of every contractor that has filed
// a Chicago permit recently, ranked by permit count + total
// cost. Each name in the response links to the full /contractor
// profile page. Answers J's ask: "a profiler index that shows
// a history of everyone." Pulled live from Socrata; the
// count/cost aggregations let the staffer see who's actually
// active vs one-off LLCs.
if (url.pathname === "/intelligence/profiler_index" && req.method === "POST") {
const start = Date.now();
const b = await json();
const sinceDate = String(b.since || "2025-06-01");
const minCost = Math.max(0, Number(b.min_cost) || 100000);
const limit = Math.max(1, Math.min(500, Number(b.limit) || 200));
const search = String(b.search || "").trim();
const permitUrl = "https://data.cityofchicago.org/resource/ydr8-5enu.json";
// Group by contact_1_name AND by contact_2_name in two
// queries, then merge — Socrata's GROUP BY only takes one
// expression and we want both contractor slots.
const buildQuery = (col: string) => {
const where = [
`reported_cost>${minCost}`,
`issue_date>'${sinceDate.replace(/'/g, "")}'`,
`${col} IS NOT NULL`,
];
if (search) {
const s = search.replace(/'/g, "''").toUpperCase();
where.push(`upper(${col}) like '%${s}%'`);
}
return `${permitUrl}?$select=${col} AS name,count(*) as cnt,sum(reported_cost) as total_cost,max(issue_date) as last_filed&`
+ `$where=${encodeURIComponent(where.join(" AND "))}`
+ `&$group=${col}&$order=total_cost DESC&$limit=${limit}`;
};
// Co-occurrence query — pulls the contact_1+contact_2 pairs in
// the window so we can attribute tickers across associations
// ("Bob's Electric works for Target → show TGT"). Capped 5K
// permits, ~200ms cold; resolved tickers are in-memory after.
const buildCoQuery = () => {
const where = [
`reported_cost>${minCost}`,
`issue_date>'${sinceDate.replace(/'/g, "")}'`,
"contact_1_name IS NOT NULL",
"contact_2_name IS NOT NULL",
];
if (search) {
const s = search.replace(/'/g, "''").toUpperCase();
where.push(`(upper(contact_1_name) like '%${s}%' OR upper(contact_2_name) like '%${s}%')`);
}
return `${permitUrl}?$select=contact_1_name,contact_2_name`
+ `&$where=${encodeURIComponent(where.join(" AND "))}`
+ `&$limit=5000`;
};
try {
const [byC1, byC2, coRows] = await Promise.all([
fetch(buildQuery("contact_1_name")).then((r) => r.json()).catch(() => []),
fetch(buildQuery("contact_2_name")).then((r) => r.json()).catch(() => []),
fetch(buildCoQuery()).then((r) => r.json()).catch(() => []),
]);
const merged: Record<string, { name: string; permits: number; total_cost: number; last_filed: string; roles: Set<string> }> = {};
const consume = (rows: any[], role: string) => {
for (const r of rows || []) {
const n = (r.name || "").trim();
if (!n) continue;
const cnt = parseInt(r.cnt, 10) || 0;
const cost = parseFloat(r.total_cost || "0") || 0;
const last = r.last_filed || "";
const key = n.toUpperCase();
if (!merged[key]) merged[key] = { name: n, permits: 0, total_cost: 0, last_filed: "", roles: new Set() };
merged[key].permits += cnt;
merged[key].total_cost += cost;
if (last > merged[key].last_filed) merged[key].last_filed = last;
merged[key].roles.add(role);
}
};
consume(byC1, "applicant");
consume(byC2, "contractor");
// Build co-occurrence map from the permit pairs. For each
// contractor key, count how many permits they co-appeared
// on with each other party.
const coMap: Record<string, Record<string, number>> = {};
for (const r of (Array.isArray(coRows) ? coRows : []) as any[]) {
const a = String(r.contact_1_name || "").trim();
const b = String(r.contact_2_name || "").trim();
if (!a || !b) continue;
const ka = a.toUpperCase();
const kb = b.toUpperCase();
if (ka === kb) continue;
(coMap[ka] = coMap[ka] || {})[kb] = (coMap[ka][kb] || 0) + 1;
(coMap[kb] = coMap[kb] || {})[ka] = (coMap[kb][ka] || 0) + 1;
}
// Attach tickers per contractor — direct, parent, and any
// tickers attributable to top co-occurring partners ("works
// with TARGET CORPORATION → TGT shown as associated"). Resolves
// via the in-memory SEC tickers index + curated parent map,
// so the cost is per-name index-lookup, not a network call.
const { lookupTickerLite } = await import("./entity.js");
// Memoize per-name to skip duplicate lookups across the
// associated step.
const tickerCache: Record<string, any[]> = {};
const lookupCached = async (n: string) => {
const k = n.toUpperCase();
if (tickerCache[k]) return tickerCache[k];
tickerCache[k] = await lookupTickerLite(n);
return tickerCache[k];
};
const rowsBase = Object.values(merged)
.map((r) => ({ ...r, roles: Array.from(r.roles) }))
.sort((a, b) => b.total_cost - a.total_cost)
.slice(0, limit);
// Resolve tickers concurrently (in-memory ops, but Promise.all
// keeps the code uniform with future remote ticker sources).
const enriched = await Promise.all(rowsBase.map(async (r) => {
const direct = await lookupCached(r.name);
const partners = Object.entries(coMap[r.name.toUpperCase()] || {})
.sort((a, b) => b[1] - a[1])
.slice(0, 6);
const associated: any[] = [];
const seen = new Set(direct.map((t: any) => t.ticker));
for (const [partnerName, occurrences] of partners) {
const ts = await lookupCached(partnerName);
for (const t of ts) {
if (seen.has(t.ticker)) continue;
seen.add(t.ticker);
associated.push({
ticker: t.ticker,
via: "associated",
partner_name: partnerName,
co_permits: occurrences,
partner_via: t.via,
matched_name: t.matched_name,
});
if (associated.length >= 5) break;
}
if (associated.length >= 5) break;
}
return { ...r, tickers: { direct, associated } };
}));
return ok({
count: enriched.length,
since: sinceDate,
min_cost: minCost,
search,
contractors: enriched,
duration_ms: Date.now() - start,
});
} catch (e: any) {
return err(`profiler_index: ${e.message}`, 500);
}
}
// Staffer roster — read by the UI dropdown so each coordinator
// can act under their own identity (per-staffer hot-swap index).
if (url.pathname === "/api/staffers" || url.pathname === "/staffers") {
return ok({
staffers: STAFFERS.map((s) => ({
id: s.id,
name: s.name,
display: s.display,
territory: s.territory,
greeting: s.greeting,
})),
});
}
if (url.pathname === "/system/summary") {
const [ds, indexes, workersCount, candsCount] = await Promise.all([
api("GET", "/catalog/datasets").catch(() => [] as any),
api("GET", "/vectors/indexes").catch(() => [] as any),
api("POST", "/query/sql", { sql: "SELECT COUNT(*) AS c FROM workers_500k" })
.catch(() => null as any),
api("POST", "/query/sql", { sql: "SELECT COUNT(*) AS c FROM candidates" })
.catch(() => null as any),
]);
const datasets = Array.isArray(ds) ? ds : [];
const idxs = Array.isArray(indexes) ? indexes : [];
const workers = Number(workersCount?.rows?.[0]?.c ?? 0);
const candidates = Number(candsCount?.rows?.[0]?.c ?? 0);
// Sum manifest row_counts EXCLUDING workers_500k + candidates,
// then add the truthful SQL counts. This gives a total that
// reflects live state for the two most-quoted tables.
const otherManifest = datasets
.filter((d: any) => d?.name !== "workers_500k" && d?.name !== "candidates")
.reduce((s: number, d: any) => s + (d?.row_count || 0), 0);
const totalRows = otherManifest + workers + candidates;
const totalChunks = idxs.reduce((s: number, i: any) => s + (i?.chunk_count || 0), 0);
// Manifest drift audit — surface any cases where manifest
// disagrees with SQL for the two spot-checked tables so the UI
// can note it if ever meaningful.
const drift: any[] = [];
const workersManifest = datasets.find((d: any) => d?.name === "workers_500k")?.row_count;
const candidatesManifest = datasets.find((d: any) => d?.name === "candidates")?.row_count;
if (workersManifest !== undefined && workersManifest !== workers) {
drift.push({ dataset: "workers_500k", manifest: workersManifest, actual: workers });
}
if (candidatesManifest !== undefined && candidatesManifest !== candidates) {
drift.push({ dataset: "candidates", manifest: candidatesManifest, actual: candidates });
}
return ok({
datasets: datasets.length,
total_rows: totalRows,
total_chunks: totalChunks,
workers_500k_rows: workers,
candidates_rows: candidates,
indexes: idxs.length,
manifest_drift: drift,
});
}
// Model matrix — read config/models.json and expose read-only.
// Strips internal notes that could drift; the source of truth is
// the file itself. UI can render tiers, rate budgets, and the
// experimental rotation list from this endpoint.
if (url.pathname === "/models/matrix") {
try {
const raw = await Bun.file("../config/models.json").text();
return ok(JSON.parse(raw));
} catch (e) {
return new Response(JSON.stringify({ error: `models.json not found: ${(e as Error).message}` }), {
status: 404,
headers: { "content-type": "application/json" },
});
}
}
// Proof JSON API (same data, no HTML)
if (url.pathname === "/proof.json") {
const ds = await api("GET", "/catalog/datasets") as any[];
const indexes = await api("GET", "/vectors/indexes") as any[];
const vram = await api("GET", "/ai/vram");
const totalRows = (ds || []).reduce((s: number, d: any) => s + (d.row_count || 0), 0);
const totalChunks = (indexes || []).reduce((s: number, i: any) => s + i.chunk_count, 0);
// Run live SQL tests
const tests: any[] = [];
const sqls = [
["COUNT 500K workers", "SELECT COUNT(*) FROM workers_500k"],
["COUNT 1M timesheets", "SELECT COUNT(*) FROM timesheets"],
["Filter+aggregate 500K", "SELECT role, COUNT(*) cnt FROM workers_500k WHERE state='IL' AND CAST(reliability AS DOUBLE)>0.8 GROUP BY role ORDER BY cnt DESC LIMIT 3"],
["Cross-table JOIN", "SELECT COUNT(*) FROM candidates c JOIN (SELECT candidate_id, COUNT(*) calls FROM call_log GROUP BY candidate_id HAVING COUNT(*)>=5) cl ON c.candidate_id=cl.candidate_id WHERE c.city='Chicago'"],
];
for (const [name, sql] of sqls) {
const t0 = Date.now();
const r = await api("POST", "/query/sql", { sql });
const ms = Date.now() - t0;
tests.push({ name, ms, result: r.rows?.[0] || r.error, pass: !r.error });
}
// Hybrid test
const ht0 = Date.now();
const hybrid = await api("POST", "/vectors/hybrid", {
question: "reliable forklift operator", index_name: "workers_500k_v1",
sql_filter: "role = 'Forklift Operator' AND state = 'IL' AND CAST(reliability AS DOUBLE) > 0.8",
filter_dataset: "workers_500k", id_column: "worker_id", top_k: 5, generate: false,
use_playbook_memory: true,
});
tests.push({
name: "Hybrid SQL+Vector", ms: Date.now() - ht0,
result: `sql=${hybrid.sql_matches}${hybrid.vector_reranked} verified results`,
pass: (hybrid.vector_reranked || 0) > 0,
sources: hybrid.sources?.slice(0, 3),
});
return ok({
title: "Lakehouse Proof of Work",
generated: new Date().toISOString(),
server: "192.168.1.177 (i9 + 128GB RAM + A4000 16GB)",
scale: { datasets: ds?.length, total_rows: totalRows, indexes: indexes?.length, total_chunks: totalChunks },
gpu: vram?.gpu,
tests,
recall: { hnsw: 0.98, lance: 0.94, note: "Measured on 50K real nomic-embed-text embeddings, 30 queries" },
lance_10m: { vectors: 10_000_000, disk_gb: 32.9, search_p50_ms: 5, note: "Past HNSW RAM ceiling" },
verify: "SSH into server, run: curl http://localhost:3100/health — or open http://192.168.1.177:3700/proof",
});
}
// Dashboard — calls lakehouse /vectors/hybrid directly (no gateway hop)
if (url.pathname === "/" || url.pathname === "/dashboard") {
return new Response(Bun.file(import.meta.dir + "/search.html"), {
headers: { ...cors, "Content-Type": "text/html" },
});
}
if (url.pathname === "/dashboard.css") {
return new Response(Bun.file(import.meta.dir + "/dashboard.css"), { headers: { "Content-Type": "text/css" } });
}
if (url.pathname === "/dashboard.ts" || url.pathname === "/dashboard.js") {
// Bun transpiles TS on the fly
const built = await Bun.build({ entrypoints: [import.meta.dir + "/dashboard.ts"], target: "browser" });
const js = await built.outputs[0].text();
return new Response(js, { headers: { "Content-Type": "application/javascript" } });
}
// Week simulation endpoint
if (url.pathname === "/simulation/run" && req.method === "POST") {
return ok(await runWeekSimulation());
}
// ─── Staffing Intelligence Console ───
if (url.pathname === "/console") {
return new Response(Bun.file(import.meta.dir + "/console.html"));
}
// ─── Contractor / entity drill-down page ───
// Single-contractor portfolio view across every wired source:
// OSHA national, Chicago history, ticker chart, parent link,
// federal contracts, debarment, unions, training. Click any
// contractor name in a permit Entity Brief to land here.
// Profiler index — directory page of everyone who's filed a
// Chicago permit (clickable directory of contractors).
if (url.pathname === "/profiler" || url.pathname === "/contractors") {
return new Response(Bun.file(import.meta.dir + "/profiler.html"), {
headers: { "Content-Type": "text/html; charset=utf-8" },
});
}
if (url.pathname === "/contractor") {
return new Response(Bun.file(import.meta.dir + "/contractor.html"), {
headers: { ...cors, "Content-Type": "text/html" },
});
}
if (url.pathname === "/intelligence/contractor_profile" && req.method === "POST") {
const start = Date.now();
try {
const b = (await req.json().catch(() => ({}))) as { name?: string };
if (!b.name) return err("missing name", 400);
// Use the entity-brief library directly — single entity, all sources.
const { fetchOshaBrief, fetchTickerBrief, fetchContractorHistory, fetchParentLink, fetchFederalContracts, fetchDebarmentBrief, fetchNlrbBriefReal, fetchIlsosBrief, fetchNewsMentions, fetchDiversityCerts, scoreNewsSentiment, fetchBlsConstructionTrend, normalizeEntityName, entityTicker } = await import("./entity.js");
const [osha, stock, history, parent_link, federal, debarment, nlrb, ilsos, news, diversity, macro] = await Promise.all([
fetchOshaBrief(b.name),
fetchTickerBrief(b.name),
fetchContractorHistory(b.name),
fetchParentLink(b.name),
fetchFederalContracts(b.name),
fetchDebarmentBrief(b.name),
fetchNlrbBriefReal(b.name),
fetchIlsosBrief(b.name),
fetchNewsMentions(b.name),
fetchDiversityCerts(b.name),
fetchBlsConstructionTrend(),
]);
const news_sentiment = news ? scoreNewsSentiment(news) : null;
return ok({
key: normalizeEntityName(b.name),
display_name: b.name,
ticker: entityTicker(b.name),
osha,
stock,
history,
parent_link,
federal,
debarment,
nlrb,
ilsos,
news,
news_sentiment,
diversity,
macro,
generated_at: new Date().toISOString(),
duration_ms: Date.now() - start,
});
} catch (e: any) {
return err(`contractor_profile: ${e.message}`, 500);
}
}
// Intelligence: Market data — public building permits → staffing demand forecast
if (url.pathname === "/intelligence/market" && req.method === "POST") {
const start = Date.now();
try {
// Fetch Chicago building permits (public Socrata API — real data)
const permitUrl = "https://data.cityofchicago.org/resource/ydr8-5enu.json";
const [bigR, byTypeR, recentR, benchR] = await Promise.all([
// Top 8 largest permits by cost
fetch(`${permitUrl}?$select=permit_type,work_type,work_description,reported_cost,street_number,street_direction,street_name,community_area,issue_date,latitude,longitude&$where=reported_cost>1000000 AND issue_date>'2025-06-01'&$order=reported_cost DESC&$limit=50`).then(r => r.json()),
// Permits grouped by work type
fetch(`${permitUrl}?$select=work_type,count(*) as cnt,sum(reported_cost) as total_cost&$where=reported_cost>10000 AND issue_date>'2025-06-01'&$group=work_type&$order=total_cost DESC&$limit=10`).then(r => r.json()),
// Most recent permits
fetch(`${permitUrl}?$select=work_type,work_description,reported_cost,street_name,issue_date&$where=reported_cost>50000&$order=issue_date DESC&$limit=5`).then(r => r.json()),
// Our worker bench in IL (cross-reference)
api("POST", "/query/sql", { sql: "SELECT role, COUNT(*) supply, SUM(CASE WHEN CAST(reliability AS DOUBLE)>0.8 THEN 1 ELSE 0 END) reliable, SUM(CASE WHEN CAST(availability AS DOUBLE)>0.5 THEN 1 ELSE 0 END) available FROM workers_500k WHERE state='IL' GROUP BY role ORDER BY supply DESC" }),
]);
// Map construction types to staffing roles
const typeToRoles: Record<string, string[]> = {
"Electrical Work": ["Electrician","Maintenance Tech"],
"Masonry Work": ["Production Worker","Loader","Material Handler"],
"Mechanical Work": ["Maintenance Tech","Machine Operator","Welder"],
"Reroofing": ["Production Worker","Loader"],
"Plumbing Work": ["Maintenance Tech"],
"": ["Forklift Operator","Loader","Material Handler","Production Worker","Warehouse Associate"],
};
// Build demand forecast from permit types
const forecast: any[] = [];
for (const t of (byTypeR || [])) {
const wtype = t.work_type || "(general construction)";
const totalCost = parseFloat(t.total_cost || 0);
const cnt = parseInt(t.cnt || 0);
const estWorkers = Math.round(totalCost / 150000); // industry heuristic
const roles = typeToRoles[t.work_type || ""] || typeToRoles[""];
forecast.push({ work_type: wtype, permits: cnt, total_cost: totalCost, estimated_workers: estWorkers, needed_roles: roles });
}
// Cross-reference with our bench
const ilBench = (benchR.rows || []).reduce((m: any, r: any) => { m[r.role] = r; return m; }, {});
const gaps: any[] = [];
for (const f of forecast) {
for (const role of f.needed_roles) {
const b = ilBench[role];
if (b) {
const coverage = Math.round((b.available / Math.max(f.estimated_workers, 1)) * 100);
gaps.push({ role, demand: f.estimated_workers, supply: b.supply, available: b.available, reliable: b.reliable, coverage_pct: Math.min(coverage, 999), source: f.work_type });
}
}
}
return ok({
major_permits: (bigR || []).map((p: any) => ({
cost: parseFloat(p.reported_cost || 0),
description: (p.work_description || "").substring(0, 100),
address: `${p.street_number || ""} ${p.street_direction || ""} ${p.street_name || ""}`.trim(),
type: p.work_type || p.permit_type || "",
date: (p.issue_date || "").substring(0, 10),
lat: p.latitude, lng: p.longitude,
})),
by_type: forecast,
recent: (recentR || []).map((p: any) => ({
type: p.work_type || "", description: (p.work_description || "").substring(0, 80),
cost: parseFloat(p.reported_cost || 0), street: p.street_name || "", date: (p.issue_date || "").substring(0, 10),
})),
il_bench: benchR.rows || [],
gaps,
total_construction_value: forecast.reduce((s: number, f: any) => s + f.total_cost, 0),
total_estimated_workers: forecast.reduce((s: number, f: any) => s + f.estimated_workers, 0),
duration_ms: Date.now() - start,
});
} catch (e: any) {
return ok({ error: e.message, duration_ms: Date.now() - start });
}
}
// Predictive staffing forecast — aggregate demand inferred from
// recent Chicago permits, compared to our bench supply. Answers
// "what's coming in the next 30-60 days and can we cover it?"
// — the contextual-awareness dimension beyond retrospective rank.
if (url.pathname === "/intelligence/staffing_forecast" && req.method === "POST") {
const start = Date.now();
try {
const permitUrl = "https://data.cityofchicago.org/resource/ydr8-5enu.json";
// Last 30 days of permits — that's our forward demand window
const thirtyDaysAgo = new Date(Date.now() - 30 * 86400e3).toISOString().slice(0, 10);
const permits: any[] = await fetch(
`${permitUrl}?$select=work_type,reported_cost,issue_date`
+ `&$where=reported_cost>100000 AND issue_date>'${thirtyDaysAgo}'`
+ `&$limit=200`
).then(r => r.json()).catch(() => []);
// Construction heuristic: permit filing → construction start
// averages ~45 days. Staffing window opens 14 days before.
const typeToRole: Record<string, string> = {
"Electrical Work": "Electrician",
"Masonry Work": "Production Worker",
"Mechanical Work": "Maintenance Tech",
"Reroofing": "Production Worker",
"Plumbing Work": "Maintenance Tech",
};
// Aggregate demand by role
const demandByRole: Record<string, { permits: number; total_cost: number; est_workers: number; earliest_need: string }> = {};
for (const p of permits) {
const role = typeToRole[p.work_type || ""] || "Production Worker";
const cost = parseFloat(p.reported_cost || 0);
const workers = Math.max(2, Math.min(Math.round(cost / 150000), 8));
const issueDate = new Date(p.issue_date);
const stagingDate = new Date(issueDate.getTime() + 31 * 86400e3); // 45d - 14d window
if (!demandByRole[role]) {
demandByRole[role] = { permits: 0, total_cost: 0, est_workers: 0,
earliest_need: stagingDate.toISOString().slice(0, 10) };
}
demandByRole[role].permits += 1;
demandByRole[role].total_cost += cost;
demandByRole[role].est_workers += workers;
const cur = new Date(demandByRole[role].earliest_need);
if (stagingDate < cur) demandByRole[role].earliest_need = stagingDate.toISOString().slice(0, 10);
}
// Bench supply in IL
const benchR = await api("POST", "/query/sql", {
sql: `SELECT role, COUNT(*) as total, `
+ `SUM(CASE WHEN CAST(availability AS DOUBLE) > 0.5 THEN 1 ELSE 0 END) as available, `
+ `SUM(CASE WHEN CAST(reliability AS DOUBLE) > 0.8 THEN 1 ELSE 0 END) as reliable `
+ `FROM workers_500k WHERE state = 'IL' `
+ `GROUP BY role`,
});
const bench: Record<string, any> = {};
for (const r of (benchR.rows || [])) bench[r.role] = r;
// Past playbook fill-speed + success signal per role
const playbookR = await api("POST", "/query/sql", {
sql: `SELECT operation, COUNT(*) as fills `
+ `FROM successful_playbooks_live `
+ `WHERE operation LIKE '%Chicago, IL%' `
+ `GROUP BY operation ORDER BY fills DESC LIMIT 20`,
});
const recentChicagoOps = playbookR.rows || [];
// Build forecast entries with risk flag
const forecast: any[] = [];
for (const [role, d] of Object.entries(demandByRole)) {
const b = bench[role] || { total: 0, available: 0, reliable: 0 };
const coverage = d.est_workers > 0 ? Math.round((b.available / d.est_workers) * 100) : 999;
const reliable_coverage = d.est_workers > 0 ? Math.round((b.reliable / d.est_workers) * 100) : 999;
let risk = "ok";
if (coverage < 100) risk = "critical";
else if (coverage < 300) risk = "tight";
else if (reliable_coverage < 200) risk = "watch";
// Days until earliest staffing deadline
const days_to_deadline = Math.round((new Date(d.earliest_need).getTime() - Date.now()) / 86400e3);
forecast.push({
role,
demand_permits: d.permits,
demand_workers: d.est_workers,
demand_total_cost: d.total_cost,
earliest_staffing_deadline: d.earliest_need,
days_to_deadline,
bench_total: b.total,
bench_available: b.available,
bench_reliable: b.reliable,
coverage_pct: Math.min(coverage, 9999),
reliable_coverage_pct: Math.min(reliable_coverage, 9999),
risk,
});
}
forecast.sort((a, b) => {
const order: Record<string, number> = { critical: 0, tight: 1, watch: 2, ok: 3 };
if (order[a.risk] !== order[b.risk]) return order[a.risk] - order[b.risk];
return a.days_to_deadline - b.days_to_deadline;
});
return ok({
generated_at: new Date().toISOString(),
window_days: 30,
permit_count: permits.length,
total_cost: permits.reduce((s, p) => s + parseFloat(p.reported_cost || 0), 0),
total_estimated_workers: forecast.reduce((s, f) => s + f.demand_workers, 0),
critical_roles: forecast.filter(f => f.risk === "critical").length,
tight_roles: forecast.filter(f => f.risk === "tight").length,
forecast,
recent_chicago_operations: recentChicagoOps,
duration_ms: Date.now() - start,
note: "Demand inferred from Chicago permit filings last 30 days. Construction starts ~45d after permit. Staffing window opens ~14d before construction. Supply = IL bench in workers_500k.",
});
} catch (e: any) {
return err(`staffing_forecast: ${e.message}`, 500);
}
}
// Intelligence: Chicago permits → assumed staffing contracts with
// Phase 19-ranked candidates and Path-2 discovered patterns. Each
// card pairs a REAL permit (live from data.cityofchicago.org) with
// a PROPOSED fill drawn from our 500K worker bench. Surfaces the
// meta-index dimension directly: "what past similar fills had in
// common" for this role + geo.
// Architecture signals — the "our substrate is better than the
// alternatives" proof surface. Pulls live health numbers so the
// dashboard can show, per-card or in a top bar, that the claims
// we make in the PRD (instant searches, self-regulation,
// hot-swap, indexed-at-ingest) are verifiable right now.
if (url.pathname === "/intelligence/arch_signals" && (req.method === "GET" || req.method === "POST")) {
try {
const t0 = Date.now();
// Index freshness + shape (hot-swap + clever-index claims)
const idxRaw = await fetch("http://localhost:3100/vectors/indexes/workers_500k_v1", {
signal: AbortSignal.timeout(3000),
}).then(r => r.ok ? r.json() : null).catch(() => null);
// Playbook memory — "self-regulates via learned playbooks"
const pbmRaw = await fetch("http://localhost:3100/vectors/playbook_memory/stats", {
signal: AbortSignal.timeout(3000),
}).then(r => r.ok ? r.json() : null).catch(() => null);
// Pathway memory — ADR-021 compounding-bug-grammar surface
const pwmRaw = await fetch("http://localhost:3100/vectors/pathway/stats", {
signal: AbortSignal.timeout(3000),
}).then(r => r.ok ? r.json() : null).catch(() => null);
// Live instant-search probe — one trivial hybrid call so the
// latency number on screen is fresh, not cached.
const probeT0 = Date.now();
await api("POST", "/vectors/hybrid", {
index_name: "workers_500k_v1",
filter_dataset: "workers_500k",
id_column: "worker_id",
sql_filter: "state = 'OH'",
question: "production worker",
top_k: 3, generate: false,
}).catch(() => ({}));
const probeMs = Date.now() - probeT0;
return ok({
generated_at: new Date().toISOString(),
duration_ms: Date.now() - t0,
index: idxRaw ? {
name: idxRaw.index_name,
source: idxRaw.source,
model: idxRaw.model_name,
dimensions: idxRaw.dimensions,
chunk_count: idxRaw.chunk_count,
doc_count: idxRaw.doc_count,
created_at: idxRaw.created_at,
backend: idxRaw.vector_backend,
last_used: idxRaw.last_used ?? null,
build_signature: idxRaw.build_signature ?? null,
} : null,
playbook_memory: pbmRaw ? {
entries: pbmRaw.entries_count ?? pbmRaw.count ?? 0,
rebuilt_at: pbmRaw.last_rebuilt_at ?? null,
} : null,
pathway_memory: pwmRaw ? {
total_pathways: pwmRaw.total_pathways ?? 0,
retired: pwmRaw.retired ?? 0,
with_audit_pass: pwmRaw.with_audit_pass ?? 0,
total_replays: pwmRaw.total_replays ?? 0,
} : null,
instant_search_probe_ms: probeMs,
});
} catch (e: any) {
return err(`arch_signals: ${e.message}`, 500);
}
}
if (url.pathname === "/intelligence/permit_contracts" && req.method === "POST") {
const start = Date.now();
try {
const b: any = await req.json().catch(() => ({}));
const permitUrl = "https://data.cityofchicago.org/resource/ydr8-5enu.json";
// Recent + substantial permits only — skip tiny ones that
// don't imply real staffing demand.
// Include contact_1 + contact_2 fields so the Entity Brief
// panel on each card can populate without a second fetch.
// Contacts identify the applicant / contractor by name —
// those are the keys we pass to OSHA/ILSOS enrichment.
// Caller-controlled limit: J reported the live panel was
// dropping older permits (Target) because $limit=6 only ever
// showed today's 6 newest. Default 24 so a few days of
// permits stay on the panel; allow up to 100 via body.
const reqLimit = Math.max(1, Math.min(100, Number((b as any)?.limit) || 24));
// Optional contractor-name filter — lets the panel scope to
// a specific contact_1 or contact_2 name (e.g. "Target
// Corporation") so the user can pin a contractor to the panel
// without it scrolling past.
const cFilter = String((b as any)?.contractor || "").trim().replace(/'/g, "''");
const wherePieces: string[] = [
"reported_cost>250000",
"issue_date>'2025-06-01'",
];
if (cFilter) {
wherePieces.push(`(upper(contact_1_name)=upper('${cFilter}') OR upper(contact_2_name)=upper('${cFilter}'))`);
}
const permits: any[] = await fetch(
`${permitUrl}?$select=id,permit_type,work_type,work_description,reported_cost,street_number,street_direction,street_name,community_area,issue_date,contact_1_name,contact_1_type,contact_2_name,contact_2_type,latitude,longitude&`
+ `$where=${encodeURIComponent(wherePieces.join(" AND "))}`
+ `&$order=issue_date DESC&$limit=${reqLimit}`
).then(r => r.json()).catch(() => []);
const typeToRole: Record<string, string> = {
"Electrical Work": "Electrician",
"Masonry Work": "Production Worker",
"Mechanical Work": "Maintenance Tech",
"Reroofing": "Production Worker",
"Plumbing Work": "Maintenance Tech",
};
const contracts: any[] = [];
for (const p of permits) {
const cost = parseFloat(p.reported_cost || 0);
// Industry heuristic — one worker per $150K of permit value,
// capped at 8 per contract for staffing realism.
const count = Math.min(Math.max(Math.round(cost / 150000), 2), 8);
const role = typeToRole[p.work_type || ""] || "Production Worker";
const city = "Chicago";
const state = "IL";
// Phase 19 ranked candidates. Soft availability filter
// auto-applied by /search — this mirrors the real recruiter
// query path exactly. k=200 to ensure boost fires across
// the full memory surface (the embedding-discrimination
// narrowness means under-k silently misses endorsements).
//
// Timed so the UI can surface "instant search from clever
// indexing at ingest" — the architecture claim J wants
// visible. Each contract card shows its hybrid latency.
const hybridT0 = Date.now();
const searchRes = await api("POST", "/vectors/hybrid", {
index_name: "workers_500k_v1",
filter_dataset: "workers_500k",
id_column: "worker_id",
sql_filter: `role = '${role}' AND state = '${state}' AND city = '${city}' AND CAST(availability AS DOUBLE) > 0.5`,
question: `${role} for ${p.work_type || "construction"} in ${city}`,
top_k: 5, generate: false,
use_playbook_memory: true, playbook_memory_k: 200,
}).catch(() => ({ sources: [] as any[] }));
const hybridMs = Date.now() - hybridT0;
// Path 2 — discovered patterns for this role in this city.
const patternRes = await api("POST", "/vectors/playbook_memory/patterns", {
query: `${role} in ${city}, ${state}`,
top_k_playbooks: 25,
min_trait_frequency: 0.3,
}).catch(() => ({} as any));
// Enrich with implied pay rate before taking the top-5
enrichWithRates(searchRes.sources || []);
const contractBillRate = impliedBillRate(role);
const sources = (searchRes.sources || []).slice(0, 5).map((s: any) => {
const name = String(s.chunk_text || "").split("—")[0]?.trim() || s.doc_id;
return {
doc_id: s.doc_id,
name,
score: s.score,
playbook_boost: s.playbook_boost || 0,
playbook_citations: s.playbook_citations || [],
implied_pay_rate: s.implied_pay_rate ?? null,
over_bill_rate: (s.implied_pay_rate ?? 0) > contractBillRate,
};
});
// Timeline heuristic — permits filed now → construction
// starts ~45d later → staffing window opens ~14d before
// start. days_to_deadline is negative when we're past the
// window (fill urgency is imminent).
const issueDate = new Date(p.issue_date || Date.now());
const estStart = new Date(issueDate.getTime() + 45 * 86400e3);
const stagingDate = new Date(issueDate.getTime() + 31 * 86400e3);
const daysToDeadline = Math.round((stagingDate.getTime() - Date.now()) / 86400e3);
let urgency = "scheduled";
if (daysToDeadline < 0) urgency = "overdue";
else if (daysToDeadline <= 7) urgency = "urgent";
else if (daysToDeadline <= 21) urgency = "soon";
else urgency = "scheduled";
// Fill-probability ramp — staffing-industry heuristic.
// Base probability by pool_size (how many available workers
// match the role+geo), decayed by days-remaining. Produces
// a curve the UI can sparkline.
const poolSize = (searchRes.sql_matches ?? 0) as number;
const basePFill = poolSize >= count * 20 ? 0.95
: poolSize >= count * 10 ? 0.85
: poolSize >= count * 5 ? 0.70
: poolSize >= count * 2 ? 0.55
: poolSize >= count ? 0.35
: 0.15;
const fillByDay = [0, 3, 7, 14, 21, 30].map((d) => {
// Front-loaded: most fills land in first 7 days; tail
// falls off quickly. This is a Weibull-ish shape that
// matches real staffing data we've seen.
const ramp = d === 0 ? 0.0
: d <= 3 ? 0.35
: d <= 7 ? 0.65
: d <= 14 ? 0.85
: d <= 21 ? 0.95
: 1.0;
return { day: d, cumulative_pct: Math.round(basePFill * ramp * 100) };
});
// Economics — "as though the contracts were accepted and
// filled." 40 hrs/week, default 12-week contract. Margin
// = (bill - avg_pay) × count × hours. Payout window is
// fill_date + 30d billing cycle.
const weeksAssumed = 12;
const hoursPerWeek = 40;
const avgPayRate = sources.length
? sources.reduce((s, c) => s + (c.implied_pay_rate || 0), 0) / sources.length
: contractBillRate / BILL_MARKUP;
const grossRevenue = contractBillRate * count * hoursPerWeek * weeksAssumed;
const grossMargin = (contractBillRate - avgPayRate) * count * hoursPerWeek * weeksAssumed;
const overBillCount = sources.filter((c) => c.over_bill_rate).length;
const overBillPoolMargin = sources
.filter((c) => c.over_bill_rate)
.reduce((s, c) => s + (c.implied_pay_rate - contractBillRate) * hoursPerWeek * weeksAssumed, 0);
// Shift inference from permit work_type + description.
// Construction defaults to 1st-shift (day). Heavy civil or
// facility work sometimes runs 2nd or split-shift. 3rd
// (overnight) is rare in commercial construction but real
// for maintenance / emergency calls.
const descLower = ((p.work_description || "") + " " + (p.work_type || "")).toLowerCase();
const shifts: string[] = ["1st"]; // default day
if (/night|overnight|24\s*hr|emergency/.test(descLower)) shifts.push("3rd");
if (/multi.?shift|round.?the.?clock|double.?shift/.test(descLower)) shifts.push("2nd");
if (/weekend|saturday|sunday/.test(descLower)) shifts.push("4th");
contracts.push({
permit: {
id: p.id,
cost,
work_type: p.work_type || "General construction",
description: (p.work_description || "").substring(0, 140),
address: `${p.street_number || ""} ${p.street_direction || ""} ${p.street_name || ""}`.trim(),
community_area: p.community_area,
issue_date: (p.issue_date || "").substring(0, 10),
// Contacts — used by /intelligence/permit_entities to
// enrich each card with OSHA + ILSOS on expand.
contact_1_name: p.contact_1_name || "",
contact_1_type: p.contact_1_type || "",
contact_2_name: p.contact_2_name || "",
contact_2_type: p.contact_2_type || "",
},
implied_bill_rate: contractBillRate,
timeline: {
estimated_construction_start: estStart.toISOString().slice(0, 10),
staffing_window_opens: stagingDate.toISOString().slice(0, 10),
days_to_deadline: daysToDeadline,
urgency,
},
proposed: {
role,
count,
city, state,
pool_size: poolSize,
candidates: sources,
},
discovered_pattern: patternRes.discovered_pattern,
pattern_matched: patternRes.matched_playbooks ?? 0,
pattern_workers_examined: patternRes.total_workers_examined ?? 0,
// ADR-021 / PRD architecture claims surface — these fields
// let the UI show "instant search from clever indexing"
// and the fill economics beyond bill rate alone.
search_latency_ms: hybridMs,
fill_probability: {
base_pct: Math.round(basePFill * 100),
curve: fillByDay,
},
economics: {
avg_pay_rate: Math.round(avgPayRate * 100) / 100,
hours_per_week: hoursPerWeek,
weeks_assumed: weeksAssumed,
gross_revenue: Math.round(grossRevenue),
gross_margin: Math.round(grossMargin),
margin_pct: grossRevenue > 0 ? Math.round((grossMargin / grossRevenue) * 100) : 0,
payout_window_days: [30, 45],
over_bill_count: overBillCount,
over_bill_pool_margin_at_risk: Math.round(overBillPoolMargin),
},
shifts_needed: shifts,
});
}
return ok({
generated_at: new Date().toISOString(),
count: contracts.length,
contracts,
duration_ms: Date.now() - start,
note: "Live Chicago permits paired with workers_500k-ranked candidates and playbook_memory discovered patterns. The permit is real public data; the proposed fill is derived per industry heuristic (~$150K → 1 worker).",
});
} catch (e: any) {
return err(`permit_contracts: ${e.message}`, 500);
}
}
// Intelligence: per-permit entity brief — OSHA + ILSOS + property
// Takes a permit identifier (we look it up from Chicago Socrata) or
// raw contact fields directly from the client. Returns an "ETF
// basket" shape: property + entities + per-entity risk factors.
// OSHA is live-scraped (cached 30d). ILSOS returns a structured
// placeholder because apps.ilsos.gov blocks our ASN.
if (url.pathname === "/intelligence/permit_entities" && req.method === "POST") {
const start = Date.now();
try {
const b = await req.json().catch(() => ({})) as {
permit_id?: string;
address?: string;
work_type?: string;
contact_1_name?: string;
contact_1_type?: string;
contact_2_name?: string;
contact_2_type?: string;
fetch_osha?: boolean;
fetch_ilsos?: boolean;
};
// If the caller didn't pass contact fields but did pass a
// permit_id, go pull the record from Chicago Socrata.
let permit = b;
if (b.permit_id && !b.contact_1_name) {
const u = `https://data.cityofchicago.org/resource/ydr8-5enu.json?$where=id='${encodeURIComponent(b.permit_id)}'`;
const rows = (await fetch(u).then((r) => r.json())) as any[];
const p = rows?.[0];
if (p) {
const addr = [p.street_number, p.street_direction, p.street_name]
.filter(Boolean)
.join(" ");
permit = {
permit_id: b.permit_id,
address: addr,
work_type: p.work_type,
contact_1_name: p.contact_1_name,
contact_1_type: p.contact_1_type,
contact_2_name: p.contact_2_name,
contact_2_type: p.contact_2_type,
};
}
}
const brief = await buildPermitBrief(permit, {
fetchOsha: b.fetch_osha !== false,
fetchIlsos: b.fetch_ilsos !== false,
});
return ok({ ...brief, duration_ms: Date.now() - start });
} catch (e: any) {
return err(`permit_entities: ${e.message}`, 500);
}
}
// Removed 2026-04-20: /intelligence/learn was a legacy CSV writer
// that destructively re-wrote successful_playbooks. /log and
// /log_failure replace it cleanly via /vectors/playbook_memory/seed
// and /mark_failed. Keeping the endpoint would only mislead
// future callers — dead code rots.
// Intelligence: Activity feed — what the system has learned
if (url.pathname === "/intelligence/activity" && req.method === "POST") {
const start = Date.now();
const [playbooksR, searchCountR, fillCountR, totalR] = await Promise.all([
api("POST", "/query/sql", { sql: "SELECT * FROM successful_playbooks ORDER BY timestamp DESC LIMIT 20" }).catch(() => ({ rows: [] })),
api("POST", "/query/sql", { sql: "SELECT COUNT(*) cnt FROM successful_playbooks WHERE operation LIKE 'search:%'" }).catch(() => ({ rows: [{ cnt: 0 }] })),
api("POST", "/query/sql", { sql: "SELECT COUNT(*) cnt FROM successful_playbooks WHERE operation LIKE 'fill:%'" }).catch(() => ({ rows: [{ cnt: 0 }] })),
api("POST", "/query/sql", { sql: "SELECT COUNT(*) cnt FROM successful_playbooks" }).catch(() => ({ rows: [{ cnt: 0 }] })),
]);
// Extract learned patterns — which roles+cities get filled most
const patterns: Record<string, number> = {};
for (const p of (playbooksR.rows || [])) {
if (p.operation?.startsWith("fill:") || p.operation?.startsWith("search:")) {
const key = p.operation.replace(/^(fill|search): ?/, "").trim();
patterns[key] = (patterns[key] || 0) + 1;
}
}
return ok({
playbooks: playbooksR.rows || [],
search_count: searchCountR.rows?.[0]?.cnt || 0,
fill_count: fillCountR.rows?.[0]?.cnt || 0,
total_operations: totalR.rows?.[0]?.cnt || 0,
learned_patterns: Object.entries(patterns).map(([q, c]) => ({ query: q, times: c })).sort((a, b) => b.times - a.times),
duration_ms: Date.now() - start,
});
}
// Intelligence Brief — parallel analytics across 500K profiles
if (url.pathname === "/intelligence/brief" && req.method === "POST") {
const start = Date.now();
const [poolR, benchR, supplyR, gemsR, risksR, untappedR, archetypeR] = await Promise.all([
api("POST", "/query/sql", { sql: `SELECT COUNT(*) total, ROUND(AVG(CAST(reliability AS DOUBLE)),3) avg_rel, SUM(CASE WHEN CAST(reliability AS DOUBLE)>0.9 THEN 1 ELSE 0 END) elite, SUM(CASE WHEN CAST(reliability AS DOUBLE)>0.8 THEN 1 ELSE 0 END) reliable, SUM(CASE WHEN archetype='erratic' THEN 1 ELSE 0 END) erratic, SUM(CASE WHEN archetype='silent' THEN 1 ELSE 0 END) silent_cnt, SUM(CASE WHEN archetype='improving' THEN 1 ELSE 0 END) improving FROM workers_500k` }),
api("POST", "/query/sql", { sql: `SELECT state, COUNT(*) total, ROUND(AVG(CAST(reliability AS DOUBLE)),3) avg_rel, SUM(CASE WHEN CAST(reliability AS DOUBLE)>0.8 THEN 1 ELSE 0 END) reliable, SUM(CASE WHEN CAST(availability AS DOUBLE)>0.5 THEN 1 ELSE 0 END) available FROM workers_500k GROUP BY state ORDER BY total DESC` }),
api("POST", "/query/sql", { sql: `SELECT role, COUNT(*) supply, SUM(CASE WHEN CAST(availability AS DOUBLE)>0.5 THEN 1 ELSE 0 END) available, ROUND(AVG(CAST(reliability AS DOUBLE)),3) avg_rel FROM workers_500k GROUP BY role ORDER BY supply DESC` }),
api("POST", "/query/sql", { sql: `SELECT name, role, city, state, ROUND(CAST(reliability AS DOUBLE),2) rel, ROUND(CAST(availability AS DOUBLE),2) avail, archetype, skills FROM workers_500k WHERE archetype='improving' AND CAST(reliability AS DOUBLE)>0.8 ORDER BY CAST(reliability AS DOUBLE) DESC LIMIT 5` }),
api("POST", "/query/sql", { sql: `SELECT name, role, city, state, ROUND(CAST(reliability AS DOUBLE),2) rel, ROUND(CAST(responsiveness AS DOUBLE),2) resp, ROUND(CAST(compliance AS DOUBLE),2) compl, archetype FROM workers_500k WHERE archetype IN ('erratic','silent') AND CAST(reliability AS DOUBLE)<0.5 ORDER BY CAST(reliability AS DOUBLE) ASC LIMIT 5` }),
api("POST", "/query/sql", { sql: `SELECT name, role, city, state, ROUND(CAST(availability AS DOUBLE),2) avail, ROUND(CAST(reliability AS DOUBLE),2) rel, skills, archetype FROM workers_500k WHERE CAST(availability AS DOUBLE)>0.8 AND CAST(reliability AS DOUBLE)>0.85 ORDER BY CAST(availability AS DOUBLE) DESC LIMIT 5` }),
api("POST", "/query/sql", { sql: `SELECT archetype, COUNT(*) cnt, ROUND(AVG(CAST(reliability AS DOUBLE)),3) avg_rel FROM workers_500k GROUP BY archetype ORDER BY cnt DESC` }),
]);
return ok({
pool: poolR.rows?.[0] || {},
bench: benchR.rows || [],
supply: supplyR.rows || [],
gems: gemsR.rows || [],
risks: risksR.rows || [],
untapped: untappedR.rows || [],
archetypes: archetypeR.rows || [],
duration_ms: Date.now() - start,
});
}
// Intelligence Chat — natural language → routed queries → structured results
if (url.pathname === "/intelligence/chat" && req.method === "POST") {
const b = await json();
const q = (b.message || "").trim();
const lower = q.toLowerCase();
const start = Date.now();
const queries: string[] = [];
// Route 1: "Find someone like [Name]"
const likeMatch = q.match(/(?:like|similar to)\s+([A-Z][a-z]+(?:\s+[A-Z]\.?\s*)?(?:[A-Z][a-z]+)?)/i);
if (likeMatch) {
const name = likeMatch[1].trim();
queries.push(`SQL: Looking up ${name}'s profile`);
const profileR = await api("POST", "/query/sql", { sql: `SELECT * FROM workers_500k WHERE name LIKE '%${name.replace(/'/g,"''")}%' LIMIT 1` });
if (profileR.rows?.length) {
const worker = profileR.rows[0];
const stateMatch = lower.match(/\b(?:in|from)\s+([A-Z]{2})\b/i) || lower.match(/\b(IL|IN|OH|MO|TN|KY|WI|MI|IA|MN)\b/i);
const stateFilter = stateMatch ? `state = '${stateMatch[1].toUpperCase()}'` : `state != '${worker.state}'`;
queries.push(`Vector: Semantic similarity on ${worker.name}'s full profile → ${stateFilter}`);
const searchR = await api("POST", "/vectors/hybrid", {
question: worker.resume_text || `${worker.role} in ${worker.city} with skills ${worker.skills}`,
index_name: "workers_500k_v1",
sql_filter: stateFilter + ` AND CAST(reliability AS DOUBLE) >= 0.7`,
filter_dataset: "ethereal_workers", id_column: "worker_id", top_k: 5, generate: false,
});
return ok({ type: "similar", summary: `Found ${(searchR.sources||[]).length} workers similar to ${worker.name}${stateMatch ? ' in '+stateMatch[1].toUpperCase() : ' (other states)'}`,
source: { name: worker.name, role: worker.role, city: worker.city, state: worker.state, rel: worker.reliability, skills: worker.skills, archetype: worker.archetype },
results: (searchR.sources||[]).map((s:any) => ({ doc_id: s.doc_id, score: s.score, text: s.chunk_text })),
sql_matches: searchR.sql_matches, queries_run: queries, duration_ms: Date.now() - start });
}
return ok({ type: "error", summary: `Couldn't find "${name}" in the database. Try a full name.`, queries_run: queries, duration_ms: Date.now() - start });
}
// Route 2: "What if we lose"
if (/what if|lose|happens if/i.test(lower)) {
const roleMatch = lower.match(/(?:lose|lost?)\s+(?:our\s+)?(?:top\s+)?(\d+)?\s*(.+?)(?:\?|$)/i);
if (roleMatch) {
const count = parseInt(roleMatch[1]) || 5;
const subject = roleMatch[2].trim().replace(/\s*workers?\s*$/,'').replace(/s$/,'');
queries.push(`SQL: Top ${count} ${subject}s by reliability`);
const topR = await api("POST", "/query/sql", { sql: `SELECT name, role, city, state, ROUND(CAST(reliability AS DOUBLE),2) rel, skills FROM workers_500k WHERE LOWER(role) LIKE '%${subject.replace(/'/g,"''")}%' ORDER BY CAST(reliability AS DOUBLE) DESC LIMIT ${count}` });
if (topR.rows?.length) {
const states = [...new Set(topR.rows.map((r:any) => r.state))];
queries.push(`SQL: Bench depth for ${subject}s in ${states.join(', ')}`);
const benchR = await api("POST", "/query/sql", { sql: `SELECT state, COUNT(*) total, SUM(CASE WHEN CAST(reliability AS DOUBLE)>0.8 THEN 1 ELSE 0 END) reliable FROM workers_500k WHERE LOWER(role) LIKE '%${subject.replace(/'/g,"''")}%' AND state IN (${states.map((s:string)=>`'${s}'`).join(',')}) GROUP BY state` });
const totalInRole = (benchR.rows||[]).reduce((s:number,r:any) => s + r.total, 0);
const reliableRemaining = (benchR.rows||[]).reduce((s:number,r:any) => s + r.reliable, 0) - topR.rows.length;
return ok({ type: "whatif", summary: `Impact: losing top ${topR.rows.length} ${subject} workers`,
lost: topR.rows, bench: benchR.rows||[], total_in_role: totalInRole, reliable_remaining: Math.max(0, reliableRemaining),
risk_level: reliableRemaining < count * 2 ? "HIGH" : reliableRemaining < count * 5 ? "MEDIUM" : "LOW",
queries_run: queries, duration_ms: Date.now() - start });
}
return ok({ type: "error", summary: `Couldn't find workers in the "${subject}" role. Try: welder, forklift operator, assembler, etc.`, queries_run: queries, duration_ms: Date.now() - start });
}
}
// Route 3: "Who could handle" — semantic role discovery
if (/could handle|capable of|suitable for|qualified for|try.*for|can do/i.test(lower)) {
const roleDesc = q.replace(/^.*?(?:handle|capable of|suitable for|qualified for|try\s+\w+\s+for|can do)\s*/i,'').replace(/\?$/,'').trim();
queries.push(`Vector: Semantic search for "${roleDesc}" — no exact role match needed`);
const searchR = await api("POST", "/vectors/hybrid", {
question: `Worker experienced in ${roleDesc}, relevant skills and certifications`,
index_name: "workers_500k_v1", sql_filter: "CAST(reliability AS DOUBLE) >= 0.75",
filter_dataset: "ethereal_workers", id_column: "worker_id", top_k: 8, generate: false,
});
return ok({ type: "discovery", summary: `${(searchR.sources||[]).length} workers found through semantic skill matching for: "${roleDesc}"`,
role_searched: roleDesc, results: (searchR.sources||[]).map((s:any) => ({ doc_id: s.doc_id, score: s.score, text: s.chunk_text })),
sql_matches: searchR.sql_matches,
note: "None of these workers have this exact role title. They were found because their skills, certifications, and experience are semantically similar. This is talent discovery — finding people for roles that don't exist in your database yet.",
queries_run: queries, duration_ms: Date.now() - start });
}
// Route 4: "Stop placing" / risk workers
if (/stop placing|worst|problem|flag|risk|underperform|fire|let go/i.test(lower)) {
queries.push("SQL: erratic/silent workers with reliability < 50%");
const riskR = await api("POST", "/query/sql", { sql: `SELECT name, role, city, state, ROUND(CAST(reliability AS DOUBLE),2) rel, ROUND(CAST(responsiveness AS DOUBLE),2) resp, ROUND(CAST(compliance AS DOUBLE),2) compl, archetype FROM workers_500k WHERE archetype IN ('erratic','silent') AND CAST(reliability AS DOUBLE)<0.5 ORDER BY CAST(reliability AS DOUBLE) ASC LIMIT 10` });
const countR = await api("POST", "/query/sql", { sql: `SELECT COUNT(*) cnt FROM workers_500k WHERE archetype IN ('erratic','silent') AND CAST(reliability AS DOUBLE)<0.5` });
return ok({ type: "risk", summary: `${countR.rows?.[0]?.cnt || 0} workers flagged — showing the 10 lowest performers`,
results: riskR.rows||[], total_flagged: countR.rows?.[0]?.cnt || 0,
queries_run: queries, duration_ms: Date.now() - start });
}
// Route 5: Analytics / counts
if (/how many|count|total|percentage|average|breakdown/i.test(lower)) {
queries.push("RAG: analytical question → vector retrieval + LLM reasoning");
const ragR = await api("POST", "/vectors/rag", { index_name: "workers_500k_v1", question: q, top_k: 3 });
return ok({ type: "answer", summary: ragR.answer || "Couldn't determine the answer from the data",
sources: (ragR.sources||[]).map((s:any) => ({ doc_id: s.doc_id, text: s.chunk_text, score: s.score })),
queries_run: queries, duration_ms: Date.now() - start });
}
// Route 6: late-worker / no-show triage. Coordinator gets a text
// ("Marcus running late site 4422") and needs three things in
// one shot: the worker's record + attendance pattern, a draft
// SMS to the client, and a ranked list of immediately-available
// backfills filtered by the same role+geo. The system already
// has every input (workers_500k, call_log, playbook_memory).
// The route binds them.
// No /i — the name has to be capitalized (English convention)
// and the event verbs are matched lowercase. The /i flag was
// letting "Marcus running" parse as "Marcus Running" (a last
// name) and then the event regex wouldn't find "running late"
// because "running" was already consumed by the name group.
const triageMatch = q.match(/^([A-Z][a-z]+(?:\s+[A-Z]\.?\s*)?(?:\s+[A-Z][a-z]+)?)\s+(running\s+late|late|no\s*show|no-show|sick|out\s+today|called\s+out|called\s+in|can'?t\s+make\s+it|won'?t\s+make\s+it)/);
if (triageMatch) {
const name = triageMatch[1].trim();
const event = triageMatch[2].toLowerCase().replace(/\s+/g, " ");
queries.push(`SQL: locate ${name}'s worker record`);
const profileR = await api("POST", "/query/sql", { sql: `SELECT name, role, city, state, zip, ROUND(CAST(reliability AS DOUBLE),2) rel, ROUND(CAST(availability AS DOUBLE),2) avail, ROUND(CAST(responsiveness AS DOUBLE),2) resp, archetype, skills, certifications FROM workers_500k WHERE name LIKE '%${name.replace(/'/g, "''")}%' ORDER BY CAST(reliability AS DOUBLE) DESC LIMIT 1` });
if (profileR.rows?.length) {
const w = profileR.rows[0];
// Pull attendance pattern from call_log if available — count
// recent calls + count of unanswered/late patterns. If the
// table doesn't exist or has nothing, we surface that
// honestly rather than fabricate.
queries.push(`SQL: ${w.name}'s recent contact pattern`);
const callR = await api("POST", "/query/sql", { sql: `SELECT COUNT(*) calls FROM call_log WHERE candidate_id IN (SELECT candidate_id FROM workers_500k WHERE name = '${w.name.replace(/'/g, "''")}')` }).catch(() => null);
const callCount = callR?.rows?.[0]?.calls ?? null;
// Backfills: same role + same geo, available now, ordered
// by responsiveness (a coordinator covering a no-show
// wants the candidate who actually answers their phone).
queries.push(`Backfill: ${w.role} in ${w.city}, ${w.state}, available, sorted by responsiveness`);
const backfillR = await api("POST", "/query/sql", { sql: `SELECT name, role, city, state, zip, ROUND(CAST(reliability AS DOUBLE),2) rel, ROUND(CAST(availability AS DOUBLE),2) avail, ROUND(CAST(responsiveness AS DOUBLE),2) resp, archetype, skills FROM workers_500k WHERE role = '${w.role.replace(/'/g, "''")}' AND city = '${(w.city||"").replace(/'/g, "''")}' AND state = '${(w.state||"").replace(/'/g, "''")}' AND name != '${w.name.replace(/'/g, "''")}' AND CAST(availability AS DOUBLE) > 0.6 ORDER BY CAST(responsiveness AS DOUBLE) DESC, CAST(reliability AS DOUBLE) DESC LIMIT 5` });
// Draft SMS the coordinator can send to the client. This
// is template-generated, not LLM — the coordinator must
// be able to send it instantly without re-reading. Names
// and roles are interpolated; the COORDINATOR sends.
const eventLabel = event.includes("late") ? "running late" : event.includes("show") ? "a no-show" : event.includes("sick") || event.includes("out") ? "out today" : "unable to make their shift";
const backfills = backfillR.rows || [];
const topBackfill = backfills[0]?.name;
const draftSms = topBackfill
? `Heads-up: ${w.name} (${w.role}) is ${eventLabel}. I'm dispatching ${topBackfill} from our local bench (${Math.round((backfills[0].rel||0)*100)}% reliability) to cover. Will confirm arrival within the hour.`
: `Heads-up: ${w.name} (${w.role}) is ${eventLabel}. I'm pulling our nearest available ${w.role} now and will confirm coverage shortly.`;
return ok({
type: "triage",
summary: `${w.name}${eventLabel}. ${backfills.length} local backfill${backfills.length === 1 ? "" : "s"} ready, draft SMS ready to send.`,
worker: { name: w.name, role: w.role, city: w.city, state: w.state, zip: w.zip, rel: w.rel, avail: w.avail, resp: w.resp, archetype: w.archetype, skills: w.skills, certifications: w.certifications, recent_calls: callCount },
event,
backfills,
draft_sms: draftSms,
queries_run: queries,
duration_ms: Date.now() - start,
});
}
return ok({ type: "triage_miss", summary: `Couldn't find a worker named "${name}" in the roster. Check the spelling or try last name only.`, queries_run: queries, duration_ms: Date.now() - start });
}
// Route 7: bare-name profile lookup. Coordinator types just a
// name (or "First Last") with no other intent — pull the
// profile, prior fills, and attendance pattern in one shot.
// Distinguished from smart_search by being SHORT (≤4 tokens),
// capitalized like a name, and not containing role/skill words.
const tokens = q.trim().split(/\s+/);
const looksLikeName = tokens.length >= 1 && tokens.length <= 4
&& tokens.every((t) => /^[A-Z][a-z'-]+\.?$/.test(t) || /^[A-Z]\.$/.test(t))
&& !/forklift|warehouse|electric|welder|assembl|maintain|production|operator|driver|tech|loader|packag|inventory|sanitation/i.test(q);
if (looksLikeName) {
// Names have middle initials in workers_500k ("Steven A. Allen"),
// so a single LIKE '%First Last%' won't match. Split on
// whitespace, AND each token — lets "Marcus Rivera" match
// "Marcus L. Rivera" without enumerating initials.
const nameLike = tokens
.map((t) => `name LIKE '%${t.replace(/'/g, "''").replace(/\./g, "")}%'`)
.join(" AND ");
queries.push(`SQL: lookup name="${q}" via per-token LIKE`);
const r = await api("POST", "/query/sql", { sql: `SELECT name, role, city, state, zip, ROUND(CAST(reliability AS DOUBLE),2) rel, ROUND(CAST(availability AS DOUBLE),2) avail, ROUND(CAST(responsiveness AS DOUBLE),2) resp, archetype, skills, certifications FROM workers_500k WHERE ${nameLike} ORDER BY CAST(reliability AS DOUBLE) DESC LIMIT 5` });
if (r.rows?.length) {
return ok({
type: "profile",
summary: r.rows.length === 1 ? `${r.rows[0].name}${r.rows[0].role}, ${r.rows[0].city}, ${r.rows[0].state}` : `${r.rows.length} workers match "${q}"`,
profiles: r.rows,
queries_run: queries,
duration_ms: Date.now() - start,
});
}
return ok({ type: "profile_miss", summary: `No workers named "${q}" in the roster.`, queries_run: queries, duration_ms: Date.now() - start });
}
// Route 8: temporal — "what came in last night", "new resumes
// today", "last 24 hours". Surfaces recent ingest events from
// the catalog (created_at on dataset objects) and ranks them
// against open job_orders for "likely role match." Schema-
// agnostic: any dataset that landed recently shows up.
const temporalMatch = lower.match(/\b(last\s+night|today|this\s+morning|past\s+(\d+)\s+(?:hours?|days?)|last\s+(\d+)\s+(?:hours?|days?)|recent|new\s+(?:resumes?|candidates?|workers?|hires?|today)|came\s+in|arrived|just\s+(?:got|came))/i);
if (temporalMatch) {
// Decide window in hours
let windowHours = 24;
const pastN = lower.match(/\b(?:past|last)\s+(\d+)\s+(hours?|days?)/);
if (pastN) {
windowHours = parseInt(pastN[1], 10) * (pastN[2].startsWith("d") ? 24 : 1);
} else if (/last\s+night|this\s+morning|today/i.test(lower)) {
windowHours = 24;
} else if (/recent/i.test(lower)) {
windowHours = 72;
}
queries.push(`Catalog: datasets with created_at within last ${windowHours}h`);
const ds = await api("GET", "/catalog/datasets") as any[];
const cutoff = Date.now() - windowHours * 3600 * 1000;
const recent = (Array.isArray(ds) ? ds : [])
.map((d: any) => ({
name: d.name,
row_count: d.row_count || 0,
bytes: (d.objects?.[0]?.size_bytes) || 0,
updated_at: d.updated_at,
ts: d.updated_at ? Date.parse(d.updated_at) : 0,
}))
.filter((d) => d.ts >= cutoff && d.row_count > 0)
.sort((a, b) => b.ts - a.ts);
// For each recent dataset, sample its first row's role-shape
// text so the coordinator sees what's in it without reading
// schemas. If it's a workers/resumes dataset, group by role.
const samples: any[] = [];
for (const d of recent.slice(0, 8)) {
const sample = await api("POST", "/query/sql", { sql: `SELECT * FROM "${d.name.replace(/"/g, '""')}" LIMIT 1` }).catch(() => null);
const cols = sample?.columns?.map((c: any) => c.name) || [];
const looksLikeWorkers = cols.includes("role") && (cols.includes("name") || cols.includes("candidate_id"));
let roleBreakdown: any[] = [];
if (looksLikeWorkers) {
const byRole = await api("POST", "/query/sql", { sql: `SELECT role, COUNT(*) cnt FROM "${d.name.replace(/"/g, '""')}" GROUP BY role ORDER BY cnt DESC LIMIT 5` }).catch(() => null);
roleBreakdown = byRole?.rows || [];
}
samples.push({
name: d.name,
row_count: d.row_count,
updated_at: d.updated_at,
hours_ago: Math.round((Date.now() - d.ts) / 3600000),
looks_like_workers: looksLikeWorkers,
role_breakdown: roleBreakdown,
preview: sample?.rows?.[0] || null,
});
}
return ok({
type: "ingest_log",
summary: recent.length
? `${recent.length} dataset${recent.length === 1 ? "" : "s"} landed in the last ${windowHours}h. ${samples.filter((s) => s.looks_like_workers).reduce((sum, s) => sum + s.row_count, 0)} new worker rows across them.`
: `Nothing new in the catalog in the last ${windowHours}h. (Dataset timestamps are based on catalog updated_at; if data was loaded directly to disk without going through /ingest/file, it won't show here.)`,
window_hours: windowHours,
datasets: samples,
queries_run: queries,
duration_ms: Date.now() - start,
});
}
// Default: smart search — extract role, location, availability from natural language
{
const filters: string[] = ["CAST(reliability AS DOUBLE) >= 0.5"];
const understood: string[] = [];
// Structured input from the search-form dropdowns. When set,
// these win over NL parsing — typing "forklift in IL" used to
// misparse the preposition "in" as state IN (Indiana). Trust
// explicit user selection over regex archaeology.
const explicitState = String(b.state || "").trim().toUpperCase();
const explicitRole = String(b.role || "").trim();
// (G) Per-staffer context. When the UI sends a staffer_id,
// playbook queries scope to that staffer's territory — their
// recent fills, their geo's recurring patterns. The corpus is
// the same for everyone; the relevance gradient is unique to
// each staffer because each pulls a different shape from it.
const staffer = lookupStaffer(String(b.staffer_id || "").trim());
// If the staffer has a territory and the user hasn't already
// pinned a state/city via dropdown or NL, default the search
// to their territory. They can override by typing a different
// city or selecting a different state.
if (staffer && !explicitState) {
filters.push(`state = '${staffer.territory.state}'`);
understood.push(`as ${staffer.name}: ${staffer.territory.state}`);
}
// (B) Headcount parser — coordinator says "8 production
// workers", "I need 12 forklift operators", "5 welders by
// Friday". Match a leading or embedded count followed by
// a worker-shape noun. Bound at 1..200 — anything outside is
// probably not a headcount (zip codes, dates, addresses).
let topK = 10;
// Allow zero-to-two role words between the number and the
// worker-noun: "8 workers" / "8 production workers" /
// "8 forklift operators" all match. The role word is
// optional so we don't lose the bare-number form.
const countMatch = q.match(/\b(\d{1,3})\s+(?:\w+\s+){0,2}(?:workers?|operators?|drivers?|techs?|technicians?|welders?|electricians?|assemblers?|handlers?|loaders?|packagers?|associates?|leads?|people|hires?|staff)\b/i);
if (countMatch) {
const n = parseInt(countMatch[1], 10);
if (n >= 1 && n <= 200) {
topK = n;
understood.push(`headcount: ${n}`);
}
}
// (A) Zip code → city/state lookup. A coordinator types a zip
// because that's what the contract says. The previous parser
// saw "60607" and treated it as a stray number; results came
// back from any state. Map known metro zip prefixes here so
// the geographic constraint actually fires.
//
// Each entry: zip-prefix → { city, state }. Prefix-match
// covers a metro without enumerating every zip — e.g. "606"
// catches Chicago zips 60600-60699.
const zipPrefixMap: Array<[string, { city: string, state: string }]> = [
// Chicago + near-suburb
["606", { city: "Chicago", state: "IL" }],
["607", { city: "Chicago", state: "IL" }],
["608", { city: "Chicago", state: "IL" }],
// Indianapolis
["462", { city: "Indianapolis", state: "IN" }],
["461", { city: "Indianapolis", state: "IN" }],
// Fort Wayne
["468", { city: "Fort Wayne", state: "IN" }],
// Columbus OH
["432", { city: "Columbus", state: "OH" }],
["431", { city: "Columbus", state: "OH" }],
// Cleveland
["441", { city: "Cleveland", state: "OH" }],
// Cincinnati
["452", { city: "Cincinnati", state: "OH" }],
["451", { city: "Cincinnati", state: "OH" }],
// Dayton
["454", { city: "Dayton", state: "OH" }],
// Milwaukee
["532", { city: "Milwaukee", state: "WI" }],
["531", { city: "Milwaukee", state: "WI" }],
// Madison
["537", { city: "Madison", state: "WI" }],
// Detroit
["482", { city: "Detroit", state: "MI" }],
["481", { city: "Detroit", state: "MI" }],
// Grand Rapids
["495", { city: "Grand Rapids", state: "MI" }],
["493", { city: "Grand Rapids", state: "MI" }],
// Minneapolis / St. Paul
["554", { city: "Minneapolis", state: "MN" }],
["551", { city: "Minneapolis", state: "MN" }],
// Des Moines
["503", { city: "Des Moines", state: "IA" }],
// Kansas City MO
["641", { city: "Kansas City", state: "MO" }],
// St. Louis
["631", { city: "St. Louis", state: "MO" }],
// Nashville
["372", { city: "Nashville", state: "TN" }],
// Memphis
["381", { city: "Memphis", state: "TN" }],
// Knoxville
["379", { city: "Knoxville", state: "TN" }],
// Louisville
["402", { city: "Louisville", state: "KY" }],
// Lexington
["405", { city: "Lexington", state: "KY" }],
];
const zipMatch = q.match(/\b(\d{5})\b/);
let zipCity: { city: string, state: string } | null = null;
if (zipMatch) {
const z = zipMatch[1];
const hit = zipPrefixMap.find(([prefix]) => z.startsWith(prefix));
if (hit) {
zipCity = hit[1];
understood.push(`zip ${z}${hit[1].city}, ${hit[1].state}`);
}
}
// Extract role keywords (skip if dropdown picked one)
const roleKeywords: Record<string, string> = {
"warehouse": "warehouse", "forklift": "forklift", "welder": "weld", "assembler": "assembl",
"loader": "loader", "machine operator": "machine operator", "shipping": "shipping",
"quality": "quality", "maintenance": "maintenance", "production": "production",
"material handler": "material handler", "sanitation": "sanitation", "inventory": "inventory",
"line lead": "line lead", "electrician": "electric", "packaging": "packaging",
"tool and die": "tool", "logistics": "logistics", "safety": "safety", "cnc": "cnc",
};
if (explicitRole) {
filters.push(`LOWER(role) LIKE '%${explicitRole.toLowerCase().replace(/'/g, "''")}%'`);
understood.push(`role: ${explicitRole}`);
} else {
for (const [kw, sqlPart] of Object.entries(roleKeywords)) {
if (lower.includes(kw)) { filters.push(`LOWER(role) LIKE '%${sqlPart}%'`); understood.push(`role: ${kw}`); break; }
}
}
// Extract city
// Zip code wins over city-name parsing — it's more specific
// and the coordinator typed a number, not a casual mention.
if (zipCity) {
filters.push(`city = '${zipCity.city}'`);
understood.push(`city: ${zipCity.city}`);
} else {
const cities = ["chicago","springfield","rockford","peoria","joliet","indianapolis","fort wayne",
"evansville","south bend","columbus","cleveland","cincinnati","dayton","akron","toledo",
"st. louis","st louis","kansas city","nashville","memphis","knoxville","louisville","lexington",
"milwaukee","madison","detroit","grand rapids","lansing","des moines","minneapolis","terre haute",
"bloomington","decatur","mattoon","galesburg","danville","champaign"];
for (const city of cities) {
if (lower.includes(city)) {
const sqlCity = city.split(' ').map(w => w[0].toUpperCase() + w.slice(1)).join(' ');
filters.push(`city = '${sqlCity}'`);
understood.push(`city: ${sqlCity}`);
break;
}
}
}
// Extract state — dropdown wins; otherwise NL parse, but
// require either an explicit "in/from <STATE>" preposition
// OR an UPPERCASE 2-letter code, never a bare lowercase
// 2-letter token. Old regex matched "in" (preposition) as
// state IN (Indiana) because the /i flag made the standalone
// pattern case-insensitive — "forklift in IL" always returned
// Indiana workers.
const stateNames: Record<string, string> = {
"illinois":"IL","indiana":"IN","ohio":"OH","missouri":"MO","tennessee":"TN",
"kentucky":"KY","wisconsin":"WI","michigan":"MI","iowa":"IA","minnesota":"MN"
};
if (explicitState) {
if (!understood.some(u => u.startsWith('city'))) {
filters.push(`state = '${explicitState.replace(/'/g, "''")}'`);
understood.push(`state: ${explicitState}`);
}
} else {
const prepMatch = q.match(/\b(?:in|from)\s+(IL|IN|OH|MO|TN|KY|WI|MI|IA|MN)\b/i);
const upperMatch = q.match(/\b(IL|IN|OH|MO|TN|KY|WI|MI|IA|MN)\b/); // no /i — must be uppercase
const stateMatch = prepMatch || upperMatch;
if (stateMatch && !understood.some(u => u.startsWith('city'))) {
filters.push(`state = '${stateMatch[1].toUpperCase()}'`);
understood.push(`state: ${stateMatch[1].toUpperCase()}`);
} else {
for (const [name, abbr] of Object.entries(stateNames)) {
if (lower.includes(name)) { filters.push(`state = '${abbr}'`); understood.push(`state: ${abbr}`); break; }
}
}
}
// Extract availability
if (/available|open|ready|today|now|immediate|asap|right away/i.test(lower)) {
filters.push("CAST(availability AS DOUBLE) > 0.5");
understood.push("available now");
}
// Extract reliability preference
if (/reliable|dependable|best|top|trusted|proven/i.test(lower)) {
filters[0] = "CAST(reliability AS DOUBLE) >= 0.8";
understood.push("high reliability");
}
const filterStr = filters.join(" AND ");
queries.push("Smart parse: " + (understood.length ? understood.join(", ") : "general search"));
queries.push("SQL filter: " + filterStr);
queries.push("Vector: semantic search for best skill match");
// Also run a direct SQL query to get exact counts and zip codes.
// LIMIT honors the parsed headcount (capped at 25 to keep the
// grid renderable; the staffer can ask for more).
const sqlFields = "name, role, city, state, zip, ROUND(CAST(reliability AS DOUBLE),2) rel, ROUND(CAST(availability AS DOUBLE),2) avail, skills, certifications, archetype";
const sqlLimit = Math.min(Math.max(topK, 5), 25);
const directSql = `SELECT ${sqlFields} FROM workers_500k WHERE ${filterStr} ORDER BY CAST(availability AS DOUBLE) DESC, CAST(reliability AS DOUBLE) DESC LIMIT ${sqlLimit}`;
// Derive role+geo for the pattern query so the meta-index
// surface lines up with what the user actually asked for.
// (G) When a staffer is acting, default the geo to their
// primary territory — their playbook view is shaped by
// where they actually fill, not the global Chicago/IL prior.
const roleForPatterns = understood.find(u => u.startsWith('role:'))?.split(': ')[1] || q;
const cityForPatterns = understood.find(u => u.startsWith('city:'))?.split(': ')[1]
|| staffer?.territory.cities[0] || 'Chicago';
const stateForPatterns = understood.find(u => u.startsWith('state:'))?.split(': ')[1]
|| staffer?.territory.state || 'IL';
const [searchR, directR, patternR] = await Promise.all([
api("POST", "/vectors/hybrid", {
question: q, index_name: "workers_500k_v1", sql_filter: filterStr,
filter_dataset: "ethereal_workers", id_column: "worker_id",
// Honor the parsed headcount (capped at 25 to keep the
// vector rerank from re-scoring more rows than render).
top_k: Math.min(Math.max(topK, 5), 25), generate: false,
// k=200 to catch compounding — direct measurement shows
// boost reliably fires only when ~all memory is scanned
// due to the narrow 0.55-0.67 cosine band in the 768d
// nomic-embed-text space. Brute force at 200 entries
// is sub-ms; no reason to underscan.
use_playbook_memory: true, playbook_memory_k: 200,
}),
api("POST", "/query/sql", { sql: directSql }),
api("POST", "/vectors/playbook_memory/patterns", {
query: `${roleForPatterns} in ${cityForPatterns}, ${stateForPatterns}`,
top_k_playbooks: 25, min_trait_frequency: 0.3,
}).catch(() => ({})),
]);
// Merge: use SQL results for structured data (zip, avail), vector for ranking
const sqlWorkers = directR.rows || [];
const vectorWorkers = (searchR.sources || []).map((s: any) => ({
doc_id: s.doc_id, score: s.score, text: s.chunk_text,
playbook_boost: s.playbook_boost || 0,
playbook_citations: s.playbook_citations || [],
}));
return ok({
type: "smart_search",
summary: `Found ${searchR.sql_matches || 0} workers matching your criteria${understood.length ? ' (' + understood.join(', ') + ')' : ''}`,
staffer: staffer ? { id: staffer.id, name: staffer.name, display: staffer.display, territory: staffer.territory } : null,
understood,
sql_results: sqlWorkers,
vector_results: vectorWorkers,
sql_matches: searchR.sql_matches,
queries_run: queries,
duration_ms: Date.now() - start,
// Meta-index signal — what similar past fills had in common.
// Non-empty when memory has ≥1 relevant playbook.
discovered_pattern: (patternR as any)?.discovered_pattern,
pattern_playbooks_matched: (patternR as any)?.matched_playbooks ?? 0,
});
}
}
activeTrace = null;
return err("Unknown path. Available: / /health /search /sql /match /worker/:id /ask /log /playbooks /profile/:id /vram /context /verify /simulation/run /console /intelligence/brief /intelligence/chat", 404);
} catch (e: any) {
if (activeTrace) { scoreTrace(activeTrace, "error", 0, e.message); }
activeTrace = null;
return err(e.message || String(e), 500);
} finally {
// Flush traces async — don't block the response
flushTraces().catch(() => {});
activeTrace = null;
}
},
});
console.error(`Lakehouse Agent Gateway :${PORT}${BASE}`);
}
main().catch(console.error);
// ─── Week simulation engine ───
const ROLES = ["Forklift Operator","Machine Operator","Assembler","Loader","Quality Tech","Welder","Sanitation Worker","Shipping Clerk","Production Worker","Maintenance Tech"];
const STATES = ["IL","IN","OH","MO","TN","KY","WI","MI"];
const CITIES: Record<string, string[]> = {
IL: ["Chicago","Springfield","Rockford","Peoria","Joliet"],
IN: ["Indianapolis","Fort Wayne","Evansville","South Bend"],
OH: ["Columbus","Cleveland","Cincinnati","Dayton"],
MO: ["St. Louis","Kansas City","Springfield"],
TN: ["Nashville","Memphis"], KY: ["Louisville","Lexington"],
WI: ["Milwaukee","Madison"], MI: ["Detroit","Grand Rapids"],
};
const CLIENT_PREFIXES = ["Midwest","Great Lakes","Prairie","Heartland","Summit","Valley","Central","Lakeside","Tri-State","Heritage","National","Premier","Metro","Capitol","Crossroads","Keystone","Riverfront","Gateway","Pinnacle","Cornerstone"];
const CLIENT_SUFFIXES = ["Logistics","Manufacturing","Assembly","Foods","Steel","Packaging","Health","Plastics","Energy","Solutions","Distribution","Services","Industries","Supply","Warehousing","Materials","Products","Corp","Group","Enterprises"];
function makeClient(): string { return pick(CLIENT_PREFIXES) + " " + pick(CLIENT_SUFFIXES); }
const STARTS = ["5:00 AM","6:00 AM","6:30 AM","7:00 AM","7:30 AM","8:00 AM"];
// Diverse scenarios — each tells a different story about WHY this contract exists
const SCENARIOS = [
// URGENT — real emergencies that need immediate action
{ priority: "urgent", weight: 8, note: "Worker walked off the job at 3 PM yesterday — client needs replacement by morning",
situation: "walkoff", action: "Replacement needed ASAP — previous worker quit mid-shift" },
{ priority: "urgent", weight: 5, note: "Client emailed at 11 PM — their regular crew has COVID exposure, entire team quarantined",
situation: "quarantine", action: "Full crew replacement — health emergency at job site" },
{ priority: "urgent", weight: 5, note: "2 no-shows this morning — client is short-staffed on the floor right now",
situation: "noshow", action: "Immediate backfill — client waiting on the phone" },
// HIGH — important but not crisis
{ priority: "high", weight: 10, note: "New contract starting Monday — client wants to meet workers this week",
situation: "new_client", action: "New client onboarding — first impression matters" },
{ priority: "high", weight: 8, note: "Client expanding to 2nd shift — need additional crew by next week",
situation: "expansion", action: "Growth opportunity — client adding a shift" },
{ priority: "high", weight: 6, note: "Worker's OSHA certification expires Friday — need certified replacement lined up",
situation: "cert_expiry", action: "Cert compliance — current worker can't continue without renewal" },
{ priority: "high", weight: 5, note: "Client requested specific workers back from last month's project",
situation: "client_request", action: "Client relationship — they asked for specific people" },
// MEDIUM — standard day-to-day operations
{ priority: "medium", weight: 15, note: "Ongoing weekly fill — same client, same role, reliable pipeline",
situation: "recurring", action: "Recurring contract — steady work" },
{ priority: "medium", weight: 12, note: "Seasonal uptick — warehouse volume increasing ahead of holidays",
situation: "seasonal", action: "Seasonal planning — volume ramping up" },
{ priority: "medium", weight: 10, note: "Backfill for worker on approved medical leave — returns in 3 weeks",
situation: "medical_leave", action: "Temporary coverage — worker returning soon" },
{ priority: "medium", weight: 8, note: "Client testing new role — wants to try 2 workers for a week before committing",
situation: "trial", action: "Trial placement — client evaluating the role" },
{ priority: "medium", weight: 6, note: "Cross-training opportunity — client wants workers who can learn a new skill",
situation: "cross_train", action: "Development opportunity — workers can learn new skills" },
// LOW — planning ahead
{ priority: "low", weight: 10, note: "Future fill — project starts in 2 weeks, gathering candidates now",
situation: "future", action: "Pipeline building — no rush, quality over speed" },
{ priority: "low", weight: 8, note: "Client exploring staffing options — not committed yet, just want to see who's available",
situation: "exploratory", action: "Exploratory — client shopping, impress them with quality" },
{ priority: "low", weight: 5, note: "Internal transfer — moving a worker from one site to another, need replacement at original",
situation: "transfer", action: "Planned transition — smooth handoff between sites" },
];
function pick<T>(arr: T[]): T { return arr[Math.floor(Math.random() * arr.length)]; }
// ─── Client-blacklist persistence (feature #2) ──────────────────────────
// Simple JSON file under mcp-server/data/. Synchronous writes are fine
// at the expected rate (a handful of blacklist adds per day).
const BLACKLIST_PATH = `${import.meta.dir}/data/client_blacklists.json`;
interface BlacklistEntry {
worker_id: string;
name: string;
reason: string;
added_at: string;
}
async function loadAllBlacklists(): Promise<Record<string, BlacklistEntry[]>> {
try {
const f = Bun.file(BLACKLIST_PATH);
if (!(await f.exists())) return {};
return await f.json() as Record<string, BlacklistEntry[]>;
} catch { return {}; }
}
async function saveAllBlacklists(all: Record<string, BlacklistEntry[]>): Promise<void> {
await Bun.write(BLACKLIST_PATH, JSON.stringify(all, null, 2));
}
async function loadClientBlacklist(client: string): Promise<BlacklistEntry[]> {
const all = await loadAllBlacklists();
return all[client] || [];
}
async function addToClientBlacklist(client: string, entry: BlacklistEntry): Promise<BlacklistEntry[]> {
const all = await loadAllBlacklists();
const list = all[client] || [];
// De-dupe: same worker_id replaces prior entry with fresher reason.
const filtered = list.filter(e => e.worker_id !== entry.worker_id);
filtered.push(entry);
all[client] = filtered;
await saveAllBlacklists(all);
return filtered;
}
async function removeFromClientBlacklist(client: string, worker_id: string): Promise<{ removed: boolean; total: number }> {
const all = await loadAllBlacklists();
const list = all[client] || [];
const filtered = list.filter(e => e.worker_id !== worker_id);
const removed = filtered.length < list.length;
all[client] = filtered;
await saveAllBlacklists(all);
return { removed, total: filtered.length };
}
// ─── Push daemon (alerts) ───────────────────────────────────────────────
// Background interval that detects notification-worthy events, assembles
// a digest, and dispatches to configured channels. Converts the app from
// "dashboard you visit" to "system that finds you" — essential for the
// phone-first shop that won't remember to open a URL.
const ALERTS_CFG_PATH = `${import.meta.dir}/data/notification_config.json`;
const ALERTS_STATE_PATH = `${import.meta.dir}/data/notification_state.json`;
const ALERTS_LOG_PATH = `${import.meta.dir}/data/notifications.jsonl`;
interface AlertsConfig {
enabled: boolean;
interval_minutes: number;
webhook_url?: string;
webhook_label?: string;
deadline_warn_days: number;
}
interface AlertsState {
last_run_at?: string;
last_forecast_by_role?: Record<string, { risk: string; coverage_pct: number; earliest_staffing_deadline: string }>;
last_playbook_entries?: number;
last_digest?: any;
}
async function loadAlertsConfig(): Promise<AlertsConfig> {
const f = Bun.file(ALERTS_CFG_PATH);
if (!(await f.exists())) {
return { enabled: true, interval_minutes: 15, deadline_warn_days: 7 };
}
try { return await f.json() as AlertsConfig; }
catch { return { enabled: true, interval_minutes: 15, deadline_warn_days: 7 }; }
}
async function saveAlertsConfig(c: AlertsConfig): Promise<void> {
await Bun.write(ALERTS_CFG_PATH, JSON.stringify(c, null, 2));
}
async function loadAlertsState(): Promise<AlertsState> {
const f = Bun.file(ALERTS_STATE_PATH);
if (!(await f.exists())) return {};
try { return await f.json() as AlertsState; } catch { return {}; }
}
async function saveAlertsState(s: AlertsState): Promise<void> {
await Bun.write(ALERTS_STATE_PATH, JSON.stringify(s, null, 2));
}
// Build a digest by diffing current state against last-observed state.
// Returns null if there's nothing worth sending.
async function buildDigest(): Promise<any | null> {
const cfg = await loadAlertsConfig();
const state = await loadAlertsState();
// Pull current snapshots in parallel. /intelligence/staffing_forecast
// is a BUN route (our localhost), not on the Rust gateway — reach it
// via in-process fetch. /vectors/playbook_memory/stats is on the
// gateway and gets there via api().
const bunPort = process.env.PORT || "3700";
const [forecast, memStats] = await Promise.all([
fetch(`http://localhost:${bunPort}/intelligence/staffing_forecast`, {
method: "POST", headers: { "Content-Type": "application/json" }, body: "{}"
}).then(r => r.json()).catch(() => null as any),
api("GET", "/vectors/playbook_memory/stats").catch(() => null as any),
]);
const events: any[] = [];
// Event: role risk status changed (new critical/tight)
const currentByRole: Record<string, any> = {};
const priorByRole = state.last_forecast_by_role || {};
if (forecast && Array.isArray(forecast.forecast)) {
for (const f of forecast.forecast) {
currentByRole[f.role] = {
risk: f.risk,
coverage_pct: f.coverage_pct,
earliest_staffing_deadline: f.earliest_staffing_deadline,
};
const prior = priorByRole[f.role];
const rank: Record<string, number> = { ok: 0, watch: 1, tight: 2, critical: 3 };
if (!prior || (rank[f.risk] ?? 0) > (rank[prior.risk] ?? 0)) {
// Risk got worse (or new role we haven't seen)
if (f.risk === "critical" || f.risk === "tight") {
events.push({
kind: "risk_escalation",
role: f.role,
risk: f.risk,
coverage_pct: f.coverage_pct,
demand: f.demand_workers,
available: f.bench_available,
prior_risk: prior?.risk ?? null,
});
}
}
// Event: staffing deadline within N days that wasn't there before
const d = f.days_to_deadline;
if (d !== undefined && d >= 0 && d <= cfg.deadline_warn_days) {
const priorD = prior?.earliest_staffing_deadline;
if (priorD !== f.earliest_staffing_deadline) {
events.push({
kind: "deadline_approaching",
role: f.role,
days_to_deadline: d,
date: f.earliest_staffing_deadline,
demand: f.demand_workers,
});
}
}
}
}
// Event: playbook memory grew significantly since last check
const nowEntries = memStats?.entries ?? 0;
const priorEntries = state.last_playbook_entries ?? 0;
const grewBy = nowEntries - priorEntries;
if (grewBy >= 5) {
events.push({
kind: "memory_growth",
new_entries: grewBy,
total_entries: nowEntries,
total_endorsed_names: memStats?.total_names_endorsed ?? 0,
});
}
// Only return a digest if there's something to say. First-ever run is
// a special case: surface the snapshot as a "welcome" digest.
const isFirstRun = !state.last_run_at;
if (events.length === 0 && !isFirstRun) return null;
const digest = {
generated_at: new Date().toISOString(),
is_first_run: isFirstRun,
events,
snapshot: {
forecast_roles: Object.keys(currentByRole).length,
critical: forecast?.critical_roles ?? 0,
tight: forecast?.tight_roles ?? 0,
playbook_entries: nowEntries,
permits_30d: forecast?.permit_count ?? 0,
construction_pipeline_usd: forecast?.total_cost ?? 0,
},
};
// Persist the updated state for next diff
await saveAlertsState({
last_run_at: digest.generated_at,
last_forecast_by_role: currentByRole,
last_playbook_entries: nowEntries,
last_digest: digest,
});
return digest;
}
function formatDigestText(d: any): string {
const lines: string[] = [];
lines.push(`LAKEHOUSE DIGEST — ${d.generated_at.slice(0, 16).replace("T", " ")}`);
lines.push("");
if (d.is_first_run) {
lines.push(`[initial snapshot] · ${d.snapshot.forecast_roles} roles tracked · `
+ `${d.snapshot.playbook_entries} playbooks in memory · `
+ `${d.snapshot.permits_30d} permits last 30d`);
lines.push("");
}
const risk = d.events.filter((e: any) => e.kind === "risk_escalation");
if (risk.length) {
lines.push(`${risk.length} role${risk.length !== 1 ? "s" : ""} escalated to ${risk.map((r: any) => r.risk).filter((v: string, i: number, a: string[]) => a.indexOf(v) === i).join("/")}:`);
for (const e of risk.slice(0, 5)) {
lines.push(`${e.role} — coverage ${e.coverage_pct}% (${e.available}/${e.demand})${e.prior_risk ? ` · was ${e.prior_risk}` : " · new"}`);
}
lines.push("");
}
const dead = d.events.filter((e: any) => e.kind === "deadline_approaching");
if (dead.length) {
lines.push(`${dead.length} staffing deadline${dead.length !== 1 ? "s" : ""} within window:`);
for (const e of dead.slice(0, 5)) {
lines.push(`${e.role}${e.days_to_deadline}d to ${e.date} · demand ${e.demand}`);
}
lines.push("");
}
const mem = d.events.filter((e: any) => e.kind === "memory_growth");
for (const e of mem) {
lines.push(`+${e.new_entries} new playbooks (total ${e.total_entries}, ${e.total_endorsed_names} endorsed names)`);
}
lines.push(`snapshot: ${d.snapshot.critical} critical · ${d.snapshot.tight} tight · `
+ `$${(d.snapshot.construction_pipeline_usd || 0).toLocaleString("en-US", { maximumFractionDigits: 0 })} pipeline`);
return lines.join("\n");
}
async function dispatchDigest(d: any, cfg: AlertsConfig): Promise<{ channels: string[]; errors: string[] }> {
const channels: string[] = [];
const errors: string[] = [];
const text = formatDigestText(d);
// Channel 1: console
console.log(`[alerts] ${text.split("\n").join(" | ")}`);
channels.push("console");
// Channel 2: JSONL file (always-on audit)
try {
await Bun.write(ALERTS_LOG_PATH,
(await Bun.file(ALERTS_LOG_PATH).exists() ? await Bun.file(ALERTS_LOG_PATH).text() : "")
+ JSON.stringify({ at: d.generated_at, text, digest: d }) + "\n"
);
channels.push("file");
} catch (e: any) { errors.push(`file: ${e.message}`); }
// Channel 3: webhook (opt-in)
if (cfg.webhook_url) {
try {
const r = await fetch(cfg.webhook_url, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ text, digest: d }),
});
if (r.ok) channels.push("webhook");
else errors.push(`webhook ${r.status}: ${(await r.text()).slice(0, 200)}`);
} catch (e: any) { errors.push(`webhook: ${e.message}`); }
}
return { channels, errors };
}
// Background daemon — kicked off once on module init. Guard via a
// globalThis sentinel so the startAlertsDaemon() call from near the
// top of the file (before this block evaluates) doesn't hit a temporal
// dead zone on a let/const binding.
async function startAlertsDaemon() {
const g = globalThis as any;
if (g.__lakehouse_alerts_armed) return;
g.__lakehouse_alerts_armed = true;
const cfg = await loadAlertsConfig();
if (!cfg.enabled) {
console.log("[alerts] daemon disabled via config");
return;
}
const ms = Math.max(60, cfg.interval_minutes * 60) * 1000;
console.log(`[alerts] daemon armed · interval ${cfg.interval_minutes}min · webhook ${cfg.webhook_url ? "configured" : "disabled"}`);
// Fire once shortly after startup, then on interval.
setTimeout(runAlertsOnce, 10_000);
setInterval(runAlertsOnce, ms);
}
async function runAlertsOnce() {
try {
const cfg = await loadAlertsConfig();
if (!cfg.enabled) return;
const d = await buildDigest();
if (!d) return;
await dispatchDigest(d, cfg);
} catch (e: any) {
console.error(`[alerts] cycle error: ${e.message}`);
}
}
// Seed playbook_memory from a filled contract so the next hybrid query
// ranks against it. Used by both runWeekSimulation (per-day) and the /log
// endpoint (per manual logging). Fail-soft — seeding is best-effort.
// ─── Sample CSV generator ───────────────────────────────────────────────
// Fresh randomized staffing roster per request. Prevents the "upload
// same file twice and it's a no-op" problem from the static sample,
// and makes the dashboard numbers visibly update after onboarding.
const SAMPLE_FIRST_NAMES = [
"Sarah","Michael","Maria","David","Jennifer","Robert","Amanda","Carlos",
"Kim","James","Priya","Thomas","Lisa","Brandon","Emily","Marcus","Anita",
"Dmitri","Rachel","Samuel","Jordan","Natalia","Henry","Ava","Tyler",
"Hannah","Luis","Aisha","Victor","Monica","Derek","Yuki","Fatima","Kwame",
"Isabel","Rafael","Elena","Hiroshi","Nadia","Oscar","Sofia","Anders",
"Leila","Jamal","Chioma","Pavel","Bianca","Tariq","Inez","Reuben","Mira",
];
const SAMPLE_LAST_NAMES = [
"Johnson","Chen","Rodriguez","Park","Lopez","Williams","Taylor","Mendoza",
"Nguyen","O'Brien","Patel","Anderson","Nakamura","Moore","Zhang","Brooks",
"Volkov","Kim","Thompson","Martinez","Soto","Robinson","Clark","Hayes",
"Reyes","Brown","Wright","Diaz","Powell","Green","Castillo","Iwu",
"Kowalski","Lindström","Oyelaran","Saitō","Abebe","Mehta","Blanchard",
];
const SAMPLE_ROLES = [
"Forklift Operator","Welder","Warehouse Associate","Machine Operator",
"Loader","Maintenance Tech","Quality Tech","Electrician","Line Lead",
"Material Handler","Production Worker","Assembler","Shipping Clerk",
];
const SAMPLE_CITY_STATE: Array<[string, string]> = [
["Chicago","IL"],["Springfield","IL"],["Rockford","IL"],["Peoria","IL"],
["Indianapolis","IN"],["Fort Wayne","IN"],["Evansville","IN"],["South Bend","IN"],
["Columbus","OH"],["Cleveland","OH"],["Cincinnati","OH"],["Toledo","OH"],
["St. Louis","MO"],["Kansas City","MO"],["Springfield","MO"],
["Nashville","TN"],["Memphis","TN"],["Knoxville","TN"],
["Louisville","KY"],["Lexington","KY"],
["Milwaukee","WI"],["Madison","WI"],["Green Bay","WI"],
["Detroit","MI"],["Grand Rapids","MI"],["Lansing","MI"],
];
const SAMPLE_SKILL_POOLS: Record<string, string[]> = {
"Forklift Operator": ["pallet jack","hazmat","loading dock","overhead crane","cold storage","shipping","team lead"],
"Welder": ["TIG","MIG","pipe welding","blueprint reading","grinder","confined space"],
"Warehouse Associate": ["inventory","RF scanner","pick-to-light","Excel","packaging","team lead"],
"Machine Operator": ["CNC","SPC","gauge R&R","lean manufacturing","conveyor ops","first article"],
"Loader": ["loading dock","team lead","cold storage","first aid","bilingual"],
"Maintenance Tech": ["electrical","PLC","hydraulics","CMMS","LOTO","troubleshooting"],
"Quality Tech": ["ISO 9001","calibration","root cause analysis","SPC","Six Sigma"],
"Electrician": ["conduit","motor controls","troubleshooting","PLC","NEC"],
"Line Lead": ["team lead","training","SPC","scheduling"],
"Material Handler": ["RF scanner","pallet jack","receiving","packaging"],
"Production Worker": ["line work","first article","labeling","packaging","quality inspection"],
"Assembler": ["assembly","gauge R&R","line lead","first article"],
"Shipping Clerk": ["shipping","receiving","RF scanner","bilingual"],
};
const SAMPLE_CERT_POOL = ["OSHA-10","OSHA-30","Forklift","Hazmat","First Aid","LOTO","Confined Space","AWS D1.1","ServSafe","Six Sigma Green"];
const SAMPLE_ARCHETYPES = ["reliable","specialist","leader","communicator","flexible"];
function pick<T>(arr: T[]): T { return arr[Math.floor(Math.random() * arr.length)]; }
function pickN<T>(arr: T[], n: number): T[] {
const copy = arr.slice();
const out: T[] = [];
for (let i = 0; i < n && copy.length > 0; i++) {
out.push(copy.splice(Math.floor(Math.random() * copy.length), 1)[0]);
}
return out;
}
function csvEscape(s: string): string {
if (s.indexOf(",") >= 0 || s.indexOf('"') >= 0 || s.indexOf("\n") >= 0) {
return `"${s.replace(/"/g, '""')}"`;
}
return s;
}
function generateSampleRosterCSV(): string {
const count = 120 + Math.floor(Math.random() * 61); // 120-180
const ts = Date.now();
const lines: string[] = [
"worker_id,name,role,city,state,email,phone,skills,certifications,availability,reliability,archetype",
];
for (let i = 0; i < count; i++) {
const first = pick(SAMPLE_FIRST_NAMES);
const last = pick(SAMPLE_LAST_NAMES);
const name = `${first} ${last}`;
const role = pick(SAMPLE_ROLES);
const [city, state] = pick(SAMPLE_CITY_STATE);
const handle = `${first}.${last}`.toLowerCase().replace(/[^a-z\.]/g, "");
const email = `${handle}${Math.floor(Math.random() * 1000)}@example.com`;
const area = ["312","773","630","708","331","815","217","219","260","614","216","513","419","314","816","615","901","502","414","608","313","616"][Math.floor(Math.random() * 22)];
const phone = `(${area}) 555-${String(1000 + Math.floor(Math.random() * 9000))}`;
const skillPool = SAMPLE_SKILL_POOLS[role] || ["general"];
const skills = pickN(skillPool, 2 + Math.floor(Math.random() * 3)).join("|");
const certs = pickN(SAMPLE_CERT_POOL, 1 + Math.floor(Math.random() * 3)).join("|");
const availability = (0.3 + Math.random() * 0.69).toFixed(2);
const reliability = (0.55 + Math.random() * 0.44).toFixed(2);
const archetype = pick(SAMPLE_ARCHETYPES);
lines.push([
`W-${ts}-${String(i).padStart(4, "0")}`,
csvEscape(name),
csvEscape(role),
csvEscape(city),
state,
email,
phone,
csvEscape(skills),
csvEscape(certs),
availability,
reliability,
archetype,
].join(","));
}
return lines.join("\n") + "\n";
}
// ─── Rate/margin awareness ──────────────────────────────────────────────
// Derive implied pay and bill rates per worker / per contract without
// schema changes. Numbers are industry heuristics — a real deployment
// would replace these with the client's actual ATS pay_rate column and
// contract bill_rate. The shape stays the same; only the source changes.
const ROLE_BASE_PAY_RATE: Record<string, number> = {
"Electrician": 28,
"Welder": 26,
"Machine Operator": 24,
"Maintenance Tech": 26,
"Forklift Operator": 20,
"Loader": 17,
"Warehouse Associate": 17,
"Material Handler": 18,
"Production Worker": 18,
"Quality Tech": 23,
"Line Lead": 22,
"Assembler": 18,
"Shipping Clerk": 19,
};
const DEFAULT_BASE_PAY = 19;
// Staffing firm typically marks up pay to bill by 35-45% to cover
// overhead, insurance, and margin. Using 40% as the midpoint.
const BILL_MARKUP = 1.4;
function impliedPayRate(w: { role?: string | null; reliability?: number | string | null; archetype?: string | null }): number {
const role = w.role || "";
const base = ROLE_BASE_PAY_RATE[role] ?? DEFAULT_BASE_PAY;
const rel = typeof w.reliability === "string" ? parseFloat(w.reliability) : (w.reliability ?? 0.5);
const relBump = (isFinite(rel) ? rel : 0.5) * 4;
const arch = (w.archetype || "").toLowerCase();
const archBump = arch === "specialist" ? 4 : arch === "leader" ? 3 : arch === "reliable" ? 1 : 0;
return Math.round((base + relBump + archBump) * 100) / 100;
}
function impliedBillRate(role: string | null | undefined): number {
const base = ROLE_BASE_PAY_RATE[role || ""] ?? DEFAULT_BASE_PAY;
// Contract bill rate = base pay × markup. This is what a staffing firm
// would typically quote for this role — the worker's rate has to be
// below this to keep margin.
return Math.round((base * BILL_MARKUP) * 100) / 100;
}
// Parse a worker's role / reliability / archetype from a vector chunk
// shaped like "Name — Role in City, ST. Skills: ... . Certs: ... .
// Archetype: reliable. Reliability: 0.93, Availability: 0.73"
function parseWorkerChunk(chunk: string): { role?: string; reliability?: number; archetype?: string } {
if (!chunk) return {};
const out: any = {};
const roleMatch = chunk.match(/—\s*([^\.]+?)\s+in\s+/);
if (roleMatch) out.role = roleMatch[1].trim();
const relMatch = chunk.match(/Reliability:\s*([\d\.]+)/i);
if (relMatch) out.reliability = parseFloat(relMatch[1]);
const archMatch = chunk.match(/Archetype:\s*([A-Za-z]+)/i);
if (archMatch) out.archetype = archMatch[1];
return out;
}
// Attach implied_pay_rate to each hybrid source in place, using either
// the row's native fields (from sql_results) or parsed from chunk_text.
function enrichWithRates(sources: any[]): void {
for (const s of sources || []) {
const parsed = parseWorkerChunk(s.chunk_text || "");
const w = {
role: s.role ?? parsed.role,
reliability: s.reliability ?? s.rel ?? parsed.reliability,
archetype: s.archetype ?? s.arch ?? parsed.archetype,
};
s.implied_pay_rate = impliedPayRate(w);
}
}
async function seedPlaybookFromContract(c: any) {
const names = (c.matches || []).slice(0, 5)
.map((m: any) => m.name || m.doc_id)
.filter((n: string) => n && !n.startsWith("W500-"));
if (!names.length) return;
const op = `fill: ${c.role} x${c.headcount} in ${c.city}, ${c.state}`;
try {
await api("POST", "/vectors/playbook_memory/seed", {
operation: op,
approach: `${c.situation || c.priority || "fill"} → hybrid search`,
context: `client=${c.client || ""} start=${c.start || ""}`,
endorsed_names: names,
append: true,
});
} catch {}
}
async function runWeekSimulation() {
const days = ["Monday","Tuesday","Wednesday","Thursday","Friday"];
const staffers = ["Sarah (Lead)","Mike (Senior)","Kim (Junior)"];
const results: any[] = [];
let totalFilled = 0, totalNeeded = 0, emergencies = 0, handoffs = 0, playbookEntries = 0;
for (let d = 0; d < days.length; d++) {
const dayLabel = days[d];
const numContracts = 4 + Math.floor(Math.random() * 5); // 4-8 per day
const contracts: any[] = [];
const staffer = staffers[d % staffers.length];
const handoffTo = staffers[(d + 1) % staffers.length];
for (let c = 0; c < numContracts; c++) {
const state = pick(STATES);
const city = pick(CITIES[state] || [state]);
const role = pick(ROLES);
// Weighted scenario selection
const totalWeight = SCENARIOS.reduce((s, sc) => s + sc.weight, 0);
let r = Math.random() * totalWeight;
let scenario = SCENARIOS[0];
for (const sc of SCENARIOS) { r -= sc.weight; if (r <= 0) { scenario = sc; break; } }
const priority = scenario.priority;
const headcount = priority === "urgent" ? 3 + Math.floor(Math.random() * 4) :
priority === "high" ? 2 + Math.floor(Math.random() * 3) :
priority === "medium" ? 2 + Math.floor(Math.random() * 3) :
1 + Math.floor(Math.random() * 2);
const minRel = priority === "urgent" ? 0.6 : priority === "high" ? 0.75 : 0.8;
const cid = `W${d+1}-${String(c+1).padStart(3,"0")}`;
if (priority === "urgent") emergencies++;
totalNeeded += headcount;
// Run hybrid search — Phase 19: boost on so past playbooks shape ranking
let filled = 0;
let matches: any[] = [];
try {
const filt = `role = '${role}' AND state = '${state}' AND reliability >= ${minRel}`;
const r = await api("POST", "/vectors/hybrid", {
question: `Find ${role} workers in ${city}, ${state} for ${scenario.situation}`,
index_name: "workers_500k_v1",
sql_filter: filt,
filter_dataset: "ethereal_workers",
id_column: "worker_id",
top_k: headcount + 2,
generate: false,
use_playbook_memory: true,
});
matches = (r.sources || []).slice(0, headcount).map((s: any) => ({
doc_id: s.doc_id,
name: s.chunk_text?.split("—")[0]?.trim() || s.doc_id,
score: s.score,
chunk_text: s.chunk_text || "",
playbook_boost: s.playbook_boost || 0,
playbook_citations: s.playbook_citations || [],
}));
filled = matches.length;
} catch {}
totalFilled += Math.min(filled, headcount);
contracts.push({
id: cid, client: makeClient(), role, state, city,
headcount, filled: Math.min(filled, headcount), priority,
start: pick(STARTS), notes: scenario.note, situation: scenario.situation,
action: scenario.action, matches,
staffer, handoff_to: d < 4 ? handoffTo : null,
});
}
// End of day: seed playbook_memory with TODAY's filled contracts so
// tomorrow's hybrid search ranks against them. This is the in-week
// feedback loop — without this, day 5 doesn't benefit from day 1.
for (const c of contracts) {
if (c.matches && c.matches.length) {
await seedPlaybookFromContract(c).catch(() => {});
}
}
if (d < 4) {
handoffs++;
try {
await api("POST", "/api/ingest/file?name=successful_playbooks", null); // just trigger
} catch {}
}
playbookEntries++;
results.push({
label: dayLabel,
staffer,
handoff_to: d < 4 ? handoffTo : null,
contracts,
filled: contracts.reduce((s: number, c: any) => s + c.filled, 0),
needed: contracts.reduce((s: number, c: any) => s + c.headcount, 0),
});
}
const summary = {
total_contracts: results.reduce((s, d) => s + d.contracts.length, 0),
total_needed: totalNeeded,
total_filled: totalFilled,
fill_pct: Math.round(totalFilled / Math.max(totalNeeded, 1) * 100),
emergencies,
handoffs,
playbook_entries: playbookEntries,
};
// BUG FIX 2026-04-20: previously this POSTed a multi-row CSV to
// /ingest/file?name=successful_playbooks at end of every simulation.
// That endpoint REPLACES the dataset's object list — so each
// /simulation/run wiped the prior simulation's rows. The SQL
// successful_playbooks table was never accumulating; it always reflected
// only the most-recent simulation batch.
//
// Per-day per-contract seeding via /vectors/playbook_memory/seed
// (added Pass 1, runs inside the day loop above) is the path that
// actually accumulates feedback. The SQL successful_playbooks table is
// intentionally not written by /simulation/run anymore until a proper
// append surface exists.
return { days: results, summary };
}
// Kick off the push/alerts daemon once per process. Placed at the END of
// the module so all const/let declarations in the alerts block (paths,
// helpers, etc.) have evaluated before the daemon reads them. Calling
// from earlier in the file would hit a temporal dead zone on these
// bindings.
startAlertsDaemon().catch(e => console.error(`[alerts] startup error: ${e.message}`));