Phase 25 — validity windows + playbook retirement

Addresses the load-bearing memory gap J flagged: playbook entries
had timestamps but no retirement semantics. When a schema migration
changed a column or a seasonal contract ended, stale playbooks
silently kept boosting candidates. Zep 2026-era finding: temporal
validity is the single highest-value memory-hygiene primitive.

SCHEMA (PlaybookEntry gains four optional fields, serde default):
  schema_fingerprint  — SHA-256 over dataset (column, type) tuples at
                        seed time. Missing = legacy entry, never
                        auto-retired on drift.
  valid_until         — RFC3339 hard expiry. compute_boost skips
                        entries past this moment.
  retired_at          — Set by retire_one or retire_on_schema_drift.
                        Retired entries excluded from all boost
                        calculations but kept in journal.
  retirement_reason   — Human-readable: "schema_drift: ...",
                        "expired: ...", "manual: ..."

RETRIEVAL PATH (compute_boost_for_filtered_with_role):
  Before geo+cosine, active_entries filter removes anything retired
  OR past valid_until. Uses chrono::Utc::now() once per call, no per-
  entry clock queries.
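
The filter described above can be sketched roughly as follows. This is a
simplified illustration, not the code from the diff: WindowEntry and
active_entries_sketch are hypothetical names, and timestamps are modeled
as epoch seconds so the sketch needs no crates, whereas the real fields
are RFC3339 strings parsed with chrono.

```rust
/// Hypothetical stand-in for PlaybookEntry; only the fields the filter reads.
struct WindowEntry {
    playbook_id: String,
    /// Assumption: epoch seconds. The real field is an RFC3339 string.
    valid_until: Option<u64>,
    /// Assumption: epoch seconds. The real field is an RFC3339 string.
    retired_at: Option<u64>,
}

/// Keep entries that are neither retired nor past valid_until.
/// `now` is read once by the caller and passed in, so the clock is
/// not queried per entry.
fn active_entries_sketch(entries: &[WindowEntry], now: u64) -> Vec<&WindowEntry> {
    entries
        .iter()
        // Retired entries never reach the ranking pool.
        .filter(|e| e.retired_at.is_none())
        // No valid_until means no expiry; otherwise skip once now > valid_until.
        .filter(|e| e.valid_until.map_or(true, |vu| now <= vu))
        .collect()
}
```

Passing `now` in as an argument also makes the expiry boundary easy to
exercise in tests without touching the system clock.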

NEW METHODS on PlaybookMemory:
  retire_one(playbook_id, reason)
  retire_on_schema_drift(city, state, current_fp, reason) — idempotent,
    scopes by (city, state) so a Nashville migration doesn't touch
    Chicago. Skips legacy entries with no fingerprint.
  status_counts() -> (total, retired, failures)

HTTP ENDPOINTS:
  POST /vectors/playbook_memory/retire
    {playbook_id, reason}                        → retire by id
    {city, state, current_schema_fingerprint, reason} → schema drift
  GET  /vectors/playbook_memory/status
    {total, active, retired, failures}
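
How the two request shapes and the derived active count fit together
might be sketched like this. RetireMode, resolve_retire_mode and
active_count are hypothetical names used only for illustration; the
handler in the diff works on Option fields of the deserialized request
directly rather than on an enum.

```rust
/// Hypothetical enum naming the two /retire modes.
#[derive(Debug, PartialEq)]
enum RetireMode<'a> {
    ById { playbook_id: &'a str },
    SchemaDrift { city: &'a str, state: &'a str, fingerprint: &'a str },
}

/// Pick a mode from the optional request fields. A playbook_id takes
/// precedence; otherwise all three scope fields must be present.
fn resolve_retire_mode<'a>(
    playbook_id: Option<&'a str>,
    city: Option<&'a str>,
    state: Option<&'a str>,
    fingerprint: Option<&'a str>,
) -> Result<RetireMode<'a>, &'static str> {
    if let Some(id) = playbook_id {
        return Ok(RetireMode::ById { playbook_id: id });
    }
    match (city, state, fingerprint) {
        (Some(city), Some(state), Some(fingerprint)) => {
            Ok(RetireMode::SchemaDrift { city, state, fingerprint })
        }
        // Neither shape is complete: the handler answers 400 here.
        _ => Err("supply either {playbook_id, reason} or {city, state, current_schema_fingerprint, reason}"),
    }
}

/// `active` in /status is derived from the other counts, not stored.
fn active_count(total: usize, retired: usize) -> usize {
    total.saturating_sub(retired)
}
```

With the live numbers from this commit, 1936 total entries and 1
retired give an active count of 1935.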

SEED REQUEST extended with optional schema_fingerprint + valid_until
so the orchestrator (scenario.ts) can pass the current schema hash
when seeding, without a round trip through catalogd.
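
For orientation, a fingerprint over (column, type) tuples could be
derived along these lines. Two loud assumptions: the commit specifies
SHA-256, but this sketch swaps in std's DefaultHasher so it runs without
extra crates (it is NOT SHA-256 and its output is not guaranteed stable
across Rust releases), and sorting the columns before hashing is a guess
at canonicalization that the commit does not confirm.
schema_fingerprint_sketch is a hypothetical name.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical helper: fingerprint a schema given as (column, type) pairs.
/// Stand-in hash only; the real fingerprint is SHA-256 hex.
fn schema_fingerprint_sketch(columns: &[(&str, &str)]) -> String {
    // Assumption: sort so that column order does not change the result.
    let mut cols: Vec<(&str, &str)> = columns.to_vec();
    cols.sort();

    let mut hasher = DefaultHasher::new();
    for (name, ty) in &cols {
        // Hashing each str separately keeps ("ab", "c") distinct from ("a", "bc").
        name.hash(&mut hasher);
        ty.hash(&mut hasher);
    }
    // 64-bit digest rendered as 16 hex characters.
    format!("{:016x}", hasher.finish())
}
```

A renamed column or a changed type yields a different string, which is
the mismatch that retire_on_schema_drift keys on.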

UNIT TESTS (5/5 pass): retire_one_marks_entry_and_persists,
retired_entries_do_not_boost, expired_valid_until_is_skipped,
schema_drift_retires_mismatched_fingerprints_only,
schema_drift_skips_other_cities.

LIVE VERIFIED: /status on current state = 1936 entries, 43 failures.
POST /retire with a sample playbook_id → "retired":1, /status now
reports active=1935, retired=1.

Memory-findings progress: 3 of 5 shipped.
  ✓ Multi-strategy parallel retrieval (Phase 19 refinement)
  ✓ Input normalization + unified /memory/query (Phase 24 TS)
  ✓ Zep-style validity windows (Phase 25, tonight)
   Mem0 UPDATE / DELETE / NOOP ops (dedup same-(op,date) seeds)
   Letta working-memory hot cache (not biting at 1.5K entries)
root 2026-04-21 00:11:02 -05:00
parent 3fb3a60da4
commit e0a843d1a5
2 changed files with 313 additions and 1 deletions


@ -85,6 +85,34 @@ pub struct PlaybookEntry {
/// state can omit it on first load and have a later embed() fill in.
#[serde(default)]
pub embedding: Option<Vec<f32>>,
/// Schema fingerprint captured at seed time: SHA-256 hex of the
/// target dataset's (column_name, type) tuples. When the dataset's
/// schema changes (column rename, type change, drop), entries
/// seeded against the old schema are considered stale and are
/// retired by `retire_on_schema_drift`, after which
/// `compute_boost_for_filtered_with_role` skips them. Optional so
/// historical entries without a fingerprint (ingested before this
/// field existed) degrade to "never stale" rather than getting
/// silently zeroed. Phase 25 (2026-04-21).
#[serde(default)]
pub schema_fingerprint: Option<String>,
/// Optional hard expiry. When set and now() > valid_until, the
/// entry is skipped. Used for playbooks that were known to be
/// time-limited at seed time (seasonal hires, temporary contracts).
#[serde(default)]
pub valid_until: Option<String>,
/// Set by `retire_one` (manual, by id) or `retire_on_schema_drift`
/// (auto-retirement when schema drift is detected), both reachable
/// via POST /vectors/playbook_memory/retire. Entries with this set
/// are excluded from all boost calculations; they remain in the
/// journal for forensic purposes.
#[serde(default)]
pub retired_at: Option<String>,
/// Human-readable retirement reason. Examples:
/// "schema_drift: workers_500k 2026-05-03 added column X"
/// "expired: valid_until 2026-05-01 elapsed"
/// "manual: operator requested via POST /retire"
#[serde(default)]
pub retirement_reason: Option<String>,
}
/// A recorded failure: worker who didn't deliver on a contract.
@ -167,6 +195,68 @@ impl PlaybookMemory {
self.persist().await
}
/// Phase 25 — retire a specific playbook by id. Idempotent; repeat
/// calls don't overwrite the first reason. Persisted.
pub async fn retire_one(&self, playbook_id: &str, reason: &str) -> Result<bool, String> {
let mut touched = false;
{
let mut state = self.state.write().await;
for e in state.entries.iter_mut() {
if e.playbook_id == playbook_id && e.retired_at.is_none() {
e.retired_at = Some(chrono::Utc::now().to_rfc3339());
e.retirement_reason = Some(reason.to_string());
touched = true;
break;
}
}
}
if touched { self.persist().await?; }
Ok(touched)
}
/// Phase 25: retire every entry matching (city, state) whose
/// schema_fingerprint doesn't match the current one. Entries with
/// no fingerprint (legacy) are skipped; callers can retire those
/// individually with `retire_one`.
pub async fn retire_on_schema_drift(
&self,
city: &str,
state_code: &str,
current_fingerprint: &str,
reason: &str,
) -> Result<usize, String> {
let mut count = 0;
{
let mut state = self.state.write().await;
let now = chrono::Utc::now().to_rfc3339();
for e in state.entries.iter_mut() {
if e.retired_at.is_some() { continue; }
let Some(ec) = &e.city else { continue; };
let Some(es) = &e.state else { continue; };
if !ec.eq_ignore_ascii_case(city) || !es.eq_ignore_ascii_case(state_code) { continue; }
match &e.schema_fingerprint {
Some(fp) if fp != current_fingerprint => {
e.retired_at = Some(now.clone());
e.retirement_reason = Some(reason.to_string());
count += 1;
}
_ => {}
}
}
}
if count > 0 { self.persist().await?; }
Ok(count)
}
/// Stats accessor for the /status endpoint and tests.
pub async fn status_counts(&self) -> (usize, usize, usize) {
let state = self.state.read().await;
let total = state.entries.len();
let retired = state.entries.iter().filter(|e| e.retired_at.is_some()).count();
let failures = state.failures.len();
(total, retired, failures)
}
pub async fn entry_count(&self) -> usize {
self.state.read().await.entries.len()
}
@ -266,12 +356,33 @@ impl PlaybookMemory {
}
drop(state);
// Phase 25 validity-window filtering. Happens before geo+cosine
// so retired/expired entries never reach the ranking pool. We
// don't mutate the state here (can't grab a write lock inside
// a read-heavy hot path); auto-retirement (setting retired_at) is
// a separate background pass. Here we just skip anything already
// retired, and entries whose valid_until has elapsed.
let now = chrono::Utc::now();
let active_entries: Vec<&PlaybookEntry> = entries
.iter()
.filter(|e| {
if e.retired_at.is_some() { return false; }
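// A valid_until that fails to parse as RFC3339 is ignored, so
// the entry is treated as having no expiry.
if let Some(vu) = &e.valid_until {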
if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(vu) {
if now > parsed.with_timezone(&chrono::Utc) { return false; }
}
}
true
})
.collect();
// Pre-filter by target_geo (city, state) before cosine. When
// target_geo is set, only playbooks from that city go into the
// ranking pool, which prevents globally-popular semantic neighbors
// from drowning out the city's local successful playbooks.
let geo_filtered: Vec<&PlaybookEntry> = active_entries
.iter()
.copied()
.filter(|e| match (target_geo, &e.city, &e.state) {
(None, _, _) => true,
(Some((tc, ts)), Some(ec), Some(es)) => {
@ -778,6 +889,14 @@ pub async fn rebuild(
city,
state,
embedding: None,
// Rebuild doesn't know fingerprints; historical entries
// get no retirement signal until a seed with a
// fingerprint supersedes them or the operator calls
// /retire manually.
schema_fingerprint: None,
valid_until: None,
retired_at: None,
retirement_reason: None,
}
})
.collect();
@ -956,6 +1075,10 @@ mod tests {
city: Some("Toledo".into()),
state: Some("OH".into()),
embedding: Some(vec![1.0, 0.0, 0.0]),
schema_fingerprint: None,
valid_until: None,
retired_at: None,
retirement_reason: None,
})
.collect();
tokio::runtime::Runtime::new().unwrap().block_on(async {
@ -968,3 +1091,106 @@ mod tests {
});
}
}
#[cfg(test)]
mod validity_window_tests {
use super::*;
use object_store::memory::InMemory;
fn mkentry(id: &str, city: &str, state: &str, fingerprint: Option<String>, valid_until: Option<String>) -> PlaybookEntry {
PlaybookEntry {
playbook_id: id.into(),
operation: format!("fill: Welder x1 in {city}, {state}"),
approach: "hybrid".into(),
context: "test".into(),
timestamp: chrono::Utc::now().to_rfc3339(),
endorsed_names: vec!["Test Worker".into()],
city: Some(city.into()),
state: Some(state.into()),
embedding: Some(vec![1.0, 0.0, 0.0]),
schema_fingerprint: fingerprint,
valid_until,
retired_at: None,
retirement_reason: None,
}
}
#[tokio::test]
async fn retire_one_marks_entry_and_persists() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
pm.set_entries(vec![mkentry("pb-1", "Nashville", "TN", None, None)]).await.unwrap();
let touched = pm.retire_one("pb-1", "manual test").await.unwrap();
assert!(touched);
let (total, retired, _) = pm.status_counts().await;
assert_eq!(total, 1);
assert_eq!(retired, 1);
// Second retirement is a no-op
let second = pm.retire_one("pb-1", "again").await.unwrap();
assert!(!second);
}
#[tokio::test]
async fn retired_entries_do_not_boost() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let e1 = mkentry("pb-active", "Nashville", "TN", None, None);
let mut e2 = mkentry("pb-retired", "Nashville", "TN", None, None);
e2.retired_at = Some(chrono::Utc::now().to_rfc3339());
pm.set_entries(vec![e1, e2]).await.unwrap();
let boosts = pm.compute_boost_for_filtered_with_role(
&[1.0, 0.0, 0.0], 100, 0.5,
Some(("Nashville", "TN")), Some("Welder")
).await;
// Only the active entry should surface. Both endorse the same
// name, so the boost key alone can't tell them apart; instead we
// inspect the citations. The retired playbook id appearing there
// would mean it slipped through.
let entry = boosts.get(&("Nashville".into(), "TN".into(), "Test Worker".into())).unwrap();
assert!(!entry.citations.contains(&"pb-retired".to_string()));
assert!(entry.citations.contains(&"pb-active".to_string()));
}
#[tokio::test]
async fn expired_valid_until_is_skipped() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let past = (chrono::Utc::now() - chrono::Duration::days(1)).to_rfc3339();
let future = (chrono::Utc::now() + chrono::Duration::days(1)).to_rfc3339();
let e_expired = mkentry("pb-expired", "Nashville", "TN", None, Some(past));
let e_alive = mkentry("pb-alive", "Nashville", "TN", None, Some(future));
pm.set_entries(vec![e_expired, e_alive]).await.unwrap();
let boosts = pm.compute_boost_for_filtered_with_role(
&[1.0, 0.0, 0.0], 100, 0.5,
Some(("Nashville", "TN")), Some("Welder")
).await;
let entry = boosts.get(&("Nashville".into(), "TN".into(), "Test Worker".into())).unwrap();
assert!(!entry.citations.contains(&"pb-expired".to_string()));
assert!(entry.citations.contains(&"pb-alive".to_string()));
}
#[tokio::test]
async fn schema_drift_retires_mismatched_fingerprints_only() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let e_old = mkentry("pb-old-schema", "Nashville", "TN", Some("fp-v1".into()), None);
let e_new = mkentry("pb-new-schema", "Nashville", "TN", Some("fp-v2".into()), None);
let e_legacy = mkentry("pb-no-fp", "Nashville", "TN", None, None);
pm.set_entries(vec![e_old, e_new, e_legacy]).await.unwrap();
let retired = pm.retire_on_schema_drift("Nashville", "TN", "fp-v2", "test migration").await.unwrap();
// Only pb-old-schema should be retired — pb-new-schema matches,
// pb-no-fp has no fingerprint so it's legacy-safe.
assert_eq!(retired, 1);
let (_, total_retired, _) = pm.status_counts().await;
assert_eq!(total_retired, 1);
}
#[tokio::test]
async fn schema_drift_skips_other_cities() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let e_tn = mkentry("pb-tn", "Nashville", "TN", Some("fp-v1".into()), None);
let e_il = mkentry("pb-il", "Chicago", "IL", Some("fp-v1".into()), None);
pm.set_entries(vec![e_tn, e_il]).await.unwrap();
// Nashville migration shouldn't touch Chicago
let retired = pm.retire_on_schema_drift("Nashville", "TN", "fp-v2", "test").await.unwrap();
assert_eq!(retired, 1);
let (_, r, _) = pm.status_counts().await;
assert_eq!(r, 1);
}
}


@ -125,6 +125,8 @@ pub fn router(state: VectorState) -> Router {
.route("/playbook_memory/persist_sql", post(persist_playbook_memory_sql))
.route("/playbook_memory/patterns", post(discover_playbook_patterns))
.route("/playbook_memory/mark_failed", post(mark_playbook_failed))
.route("/playbook_memory/retire", post(retire_playbook_memory))
.route("/playbook_memory/status", get(playbook_memory_status))
.with_state(state)
}
@ -2185,6 +2187,19 @@ struct SeedPlaybookRequest {
/// seeding is a bootstrap/demo tool, not a rebuild substitute.
#[serde(default = "default_true")]
append: bool,
/// Phase 25 — optional schema_fingerprint captured at seed time.
/// When the underlying dataset's schema changes, any entry whose
/// fingerprint doesn't match the new one is auto-retired via
/// retire_on_schema_drift. Caller-provided so the producer (the
/// scenario driver, the orchestrator) can pass the live fingerprint
/// without the gateway needing a second catalogd round trip.
#[serde(default)]
schema_fingerprint: Option<String>,
/// Phase 25 — optional hard expiry. RFC3339 timestamp. After this
/// moment the entry is skipped during boost computation (not
/// retired, just inactive). Useful for seasonal/temp contracts.
#[serde(default)]
valid_until: Option<String>,
}
/// Bootstrap / test-only: inject a playbook entry directly into
@ -2209,6 +2224,10 @@ async fn seed_playbook_memory(
timestamp: chrono::Utc::now().to_rfc3339(),
endorsed_names: req.endorsed_names.clone(),
city: None, state: None, embedding: None,
schema_fingerprint: None,
valid_until: None,
retired_at: None,
retirement_reason: None,
};
let text = format!(
"{} | {} | {} | fills: {}",
@ -2260,6 +2279,14 @@ async fn seed_playbook_memory(
endorsed_names: req.endorsed_names,
city, state: state_,
embedding: Some(emb),
// Phase 25: the seed request may carry a fingerprint and a
// valid_until; when omitted they default to None and the entry
// degrades to "no expiry signal" (never auto-retired on drift,
// never expired, but manual retirement still works).
// retired_at + retirement_reason start None.
schema_fingerprint: req.schema_fingerprint.clone(),
valid_until: req.valid_until.clone(),
retired_at: None,
retirement_reason: None,
};
let mut current = state.playbook_memory.snapshot().await;
@ -2395,6 +2422,65 @@ async fn playbook_memory_stats(
}))
}
#[derive(Deserialize)]
struct RetirePlaybookRequest {
/// Retire by playbook_id — exact match, single entry. Used for
/// manual operator retirement via the UI.
#[serde(default)]
playbook_id: Option<String>,
/// Retire by scope — city + state required, with a fingerprint
/// that entries must match to survive. Fingerprint mismatch → retire.
/// Use when a schema migration produces a new fingerprint and
/// historical playbooks need to be auto-retired.
#[serde(default)]
city: Option<String>,
#[serde(default)]
state: Option<String>,
#[serde(default)]
current_schema_fingerprint: Option<String>,
/// Human-readable reason stored on the retired entry.
reason: String,
}
/// Phase 25 retirement endpoint. Two modes:
/// {playbook_id, reason} → retire one
/// {city, state, current_schema_fingerprint, reason} → retire all
/// entries in scope whose
/// fingerprint differs
async fn retire_playbook_memory(
State(state): State<VectorState>,
Json(req): Json<RetirePlaybookRequest>,
) -> impl IntoResponse {
if let Some(id) = &req.playbook_id {
return match state.playbook_memory.retire_one(id, &req.reason).await {
Ok(found) => Ok(Json(serde_json::json!({ "mode": "by_id", "retired": if found { 1 } else { 0 } }))),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
};
}
if let (Some(city), Some(state_code), Some(fp)) = (&req.city, &req.state, &req.current_schema_fingerprint) {
return match state.playbook_memory.retire_on_schema_drift(city, state_code, fp, &req.reason).await {
Ok(n) => Ok(Json(serde_json::json!({ "mode": "schema_drift", "retired": n, "city": city, "state": state_code }))),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
};
}
Err((StatusCode::BAD_REQUEST,
"supply either {playbook_id, reason} or {city, state, current_schema_fingerprint, reason}".into()))
}
/// Phase 25 status endpoint — reports retirement counts so dashboards
/// can show "N playbooks retired (12 from 2026-05 schema migration)".
async fn playbook_memory_status(
State(state): State<VectorState>,
) -> impl IntoResponse {
let (total, retired, failures) = state.playbook_memory.status_counts().await;
Json(serde_json::json!({
"total": total,
"retired": retired,
"active": total.saturating_sub(retired),
"failures": failures,
}))
}
async fn lance_recall_harness(
State(state): State<VectorState>,
Path(index_name): Path<String>,