Phase 26 — Mem0 upsert + Letta geo hot cache

Closes the two remaining 2026-era memory findings. Both are
optimizations per J's framing — not load-bearing, but good data
hygiene + future-proofing at scale.

MEM0 UPSERT (data hygiene):
Before: /seed always appended. A scenario re-running the same
operation on the same day wrote duplicate entries, inflating the
playbook corpus with near-identical rows.

Now: upsert_entry(new) inspects existing non-retired entries and
decides ADD / UPDATE / NOOP:
  ADD     → no matching (operation, day, city, state) tuple, append
  UPDATE  → match exists with different names → merge (union, stable
            order), refresh timestamp, keep original playbook_id so
            citations stay valid
  NOOP    → match exists with identical names → skip, return id

Keying at day granularity (the YYYY-MM-DD of the timestamp) means
intraday re-seeds dedup, while tomorrow's run of the same operation
lands as a fresh ADD. Retired entries don't block new seeds —
they're out of scope anyway.
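The decision logic can be sketched in miniature. This is a simplified, stdlib-only sketch — the `Entry` shape and `decide` helper here are illustrative, not the production `PlaybookEntry`/`upsert_entry` in the diff below:

```rust
#[derive(Clone, Debug, PartialEq)]
enum Outcome {
    Add,
    Update(Vec<String>),
    Noop,
}

#[derive(Clone)]
struct Entry {
    operation: String,
    timestamp: String, // RFC3339; only the YYYY-MM-DD prefix matters here
    names: Vec<String>,
    retired: bool,
}

fn day_key(ts: &str) -> &str {
    // The real helper parses RFC3339 and only falls back to the prefix;
    // this sketch just takes the first 10 bytes (ASCII timestamps assumed).
    &ts[..ts.len().min(10)]
}

fn decide(existing: &[Entry], new: &Entry) -> Outcome {
    for e in existing {
        // Retired entries never match — a new seed lands next to them.
        if e.retired || e.operation != new.operation {
            continue;
        }
        if day_key(&e.timestamp) != day_key(&new.timestamp) {
            continue;
        }
        // Order-insensitive equality check for the NOOP case.
        let (mut a, mut b) = (e.names.clone(), new.names.clone());
        a.sort();
        b.sort();
        if a == b {
            return Outcome::Noop;
        }
        // UPDATE: union with stable order — existing names first,
        // genuinely new names appended.
        let mut merged = e.names.clone();
        for n in &new.names {
            if !merged.contains(n) {
                merged.push(n.clone());
            }
        }
        return Outcome::Update(merged);
    }
    Outcome::Add
}
```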

The seed endpoint returns {outcome: {mode, playbook_id, merged_names?},
entries_after}. append=false retains the old replace-all semantics.

5 unit tests pass: first_seed_is_add, identical_reseed_is_noop,
same_day_different_names_updates_and_merges, different_day_same_op_is_add,
retired_entry_doesnt_block_new_seed.

Live verified: three successive seeds with names (Alice), (Alice),
(Alice, Bob) against a same-day entry that already held
{Alejandro, Lauren} left the entry count unchanged at 1936, with the
merged name set {Alejandro, Lauren, Alice, Bob}. Previously this
would have been 3 appends.

LETTA GEO HOT CACHE (scale primitive):
Added geo_index: HashMap<(city_lower, state_upper), Vec<usize>>
alongside PlaybookMemoryState. Rebuilt on every mutation: set_entries,
retire_one, retire_on_schema_drift, upsert_entry, load_from_storage.

compute_boost_for_filtered_with_role now uses the index for O(1) geo
lookup instead of scanning all entries. At current scale (1.9K) the
scan was sub-ms; at 100K+ the scan becomes the dominant cost. The
hot cache future-proofs without adding an LRU abstraction.

Retired entries excluded from index; valid_until still checked on the
hot path since it can elapse between rebuilds.
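The rebuild is a single O(n) pass. A minimal stdlib-only sketch, assuming a pared-down entry shape (the real index hangs off `PlaybookMemory` and is rebuilt after every mutation of `entries`):

```rust
use std::collections::HashMap;

// Illustrative stand-in for the indexed fields of PlaybookEntry.
struct GeoEntry {
    city: Option<String>,
    state: Option<String>,
    retired: bool,
}

fn build_geo_index(entries: &[GeoEntry]) -> HashMap<(String, String), Vec<usize>> {
    let mut idx: HashMap<(String, String), Vec<usize>> = HashMap::new();
    for (i, e) in entries.iter().enumerate() {
        // Retired entries never participate in boost filtering,
        // so indexing them would just waste lookups.
        if e.retired {
            continue;
        }
        let (Some(city), Some(st)) = (&e.city, &e.state) else { continue; };
        // Normalized key — city lowercased, state uppercased — so lookups
        // are case-insensitive without an eq_ignore_ascii_case scan.
        idx.entry((city.to_ascii_lowercase(), st.to_ascii_uppercase()))
            .or_default()
            .push(i);
    }
    idx
}
```

Because indices are pushed in iteration order, each `Vec<usize>` comes out sorted for free.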

Owns cloned PlaybookEntries in the geo_filtered vector so the state
read-lock is released before cosine scoring — avoids lock contention
on the scoring path.
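The clone-then-drop pattern looks like this in miniature (std `RwLock` standing in for the tokio one the service uses; `owned_matches` is an illustrative name):

```rust
use std::sync::{Arc, RwLock};

// Copy the matching rows out while holding the read lock, release the
// guard, then do the expensive per-row work on owned data.
fn owned_matches(state: &Arc<RwLock<Vec<String>>>, needle: &str) -> Vec<String> {
    let guard = state.read().unwrap();
    let owned: Vec<String> = guard
        .iter()
        .filter(|s| s.contains(needle))
        .cloned()
        .collect();
    drop(guard); // read lock released before any heavy scoring work
    // ...cosine scoring would run here against `owned`, lock-free...
    owned
}
```

The clone cost is bounded by the geo-filtered pool, which is far smaller than the full entry set, so paying it to keep writers unblocked is a good trade.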

Memory-findings progress: 5 of 5 shipped.
  ✓ Multi-strategy parallel retrieval (Phase 19 refinement)
  ✓ Input normalization + unified /memory/query (Phase 24 TS)
  ✓ Zep validity windows (Phase 25)
  ✓ Mem0 UPSERT (Phase 26 today)
  ✓ Letta geo hot cache (Phase 26 today)

All 18 playbook_memory tests pass.
commit 640db8c63c (parent 138592dc56)
root 2026-04-21 00:24:05 -05:00
2 changed files with 300 additions and 36 deletions

View File

@ -149,12 +149,48 @@ pub struct BoostEntry {
pub citations: Vec<String>, // playbook_ids that endorsed this worker
}
/// Phase 26 — what happened during an upsert. The seed endpoint
/// returns this shape so the caller sees whether its write was a new
/// entry, a merge, or a dedup'd no-op.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "mode", rename_all = "lowercase")]
pub enum UpsertOutcome {
/// New playbook appended. Carries the new playbook_id.
Added(String),
/// Existing same-day entry updated. Playbook_id unchanged; names
/// merged (union, original order preserved, new names appended).
Updated {
playbook_id: String,
merged_names: Vec<String>,
},
/// Identical same-day entry already exists; nothing changed.
/// Returns the stable playbook_id so caller still has a reference.
Noop(String),
}
/// Return YYYY-MM-DD from an RFC3339 timestamp. Falls back to the
/// first 10 chars if parse fails — tolerant for legacy entries that
/// stored a bare date.
fn day_key(ts: &str) -> String {
chrono::DateTime::parse_from_rfc3339(ts)
.map(|t| t.format("%Y-%m-%d").to_string())
.unwrap_or_else(|_| ts.chars().take(10).collect())
}
/// Live handle passed around the service. Clone-cheap (all state is
/// inside one Arc<RwLock>).
#[derive(Clone)]
pub struct PlaybookMemory {
state: Arc<RwLock<PlaybookMemoryState>>,
store: Arc<dyn ObjectStore>,
/// Phase 26 — hot geo index: (city_lower, state_upper) → sorted
/// Vec<entry_idx>. Rebuilt on every mutation of `entries`. At
/// current scale (1.9K entries) a full scan is sub-ms; at 100K+
/// the index skips the scan for geo-filtered queries, which is
/// the dominant code path. Letta-style working memory with a real
/// LRU is overkill here — entries are bounded and fit in RAM;
/// what we need is a precomputed seek, not a bounded cache.
geo_index: Arc<RwLock<HashMap<(String, String), Vec<usize>>>>,
}
impl PlaybookMemory {
@ -162,9 +198,28 @@ impl PlaybookMemory {
Self {
state: Arc::new(RwLock::new(PlaybookMemoryState::default())),
store,
geo_index: Arc::new(RwLock::new(HashMap::new())),
}
}
/// Rebuild the geo index from scratch. Called by every mutation
/// helper after persist succeeds. O(n) scan of entries; at current
/// scale ~40µs. Skips retired entries — they never participate in
/// boost filtering, so indexing them would just waste lookups.
async fn rebuild_geo_index(&self) {
let state = self.state.read().await;
let mut idx: HashMap<(String, String), Vec<usize>> = HashMap::new();
for (i, e) in state.entries.iter().enumerate() {
if e.retired_at.is_some() { continue; }
let (Some(city), Some(st)) = (&e.city, &e.state) else { continue; };
let key = (city.to_ascii_lowercase(), st.to_ascii_uppercase());
idx.entry(key).or_default().push(i);
}
drop(state);
let mut guard = self.geo_index.write().await;
*guard = idx;
}
/// Best-effort load from primary storage. Missing = empty memory; the
/// first `/rebuild` call will hydrate it.
pub async fn load_from_storage(&self) -> Result<usize, String> {
@ -176,6 +231,7 @@ impl PlaybookMemory {
.map_err(|e| format!("parse playbook_memory state: {e}"))?;
let n = persisted.entries.len();
*self.state.write().await = persisted;
self.rebuild_geo_index().await;
tracing::info!("playbook_memory: loaded {n} entries from {STATE_KEY}");
Ok(n)
}
@ -192,7 +248,9 @@ impl PlaybookMemory {
s.entries = entries;
s.last_rebuilt_at = chrono::Utc::now().timestamp_millis();
drop(s);
self.persist().await
self.persist().await?;
self.rebuild_geo_index().await;
Ok(())
}
/// Phase 25 — retire a specific playbook by id. Idempotent; repeat
@ -210,7 +268,10 @@ impl PlaybookMemory {
}
}
}
if touched { self.persist().await?; }
if touched {
self.persist().await?;
self.rebuild_geo_index().await;
}
Ok(touched)
}
@ -244,7 +305,10 @@ impl PlaybookMemory {
}
}
}
if count > 0 { self.persist().await?; }
if count > 0 {
self.persist().await?;
self.rebuild_geo_index().await;
}
Ok(count)
}
@ -257,6 +321,89 @@ impl PlaybookMemory {
(total, retired, failures)
}
/// Phase 26 — Mem0-style upsert. Decides ADD / UPDATE / NOOP based
/// on whether a non-retired entry with the same operation already
/// exists for the same day. Three outcomes:
///
/// ADD → no matching entry, append the new one
/// UPDATE → existing same-day entry found, merge endorsed_names
/// (union, preserving order) and refresh timestamp.
/// Playbook_id is kept stable so citations from prior
/// boost calls stay valid.
/// NOOP → existing same-day entry with identical
/// endorsed_names. Skip — no duplicates accumulate.
///
/// "Same day" keyed on YYYY-MM-DD of the entry's timestamp so
/// intraday re-seeding of the same operation dedups but tomorrow's
/// seeding for the same operation lands as a fresh ADD (which is
/// correct — a new day is a new event).
pub async fn upsert_entry(&self, new_entry: PlaybookEntry) -> Result<UpsertOutcome, String> {
let new_day = day_key(&new_entry.timestamp);
let new_names_sorted = {
let mut v = new_entry.endorsed_names.clone();
v.sort();
v
};
let mut state = self.state.write().await;
// Find a non-retired entry with same operation + day + city +
// state. Operation string alone would false-match across days
// or across cities that happen to share role+count; city+state
// is already parsed out of operation so adding them to the key
// costs nothing.
let mut existing_idx: Option<usize> = None;
for (i, e) in state.entries.iter().enumerate() {
if e.retired_at.is_some() { continue; }
if e.operation != new_entry.operation { continue; }
if day_key(&e.timestamp) != new_day { continue; }
if e.city != new_entry.city || e.state != new_entry.state { continue; }
existing_idx = Some(i);
break;
}
match existing_idx {
None => {
let pid = new_entry.playbook_id.clone();
state.entries.push(new_entry);
drop(state);
self.persist().await?;
self.rebuild_geo_index().await;
Ok(UpsertOutcome::Added(pid))
}
Some(i) => {
let mut existing_names_sorted = state.entries[i].endorsed_names.clone();
existing_names_sorted.sort();
if existing_names_sorted == new_names_sorted {
// NOOP — identical data, just report the existing id
let pid = state.entries[i].playbook_id.clone();
Ok(UpsertOutcome::Noop(pid))
} else {
// UPDATE — merge names (union, stable order).
let existing = state.entries.get_mut(i).ok_or("index invalidated")?;
let mut merged: Vec<String> = existing.endorsed_names.clone();
for n in &new_entry.endorsed_names {
if !merged.contains(n) { merged.push(n.clone()); }
}
existing.endorsed_names = merged.clone();
existing.timestamp = new_entry.timestamp.clone();
// Keep original playbook_id + embedding + schema fingerprint.
// Refresh embedding only if the caller passed a non-None
// one (indicates the text shape changed).
if new_entry.embedding.is_some() {
existing.embedding = new_entry.embedding.clone();
}
if new_entry.schema_fingerprint.is_some() {
existing.schema_fingerprint = new_entry.schema_fingerprint.clone();
}
let pid = existing.playbook_id.clone();
drop(state);
self.persist().await?;
self.rebuild_geo_index().await;
Ok(UpsertOutcome::Updated { playbook_id: pid, merged_names: merged })
}
}
}
}
pub async fn entry_count(&self) -> usize {
self.state.read().await.entries.len()
}
@ -376,33 +523,48 @@ impl PlaybookMemory {
})
.collect();
// Pre-filter by target_geo (city, state) before cosine. When
// target_geo is set, only playbooks from that city go into the
// ranking pool — prevents globally-popular semantic neighbors
// from drowning out the city's local successful playbooks.
let geo_filtered: Vec<&PlaybookEntry> = active_entries
.iter()
.copied()
.filter(|e| match (target_geo, &e.city, &e.state) {
(None, _, _) => true,
(Some((tc, ts)), Some(ec), Some(es)) => {
ec.eq_ignore_ascii_case(tc) && es.eq_ignore_ascii_case(ts)
}
_ => false,
})
.collect();
// Pre-filter by target_geo (city, state) before cosine. Phase 26
// hot cache — use the geo index (O(1) key lookup) instead of a
// linear scan of all entries. Retired entries are excluded from
// the index; valid_until is still checked here since it can
// elapse between index rebuilds.
//
// Owned entries (not references) because the state read-lock is
// released between here and the cosine step — we don't want to
// hold a read lock across the scoring work.
let geo_filtered: Vec<PlaybookEntry> = if let Some((tc, ts)) = target_geo {
let key = (tc.to_ascii_lowercase(), ts.to_ascii_uppercase());
let index = self.geo_index.read().await;
let Some(idxs) = index.get(&key) else { return HashMap::new(); };
let idxs = idxs.clone();
drop(index);
let state = self.state.read().await;
idxs.into_iter()
.filter_map(|i| state.entries.get(i))
.filter(|e| {
if e.retired_at.is_some() { return false; }
if let Some(vu) = &e.valid_until {
if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(vu) {
if now > parsed.with_timezone(&chrono::Utc) { return false; }
}
}
true
})
.cloned()
.collect()
} else {
active_entries.into_iter().cloned().collect()
};
// Multi-strategy: split the geo-filtered pool into (exact role
// match) vs (other). Exact matches skip cosine — they're already
// the strongest signal possible. Operations are shaped
// "fill: Welder x3 in Toledo, OH" so we match role by checking
// whether `fill: {role} ` appears in the operation string,
// case-insensitive.
// the strongest signal possible. References into geo_filtered
// which owns the entries.
let mut exact_matches: Vec<&PlaybookEntry> = Vec::new();
let mut cosine_pool: Vec<(f32, &PlaybookEntry)> = Vec::new();
let role_needle = target_role
.map(|r| format!("fill: {} ", r).to_ascii_lowercase());
for e in geo_filtered {
for e in geo_filtered.iter() {
let is_exact = role_needle.as_ref()
.map(|needle| e.operation.to_ascii_lowercase().contains(needle))
.unwrap_or(false);
@ -414,15 +576,11 @@ impl PlaybookMemory {
}
cosine_pool.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
// Allocate top_k across the two pools — exact matches get first
// priority (up to min(exact_count, top_k/2) slots), then cosine
// fills the rest. This is rerank with hard preference for
// identity matches.
// Allocate top_k across the two pools — exact matches first,
// then cosine fills the rest. Identity beats similarity.
let exact_take = exact_matches.len().min(top_k_playbooks.max(1) / 2 + 1);
let cosine_take = top_k_playbooks.saturating_sub(exact_take);
// Score exact matches with max similarity (1.0) so downstream
// weighting treats them as the strongest possible signal.
let mut scored: Vec<(f32, &PlaybookEntry)> = exact_matches
.into_iter()
.take(exact_take)
@ -1194,3 +1352,95 @@ mod validity_window_tests {
assert_eq!(r, 1);
}
}
#[cfg(test)]
mod upsert_tests {
use super::*;
use object_store::memory::InMemory;
fn mk(op: &str, day: &str, names: &[&str]) -> PlaybookEntry {
PlaybookEntry {
playbook_id: format!("pb-{}-{}", op.replace(' ', "_"), day),
operation: op.into(),
approach: "seed".into(),
context: "test".into(),
timestamp: format!("{day}T12:00:00Z"),
endorsed_names: names.iter().map(|s| s.to_string()).collect(),
city: Some("Nashville".into()),
state: Some("TN".into()),
embedding: Some(vec![1.0, 0.0, 0.0]),
schema_fingerprint: None,
valid_until: None,
retired_at: None,
retirement_reason: None,
}
}
#[tokio::test]
async fn first_seed_is_add() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let e = mk("fill: Welder x2 in Nashville, TN", "2026-04-21", &["Alice Smith"]);
match pm.upsert_entry(e).await.unwrap() {
UpsertOutcome::Added(_) => {}
other => panic!("expected Added, got {:?}", other),
}
assert_eq!(pm.entry_count().await, 1);
}
#[tokio::test]
async fn identical_reseed_is_noop() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let e1 = mk("fill: Welder x2 in Nashville, TN", "2026-04-21", &["Alice Smith", "Bob Jones"]);
let e2 = mk("fill: Welder x2 in Nashville, TN", "2026-04-21", &["Alice Smith", "Bob Jones"]);
pm.upsert_entry(e1).await.unwrap();
let outcome = pm.upsert_entry(e2).await.unwrap();
assert!(matches!(outcome, UpsertOutcome::Noop(_)));
// Still exactly one entry, no duplicate from the re-seed.
assert_eq!(pm.entry_count().await, 1);
}
#[tokio::test]
async fn same_day_different_names_updates_and_merges() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let e1 = mk("fill: Welder x2 in Nashville, TN", "2026-04-21", &["Alice Smith"]);
let e2 = mk("fill: Welder x2 in Nashville, TN", "2026-04-21", &["Alice Smith", "Bob Jones"]);
let o1 = pm.upsert_entry(e1).await.unwrap();
let pid = match o1 {
UpsertOutcome::Added(p) => p,
other => panic!("expected Added, got {:?}", other),
};
let o2 = pm.upsert_entry(e2).await.unwrap();
match o2 {
UpsertOutcome::Updated { playbook_id, merged_names } => {
assert_eq!(playbook_id, pid, "Updated should keep original playbook_id");
assert_eq!(merged_names, vec!["Alice Smith".to_string(), "Bob Jones".to_string()]);
}
other => panic!("expected Updated, got {:?}", other),
}
assert_eq!(pm.entry_count().await, 1, "Updated must not create a duplicate");
}
#[tokio::test]
async fn different_day_same_op_is_add() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let e1 = mk("fill: Welder x2 in Nashville, TN", "2026-04-21", &["Alice Smith"]);
let e2 = mk("fill: Welder x2 in Nashville, TN", "2026-04-22", &["Alice Smith"]);
pm.upsert_entry(e1).await.unwrap();
let o2 = pm.upsert_entry(e2).await.unwrap();
assert!(matches!(o2, UpsertOutcome::Added(_)), "different day → fresh ADD");
assert_eq!(pm.entry_count().await, 2);
}
#[tokio::test]
async fn retired_entry_doesnt_block_new_seed() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let mut e1 = mk("fill: Welder x2 in Nashville, TN", "2026-04-21", &["Alice Smith"]);
e1.retired_at = Some(chrono::Utc::now().to_rfc3339());
pm.set_entries(vec![e1]).await.unwrap();
// A new seed on same day should ADD, not merge into the retired one.
let e2 = mk("fill: Welder x2 in Nashville, TN", "2026-04-21", &["Carol Davis"]);
let o = pm.upsert_entry(e2).await.unwrap();
assert!(matches!(o, UpsertOutcome::Added(_)));
assert_eq!(pm.entry_count().await, 2);
}
}

View File

@ -2289,16 +2289,30 @@ async fn seed_playbook_memory(
retirement_reason: None,
};
let mut current = state.playbook_memory.snapshot().await;
// Phase 26 — when append=true (default), route through upsert so
// same-day re-seeds of the same operation merge instead of
// appending duplicates. When append=false, retain the old
// replace-all semantics for callers that want a hard reset.
if req.append {
current.push(new_entry);
match state.playbook_memory.upsert_entry(new_entry).await {
Ok(outcome) => {
let entries_after = state.playbook_memory.entry_count().await;
Ok(Json(serde_json::json!({
"outcome": outcome,
"entries_after": entries_after,
})))
}
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, format!("upsert: {e}"))),
}
} else {
current = vec![new_entry];
if let Err(e) = state.playbook_memory.set_entries(vec![new_entry]).await {
return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("persist: {e}")));
}
Ok(Json(serde_json::json!({
"outcome": { "mode": "replaced", "playbook_id": pid },
"entries_after": state.playbook_memory.entry_count().await,
})))
}
if let Err(e) = state.playbook_memory.set_entries(current).await {
return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("persist: {e}")));
}
Ok(Json(serde_json::json!({ "playbook_id": pid, "entries_after": state.playbook_memory.entry_count().await })))
}
async fn rebuild_playbook_memory(