diff --git a/scripts/analyze_chicago_contracts.ts b/scripts/analyze_chicago_contracts.ts
new file mode 100644
index 0000000..8438bc4
--- /dev/null
+++ b/scripts/analyze_chicago_contracts.ts
@@ -0,0 +1,258 @@
+#!/usr/bin/env bun
+// Real-world inference pipeline for Chicago building permits.
+// Uses the unified matrix retriever (chicago_permits + entity_brief +
+// sec_tickers + llm_team_runs + distilled_procedural) to enrich a
+// Grok 4.1 fast analysis. Observer hand-reviews each result.
+//
+// First true USE of the matrix architecture on real ingested data —
+// not the scrum self-improvement loop but the staffing-intelligence loop.
+//
+// Usage:
+//   bun run scripts/analyze_chicago_contracts.ts [N]
+//   N = number of permits to analyze (default 5)
+
+const GATEWAY = process.env.LAKEHOUSE_URL ?? "http://localhost:3100";
+const OBSERVER = process.env.LH_OBSERVER_URL ?? "http://localhost:3800";
+const RAW_BUCKET = "raw";
+const MC_ALIAS = "local";
+const STAGE_DIR = "/tmp/chicago_analyze";
+const OUTPUT = "/home/profit/lakehouse/data/_kb/contract_analyses.jsonl";
+
+const CONTRACT_CORPORA = [
+  "chicago_permits_v1",
+  "entity_brief_v1",
+  "sec_tickers_v1",
+  "llm_team_runs_v1",
+  "llm_team_response_cache_v1",
+  "distilled_procedural_v20260423102847",
+];
+
+interface Permit {
+  permit_?: string;
+  permit_type?: string;
+  permit_status?: string;
+  work_description?: string;
+  reported_cost?: string | number;
+  contact_1?: any;
+  contact_2?: any;
+  contact_3_name?: string;
+  street_number?: string;
+  street_direction?: string;
+  street_name?: string;
+  suffix?: string;
+  issue_date?: string;
+  community_area?: string;
+  ward?: string;
+  [k: string]: any;
+}
+
+interface MatrixHit {
+  source_corpus: string;
+  score: number;
+  doc_id: string;
+  text: string;
+}
+
+function log(msg: string) { console.log(`[contract ${new Date().toISOString().slice(11,19)}] ${msg}`); }
+
+async function fetchPermits(n: number): Promise<Permit[]> {
+  const fs = await import("node:fs/promises");
+  await fs.mkdir(STAGE_DIR, { recursive: true });
+  const local = `${STAGE_DIR}/permits.json`;
+  const proc = Bun.spawn(["mc", "cp", "-q", `${MC_ALIAS}/${RAW_BUCKET}/chicago/permits_2026-04-25.json`, local]);
+  await proc.exited;
+  const all: Permit[] = JSON.parse(await Bun.file(local).text());
+  // Pick high-cost permits with named contractors — most interesting for staffing analysis
+  const meaningful = all.filter(p =>
+    p.reported_cost && Number(p.reported_cost) >= 100000 &&
+    (p.contact_1 || p.contact_2)
+  );
+  log(`raw permits: ${all.length} · meaningful (cost >= $100k + has contractor): ${meaningful.length}`);
+  // Sample evenly across the meaningful set (e.g. 1,000 meaningful permits
+  // at n=5 gives stride 200, picking indexes 0, 200, 400, 600, 800)
+  const sampled: Permit[] = [];
+  const stride = Math.max(1, Math.floor(meaningful.length / n));
+  for (let i = 0; i < meaningful.length && sampled.length < n; i += stride) {
+    sampled.push(meaningful[i]);
+  }
+  return sampled;
+}
+
+function permitToText(p: Permit): string {
+  const addr = `${p.street_number ?? ""} ${p.street_direction ?? ""} ${p.street_name ?? ""} ${p.suffix ?? ""}`.replace(/\s+/g, " ").trim();
+  const c1 = typeof p.contact_1 === "string" ? p.contact_1 : (p.contact_1?.name ?? p.contact_1?.contact_name ?? "");
+  const c2 = typeof p.contact_2 === "string" ? p.contact_2 : (p.contact_2?.name ?? "");
+  return [
+    `Chicago Building Permit ${p.permit_ ?? "?"}`,
+    `Type: ${p.permit_type ?? "?"} · Status: ${p.permit_status ?? "?"}`,
+    `Address: ${addr} · Community ${p.community_area ?? "?"} · Ward ${p.ward ?? "?"}`,
+    `Issued: ${p.issue_date ?? "?"}`,
"?"}`, + `Reported cost: $${Number(p.reported_cost ?? 0).toLocaleString()}`, + `Primary contractor: ${c1 || "unknown"}`, + c2 ? `Secondary: ${c2}` : "", + `Owner: ${p.contact_3_name ?? "?"}`, + `Work description: ${(p.work_description ?? "").slice(0, 800)}`, + ].filter(Boolean).join("\n"); +} + +async function fetchMatrixHits(query: string): Promise<{ hits: MatrixHit[]; by_corpus: Record; latency_ms: number }> { + const t0 = Date.now(); + const all: MatrixHit[] = []; + const byCorpus: Record = {}; + await Promise.all(CONTRACT_CORPORA.map(async (idx) => { + try { + const r = await fetch(`${GATEWAY}/vectors/search`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ index_name: idx, query, top_k: 3 }), + signal: AbortSignal.timeout(15000), + }); + if (!r.ok) { byCorpus[idx] = -1; return; } + const data: any = await r.json(); + const results = data.results ?? []; + byCorpus[idx] = results.length; + for (const h of results) { + all.push({ + source_corpus: idx, + score: Number(h.score ?? 0), + doc_id: String(h.doc_id ?? "?"), + text: String(h.chunk_text ?? "").slice(0, 400), + }); + } + } catch { byCorpus[idx] = -1; } + })); + all.sort((a, b) => b.score - a.score); + return { hits: all.slice(0, 10), by_corpus: byCorpus, latency_ms: Date.now() - t0 }; +} + +function buildMatrixPreamble(hits: MatrixHit[]): string { + if (hits.length === 0) return ""; + const lines = [ + `═══ 📖 MATRIX CONTEXT — ${hits.length} relevant hits across the knowledge base ═══`, + "Reference material from prior contractor data, SEC tickers, LLM team analyses, and distilled procedures. Use as evidence; do NOT invent.", + "", + ]; + for (let i = 0; i < hits.length; i++) { + const h = hits[i]; + lines.push(`[${i + 1}] ${h.source_corpus} (score=${h.score.toFixed(2)}, doc=${h.doc_id}): ${h.text.replace(/\s+/g, " ").trim()}`); + } + lines.push("═══"); + lines.push(""); + return lines.join("\n"); +} + +async function chat(model: string, prompt: string): Promise<{ content: string; error?: string }> { + try { + const r = await fetch(`${GATEWAY}/v1/chat`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + provider: "openrouter", + model, + messages: [{ role: "user", content: prompt }], + max_tokens: 1500, + temperature: 0.1, + }), + signal: AbortSignal.timeout(90000), + }); + if (!r.ok) return { content: "", error: `HTTP ${r.status}: ${(await r.text()).slice(0, 200)}` }; + const j: any = await r.json(); + return { content: j.choices?.[0]?.message?.content ?? 
"" }; + } catch (e: any) { return { content: "", error: e.message }; } +} + +async function observerReview(input: { permit_id: string; model: string; response: string; permit_text: string }): Promise<{ verdict: string; confidence: number; notes: string; source: string }> { + try { + const r = await fetch(`${OBSERVER}/review`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + file_path: `chicago_permit/${input.permit_id}`, + model: input.model, + response: input.response, + source_content: input.permit_text, + grounding_stats: { total: 0, grounded: 0, groundedPct: null }, + attempt: 1, + }), + signal: AbortSignal.timeout(90000), + }); + if (!r.ok) return { verdict: "accept", confidence: 50, notes: `observer ${r.status}`, source: "fallthrough" }; + return await r.json(); + } catch (e: any) { return { verdict: "accept", confidence: 50, notes: `observer error: ${e.message}`, source: "fallthrough" }; } +} + +async function analyzeOne(p: Permit, idx: number, total: number): Promise { + const permit_id = p.permit_ ?? `unknown_${idx}`; + const t0 = Date.now(); + log(`══ permit ${idx + 1}/${total} · ${permit_id} · type=${p.permit_type} · cost=$${Number(p.reported_cost ?? 0).toLocaleString()}`); + const permitText = permitToText(p); + + // Build matrix query: combine type + work description + contractor name for retrieval anchoring + const c1 = typeof p.contact_1 === "string" ? p.contact_1 : (p.contact_1?.name ?? ""); + const matrixQuery = `${p.permit_type ?? ""} ${(p.work_description ?? "").slice(0, 300)} ${c1}`; + const matrix = await fetchMatrixHits(matrixQuery); + const corporaSummary = Object.entries(matrix.by_corpus).map(([k, v]) => `${k.split("_v")[0]}=${v}`).join(" "); + log(` 📖 matrix: ${matrix.hits.length} hits in ${matrix.latency_ms}ms · ${corporaSummary}`); + + const preamble = buildMatrixPreamble(matrix.hits); + const task = `${preamble}You are a staffing-intelligence analyst reviewing a real Chicago building permit. Using the MATRIX CONTEXT above as evidence, produce a structured analysis: + +PERMIT: +${permitText} + +Produce a markdown analysis with: +1. **Permit summary** — 2 sentences on what this is +2. **Contractor signal** — what we know about the named contractor(s) from matrix context (cite [N] hits). If unknown, say so. +3. **Staffing fit** — what trades/headcount/skills this permit implies +4. **Risk flags** — anything in matrix context that suggests caution (debarment, prior incidents, low-quality history). If none, say so. +5. **Opportunity score** — 0-100 with one-sentence rationale + +Cite matrix hits as [N] inline. If matrix has no relevant hits, say "no matrix evidence" — do NOT invent contractor history.`; + + const resp = await chat("x-ai/grok-4.1-fast", task); + if (resp.error) { + log(` ✗ chat error: ${resp.error.slice(0, 100)}`); + return { permit_id, ok: false, error: resp.error, ts: new Date().toISOString() }; + } + log(` ✓ analysis ${resp.content.length} chars`); + + const verdict = await observerReview({ permit_id, model: "openrouter/x-ai/grok-4.1-fast", response: resp.content, permit_text: permitText }); + log(` observer: ${verdict.verdict} (conf=${verdict.confidence}, src=${verdict.source})`); + + return { + permit_id, ok: true, + permit_type: p.permit_type, cost: Number(p.reported_cost ?? 
+
+async function main() {
+  const n = Number(process.argv[2] ?? 5);
+  log(`fetching ${n} permits from raw bucket...`);
+  const permits = await fetchPermits(n);
+  log(`analyzing ${permits.length} permits sequentially...`);
+
+  const fs = await import("node:fs/promises");
+  const { appendFile } = fs;
+  const results: any[] = [];
+  for (let i = 0; i < permits.length; i++) {
+    const r = await analyzeOne(permits[i], i, permits.length);
+    results.push(r);
+    await appendFile(OUTPUT, JSON.stringify(r) + "\n");
+  }
+
+  log(`\n══ SUMMARY ══`);
+  const ok = results.filter(r => r.ok).length;
+  const accepted = results.filter(r => r.observer_verdict === "accept").length;
+  const cycled = results.filter(r => r.observer_verdict === "cycle").length;
+  const rejected = results.filter(r => r.observer_verdict === "reject").length;
+  const avgHits = results.reduce((a, r) => a + (r.matrix_hits ?? 0), 0) / Math.max(1, results.length);
+  log(`  permits analyzed: ${ok}/${results.length}`);
+  log(`  observer: accept=${accepted} cycle=${cycled} reject=${rejected}`);
+  log(`  avg matrix hits per permit: ${avgHits.toFixed(1)}`);
+  log(`  output: ${OUTPUT}`);
+}
+
+main().catch(e => { console.error(`FATAL: ${e.message}`); process.exit(1); });
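Each result is appended to contract_analyses.jsonl as one JSON object per line, with field names matching analyzeOne's return value, so interrupted runs keep their partial output and the summary can be recomputed offline. A minimal sketch of such a tally, assuming that file layout (Bun, top-level await):

const tallyRaw = await Bun.file("/home/profit/lakehouse/data/_kb/contract_analyses.jsonl").text();
const records = tallyRaw.split("\n").filter(l => l.trim()).map(l => JSON.parse(l));
const byVerdict: Record<string, number> = {};
for (const rec of records) {
  const v = rec.ok ? (rec.observer_verdict ?? "unknown") : "error";
  byVerdict[v] = (byVerdict[v] ?? 0) + 1;
}
console.log(`${records.length} analyses`, byVerdict);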
diff --git a/scripts/dump_raw_corpus.sh b/scripts/dump_raw_corpus.sh
new file mode 100755
index 0000000..eb1d2f0
--- /dev/null
+++ b/scripts/dump_raw_corpus.sh
@@ -0,0 +1,95 @@
+#!/bin/bash
+# One-shot dump of all testing data into the `raw` MinIO bucket.
+# Persistent test corpus so we don't re-extract every run.
+#
+# Layout:
+#   raw/
+#     staffing/     — workers_500k.parquet, resumes.parquet
+#     entities/     — entities.jsonl, svep_log.json
+#     sec/          — company_tickers.json
+#     llm_team/     — *.jsonl extracts from knowledge_base PG tables
+#     chicago/      — permits_<DATE>.json (last 30 days), tif_districts.geojson
+#     MANIFEST.json — documents what's here + when
+
+set -euo pipefail
+
+REPO=/home/profit/lakehouse
+BUCKET=raw
+ALIAS=local
+STAGE=$(mktemp -d /tmp/raw_dump.XXXXX)
+trap 'rm -rf "$STAGE"' EXIT
+DATE=$(date -u +%Y-%m-%d)
+
+log() { echo "[dump $(date -u +%H:%M:%S)] $*"; }
+
+log "creating bucket ${ALIAS}/${BUCKET} (idempotent)"
+mc mb --ignore-existing ${ALIAS}/${BUCKET}
+
+# ─── 1. STAFFING ───
+log "staffing/ — workers_500k.parquet (323 MB) + resumes.parquet"
+mc cp -q ${REPO}/data/datasets/workers_500k.parquet ${ALIAS}/${BUCKET}/staffing/workers_500k.parquet
+mc cp -q ${REPO}/data/datasets/resumes.parquet ${ALIAS}/${BUCKET}/staffing/resumes.parquet
+
+# ─── 2. ENTITIES + SEC + GEO ───
+log "entities/ — contractor entities cache + SEC tickers + svep + tif districts"
+mc cp -q ${REPO}/data/_entity_cache/entities.jsonl ${ALIAS}/${BUCKET}/entities/entities.jsonl
+mc cp -q ${REPO}/data/_entity_cache/sec_company_tickers.json ${ALIAS}/${BUCKET}/sec/company_tickers.json
+mc cp -q ${REPO}/data/_entity_cache/svep_log.json ${ALIAS}/${BUCKET}/entities/svep_log.json
+mc cp -q ${REPO}/data/_entity_cache/tif_districts.geojson ${ALIAS}/${BUCKET}/chicago/tif_districts.geojson
+
+# ─── 3. LLM TEAM HISTORY (Postgres → JSONL → S3) ───
+log "llm_team/ — extracting from knowledge_base PG tables"
+LLM_TABLES=(team_runs pipeline_runs lab_experiments lab_trials meta_pipelines meta_runs conversations response_cache memory_entries adaptive_runs)
+for tbl in "${LLM_TABLES[@]}"; do
+  out=${STAGE}/${tbl}.jsonl
+  rows=$(sudo -u postgres psql -d knowledge_base -At -c "SELECT COUNT(*) FROM ${tbl};" 2>/dev/null || echo 0)
+  if [ "$rows" -eq 0 ]; then
+    log "  · ${tbl}: 0 rows, skipping"
+    continue
+  fi
+  sudo -u postgres psql -d knowledge_base -At -c "COPY (SELECT row_to_json(t) FROM ${tbl} t) TO STDOUT;" > "$out" 2>/dev/null
+  size=$(du -h "$out" | awk '{print $1}')
+  log "  · ${tbl}: ${rows} rows (${size})"
+  mc cp -q "$out" ${ALIAS}/${BUCKET}/llm_team/${tbl}.jsonl
+done
+
+# ─── 4. CHICAGO PERMITS (last 30 days) ───
+log "chicago/ — pulling last 30 days of permits from data.cityofchicago.org"
+since=$(date -u -d '30 days ago' +%Y-%m-%d)
+out=${STAGE}/permits_${DATE}.json
+url="https://data.cityofchicago.org/resource/ydr8-5enu.json?\$where=issue_date%3E='${since}'&\$limit=10000&\$order=issue_date%20DESC"
+if curl -sf --max-time 60 "$url" -o "$out"; then
+  count=$(python3 -c "import json; print(len(json.load(open('${out}'))))")
+  size=$(du -h "$out" | awk '{print $1}')
+  log "  · permits since ${since}: ${count} records (${size})"
+  mc cp -q "$out" ${ALIAS}/${BUCKET}/chicago/permits_${DATE}.json
+else
+  log "  · WARN: chicago permits fetch failed; skipping"
+fi
+
+# ─── 5. MANIFEST ───
+log "writing MANIFEST.json"
+manifest=${STAGE}/MANIFEST.json
+python3 - > "$manifest" <<PY
+# Minimal manifest: dump date plus the sections written above.
+import json
+print(json.dumps({
+    "dumped_at": "${DATE}",
+    "bucket": "${BUCKET}",
+    "sections": ["staffing", "entities", "sec", "llm_team", "chicago"],
+}, indent=2))
+PY
+mc cp -q "$manifest" ${ALIAS}/${BUCKET}/MANIFEST.json
+log "done — raw corpus dumped to ${ALIAS}/${BUCKET}"
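Section 4 above is a single SoQL request: $where filters on issue_date, $order sorts newest first, and $limit caps the response at 10,000 rows (paging past that would add $offset). The same query sketched from the Bun side; fetchRecentPermits is a hypothetical helper name, not something these scripts define:

async function fetchRecentPermits(days = 30): Promise<any[]> {
  // YYYY-MM-DD cutoff, `days` ago in UTC
  const since = new Date(Date.now() - days * 86_400_000).toISOString().slice(0, 10);
  const params = new URLSearchParams({
    $where: `issue_date>'${since}'`,
    $limit: "10000",
    $order: "issue_date DESC",
  });
  const r = await fetch(`https://data.cityofchicago.org/resource/ydr8-5enu.json?${params}`);
  if (!r.ok) throw new Error(`socrata HTTP ${r.status}`);
  return r.json();
}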
diff --git a/scripts/vectorize_raw_corpus.ts b/scripts/vectorize_raw_corpus.ts
new file mode 100644
--- /dev/null
+++ b/scripts/vectorize_raw_corpus.ts
+#!/usr/bin/env bun
+// Vectorize the raw-bucket corpus into per-source matrix indexes via the
+// gateway's /vectors/index endpoint. One SourceSpec per corpus.
+//
+// Usage:
+//   bun run scripts/vectorize_raw_corpus.ts [name ...]
+//   (no names = vectorize every source)
+
+const GATEWAY = process.env.LAKEHOUSE_URL ?? "http://localhost:3100";
+const RAW_BUCKET = "raw";
+const MC_ALIAS = "local";
+const STAGE_DIR = "/tmp/vectorize_raw";
+
+interface Doc { id: string; text: string; }
+
+interface SourceSpec {
+  name: string;
+  index_name: string;
+  s3_key: string;
+  source_label: string;
+  chunk_size: number;
+  overlap: number;
+  extractor: (raw: string) => Doc[];
+}
+
+// Spawn mc to copy from S3 → local stage so we can read it
+async function fetchFromRaw(key: string): Promise<string> {
+  const fs = await import("node:fs/promises");
+  await fs.mkdir(STAGE_DIR, { recursive: true });
+  const local = `${STAGE_DIR}/${key.replace(/\//g, "_")}`;
+  const proc = Bun.spawn(["mc", "cp", "-q", `${MC_ALIAS}/${RAW_BUCKET}/${key}`, local]);
+  await proc.exited;
+  if (proc.exitCode !== 0) throw new Error(`mc cp failed for ${key}`);
+  return local;
+}
+
+async function readJsonl(path: string): Promise<any[]> {
+  const text = await Bun.file(path).text();
+  return text.split("\n").filter(l => l.trim()).map(l => JSON.parse(l));
+}
+
+function truncate(s: string, n = 4000): string {
+  return s == null ? "" : (s.length > n ? s.slice(0, n) : s);
+}
+
+// ─── EXTRACTORS — one per source ───
+// Each shapes raw rows into {id, text} for the gateway's chunker.
+
+function extractChicagoPermits(raw: string): Doc[] {
+  const arr = JSON.parse(raw);
+  return arr.map((p: any, i: number) => {
+    const text = [
+      `Permit ${p.permit_ ?? p.permit_number ?? `unknown_${i}`}`,
+      `Type: ${p.permit_type ?? "?"} Status: ${p.permit_status ?? "?"}`,
+      `Address: ${p.street_number ?? ""} ${p.street_direction ?? ""} ${p.street_name ?? ""} ${p.suffix ?? ""}`.trim(),
+      `Issued: ${p.issue_date ?? "?"} Applied: ${p.application_start_date ?? "?"}`,
+      `Work: ${truncate(p.work_description ?? "", 800)}`,
+      `Estimated cost: ${p.reported_cost ?? p.estimated_cost ?? "?"}`,
+      `Contractors: ${p.contact_1 ?? ""} | ${p.contact_2 ?? ""}`,
+      `Owner: ${p.contact_3_name ?? ""} (${p.contact_3_type ?? ""})`,
+      `Paid: ${p.subtotal_paid ?? ""} community area=${p.community_area ?? ""} ward=${p.ward ?? ""}`,
+    ].filter(Boolean).join("\n");
+    return { id: `permit_${p.permit_ ?? p.id ?? i}`, text };
+  });
+}
+
+function extractEntities(raw: string): Doc[] {
+  return raw.split("\n").filter(l => l.trim()).map((line, i) => {
+    try {
+      const e = JSON.parse(line);
+      const name = e.normalized_name ?? e.name ?? e.display_name ?? `entity_${i}`;
+      const text = [
+        `Entity: ${name}`,
+        `Display: ${e.display_name ?? name}`,
+        e.ticker ? `Ticker: ${e.ticker}` : "",
+        e.cik ? `CIK: ${e.cik}` : "",
+        e.aliases ? `Aliases: ${(e.aliases ?? []).join(", ")}` : "",
+        e.last_seen ? `Last seen: ${e.last_seen}` : "",
+        e.notes ? `Notes: ${truncate(JSON.stringify(e.notes), 600)}` : "",
+        `Raw: ${truncate(JSON.stringify(e), 1500)}`,
+      ].filter(Boolean).join("\n");
+      return { id: `entity_${name}_${i}`, text };
+    } catch {
+      return { id: `entity_${i}`, text: line.slice(0, 1000) };
+    }
+  });
+}
+
+function extractSecTickers(raw: string): Doc[] {
+  // SEC tickers JSON: {"_fetched_at": ..., "rows": {"0": {cik_str, ticker, title}, ...}}
+  const obj = JSON.parse(raw);
+  // The actual rows are under .rows; fall back to top-level if no wrapper.
+  const rows = obj.rows ?? obj;
+  return Object.values(rows)
+    .filter((r: any) => r && typeof r === "object" && r.ticker)
+    .map((row: any, i: number) => ({
+      id: `sec_${row.ticker ?? i}`,
+      text: `Ticker: ${row.ticker}\nCompany: ${row.title ?? "?"}\nCIK: ${row.cik_str ?? "?"}`,
+    }));
+}
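// Worked example for the ticker extractor: one row in the .rows wrapper
// becomes one compact doc. Apple's real CIK is 320193; the _fetched_at
// timestamp is illustrative.
const sampleSec = JSON.stringify({
  _fetched_at: "2026-04-25T00:00:00Z",
  rows: { "0": { cik_str: 320193, ticker: "AAPL", title: "Apple Inc." } },
});
console.log(extractSecTickers(sampleSec));
// → [{ id: "sec_AAPL", text: "Ticker: AAPL\nCompany: Apple Inc.\nCIK: 320193" }]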
+
+function extractLlmTeamRuns(raw: string): Doc[] {
+  return raw.split("\n").filter(l => l.trim()).map((line, i) => {
+    try {
+      const r = JSON.parse(line);
+      const text = [
+        `Team run ${r.id ?? i}`,
+        `Mode: ${r.mode ?? "?"} Created: ${r.created_at ?? "?"}`,
+        r.prompt ? `Prompt: ${truncate(r.prompt, 1200)}` : "",
+        r.input ? `Input: ${truncate(typeof r.input === "string" ? r.input : JSON.stringify(r.input), 1200)}` : "",
+        r.output ? `Output: ${truncate(typeof r.output === "string" ? r.output : JSON.stringify(r.output), 2000)}` : "",
+        r.result ? `Result: ${truncate(typeof r.result === "string" ? r.result : JSON.stringify(r.result), 2000)}` : "",
+        r.metadata ? `Meta: ${truncate(JSON.stringify(r.metadata), 600)}` : "",
+      ].filter(Boolean).join("\n");
+      return { id: `team_run_${r.id ?? i}`, text };
+    } catch {
+      return { id: `team_run_${i}`, text: line.slice(0, 2000) };
+    }
+  });
+}
+
+function extractLlmTeamResponseCache(raw: string): Doc[] {
+  return raw.split("\n").filter(l => l.trim()).map((line, i) => {
+    try {
+      const r = JSON.parse(line);
+      const text = [
+        `Cached response ${r.cache_key ?? r.id ?? i}`,
+        `Created: ${r.created_at ?? "?"}`,
+        r.prompt ? `Prompt: ${truncate(r.prompt, 1500)}` : "",
+        r.response ? `Response: ${truncate(r.response, 2500)}` : "",
+        r.model ? `Model: ${r.model}` : "",
+      ].filter(Boolean).join("\n");
+      return { id: `resp_${r.cache_key ?? r.id ?? i}`, text };
+    } catch {
+      return { id: `resp_${i}`, text: line.slice(0, 2000) };
+    }
+  });
+}
+
+const SOURCES: SourceSpec[] = [
+  { name: "chicago", index_name: "chicago_permits_v1", s3_key: "chicago/permits_2026-04-25.json",
+    source_label: "chicago_permits", chunk_size: 600, overlap: 80, extractor: extractChicagoPermits },
+  { name: "entities", index_name: "entity_brief_v1", s3_key: "entities/entities.jsonl",
+    source_label: "entity_brief", chunk_size: 500, overlap: 60, extractor: extractEntities },
+  { name: "sec", index_name: "sec_tickers_v1", s3_key: "sec/company_tickers.json",
+    source_label: "sec_tickers", chunk_size: 200, overlap: 20, extractor: extractSecTickers },
+  { name: "llm_team_runs", index_name: "llm_team_runs_v1", s3_key: "llm_team/team_runs.jsonl",
+    source_label: "llm_team_runs", chunk_size: 800, overlap: 100, extractor: extractLlmTeamRuns },
+  { name: "llm_team_response", index_name: "llm_team_response_cache_v1", s3_key: "llm_team/response_cache.jsonl",
+    source_label: "llm_team_response_cache", chunk_size: 800, overlap: 100, extractor: extractLlmTeamResponseCache },
+];
+
+async function vectorizeOne(spec: SourceSpec): Promise<{ ok: boolean; chunks: number; job_id?: string; err?: string }> {
+  const t0 = Date.now();
+  console.log(`\n━━━ ${spec.name} → ${spec.index_name} ━━━`);
+  console.log(`fetching s3://${RAW_BUCKET}/${spec.s3_key}`);
+
+  let local: string;
+  try { local = await fetchFromRaw(spec.s3_key); }
+  catch (e: any) { return { ok: false, chunks: 0, err: `fetch: ${e.message}` }; }
+
+  console.log(`reading + extracting...`);
+  const raw = await Bun.file(local).text();
+  const docs = spec.extractor(raw);
+  if (docs.length === 0) return { ok: false, chunks: 0, err: "0 docs after extraction" };
+  console.log(`  ${docs.length} docs (avg ${Math.round(docs.reduce((a, d) => a + d.text.length, 0) / docs.length)} chars)`);
+
+  console.log(`POST /vectors/index ${spec.index_name} ...`);
+  const resp = await fetch(`${GATEWAY}/vectors/index`, {
+    method: "POST",
+    headers: { "content-type": "application/json" },
+    body: JSON.stringify({
+      index_name: spec.index_name,
+      source: spec.source_label,
+      documents: docs,
+      chunk_size: spec.chunk_size,
+      overlap: spec.overlap,
+    }),
+    signal: AbortSignal.timeout(300000),
+  });
+
+  if (!resp.ok) {
+    const body = await resp.text();
+    return { ok: false, chunks: 0, err: `HTTP ${resp.status}: ${body.slice(0, 300)}` };
+  }
+  const j: any = await resp.json();
+  const ms = Date.now() - t0;
+  console.log(`  ✓ submitted: job=${j.job_id} chunks=${j.chunks} (extract+submit ${(ms/1000).toFixed(1)}s)`);
+  return { ok: true, chunks: j.chunks, job_id: j.job_id };
+}
+
+async function pollJob(jobId: string): Promise<{ status: string; processed: number; total: number }> {
+  const r = await fetch(`${GATEWAY}/vectors/jobs/${jobId}`, { signal: AbortSignal.timeout(5000) });
+  if (!r.ok) return { status: "unknown", processed: 0, total: 0 };
+  const j: any = await r.json();
+  return { status: j.status ?? "?", processed: j.processed ?? 0, total: j.total ?? 0 };
+}
+
+async function waitForJob(jobId: string, label: string, maxSec = 600): Promise<void> {
+  const t0 = Date.now();
+  let lastLog = 0;
+  while ((Date.now() - t0) / 1000 < maxSec) {
+    const s = await pollJob(jobId);
+    if (s.status === "complete" || s.status === "completed" || s.status === "done") {
+      console.log(`  ✓ ${label} job ${jobId.slice(0,8)} complete (${s.processed}/${s.total} in ${((Date.now()-t0)/1000).toFixed(0)}s)`);
+      return;
+    }
+    if (s.status === "failed" || s.status === "error") {
+      console.log(`  ✗ ${label} job ${jobId.slice(0,8)} failed at ${s.processed}/${s.total}`);
+      return;
+    }
+    if (Date.now() - lastLog > 15000) {
+      console.log(`  · ${label} progress ${s.processed}/${s.total} (${s.status})`);
+      lastLog = Date.now();
+    }
+    await new Promise(r => setTimeout(r, 3000));
+  }
+  console.log(`  ⚠ ${label} job ${jobId.slice(0,8)} still running after ${maxSec}s — leaving in background`);
+}
+
+async function main() {
+  const args = process.argv.slice(2);
+  const targets = args.length > 0 ? SOURCES.filter(s => args.includes(s.name)) : SOURCES;
+  console.log(`Vectorizing ${targets.length} source(s): ${targets.map(t => t.name).join(", ")}`);
+
+  const results: Array<{ name: string; result: any }> = [];
+  for (const spec of targets) {
+    try {
+      const r = await vectorizeOne(spec);
+      if (r.ok && r.job_id) await waitForJob(r.job_id, spec.name);
+      results.push({ name: spec.name, result: r });
+    } catch (e: any) {
+      console.error(`! ${spec.name}: ${e.message}`);
+      results.push({ name: spec.name, result: { ok: false, err: e.message } });
+    }
+  }
+
+  console.log(`\n━━━ SUMMARY ━━━`);
+  for (const { name, result } of results) {
+    console.log(`  ${result.ok ? "✓" : "✗"} ${name.padEnd(20)} chunks=${result.chunks ?? 0} ${result.err ? `err=${result.err}` : ""}`);
+  }
+}
+
+main().catch(e => { console.error(`FATAL: ${e.message}`); process.exit(1); });
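After waitForJob reports complete, a one-off query against the fresh index is a cheap smoke test that retrieval actually answers. The endpoint and payload shape match what analyze_chicago_contracts.ts sends to /vectors/search; the gateway URL is its default and the query text is arbitrary:

const r = await fetch("http://localhost:3100/vectors/search", {
  method: "POST",
  headers: { "content-type": "application/json" },
  body: JSON.stringify({ index_name: "chicago_permits_v1", query: "electrical renovation high-rise", top_k: 3 }),
});
const j: any = await r.json();
console.log(`${(j.results ?? []).length} hits`, (j.results ?? []).map((h: any) => h.doc_id));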
diff --git a/tests/real-world/scrum_master_pipeline.ts b/tests/real-world/scrum_master_pipeline.ts
index e230218..d4b16d6 100644
--- a/tests/real-world/scrum_master_pipeline.ts
+++ b/tests/real-world/scrum_master_pipeline.ts
@@ -448,12 +448,35 @@ async function fetchProvenApproaches(
 // to indexes that actually contain code-review-relevant context — staffing
 // data (workers_500k_*, resumes_*) is excluded by design.
 const MATRIX_CORPORA_FOR_TASK: Record<string, string[]> = {
+  // Code review task — distilled facts/procedures/hints from prior reviews
+  // plus team-run history. Staffing data deliberately excluded.
   scrum_review: [
     "distilled_factual_v20260423095819",
     "distilled_procedural_v20260423102847",
     "distilled_config_hint_v20260423102847",
     "kb_team_runs_v1",
   ],
+  // Chicago contract / permit analysis — pulls actual permits + contractor
+  // entities + SEC tickers + LLM team historical reasoning + lake-house
+  // distilled procedures (which encode prior task patterns).
+  contract_analysis: [
+    "chicago_permits_v1",
+    "entity_brief_v1",
+    "sec_tickers_v1",
+    "llm_team_runs_v1",
+    "llm_team_response_cache_v1",
+    "distilled_procedural_v20260423102847",
+  ],
+  // Staffing inference — workers data + entity briefs + Chicago permit
+  // demand signal + LLM team patterns. workers_500k_v8 is the most
+  // recent dense version.
+  staffing_inference: [
+    "workers_500k_v8",
+    "entity_brief_v1",
+    "chicago_permits_v1",
+    "llm_team_runs_v1",
+    "distilled_procedural_v20260423102847",
+  ],
 };
 
 interface MatrixHit {