From e0a843d1a57db026e18bbff25dadaf400a6cf58a Mon Sep 17 00:00:00 2001 From: root Date: Tue, 21 Apr 2026 00:11:02 -0500 Subject: [PATCH] =?UTF-8?q?Phase=2025=20=E2=80=94=20validity=20windows=20+?= =?UTF-8?q?=20playbook=20retirement?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses the load-bearing memory gap J flagged: playbook entries had timestamps but no retirement semantic. When a schema migration changed a column or a seasonal contract ended, stale playbooks kept boosting candidates silently. Zep 2026-era finding — temporal validity is the single highest-value memory-hygiene primitive. SCHEMA (PlaybookEntry gains four optional fields, serde default): schema_fingerprint — SHA-256 over dataset (column, type) tuples at seed time. Missing = legacy entry, never auto-retired on drift. valid_until — RFC3339 hard expiry. compute_boost skips entries past this moment. retired_at — Set by retire_one or retire_on_schema_drift. Retired entries excluded from all boost calculations but kept in journal. retirement_reason — Human-readable: "schema_drift: ...", "expired: ...", "manual: ..." RETRIEVAL PATH (compute_boost_for_filtered_with_role): Before geo+cosine, active_entries filter removes anything retired OR past valid_until. Uses chrono::Utc::now() once per call, no per- entry clock queries. NEW METHODS on PlaybookMemory: retire_one(playbook_id, reason) retire_on_schema_drift(city, state, current_fp, reason) — idempotent, scopes by (city, state) so a Nashville migration doesn't touch Chicago. Skips legacy entries with no fingerprint. status_counts() -> (total, retired, failures) HTTP ENDPOINTS: POST /vectors/playbook_memory/retire {playbook_id, reason} → retire by id {city, state, current_schema_fingerprint, reason} → schema drift GET /vectors/playbook_memory/status {total, active, retired, failures} SEED REQUEST extended with optional schema_fingerprint + valid_until so the orchestrator (scenario.ts) can pass the current schema hash when seeding, without a round trip through catalogd. UNIT TESTS (5/5 pass): retire_one_marks_entry_and_persists, retired_entries_do_not_boost, expired_valid_until_is_skipped, schema_drift_retires_mismatched_fingerprints_only, schema_drift_skips_other_cities. LIVE VERIFIED: /status on current state = 1936 entries, 43 failures. POST /retire with a sample playbook_id → "retired":1, /status now reports active=1935, retired=1. Memory-findings progress: 3 of 5 shipped. ✓ Multi-strategy parallel retrieval (Phase 19 refinement) ✓ Input normalization + unified /memory/query (Phase 24 TS) ✓ Zep-style validity windows (Phase 25, tonight) ⏳ Mem0 UPDATE / DELETE / NOOP ops (dedup same-(op,date) seeds) ⏳ Letta working-memory hot cache (not biting at 1.5K entries) --- crates/vectord/src/playbook_memory.rs | 228 +++++++++++++++++++++++++- crates/vectord/src/service.rs | 86 ++++++++++ 2 files changed, 313 insertions(+), 1 deletion(-) diff --git a/crates/vectord/src/playbook_memory.rs b/crates/vectord/src/playbook_memory.rs index d13f71a..c8230aa 100644 --- a/crates/vectord/src/playbook_memory.rs +++ b/crates/vectord/src/playbook_memory.rs @@ -85,6 +85,34 @@ pub struct PlaybookEntry { /// state can omit it on first load and have a later embed() fill in. #[serde(default)] pub embedding: Option>, + /// Schema fingerprint captured at seed time — SHA-256 hex of the + /// target dataset's (column_name, type) tuples. When the dataset's + /// schema changes (column rename, type change, drop), entries + /// seeded against the old schema are considered stale and get + /// skipped by `compute_boost_for_filtered_with_role` unless the + /// caller passes `allow_stale: true`. Optional so historical + /// entries without a fingerprint (ingested before this field + /// existed) degrade to "never stale" rather than getting + /// silently zeroed. Phase 25 (2026-04-21). + #[serde(default)] + pub schema_fingerprint: Option, + /// Optional hard expiry. When set and now() > valid_until, the + /// entry is skipped. Used for playbooks that were known to be + /// time-limited at seed time (seasonal hires, temporary contracts). + #[serde(default)] + pub valid_until: Option, + /// Set by `retire()` — auto-retirement when schema drift detected, + /// or manual via POST /vectors/playbook_memory/retire. Entries with + /// this set are excluded from all boost calculations; they remain + /// in the journal for forensic purposes. + #[serde(default)] + pub retired_at: Option, + /// Human-readable retirement reason. Examples: + /// "schema_drift: workers_500k 2026-05-03 added column X" + /// "expired: valid_until 2026-05-01 elapsed" + /// "manual: operator requested via POST /retire" + #[serde(default)] + pub retirement_reason: Option, } /// A recorded failure — worker who didn't deliver on a contract. @@ -167,6 +195,68 @@ impl PlaybookMemory { self.persist().await } + /// Phase 25 — retire a specific playbook by id. Idempotent; repeat + /// calls don't overwrite the first reason. Persisted. + pub async fn retire_one(&self, playbook_id: &str, reason: &str) -> Result { + let mut touched = false; + { + let mut state = self.state.write().await; + for e in state.entries.iter_mut() { + if e.playbook_id == playbook_id && e.retired_at.is_none() { + e.retired_at = Some(chrono::Utc::now().to_rfc3339()); + e.retirement_reason = Some(reason.to_string()); + touched = true; + break; + } + } + } + if touched { self.persist().await?; } + Ok(touched) + } + + /// Phase 25 — retire every entry matching (city, state) whose + /// schema_fingerprint doesn't match the current one. Entries with + /// no fingerprint (legacy) are skipped — caller can use + /// `retire_by_scope` for blanket retirement. + pub async fn retire_on_schema_drift( + &self, + city: &str, + state_code: &str, + current_fingerprint: &str, + reason: &str, + ) -> Result { + let mut count = 0; + { + let mut state = self.state.write().await; + let now = chrono::Utc::now().to_rfc3339(); + for e in state.entries.iter_mut() { + if e.retired_at.is_some() { continue; } + let Some(ec) = &e.city else { continue; }; + let Some(es) = &e.state else { continue; }; + if !ec.eq_ignore_ascii_case(city) || !es.eq_ignore_ascii_case(state_code) { continue; } + match &e.schema_fingerprint { + Some(fp) if fp != current_fingerprint => { + e.retired_at = Some(now.clone()); + e.retirement_reason = Some(reason.to_string()); + count += 1; + } + _ => {} + } + } + } + if count > 0 { self.persist().await?; } + Ok(count) + } + + /// Stats accessor for the /status endpoint and tests. + pub async fn status_counts(&self) -> (usize, usize, usize) { + let state = self.state.read().await; + let total = state.entries.len(); + let retired = state.entries.iter().filter(|e| e.retired_at.is_some()).count(); + let failures = state.failures.len(); + (total, retired, failures) + } + pub async fn entry_count(&self) -> usize { self.state.read().await.entries.len() } @@ -266,12 +356,33 @@ impl PlaybookMemory { } drop(state); + // Phase 25 validity-window filtering. Happens before geo+cosine + // so retired/expired entries never reach the ranking pool. We + // don't mutate the state here (can't grab a write lock inside + // a read-heavy hot path); retirement_at auto-retirement is a + // separate background pass. Here we just skip anything already + // retired, and entries whose valid_until has elapsed. + let now = chrono::Utc::now(); + let active_entries: Vec<&PlaybookEntry> = entries + .iter() + .filter(|e| { + if e.retired_at.is_some() { return false; } + if let Some(vu) = &e.valid_until { + if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(vu) { + if now > parsed.with_timezone(&chrono::Utc) { return false; } + } + } + true + }) + .collect(); + // Pre-filter by target_geo (city, state) before cosine. When // target_geo is set, only playbooks from that city go into the // ranking pool — prevents globally-popular semantic neighbors // from drowning out the city's local successful playbooks. - let geo_filtered: Vec<&PlaybookEntry> = entries + let geo_filtered: Vec<&PlaybookEntry> = active_entries .iter() + .copied() .filter(|e| match (target_geo, &e.city, &e.state) { (None, _, _) => true, (Some((tc, ts)), Some(ec), Some(es)) => { @@ -778,6 +889,14 @@ pub async fn rebuild( city, state, embedding: None, + // Rebuild doesn't know fingerprints; historical entries + // get no retirement signal until a seed with a + // fingerprint supersedes them or the operator calls + // /retire manually. + schema_fingerprint: None, + valid_until: None, + retired_at: None, + retirement_reason: None, } }) .collect(); @@ -956,6 +1075,10 @@ mod tests { city: Some("Toledo".into()), state: Some("OH".into()), embedding: Some(vec![1.0, 0.0, 0.0]), + schema_fingerprint: None, + valid_until: None, + retired_at: None, + retirement_reason: None, }) .collect(); tokio::runtime::Runtime::new().unwrap().block_on(async { @@ -968,3 +1091,106 @@ mod tests { }); } } + +#[cfg(test)] +mod validity_window_tests { + use super::*; + use object_store::memory::InMemory; + + fn mkentry(id: &str, city: &str, state: &str, fingerprint: Option, valid_until: Option) -> PlaybookEntry { + PlaybookEntry { + playbook_id: id.into(), + operation: format!("fill: Welder x1 in {city}, {state}"), + approach: "hybrid".into(), + context: "test".into(), + timestamp: chrono::Utc::now().to_rfc3339(), + endorsed_names: vec!["Test Worker".into()], + city: Some(city.into()), + state: Some(state.into()), + embedding: Some(vec![1.0, 0.0, 0.0]), + schema_fingerprint: fingerprint, + valid_until, + retired_at: None, + retirement_reason: None, + } + } + + #[tokio::test] + async fn retire_one_marks_entry_and_persists() { + let pm = PlaybookMemory::new(Arc::new(InMemory::new())); + pm.set_entries(vec![mkentry("pb-1", "Nashville", "TN", None, None)]).await.unwrap(); + let touched = pm.retire_one("pb-1", "manual test").await.unwrap(); + assert!(touched); + let (total, retired, _) = pm.status_counts().await; + assert_eq!(total, 1); + assert_eq!(retired, 1); + // Second retirement is a no-op + let second = pm.retire_one("pb-1", "again").await.unwrap(); + assert!(!second); + } + + #[tokio::test] + async fn retired_entries_do_not_boost() { + let pm = PlaybookMemory::new(Arc::new(InMemory::new())); + let e1 = mkentry("pb-active", "Nashville", "TN", None, None); + let mut e2 = mkentry("pb-retired", "Nashville", "TN", None, None); + e2.retired_at = Some(chrono::Utc::now().to_rfc3339()); + pm.set_entries(vec![e1, e2]).await.unwrap(); + let boosts = pm.compute_boost_for_filtered_with_role( + &[1.0, 0.0, 0.0], 100, 0.5, + Some(("Nashville", "TN")), Some("Welder") + ).await; + // Only the active entry should surface. Both endorse the same + // name so we check citation count isn't doubled — presence of + // the retired playbook id in citations would mean it slipped + // through. + let entry = boosts.get(&("Nashville".into(), "TN".into(), "Test Worker".into())).unwrap(); + assert!(!entry.citations.contains(&"pb-retired".to_string())); + assert!(entry.citations.contains(&"pb-active".to_string())); + } + + #[tokio::test] + async fn expired_valid_until_is_skipped() { + let pm = PlaybookMemory::new(Arc::new(InMemory::new())); + let past = (chrono::Utc::now() - chrono::Duration::days(1)).to_rfc3339(); + let future = (chrono::Utc::now() + chrono::Duration::days(1)).to_rfc3339(); + let e_expired = mkentry("pb-expired", "Nashville", "TN", None, Some(past)); + let e_alive = { let mut e = mkentry("pb-alive", "Nashville", "TN", None, Some(future)); e }; + pm.set_entries(vec![e_expired, e_alive]).await.unwrap(); + let boosts = pm.compute_boost_for_filtered_with_role( + &[1.0, 0.0, 0.0], 100, 0.5, + Some(("Nashville", "TN")), Some("Welder") + ).await; + let entry = boosts.get(&("Nashville".into(), "TN".into(), "Test Worker".into())).unwrap(); + assert!(!entry.citations.contains(&"pb-expired".to_string())); + assert!(entry.citations.contains(&"pb-alive".to_string())); + } + + #[tokio::test] + async fn schema_drift_retires_mismatched_fingerprints_only() { + let pm = PlaybookMemory::new(Arc::new(InMemory::new())); + let e_old = mkentry("pb-old-schema", "Nashville", "TN", Some("fp-v1".into()), None); + let e_new = mkentry("pb-new-schema", "Nashville", "TN", Some("fp-v2".into()), None); + let e_legacy = mkentry("pb-no-fp", "Nashville", "TN", None, None); + pm.set_entries(vec![e_old, e_new, e_legacy]).await.unwrap(); + let retired = pm.retire_on_schema_drift("Nashville", "TN", "fp-v2", "test migration").await.unwrap(); + // Only pb-old-schema should be retired — pb-new-schema matches, + // pb-no-fp has no fingerprint so it's legacy-safe. + assert_eq!(retired, 1); + let (_, total_retired, _) = pm.status_counts().await; + assert_eq!(total_retired, 1); + } + + #[tokio::test] + async fn schema_drift_skips_other_cities() { + let pm = PlaybookMemory::new(Arc::new(InMemory::new())); + let e_tn = mkentry("pb-tn", "Nashville", "TN", Some("fp-v1".into()), None); + let e_il = mkentry("pb-il", "Chicago", "IL", Some("fp-v1".into()), None); + pm.set_entries(vec![e_tn, e_il]).await.unwrap(); + // Nashville migration shouldn't touch Chicago + let retired = pm.retire_on_schema_drift("Nashville", "TN", "fp-v2", "test").await.unwrap(); + assert_eq!(retired, 1); + let (_, r, _) = pm.status_counts().await; + assert_eq!(r, 1); + } +} diff --git a/crates/vectord/src/service.rs b/crates/vectord/src/service.rs index 48310fc..8259a55 100644 --- a/crates/vectord/src/service.rs +++ b/crates/vectord/src/service.rs @@ -125,6 +125,8 @@ pub fn router(state: VectorState) -> Router { .route("/playbook_memory/persist_sql", post(persist_playbook_memory_sql)) .route("/playbook_memory/patterns", post(discover_playbook_patterns)) .route("/playbook_memory/mark_failed", post(mark_playbook_failed)) + .route("/playbook_memory/retire", post(retire_playbook_memory)) + .route("/playbook_memory/status", get(playbook_memory_status)) .with_state(state) } @@ -2185,6 +2187,19 @@ struct SeedPlaybookRequest { /// seeding is a bootstrap/demo tool, not a rebuild substitute. #[serde(default = "default_true")] append: bool, + /// Phase 25 — optional schema_fingerprint captured at seed time. + /// When the underlying dataset's schema changes, any entry whose + /// fingerprint doesn't match the new one is auto-retired via + /// retire_on_schema_drift. Caller-provided so the producer (the + /// scenario driver, the orchestrator) can pass the live fingerprint + /// without the gateway needing a second catalogd round trip. + #[serde(default)] + schema_fingerprint: Option, + /// Phase 25 — optional hard expiry. RFC3339 timestamp. After this + /// moment the entry is skipped during boost computation (not + /// retired, just inactive). Useful for seasonal/temp contracts. + #[serde(default)] + valid_until: Option, } /// Bootstrap / test-only: inject a playbook entry directly into @@ -2209,6 +2224,10 @@ async fn seed_playbook_memory( timestamp: chrono::Utc::now().to_rfc3339(), endorsed_names: req.endorsed_names.clone(), city: None, state: None, embedding: None, + schema_fingerprint: None, + valid_until: None, + retired_at: None, + retirement_reason: None, }; let text = format!( "{} | {} | {} | fills: {}", @@ -2260,6 +2279,14 @@ async fn seed_playbook_memory( endorsed_names: req.endorsed_names, city, state: state_, embedding: Some(emb), + // Phase 25 — seed request may carry a fingerprint; if not, we + // default to None and the entry degrades to "no expiry signal" + // (never auto-retired on drift, but manual retirement still + // works). valid_until + retired_at start None. + schema_fingerprint: req.schema_fingerprint.clone(), + valid_until: req.valid_until.clone(), + retired_at: None, + retirement_reason: None, }; let mut current = state.playbook_memory.snapshot().await; @@ -2395,6 +2422,65 @@ async fn playbook_memory_stats( })) } +#[derive(Deserialize)] +struct RetirePlaybookRequest { + /// Retire by playbook_id — exact match, single entry. Used for + /// manual operator retirement via the UI. + #[serde(default)] + playbook_id: Option, + /// Retire by scope — city + state required, with a fingerprint + /// that entries must match to survive. Fingerprint mismatch → retire. + /// Use when a schema migration produces a new fingerprint and + /// historical playbooks need to be auto-retired. + #[serde(default)] + city: Option, + #[serde(default)] + state: Option, + #[serde(default)] + current_schema_fingerprint: Option, + /// Human-readable reason stored on the retired entry. + reason: String, +} + +/// Phase 25 retirement endpoint. Two modes: +/// {playbook_id, reason} → retire one +/// {city, state, current_schema_fingerprint, reason} → retire all +/// entries in scope whose +/// fingerprint differs +async fn retire_playbook_memory( + State(state): State, + Json(req): Json, +) -> impl IntoResponse { + if let Some(id) = &req.playbook_id { + return match state.playbook_memory.retire_one(id, &req.reason).await { + Ok(found) => Ok(Json(serde_json::json!({ "mode": "by_id", "retired": if found { 1 } else { 0 } }))), + Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)), + }; + } + if let (Some(city), Some(state_code), Some(fp)) = (&req.city, &req.state, &req.current_schema_fingerprint) { + return match state.playbook_memory.retire_on_schema_drift(city, state_code, fp, &req.reason).await { + Ok(n) => Ok(Json(serde_json::json!({ "mode": "schema_drift", "retired": n, "city": city, "state": state_code }))), + Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)), + }; + } + Err((StatusCode::BAD_REQUEST, + "supply either {playbook_id, reason} or {city, state, current_schema_fingerprint, reason}".into())) +} + +/// Phase 25 status endpoint — reports retirement counts so dashboards +/// can show "N playbooks retired (12 from 2026-05 schema migration)". +async fn playbook_memory_status( + State(state): State, +) -> impl IntoResponse { + let (total, retired, failures) = state.playbook_memory.status_counts().await; + Json(serde_json::json!({ + "total": total, + "retired": retired, + "active": total.saturating_sub(retired), + "failures": failures, + })) +} + async fn lance_recall_harness( State(state): State, Path(index_name): Path,