Cohesion: Python inventory + integration plan + Phase A (auditor→observer+KB) #7
@ -80,6 +80,16 @@ export async function auditPr(pr: PrSnapshot, opts: AuditOptions = {}): Promise<
|
||||
|
||||
await persistVerdict(verdict);
|
||||
|
||||
// Phase A of the cohesion plan (docs/COHESION_INTEGRATION_PLAN.md):
|
||||
// make every audit verdict visible to the observer + KB. Enables
|
||||
// future Phase B (kb_query sees prior audit history) without a
|
||||
// separate backfill. Fire-and-forget: observer/KB failures don't
|
||||
// block the Gitea post.
|
||||
indexVerdictToObserver(verdict).catch(e =>
|
||||
console.error(`[auditor] observer indexing failed: ${(e as Error).message}`));
|
||||
appendVerdictToKbOutcomes(verdict).catch(e =>
|
||||
console.error(`[auditor] kb outcomes append failed: ${(e as Error).message}`));
|
||||
|
||||
if (!opts.dry_run) {
|
||||
await postToGitea(verdict);
|
||||
}
|
||||
@ -87,6 +97,68 @@ export async function auditPr(pr: PrSnapshot, opts: AuditOptions = {}): Promise<
|
||||
return verdict;
|
||||
}
|
||||
|
||||
// Phase A — verdict indexing.
|
||||
//
|
||||
// Two destinations, both append-only + non-blocking:
|
||||
// 1. observer :3800/event — ring buffer + data/_observer/ops.jsonl
|
||||
// 2. data/_kb/outcomes.jsonl — same file scenarios write to, with
|
||||
// kind:"audit" so readers can filter
|
||||
//
|
||||
// Errors log + drop. The verdict is still on disk at
|
||||
// _auditor/verdicts/{pr}-{sha}.json; observer + KB are a convenience
|
||||
// surface, not a source of truth.
|
||||
|
||||
const OBSERVER_URL = process.env.LH_OBSERVER_URL ?? "http://localhost:3800";
|
||||
const KB_OUTCOMES = "/home/profit/lakehouse/data/_kb/outcomes.jsonl";
|
||||
|
||||
async function indexVerdictToObserver(v: Verdict): Promise<void> {
|
||||
const payload = {
|
||||
source: "auditor",
|
||||
event_kind: "audit",
|
||||
ok: v.overall === "approve",
|
||||
sig_hash: `pr${v.pr_number}-${v.head_sha.slice(0, 8)}`,
|
||||
pr_number: v.pr_number,
|
||||
head_sha: v.head_sha,
|
||||
overall: v.overall,
|
||||
one_liner: v.one_liner,
|
||||
findings_block: v.metrics.findings_block,
|
||||
findings_warn: v.metrics.findings_warn,
|
||||
audit_duration_ms: v.metrics.audit_duration_ms,
|
||||
audited_at: v.audited_at,
|
||||
};
|
||||
const r = await fetch(`${OBSERVER_URL}/event`, {
|
||||
method: "POST",
|
||||
headers: { "content-type": "application/json" },
|
||||
body: JSON.stringify(payload),
|
||||
signal: AbortSignal.timeout(3000),
|
||||
});
|
||||
if (!r.ok) throw new Error(`observer ${r.status}: ${await r.text()}`);
|
||||
}
|
||||
|
||||
async function appendVerdictToKbOutcomes(v: Verdict): Promise<void> {
|
||||
const { appendFile, mkdir } = await import("node:fs/promises");
|
||||
const { dirname } = await import("node:path");
|
||||
await mkdir(dirname(KB_OUTCOMES), { recursive: true });
|
||||
const row = {
|
||||
kind: "audit",
|
||||
sig_hash: `pr${v.pr_number}-${v.head_sha.slice(0, 8)}`,
|
||||
audited_at: v.audited_at,
|
||||
pr_number: v.pr_number,
|
||||
head_sha: v.head_sha,
|
||||
overall: v.overall,
|
||||
one_liner: v.one_liner,
|
||||
ok_events: v.overall === "approve" ? 1 : 0,
|
||||
total_events: 1,
|
||||
findings: {
|
||||
block: v.metrics.findings_block,
|
||||
warn: v.metrics.findings_warn,
|
||||
info: v.metrics.findings_info,
|
||||
},
|
||||
elapsed_secs: (v.metrics.audit_duration_ms ?? 0) / 1000,
|
||||
};
|
||||
await appendFile(KB_OUTCOMES, JSON.stringify(row) + "\n");
|
||||
}
|
||||
|
||||
async function persistVerdict(v: Verdict): Promise<void> {
|
||||
await mkdir(VERDICTS_DIR, { recursive: true });
|
||||
const filename = `${v.pr_number}-${v.head_sha.slice(0, 12)}.json`;
|
||||
|
||||
126
docs/COHESION_INTEGRATION_PLAN.md
Normal file
126
docs/COHESION_INTEGRATION_PLAN.md
Normal file
@ -0,0 +1,126 @@
|
||||
# Cohesion integration plan — the "smarter DB" loop
|
||||
|
||||
**Written:** 2026-04-22, after J flagged that the system has good parts but they don't compose into the self-improving loop promised in the control-plane thesis (`project_control_plane_thesis.md` memory: 0→85% via hyperfocus-then-escalate).
|
||||
|
||||
## The gap
|
||||
|
||||
Each piece works in isolation. What's not wired:
|
||||
|
||||
```
|
||||
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
|
||||
│ observer │──✓──│ data/_kb/ │──?──│ auditor │
|
||||
│ :3800 │ │ outcomes. │ │ kb_query │
|
||||
│ │ │ jsonl │ │ check │
|
||||
└─────────────┘ └─────────────┘ └─────────────┘
|
||||
▲ │
|
||||
│? ▼
|
||||
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
|
||||
│ auditor │─────│ data/_audit │ │ cloud │
|
||||
│ verdicts │ │or/verdicts/ │ │ inference │
|
||||
│ │ │ │ │ │
|
||||
└─────────────┘ └─────────────┘ └─────────────┘
|
||||
│?
|
||||
▼
|
||||
┌─────────────┐
|
||||
│ hybrid_srch │
|
||||
│ playbook_mem│
|
||||
│ context7 │
|
||||
└─────────────┘
|
||||
```
|
||||
|
||||
- **✓** solid: observer captures scenarios, writes `_kb/outcomes.jsonl`.
|
||||
- **?** missing:
|
||||
- auditor verdicts go to `_auditor/verdicts/` — not to observer or KB
|
||||
- auditor kb_query reads `_kb/outcomes.jsonl` but doesn't query hybrid_search, playbook_memory, or context7
|
||||
- auditor inference sends claim+diff to cloud — but WITHOUT KB neighbors, WITHOUT drift context, WITHOUT tool-aware retrieval
|
||||
- Result: every audit is stateless-ish. The DB doesn't get smarter across audits.
|
||||
|
||||
## The target loop
|
||||
|
||||
```
|
||||
PR opened
|
||||
↓
|
||||
auditor fetches diff + claims
|
||||
↓
|
||||
auditor enriches with:
|
||||
— KB neighbors: past verdicts on similar claim-hashes
|
||||
— hybrid_search: playbooks for task classes the diff touches
|
||||
— context7 drift status: for any tool named in commit messages
|
||||
— MCP tools: agent-style queries if relevant
|
||||
↓
|
||||
cloud inference sees: diff + claims + enrichment
|
||||
↓
|
||||
verdict posted to Gitea
|
||||
↓
|
||||
verdict ALSO persisted to:
|
||||
— observer :3800/event (source=auditor)
|
||||
— _kb/outcomes.jsonl (kind=audit, sig_hash=pr+sha)
|
||||
↓
|
||||
next audit on similar PR:
|
||||
— KB neighbors find this prior verdict
|
||||
— cloud sees "similar PRs ended in request_changes for reason X"
|
||||
— verdict is more calibrated
|
||||
```
|
||||
|
||||
## Phases of closure
|
||||
|
||||
Building this in order from least-invasive:
|
||||
|
||||
### Phase A — Verdict indexing (shipped in this PR)
|
||||
|
||||
After every `auditPr()` completes, in addition to persisting to `_auditor/verdicts/`:
|
||||
|
||||
1. POST the verdict to observer `:3800/event` with `source: "auditor"`, `ok: overall === "approve"`, `event_kind: "audit"`, `sig_hash: <stable hash of pr_number + head_sha>`.
|
||||
2. Append a simplified outcome to `data/_kb/outcomes.jsonl` with a special `kind: "audit"` row so the KB surface treats audits alongside scenarios.
|
||||
|
||||
Minimal surgery. Doesn't change the verdict itself. Just makes it visible to downstream consumers.
|
||||
|
||||
**Test:** after this PR lands, re-run the auditor once; observer stats should show `by_source.auditor > 0`; `_kb/outcomes.jsonl` should have one new row per audited SHA.
|
||||
|
||||
### Phase B — KB query sees auditor history
|
||||
|
||||
Extend `auditor/checks/kb_query.ts` to:
|
||||
|
||||
1. Read `_kb/outcomes.jsonl`, filter `kind === "audit"`, find prior audits with matching `sig_hash` (same PR across SHAs) or similar claim hashes.
|
||||
2. Emit finding: "prior N audits on this PR ended in [approve, block, request_changes], last reason was X."
|
||||
|
||||
Gives the auditor memory across re-audits of the same PR.
|
||||
|
||||
### Phase C — Hybrid search in kb_query
|
||||
|
||||
When a PR touches `crates/vectord/src/playbook_memory.rs` (task-class matching), auditor calls `POST /vectors/hybrid` to find playbooks semantically related to the diff. Surfaces as "N playbooks in production rely on this code path, consider backward-compat."
|
||||
|
||||
Requires:
|
||||
- Task-class extractor from diff paths
|
||||
- Cloud-free hybrid_search call (we have local Ollama for embeddings)
|
||||
|
||||
### Phase D — Context7 drift awareness in auditor
|
||||
|
||||
If the diff / commit message names a tool that's in any playbook's `doc_refs`, auditor calls the context7 bridge to check current drift status. Surfaces as "tool X has drifted since N playbooks referenced it; this PR may need to update those."
|
||||
|
||||
### Phase E — Inference sees enrichment
|
||||
|
||||
The cloud inference check currently sends `diff + claims`. Extend to send `diff + claims + kb_neighbors + drift_context + hybrid_search_matches`. The prompt becomes context-rich — exactly what makes the cloud model competent (per the control-plane thesis).
|
||||
|
||||
### Phase F — Full integration test
|
||||
|
||||
An auditor self-test that:
|
||||
|
||||
1. Creates a synthetic PR with known-good and known-bad claims
|
||||
2. Runs the enriched auditor
|
||||
3. Asserts the verdict found the planted issues
|
||||
4. Runs a second identical-claim PR
|
||||
5. Asserts the SECOND verdict references the FIRST audit (via KB neighbor retrieval)
|
||||
6. "Smarter DB" proof: two runs, measurable context gain.
|
||||
|
||||
## Sequence
|
||||
|
||||
Phase A lands in this PR alongside the inventory. Phases B-F are follow-up PRs, each with their own auditor gate. Order matters: A → B (reads A's output) → C+D (in parallel) → E (consumes B/C/D) → F (asserts E's behavior).
|
||||
|
||||
## Not in this plan (deliberate)
|
||||
|
||||
- **Rewriting the auditor to use a different architecture.** Current 4-check model stays; we just enrich the checks.
|
||||
- **Tuning cloud inference precision.** The false-positive rate is a prompt-engineering concern; this plan is about context enrichment, which is separate.
|
||||
- **Branch protection enforcement.** Stays off until Phase F passes.
|
||||
|
||||
The overall bet: this is the "putting it all together coherently" J said was a real problem. Six phases over however many PRs it takes. Each one ships one wire of the loop; no single PR tries to do them all.
|
||||
75
docs/PYTHON_INVENTORY.md
Normal file
75
docs/PYTHON_INVENTORY.md
Normal file
@ -0,0 +1,75 @@
|
||||
# Python inventory — what's wired vs dead
|
||||
|
||||
Generated 2026-04-22. Enumerates every `.py` file in the repo and classifies each by actual integration status. Goal: surface the "ambiguous names that don't hook in" J flagged.
|
||||
|
||||
## Classification
|
||||
|
||||
- **🟢 Production** — running as a systemd service, or imported by code that does
|
||||
- **🟡 Documented** — referenced in `docs/PRD.md` / `docs/PHASES.md` / `docs/CONTROL_PLANE_PRD.md` or called from a shell/TS orchestrator
|
||||
- **🟠 Manual** — runnable as a one-off utility but not in any automated pipeline
|
||||
- **🔴 Dead** — not imported, not referenced, not called anywhere
|
||||
|
||||
## Results
|
||||
|
||||
### Sidecar (`sidecar/sidecar/`)
|
||||
|
||||
The Python FastAPI sidecar on `:3200`. Runs as `lakehouse-sidecar.service`.
|
||||
|
||||
| File | Status | Evidence |
|
||||
|---|---|---|
|
||||
| `main.py` | 🟢 Production | systemd ExecStart: `uvicorn sidecar.sidecar.main:app` |
|
||||
| `admin.py` | 🟢 Production | imported by `main.py` as `admin_router` |
|
||||
| `embed.py` | 🟢 Production | imported by `main.py` as `embed_router` |
|
||||
| `generate.py` | 🟢 Production | imported by `main.py` as `generate_router` |
|
||||
| `rerank.py` | 🟢 Production | imported by `main.py` as `rerank_router` |
|
||||
| `ollama.py` | 🟢 Production | imported transitively by embed/generate/rerank |
|
||||
| `__init__.py` | 🟢 Production | package marker |
|
||||
| `lab_ui.py` | 🔴 **Dead** | **not imported by any file in `sidecar/`. `scripts/serve_lab.py` only uses stdlib — doesn't import this.** Mtime 2026-04-16 (pre-session). Safe to delete. |
|
||||
| `pipeline_lab.py` | 🔴 **Dead** | **same — not imported anywhere. Mtime 2026-04-16.** Docstring says "iterative embedding/LLM pipeline experimentation" — looks like an abandoned prototype. Safe to delete. |
|
||||
|
||||
### Scripts (`scripts/`)
|
||||
|
||||
17 files. Only **3** are production (behind systemd), **2** are documented in phase/PRD docs, **12** are unreferenced.
|
||||
|
||||
| File | Status | Evidence |
|
||||
|---|---|---|
|
||||
| `serve_ui.py` | 🟢 Production | `lakehouse-ui.service` ExecStart |
|
||||
| `serve_imagegen.py` | 🟢 Production | `imagegen.service` ExecStart |
|
||||
| `serve_lab.py` | 🟢 Production | `pipeline-lab.service` ExecStart |
|
||||
| `kb_measure.py` | 🟡 Documented | referenced in `docs/PHASES.md` (Phase 22 aggregator) |
|
||||
| `kb_staffer_report.py` | 🟡 Documented | referenced in `docs/PHASES.md` + `docs/PRD.md` (Phase 23), also called by `scripts/run_staffer_demo.sh` |
|
||||
| `autonomous_agent.py` | 🟠 Manual | "Autonomous stress-test agent" — no docs refs, no callers. Mtime 2026-04-17. |
|
||||
| `copilot.py` | 🟠 Manual | "Staffing Co-Pilot — the anticipation layer" — no docs refs. Writes briefing to `/tmp/copilot_briefing.json` on run. |
|
||||
| `generate_demo.py` | 🟠 Manual | "realistic demo datasets" — mtime 2026-03-27, never touched this session |
|
||||
| `generate_workers.py` | 🟠 Manual | "worker profiles at scale" — usage example in docstring: `python3 generate_workers.py 100000 > /tmp/workers_100k.csv`. Still useful, just not auto-run. |
|
||||
| `lance_tune.py` | 🟠 Manual | "IVF_PQ parameter sweep" — one-shot experiment tool. |
|
||||
| `qwen3_plan.py` | 🟠 Manual | "Qwen 3 agent plan — structured test with playbook building" |
|
||||
| `quality_eval.py` | 🟠 Manual | "Quality evaluation pipeline — tests whether the system gives CORRECT" |
|
||||
| `scale_test.py` | 🟠 Manual | called by `scripts/scale_10m_test.sh` (not in systemd). 2.5M row test. |
|
||||
| `staffing_day.py` | 🟠 Manual | "Real-world staffing agency day simulation" — no docs refs. Overlaps conceptually with `tests/multi-agent/scenario.ts`. |
|
||||
| `staffing_demo.py` | 🟠 Manual | "Realistic staffing company data generator" — demo fixture. |
|
||||
| `staffing_simulation.py` | 🟠 Manual | "multi-agent stress test" — overlaps with `tests/multi-agent/scenario.ts`. |
|
||||
| `stress_test.py` | 🟠 Manual | "prove this architecture is sound or find where it breaks" — overlaps with `tests/multi-agent/run_stress.ts`. |
|
||||
|
||||
## Overlap with TypeScript / Rust equivalents
|
||||
|
||||
Several Python scripts overlap conceptually with newer TS/Rust code. This is the "ambiguous names that don't hook in" pattern — the Python originals were superseded but not removed.
|
||||
|
||||
| Python script | Newer equivalent | Integration drift |
|
||||
|---|---|---|
|
||||
| `scripts/staffing_simulation.py` | `tests/multi-agent/scenario.ts` | scenario.ts ships + observer-integrated (Phase 24); Python version is unwired. |
|
||||
| `scripts/staffing_day.py` | `tests/multi-agent/scenario.ts` | same. |
|
||||
| `scripts/stress_test.py` | `tests/multi-agent/run_stress.ts` | TS version is the one the auditor fixture exercises. |
|
||||
| `scripts/autonomous_agent.py` | `bot/` + `auditor/` Bun sub-agents | new Bun pattern supersedes. Python version predates the control-plane pivot. |
|
||||
| `scripts/qwen3_plan.py` | `tests/multi-agent/agent.ts` | TS agent is the one with Phase 21 continuation + cloud routing. |
|
||||
|
||||
**Recommended cleanup actions (not in this PR — separate decision):**
|
||||
|
||||
1. **Delete the two confirmed dead sidecar files:** `sidecar/sidecar/lab_ui.py`, `sidecar/sidecar/pipeline_lab.py`. Nothing calls them; removal is safe.
|
||||
2. **Archive the 5 overlapping scripts** (staffing_simulation, staffing_day, stress_test, autonomous_agent, qwen3_plan) — either move to `scripts/legacy/` with a README explaining they predate the TS versions, or delete if J confirms no future use.
|
||||
3. **Keep the one-off utilities** (generate_demo, generate_workers, lance_tune, quality_eval, copilot, scale_test, staffing_demo) — useful as manual tools. Maybe add a `scripts/README.md` documenting what each does and when to run.
|
||||
4. **Promote `kb_measure.py` and `kb_staffer_report.py`** to be actually callable from the KB flow — they're documented but only run manually after scenario batches.
|
||||
|
||||
## Honest next step
|
||||
|
||||
The cleanup above isn't this session's work — it's a subsequent PR. This doc lands first so the state is visible and operator-auditable.
|
||||
@ -37,7 +37,11 @@ interface ObservedOp {
|
||||
// Phase 24 — optional provenance so error analyzer and playbook
|
||||
// builder can differentiate MCP-layer ops from scenario-sourced
|
||||
// events. Scenarios set source="scenario" + staffer_id + sig_hash.
|
||||
source?: "mcp" | "scenario";
|
||||
// Phase A (cohesion plan 2026-04-22) — auditor verdicts also post
|
||||
// to :3800/event with source="auditor"; stats endpoint counts them
|
||||
// separately so KB readers can see audit cadence alongside
|
||||
// scenario cadence.
|
||||
source?: "mcp" | "scenario" | "auditor" | "bot";
|
||||
staffer_id?: string;
|
||||
sig_hash?: string;
|
||||
event_kind?: string;
|
||||
@ -253,15 +257,30 @@ function startHttpListener() {
|
||||
}
|
||||
if (req.method === "POST" && url.pathname === "/event") {
|
||||
return req.json().then((body: any) => {
|
||||
// Accept caller-provided source when present and valid.
|
||||
// Unrecognized sources fall back to "scenario" for
|
||||
// backward-compat with older callers.
|
||||
const allowedSource = ["mcp", "scenario", "auditor", "bot"] as const;
|
||||
const bodySrc = typeof body.source === "string" ? body.source : "";
|
||||
const source = (allowedSource as readonly string[]).includes(bodySrc)
|
||||
? (bodySrc as typeof allowedSource[number])
|
||||
: "scenario";
|
||||
// Phase A: auditor+bot POST with ok=true|false (not success)
|
||||
// for symmetry with the vocabulary of their domain. Accept either.
|
||||
const success = typeof body.success === "boolean" ? body.success
|
||||
: typeof body.ok === "boolean" ? body.ok
|
||||
: false;
|
||||
const op: ObservedOp = {
|
||||
timestamp: body.timestamp ?? new Date().toISOString(),
|
||||
endpoint: body.endpoint ?? "scenario:fill",
|
||||
endpoint: body.endpoint ?? (source === "auditor" ? "auditor:verdict"
|
||||
: source === "bot" ? "bot:cycle"
|
||||
: "scenario:fill"),
|
||||
input_summary: body.input_summary ?? `${body.event_kind ?? "?"} ${body.role ?? "?"}×${body.count ?? "?"} in ${body.city ?? "?"}, ${body.state ?? "?"}`,
|
||||
success: !!body.success,
|
||||
duration_ms: Number(body.duration_ms ?? 0),
|
||||
output_summary: body.output_summary ?? (body.success ? "filled" : (body.error ?? "failed")),
|
||||
success,
|
||||
duration_ms: Number(body.duration_ms ?? body.audit_duration_ms ?? 0),
|
||||
output_summary: body.output_summary ?? body.one_liner ?? (success ? "ok" : (body.error ?? "failed")),
|
||||
error: body.error,
|
||||
source: "scenario",
|
||||
source,
|
||||
staffer_id: body.staffer_id,
|
||||
sig_hash: body.sig_hash,
|
||||
event_kind: body.event_kind,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user