root eb0dfdff04 vectord: v2 envelope + handleMerge robustness — actions post_role_gate_v1 scrum
3-lineage scrum on 434f466..0d4f033 surfaced one convergent finding
(Opus + Kimi) and 3 Opus-only real bugs. All actioned in this
commit. Two false positives (Kimi rollback misreading, Opus stale-
comment claim) verified + rejected — both required manual control-
flow inspection to refute, matching the documented Kimi-truncation
behavior in feedback_cross_lineage_review.md.

Convergent fix — DecodeIndex lost nil-meta items:
- Envelope version bumped 1 → 2.
- New v2 field: IDs []string carries the canonical ID set
  explicitly, independent of meta map's nil-vs-{} sparseness.
- DecodeIndex accepts both versions: v2 reads from env.IDs; v1
  falls back to meta-key inference (with the documented
  limitation that nil-meta items are invisible — preserved for
  backward-compat with already-persisted indexes).
- Encode emits v2 going forward.
- 2 new regression tests:
  - TestEncodeDecode_NilMetaItemsSurviveRoundTrip: items added
    with nil metadata MUST survive Encode → Decode and remain
    visible to IDs(). Pre-fix would have yielded IDs() == [].
  - TestDecodeIndex_V1BackwardCompat: hand-crafted v1 envelope
    still decodes (proves the fallback path).

Opus-only fixes:
- handleMerge: non-ErrIndexNotFound errors at h.reg.Get(name) /
  h.reg.Get(req.Dest) now return 500 + log instead of falling
  through with nil src/dest pointers (which would panic on the
  next deref). Real bug — only the sentinel error was handled.
- internal/drift/drift.go: mathLog wrapper removed; math.Log
  inlined. Wrapper added no value (math was already imported).
- internal/distillation/audit_baseline.go: BuildAuditDriftTable's
  bubble sort replaced with sort.Slice. Idiomatic + shorter.

Rejected after verification:
- Kimi WARN "missing rollback on partial merge": misread the
  control flow. Code at cmd/vectord/main.go:404-414 does NOT
  delete from src when dest.Add fails (continue before reaching
  src.Delete). Only successful Adds trigger Deletes.
- Opus INFO "TimestampUnixNano comment references missing field":
  field exists at scripts/multi_coord_stress/main.go:128. Opus
  saw only the diff context, not the full file.

Deferred (no fired trigger):
- Opus WARN "no per-index lock during merge": no concurrent merge
  callers today (operators run merge as deliberate one-shot job).
  Worth a lock if/when matrixd or chatd start auto-triggering.

Disposition: reports/scrum/_evidence/2026-05-01/verdicts/post_role_gate_v1_disposition.md.

Build + vet + tests green; 2 new regression tests + all prior tests
unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 01:20:37 -05:00

// Package drift quantifies when historical decisions stop matching
// current reality. Per the PRD's 5-loop substrate, this is loop 5
// (drift) — distinct from the rating+distillation loop because
// drift is about MEASUREMENT, not learning. The learning loop says
// "this match worked, remember it"; the drift loop says "the
// playbook entry from 4 months ago — does it still match what the
// substrate would surface today?"
//
// First-shipped drift shape: SCORER drift. When the deterministic
// scorer's logic changes (ScorerVersion bumped), historical
// ScoredRuns may no longer match what the current scorer would
// produce on the same EvidenceRecord. ComputeScorerDrift re-runs
// the current scorer over a slice of (EvidenceRecord, persisted
// category) pairs and reports mismatches.
//
// Why this matters: the rating+distillation loop only learns
// forward. Without a drift quantifier, a scorer-rule change
// silently invalidates the historical training data feeding the
// loop. With drift quantification, a rule change surfaces a
// concrete number ("847 of 4701 historical scoredruns now
// disagree") that triggers a re-score-and-retrain cycle rather
// than letting the substrate quietly rot.
//
// Future drift shapes (not in this commit):
//   - PLAYBOOK drift: for each playbook entry, re-run its query
//     through current matrix-search; if the recorded answer is no
//     longer in top-K, the world has moved.
//   - EMBEDDING drift: KS-test on the distribution of embedding
//     vectors at T1 vs T2; large shifts = the corpus has changed
//     materially.
//   - AUDIT BASELINE drift: track how PR audit verdicts shift over
//     scorer/auditor versions; matches the Rust audit_baselines.jsonl
//     longitudinal signal.
package drift

import (
	"math"
	"sort"

	"git.agentview.dev/profit/golangLAKEHOUSE/internal/distillation"
)
// ScorerDriftEntry is one mismatch — a historical (record, category)
// pair where the current scorer disagrees with the persisted
// verdict. Reasons captures the current scorer's explanation so
// operators can see WHY the verdict changed.
type ScorerDriftEntry struct {
	EvidenceRunID     string                     `json:"evidence_run_id"`
	EvidenceTaskID    string                     `json:"evidence_task_id"`
	PersistedCategory distillation.ScoreCategory `json:"persisted_category"`
	CurrentCategory   distillation.ScoreCategory `json:"current_category"`
	CurrentReasons    []string                   `json:"current_reasons"`
	SourceFile        string                     `json:"source_file"`
}
// CategoryShift is one cell in the drift matrix — "X persisted
// records that NOW classify as Y." e.g. "12 records that were
// 'rejected' yesterday are 'partially_accepted' today."
type CategoryShift struct {
	From  distillation.ScoreCategory `json:"from"`
	To    distillation.ScoreCategory `json:"to"`
	Count int                        `json:"count"`
}
// ScorerDriftReport is the summary returned by ComputeScorerDrift.
// The shape is intentionally machine-readable so a downstream
// dashboard / alerting layer can threshold on Drifted / TotalChecked
// without parsing the entries list.
type ScorerDriftReport struct {
	ScorerVersion string             `json:"scorer_version"` // current scorer's version
	TotalChecked  int                `json:"total_checked"`
	Matched       int                `json:"matched"`    // current == persisted
	Drifted       int                `json:"drifted"`    // current != persisted
	DriftRate     float64            `json:"drift_rate"` // Drifted / TotalChecked
	ShiftMatrix   []CategoryShift    `json:"shift_matrix,omitempty"`
	Entries       []ScorerDriftEntry `json:"entries,omitempty"` // mismatches only
}
// ScorerDriftInput is one (record, persisted_category) pair to check.
// Caller is responsible for materializing these from disk; this
// package is pure compute.
type ScorerDriftInput struct {
	Record            distillation.EvidenceRecord
	PersistedCategory distillation.ScoreCategory
}
// ComputeScorerDrift re-runs distillation.ScoreRecord over each
// input and reports mismatches. Pure function — no I/O. The caller
// supplies the inputs (typically by reading a directory of
// scored-runs JSONL alongside the corresponding evidence JSONL).
//
// IncludeEntries controls whether the per-mismatch detail list is
// populated. For large corpora (e.g. 4,701 fill events) the
// summary numbers may be all the caller needs; setting this to
// false avoids allocating the entries slice.
func ComputeScorerDrift(inputs []ScorerDriftInput, includeEntries bool) ScorerDriftReport {
	report := ScorerDriftReport{
		ScorerVersion: distillation.ScorerVersion,
		TotalChecked:  len(inputs),
	}
	shiftCounts := make(map[[2]distillation.ScoreCategory]int)
	for _, in := range inputs {
		out := distillation.ScoreRecord(in.Record)
		if out.Category == in.PersistedCategory {
			report.Matched++
			continue
		}
		report.Drifted++
		shiftCounts[[2]distillation.ScoreCategory{in.PersistedCategory, out.Category}]++
		if includeEntries {
			report.Entries = append(report.Entries, ScorerDriftEntry{
				EvidenceRunID:     in.Record.RunID,
				EvidenceTaskID:    in.Record.TaskID,
				PersistedCategory: in.PersistedCategory,
				CurrentCategory:   out.Category,
				CurrentReasons:    out.Reasons,
				SourceFile:        in.Record.Provenance.SourceFile,
			})
		}
	}
	if report.TotalChecked > 0 {
		report.DriftRate = float64(report.Drifted) / float64(report.TotalChecked)
	}
	if len(shiftCounts) > 0 {
		report.ShiftMatrix = make([]CategoryShift, 0, len(shiftCounts))
		for k, v := range shiftCounts {
			report.ShiftMatrix = append(report.ShiftMatrix, CategoryShift{
				From: k[0], To: k[1], Count: v,
			})
		}
		// Sort: largest shifts first, then alphabetical-ish for ties.
		// Stable ordering matters for downstream display and JSON
		// determinism in tests.
		sort.Slice(report.ShiftMatrix, func(i, j int) bool {
			a, b := report.ShiftMatrix[i], report.ShiftMatrix[j]
			if a.Count != b.Count {
				return a.Count > b.Count
			}
			if a.From != b.From {
				return string(a.From) < string(b.From)
			}
			return string(a.To) < string(b.To)
		})
	}
	return report
}
// ─────────────────────────────────────────────────────────────────
// Distribution drift via Population Stability Index (PSI).
//
// Closes OPEN #3's "concrete distribution-drift signal" — pairs with
// ComputeScorerDrift's "categorical drift" to give callers a
// continuous-value drift metric for things like cosine distances,
// judge ratings, query top-1 distance distributions over time.
//
// Why PSI: standard in finance/risk for distribution-shift monitoring,
// well-defined verdict tiers (< 0.1 stable, 0.1-0.25 minor, > 0.25
// major), tolerant of moderate sample sizes, no assumption about
// distribution shape. Alternatives (KS-test, KL-divergence) all
// have failure modes when one bucket has zero observations; PSI's
// epsilon-clamping handles that gracefully.
//
// Math: PSI = Σᵢ (actualᵢ - expectedᵢ) × ln(actualᵢ / expectedᵢ)
// where actualᵢ and expectedᵢ are the proportion of observations
// falling into bucket i (zero values are clamped to a small
// epsilon so the log doesn't blow up).
//
// First-shipped use case: comparing the distribution of cosine top-1
// distances at T0 vs T1 to detect when the embedder's behavior
// against the corpus has shifted materially (e.g. after a model
// upgrade or a corpus refresh).
// ─────────────────────────────────────────────────────────────────
// DistributionDriftTier is a verdict bucket for the PSI value.
// Standard PSI thresholds from finance/risk; documented in the
// function-level comment for ComputeDistributionDrift.
type DistributionDriftTier string

const (
	DriftTierStable DistributionDriftTier = "stable" // PSI < 0.1
	DriftTierMinor  DistributionDriftTier = "minor"  // 0.1 ≤ PSI < 0.25
	DriftTierMajor  DistributionDriftTier = "major"  // PSI ≥ 0.25
)
// DistributionDriftBucket is one bucket's contribution to the PSI sum.
// Useful for drilldown — operators can see WHICH part of the
// distribution shifted (e.g. "the [0.3, 0.4) bucket gained 12% of
// observations from T0 to T1").
type DistributionDriftBucket struct {
	Lower         float64 `json:"lower"`
	Upper         float64 `json:"upper"`
	BaselineCount int     `json:"baseline_count"`
	CurrentCount  int     `json:"current_count"`
	BaselineRatio float64 `json:"baseline_ratio"` // [0, 1]
	CurrentRatio  float64 `json:"current_ratio"`  // [0, 1]
	PSIPart       float64 `json:"psi_part"`       // contribution to PSI
}
// DistributionDriftReport is the aggregate output of a PSI run.
type DistributionDriftReport struct {
PSI float64 `json:"psi"`
Tier DistributionDriftTier `json:"tier"`
Buckets []DistributionDriftBucket `json:"buckets"`
BaselineN int `json:"baseline_n"`
CurrentN int `json:"current_n"`
NumBuckets int `json:"num_buckets"`
Min float64 `json:"min"`
Max float64 `json:"max"`
}
// DistributionDriftInput is the input shape for ComputeDistributionDrift.
// Baseline is the reference distribution (T0); Current is the new
// distribution (T1). NumBuckets defaults to 10 when zero or negative,
// and is capped at 100 to keep the per-bucket math stable on small
// samples.
type DistributionDriftInput struct {
	Baseline   []float64
	Current    []float64
	NumBuckets int
}
// driftEpsilon clamps zero-bucket ratios so log doesn't blow up.
// 1e-4 is the standard PSI choice — represents "less than 0.01% of
// observations." Anything smaller would let one rare-bucket shift
// dominate the PSI sum unfairly.
const driftEpsilon = 1e-4
// ComputeDistributionDrift returns a PSI-based drift report between
// the baseline and current distributions. Buckets span [min, max] of
// the COMBINED data (so neither side falls outside), split into
// equal-WIDTH buckets; equal-frequency bucketing was considered and
// deliberately rejected — see the bucketing comment in the body.
//
// Empty inputs return PSI=0, tier=stable. Caller should also check
// BaselineN/CurrentN before trusting the verdict — PSI on tiny
// samples is statistical noise.
//
// Thresholds (industry-standard, citable):
//   - PSI < 0.10 → stable: distributions are operationally equivalent
//   - 0.10 ≤ PSI < 0.25 → minor: investigate but not alarming
//   - PSI ≥ 0.25 → major: distributions have shifted materially;
//     downstream decisions trained on baseline may be invalid
func ComputeDistributionDrift(in DistributionDriftInput) DistributionDriftReport {
	report := DistributionDriftReport{
		BaselineN: len(in.Baseline),
		CurrentN:  len(in.Current),
	}
	if report.BaselineN == 0 || report.CurrentN == 0 {
		report.Tier = DriftTierStable
		return report
	}
	n := in.NumBuckets
	if n <= 0 {
		n = 10
	}
	if n > 100 {
		n = 100
	}
	report.NumBuckets = n
	// Combined min/max so both distributions fit the bucket range.
	minV, maxV := in.Baseline[0], in.Baseline[0]
	for _, v := range in.Baseline {
		if v < minV {
			minV = v
		}
		if v > maxV {
			maxV = v
		}
	}
	for _, v := range in.Current {
		if v < minV {
			minV = v
		}
		if v > maxV {
			maxV = v
		}
	}
	report.Min = minV
	report.Max = maxV
	if maxV == minV {
		// All values identical — trivially stable, no meaningful PSI.
		report.Tier = DriftTierStable
		return report
	}
	// Equal-WIDTH bucketing. Equal-frequency would be more robust on
	// skewed distributions, but it complicates the math (need to
	// quantile-sort the baseline) for marginal gain on the symmetric
	// distributions PSI is most useful for. Operators with skewed
	// data can pre-bucket their inputs into normalized scores.
	width := (maxV - minV) / float64(n)
	bucketIdx := func(v float64) int {
		if v >= maxV {
			return n - 1 // right-edge inclusive
		}
		idx := int((v - minV) / width)
		if idx < 0 {
			idx = 0
		}
		if idx >= n {
			idx = n - 1
		}
		return idx
	}
	bCounts := make([]int, n)
	cCounts := make([]int, n)
	for _, v := range in.Baseline {
		bCounts[bucketIdx(v)]++
	}
	for _, v := range in.Current {
		cCounts[bucketIdx(v)]++
	}
	report.Buckets = make([]DistributionDriftBucket, 0, n)
	bN := float64(report.BaselineN)
	cN := float64(report.CurrentN)
	psi := 0.0
	for i := 0; i < n; i++ {
		bRatio := float64(bCounts[i]) / bN
		cRatio := float64(cCounts[i]) / cN
		// Epsilon clamp so empty buckets don't blow up log.
		if bRatio < driftEpsilon {
			bRatio = driftEpsilon
		}
		if cRatio < driftEpsilon {
			cRatio = driftEpsilon
		}
		part := (cRatio - bRatio) * math.Log(cRatio/bRatio)
		psi += part
		report.Buckets = append(report.Buckets, DistributionDriftBucket{
			Lower:         minV + float64(i)*width,
			Upper:         minV + float64(i+1)*width,
			BaselineCount: bCounts[i],
			CurrentCount:  cCounts[i],
			BaselineRatio: float64(bCounts[i]) / bN,
			CurrentRatio:  float64(cCounts[i]) / cN,
			PSIPart:       part,
		})
	}
	report.PSI = psi
	switch {
	case psi < 0.10:
		report.Tier = DriftTierStable
	case psi < 0.25:
		report.Tier = DriftTierMinor
	default:
		report.Tier = DriftTierMajor
	}
	return report
}