lakehouse/crates/vectord/src/playbook_memory.rs
commit a6f12e2609 — Phase 21 Rust port + Phase 27 playbook versioning + doc-sync
Phase 21 — Rust port of scratchpad + tree-split primitives (companion to
the 2026-04-21 TS shipment). New crates/aibridge modules:

  context.rs       — estimate_tokens (chars/4 ceil), context_window_for,
                     assert_context_budget returning a BudgetCheck with
                     numeric diagnostics on both success and overflow.
                     Windows table mirrors config/models.json.
  continuation.rs  — generate_continuable<G: TextGenerator>. Handles the
                     two failure modes: empty-response from thinking
                     models (geometric 2x budget backoff up to budget_cap)
                     and truncated-non-empty (continuation with partial
                     as scratchpad). is_structurally_complete balances
                     braces then JSON.parse-checks. Guards the degen case
                     "all retries empty, don't loop on empty partial".
  tree_split.rs    — generate_tree_split map->reduce with running
                     scratchpad. Per-shard + reduce-prompt go through
                     assert_context_budget first; loud-fails rather than
                     silently truncating. Oldest-digest-first scratchpad
                     truncation at scratchpad_budget (default 6000 t).
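A minimal sketch of the two primitives described above — the chars/4-ceiling token estimate from context.rs and the geometric 2x budget backoff from continuation.rs. Function names and signatures here are illustrative assumptions; the real module shapes may differ.

```rust
/// chars/4, rounded up — the cheap token estimate used for budget checks.
/// Counts chars (not bytes) so multi-byte text isn't over-counted.
pub fn estimate_tokens(text: &str) -> usize {
    text.chars().count().div_ceil(4)
}

/// Geometric 2x budget growth, clamped at `budget_cap` — the retry
/// schedule used when a thinking model returns an empty response.
/// (Assumed shape; continuation.rs drives this inline rather than
/// materializing a Vec.)
pub fn backoff_budgets(start: usize, budget_cap: usize, retries: usize) -> Vec<usize> {
    let mut budgets = Vec::with_capacity(retries);
    let mut b = start;
    for _ in 0..retries {
        budgets.push(b.min(budget_cap));
        b = b.saturating_mul(2);
    }
    budgets
}
```

The clamp means the final retries all run at `budget_cap`, which is where the "all retries empty" degenerate case is detected and the loop gives up instead of spinning.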

TextGenerator trait (native async-fn-in-trait, edition 2024). AiClient
implements it; ScriptedGenerator test double lets tests inject canned
sequences without a live Ollama.
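A simplified, synchronous sketch of the ScriptedGenerator idea: a test double that replays canned responses in order, so retry and continuation logic can be exercised without a live Ollama. The real trait is async (native async-fn-in-trait, edition 2024); the names and shapes below are illustrative only.

```rust
use std::cell::RefCell;

// Synchronous stand-in for the async TextGenerator trait.
pub trait TextGeneratorSync {
    fn generate(&self, prompt: &str) -> Result<String, String>;
}

pub struct ScriptedGenerator {
    // Drained front-to-back, one canned response per call.
    script: RefCell<Vec<String>>,
}

impl ScriptedGenerator {
    pub fn new(responses: Vec<&str>) -> Self {
        Self {
            script: RefCell::new(responses.into_iter().map(String::from).collect()),
        }
    }
}

impl TextGeneratorSync for ScriptedGenerator {
    fn generate(&self, _prompt: &str) -> Result<String, String> {
        let mut s = self.script.borrow_mut();
        if s.is_empty() {
            Err("script exhausted".to_string())
        } else {
            Ok(s.remove(0))
        }
    }
}
```

Seeding the script with an empty string followed by a valid payload is how a test would drive the "empty response from a thinking model, then success on retry" path.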

GenerateRequest gained think: Option<bool> — forwards to sidecar for
per-call hidden-reasoning opt-out on hot-path JSON emitters. Three
existing callsites updated (rag.rs x2, service.rs hybrid answer).

Phase 27 — Playbook versioning. PlaybookEntry gained four optional
fields (all #[serde(default)] so pre-Phase-27 state loads as roots):

  version           u32, default 1
  parent_id         Option<String>, previous version's playbook_id
  superseded_at     Option<String>, set when newer version replaces
  superseded_by     Option<String>, the playbook_id that replaced

New methods:

  revise_entry(parent_id, new_entry) — appends new version, stamps
    superseded_at+superseded_by on parent, inherits parent_id and sets
    version = parent + 1 on the new entry. Rejects revising a retired
    or already-superseded parent (tip-of-chain is the only valid
    revise target).
  history(playbook_id) — returns full chain root->tip from any node.
    Walks parent_id back to root, then superseded_by forward to tip.
    Cycle-safe.

Superseded entries excluded from boost (same rule as retired): filter
in compute_boost_for_filtered_with_role (both active-entries prefilter
and geo-filtered path), rebuild_geo_index, and upsert_entry's
existing_idx search. status_counts returns (total, retired,
superseded, failures);
/status JSON reports active = total - retired - superseded.

Endpoints:
  POST /vectors/playbook_memory/revise
  GET  /vectors/playbook_memory/history/{id}

Doc-sync — PHASES.md + PRD.md drifted from git after Phases 24-26
shipped. Fixes applied:

  - Phase 24 marked shipped (commit b95dd86) with detail of observer
    HTTP ingest + scenario outcome streaming. PRD "NOT YET WIRED"
    rewritten to reflect shipped state.
  - Phase 25 (validity windows, commit e0a843d) added to PHASES +
    PRD.
  - Phase 26 (Mem0 upsert + Letta hot cache, commit 640db8c) added.
  - Phase 27 entry added to both docs.
  - Phase 19.6 time decay corrected: was documented as "deferred",
    actually wired via BOOST_HALF_LIFE_DAYS = 30.0 in playbook_memory.rs.
  - Phase E/Phase 8 tombstone-at-compaction limit note updated —
    Phase E.2 closed it.

Tests: 8 new version_tests in vectord (chain-metadata stamping,
retired/superseded parent rejection, boost exclusion, history from
root/tip/middle, legacy default round-trip, status counts). 25 new
aibridge tests (context/continuation/tree_split). Workspace total
145 green (was 120).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-21 17:40:49 -05:00


//! Phase 19: Playbook memory — the feedback loop that makes the index
//! learn from real outcomes instead of just logging them.
//!
//! When an agent (multi-agent orchestrator or human operator) seals a
//! successful playbook, it lands in the `successful_playbooks` dataset.
//! Historically that was a write-only log. This module turns it into a
//! re-ranking signal:
//!
//! 1. `rebuild` reads every row of `successful_playbooks`, embeds the
//! operation+approach+context as one vector per playbook, parses
//! out the worker names from the `result` column, and stores both
//! the vectors and the (playbook → names) endorsement map in memory.
//!
//! 2. At query time, `compute_boost_for` takes a new operation text
//! (e.g. "fill: Welder x2 in Toledo, OH"), embeds it, brute-force
//! ranks past playbooks by cosine similarity, and returns a boost
//! map keyed by (city, state, worker_name) → `BoostEntry`. Each
//! entry carries its similarity score and the citing playbook_ids,
//! so explanations ("ranked higher because of 3 similar past fills
//! in Toledo") are free.
//!
//! 3. The `use_playbook_memory` flag on `/vectors/hybrid` adds those
//! boosts to matching search hits and re-sorts.
//!
//! Why brute force instead of another HNSW: `successful_playbooks` grows
//! by operators, not automation. A few thousand rows is the realistic
//! ceiling for years. Brute force at 10K × 768d is <10ms on this hardware
//! — not worth the operational cost of another indexed surface.
//!
//! Persistence: the endorsements map round-trips through
//! `_playbook_memory/state.json` in primary storage so the cache
//! survives restarts without a full rebuild.
use std::collections::HashMap;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use aibridge::client::{AiClient, EmbedRequest};
use object_store::ObjectStore;
use storaged::ops;
const STATE_KEY: &str = "_playbook_memory/state.json";
/// Maximum boost a single worker can accumulate across all similar past
/// playbooks. Prevents one very popular worker from always winning.
pub const MAX_BOOST_PER_WORKER: f32 = 0.25;
/// Default number of past playbooks to consider when ranking the current
/// operation. Bumped 25 → 100 on 2026-04-20 (second revision) after
/// direct measurement showed cosine similarities cluster in a narrow band
/// (0.55-0.67) across all playbooks regardless of geo — the embedding
/// model doesn't discriminate city/role strongly enough. k=25 missed
/// relevant Toledo Welder playbooks even when they existed; k=100
/// includes them comfortably. Brute-force remains sub-ms at this size.
/// Deeper fix: filter playbooks by (target_city, target_state) in the
/// request before similarity ranking — deferred.
pub const DEFAULT_TOP_K_PLAYBOOKS: usize = 100;
/// Half-life of a playbook's contribution to boost, in days. A playbook
/// 30 days old contributes half what a fresh one would; 60 days old, a
/// quarter; etc. Per Path 1 (deepen statistical) — stale endorsements
/// shouldn't dominate fresh signal. Recruiter trust depends on this.
pub const BOOST_HALF_LIFE_DAYS: f32 = 30.0;
/// Shape of one playbook in memory. The embedding is optional so we can
/// round-trip a cached state without re-embedding; the rebuild path
/// populates it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlaybookEntry {
pub playbook_id: String,
pub operation: String,
pub approach: String,
pub context: String,
pub timestamp: String,
/// Parsed out of `result` (e.g. "2/2 filled → Matthew Roberts, Amy Davis").
/// Stored as raw names; matching against search results happens on
/// (city, state, name) tuples at boost time.
pub endorsed_names: Vec<String>,
/// City + state parsed out of the operation string. Kept separately
/// so boost matching doesn't re-parse on every query.
pub city: Option<String>,
pub state: Option<String>,
/// Embedding of `operation + approach + context`. Option so persisted
/// state can omit it on first load and have a later embed() fill in.
#[serde(default)]
pub embedding: Option<Vec<f32>>,
/// Schema fingerprint captured at seed time — SHA-256 hex of the
/// target dataset's (column_name, type) tuples. When the dataset's
/// schema changes (column rename, type change, drop), entries
/// seeded against the old schema are considered stale and get
/// skipped by `compute_boost_for_filtered_with_role` unless the
/// caller passes `allow_stale: true`. Optional so historical
/// entries without a fingerprint (ingested before this field
/// existed) degrade to "never stale" rather than getting
/// silently zeroed. Phase 25 (2026-04-21).
#[serde(default)]
pub schema_fingerprint: Option<String>,
/// Optional hard expiry. When set and now() > valid_until, the
/// entry is skipped. Used for playbooks that were known to be
/// time-limited at seed time (seasonal hires, temporary contracts).
#[serde(default)]
pub valid_until: Option<String>,
/// Set by `retire()` — auto-retirement when schema drift detected,
/// or manual via POST /vectors/playbook_memory/retire. Entries with
/// this set are excluded from all boost calculations; they remain
/// in the journal for forensic purposes.
#[serde(default)]
pub retired_at: Option<String>,
/// Human-readable retirement reason. Examples:
/// "schema_drift: workers_500k 2026-05-03 added column X"
/// "expired: valid_until 2026-05-01 elapsed"
/// "manual: operator requested via POST /retire"
#[serde(default)]
pub retirement_reason: Option<String>,
/// Phase 27 — monotonic version counter within a playbook chain.
/// First version is 1; `revise_entry` sets the new entry's version
/// to parent.version + 1. Entries persisted before Phase 27 get
/// version=1 via serde default and are treated as roots.
#[serde(default = "default_version")]
pub version: u32,
/// Phase 27 — playbook_id of the prior version in this chain. None
/// for root entries (first version).
#[serde(default)]
pub parent_id: Option<String>,
/// Phase 27 — timestamp set when a newer version replaced this
/// entry via `revise_entry`. Superseded entries are excluded from
/// boost calculations (same rule as `retired_at`) but remain
/// queryable via `history` for audit.
#[serde(default)]
pub superseded_at: Option<String>,
/// Phase 27 — playbook_id of the entry that replaced this one.
/// Walking `superseded_by` from the root forward reconstructs the
/// full version chain.
#[serde(default)]
pub superseded_by: Option<String>,
}
fn default_version() -> u32 { 1 }
/// A recorded failure — worker who didn't deliver on a contract.
/// Tracked per (city, state, name) so a single worker's failures on
/// Toledo Welder contracts don't penalize the same name in Chicago.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailureRecord {
pub city: String,
pub state: String,
pub name: String,
pub reason: String,
pub timestamp: String,
}
/// Persisted / in-memory state.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct PlaybookMemoryState {
entries: Vec<PlaybookEntry>,
/// Unix epoch millis when the last rebuild completed. Caller can
/// use this to gate "stale > N hours → trigger rebuild" behavior.
last_rebuilt_at: i64,
/// Failed-fill records. Path 1 negative signal — every entry here
/// dampens the positive boost for its (city, state, name) key by
/// half per failure count, so three failures zero the boost.
#[serde(default)]
failures: Vec<FailureRecord>,
}
/// Per-worker boost payload. `citations` lets the response layer show
/// "boosted because of these past fills" without a second lookup.
#[derive(Debug, Clone, Serialize)]
pub struct BoostEntry {
pub boost: f32,
pub citations: Vec<String>, // playbook_ids that endorsed this worker
}
/// Phase 26 — what happened during an upsert. The seed endpoint
/// returns this shape so the caller sees whether its write was a new
/// entry, a merge, or a dedup'd no-op.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "mode", rename_all = "lowercase")]
pub enum UpsertOutcome {
/// New playbook appended. Carries the new playbook_id.
Added(String),
/// Existing same-day entry updated. Playbook_id unchanged; names
/// merged (union, original order preserved, new names appended).
Updated {
playbook_id: String,
merged_names: Vec<String>,
},
/// Identical same-day entry already exists; nothing changed.
/// Returns the stable playbook_id so caller still has a reference.
Noop(String),
}
/// Phase 27 — shape returned from `revise_entry`. Reports both ends of
/// the supersession so callers can link citations or audit chains.
#[derive(Debug, Clone, Serialize)]
pub struct ReviseOutcome {
pub parent_id: String,
pub parent_version: u32,
pub new_playbook_id: String,
pub new_version: u32,
pub superseded_at: String,
}
/// Return YYYY-MM-DD from an RFC3339 timestamp. Falls back to the
/// first 10 chars if parse fails — tolerant for legacy entries that
/// stored a bare date.
fn day_key(ts: &str) -> String {
chrono::DateTime::parse_from_rfc3339(ts)
.map(|t| t.format("%Y-%m-%d").to_string())
.unwrap_or_else(|_| ts.chars().take(10).collect())
}
/// Live handle passed around the service. Clone-cheap (all state is
/// inside one Arc<RwLock>).
#[derive(Clone)]
pub struct PlaybookMemory {
state: Arc<RwLock<PlaybookMemoryState>>,
store: Arc<dyn ObjectStore>,
/// Phase 26 — hot geo index: (city_lower, state_upper) → sorted
/// Vec<entry_idx>. Rebuilt on every mutation of `entries`. At
/// current scale (1.9K entries) a full scan is sub-ms; at 100K+
/// the index skips the scan for geo-filtered queries, which is
/// the dominant code path. Letta-style working memory with a real
/// LRU is overkill here — entries are bounded and fit in RAM;
/// what we need is a precomputed seek, not a bounded cache.
geo_index: Arc<RwLock<HashMap<(String, String), Vec<usize>>>>,
}
impl PlaybookMemory {
pub fn new(store: Arc<dyn ObjectStore>) -> Self {
Self {
state: Arc::new(RwLock::new(PlaybookMemoryState::default())),
store,
geo_index: Arc::new(RwLock::new(HashMap::new())),
}
}
/// Rebuild the geo index from scratch. Called by every mutation
/// helper after persist succeeds. O(n) scan of entries; at current
/// scale ~40µs. Skips retired and superseded entries — they never
/// participate in boost filtering, so indexing them would just
/// waste lookups.
async fn rebuild_geo_index(&self) {
let state = self.state.read().await;
let mut idx: HashMap<(String, String), Vec<usize>> = HashMap::new();
for (i, e) in state.entries.iter().enumerate() {
if e.retired_at.is_some() { continue; }
if e.superseded_at.is_some() { continue; }
let (Some(city), Some(st)) = (&e.city, &e.state) else { continue; };
let key = (city.to_ascii_lowercase(), st.to_ascii_uppercase());
idx.entry(key).or_default().push(i);
}
drop(state);
let mut guard = self.geo_index.write().await;
*guard = idx;
}
/// Best-effort load from primary storage. Missing = empty memory; the
/// first `/rebuild` call will hydrate it.
pub async fn load_from_storage(&self) -> Result<usize, String> {
let data = match ops::get(&self.store, STATE_KEY).await {
Ok(d) => d,
Err(_) => return Ok(0),
};
let persisted: PlaybookMemoryState = serde_json::from_slice(&data)
.map_err(|e| format!("parse playbook_memory state: {e}"))?;
let n = persisted.entries.len();
*self.state.write().await = persisted;
self.rebuild_geo_index().await;
tracing::info!("playbook_memory: loaded {n} entries from {STATE_KEY}");
Ok(n)
}
async fn persist(&self) -> Result<(), String> {
let snapshot = self.state.read().await.clone();
let bytes = serde_json::to_vec_pretty(&snapshot).map_err(|e| e.to_string())?;
ops::put(&self.store, STATE_KEY, bytes.into()).await
}
/// Replace the full in-memory state atomically and persist.
pub async fn set_entries(&self, entries: Vec<PlaybookEntry>) -> Result<(), String> {
let mut s = self.state.write().await;
s.entries = entries;
s.last_rebuilt_at = chrono::Utc::now().timestamp_millis();
drop(s);
self.persist().await?;
self.rebuild_geo_index().await;
Ok(())
}
/// Phase 25 — retire a specific playbook by id. Idempotent; repeat
/// calls don't overwrite the first reason. Persisted.
pub async fn retire_one(&self, playbook_id: &str, reason: &str) -> Result<bool, String> {
let mut touched = false;
{
let mut state = self.state.write().await;
for e in state.entries.iter_mut() {
if e.playbook_id == playbook_id && e.retired_at.is_none() {
e.retired_at = Some(chrono::Utc::now().to_rfc3339());
e.retirement_reason = Some(reason.to_string());
touched = true;
break;
}
}
}
if touched {
self.persist().await?;
self.rebuild_geo_index().await;
}
Ok(touched)
}
/// Phase 25 — retire every entry matching (city, state) whose
/// schema_fingerprint doesn't match the current one. Entries with
/// no fingerprint (legacy) are skipped — caller can use
/// `retire_by_scope` for blanket retirement.
pub async fn retire_on_schema_drift(
&self,
city: &str,
state_code: &str,
current_fingerprint: &str,
reason: &str,
) -> Result<usize, String> {
let mut count = 0;
{
let mut state = self.state.write().await;
let now = chrono::Utc::now().to_rfc3339();
for e in state.entries.iter_mut() {
if e.retired_at.is_some() { continue; }
let Some(ec) = &e.city else { continue; };
let Some(es) = &e.state else { continue; };
if !ec.eq_ignore_ascii_case(city) || !es.eq_ignore_ascii_case(state_code) { continue; }
match &e.schema_fingerprint {
Some(fp) if fp != current_fingerprint => {
e.retired_at = Some(now.clone());
e.retirement_reason = Some(reason.to_string());
count += 1;
}
_ => {}
}
}
}
if count > 0 {
self.persist().await?;
self.rebuild_geo_index().await;
}
Ok(count)
}
/// Phase 27 — append a new version of an existing playbook. The
/// parent is stamped with `superseded_at` + `superseded_by`; the
/// new entry inherits `parent_id` and gets `version = parent + 1`.
/// Errors when the parent is retired (terminal state) or already
/// superseded (must revise the tip of the chain, not a middle
/// node). Caller supplies the new entry with its own fresh
/// `playbook_id`; chain-metadata fields on the input are
/// overwritten so callers can't fabricate a mismatched history.
pub async fn revise_entry(
&self,
parent_id: &str,
mut new_entry: PlaybookEntry,
) -> Result<ReviseOutcome, String> {
let now = chrono::Utc::now().to_rfc3339();
let mut state = self.state.write().await;
let Some(i) = state.entries.iter().position(|e| e.playbook_id == parent_id) else {
return Err(format!("parent playbook_id '{parent_id}' not found"));
};
{
let parent = &state.entries[i];
if parent.retired_at.is_some() {
return Err(format!(
"cannot revise retired playbook '{parent_id}' — retirement is terminal"
));
}
if let Some(succ) = &parent.superseded_by {
return Err(format!(
"playbook '{parent_id}' already superseded by '{succ}'; \
revise the latest version in the chain instead"
));
}
}
let parent_version = state.entries[i].version;
let new_version = parent_version.saturating_add(1);
let parent_pid = state.entries[i].playbook_id.clone();
let new_pid = new_entry.playbook_id.clone();
if new_pid.is_empty() {
return Err("new playbook_id must not be empty".into());
}
if new_pid == parent_pid {
return Err("new playbook_id must differ from parent".into());
}
// Enforce chain-metadata integrity — caller doesn't get to
// fabricate these.
new_entry.version = new_version;
new_entry.parent_id = Some(parent_pid.clone());
new_entry.superseded_at = None;
new_entry.superseded_by = None;
let parent_mut = &mut state.entries[i];
parent_mut.superseded_at = Some(now.clone());
parent_mut.superseded_by = Some(new_pid.clone());
state.entries.push(new_entry);
drop(state);
self.persist().await?;
self.rebuild_geo_index().await;
Ok(ReviseOutcome {
parent_id: parent_pid,
parent_version,
new_playbook_id: new_pid,
new_version,
superseded_at: now,
})
}
/// Phase 27 — return the full version chain that contains this
/// playbook_id, ordered from root (v1) to tip. Walks `parent_id`
/// backward to find the root, then `superseded_by` forward to the
/// tip. Returns empty if the id isn't present. Cycle-safe via a
/// visited set; unreachable in normal operation but the guard is
/// cheap.
pub async fn history(&self, playbook_id: &str) -> Vec<PlaybookEntry> {
let state = self.state.read().await;
let by_id: HashMap<&str, &PlaybookEntry> = state.entries
.iter()
.map(|e| (e.playbook_id.as_str(), e))
.collect();
let Some(seed) = by_id.get(playbook_id).copied() else {
return vec![];
};
// Walk backward to root.
let mut cursor = seed;
let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
seen.insert(cursor.playbook_id.clone());
while let Some(pid) = &cursor.parent_id {
let Some(&next) = by_id.get(pid.as_str()) else { break };
if !seen.insert(next.playbook_id.clone()) { break; }
cursor = next;
}
let root = cursor;
// Walk forward to tip.
let mut chain = vec![root.clone()];
let mut cursor = root;
let mut seen_fwd: std::collections::HashSet<String> = std::collections::HashSet::new();
seen_fwd.insert(cursor.playbook_id.clone());
while let Some(nid) = &cursor.superseded_by {
let Some(&next) = by_id.get(nid.as_str()) else { break };
if !seen_fwd.insert(next.playbook_id.clone()) { break; }
cursor = next;
chain.push(cursor.clone());
}
chain
}
/// Stats accessor for the /status endpoint and tests. Returns
/// (total, retired, superseded, failures). Phase 27 added
/// superseded as a distinct counter: a superseded entry is
/// replaced-by-newer-version, which is a different lifecycle event
/// than retired-stop-using.
pub async fn status_counts(&self) -> (usize, usize, usize, usize) {
let state = self.state.read().await;
let total = state.entries.len();
let retired = state.entries.iter().filter(|e| e.retired_at.is_some()).count();
let superseded = state.entries.iter().filter(|e| e.superseded_at.is_some()).count();
let failures = state.failures.len();
(total, retired, superseded, failures)
}
/// Phase 26 — Mem0-style upsert. Decides ADD / UPDATE / NOOP based
/// on whether a non-retired entry with the same operation already
/// exists for the same day. Three outcomes:
///
/// ADD → no matching entry, append the new one
/// UPDATE → existing same-day entry found, merge endorsed_names
/// (union, preserving order) and refresh timestamp.
/// Playbook_id is kept stable so citations from prior
/// boost calls stay valid.
/// NOOP → existing same-day entry with identical
/// endorsed_names. Skip — no duplicates accumulate.
///
/// "Same day" keyed on YYYY-MM-DD of the entry's timestamp so
/// intraday re-seeding of the same operation dedups but tomorrow's
/// seeding for the same operation lands as a fresh ADD (which is
/// correct — a new day is a new event).
pub async fn upsert_entry(&self, new_entry: PlaybookEntry) -> Result<UpsertOutcome, String> {
let new_day = day_key(&new_entry.timestamp);
let new_names_sorted = {
let mut v = new_entry.endorsed_names.clone();
v.sort();
v
};
let mut state = self.state.write().await;
// Find a non-retired entry with same operation + day + city +
// state. Operation string alone would false-match across days
// or across cities that happen to share role+count; city+state
// is already parsed out of operation so adding them to the key
// costs nothing.
let mut existing_idx: Option<usize> = None;
for (i, e) in state.entries.iter().enumerate() {
if e.retired_at.is_some() { continue; }
if e.superseded_at.is_some() { continue; }
if e.operation != new_entry.operation { continue; }
if day_key(&e.timestamp) != new_day { continue; }
if e.city != new_entry.city || e.state != new_entry.state { continue; }
existing_idx = Some(i);
break;
}
match existing_idx {
None => {
let pid = new_entry.playbook_id.clone();
state.entries.push(new_entry);
drop(state);
self.persist().await?;
self.rebuild_geo_index().await;
Ok(UpsertOutcome::Added(pid))
}
Some(i) => {
let mut existing_names_sorted = state.entries[i].endorsed_names.clone();
existing_names_sorted.sort();
if existing_names_sorted == new_names_sorted {
// NOOP — identical data, just report the existing id
let pid = state.entries[i].playbook_id.clone();
Ok(UpsertOutcome::Noop(pid))
} else {
// UPDATE — merge names (union, stable order).
let existing = state.entries.get_mut(i).ok_or("index invalidated")?;
let mut merged: Vec<String> = existing.endorsed_names.clone();
for n in &new_entry.endorsed_names {
if !merged.contains(n) { merged.push(n.clone()); }
}
existing.endorsed_names = merged.clone();
existing.timestamp = new_entry.timestamp.clone();
// Keep original playbook_id + embedding + schema fingerprint.
// Refresh embedding only if the caller passed a non-None
// one (indicates the text shape changed).
if new_entry.embedding.is_some() {
existing.embedding = new_entry.embedding.clone();
}
if new_entry.schema_fingerprint.is_some() {
existing.schema_fingerprint = new_entry.schema_fingerprint.clone();
}
let pid = existing.playbook_id.clone();
drop(state);
self.persist().await?;
self.rebuild_geo_index().await;
Ok(UpsertOutcome::Updated { playbook_id: pid, merged_names: merged })
}
}
}
}
pub async fn entry_count(&self) -> usize {
self.state.read().await.entries.len()
}
pub async fn snapshot(&self) -> Vec<PlaybookEntry> {
self.state.read().await.entries.clone()
}
/// Record failure(s). Each added `FailureRecord` is an additional
/// penalty against that worker's positive boost for the same geo.
pub async fn mark_failures(&self, new_failures: Vec<FailureRecord>) -> Result<usize, String> {
if new_failures.is_empty() { return Ok(0); }
let added = new_failures.len();
let mut s = self.state.write().await;
s.failures.extend(new_failures);
drop(s);
self.persist().await?;
Ok(added)
}
/// Count failures per (city, state, name) key. Used by compute_boost_for
/// to dampen positive boost.
pub async fn failure_counts(&self) -> HashMap<(String, String, String), usize> {
let s = self.state.read().await;
let mut counts: HashMap<(String, String, String), usize> = HashMap::new();
for f in &s.failures {
*counts.entry((f.city.clone(), f.state.clone(), f.name.clone())).or_insert(0) += 1;
}
counts
}
/// Given an operation's embedding, find the top-K most similar past
/// playbooks (by cosine similarity) and return a per-worker boost map
/// keyed by (city, state, name). Worker is matched by the tuple so a
/// shared name across cities doesn't cross-pollinate.
///
/// Boost formula: each qualifying playbook contributes
/// `similarity * base_weight / n_workers` to each worker it endorsed,
/// where `base_weight` is tuned to keep the cap realistic without
/// forcing every result to saturate. Total per worker is capped at
/// `MAX_BOOST_PER_WORKER`.
pub async fn compute_boost_for(
&self,
query_embedding: &[f32],
top_k_playbooks: usize,
base_weight: f32,
) -> HashMap<(String, String, String), BoostEntry> {
self.compute_boost_for_filtered(query_embedding, top_k_playbooks, base_weight, None).await
}
/// Same as `compute_boost_for` but only considers playbooks whose
/// (city, state) matches the caller's target. This is the honest
/// fix for the "boosts=170 matched=0" pathology: globally-ranked
/// semantic neighbors include playbooks from every city the query
/// could never reach via its SQL filter. When the caller knows the
/// target geo, restricting here collapses noise and raises the
/// endorsed-worker hit rate. Pass None for the original behavior.
///
/// 2026-04-21 — added after a corpus-density batch of 25 runs
/// showed only 6/40 successful (role, city) combos ever triggered
/// a citation on subsequent runs. Diagnostic logging proved the
/// boost map had 170 keys but the 50-candidate pool matched 0.
pub async fn compute_boost_for_filtered(
&self,
query_embedding: &[f32],
top_k_playbooks: usize,
base_weight: f32,
target_geo: Option<(&str, &str)>,
) -> HashMap<(String, String, String), BoostEntry> {
self.compute_boost_for_filtered_with_role(query_embedding, top_k_playbooks, base_weight, target_geo, None).await
}
/// Variant that also accepts a target role for pre-filtering.
/// Multi-strategy retrieval: exact (role, city, state) matches skip
/// cosine entirely and earn the maximum boost, since identity on
/// those three fields is the strongest possible similarity signal.
/// Remaining entries (within the same city+state but different
/// role, or unknown role) go through the normal cosine path as a
/// fallback. This addresses the 2026 agent-memory finding that
/// multi-strategy parallel retrieval with rerank outperforms
/// single-strategy semantic search.
pub async fn compute_boost_for_filtered_with_role(
&self,
query_embedding: &[f32],
top_k_playbooks: usize,
base_weight: f32,
target_geo: Option<(&str, &str)>,
target_role: Option<&str>,
) -> HashMap<(String, String, String), BoostEntry> {
let state = self.state.read().await;
let entries = state.entries.clone();
// Build failure map once before dropping the lock so we don't
// hold the read lock across the full scoring loop.
let mut failure_counts: HashMap<(String, String, String), usize> = HashMap::new();
for f in &state.failures {
*failure_counts.entry((f.city.clone(), f.state.clone(), f.name.clone())).or_insert(0) += 1;
}
drop(state);
// Phase 25 validity-window filtering. Happens before geo+cosine
// so retired/expired entries never reach the ranking pool. We
// don't mutate the state here (can't grab a write lock inside
// a read-heavy hot path); retirement_at auto-retirement is a
// separate background pass. Here we just skip anything already
// retired, and entries whose valid_until has elapsed.
let now = chrono::Utc::now();
let active_entries: Vec<&PlaybookEntry> = entries
.iter()
.filter(|e| {
if e.retired_at.is_some() { return false; }
if e.superseded_at.is_some() { return false; }
if let Some(vu) = &e.valid_until {
if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(vu) {
if now > parsed.with_timezone(&chrono::Utc) { return false; }
}
}
true
})
.collect();
// Pre-filter by target_geo (city, state) before cosine. Phase 26
// hot cache — use the geo index (O(1) key lookup) instead of a
// linear scan of all entries. Retired entries are excluded from
// the index; valid_until is still checked here since it can
// elapse between index rebuilds.
//
// Owned entries (not references) because the state read-lock is
// released between here and the cosine step — we don't want to
// hold a read lock across the scoring work.
let geo_filtered: Vec<PlaybookEntry> = if let Some((tc, ts)) = target_geo {
let key = (tc.to_ascii_lowercase(), ts.to_ascii_uppercase());
let index = self.geo_index.read().await;
let Some(idxs) = index.get(&key) else { return HashMap::new(); };
let idxs = idxs.clone();
drop(index);
let state = self.state.read().await;
idxs.into_iter()
.filter_map(|i| state.entries.get(i))
.filter(|e| {
if e.retired_at.is_some() { return false; }
if e.superseded_at.is_some() { return false; }
if let Some(vu) = &e.valid_until {
if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(vu) {
if now > parsed.with_timezone(&chrono::Utc) { return false; }
}
}
true
})
.cloned()
.collect()
} else {
active_entries.into_iter().cloned().collect()
};
// Multi-strategy: split the geo-filtered pool into (exact role
// match) vs (other). Exact matches skip cosine — they're already
// the strongest signal possible. References into geo_filtered
// which owns the entries.
let mut exact_matches: Vec<&PlaybookEntry> = Vec::new();
let mut cosine_pool: Vec<(f32, &PlaybookEntry)> = Vec::new();
let role_needle = target_role
.map(|r| format!("fill: {} ", r).to_ascii_lowercase());
for e in geo_filtered.iter() {
let is_exact = role_needle.as_ref()
.map(|needle| e.operation.to_ascii_lowercase().contains(needle))
.unwrap_or(false);
if is_exact {
exact_matches.push(e);
} else if let Some(v) = &e.embedding {
cosine_pool.push((cosine(query_embedding, v), e));
}
}
cosine_pool.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
// Allocate top_k across the two pools — exact matches first,
// then cosine fills the rest. Identity beats similarity.
let exact_take = exact_matches.len().min(top_k_playbooks.max(1) / 2 + 1);
let cosine_take = top_k_playbooks.saturating_sub(exact_take);
let mut scored: Vec<(f32, &PlaybookEntry)> = exact_matches
.into_iter()
.take(exact_take)
.map(|e| (1.0_f32, e))
.collect();
scored.extend(cosine_pool.into_iter().take(cosine_take));
let now = chrono::Utc::now();
let mut boosts: HashMap<(String, String, String), BoostEntry> = HashMap::new();
for (similarity, pb) in &scored {
// Negative or near-zero similarity = not actually related;
// skip so we don't inject noise when the memory is sparse.
if *similarity <= 0.05 { continue; }
let Some(city) = &pb.city else { continue; };
let Some(state) = &pb.state else { continue; };
let n_workers = pb.endorsed_names.len().max(1);
// Path 1 — temporal decay. Older playbooks weigh less. Failure
// to parse the timestamp degrades to "no decay" (treat as fresh)
// rather than dropping the entry entirely; keeps backward
// compatibility with seed payloads that omitted timestamp.
let decay = chrono::DateTime::parse_from_rfc3339(&pb.timestamp)
.ok()
.map(|t| {
let age_days = (now.signed_duration_since(t.with_timezone(&chrono::Utc))
.num_seconds() as f32) / 86400.0;
if age_days <= 0.0 { 1.0 }
else { (-age_days / BOOST_HALF_LIFE_DAYS).exp() }
})
.unwrap_or(1.0);
let per_worker = similarity * base_weight * decay / (n_workers as f32);
for name in &pb.endorsed_names {
let key = (city.clone(), state.clone(), name.clone());
// Path 1 negative signal — each recorded failure halves
// this worker's contribution. Three failures → 0.125x.
// Five failures → ~0.03x (effectively zero). fail_count is
// clamped at 20 so powi stays well-behaved in degenerate cases.
let fail_count = failure_counts.get(&key).copied().unwrap_or(0).min(20);
let penalty = 0.5_f32.powi(fail_count as i32);
let entry = boosts.entry(key).or_insert(BoostEntry {
boost: 0.0,
citations: Vec::new(),
});
entry.boost = (entry.boost + per_worker * penalty).min(MAX_BOOST_PER_WORKER);
if !entry.citations.contains(&pb.playbook_id) {
entry.citations.push(pb.playbook_id.clone());
}
}
}
boosts
}
}
/// Cosine similarity — pulled out so rebuild/boost share one impl.
fn cosine(a: &[f32], b: &[f32]) -> f32 {
let (mut dot, mut na, mut nb) = (0.0_f32, 0.0_f32, 0.0_f32);
let n = a.len().min(b.len());
for i in 0..n {
dot += a[i] * b[i];
na += a[i] * a[i];
nb += b[i] * b[i];
}
if na == 0.0 || nb == 0.0 { return 0.0; }
dot / (na.sqrt() * nb.sqrt())
}
// ---------------- Pattern discovery (Path 2 — meta-index) ----------------
//
// Phase 19's boost path answers "for THIS exact city + role, which workers
// have we used before?" Pattern discovery answers a different question:
// "for queries like this one, what TRAITS have past successful fills had
// in common — even if no exact prior playbook covers this geo?"
//
// The discovered pattern surfaces signals the operator didn't query for:
// e.g. "every successful Welder fill we've seen carried OSHA-10 + lockout
// /tagout — you may want to filter on those." That's the meta-index
// dimension of the original PRD: identify things we didn't know about.
#[derive(Debug, Clone, Serialize)]
pub struct PatternReport {
pub query: String,
pub matched_playbooks: usize,
pub total_workers_examined: usize,
pub common_certifications: Vec<TraitFreq>,
pub common_skills: Vec<TraitFreq>,
pub modal_archetype: Option<String>,
pub reliability_p50: f64,
pub reliability_min: f64,
pub reliability_max: f64,
pub matched_playbook_ids: Vec<String>,
pub discovered_pattern: String,
pub duration_secs: f32,
}
#[derive(Debug, Clone, Serialize)]
pub struct TraitFreq {
pub name: String,
pub count: usize,
pub frequency: f32,
}
pub async fn discover_patterns(
memory: &PlaybookMemory,
ai_client: &AiClient,
catalog: &catalogd::registry::Registry,
buckets: &Arc<storaged::registry::BucketRegistry>,
query: &str,
top_k_playbooks: usize,
min_trait_frequency: f32,
) -> Result<PatternReport, String> {
let t0 = std::time::Instant::now();
// 1. Embed the query through the same nomic-embed-text model used
// for playbook embeddings, so cosine is meaningful.
let resp = ai_client
.embed(EmbedRequest { texts: vec![query.into()], model: None })
.await
.map_err(|e| format!("embed query: {e}"))?;
if resp.embeddings.is_empty() {
return Err("embed returned no vectors".into());
}
let qv: Vec<f32> = resp.embeddings[0].iter().map(|x| *x as f32).collect();
// 2. Find top-K most similar past playbooks (cosine over embeddings).
let entries = memory.snapshot().await;
let mut scored: Vec<(f32, &PlaybookEntry)> = entries
.iter()
.filter_map(|e| e.embedding.as_ref().map(|v| (cosine(&qv, v), e)))
.collect();
scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
scored.truncate(top_k_playbooks);
let matched: Vec<(f32, PlaybookEntry)> = scored
.into_iter()
.filter(|(s, _)| *s > 0.05)
.map(|(s, e)| (s, e.clone()))
.collect();
if matched.is_empty() {
return Ok(PatternReport {
query: query.into(),
matched_playbooks: 0,
total_workers_examined: 0,
common_certifications: vec![],
common_skills: vec![],
modal_archetype: None,
reliability_p50: 0.0, reliability_min: 0.0, reliability_max: 0.0,
matched_playbook_ids: vec![],
discovered_pattern: "No similar past playbooks found.".into(),
duration_secs: t0.elapsed().as_secs_f32(),
});
}
// 3. Pull each endorsed worker's full profile from workers_500k.
// Restrict by (name, city, state) tuple so cross-city homonyms
// don't pollute the aggregate.
let mut conditions: Vec<String> = Vec::new();
let mut matched_ids: Vec<String> = Vec::new();
let esc = |s: &str| s.replace('\'', "''");
for (_, pb) in &matched {
matched_ids.push(pb.playbook_id.clone());
let (Some(city), Some(state)) = (pb.city.as_ref(), pb.state.as_ref()) else { continue };
for name in &pb.endorsed_names {
conditions.push(format!(
"(name = '{}' AND city = '{}' AND state = '{}')",
esc(name), esc(city), esc(state)
));
}
}
if conditions.is_empty() {
return Ok(PatternReport {
query: query.into(),
matched_playbooks: matched.len(),
total_workers_examined: 0,
common_certifications: vec![], common_skills: vec![],
modal_archetype: None, reliability_p50: 0.0,
reliability_min: 0.0, reliability_max: 0.0,
matched_playbook_ids: matched_ids,
discovered_pattern: "Matched playbooks but no endorsed names with city/state to look up.".into(),
duration_secs: t0.elapsed().as_secs_f32(),
});
}
let sql = format!(
"SELECT name, role, city, state, certifications, skills, archetype, \
CAST(reliability AS DOUBLE) as reliability \
FROM workers_500k WHERE {} LIMIT 500",
conditions.join(" OR ")
);
let engine = queryd::context::QueryEngine::new(
catalog.clone(), buckets.clone(), queryd::cache::MemCache::new(0),
);
let batches = engine.query(&sql).await.map_err(|e| format!("worker lookup: {e}"))?;
// 4. Aggregate. Pipe-separated cert/skill lists, single-string archetype,
// numeric reliability. Frequencies are share-of-workers.
use arrow::array::{Array, AsArray};
let mut cert_counts: HashMap<String, usize> = HashMap::new();
let mut skill_counts: HashMap<String, usize> = HashMap::new();
let mut arch_counts: HashMap<String, usize> = HashMap::new();
let mut reliabilities: Vec<f64> = Vec::new();
let mut total = 0usize;
let get_string = |b: &arrow::record_batch::RecordBatch, col: &str, row: usize| -> String {
let Some(c) = b.column_by_name(col) else { return String::new(); };
if let Some(arr) = c.as_string_view_opt() {
if arr.is_null(row) { return String::new(); }
return arr.value(row).to_string();
}
if let Some(arr) = c.as_string_opt::<i32>() {
if arr.is_null(row) { return String::new(); }
return arr.value(row).to_string();
}
String::new()
};
let get_f64 = |b: &arrow::record_batch::RecordBatch, col: &str, row: usize| -> f64 {
let Some(c) = b.column_by_name(col) else { return 0.0; };
if let Some(arr) = c.as_primitive_opt::<arrow::datatypes::Float64Type>() {
if arr.is_null(row) { return 0.0; }
return arr.value(row);
}
0.0
};
for b in &batches {
for row in 0..b.num_rows() {
total += 1;
let certs = get_string(b, "certifications", row);
for c in certs.split(['|', ',']).map(|s| s.trim()).filter(|s| !s.is_empty() && *s != "none") {
*cert_counts.entry(c.to_string()).or_insert(0) += 1;
}
let skills = get_string(b, "skills", row);
for s in skills.split(['|', ',']).map(|s| s.trim()).filter(|s| !s.is_empty()) {
*skill_counts.entry(s.to_string()).or_insert(0) += 1;
}
let arch = get_string(b, "archetype", row);
if !arch.is_empty() {
*arch_counts.entry(arch).or_insert(0) += 1;
}
let rel = get_f64(b, "reliability", row);
if rel > 0.0 { reliabilities.push(rel); }
}
}
let total_f = total.max(1) as f32;
let to_freq = |m: HashMap<String, usize>, min: f32| -> Vec<TraitFreq> {
let mut v: Vec<TraitFreq> = m.into_iter()
.map(|(name, count)| TraitFreq { name, count, frequency: count as f32 / total_f })
.filter(|t| t.frequency >= min)
.collect();
v.sort_by(|a, b| b.count.cmp(&a.count));
v.truncate(8);
v
};
let common_certifications = to_freq(cert_counts, min_trait_frequency);
let common_skills = to_freq(skill_counts, min_trait_frequency);
let modal_archetype = arch_counts.into_iter()
.max_by_key(|(_, c)| *c)
.map(|(name, _)| name);
reliabilities.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
// Index-based median (upper median for even counts) — close enough
// for a summary stat.
let p50 = if reliabilities.is_empty() { 0.0 } else { reliabilities[reliabilities.len() / 2] };
let rmin = reliabilities.first().copied().unwrap_or(0.0);
let rmax = reliabilities.last().copied().unwrap_or(0.0);
// Build a human-readable discovered-pattern summary
let mut parts: Vec<String> = vec![
format!("Across {} similar past playbooks ({} workers examined)", matched.len(), total),
];
if !common_certifications.is_empty() {
let head: Vec<String> = common_certifications.iter().take(3)
.map(|t| format!("{} ({:.0}%)", t.name, t.frequency * 100.0)).collect();
parts.push(format!("recurring certifications: {}", head.join(", ")));
}
if !common_skills.is_empty() {
let head: Vec<String> = common_skills.iter().take(3)
.map(|t| format!("{} ({:.0}%)", t.name, t.frequency * 100.0)).collect();
parts.push(format!("recurring skills: {}", head.join(", ")));
}
if let Some(a) = &modal_archetype { parts.push(format!("archetype mostly: {a}")); }
if !reliabilities.is_empty() {
parts.push(format!("reliability median {:.2} (range {:.2}–{:.2})", p50, rmin, rmax));
}
let discovered_pattern = parts.join(" · ");
Ok(PatternReport {
query: query.into(),
matched_playbooks: matched.len(),
total_workers_examined: total,
common_certifications, common_skills,
modal_archetype, reliability_p50: p50,
reliability_min: rmin, reliability_max: rmax,
matched_playbook_ids: matched_ids,
discovered_pattern,
duration_secs: t0.elapsed().as_secs_f32(),
})
}
// ---------------- Persist memory → SQL (Path 2 foundation) ----------------
#[derive(Debug, Clone, Serialize)]
pub struct PersistReport {
pub rows_persisted: usize,
pub dataset_name: String,
pub fingerprint: String,
pub duration_secs: f32,
}
/// Dump current in-memory state to a queryable Parquet under
/// `successful_playbooks_live`. Registers fresh objects each call — safe
/// because in-memory state is the source of truth here, so replacing the
/// dataset's object list mirrors the real state rather than destroying it.
///
/// Distinct from the existing `successful_playbooks` dataset (which is
/// read by `rebuild()`), so this never collides with operator imports of
/// historical playbook data. Recruiter-facing SQL surfaces should query
/// `successful_playbooks_live` for current operator activity.
pub async fn persist_to_sql(
memory: &PlaybookMemory,
catalog: &catalogd::registry::Registry,
) -> Result<PersistReport, String> {
use arrow::array::StringArray;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
let t0 = std::time::Instant::now();
let entries = memory.snapshot().await;
let schema = Arc::new(Schema::new(vec![
Field::new("timestamp", DataType::Utf8, true),
Field::new("operation", DataType::Utf8, true),
Field::new("approach", DataType::Utf8, true),
Field::new("result", DataType::Utf8, true),
Field::new("context", DataType::Utf8, true),
]));
let timestamps: Vec<&str> = entries.iter().map(|e| e.timestamp.as_str()).collect();
let operations: Vec<&str> = entries.iter().map(|e| e.operation.as_str()).collect();
let approaches: Vec<&str> = entries.iter().map(|e| e.approach.as_str()).collect();
let contexts: Vec<&str> = entries.iter().map(|e| e.context.as_str()).collect();
// Result column is reconstructed from endorsed_names so SQL queries
// against successful_playbooks_live see the same shape as the original
// CSV-fed successful_playbooks ("N/N filled → Name1, Name2").
let results: Vec<String> = entries.iter().map(|e| {
if e.endorsed_names.is_empty() {
String::new()
} else {
let n = e.endorsed_names.len();
format!("{}/{} filled → {}", n, n, e.endorsed_names.join(", "))
}
}).collect();
let result_refs: Vec<&str> = results.iter().map(|s| s.as_str()).collect();
let batch = RecordBatch::try_new(schema.clone(), vec![
Arc::new(StringArray::from(timestamps)),
Arc::new(StringArray::from(operations)),
Arc::new(StringArray::from(approaches)),
Arc::new(StringArray::from(result_refs)),
Arc::new(StringArray::from(contexts)),
]).map_err(|e| format!("build record batch: {e}"))?;
let parquet_bytes = shared::arrow_helpers::record_batch_to_parquet(&batch)?;
let fp = shared::arrow_helpers::fingerprint_schema(&schema);
let key = "datasets/successful_playbooks_live.parquet";
ops::put(&memory.store, key, parquet_bytes.clone()).await?;
let obj = shared::types::ObjectRef {
bucket: "primary".into(),
key: key.into(),
size_bytes: parquet_bytes.len() as u64,
created_at: chrono::Utc::now(),
};
let manifest = catalog.register(
"successful_playbooks_live".into(),
fp.clone(),
vec![obj],
).await?;
Ok(PersistReport {
rows_persisted: entries.len(),
dataset_name: manifest.name,
fingerprint: fp.0,
duration_secs: t0.elapsed().as_secs_f32(),
})
}
// ---------------- Rebuild (the core of Phase 19) ----------------
#[derive(Debug, Clone, Serialize)]
pub struct RebuildReport {
pub rows_scanned: usize,
pub entries_built: usize,
pub total_names_endorsed: usize,
pub duration_secs: f32,
}
/// Full rebuild: scan `successful_playbooks`, extract endorsements, embed
/// each row's operation+approach+context, replace the in-memory state.
///
/// Returns the report so callers can show operators what happened.
pub async fn rebuild(
memory: &PlaybookMemory,
ai_client: &AiClient,
catalog: &catalogd::registry::Registry,
buckets: &Arc<storaged::registry::BucketRegistry>,
) -> Result<RebuildReport, String> {
let t0 = std::time::Instant::now();
// 1. Pull every row of successful_playbooks through the query engine.
let sql = "SELECT timestamp, operation, approach, result, context \
FROM successful_playbooks";
let engine = queryd::context::QueryEngine::new(
catalog.clone(),
buckets.clone(),
queryd::cache::MemCache::new(0),
);
let batches = engine
.query(sql)
.await
.map_err(|e| format!("query successful_playbooks: {e}"))?;
let mut rows: Vec<(String, String, String, String, String)> = Vec::new();
for b in &batches {
let n = b.num_rows();
let get = |col: &str, row: usize| -> String {
use arrow::array::{Array, AsArray};
let Some(c) = b.column_by_name(col) else { return String::new(); };
if let Some(arr) = c.as_string_view_opt() {
if arr.is_null(row) { return String::new(); }
return arr.value(row).to_string();
}
if let Some(arr) = c.as_string_opt::<i32>() {
if arr.is_null(row) { return String::new(); }
return arr.value(row).to_string();
}
String::new()
};
for row in 0..n {
rows.push((
get("timestamp", row),
get("operation", row),
get("approach", row),
get("result", row),
get("context", row),
));
}
}
let rows_scanned = rows.len();
// 2. For each row, build a PlaybookEntry (no embedding yet). Parse
// the operation for (city, state) and the result for names.
let mut entries: Vec<PlaybookEntry> = rows
.into_iter()
.map(|(ts, op, approach, result, ctx)| {
let (city, state) = parse_city_state(&op);
let names = parse_names(&result);
PlaybookEntry {
playbook_id: stable_id(&ts, &op),
operation: op,
approach,
context: ctx,
timestamp: ts,
endorsed_names: names,
city,
state,
embedding: None,
// Rebuild doesn't know fingerprints; historical entries
// get no retirement signal until a seed with a
// fingerprint supersedes them or the operator calls
// /retire manually.
schema_fingerprint: None,
valid_until: None,
retired_at: None,
retirement_reason: None,
version: 1,
parent_id: None,
superseded_at: None,
superseded_by: None,
}
})
.collect();
// 3. Embed in one batch. Sidecar's embed handles batching internally;
// chunk here to ~64 per request to keep memory flat.
const EMBED_BATCH: usize = 64;
for chunk_start in (0..entries.len()).step_by(EMBED_BATCH) {
let end = (chunk_start + EMBED_BATCH).min(entries.len());
let texts: Vec<String> = entries[chunk_start..end]
.iter()
.map(embed_text)
.collect();
let req = EmbedRequest { texts, model: None };
let resp = ai_client
.embed(req)
.await
.map_err(|e| format!("embed batch [{chunk_start}..{end}]: {e}"))?;
for (i, v) in resp.embeddings.iter().enumerate() {
let f32v: Vec<f32> = v.iter().map(|&x| x as f32).collect();
entries[chunk_start + i].embedding = Some(f32v);
}
}
let total_names_endorsed: usize = entries.iter().map(|e| e.endorsed_names.len()).sum();
let entries_built = entries.len();
memory.set_entries(entries).await?;
Ok(RebuildReport {
rows_scanned,
entries_built,
total_names_endorsed,
duration_secs: t0.elapsed().as_secs_f32(),
})
}
fn embed_text(e: &PlaybookEntry) -> String {
// Compact one-liner per playbook. Excludes timestamp (no semantic
// signal) and includes the fills as words (they're occasionally
// meaningful — "Luis Harris" might semantically correlate with
// Spanish-speaker names in future queries).
format!(
"{} | {} | {} | fills: {}",
e.operation,
e.approach,
e.context,
e.endorsed_names.join(", "),
)
}
/// Derive a stable id from (timestamp, operation). Two playbooks with
/// identical timestamp+operation collapse to one — benign dedup.
fn stable_id(ts: &str, op: &str) -> String {
use sha2::{Digest, Sha256};
let mut h = Sha256::new();
h.update(ts.as_bytes());
h.update(b"|");
h.update(op.as_bytes());
let bytes = h.finalize();
format!("pb-{}", hex_short(&bytes, 12))
}
fn hex_short(b: &[u8], n: usize) -> String {
let mut s = String::with_capacity(n * 2);
for byte in &b[..b.len().min(n)] {
s.push_str(&format!("{byte:02x}"));
}
s
}
/// Parse "fill: Welder x2 in Toledo, OH" → ("Toledo", "OH").
/// Returns (None, None) for malformed operations.
fn parse_city_state(op: &str) -> (Option<String>, Option<String>) {
// Split on " in " then parse "City, ST"
let after_in = match op.split(" in ").nth(1) {
Some(s) => s,
None => return (None, None),
};
let parts: Vec<&str> = after_in.splitn(2, ',').collect();
if parts.len() != 2 {
return (None, None);
}
let city = parts[0].trim().to_string();
// state might be followed by more context; take leading alpha chars
let state: String = parts[1].trim()
.chars()
.take_while(|c| c.is_ascii_alphabetic())
.collect();
if city.is_empty() || state.is_empty() {
return (None, None);
}
(Some(city), Some(state))
}
/// Parse "2/2 filled → Matthew Roberts, Amy Davis" → ["Matthew Roberts", "Amy Davis"].
fn parse_names(result: &str) -> Vec<String> {
// Everything after the arrow; split on ", ".
let after_arrow = match result.split('→').nth(1) {
Some(s) => s.trim(),
None => return Vec::new(),
};
// Strip trailing noise like "(and N more)" that some emitters add.
let cleaned = after_arrow.split(" (").next().unwrap_or(after_arrow);
cleaned
.split(',')
.map(|n| n.trim().to_string())
.filter(|n| !n.is_empty())
.collect()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_city_state_extracts_both() {
let (c, s) = parse_city_state("fill: Welder x2 in Toledo, OH");
assert_eq!(c.as_deref(), Some("Toledo"));
assert_eq!(s.as_deref(), Some("OH"));
}
#[test]
fn parse_city_state_handles_multiword_city() {
let (c, s) = parse_city_state("fill: Loader x1 in Grand Rapids, MI");
assert_eq!(c.as_deref(), Some("Grand Rapids"));
assert_eq!(s.as_deref(), Some("MI"));
}
#[test]
fn parse_city_state_malformed_returns_none() {
let (c, s) = parse_city_state("fill: something weird");
assert!(c.is_none());
assert!(s.is_none());
}
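#[test]
fn parse_city_state_ignores_trailing_context_after_state() {
// Sketch test (not in the original suite): the state parse keeps
// only leading alpha chars, so trailing context after "City, ST"
// should be dropped rather than polluting the state code.
let (c, s) = parse_city_state("fill: Welder x2 in Toledo, OH (night shift)");
assert_eq!(c.as_deref(), Some("Toledo"));
assert_eq!(s.as_deref(), Some("OH"));
}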
#[test]
fn parse_names_extracts_after_arrow() {
let ns = parse_names("2/2 filled → Matthew Roberts, Amy Davis");
assert_eq!(ns, vec!["Matthew Roberts".to_string(), "Amy Davis".to_string()]);
}
#[test]
fn parse_names_handles_single_fill() {
let ns = parse_names("1/1 filled → Jose Reed");
assert_eq!(ns, vec!["Jose Reed".to_string()]);
}
#[test]
fn parse_names_handles_no_arrow() {
let ns = parse_names("0/2 filled");
assert!(ns.is_empty());
}
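#[test]
fn parse_names_strips_trailing_parenthetical() {
// Added coverage (a sketch, not from the original suite) for the
// " (and N more)" cleanup path in parse_names.
let ns = parse_names("3/3 filled → Ana Ruiz, Bo Lee (and 1 more)");
assert_eq!(ns, vec!["Ana Ruiz".to_string(), "Bo Lee".to_string()]);
}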
#[test]
fn stable_id_is_deterministic() {
let a = stable_id("2026-04-20T00:00:00Z", "fill: Welder x2 in Toledo, OH");
let b = stable_id("2026-04-20T00:00:00Z", "fill: Welder x2 in Toledo, OH");
assert_eq!(a, b);
assert!(a.starts_with("pb-"));
}
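#[test]
fn hex_short_truncates_to_n_bytes() {
// Sketch test (not from the original suite): hex_short takes at
// most n bytes and tolerates inputs shorter than n.
assert_eq!(hex_short(&[0xab, 0xcd, 0xef], 2), "abcd");
assert_eq!(hex_short(&[0xab], 4), "ab");
}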
#[test]
fn boost_caps_per_worker() {
// Even with 100 similar playbooks all endorsing the same name, the
// boost never exceeds MAX_BOOST_PER_WORKER.
let pm = PlaybookMemory::new(Arc::new(object_store::memory::InMemory::new()));
let entries: Vec<PlaybookEntry> = (0..100)
.map(|i| PlaybookEntry {
playbook_id: format!("pb-{i}"),
operation: "fill: Welder x1 in Toledo, OH".into(),
approach: "transfer".into(),
context: "".into(),
timestamp: "2026-04-20".into(),
endorsed_names: vec!["Deborah Powell".into()],
city: Some("Toledo".into()),
state: Some("OH".into()),
embedding: Some(vec![1.0, 0.0, 0.0]),
schema_fingerprint: None,
valid_until: None,
retired_at: None,
retirement_reason: None,
version: 1,
parent_id: None,
superseded_at: None,
superseded_by: None,
})
.collect();
tokio::runtime::Runtime::new().unwrap().block_on(async {
pm.set_entries(entries).await.unwrap();
let boosts = pm.compute_boost_for(&[1.0, 0.0, 0.0], 100, 0.5).await;
let key = ("Toledo".into(), "OH".into(), "Deborah Powell".into());
let entry = boosts.get(&key).expect("boost entry present");
assert!(entry.boost <= MAX_BOOST_PER_WORKER + 1e-6,
"boost {} exceeded cap {}", entry.boost, MAX_BOOST_PER_WORKER);
});
}
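#[test]
fn cosine_guards_zero_norm_and_length_mismatch() {
// Sketch test (not from the original suite): zero-norm inputs
// short-circuit to 0.0 instead of dividing by zero, and mismatched
// lengths compare only the shared prefix.
assert_eq!(cosine(&[0.0, 0.0], &[1.0, 0.0]), 0.0);
assert!((cosine(&[1.0, 0.0], &[1.0, 0.0]) - 1.0).abs() < 1e-6);
assert!((cosine(&[1.0, 0.0, 5.0], &[1.0, 0.0]) - 1.0).abs() < 1e-6);
}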
}
#[cfg(test)]
mod validity_window_tests {
use super::*;
use object_store::memory::InMemory;
fn mkentry(id: &str, city: &str, state: &str, fingerprint: Option<String>, valid_until: Option<String>) -> PlaybookEntry {
PlaybookEntry {
playbook_id: id.into(),
operation: format!("fill: Welder x1 in {city}, {state}"),
approach: "hybrid".into(),
context: "test".into(),
timestamp: chrono::Utc::now().to_rfc3339(),
endorsed_names: vec!["Test Worker".into()],
city: Some(city.into()),
state: Some(state.into()),
embedding: Some(vec![1.0, 0.0, 0.0]),
schema_fingerprint: fingerprint,
valid_until,
retired_at: None,
retirement_reason: None,
version: 1,
parent_id: None,
superseded_at: None,
superseded_by: None,
}
}
#[tokio::test]
async fn retire_one_marks_entry_and_persists() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
pm.set_entries(vec![mkentry("pb-1", "Nashville", "TN", None, None)]).await.unwrap();
let touched = pm.retire_one("pb-1", "manual test").await.unwrap();
assert!(touched);
let (total, retired, _, _) = pm.status_counts().await;
assert_eq!(total, 1);
assert_eq!(retired, 1);
// Second retirement is a no-op
let second = pm.retire_one("pb-1", "again").await.unwrap();
assert!(!second);
}
#[tokio::test]
async fn retired_entries_do_not_boost() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let e1 = mkentry("pb-active", "Nashville", "TN", None, None);
let mut e2 = mkentry("pb-retired", "Nashville", "TN", None, None);
e2.retired_at = Some(chrono::Utc::now().to_rfc3339());
pm.set_entries(vec![e1, e2]).await.unwrap();
let boosts = pm.compute_boost_for_filtered_with_role(
&[1.0, 0.0, 0.0], 100, 0.5,
Some(("Nashville", "TN")), Some("Welder")
).await;
// Only the active entry should surface. Both endorse the same
// name so we check citation count isn't doubled — presence of
// the retired playbook id in citations would mean it slipped
// through.
let entry = boosts.get(&("Nashville".into(), "TN".into(), "Test Worker".into())).unwrap();
assert!(!entry.citations.contains(&"pb-retired".to_string()));
assert!(entry.citations.contains(&"pb-active".to_string()));
}
#[tokio::test]
async fn expired_valid_until_is_skipped() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let past = (chrono::Utc::now() - chrono::Duration::days(1)).to_rfc3339();
let future = (chrono::Utc::now() + chrono::Duration::days(1)).to_rfc3339();
let e_expired = mkentry("pb-expired", "Nashville", "TN", None, Some(past));
let e_alive = mkentry("pb-alive", "Nashville", "TN", None, Some(future));
pm.set_entries(vec![e_expired, e_alive]).await.unwrap();
let boosts = pm.compute_boost_for_filtered_with_role(
&[1.0, 0.0, 0.0], 100, 0.5,
Some(("Nashville", "TN")), Some("Welder")
).await;
let entry = boosts.get(&("Nashville".into(), "TN".into(), "Test Worker".into())).unwrap();
assert!(!entry.citations.contains(&"pb-expired".to_string()));
assert!(entry.citations.contains(&"pb-alive".to_string()));
}
#[tokio::test]
async fn schema_drift_retires_mismatched_fingerprints_only() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let e_old = mkentry("pb-old-schema", "Nashville", "TN", Some("fp-v1".into()), None);
let e_new = mkentry("pb-new-schema", "Nashville", "TN", Some("fp-v2".into()), None);
let e_legacy = mkentry("pb-no-fp", "Nashville", "TN", None, None);
pm.set_entries(vec![e_old, e_new, e_legacy]).await.unwrap();
let retired = pm.retire_on_schema_drift("Nashville", "TN", "fp-v2", "test migration").await.unwrap();
// Only pb-old-schema should be retired — pb-new-schema matches,
// pb-no-fp has no fingerprint so it's legacy-safe.
assert_eq!(retired, 1);
let (_, total_retired, _, _) = pm.status_counts().await;
assert_eq!(total_retired, 1);
}
#[tokio::test]
async fn schema_drift_skips_other_cities() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let e_tn = mkentry("pb-tn", "Nashville", "TN", Some("fp-v1".into()), None);
let e_il = mkentry("pb-il", "Chicago", "IL", Some("fp-v1".into()), None);
pm.set_entries(vec![e_tn, e_il]).await.unwrap();
// Nashville migration shouldn't touch Chicago
let retired = pm.retire_on_schema_drift("Nashville", "TN", "fp-v2", "test").await.unwrap();
assert_eq!(retired, 1);
let (_, r, _, _) = pm.status_counts().await;
assert_eq!(r, 1);
}
}
#[cfg(test)]
mod upsert_tests {
use super::*;
use object_store::memory::InMemory;
fn mk(op: &str, day: &str, names: &[&str]) -> PlaybookEntry {
PlaybookEntry {
playbook_id: format!("pb-{}-{}", op.replace(' ', "_"), day),
operation: op.into(),
approach: "seed".into(),
context: "test".into(),
timestamp: format!("{day}T12:00:00Z"),
endorsed_names: names.iter().map(|s| s.to_string()).collect(),
city: Some("Nashville".into()),
state: Some("TN".into()),
embedding: Some(vec![1.0, 0.0, 0.0]),
schema_fingerprint: None,
valid_until: None,
retired_at: None,
retirement_reason: None,
version: 1,
parent_id: None,
superseded_at: None,
superseded_by: None,
}
}
#[tokio::test]
async fn first_seed_is_add() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let e = mk("fill: Welder x2 in Nashville, TN", "2026-04-21", &["Alice Smith"]);
match pm.upsert_entry(e).await.unwrap() {
UpsertOutcome::Added(_) => {}
other => panic!("expected Added, got {:?}", other),
}
assert_eq!(pm.entry_count().await, 1);
}
#[tokio::test]
async fn identical_reseed_is_noop() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let e1 = mk("fill: Welder x2 in Nashville, TN", "2026-04-21", &["Alice Smith", "Bob Jones"]);
let e2 = mk("fill: Welder x2 in Nashville, TN", "2026-04-21", &["Alice Smith", "Bob Jones"]);
pm.upsert_entry(e1).await.unwrap();
let outcome = pm.upsert_entry(e2).await.unwrap();
assert!(matches!(outcome, UpsertOutcome::Noop(_)));
// Still exactly one entry, no duplicate from the re-seed.
assert_eq!(pm.entry_count().await, 1);
}
#[tokio::test]
async fn same_day_different_names_updates_and_merges() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let e1 = mk("fill: Welder x2 in Nashville, TN", "2026-04-21", &["Alice Smith"]);
let e2 = mk("fill: Welder x2 in Nashville, TN", "2026-04-21", &["Alice Smith", "Bob Jones"]);
let o1 = pm.upsert_entry(e1).await.unwrap();
let pid = match o1 {
UpsertOutcome::Added(p) => p,
other => panic!("expected Added, got {:?}", other),
};
let o2 = pm.upsert_entry(e2).await.unwrap();
match o2 {
UpsertOutcome::Updated { playbook_id, merged_names } => {
assert_eq!(playbook_id, pid, "Updated should keep original playbook_id");
assert_eq!(merged_names, vec!["Alice Smith".to_string(), "Bob Jones".to_string()]);
}
other => panic!("expected Updated, got {:?}", other),
}
assert_eq!(pm.entry_count().await, 1, "Updated must not create a duplicate");
}
#[tokio::test]
async fn different_day_same_op_is_add() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let e1 = mk("fill: Welder x2 in Nashville, TN", "2026-04-21", &["Alice Smith"]);
let e2 = mk("fill: Welder x2 in Nashville, TN", "2026-04-22", &["Alice Smith"]);
pm.upsert_entry(e1).await.unwrap();
let o2 = pm.upsert_entry(e2).await.unwrap();
assert!(matches!(o2, UpsertOutcome::Added(_)), "different day → fresh ADD");
assert_eq!(pm.entry_count().await, 2);
}
#[tokio::test]
async fn retired_entry_doesnt_block_new_seed() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let mut e1 = mk("fill: Welder x2 in Nashville, TN", "2026-04-21", &["Alice Smith"]);
e1.retired_at = Some(chrono::Utc::now().to_rfc3339());
pm.set_entries(vec![e1]).await.unwrap();
// A new seed on same day should ADD, not merge into the retired one.
let e2 = mk("fill: Welder x2 in Nashville, TN", "2026-04-21", &["Carol Davis"]);
let o = pm.upsert_entry(e2).await.unwrap();
assert!(matches!(o, UpsertOutcome::Added(_)));
assert_eq!(pm.entry_count().await, 2);
}
}
#[cfg(test)]
mod version_tests {
use super::*;
use object_store::memory::InMemory;
fn mk(id: &str, city: &str, state: &str) -> PlaybookEntry {
PlaybookEntry {
playbook_id: id.into(),
operation: format!("fill: Welder x1 in {city}, {state}"),
approach: "hybrid".into(),
context: "test".into(),
timestamp: chrono::Utc::now().to_rfc3339(),
endorsed_names: vec!["Alice Smith".into()],
city: Some(city.into()),
state: Some(state.into()),
embedding: Some(vec![1.0, 0.0, 0.0]),
schema_fingerprint: None,
valid_until: None,
retired_at: None,
retirement_reason: None,
version: 1,
parent_id: None,
superseded_at: None,
superseded_by: None,
}
}
#[tokio::test]
async fn revise_stamps_chain_metadata_on_both_ends() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
pm.set_entries(vec![mk("pb-v1", "Nashville", "TN")]).await.unwrap();
let outcome = pm.revise_entry("pb-v1", mk("pb-v2", "Nashville", "TN"))
.await
.expect("revise should succeed against active root");
assert_eq!(outcome.parent_id, "pb-v1");
assert_eq!(outcome.parent_version, 1);
assert_eq!(outcome.new_playbook_id, "pb-v2");
assert_eq!(outcome.new_version, 2);
assert!(!outcome.superseded_at.is_empty());
let snap = pm.snapshot().await;
let v1 = snap.iter().find(|e| e.playbook_id == "pb-v1").unwrap();
let v2 = snap.iter().find(|e| e.playbook_id == "pb-v2").unwrap();
assert_eq!(v1.superseded_by.as_deref(), Some("pb-v2"));
assert!(v1.superseded_at.is_some());
assert_eq!(v2.parent_id.as_deref(), Some("pb-v1"));
assert_eq!(v2.version, 2);
assert!(v2.superseded_at.is_none());
}
#[tokio::test]
async fn revise_rejects_retired_parent() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let mut e1 = mk("pb-v1", "Nashville", "TN");
e1.retired_at = Some(chrono::Utc::now().to_rfc3339());
pm.set_entries(vec![e1]).await.unwrap();
let err = pm.revise_entry("pb-v1", mk("pb-v2", "Nashville", "TN")).await
.expect_err("revise on retired parent must error");
assert!(err.contains("retired"), "error should mention retirement: {err}");
}
    #[tokio::test]
    async fn revise_rejects_already_superseded_parent() {
        let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
        pm.set_entries(vec![mk("pb-v1", "Nashville", "TN")]).await.unwrap();
        pm.revise_entry("pb-v1", mk("pb-v2", "Nashville", "TN")).await.unwrap();

        // pb-v1 is now superseded; revising it again must fail. The caller
        // should revise pb-v2 (the tip) instead.
        let err = pm
            .revise_entry("pb-v1", mk("pb-v3-fake", "Nashville", "TN"))
            .await
            .expect_err("revise on superseded parent must error");
        assert!(err.contains("superseded"), "error should mention supersession: {err}");
    }

    #[tokio::test]
    async fn superseded_entries_excluded_from_boost() {
        let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
        pm.set_entries(vec![mk("pb-v1", "Nashville", "TN")]).await.unwrap();
        let mut v2 = mk("pb-v2", "Nashville", "TN");
        v2.endorsed_names = vec!["Carol Davis".into()];
        pm.revise_entry("pb-v1", v2).await.unwrap();

        let boosts = pm
            .compute_boost_for_filtered_with_role(
                &[1.0, 0.0, 0.0],
                100,
                0.5,
                Some(("Nashville", "TN")),
                Some("Welder"),
            )
            .await;

        // v1's endorsement (Alice Smith) should be absent since v1 was
        // superseded. v2's endorsement (Carol Davis) should be present.
        assert!(
            !boosts.contains_key(&("Nashville".into(), "TN".into(), "Alice Smith".into())),
            "superseded entry's endorsement must not boost"
        );
        let carol = boosts.get(&("Nashville".into(), "TN".into(), "Carol Davis".into()));
        assert!(carol.is_some(), "tip version's endorsement must still boost");
        assert!(carol.unwrap().citations.contains(&"pb-v2".to_string()));
    }

    #[tokio::test]
    async fn history_walks_root_to_tip_from_any_node() {
        let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
        pm.set_entries(vec![mk("pb-v1", "Nashville", "TN")]).await.unwrap();
        pm.revise_entry("pb-v1", mk("pb-v2", "Nashville", "TN")).await.unwrap();
        pm.revise_entry("pb-v2", mk("pb-v3", "Nashville", "TN")).await.unwrap();

        // Starting from the root yields the full chain in order.
        let chain_from_root = pm.history("pb-v1").await;
        assert_eq!(chain_from_root.len(), 3);
        assert_eq!(chain_from_root[0].playbook_id, "pb-v1");
        assert_eq!(chain_from_root[1].playbook_id, "pb-v2");
        assert_eq!(chain_from_root[2].playbook_id, "pb-v3");

        // Starting from the tip: same chain, same order.
        let chain_from_tip = pm.history("pb-v3").await;
        assert_eq!(chain_from_tip.len(), 3);
        assert_eq!(chain_from_tip[0].playbook_id, "pb-v1");
        assert_eq!(chain_from_tip[2].playbook_id, "pb-v3");

        // Starting from the middle: same chain.
        let chain_from_mid = pm.history("pb-v2").await;
        assert_eq!(chain_from_mid.len(), 3);
        assert_eq!(chain_from_mid[0].playbook_id, "pb-v1");
        assert_eq!(chain_from_mid[2].playbook_id, "pb-v3");
    }

    #[tokio::test]
    async fn history_empty_for_unknown_id() {
        let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
        pm.set_entries(vec![mk("pb-v1", "Nashville", "TN")]).await.unwrap();
        assert!(pm.history("pb-nonexistent").await.is_empty());
    }

    #[tokio::test]
    async fn status_counts_reports_superseded_separately() {
        let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
        pm.set_entries(vec![mk("pb-v1", "Nashville", "TN")]).await.unwrap();
        pm.revise_entry("pb-v1", mk("pb-v2", "Nashville", "TN")).await.unwrap();

        let (total, retired, superseded, _) = pm.status_counts().await;
        assert_eq!(total, 2);
        assert_eq!(retired, 0);
        assert_eq!(superseded, 1);
    }

    #[tokio::test]
    async fn legacy_entries_without_version_default_to_v1() {
        // Simulate state persisted before Phase 27, with no version field.
        // The serde defaults kick in; entries should be treated as roots.
        let json = r#"{
            "entries": [{
                "playbook_id": "pb-legacy",
                "operation": "fill: Welder x1 in Nashville, TN",
                "approach": "hybrid",
                "context": "",
                "timestamp": "2026-04-21T00:00:00Z",
                "endorsed_names": ["Alice"],
                "city": "Nashville",
                "state": "TN"
            }],
            "last_rebuilt_at": 0,
            "failures": []
        }"#;
        let state: PlaybookMemoryState = serde_json::from_str(json).unwrap();
        let legacy = &state.entries[0];
        assert_eq!(legacy.version, 1);
        assert!(legacy.parent_id.is_none());
        assert!(legacy.superseded_at.is_none());
        assert!(legacy.superseded_by.is_none());
    }
}