Compare commits

...

1 Commits

Author SHA1 Message Date
profit
6e39d8778f Cohesion: Python inventory + integration plan + Phase A verdict indexing
All checks were successful
lakehouse/auditor all checks passed (3 findings, all info)
Three artifacts in one PR:

1. docs/PYTHON_INVENTORY.md — every .py file in the repo classified:
   Production (sidecar routers + 3 systemd services), Documented
   (kb_measure, kb_staffer_report), Manual (one-off tools), Dead
   (sidecar/sidecar/lab_ui.py + pipeline_lab.py are genuinely
   not imported anywhere).

2. docs/COHESION_INTEGRATION_PLAN.md — the "smarter DB" loop J
   called out as missing. Six phases A-F. Phase A ships here; B-F
   are named + sequenced for follow-up PRs. Each phase adds ONE
   wire of the loop; no single PR does them all.

3. Phase A wire (auditor verdicts → observer + KB):
   - auditor/audit.ts: after assembleVerdict, fire-and-forget POST
     to :3800/event with source="auditor" AND append to
     data/_kb/outcomes.jsonl with kind="audit". Errors log + drop
     — the verdict is still on disk at _auditor/verdicts/.
   - mcp-server/observer.ts: extend source union to include
     "auditor" | "bot" (was "mcp" | "scenario" only, which silently
     coerced my first auditor POST to source="scenario"). Accept
     body.ok OR body.success. Accept body.audit_duration_ms as a
      fallback for duration_ms. Use body.one_liner as
      output_summary when set.

Live-verified after observer restart:
   re-audit PR #6 → verdict=request_changes, 4 findings (1 warn)
     observer: by_source={'auditor': 1}  (previously coerced to 'scenario')
     _kb/outcomes.jsonl tail: kind=audit sig=pr6-7fe47bab
       pr=6 overall=request_changes

The shape of the loop is now visible to downstream consumers. Phase
B (auditor's kb_query check reads these audit rows for history)
lands in a follow-up PR. Phases C-F follow the same pattern.

NOT in this PR:
- Actually deleting lab_ui.py + pipeline_lab.py (operator decision,
  called out in the inventory doc)
- Cleaning up the 5 overlapping Python scripts (same)
- Phases B-F of the cohesion plan (separate PRs per wire)
- Integration test that asserts "smarter DB" across runs (Phase F)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 17:22:42 -05:00
4 changed files with 298 additions and 6 deletions

auditor/audit.ts

@@ -80,6 +80,16 @@ export async function auditPr(pr: PrSnapshot, opts: AuditOptions = {}): Promise<
  await persistVerdict(verdict);
  // Phase A of the cohesion plan (docs/COHESION_INTEGRATION_PLAN.md):
  // make every audit verdict visible to the observer + KB. Enables
  // future Phase B (kb_query sees prior audit history) without a
  // separate backfill. Fire-and-forget: observer/KB failures don't
  // block the Gitea post.
  indexVerdictToObserver(verdict).catch(e =>
    console.error(`[auditor] observer indexing failed: ${(e as Error).message}`));
  appendVerdictToKbOutcomes(verdict).catch(e =>
    console.error(`[auditor] kb outcomes append failed: ${(e as Error).message}`));
  if (!opts.dry_run) {
    await postToGitea(verdict);
  }
@@ -87,6 +97,68 @@ export async function auditPr(pr: PrSnapshot, opts: AuditOptions = {}): Promise<
  return verdict;
}
// Phase A — verdict indexing.
//
// Two destinations, both append-only + non-blocking:
// 1. observer :3800/event — ring buffer + data/_observer/ops.jsonl
// 2. data/_kb/outcomes.jsonl — same file scenarios write to, with
//    kind:"audit" so readers can filter
//
// Errors log + drop. The verdict is still on disk at
// _auditor/verdicts/{pr}-{sha}.json; observer + KB are a convenience
// surface, not a source of truth.
const OBSERVER_URL = process.env.LH_OBSERVER_URL ?? "http://localhost:3800";
const KB_OUTCOMES = "/home/profit/lakehouse/data/_kb/outcomes.jsonl";
async function indexVerdictToObserver(v: Verdict): Promise<void> {
  const payload = {
    source: "auditor",
    event_kind: "audit",
    ok: v.overall === "approve",
    sig_hash: `pr${v.pr_number}-${v.head_sha.slice(0, 8)}`,
    pr_number: v.pr_number,
    head_sha: v.head_sha,
    overall: v.overall,
    one_liner: v.one_liner,
    findings_block: v.metrics.findings_block,
    findings_warn: v.metrics.findings_warn,
    audit_duration_ms: v.metrics.audit_duration_ms,
    audited_at: v.audited_at,
  };
  const r = await fetch(`${OBSERVER_URL}/event`, {
    method: "POST",
    headers: { "content-type": "application/json" },
    body: JSON.stringify(payload),
    signal: AbortSignal.timeout(3000),
  });
  if (!r.ok) throw new Error(`observer ${r.status}: ${await r.text()}`);
}
async function appendVerdictToKbOutcomes(v: Verdict): Promise<void> {
  const { appendFile, mkdir } = await import("node:fs/promises");
  const { dirname } = await import("node:path");
  await mkdir(dirname(KB_OUTCOMES), { recursive: true });
  const row = {
    kind: "audit",
    sig_hash: `pr${v.pr_number}-${v.head_sha.slice(0, 8)}`,
    audited_at: v.audited_at,
    pr_number: v.pr_number,
    head_sha: v.head_sha,
    overall: v.overall,
    one_liner: v.one_liner,
    ok_events: v.overall === "approve" ? 1 : 0,
    total_events: 1,
    findings: {
      block: v.metrics.findings_block,
      warn: v.metrics.findings_warn,
      info: v.metrics.findings_info,
    },
    elapsed_secs: (v.metrics.audit_duration_ms ?? 0) / 1000,
  };
  await appendFile(KB_OUTCOMES, JSON.stringify(row) + "\n");
}
async function persistVerdict(v: Verdict): Promise<void> {
  await mkdir(VERDICTS_DIR, { recursive: true });
  const filename = `${v.pr_number}-${v.head_sha.slice(0, 12)}.json`;

docs/COHESION_INTEGRATION_PLAN.md (new file, 126 lines)

@@ -0,0 +1,126 @@
# Cohesion integration plan — the "smarter DB" loop
**Written:** 2026-04-22, after J flagged that the system has good parts but they don't compose into the self-improving loop promised in the control-plane thesis (`project_control_plane_thesis.md` memory: 0→85% via hyperfocus-then-escalate).
## The gap
Each piece works in isolation. What's not wired:
```
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ observer │──✓──│ data/_kb/ │──?──│ auditor │
│ :3800 │ │ outcomes. │ │ kb_query │
│ │ │ jsonl │ │ check │
└─────────────┘ └─────────────┘ └─────────────┘
▲ │
│? ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ auditor │─────│ data/_audit │ │ cloud │
│ verdicts │ │or/verdicts/ │ │ inference │
│ │ │ │ │ │
└─────────────┘ └─────────────┘ └─────────────┘
│?
┌─────────────┐
│ hybrid_srch │
│ playbook_mem│
│ context7 │
└─────────────┘
```
- **✓** solid: observer captures scenarios, writes `_kb/outcomes.jsonl`.
- **?** missing:
- auditor verdicts go to `_auditor/verdicts/` — not to observer or KB
- auditor kb_query reads `_kb/outcomes.jsonl` but doesn't query hybrid_search, playbook_memory, or context7
- auditor inference sends claim+diff to cloud — but WITHOUT KB neighbors, WITHOUT drift context, WITHOUT tool-aware retrieval
- Result: every audit is stateless-ish. The DB doesn't get smarter across audits.
## The target loop
```
PR opened
auditor fetches diff + claims
auditor enriches with:
— KB neighbors: past verdicts on similar claim-hashes
— hybrid_search: playbooks for task classes the diff touches
— context7 drift status: for any tool named in commit messages
— MCP tools: agent-style queries if relevant
cloud inference sees: diff + claims + enrichment
verdict posted to Gitea
verdict ALSO persisted to:
— observer :3800/event (source=auditor)
— _kb/outcomes.jsonl (kind=audit, sig_hash=pr+sha)
next audit on similar PR:
— KB neighbors find this prior verdict
— cloud sees "similar PRs ended in request_changes for reason X"
— verdict is more calibrated
```
## Phases of closure
Building this in order, least-invasive first:
### Phase A — Verdict indexing (shipped in this PR)
After every `auditPr()` completes, in addition to persisting to `_auditor/verdicts/`:
1. POST the verdict to observer `:3800/event` with `source: "auditor"`, `ok: verdict === "approve"`, `event_kind: "audit"`, `sig_hash: <stable hash of pr_number + head_sha>`.
2. Append a simplified outcome to `data/_kb/outcomes.jsonl` with a special `kind: "audit"` row so the KB surface treats audits alongside scenarios.
Minimal surgery. Doesn't change the verdict itself. Just makes it visible to downstream consumers.
**Test:** after this PR lands, re-run the auditor once; observer stats should show `by_source.auditor > 0`; `_kb/outcomes.jsonl` should have one new row per audited SHA.
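A minimal verification sketch for that check — assumes Bun (or Node 18+) and the repo-relative `data/_kb/outcomes.jsonl` path; the script name is illustrative, not something that ships in this PR:
```
// scripts/check_phase_a.ts (hypothetical name) — run once after re-auditing a PR.
// Confirms the newest _kb/outcomes.jsonl row is a Phase A audit row.
import { readFile } from "node:fs/promises";

const KB_OUTCOMES = "data/_kb/outcomes.jsonl";

const text = (await readFile(KB_OUTCOMES, "utf8")).trim();
const lines = text ? text.split("\n") : [];
if (lines.length === 0) {
  console.error("no outcomes recorded yet");
  process.exit(1);
}
const last = JSON.parse(lines[lines.length - 1]);
if (last.kind === "audit") {
  console.log(`ok: ${last.sig_hash} overall=${last.overall}`);
} else {
  console.error(`last row has kind=${last.kind}, expected "audit"`);
  process.exit(1);
}
```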
### Phase B — KB query sees auditor history
Extend `auditor/checks/kb_query.ts` to:
1. Read `_kb/outcomes.jsonl`, filter `kind === "audit"`, find prior audits with matching `sig_hash` (same PR across SHAs) or similar claim hashes.
2. Emit finding: "prior N audits on this PR ended in [approve, block, request_changes], last reason was X."
Gives the auditor memory across re-audits of the same PR.
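A rough sketch of the retrieval half — `AuditRow` and `priorAuditsFor` are illustrative names, not existing code; the row fields come from the Phase A writes in this PR:
```
// Sketch only — assumed helper shape for auditor/checks/kb_query.ts (Phase B).
import { readFile } from "node:fs/promises";

interface AuditRow {
  kind: string;
  sig_hash: string;
  pr_number: number;
  overall: string;
  one_liner: string;
  audited_at: string;
}

// Prior audit rows for a PR, oldest first. sig_hash embeds the SHA, so
// "same PR across SHAs" means matching on pr_number (or the `pr${n}-` prefix).
async function priorAuditsFor(prNumber: number, outcomesPath: string): Promise<AuditRow[]> {
  const text = await readFile(outcomesPath, "utf8").catch(() => "");
  return text
    .split("\n")
    .filter(Boolean)
    .map(line => JSON.parse(line) as AuditRow)
    .filter(r => r.kind === "audit" && r.pr_number === prNumber);
}

// Finding text would then be something like:
// "prior 2 audits on this PR ended in request_changes; last reason: <one_liner>"
```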
### Phase C — Hybrid search in kb_query
When a PR touches `crates/vectord/src/playbook_memory.rs` (task-class matching), auditor calls `POST /vectors/hybrid` to find playbooks semantically related to the diff. Surfaces as "N playbooks in production rely on this code path, consider backward-compat."
Requires:
- Task-class extractor from diff paths (see the sketch after this list)
- Cloud-free hybrid_search call (we have local Ollama for embeddings)
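A minimal sketch of the task-class extractor from the requirements above; the path-to-class mapping and the `/vectors/hybrid` request shape are assumptions to validate against the real endpoint:
```
// Sketch only — task classes inferred from diff paths; the mapping is an assumption.
const TASK_CLASS_BY_PATH: Array<[RegExp, string]> = [
  [/crates\/vectord\/src\/playbook_memory\.rs$/, "playbook-memory"],
  [/^mcp-server\//, "mcp-tools"],
  [/^auditor\//, "audit-pipeline"],
];

function taskClassesFromDiffPaths(paths: string[]): string[] {
  const classes = new Set<string>();
  for (const p of paths) {
    for (const [re, cls] of TASK_CLASS_BY_PATH) {
      if (re.test(p)) classes.add(cls);
    }
  }
  return [...classes];
}

// Hypothetical hybrid_search call — host, port, and body shape are NOT verified here:
// await fetch(`${VECTORD_URL}/vectors/hybrid`, {
//   method: "POST",
//   headers: { "content-type": "application/json" },
//   body: JSON.stringify({ query: taskClass, limit: 5 }),
// });
```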
### Phase D — Context7 drift awareness in auditor
If the diff / commit message names a tool that's in any playbook's `doc_refs`, auditor calls the context7 bridge to check current drift status. Surfaces as "tool X has drifted since N playbooks referenced it; this PR may need to update those."
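A placeholder sketch only — the bridge's endpoint path and response shape are not pinned down here, so every name below is an assumption for the Phase D PR to replace:
```
// Sketch only — all names below (endpoint path, fields) are assumptions about
// the context7 bridge interface.
interface DriftStatus {
  tool: string;
  drifted: boolean;
  playbooks_referencing: number;
}

async function checkDrift(toolName: string, bridgeUrl: string): Promise<DriftStatus | null> {
  const r = await fetch(`${bridgeUrl}/drift/${encodeURIComponent(toolName)}`, {
    signal: AbortSignal.timeout(3000),
  }).catch(() => null);
  if (!r || !r.ok) return null; // non-blocking, same posture as Phase A indexing
  return (await r.json()) as DriftStatus;
}
```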
### Phase E — Inference sees enrichment
The cloud inference check currently sends `diff + claims`. Extend to send `diff + claims + kb_neighbors + drift_context + hybrid_search_matches`. The prompt becomes context-rich — exactly what makes the cloud model competent (per the control-plane thesis).
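One way to picture the extended input — the interface below is illustrative, and the inference check's real types may differ:
```
// Sketch only — illustrative shape of the enriched inference input (Phase E).
// Today the check sends roughly { diff, claims }; Phases B-D add the rest.
interface EnrichedInferenceInput {
  diff: string;
  claims: string[];
  kb_neighbors: Array<{ sig_hash: string; overall: string; one_liner: string }>; // Phase B
  hybrid_search_matches: Array<{ playbook: string; score: number }>;             // Phase C
  drift_context: Array<{ tool: string; drifted: boolean }>;                      // Phase D
}
```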
### Phase F — Full integration test
An auditor self-test that:
1. Creates a synthetic PR with known-good and known-bad claims
2. Runs the enriched auditor
3. Asserts the verdict found the planted issues
4. Runs a second identical-claim PR
5. Asserts the SECOND verdict references the FIRST audit (via KB neighbor retrieval)
6. "Smarter DB" proof: two runs, measurable context gain.
## Sequence
Phase A lands in this PR alongside the inventory. Phases B-F are follow-up PRs, each with their own auditor gate. Order matters: A → B (reads A's output) → C+D (in parallel) → E (consumes B/C/D) → F (asserts E's behavior).
## Not in this plan (deliberate)
- **Rewriting the auditor to use a different architecture.** Current 4-check model stays; we just enrich the checks.
- **Tuning cloud inference precision.** The false-positive rate is a prompt-engineering concern; this plan is about context enrichment, which is separate.
- **Branch protection enforcement.** Stays off until Phase F passes.
The overall bet: this is the "putting it all together coherently" J said was a real problem. Six phases over however many PRs it takes. Each one ships one wire of the loop; no single PR tries to do them all.

docs/PYTHON_INVENTORY.md (new file, 75 lines)

@@ -0,0 +1,75 @@
# Python inventory — what's wired vs dead
Generated 2026-04-22. Enumerates every `.py` file in the repo and classifies each by actual integration status. Goal: surface the "ambiguous names that don't hook in" J flagged.
## Classification
- **🟢 Production** — running as a systemd service, or imported by code that does
- **🟡 Documented** — referenced in `docs/PRD.md` / `docs/PHASES.md` / `docs/CONTROL_PLANE_PRD.md` or called from a shell/TS orchestrator
- **🟠 Manual** — runnable as a one-off utility but not in any automated pipeline
- **🔴 Dead** — not imported, not referenced, not called anywhere
## Results
### Sidecar (`sidecar/sidecar/`)
The Python FastAPI sidecar on `:3200`. Runs as `lakehouse-sidecar.service`.
| File | Status | Evidence |
|---|---|---|
| `main.py` | 🟢 Production | systemd ExecStart: `uvicorn sidecar.sidecar.main:app` |
| `admin.py` | 🟢 Production | imported by `main.py` as `admin_router` |
| `embed.py` | 🟢 Production | imported by `main.py` as `embed_router` |
| `generate.py` | 🟢 Production | imported by `main.py` as `generate_router` |
| `rerank.py` | 🟢 Production | imported by `main.py` as `rerank_router` |
| `ollama.py` | 🟢 Production | imported transitively by embed/generate/rerank |
| `__init__.py` | 🟢 Production | package marker |
| `lab_ui.py` | 🔴 **Dead** | **not imported by any file in `sidecar/`. `scripts/serve_lab.py` only uses stdlib — doesn't import this.** Mtime 2026-04-16 (pre-session). Safe to delete. |
| `pipeline_lab.py` | 🔴 **Dead** | **same — not imported anywhere. Mtime 2026-04-16.** Docstring says "iterative embedding/LLM pipeline experimentation" — looks like an abandoned prototype. Safe to delete. |
### Scripts (`scripts/`)
17 files. Only **3** are production (behind systemd), **2** are documented in phase/PRD docs, **12** are unreferenced.
| File | Status | Evidence |
|---|---|---|
| `serve_ui.py` | 🟢 Production | `lakehouse-ui.service` ExecStart |
| `serve_imagegen.py` | 🟢 Production | `imagegen.service` ExecStart |
| `serve_lab.py` | 🟢 Production | `pipeline-lab.service` ExecStart |
| `kb_measure.py` | 🟡 Documented | referenced in `docs/PHASES.md` (Phase 22 aggregator) |
| `kb_staffer_report.py` | 🟡 Documented | referenced in `docs/PHASES.md` + `docs/PRD.md` (Phase 23), also called by `scripts/run_staffer_demo.sh` |
| `autonomous_agent.py` | 🟠 Manual | "Autonomous stress-test agent" — no docs refs, no callers. Mtime 2026-04-17. |
| `copilot.py` | 🟠 Manual | "Staffing Co-Pilot — the anticipation layer" — no docs refs. Writes briefing to `/tmp/copilot_briefing.json` on run. |
| `generate_demo.py` | 🟠 Manual | "realistic demo datasets" — mtime 2026-03-27, never touched this session |
| `generate_workers.py` | 🟠 Manual | "worker profiles at scale" — usage example in docstring: `python3 generate_workers.py 100000 > /tmp/workers_100k.csv`. Still useful, just not auto-run. |
| `lance_tune.py` | 🟠 Manual | "IVF_PQ parameter sweep" — one-shot experiment tool. |
| `qwen3_plan.py` | 🟠 Manual | "Qwen 3 agent plan — structured test with playbook building" |
| `quality_eval.py` | 🟠 Manual | "Quality evaluation pipeline — tests whether the system gives CORRECT" |
| `scale_test.py` | 🟠 Manual | called by `scripts/scale_10m_test.sh` (not in systemd). 2.5M row test. |
| `staffing_day.py` | 🟠 Manual | "Real-world staffing agency day simulation" — no docs refs. Overlaps conceptually with `tests/multi-agent/scenario.ts`. |
| `staffing_demo.py` | 🟠 Manual | "Realistic staffing company data generator" — demo fixture. |
| `staffing_simulation.py` | 🟠 Manual | "multi-agent stress test" — overlaps with `tests/multi-agent/scenario.ts`. |
| `stress_test.py` | 🟠 Manual | "prove this architecture is sound or find where it breaks" — overlaps with `tests/multi-agent/run_stress.ts`. |
## Overlap with TypeScript / Rust equivalents
Several Python scripts overlap conceptually with newer TS/Rust code. This is the "ambiguous names that don't hook in" pattern — the Python originals were superseded but not removed.
| Python script | Newer equivalent | Integration drift |
|---|---|---|
| `scripts/staffing_simulation.py` | `tests/multi-agent/scenario.ts` | scenario.ts ships + observer-integrated (Phase 24); Python version is unwired. |
| `scripts/staffing_day.py` | `tests/multi-agent/scenario.ts` | same. |
| `scripts/stress_test.py` | `tests/multi-agent/run_stress.ts` | TS version is the one the auditor fixture exercises. |
| `scripts/autonomous_agent.py` | `bot/` + `auditor/` Bun sub-agents | new Bun pattern supersedes. Python version predates the control-plane pivot. |
| `scripts/qwen3_plan.py` | `tests/multi-agent/agent.ts` | TS agent is the one with Phase 21 continuation + cloud routing. |
**Recommended cleanup actions (not in this PR — separate decision):**
1. **Delete the two confirmed dead sidecar files:** `sidecar/sidecar/lab_ui.py`, `sidecar/sidecar/pipeline_lab.py`. Nothing calls them; removal is safe.
2. **Archive the 5 overlapping scripts** (staffing_simulation, staffing_day, stress_test, autonomous_agent, qwen3_plan) — either move to `scripts/legacy/` with a README explaining they predate the TS versions, or delete if J confirms no future use.
3. **Keep the one-off utilities** (generate_demo, generate_workers, lance_tune, quality_eval, copilot, scale_test, staffing_demo) — useful as manual tools. Maybe add a `scripts/README.md` documenting what each does and when to run.
4. **Promote `kb_measure.py` and `kb_staffer_report.py`** to be actually callable from the KB flow — they're documented but only run manually after scenario batches.
## Honest next step
The cleanup above isn't this session's work — it's a subsequent PR. This doc lands first so the state is visible and operator-auditable.

mcp-server/observer.ts

@@ -37,7 +37,11 @@ interface ObservedOp {
  // Phase 24 — optional provenance so error analyzer and playbook
  // builder can differentiate MCP-layer ops from scenario-sourced
  // events. Scenarios set source="scenario" + staffer_id + sig_hash.
  source?: "mcp" | "scenario";
  // Phase A (cohesion plan 2026-04-22) — auditor verdicts also post
  // to :3800/event with source="auditor"; stats endpoint counts them
  // separately so KB readers can see audit cadence alongside
  // scenario cadence.
  source?: "mcp" | "scenario" | "auditor" | "bot";
  staffer_id?: string;
  sig_hash?: string;
  event_kind?: string;
@@ -253,15 +257,30 @@ function startHttpListener() {
    }
    if (req.method === "POST" && url.pathname === "/event") {
      return req.json().then((body: any) => {
        // Accept caller-provided source when present and valid.
        // Unrecognized sources fall back to "scenario" for
        // backward-compat with older callers.
        const allowedSource = ["mcp", "scenario", "auditor", "bot"] as const;
        const bodySrc = typeof body.source === "string" ? body.source : "";
        const source = (allowedSource as readonly string[]).includes(bodySrc)
          ? (bodySrc as typeof allowedSource[number])
          : "scenario";
        // Phase A: auditor+bot POST with ok=true|false (not success)
        // to match the vocabulary of their domain. Accept either.
        const success = typeof body.success === "boolean" ? body.success
          : typeof body.ok === "boolean" ? body.ok
          : false;
        const op: ObservedOp = {
          timestamp: body.timestamp ?? new Date().toISOString(),
          endpoint: body.endpoint ?? "scenario:fill",
          endpoint: body.endpoint ?? (source === "auditor" ? "auditor:verdict"
            : source === "bot" ? "bot:cycle"
            : "scenario:fill"),
          input_summary: body.input_summary ?? `${body.event_kind ?? "?"} ${body.role ?? "?"}×${body.count ?? "?"} in ${body.city ?? "?"}, ${body.state ?? "?"}`,
          success: !!body.success,
          duration_ms: Number(body.duration_ms ?? 0),
          output_summary: body.output_summary ?? (body.success ? "filled" : (body.error ?? "failed")),
          success,
          duration_ms: Number(body.duration_ms ?? body.audit_duration_ms ?? 0),
          output_summary: body.output_summary ?? body.one_liner ?? (success ? "ok" : (body.error ?? "failed")),
          error: body.error,
          source: "scenario",
          source,
          staffer_id: body.staffer_id,
          sig_hash: body.sig_hash,
          event_kind: body.event_kind,