From 6e39d8778f4f5fcbf029170d0a694a54ed401820 Mon Sep 17 00:00:00 2001 From: profit Date: Wed, 22 Apr 2026 17:22:42 -0500 Subject: [PATCH] Cohesion: Python inventory + integration plan + Phase A verdict indexing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three artifacts in one PR: 1. docs/PYTHON_INVENTORY.md — every .py file in the repo classified: Production (sidecar routers + 3 systemd services), Documented (kb_measure, kb_staffer_report), Manual (one-off tools), Dead (sidecar/sidecar/lab_ui.py + pipeline_lab.py are genuinely not imported anywhere). 2. docs/COHESION_INTEGRATION_PLAN.md — the "smarter DB" loop J called out as missing. Six phases A-F. Phase A ships here; B-F are named + sequenced for follow-up PRs. Each phase adds ONE wire of the loop; no single PR does them all. 3. Phase A wire (auditor verdicts → observer + KB): - auditor/audit.ts: after assembleVerdict, fire-and-forget POST to :3800/event with source="auditor" AND append to data/_kb/outcomes.jsonl with kind="audit". Errors log + drop — the verdict is still on disk at _auditor/verdicts/. - mcp-server/observer.ts: extend source union to include "auditor" | "bot" (was "mcp" | "scenario" only, which silently coerced my first auditor POST to source="scenario"). Accept body.ok OR body.success. Accept body.audit_duration_ms as a fallback for duration_ms. Uses body.one_liner as output_summary when set. Live-verified after observer restart: re-audit PR #6 → verdict=request_changes, 4 findings (1 warn) observer: by_source={'auditor': 1} (previously coerced to 'scenario') _kb/outcomes.jsonl tail: kind=audit sig=pr6-7fe47bab pr=6 overall=request_changes The shape of the loop is now visible to downstream consumers. Phase B (auditor's kb_query check reads these audit rows for history) lands in a follow-up PR. Phase C-F similar. 
NOT in this PR: - Actually deleting lab_ui.py + pipeline_lab.py (operator decision, called out in the inventory doc) - Cleaning up the 5 overlapping Python scripts (same) - Phases B-F of the cohesion plan (separate PRs per wire) - Integration test that asserts "smarter DB" across runs (Phase F) Co-Authored-By: Claude Opus 4.7 (1M context) --- auditor/audit.ts | 72 +++++++++++++++++ docs/COHESION_INTEGRATION_PLAN.md | 126 ++++++++++++++++++++++++++++++ docs/PYTHON_INVENTORY.md | 75 ++++++++++++++++++ mcp-server/observer.ts | 31 ++++++-- 4 files changed, 298 insertions(+), 6 deletions(-) create mode 100644 docs/COHESION_INTEGRATION_PLAN.md create mode 100644 docs/PYTHON_INVENTORY.md diff --git a/auditor/audit.ts b/auditor/audit.ts index d626373..779e11e 100644 --- a/auditor/audit.ts +++ b/auditor/audit.ts @@ -80,6 +80,16 @@ export async function auditPr(pr: PrSnapshot, opts: AuditOptions = {}): Promise< await persistVerdict(verdict); + // Phase A of the cohesion plan (docs/COHESION_INTEGRATION_PLAN.md): + // make every audit verdict visible to the observer + KB. Enables + // future Phase B (kb_query sees prior audit history) without a + // separate backfill. Fire-and-forget: observer/KB failures don't + // block the Gitea post. + indexVerdictToObserver(verdict).catch(e => + console.error(`[auditor] observer indexing failed: ${(e as Error).message}`)); + appendVerdictToKbOutcomes(verdict).catch(e => + console.error(`[auditor] kb outcomes append failed: ${(e as Error).message}`)); + if (!opts.dry_run) { await postToGitea(verdict); } @@ -87,6 +97,68 @@ export async function auditPr(pr: PrSnapshot, opts: AuditOptions = {}): Promise< return verdict; } +// Phase A — verdict indexing. +// +// Two destinations, both append-only + non-blocking: +// 1. observer :3800/event — ring buffer + data/_observer/ops.jsonl +// 2. data/_kb/outcomes.jsonl — same file scenarios write to, with +// kind:"audit" so readers can filter +// +// Errors log + drop. 
The verdict is still on disk at +// _auditor/verdicts/{pr}-{sha}.json; observer + KB are a convenience +// surface, not a source of truth. + +const OBSERVER_URL = process.env.LH_OBSERVER_URL ?? "http://localhost:3800"; +const KB_OUTCOMES = "/home/profit/lakehouse/data/_kb/outcomes.jsonl"; + +async function indexVerdictToObserver(v: Verdict): Promise<void> { + const payload = { + source: "auditor", + event_kind: "audit", + ok: v.overall === "approve", + sig_hash: `pr${v.pr_number}-${v.head_sha.slice(0, 8)}`, + pr_number: v.pr_number, + head_sha: v.head_sha, + overall: v.overall, + one_liner: v.one_liner, + findings_block: v.metrics.findings_block, + findings_warn: v.metrics.findings_warn, + audit_duration_ms: v.metrics.audit_duration_ms, + audited_at: v.audited_at, + }; + const r = await fetch(`${OBSERVER_URL}/event`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify(payload), + signal: AbortSignal.timeout(3000), + }); + if (!r.ok) throw new Error(`observer ${r.status}: ${await r.text()}`); +} + +async function appendVerdictToKbOutcomes(v: Verdict): Promise<void> { + const { appendFile, mkdir } = await import("node:fs/promises"); + const { dirname } = await import("node:path"); + await mkdir(dirname(KB_OUTCOMES), { recursive: true }); + const row = { + kind: "audit", + sig_hash: `pr${v.pr_number}-${v.head_sha.slice(0, 8)}`, + audited_at: v.audited_at, + pr_number: v.pr_number, + head_sha: v.head_sha, + overall: v.overall, + one_liner: v.one_liner, + ok_events: v.overall === "approve" ? 1 : 0, + total_events: 1, + findings: { + block: v.metrics.findings_block, + warn: v.metrics.findings_warn, + info: v.metrics.findings_info, + }, + elapsed_secs: (v.metrics.audit_duration_ms ??
0) / 1000, + }; + await appendFile(KB_OUTCOMES, JSON.stringify(row) + "\n"); +} + async function persistVerdict(v: Verdict): Promise { await mkdir(VERDICTS_DIR, { recursive: true }); const filename = `${v.pr_number}-${v.head_sha.slice(0, 12)}.json`; diff --git a/docs/COHESION_INTEGRATION_PLAN.md b/docs/COHESION_INTEGRATION_PLAN.md new file mode 100644 index 0000000..bfc2ca1 --- /dev/null +++ b/docs/COHESION_INTEGRATION_PLAN.md @@ -0,0 +1,126 @@ +# Cohesion integration plan — the "smarter DB" loop + +**Written:** 2026-04-22, after J flagged that the system has good parts but they don't compose into the self-improving loop promised in the control-plane thesis (`project_control_plane_thesis.md` memory: 0→85% via hyperfocus-then-escalate). + +## The gap + +Each piece works in isolation. What's not wired: + +``` +┌─────────────┐ ┌─────────────┐ ┌─────────────┐ +│ observer │──✓──│ data/_kb/ │──?──│ auditor │ +│ :3800 │ │ outcomes. │ │ kb_query │ +│ │ │ jsonl │ │ check │ +└─────────────┘ └─────────────┘ └─────────────┘ + ▲ │ + │? ▼ +┌─────────────┐ ┌─────────────┐ ┌─────────────┐ +│ auditor │─────│ data/_audit │ │ cloud │ +│ verdicts │ │or/verdicts/ │ │ inference │ +│ │ │ │ │ │ +└─────────────┘ └─────────────┘ └─────────────┘ + │? + ▼ + ┌─────────────┐ + │ hybrid_srch │ + │ playbook_mem│ + │ context7 │ + └─────────────┘ +``` + +- **✓** solid: observer captures scenarios, writes `_kb/outcomes.jsonl`. +- **?** missing: + - auditor verdicts go to `_auditor/verdicts/` — not to observer or KB + - auditor kb_query reads `_kb/outcomes.jsonl` but doesn't query hybrid_search, playbook_memory, or context7 + - auditor inference sends claim+diff to cloud — but WITHOUT KB neighbors, WITHOUT drift context, WITHOUT tool-aware retrieval +- Result: every audit is stateless-ish. The DB doesn't get smarter across audits. 
+ +## The target loop + +``` +PR opened + ↓ +auditor fetches diff + claims + ↓ +auditor enriches with: + — KB neighbors: past verdicts on similar claim-hashes + — hybrid_search: playbooks for task classes the diff touches + — context7 drift status: for any tool named in commit messages + — MCP tools: agent-style queries if relevant + ↓ +cloud inference sees: diff + claims + enrichment + ↓ +verdict posted to Gitea + ↓ +verdict ALSO persisted to: + — observer :3800/event (source=auditor) + — _kb/outcomes.jsonl (kind=audit, sig_hash=pr+sha) + ↓ +next audit on similar PR: + — KB neighbors find this prior verdict + — cloud sees "similar PRs ended in request_changes for reason X" + — verdict is more calibrated +``` + +## Phases of closure + +Building this in order from least-invasive: + +### Phase A — Verdict indexing (shipped in this PR) + +After every `auditPr()` completes, in addition to persisting to `_auditor/verdicts/`: + +1. POST the verdict to observer `:3800/event` with `source: "auditor"`, `ok: verdict === "approve"`, `event_kind: "audit"`, `sig_hash: pr{pr_number}-{head_sha[:8]}`. +2. Append a simplified outcome to `data/_kb/outcomes.jsonl` with a special `kind: "audit"` row so the KB surface treats audits alongside scenarios. + +Minimal surgery. Doesn't change the verdict itself. Just makes it visible to downstream consumers. + +**Test:** after this PR lands, re-run the auditor once; observer stats should show `by_source.auditor > 0`; `_kb/outcomes.jsonl` should have one new row per audited SHA. + +### Phase B — KB query sees auditor history + +Extend `auditor/checks/kb_query.ts` to: + +1. Read `_kb/outcomes.jsonl`, filter `kind === "audit"`, find prior audits with matching `sig_hash` (same PR across SHAs) or similar claim hashes. +2. Emit finding: "prior N audits on this PR ended in [approve, block, request_changes], last reason was X." + +Gives the auditor memory across re-audits of the same PR.
+ +### Phase C — Hybrid search in kb_query + +When a PR touches `crates/vectord/src/playbook_memory.rs` (task-class matching), auditor calls `POST /vectors/hybrid` to find playbooks semantically related to the diff. Surfaces as "N playbooks in production rely on this code path, consider backward-compat." + +Requires: +- Task-class extractor from diff paths +- Cloud-free hybrid_search call (we have local Ollama for embeddings) + +### Phase D — Context7 drift awareness in auditor + +If the diff / commit message names a tool that's in any playbook's `doc_refs`, auditor calls the context7 bridge to check current drift status. Surfaces as "tool X has drifted since N playbooks referenced it; this PR may need to update those." + +### Phase E — Inference sees enrichment + +The cloud inference check currently sends `diff + claims`. Extend to send `diff + claims + kb_neighbors + drift_context + hybrid_search_matches`. The prompt becomes context-rich — exactly what makes the cloud model competent (per the control-plane thesis). + +### Phase F — Full integration test + +An auditor self-test that: + +1. Creates a synthetic PR with known-good and known-bad claims +2. Runs the enriched auditor +3. Asserts the verdict found the planted issues +4. Runs a second identical-claim PR +5. Asserts the SECOND verdict references the FIRST audit (via KB neighbor retrieval) +6. "Smarter DB" proof: two runs, measurable context gain. + +## Sequence + +Phase A lands in this PR alongside the inventory. Phases B-F are follow-up PRs, each with their own auditor gate. Order matters: A → B (reads A's output) → C+D (in parallel) → E (consumes B/C/D) → F (asserts E's behavior). + +## Not in this plan (deliberate) + +- **Rewriting the auditor to use a different architecture.** Current 4-check model stays; we just enrich the checks. +- **Tuning cloud inference precision.** The false-positive rate is a prompt-engineering concern; this plan is about context enrichment, which is separate. 
+- **Branch protection enforcement.** Stays off until Phase F passes. + +The overall bet: this is the "putting it all together coherently" J said was a real problem. Six phases over however many PRs it takes. Each one ships one wire of the loop; no single PR tries to do them all. diff --git a/docs/PYTHON_INVENTORY.md b/docs/PYTHON_INVENTORY.md new file mode 100644 index 0000000..c2a9671 --- /dev/null +++ b/docs/PYTHON_INVENTORY.md @@ -0,0 +1,75 @@ +# Python inventory — what's wired vs dead + +Generated 2026-04-22. Enumerates every `.py` file in the repo and classifies each by actual integration status. Goal: surface the "ambiguous names that don't hook in" J flagged. + +## Classification + +- **🟢 Production** — running as a systemd service, or imported by code that does +- **🟡 Documented** — referenced in `docs/PRD.md` / `docs/PHASES.md` / `docs/CONTROL_PLANE_PRD.md` or called from a shell/TS orchestrator +- **🟠 Manual** — runnable as a one-off utility but not in any automated pipeline +- **🔴 Dead** — not imported, not referenced, not called anywhere + +## Results + +### Sidecar (`sidecar/sidecar/`) + +The Python FastAPI sidecar on `:3200`. Runs as `lakehouse-sidecar.service`. + +| File | Status | Evidence | +|---|---|---| +| `main.py` | 🟢 Production | systemd ExecStart: `uvicorn sidecar.sidecar.main:app` | +| `admin.py` | 🟢 Production | imported by `main.py` as `admin_router` | +| `embed.py` | 🟢 Production | imported by `main.py` as `embed_router` | +| `generate.py` | 🟢 Production | imported by `main.py` as `generate_router` | +| `rerank.py` | 🟢 Production | imported by `main.py` as `rerank_router` | +| `ollama.py` | 🟢 Production | imported transitively by embed/generate/rerank | +| `__init__.py` | 🟢 Production | package marker | +| `lab_ui.py` | 🔴 **Dead** | **not imported by any file in `sidecar/`. `scripts/serve_lab.py` only uses stdlib — doesn't import this.** Mtime 2026-04-16 (pre-session). Safe to delete. 
| +| `pipeline_lab.py` | 🔴 **Dead** | **same — not imported anywhere. Mtime 2026-04-16.** Docstring says "iterative embedding/LLM pipeline experimentation" — looks like an abandoned prototype. Safe to delete. | + +### Scripts (`scripts/`) + +17 files. Only **2** are production (behind systemd), **3** are documented in phase/PRD docs, **12** are unreferenced. + +| File | Status | Evidence | +|---|---|---| +| `serve_ui.py` | 🟢 Production | `lakehouse-ui.service` ExecStart | +| `serve_imagegen.py` | 🟢 Production | `imagegen.service` ExecStart | +| `serve_lab.py` | 🟢 Production | `pipeline-lab.service` ExecStart | +| `kb_measure.py` | 🟡 Documented | referenced in `docs/PHASES.md` (Phase 22 aggregator) | +| `kb_staffer_report.py` | 🟡 Documented | referenced in `docs/PHASES.md` + `docs/PRD.md` (Phase 23), also called by `scripts/run_staffer_demo.sh` | +| `autonomous_agent.py` | 🟠 Manual | "Autonomous stress-test agent" — no docs refs, no callers. Mtime 2026-04-17. | +| `copilot.py` | 🟠 Manual | "Staffing Co-Pilot — the anticipation layer" — no docs refs. Writes briefing to `/tmp/copilot_briefing.json` on run. | +| `generate_demo.py` | 🟠 Manual | "realistic demo datasets" — mtime 2026-03-27, never touched this session | +| `generate_workers.py` | 🟠 Manual | "worker profiles at scale" — usage example in docstring: `python3 generate_workers.py 100000 > /tmp/workers_100k.csv`. Still useful, just not auto-run. | +| `lance_tune.py` | 🟠 Manual | "IVF_PQ parameter sweep" — one-shot experiment tool. | +| `qwen3_plan.py` | 🟠 Manual | "Qwen 3 agent plan — structured test with playbook building" | +| `quality_eval.py` | 🟠 Manual | "Quality evaluation pipeline — tests whether the system gives CORRECT" | +| `scale_test.py` | 🟠 Manual | called by `scripts/scale_10m_test.sh` (not in systemd). 2.5M row test. | +| `staffing_day.py` | 🟠 Manual | "Real-world staffing agency day simulation" — no docs refs. Overlaps conceptually with `tests/multi-agent/scenario.ts`. 
| +| `staffing_demo.py` | 🟠 Manual | "Realistic staffing company data generator" — demo fixture. | +| `staffing_simulation.py` | 🟠 Manual | "multi-agent stress test" — overlaps with `tests/multi-agent/scenario.ts`. | +| `stress_test.py` | 🟠 Manual | "prove this architecture is sound or find where it breaks" — overlaps with `tests/multi-agent/run_stress.ts`. | + +## Overlap with TypeScript / Rust equivalents + +Several Python scripts overlap conceptually with newer TS/Rust code. This is the "ambiguous names that don't hook in" pattern — the Python originals were superseded but not removed. + +| Python script | Newer equivalent | Integration drift | +|---|---|---| +| `scripts/staffing_simulation.py` | `tests/multi-agent/scenario.ts` | scenario.ts ships + observer-integrated (Phase 24); Python version is unwired. | +| `scripts/staffing_day.py` | `tests/multi-agent/scenario.ts` | same. | +| `scripts/stress_test.py` | `tests/multi-agent/run_stress.ts` | TS version is the one the auditor fixture exercises. | +| `scripts/autonomous_agent.py` | `bot/` + `auditor/` Bun sub-agents | new Bun pattern supersedes. Python version predates the control-plane pivot. | +| `scripts/qwen3_plan.py` | `tests/multi-agent/agent.ts` | TS agent is the one with Phase 21 continuation + cloud routing. | + +**Recommended cleanup actions (not in this PR — separate decision):** + +1. **Delete the two confirmed dead sidecar files:** `sidecar/sidecar/lab_ui.py`, `sidecar/sidecar/pipeline_lab.py`. Nothing calls them; removal is safe. +2. **Archive the 5 overlapping scripts** (staffing_simulation, staffing_day, stress_test, autonomous_agent, qwen3_plan) — either move to `scripts/legacy/` with a README explaining they predate the TS versions, or delete if J confirms no future use. +3. **Keep the one-off utilities** (generate_demo, generate_workers, lance_tune, quality_eval, copilot, scale_test, staffing_demo) — useful as manual tools. 
Maybe add a `scripts/README.md` documenting what each does and when to run. +4. **Promote `kb_measure.py` and `kb_staffer_report.py`** to be actually callable from the KB flow — they're documented but only run manually after scenario batches. + +## Honest next step + +The cleanup above isn't this session's work — it's a subsequent PR. This doc lands first so the state is visible and operator-auditable. diff --git a/mcp-server/observer.ts b/mcp-server/observer.ts index 13a2be2..c41fa1e 100644 --- a/mcp-server/observer.ts +++ b/mcp-server/observer.ts @@ -37,7 +37,11 @@ interface ObservedOp { // Phase 24 — optional provenance so error analyzer and playbook // builder can differentiate MCP-layer ops from scenario-sourced // events. Scenarios set source="scenario" + staffer_id + sig_hash. - source?: "mcp" | "scenario"; + // Phase A (cohesion plan 2026-04-22) — auditor verdicts also post + // to :3800/event with source="auditor"; stats endpoint counts them + // separately so KB readers can see audit cadence alongside + // scenario cadence. + source?: "mcp" | "scenario" | "auditor" | "bot"; staffer_id?: string; sig_hash?: string; event_kind?: string; @@ -253,15 +257,30 @@ function startHttpListener() { } if (req.method === "POST" && url.pathname === "/event") { return req.json().then((body: any) => { + // Accept caller-provided source when present and valid. + // Unrecognized sources fall back to "scenario" for + // backward-compat with older callers. + const allowedSource = ["mcp", "scenario", "auditor", "bot"] as const; + const bodySrc = typeof body.source === "string" ? body.source : ""; + const source = (allowedSource as readonly string[]).includes(bodySrc) + ? (bodySrc as typeof allowedSource[number]) + : "scenario"; + // Phase A: auditor+bot POST with ok=true|false (not success) + // for symmetry with the kind of their domain. Accept either. + const success = typeof body.success === "boolean" ? body.success + : typeof body.ok === "boolean" ?
body.ok + : false; const op: ObservedOp = { timestamp: body.timestamp ?? new Date().toISOString(), - endpoint: body.endpoint ?? "scenario:fill", + endpoint: body.endpoint ?? (source === "auditor" ? "auditor:verdict" + : source === "bot" ? "bot:cycle" + : "scenario:fill"), input_summary: body.input_summary ?? `${body.event_kind ?? "?"} ${body.role ?? "?"}×${body.count ?? "?"} in ${body.city ?? "?"}, ${body.state ?? "?"}`, - success: !!body.success, - duration_ms: Number(body.duration_ms ?? 0), - output_summary: body.output_summary ?? (body.success ? "filled" : (body.error ?? "failed")), + success, + duration_ms: Number(body.duration_ms ?? body.audit_duration_ms ?? 0), + output_summary: body.output_summary ?? body.one_liner ?? (success ? "ok" : (body.error ?? "failed")), error: body.error, - source: "scenario", + source, staffer_id: body.staffer_id, sig_hash: body.sig_hash, event_kind: body.event_kind,