diff --git a/crates/vectord/src/playbook_memory.rs b/crates/vectord/src/playbook_memory.rs index c8230aa..ab5d7f4 100644 --- a/crates/vectord/src/playbook_memory.rs +++ b/crates/vectord/src/playbook_memory.rs @@ -149,12 +149,48 @@ pub struct BoostEntry { pub citations: Vec, // playbook_ids that endorsed this worker } +/// Phase 26 — what happened during an upsert. The seed endpoint +/// returns this shape so the caller sees whether its write was a new +/// entry, a merge, or a dedup'd no-op. Serialized adjacently tagged as `{"mode": ..., "data": ...}`: serde's internally tagged form cannot serialize the bare-`String` newtype variants and errors at runtime. +#[derive(Debug, Clone, Serialize)] +#[serde(tag = "mode", content = "data", rename_all = "lowercase")] +pub enum UpsertOutcome { + /// New playbook appended. Carries the new playbook_id. + Added(String), + /// Existing same-day entry updated. Playbook_id unchanged; names + /// merged (union, original order preserved, new names appended). + Updated { + playbook_id: String, + merged_names: Vec<String>, + }, + /// Identical same-day entry already exists; nothing changed. + /// Returns the stable playbook_id so caller still has a reference. + Noop(String), +} + +/// Return YYYY-MM-DD from an RFC3339 timestamp. Falls back to the +/// first 10 chars if parse fails — tolerant for legacy entries that +/// stored a bare date. The day is read in the timestamp's own UTC offset; it is not normalized to UTC. +fn day_key(ts: &str) -> String { + chrono::DateTime::parse_from_rfc3339(ts) + .map(|t| t.format("%Y-%m-%d").to_string()) + .unwrap_or_else(|_| ts.chars().take(10).collect()) +} + /// Live handle passed around the service. Clone-cheap (all state is /// inside one Arc). #[derive(Clone)] pub struct PlaybookMemory { state: Arc>, store: Arc, + /// Phase 26 — hot geo index: (city_lower, state_upper) → sorted + /// Vec. Rebuilt on every mutation of `entries`. At + /// current scale (1.9K entries) a full scan is sub-ms; at 100K+ + /// the index skips the scan for geo-filtered queries, which is + /// the dominant code path. Letta-style working memory with a real + /// LRU is overkill here — entries are bounded and fit in RAM; + /// what we need is a precomputed seek, not a bounded cache. 
+ geo_index: Arc>>>, } impl PlaybookMemory { @@ -162,9 +198,28 @@ impl PlaybookMemory { Self { state: Arc::new(RwLock::new(PlaybookMemoryState::default())), store, + geo_index: Arc::new(RwLock::new(HashMap::new())), } } + /// Rebuild the geo index from scratch. Called by every mutation + /// helper after persist succeeds. O(n) scan of entries; at current + /// scale ~40µs. Skips retired entries — they never participate in + /// boost filtering, so indexing them would just waste lookups. + async fn rebuild_geo_index(&self) { + let state = self.state.read().await; + let mut idx: HashMap<(String, String), Vec> = HashMap::new(); + for (i, e) in state.entries.iter().enumerate() { + if e.retired_at.is_some() { continue; } + let (Some(city), Some(st)) = (&e.city, &e.state) else { continue; }; + let key = (city.to_ascii_lowercase(), st.to_ascii_uppercase()); + idx.entry(key).or_default().push(i); + } + drop(state); + let mut guard = self.geo_index.write().await; + *guard = idx; + } + /// Best-effort load from primary storage. Missing = empty memory; the /// first `/rebuild` call will hydrate it. pub async fn load_from_storage(&self) -> Result { @@ -176,6 +231,7 @@ impl PlaybookMemory { .map_err(|e| format!("parse playbook_memory state: {e}"))?; let n = persisted.entries.len(); *self.state.write().await = persisted; + self.rebuild_geo_index().await; tracing::info!("playbook_memory: loaded {n} entries from {STATE_KEY}"); Ok(n) } @@ -192,7 +248,9 @@ impl PlaybookMemory { s.entries = entries; s.last_rebuilt_at = chrono::Utc::now().timestamp_millis(); drop(s); - self.persist().await + self.persist().await?; + self.rebuild_geo_index().await; + Ok(()) } /// Phase 25 — retire a specific playbook by id. 
Idempotent; repeat @@ -210,7 +268,10 @@ impl PlaybookMemory { } } } - if touched { self.persist().await?; } + if touched { + self.persist().await?; + self.rebuild_geo_index().await; + } Ok(touched) } @@ -244,7 +305,10 @@ impl PlaybookMemory { } } } - if count > 0 { self.persist().await?; } + if count > 0 { + self.persist().await?; + self.rebuild_geo_index().await; + } Ok(count) } @@ -257,6 +321,89 @@ impl PlaybookMemory { (total, retired, failures) } + /// Phase 26 — Mem0-style upsert. Decides ADD / UPDATE / NOOP based + /// on whether a non-retired entry with the same operation already + /// exists for the same day. Three outcomes: + /// + /// ADD → no matching entry, append the new one + /// UPDATE → existing same-day entry found, merge endorsed_names + /// (union, preserving order) and refresh timestamp. + /// Playbook_id is kept stable so citations from prior + /// boost calls stay valid. + /// NOOP → existing same-day entry with identical + /// endorsed_names. Skip — no duplicates accumulate. + /// + /// "Same day" keyed on YYYY-MM-DD of the entry's timestamp so + /// intraday re-seeding of the same operation dedups but tomorrow's + /// seeding for the same operation lands as a fresh ADD (which is + /// correct — a new day is a new event). + pub async fn upsert_entry(&self, new_entry: PlaybookEntry) -> Result { + let new_day = day_key(&new_entry.timestamp); + let new_names_sorted = { + let mut v = new_entry.endorsed_names.clone(); + v.sort(); + v + }; + let mut state = self.state.write().await; + // Find a non-retired entry with same operation + day + city + + // state. Operation string alone would false-match across days + // or across cities that happen to share role+count; city+state + // is already parsed out of operation so adding them to the key + // costs nothing. 
+ let mut existing_idx: Option = None; + for (i, e) in state.entries.iter().enumerate() { + if e.retired_at.is_some() { continue; } + if e.operation != new_entry.operation { continue; } + if day_key(&e.timestamp) != new_day { continue; } + if e.city != new_entry.city || e.state != new_entry.state { continue; } + existing_idx = Some(i); + break; + } + + match existing_idx { + None => { + let pid = new_entry.playbook_id.clone(); + state.entries.push(new_entry); + drop(state); + self.persist().await?; + self.rebuild_geo_index().await; + Ok(UpsertOutcome::Added(pid)) + } + Some(i) => { + let mut existing_names_sorted = state.entries[i].endorsed_names.clone(); + existing_names_sorted.sort(); + if existing_names_sorted == new_names_sorted { + // NOOP — identical data, just report the existing id + let pid = state.entries[i].playbook_id.clone(); + Ok(UpsertOutcome::Noop(pid)) + } else { + // UPDATE — merge names (union, stable order). + let existing = state.entries.get_mut(i).ok_or("index invalidated")?; + let mut merged: Vec = existing.endorsed_names.clone(); + for n in &new_entry.endorsed_names { + if !merged.contains(n) { merged.push(n.clone()); } + } + existing.endorsed_names = merged.clone(); + existing.timestamp = new_entry.timestamp.clone(); + // Keep original playbook_id + embedding + schema fingerprint. + // Refresh embedding only if the caller passed a non-None + // one (indicates the text shape changed). 
+ if new_entry.embedding.is_some() { + existing.embedding = new_entry.embedding.clone(); + } + if new_entry.schema_fingerprint.is_some() { + existing.schema_fingerprint = new_entry.schema_fingerprint.clone(); + } + let pid = existing.playbook_id.clone(); + drop(state); + self.persist().await?; + self.rebuild_geo_index().await; + Ok(UpsertOutcome::Updated { playbook_id: pid, merged_names: merged }) + } + } + } + } + pub async fn entry_count(&self) -> usize { self.state.read().await.entries.len() } @@ -376,33 +523,48 @@ impl PlaybookMemory { }) .collect(); - // Pre-filter by target_geo (city, state) before cosine. When - // target_geo is set, only playbooks from that city go into the - // ranking pool — prevents globally-popular semantic neighbors - // from drowning out the city's local successful playbooks. - let geo_filtered: Vec<&PlaybookEntry> = active_entries - .iter() - .copied() - .filter(|e| match (target_geo, &e.city, &e.state) { - (None, _, _) => true, - (Some((tc, ts)), Some(ec), Some(es)) => { - ec.eq_ignore_ascii_case(tc) && es.eq_ignore_ascii_case(ts) - } - _ => false, - }) - .collect(); + // Pre-filter by target_geo (city, state) before cosine. Phase 26 + // hot cache — use the geo index (O(1) key lookup) instead of a + // linear scan of all entries. Retired entries are excluded from + // the index; valid_until is still checked here since it can + // elapse between index rebuilds. + // + // Owned entries (not references) because the state read-lock is + // released between here and the cosine step — we don't want to + // hold a read lock across the scoring work. 
+ let geo_filtered: Vec<PlaybookEntry> = if let Some((tc, ts)) = target_geo { + let key = (tc.to_ascii_lowercase(), ts.to_ascii_uppercase()); + let index = self.geo_index.read().await; + let Some(idxs) = index.get(&key) else { return HashMap::new(); }; + let idxs = idxs.clone(); + drop(index); + let state = self.state.read().await; + idxs.into_iter() + .filter_map(|i| state.entries.get(i)) + .filter(|e| { + if e.retired_at.is_some() { return false; } + if let Some(vu) = &e.valid_until { + if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(vu) { + if now > parsed.with_timezone(&chrono::Utc) { return false; } + } + } + matches!((&e.city, &e.state), (Some(ec), Some(es)) if ec.eq_ignore_ascii_case(tc) && es.eq_ignore_ascii_case(ts)) // re-check geo: the index is rebuilt only after persist, so a copied position can be stale and point at another city's entry + }) + .cloned() + .collect() + } else { + active_entries.into_iter().cloned().collect() + }; // Multi-strategy: split the geo-filtered pool into (exact role // match) vs (other). Exact matches skip cosine — they're already - // the strongest signal possible. Operations are shaped - // "fill: Welder x3 in Toledo, OH" so we match role by checking - // whether `fill: {role} ` appears in the operation string, - // case-insensitive. + // the strongest signal possible. References into geo_filtered + // which owns the entries. let mut exact_matches: Vec<&PlaybookEntry> = Vec::new(); let mut cosine_pool: Vec<(f32, &PlaybookEntry)> = Vec::new(); let role_needle = target_role .map(|r| format!("fill: {} ", r).to_ascii_lowercase()); - for e in geo_filtered { + for e in geo_filtered.iter() { let is_exact = role_needle.as_ref() .map(|needle| e.operation.to_ascii_lowercase().contains(needle)) .unwrap_or(false); @@ -414,15 +576,11 @@ impl PlaybookMemory { } cosine_pool.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal)); - // Allocate top_k across the two pools — exact matches get first - // priority (up to min(exact_count, top_k/2) slots), then cosine - // fills the rest. This is rerank with hard preference for - // identity matches. + // Allocate top_k across the two pools — exact matches first, + // then cosine fills the rest. 
Identity beats similarity. let exact_take = exact_matches.len().min(top_k_playbooks.max(1) / 2 + 1); let cosine_take = top_k_playbooks.saturating_sub(exact_take); - // Score exact matches with max similarity (1.0) so downstream - // weighting treats them as the strongest possible signal. let mut scored: Vec<(f32, &PlaybookEntry)> = exact_matches .into_iter() .take(exact_take) @@ -1194,3 +1352,95 @@ mod validity_window_tests { assert_eq!(r, 1); } } + +#[cfg(test)] +mod upsert_tests { + use super::*; + use object_store::memory::InMemory; + + fn mk(op: &str, day: &str, names: &[&str]) -> PlaybookEntry { + PlaybookEntry { + playbook_id: format!("pb-{}-{}", op.replace(' ', "_"), day), + operation: op.into(), + approach: "seed".into(), + context: "test".into(), + timestamp: format!("{day}T12:00:00Z"), + endorsed_names: names.iter().map(|s| s.to_string()).collect(), + city: Some("Nashville".into()), + state: Some("TN".into()), + embedding: Some(vec![1.0, 0.0, 0.0]), + schema_fingerprint: None, + valid_until: None, + retired_at: None, + retirement_reason: None, + } + } + + #[tokio::test] + async fn first_seed_is_add() { + let pm = PlaybookMemory::new(Arc::new(InMemory::new())); + let e = mk("fill: Welder x2 in Nashville, TN", "2026-04-21", &["Alice Smith"]); + match pm.upsert_entry(e).await.unwrap() { + UpsertOutcome::Added(_) => {} + other => panic!("expected Added, got {:?}", other), + } + assert_eq!(pm.entry_count().await, 1); + } + + #[tokio::test] + async fn identical_reseed_is_noop() { + let pm = PlaybookMemory::new(Arc::new(InMemory::new())); + let e1 = mk("fill: Welder x2 in Nashville, TN", "2026-04-21", &["Alice Smith", "Bob Jones"]); + let e2 = mk("fill: Welder x2 in Nashville, TN", "2026-04-21", &["Alice Smith", "Bob Jones"]); + pm.upsert_entry(e1).await.unwrap(); + let outcome = pm.upsert_entry(e2).await.unwrap(); + assert!(matches!(outcome, UpsertOutcome::Noop(_))); + // Still exactly one entry, no duplicate from the re-seed. 
+ assert_eq!(pm.entry_count().await, 1); + } + + #[tokio::test] + async fn same_day_different_names_updates_and_merges() { + let pm = PlaybookMemory::new(Arc::new(InMemory::new())); + let e1 = mk("fill: Welder x2 in Nashville, TN", "2026-04-21", &["Alice Smith"]); + let e2 = mk("fill: Welder x2 in Nashville, TN", "2026-04-21", &["Alice Smith", "Bob Jones"]); + let o1 = pm.upsert_entry(e1).await.unwrap(); + let pid = match o1 { + UpsertOutcome::Added(p) => p, + other => panic!("expected Added, got {:?}", other), + }; + let o2 = pm.upsert_entry(e2).await.unwrap(); + match o2 { + UpsertOutcome::Updated { playbook_id, merged_names } => { + assert_eq!(playbook_id, pid, "Updated should keep original playbook_id"); + assert_eq!(merged_names, vec!["Alice Smith".to_string(), "Bob Jones".to_string()]); + } + other => panic!("expected Updated, got {:?}", other), + } + assert_eq!(pm.entry_count().await, 1, "Updated must not create a duplicate"); + } + + #[tokio::test] + async fn different_day_same_op_is_add() { + let pm = PlaybookMemory::new(Arc::new(InMemory::new())); + let e1 = mk("fill: Welder x2 in Nashville, TN", "2026-04-21", &["Alice Smith"]); + let e2 = mk("fill: Welder x2 in Nashville, TN", "2026-04-22", &["Alice Smith"]); + pm.upsert_entry(e1).await.unwrap(); + let o2 = pm.upsert_entry(e2).await.unwrap(); + assert!(matches!(o2, UpsertOutcome::Added(_)), "different day → fresh ADD"); + assert_eq!(pm.entry_count().await, 2); + } + + #[tokio::test] + async fn retired_entry_doesnt_block_new_seed() { + let pm = PlaybookMemory::new(Arc::new(InMemory::new())); + let mut e1 = mk("fill: Welder x2 in Nashville, TN", "2026-04-21", &["Alice Smith"]); + e1.retired_at = Some(chrono::Utc::now().to_rfc3339()); + pm.set_entries(vec![e1]).await.unwrap(); + // A new seed on same day should ADD, not merge into the retired one. 
+ let e2 = mk("fill: Welder x2 in Nashville, TN", "2026-04-21", &["Carol Davis"]); + let o = pm.upsert_entry(e2).await.unwrap(); + assert!(matches!(o, UpsertOutcome::Added(_))); + assert_eq!(pm.entry_count().await, 2); + } +} diff --git a/crates/vectord/src/service.rs b/crates/vectord/src/service.rs index 8259a55..6f72488 100644 --- a/crates/vectord/src/service.rs +++ b/crates/vectord/src/service.rs @@ -2289,16 +2289,30 @@ async fn seed_playbook_memory( retirement_reason: None, }; - let mut current = state.playbook_memory.snapshot().await; + // Phase 26 — when append=true (default), route through upsert so + // same-day re-seeds of the same operation merge instead of + // appending duplicates. When append=false, retain the old + // replace-all semantics for callers that want a hard reset. if req.append { - current.push(new_entry); + match state.playbook_memory.upsert_entry(new_entry).await { + Ok(outcome) => { + let entries_after = state.playbook_memory.entry_count().await; + Ok(Json(serde_json::json!({ + "outcome": outcome, + "entries_after": entries_after, + }))) + } + Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, format!("upsert: {e}"))), + } } else { - current = vec![new_entry]; + if let Err(e) = state.playbook_memory.set_entries(vec![new_entry]).await { + return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("persist: {e}"))); + } + Ok(Json(serde_json::json!({ + "outcome": { "mode": "replaced", "playbook_id": pid }, + "entries_after": state.playbook_memory.entry_count().await, + }))) } - if let Err(e) = state.playbook_memory.set_entries(current).await { - return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("persist: {e}"))); - } - Ok(Json(serde_json::json!({ "playbook_id": pid, "entries_after": state.playbook_memory.entry_count().await }))) } async fn rebuild_playbook_memory(