root 89ca72d471 materializer + replay ports + vectord substrate fix verified at scale
Two threads landing together — the doc edits interleave so they ship
in a single commit.

1. **vectord substrate fix verified at original scale** (closes the
   2026-05-01 thread). Re-ran multitier 5min @ conc=50: 132,211
   scenarios at 438/sec, 6/6 classes at 0% failure (was 4/6 pre-fix).
   Throughput dropped 1,115 → 438/sec because previously-broken
   scenarios now do real HNSW Add work — honest cost of correctness.
   The fix (i.vectors side-store + safeGraphAdd recover wrappers +
   smallIndexRebuildThreshold=32 + saveTask coalescing) holds at the
   footprint that originally surfaced the bug.

2. **Materializer port** — internal/materializer + cmd/materializer +
   scripts/materializer_smoke.sh. Ports scripts/distillation/transforms.ts
   (12 transforms) + build_evidence_index.ts (idempotency, day-partition,
   receipt). On-wire JSON shape matches TS so Bun and Go runs are
   interchangeable. 14 tests green.

3. **Replay port** — internal/replay + cmd/replay +
   scripts/replay_smoke.sh. Ports scripts/distillation/replay.ts
   (retrieve → bundle → /v1/chat → validate → log). Closes audit-FULL
   phase 7 live invocation on the Go side. Both runtimes append to the
   same data/_kb/replay_runs.jsonl (schema=replay_run.v1). 14 tests green.

Side effect on internal/distillation/types.go: EvidenceRecord gained
prompt_tokens, completion_tokens, and metadata fields to mirror the TS
shape the materializer transforms produce.

STATE_OF_PLAY refreshed to 2026-05-02; ARCHITECTURE_COMPARISON decisions
tracker moves the materializer + replay items from _open_ to DONE and
adds the substrate-fix scale verification row.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 03:31:02 -05:00

288 lines
9.2 KiB
Go

package materializer
import (
"encoding/json"
"testing"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/distillation"
)
// fixedRecordedAt pins the RecordedAt fixture so transform outputs (and any
// timestamp fallbacks derived from it) are deterministic across test runs.
const fixedRecordedAt = "2026-05-02T00:00:00Z"
// fixedSigHash is a sha256-width (64 hex chars) placeholder signature hash
// used to verify provenance propagation through the transforms.
const fixedSigHash = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
// ti assembles a TransformInput for one JSONL row, wiring in the deterministic
// RecordedAt and SigHash fixtures shared by every test in this file.
func ti(row map[string]any, source string, lineOffset int64) TransformInput {
	input := TransformInput{
		RecordedAt:        fixedRecordedAt,
		SigHash:           fixedSigHash,
		SourceFileRelPath: source,
	}
	input.Row = row
	input.LineOffset = lineOffset
	return input
}
func TestExtractorTransform_DistilledFacts(t *testing.T) {
in := ti(map[string]any{
"run_id": "run-1",
"source_label": "lab-3",
"created_at": "2026-04-01T00:00:00Z",
"extractor": "qwen3.5:latest",
"text": "Hello.",
}, "data/_kb/distilled_facts.jsonl", 0)
rec := extractorTransform(in)
if rec == nil {
t.Fatal("nil record")
}
if rec.RunID != "run-1" || rec.TaskID != "lab-3" {
t.Fatalf("ids: %+v", rec)
}
if rec.ModelRole != distillation.RoleExtractor {
t.Errorf("role=%v, want extractor", rec.ModelRole)
}
if rec.ModelProvider != "ollama" {
t.Errorf("provider=%q, want ollama", rec.ModelProvider)
}
if rec.Provenance.SigHash != fixedSigHash {
t.Errorf("provenance.sig_hash mismatch: %q", rec.Provenance.SigHash)
}
if rec.Text != "Hello." {
t.Errorf("text=%q", rec.Text)
}
}
// TestExtractorTransform_FallbackIDs verifies that a row missing run_id and
// source_label falls back to "<file-stem>:<line-offset>" for both identifiers.
func TestExtractorTransform_FallbackIDs(t *testing.T) {
	in := ti(map[string]any{
		"created_at": "2026-04-01T00:00:00Z",
		"text":       "row without ids",
	}, "data/_kb/distilled_procedures.jsonl", 7)
	rec := extractorTransform(in)
	// Guard against a nil return so the field checks below fail cleanly
	// instead of panicking (the happy-path test guards the same way).
	if rec == nil {
		t.Fatal("nil record")
	}
	if rec.RunID != "distilled_procedures:7" || rec.TaskID != "distilled_procedures:7" {
		t.Fatalf("fallback ids wrong: %+v", rec)
	}
}
// TestContractAnalysesTransform_Fields exercises the full field mapping of a
// contract_analyses row: ids, role, retrieved context, observer fields,
// micro-unit→USD cost conversion, latency, metadata, and marker routing.
func TestContractAnalysesTransform_Fields(t *testing.T) {
	in := ti(map[string]any{
		"permit_id":        "P-001",
		"ts":               "2026-04-26T12:00:00Z",
		"matrix_corpora":   map[string]any{"workers": 1, "candidates": 1},
		"matrix_hits":      3.0,
		"observer_notes":   []any{"good", "spec match"},
		"observer_verdict": "accept",
		"observer_conf":    85.0,
		"ok":               true,
		"cost":             2_500_000.0, // micro-units
		"duration_ms":      1234.0,
		"contractor":       "Acme",
		"analysis":         "Looks good.",
	}, "data/_kb/contract_analyses.jsonl", 0)
	rec := contractAnalysesTransform(in)
	// Guard: every check below dereferences rec; fail cleanly on nil.
	if rec == nil {
		t.Fatal("nil record")
	}
	if rec.RunID == "" || rec.TaskID != "permit:P-001" {
		t.Fatalf("ids: %+v", rec)
	}
	if rec.ModelRole != distillation.RoleExecutor {
		t.Errorf("role=%v", rec.ModelRole)
	}
	if rec.RetrievedContext == nil || len(rec.RetrievedContext.MatrixCorpora) != 2 || rec.RetrievedContext.MatrixHits != 3 {
		t.Errorf("retrieved_context wrong: %+v", rec.RetrievedContext)
	}
	if len(rec.ObserverNotes) != 2 {
		t.Errorf("observer_notes=%v", rec.ObserverNotes)
	}
	if string(rec.ObserverVerdict) != "accept" || rec.ObserverConfidence != 85 {
		t.Errorf("observer fields: %+v", rec)
	}
	if rec.CostUSD != 2.5 {
		t.Errorf("cost should convert micro→USD; got %v", rec.CostUSD)
	}
	if rec.LatencyMs != 1234 {
		t.Errorf("latency: %v", rec.LatencyMs)
	}
	if rec.Metadata == nil || rec.Metadata["contractor"] != "Acme" {
		t.Errorf("metadata.contractor missing: %v", rec.Metadata)
	}
	if len(rec.SuccessMarkers) != 1 || rec.SuccessMarkers[0] != "matrix_hits_above_threshold" {
		t.Errorf("success_markers: %v", rec.SuccessMarkers)
	}
	if len(rec.FailureMarkers) != 0 {
		t.Errorf("expected no failure_markers when ok=true and verdict=accept, got %v", rec.FailureMarkers)
	}
}
// TestContractAnalysesTransform_FailureMarkers verifies that a rejected,
// not-ok row produces exactly the "observer_rejected" failure marker.
func TestContractAnalysesTransform_FailureMarkers(t *testing.T) {
	in := ti(map[string]any{
		"permit_id":        "P-002",
		"ts":               "2026-04-26T12:00:00Z",
		"observer_verdict": "reject",
		"ok":               false,
		"analysis":         "Issues found.",
	}, "data/_kb/contract_analyses.jsonl", 1)
	rec := contractAnalysesTransform(in)
	// Guard: fail cleanly rather than panic if the transform returns nil.
	if rec == nil {
		t.Fatal("nil record")
	}
	if len(rec.FailureMarkers) != 1 || rec.FailureMarkers[0] != "observer_rejected" {
		t.Errorf("failure_markers: %v", rec.FailureMarkers)
	}
}
// TestModeExperimentsTransform_ProviderInference checks provider inference
// from the model string (a slash-qualified model is expected to map to
// "openrouter", a ":480b"-style model to "ollama_cloud") and that a missing
// file_path yields no source_files.
func TestModeExperimentsTransform_ProviderInference(t *testing.T) {
	openrouter := ti(map[string]any{
		"ts":         "2026-04-26T12:00:00Z",
		"task_class": "scrum_review",
		"model":      "anthropic/claude-opus-4-7",
		"file_path":  "src/foo.rs",
		"sources":    map[string]any{"matrix_corpus": []any{"docs"}, "matrix_chunks_kept": 4.0},
		"latency_ms": 200.0,
		"response":   "ok",
	}, "data/_kb/mode_experiments.jsonl", 0)
	rec := modeExperimentsTransform(openrouter)
	// Guard against nil so the provider check fails instead of panicking.
	if rec == nil {
		t.Fatal("nil record for openrouter row")
	}
	if rec.ModelProvider != "openrouter" {
		t.Errorf("provider=%q, want openrouter", rec.ModelProvider)
	}
	cloud := ti(map[string]any{
		"ts":         "2026-04-26T12:00:00Z",
		"task_class": "scrum_review",
		"model":      "qwen3-coder:480b",
		"sources":    map[string]any{"matrix_corpus": []any{"docs"}},
		"response":   "ok",
	}, "data/_kb/mode_experiments.jsonl", 1)
	rec2 := modeExperimentsTransform(cloud)
	if rec2 == nil {
		t.Fatal("nil record for cloud row")
	}
	if rec2.ModelProvider != "ollama_cloud" {
		t.Errorf("provider=%q, want ollama_cloud", rec2.ModelProvider)
	}
	if len(rec2.SourceFiles) != 0 {
		t.Errorf("source_files should be empty when file_path missing; got %v", rec2.SourceFiles)
	}
}
// TestObserverEscalationsTransform_Tokens verifies token-count passthrough
// and the "observer_escalation:<endpoint>" task id format.
func TestObserverEscalationsTransform_Tokens(t *testing.T) {
	in := ti(map[string]any{
		"ts":                "2026-04-26T12:00:00Z",
		"sig_hash":          "abc",
		"cluster_endpoint":  "/v1/chat",
		"prompt_tokens":     100.0,
		"completion_tokens": 50.0,
		"analysis":          "review",
	}, "data/_kb/observer_escalations.jsonl", 0)
	rec := observerEscalationsTransform(in)
	// Guard: fail cleanly rather than panic if the transform returns nil.
	if rec == nil {
		t.Fatal("nil record")
	}
	if rec.PromptTokens != 100 || rec.CompletionTokens != 50 {
		t.Errorf("tokens: prompt=%d completion=%d", rec.PromptTokens, rec.CompletionTokens)
	}
	if rec.TaskID != "observer_escalation:/v1/chat" {
		t.Errorf("task_id=%q", rec.TaskID)
	}
}
// TestAuditFactsTransform_TextIsSummary verifies that rec.Text is a JSON
// summary whose facts/entities/relationships fields carry the counts of the
// corresponding input arrays.
func TestAuditFactsTransform_TextIsSummary(t *testing.T) {
	in := ti(map[string]any{
		"head_sha":      "abc123",
		"pr_number":     11.0,
		"extracted_at":  "2026-04-26T12:00:00Z",
		"extractor":     "qwen2.5",
		"facts":         []any{"f1", "f2"},
		"entities":      []any{"e1"},
		"relationships": []any{},
	}, "data/_kb/audit_facts.jsonl", 0)
	rec := auditFactsTransform(in)
	// Guard: rec.Text below would panic on a nil record.
	if rec == nil {
		t.Fatal("nil record")
	}
	var summary map[string]any
	if err := json.Unmarshal([]byte(rec.Text), &summary); err != nil {
		t.Fatalf("text not JSON: %v", err)
	}
	// Comma-ok assertions: a missing key or non-numeric value should fail the
	// test with a message, not panic the whole run.
	counts := make(map[string]float64, 3)
	for _, key := range []string{"facts", "entities", "relationships"} {
		v, ok := summary[key].(float64)
		if !ok {
			t.Fatalf("summary[%q] missing or not a number: %v", key, summary[key])
		}
		counts[key] = v
	}
	if counts["facts"] != 2 || counts["entities"] != 1 || counts["relationships"] != 0 {
		t.Errorf("counts wrong: %+v", summary)
	}
}
// TestAutoApplyTransform_DeterministicTimestampFallback verifies that a row
// without "ts" falls back to the deterministic RecordedAt fixture, and that
// the action string routes into success vs failure markers.
func TestAutoApplyTransform_DeterministicTimestampFallback(t *testing.T) {
	in := ti(map[string]any{
		"action": "committed",
		"file":   "src/x.rs",
	}, "data/_kb/auto_apply.jsonl", 0)
	rec := autoApplyTransform(in)
	// Guard: fail cleanly rather than panic if the transform returns nil.
	if rec == nil {
		t.Fatal("nil record for committed row")
	}
	if rec.Timestamp != fixedRecordedAt {
		t.Errorf("expected fallback to RecordedAt %q, got %q", fixedRecordedAt, rec.Timestamp)
	}
	if len(rec.SuccessMarkers) != 1 || rec.SuccessMarkers[0] != "committed" {
		t.Errorf("success_markers: %v", rec.SuccessMarkers)
	}
	revertedIn := ti(map[string]any{
		"ts":     "2026-04-26T12:00:00Z",
		"action": "auto_reverted_after_test_fail",
		"file":   "src/x.rs",
	}, "data/_kb/auto_apply.jsonl", 1)
	rec2 := autoApplyTransform(revertedIn)
	if rec2 == nil {
		t.Fatal("nil record for reverted row")
	}
	if len(rec2.FailureMarkers) != 1 || rec2.FailureMarkers[0] != "auto_reverted_after_test_fail" {
		t.Errorf("failure_markers: %v", rec2.FailureMarkers)
	}
}
// TestAuditsTransform_SeverityRouting table-tests how audit severities route
// into markers: info/low are expected to count as success; medium and the
// blocking severities (high/critical) as failure.
func TestAuditsTransform_SeverityRouting(t *testing.T) {
	cases := []struct {
		sev      string
		success  bool
		blocking bool
		medium   bool
	}{
		{"info", true, false, false},
		{"low", true, false, false},
		{"medium", false, false, true},
		{"high", false, true, false},
		{"critical", false, true, false},
	}
	for _, c := range cases {
		t.Run(c.sev, func(t *testing.T) {
			in := ti(map[string]any{
				"finding_id": "F-1",
				"phase":      "G2",
				"severity":   c.sev,
				"ts":         "2026-04-26T12:00:00Z",
				"evidence":   "details",
			}, "data/_kb/audits.jsonl", 0)
			rec := auditsTransform(in)
			// Guard: fail this subtest cleanly rather than panic on nil.
			if rec == nil {
				t.Fatal("nil record")
			}
			hasSuccess := len(rec.SuccessMarkers) > 0
			hasFailure := len(rec.FailureMarkers) > 0
			if hasSuccess != c.success {
				t.Errorf("severity=%s success=%v wanted %v", c.sev, hasSuccess, c.success)
			}
			if hasFailure != (c.blocking || c.medium) {
				t.Errorf("severity=%s failure=%v wanted %v", c.sev, hasFailure, c.blocking || c.medium)
			}
		})
	}
}
// TestOutcomesTransform_LatencyAndSuccess verifies elapsed_secs→LatencyMs
// conversion, the all_events_ok success marker when ok_events == total_events,
// and the int64 gap/citation counters in ValidationResults.
func TestOutcomesTransform_LatencyAndSuccess(t *testing.T) {
	in := ti(map[string]any{
		"run_id":            "r-1",
		"created_at":        "2026-04-26T12:00:00Z",
		"sig_hash":          "abc",
		"elapsed_secs":      1.234,
		"ok_events":         5.0,
		"total_events":      5.0,
		"total_gap_signals": 2.0,
		"total_citations":   3.0,
	}, "data/_kb/outcomes.jsonl", 0)
	rec := outcomesTransform(in)
	// Guard: fail cleanly rather than panic if the transform returns nil.
	if rec == nil {
		t.Fatal("nil record")
	}
	if rec.LatencyMs != 1234 {
		t.Errorf("latency=%d", rec.LatencyMs)
	}
	if len(rec.SuccessMarkers) != 1 || rec.SuccessMarkers[0] != "all_events_ok" {
		t.Errorf("success: %v", rec.SuccessMarkers)
	}
	if g, ok := rec.ValidationResults["gap_signals"].(int64); !ok || g != 2 {
		t.Errorf("gap_signals: %v", rec.ValidationResults)
	}
	if c, ok := rec.ValidationResults["citation_count"].(int64); !ok || c != 3 {
		t.Errorf("citation_count: %v", rec.ValidationResults)
	}
}
// TestTransformByPath_Found covers both sides of the registry lookup: a known
// source path resolves to a transform, an unknown path yields nil.
func TestTransformByPath_Found(t *testing.T) {
	if got := TransformByPath("data/_kb/distilled_facts.jsonl"); got == nil {
		t.Fatal("expected to find distilled_facts transform")
	}
	if got := TransformByPath("data/_kb/never_existed.jsonl"); got != nil {
		t.Fatal("expected nil for unknown path")
	}
}