lakehouse/crates/vectord/src/playbook_memory.rs
root cdc24d8bd0
Some checks failed
lakehouse/auditor 1 blocking issue: todo!() macro call in tests/real-world/scrum_master_pipeline.ts
shared: build ModelMatrix — migrate 5 call sites off deprecated estimate_tokens
The `aibridge::context::estimate_tokens` deprecation has been pointing
at `shared::model_matrix::ModelMatrix::estimate_tokens` for a while,
but that module didn't exist — so the deprecation was aspirational
noise, not actionable guidance.

Built the minimal target: `shared::model_matrix::ModelMatrix` with
an associated `estimate_tokens(text: &str) -> usize` method. Same
chars/4 ceiling heuristic as the deprecated helper. 6 tests cover
empty/3/4/5-char cases, multi-byte UTF-8 (emoji count as 1 char each),
and linear scaling to 400-char inputs.

Migrated 5 call sites:
  - aibridge/context.rs:88 — opts.system token count
  - aibridge/context.rs:89 — prompt token count
  - aibridge/tree_split.rs:22 — import (now uses ModelMatrix)
  - aibridge/tree_split.rs:84, 89 — truncate_scratchpad budget loop
  - aibridge/tree_split.rs:282 — scratchpad post-truncation assertion
  - aibridge/context.rs:183 — system-prompt budget test

Also cleaned up two parallel test warnings:
  - aibridge/context.rs legacy estimate_tokens_ceiling_divides_by_four
    test deleted (ModelMatrix's tests cover the same behavior now).
  - vectord/playbook_memory.rs:1650 unused_mut on e_alive.

Net workspace warning count: 11 → 0 (including --tests build).

The deprecated `estimate_tokens` wrapper stays in aibridge/context.rs
for external callers. Future commits can remove it entirely once no
public API surface still references it.

The applier's warning-count gate now has a floor of 0 — any future
patch that introduces a single warning trips the gate automatically.
Previously a floor of 11 tolerated noise.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 06:32:16 -05:00

2130 lines
90 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! Phase 19: Playbook memory — the feedback loop that makes the index
//! learn from real outcomes instead of just logging them.
//!
//! When an agent (multi-agent orchestrator or human operator) seals a
//! successful playbook, it lands in the `successful_playbooks` dataset.
//! Historically that was a write-only log. This module turns it into a
//! re-ranking signal:
//!
//! 1. `rebuild` reads every row of `successful_playbooks`, embeds the
//! operation+approach+context as one vector per playbook, parses
//! out the worker names from the `result` column, and stores both
//! the vectors and the (playbook → names) endorsement map in memory.
//!
//! 2. At query time, `compute_boost_for` takes a new operation text
//! (e.g. "fill: Welder x2 in Toledo, OH"), embeds it, brute-force
//! ranks past playbooks by cosine similarity, and returns a boost
//! map keyed by (city, state, worker_name) → `BoostEntry`. Each
//! entry carries its similarity score and the citing playbook_ids,
//! so explanations ("ranked higher because of 3 similar past fills
//! in Toledo") are free.
//!
//! 3. The `use_playbook_memory` flag on `/vectors/hybrid` adds those
//! boosts to matching search hits and re-sorts.
//!
//! Why brute force instead of another HNSW: `successful_playbooks` grows
//! by operators, not automation. A few thousand rows is the realistic
//! ceiling for years. Brute force at 10K × 768d is <10ms on this hardware
//! — not worth the operational cost of another indexed surface.
//!
//! Persistence: the endorsements map round-trips through
//! `_playbook_memory/state.json` in primary storage so the cache
//! survives restarts without a full rebuild.
use std::collections::HashMap;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use aibridge::client::{AiClient, EmbedRequest};
use object_store::ObjectStore;
use storaged::ops;
const STATE_KEY: &str = "_playbook_memory/state.json";
/// Maximum boost a single worker can accumulate across all similar past
/// playbooks. Prevents one very popular worker from always winning.
pub const MAX_BOOST_PER_WORKER: f32 = 0.25;
/// Default number of past playbooks to consider when ranking the current
/// operation. Bumped 25 → 100 on 2026-04-20 (second revision) after
/// direct measurement showed cosine similarities cluster in a narrow band
/// (0.55-0.67) across all playbooks regardless of geo — the embedding
/// model doesn't discriminate city/role strongly enough. k=25 missed
/// relevant Toledo Welder playbooks even when they existed; k=100
/// includes them comfortably. Brute-force remains sub-ms at this size.
/// Deeper fix: filter playbooks by (target_city, target_state) in the
/// request before similarity ranking — deferred.
pub const DEFAULT_TOP_K_PLAYBOOKS: usize = 100;
/// Half-life of a playbook's contribution to boost, in days. A playbook
/// 30 days old contributes half what a fresh one would; 60 days old, a
/// quarter; etc. Per Path 1 (deepen statistical) — stale endorsements
/// shouldn't dominate fresh signal. Recruiter trust depends on this.
pub const BOOST_HALF_LIFE_DAYS: f32 = 30.0;
/// Shape of one playbook in memory. The embedding is optional so we can
/// round-trip a cached state without re-embedding; the rebuild path
/// populates it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlaybookEntry {
pub playbook_id: String,
pub operation: String,
pub approach: String,
pub context: String,
pub timestamp: String,
/// Parsed out of `result` (e.g. "2/2 filled → Matthew Roberts, Amy Davis").
/// Stored as raw names; matching against search results happens on
/// (city, state, name) tuples at boost time.
pub endorsed_names: Vec<String>,
/// City + state parsed out of the operation string. Kept separately
/// so boost matching doesn't re-parse on every query.
pub city: Option<String>,
pub state: Option<String>,
/// Embedding of `operation + approach + context`. Option so persisted
/// state can omit it on first load and have a later embed() fill in.
#[serde(default)]
pub embedding: Option<Vec<f32>>,
/// Schema fingerprint captured at seed time — SHA-256 hex of the
/// target dataset's (column_name, type) tuples. When the dataset's
/// schema changes (column rename, type change, drop), entries
/// seeded against the old schema are considered stale and get
/// skipped by `compute_boost_for_filtered_with_role` unless the
/// caller passes `allow_stale: true`. Optional so historical
/// entries without a fingerprint (ingested before this field
/// existed) degrade to "never stale" rather than getting
/// silently zeroed. Phase 25 (2026-04-21).
#[serde(default)]
pub schema_fingerprint: Option<String>,
/// Optional hard expiry. When set and now() > valid_until, the
/// entry is skipped. Used for playbooks that were known to be
/// time-limited at seed time (seasonal hires, temporary contracts).
#[serde(default)]
pub valid_until: Option<String>,
/// Set by `retire()` — auto-retirement when schema drift detected,
/// or manual via POST /vectors/playbook_memory/retire. Entries with
/// this set are excluded from all boost calculations; they remain
/// in the journal for forensic purposes.
#[serde(default)]
pub retired_at: Option<String>,
/// Human-readable retirement reason. Examples:
/// "schema_drift: workers_500k 2026-05-03 added column X"
/// "expired: valid_until 2026-05-01 elapsed"
/// "manual: operator requested via POST /retire"
#[serde(default)]
pub retirement_reason: Option<String>,
/// Phase 27 — monotonic version counter within a playbook chain.
/// First version is 1; `revise_entry` sets the new entry's version
/// to parent.version + 1. Entries persisted before Phase 27 get
/// version=1 via serde default and are treated as roots.
#[serde(default = "default_version")]
pub version: u32,
/// Phase 27 — playbook_id of the prior version in this chain. None
/// for root entries (first version).
#[serde(default)]
pub parent_id: Option<String>,
/// Phase 27 — timestamp set when a newer version replaced this
/// entry via `revise_entry`. Superseded entries are excluded from
/// boost calculations (same rule as `retired_at`) but remain
/// queryable via `history` for audit.
#[serde(default)]
pub superseded_at: Option<String>,
/// Phase 27 — playbook_id of the entry that replaced this one.
/// Walking `superseded_by` from the root forward reconstructs the
/// full version chain.
#[serde(default)]
pub superseded_by: Option<String>,
/// Phase 45 — external documentation references captured at seal
/// time. One entry per tool/library the procedure consulted.
/// Drives drift detection: when context7 reports a newer version
/// for any entry here than what's in `version_seen`, the playbook
/// is `doc_drift_flagged_at` and excluded from boost until human
/// review clears it. Legacy entries (pre-Phase-45) load with an
/// empty vec — they simply never drift-flag, same as entries
/// without a `schema_fingerprint` in Phase 25.
#[serde(default)]
pub doc_refs: Vec<DocRef>,
/// Phase 45 — set by `flag_doc_drift()` when one or more
/// `doc_refs` entries have a newer version available than
/// `version_seen`. Flagged entries are excluded from boost until
/// `doc_drift_reviewed_at` is set via the /resolve endpoint.
#[serde(default)]
pub doc_drift_flagged_at: Option<String>,
/// Phase 45 — set by human operator via
/// `/vectors/playbook_memory/doc_drift/resolve/{id}` after
/// reviewing the drift diagnosis. Either re-admits the entry to
/// boost (if still applicable) or pairs with `retired_at` /
/// `superseded_by` if the procedure changed.
#[serde(default)]
pub doc_drift_reviewed_at: Option<String>,
}
fn default_version() -> u32 { 1 }
/// Phase 45 — one external doc reference. Recorded at seal time so
/// drift detection knows what version was consulted. `snippet_hash`
/// lets us detect "same version, different passage" when a library
/// patches docs without bumping the version number.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DocRef {
/// Canonical tool/library name as context7 knows it, e.g.
/// "docker", "terraform", "react", "next.js". Case-insensitive
/// on compare.
pub tool: String,
/// Version string exactly as seen at seal time. Context7 typically
/// returns semver-like; we store raw string to avoid parsing
/// ambiguity ("latest", "next", "canary" are all valid).
pub version_seen: String,
/// Optional hash of the specific doc passage the procedure
/// referenced. Useful when version hasn't bumped but content
/// rewrote.
#[serde(default)]
pub snippet_hash: Option<String>,
/// Optional direct URL back to the doc (context7 can resolve
/// tool+version → URL, so this is cache not source-of-truth).
#[serde(default)]
pub source_url: Option<String>,
/// When this reference was captured. RFC3339.
pub seen_at: String,
}
impl Default for PlaybookEntry {
fn default() -> Self {
Self {
playbook_id: String::new(),
operation: String::new(),
approach: String::new(),
context: String::new(),
timestamp: String::new(),
endorsed_names: Vec::new(),
city: None,
state: None,
embedding: None,
schema_fingerprint: None,
valid_until: None,
retired_at: None,
retirement_reason: None,
version: 1,
parent_id: None,
superseded_at: None,
superseded_by: None,
doc_refs: Vec::new(),
doc_drift_flagged_at: None,
doc_drift_reviewed_at: None,
}
}
}
/// A recorded failure — worker who didn't deliver on a contract.
/// Tracked per (city, state, name) so a single worker's failures on
/// Toledo Welder contracts don't penalize the same name in Chicago.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailureRecord {
pub city: String,
pub state: String,
pub name: String,
pub reason: String,
pub timestamp: String,
}
/// Persisted / in-memory state.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct PlaybookMemoryState {
entries: Vec<PlaybookEntry>,
/// Unix epoch millis when the last rebuild completed. Caller can
/// use this to gate "stale > N hours → trigger rebuild" behavior.
last_rebuilt_at: i64,
/// Failed-fill records. Path 1 negative signal — every entry here
/// dampens the positive boost for its (city, state, name) key by
/// half per failure count, so three failures zero the boost.
#[serde(default)]
failures: Vec<FailureRecord>,
}
/// Per-worker boost payload. `citations` lets the response layer show
/// "boosted because of these past fills" without a second lookup.
#[derive(Debug, Clone, Serialize)]
pub struct BoostEntry {
pub boost: f32,
pub citations: Vec<String>, // playbook_ids that endorsed this worker
}
/// Phase 26 — what happened during an upsert. The seed endpoint
/// returns this shape so the caller sees whether its write was a new
/// entry, a merge, or a dedup'd no-op.
///
/// SHAPE NOTE: `#[serde(tag = "mode")]` requires struct-like variants —
/// bare `Added(String)` and `Noop(String)` newtype variants would
/// panic serialization at runtime. That bug silently 500-ed every
/// /seed call from Phase 26 (commit 640db8c) until 2026-04-22 when the
/// auditor's hybrid fixture surfaced it. All three variants are now
/// struct-like, producing uniform JSON:
/// {"mode":"added","playbook_id":"pb-..."}
/// {"mode":"updated","playbook_id":"pb-...","merged_names":[...]}
/// {"mode":"noop","playbook_id":"pb-..."}
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "mode", rename_all = "lowercase")]
pub enum UpsertOutcome {
/// New playbook appended. Carries the new playbook_id.
Added { playbook_id: String },
/// Existing same-day entry updated. Playbook_id unchanged; names
/// merged (union, original order preserved, new names appended).
Updated {
playbook_id: String,
merged_names: Vec<String>,
},
/// Identical same-day entry already exists; nothing changed.
/// Returns the stable playbook_id so caller still has a reference.
Noop { playbook_id: String },
}
/// Phase 27 — shape returned from `revise_entry`. Reports both ends of
/// the supersession so callers can link citations or audit chains.
#[derive(Debug, Clone, Serialize)]
pub struct ReviseOutcome {
pub parent_id: String,
pub parent_version: u32,
pub new_playbook_id: String,
pub new_version: u32,
pub superseded_at: String,
}
/// Return YYYY-MM-DD from an RFC3339 timestamp. Falls back to the
/// first 10 chars if parse fails — tolerant for legacy entries that
/// stored a bare date.
fn day_key(ts: &str) -> String {
chrono::DateTime::parse_from_rfc3339(ts)
.map(|t| t.format("%Y-%m-%d").to_string())
.unwrap_or_else(|_| ts.chars().take(10).collect())
}
/// Live handle passed around the service. Clone-cheap (all state is
/// inside one Arc<RwLock>).
#[derive(Clone)]
pub struct PlaybookMemory {
state: Arc<RwLock<PlaybookMemoryState>>,
store: Arc<dyn ObjectStore>,
/// Phase 26 — hot geo index: (city_lower, state_upper) → sorted
/// Vec<entry_idx>. Rebuilt on every mutation of `entries`. At
/// current scale (1.9K entries) a full scan is sub-ms; at 100K+
/// the index skips the scan for geo-filtered queries, which is
/// the dominant code path. Letta-style working memory with a real
/// LRU is overkill here — entries are bounded and fit in RAM;
/// what we need is a precomputed seek, not a bounded cache.
geo_index: Arc<RwLock<HashMap<(String, String), Vec<usize>>>>,
}
impl PlaybookMemory {
pub fn new(store: Arc<dyn ObjectStore>) -> Self {
Self {
state: Arc::new(RwLock::new(PlaybookMemoryState::default())),
store,
geo_index: Arc::new(RwLock::new(HashMap::new())),
}
}
/// Rebuild the geo index from scratch. Called by every mutation
/// helper after persist succeeds. O(n) scan of entries; at current
/// scale ~40µs. Skips retired and superseded entries — they never
/// participate in boost filtering, so indexing them would just
/// waste lookups.
async fn rebuild_geo_index(&self) {
let state = self.state.read().await;
let mut idx: HashMap<(String, String), Vec<usize>> = HashMap::new();
for (i, e) in state.entries.iter().enumerate() {
if e.retired_at.is_some() { continue; }
if e.superseded_at.is_some() { continue; }
let (Some(city), Some(st)) = (&e.city, &e.state) else { continue; };
let key = (city.to_ascii_lowercase(), st.to_ascii_uppercase());
idx.entry(key).or_default().push(i);
}
drop(state);
let mut guard = self.geo_index.write().await;
*guard = idx;
}
/// Best-effort load from primary storage. Missing = empty memory; the
/// first `/rebuild` call will hydrate it.
pub async fn load_from_storage(&self) -> Result<usize, String> {
let data = match ops::get(&self.store, STATE_KEY).await {
Ok(d) => d,
Err(_) => return Ok(0),
};
let persisted: PlaybookMemoryState = serde_json::from_slice(&data)
.map_err(|e| format!("parse playbook_memory state: {e}"))?;
let n = persisted.entries.len();
*self.state.write().await = persisted;
self.rebuild_geo_index().await;
tracing::info!("playbook_memory: loaded {n} entries from {STATE_KEY}");
Ok(n)
}
async fn persist(&self) -> Result<(), String> {
let snapshot = self.state.read().await.clone();
let bytes = serde_json::to_vec_pretty(&snapshot).map_err(|e| e.to_string())?;
ops::put(&self.store, STATE_KEY, bytes.into()).await
}
/// Replace the full in-memory state atomically and persist.
pub async fn set_entries(&self, entries: Vec<PlaybookEntry>) -> Result<(), String> {
let mut s = self.state.write().await;
s.entries = entries;
s.last_rebuilt_at = chrono::Utc::now().timestamp_millis();
drop(s);
self.persist().await?;
self.rebuild_geo_index().await;
Ok(())
}
/// Phase 25 — retire a specific playbook by id. Idempotent; repeat
/// calls don't overwrite the first reason. Persisted.
pub async fn retire_one(&self, playbook_id: &str, reason: &str) -> Result<bool, String> {
let mut touched = false;
{
let mut state = self.state.write().await;
for e in state.entries.iter_mut() {
if e.playbook_id == playbook_id && e.retired_at.is_none() {
e.retired_at = Some(chrono::Utc::now().to_rfc3339());
e.retirement_reason = Some(reason.to_string());
touched = true;
break;
}
}
}
if touched {
self.persist().await?;
self.rebuild_geo_index().await;
}
Ok(touched)
}
/// Phase 45 slice 3 — stamp `doc_drift_flagged_at` on a playbook.
/// Idempotent: if already flagged and not yet reviewed, this is a
/// no-op. Callers should check the return value: Ok(true) means we
/// made a new flag, Ok(false) means already flagged (unreviewed) or
/// playbook not found, Err means persist failed.
pub async fn flag_doc_drift(&self, playbook_id: &str) -> Result<bool, String> {
let mut touched = false;
{
let mut state = self.state.write().await;
for e in state.entries.iter_mut() {
if e.playbook_id == playbook_id {
// Skip if already flagged + unreviewed (avoid
// churning the timestamp on every re-check).
if e.doc_drift_flagged_at.is_some() && e.doc_drift_reviewed_at.is_none() {
break;
}
e.doc_drift_flagged_at = Some(chrono::Utc::now().to_rfc3339());
e.doc_drift_reviewed_at = None; // fresh flag clears any stale review
touched = true;
break;
}
}
}
if touched {
self.persist().await?;
}
Ok(touched)
}
/// Phase 45 slice 3 — stamp `doc_drift_reviewed_at` on a playbook,
/// re-admitting it to boost calculation. Idempotent.
pub async fn resolve_doc_drift(&self, playbook_id: &str) -> Result<bool, String> {
let mut touched = false;
{
let mut state = self.state.write().await;
for e in state.entries.iter_mut() {
if e.playbook_id == playbook_id && e.doc_drift_flagged_at.is_some()
&& e.doc_drift_reviewed_at.is_none()
{
e.doc_drift_reviewed_at = Some(chrono::Utc::now().to_rfc3339());
touched = true;
break;
}
}
}
if touched {
self.persist().await?;
}
Ok(touched)
}
/// Read-only: get a playbook entry by id. Used by the drift-check
/// handler to read doc_refs without exposing the full state lock.
pub async fn get_entry(&self, playbook_id: &str) -> Option<PlaybookEntry> {
let state = self.state.read().await;
state.entries.iter().find(|e| e.playbook_id == playbook_id).cloned()
}
/// Phase 25 — retire every entry matching (city, state) whose
/// schema_fingerprint doesn't match the current one. Entries with
/// no fingerprint (legacy) are skipped — caller can use
/// `retire_by_scope` for blanket retirement.
pub async fn retire_on_schema_drift(
&self,
city: &str,
state_code: &str,
current_fingerprint: &str,
reason: &str,
) -> Result<usize, String> {
let mut count = 0;
{
let mut state = self.state.write().await;
let now = chrono::Utc::now().to_rfc3339();
for e in state.entries.iter_mut() {
if e.retired_at.is_some() { continue; }
let Some(ec) = &e.city else { continue; };
let Some(es) = &e.state else { continue; };
if !ec.eq_ignore_ascii_case(city) || !es.eq_ignore_ascii_case(state_code) { continue; }
match &e.schema_fingerprint {
Some(fp) if fp != current_fingerprint => {
e.retired_at = Some(now.clone());
e.retirement_reason = Some(reason.to_string());
count += 1;
}
_ => {}
}
}
}
if count > 0 {
self.persist().await?;
self.rebuild_geo_index().await;
}
Ok(count)
}
/// Phase 27 — append a new version of an existing playbook. The
/// parent is stamped with `superseded_at` + `superseded_by`; the
/// new entry inherits `parent_id` and gets `version = parent + 1`.
/// Errors when the parent is retired (terminal state) or already
/// superseded (must revise the tip of the chain, not a middle
/// node). Caller supplies the new entry with its own fresh
/// `playbook_id`; chain-metadata fields on the input are
/// overwritten so callers can't fabricate a mismatched history.
pub async fn revise_entry(
&self,
parent_id: &str,
mut new_entry: PlaybookEntry,
) -> Result<ReviseOutcome, String> {
let now = chrono::Utc::now().to_rfc3339();
let mut state = self.state.write().await;
let Some(i) = state.entries.iter().position(|e| e.playbook_id == parent_id) else {
return Err(format!("parent playbook_id '{parent_id}' not found"));
};
{
let parent = &state.entries[i];
if parent.retired_at.is_some() {
return Err(format!(
"cannot revise retired playbook '{parent_id}' — retirement is terminal"
));
}
if let Some(succ) = &parent.superseded_by {
return Err(format!(
"playbook '{parent_id}' already superseded by '{succ}'; \
revise the latest version in the chain instead"
));
}
}
let parent_version = state.entries[i].version;
let new_version = parent_version.saturating_add(1);
let parent_pid = state.entries[i].playbook_id.clone();
let new_pid = new_entry.playbook_id.clone();
if new_pid.is_empty() {
return Err("new playbook_id must not be empty".into());
}
if new_pid == parent_pid {
return Err("new playbook_id must differ from parent".into());
}
// Enforce chain-metadata integrity — caller doesn't get to
// fabricate these.
new_entry.version = new_version;
new_entry.parent_id = Some(parent_pid.clone());
new_entry.superseded_at = None;
new_entry.superseded_by = None;
let parent_mut = &mut state.entries[i];
parent_mut.superseded_at = Some(now.clone());
parent_mut.superseded_by = Some(new_pid.clone());
state.entries.push(new_entry);
drop(state);
self.persist().await?;
self.rebuild_geo_index().await;
Ok(ReviseOutcome {
parent_id: parent_pid,
parent_version,
new_playbook_id: new_pid,
new_version,
superseded_at: now,
})
}
/// Phase 27 — return the full version chain that contains this
/// playbook_id, ordered from root (v1) to tip. Walks `parent_id`
/// backward to find the root, then `superseded_by` forward to the
/// tip. Returns empty if the id isn't present. Cycle-safe via a
/// visited set; unreachable in normal operation but the guard is
/// cheap.
pub async fn history(&self, playbook_id: &str) -> Vec<PlaybookEntry> {
let state = self.state.read().await;
let by_id: HashMap<&str, &PlaybookEntry> = state.entries
.iter()
.map(|e| (e.playbook_id.as_str(), e))
.collect();
let Some(seed) = by_id.get(playbook_id).copied() else {
return vec![];
};
// Walk backward to root.
let mut cursor = seed;
let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
seen.insert(cursor.playbook_id.clone());
while let Some(pid) = &cursor.parent_id {
let Some(&next) = by_id.get(pid.as_str()) else { break };
if !seen.insert(next.playbook_id.clone()) { break; }
cursor = next;
}
let root = cursor;
// Walk forward to tip.
let mut chain = vec![root.clone()];
let mut cursor = root;
let mut seen_fwd: std::collections::HashSet<String> = std::collections::HashSet::new();
seen_fwd.insert(cursor.playbook_id.clone());
while let Some(nid) = &cursor.superseded_by {
let Some(&next) = by_id.get(nid.as_str()) else { break };
if !seen_fwd.insert(next.playbook_id.clone()) { break; }
cursor = next;
chain.push(cursor.clone());
}
chain
}
/// Stats accessor for the /status endpoint and tests. Returns
/// (total, retired, superseded, failures). Phase 27 added
/// superseded as a distinct counter: a superseded entry is
/// replaced-by-newer-version, which is a different lifecycle event
/// than retired-stop-using.
pub async fn status_counts(&self) -> (usize, usize, usize, usize) {
let state = self.state.read().await;
let total = state.entries.len();
let retired = state.entries.iter().filter(|e| e.retired_at.is_some()).count();
let superseded = state.entries.iter().filter(|e| e.superseded_at.is_some()).count();
let failures = state.failures.len();
(total, retired, superseded, failures)
}
/// Phase 26 — Mem0-style upsert. Decides ADD / UPDATE / NOOP based
/// on whether a non-retired entry with the same operation already
/// exists for the same day. Three outcomes:
///
/// ADD → no matching entry, append the new one
/// UPDATE → existing same-day entry found, merge endorsed_names
/// (union, preserving order) and refresh timestamp.
/// Playbook_id is kept stable so citations from prior
/// boost calls stay valid.
/// NOOP → existing same-day entry with identical
/// endorsed_names. Skip — no duplicates accumulate.
///
/// "Same day" keyed on YYYY-MM-DD of the entry's timestamp so
/// intraday re-seeding of the same operation dedups but tomorrow's
/// seeding for the same operation lands as a fresh ADD (which is
/// correct — a new day is a new event).
pub async fn upsert_entry(&self, new_entry: PlaybookEntry) -> Result<UpsertOutcome, String> {
let new_day = day_key(&new_entry.timestamp);
let new_names_sorted = {
let mut v = new_entry.endorsed_names.clone();
v.sort();
v
};
let mut state = self.state.write().await;
// Find a non-retired entry with same operation + day + city +
// state. Operation string alone would false-match across days
// or across cities that happen to share role+count; city+state
// is already parsed out of operation so adding them to the key
// costs nothing.
let mut existing_idx: Option<usize> = None;
for (i, e) in state.entries.iter().enumerate() {
if e.retired_at.is_some() { continue; }
if e.superseded_at.is_some() { continue; }
if e.operation != new_entry.operation { continue; }
if day_key(&e.timestamp) != new_day { continue; }
if e.city != new_entry.city || e.state != new_entry.state { continue; }
existing_idx = Some(i);
break;
}
match existing_idx {
None => {
let pid = new_entry.playbook_id.clone();
state.entries.push(new_entry);
drop(state);
self.persist().await?;
self.rebuild_geo_index().await;
Ok(UpsertOutcome::Added { playbook_id: pid })
}
Some(i) => {
let mut existing_names_sorted = state.entries[i].endorsed_names.clone();
existing_names_sorted.sort();
if existing_names_sorted == new_names_sorted {
// NOOP — identical data, just report the existing id
let pid = state.entries[i].playbook_id.clone();
Ok(UpsertOutcome::Noop { playbook_id: pid })
} else {
// UPDATE — merge names (union, stable order).
let existing = state.entries.get_mut(i).ok_or("index invalidated")?;
let mut merged: Vec<String> = existing.endorsed_names.clone();
for n in &new_entry.endorsed_names {
if !merged.contains(n) { merged.push(n.clone()); }
}
existing.endorsed_names = merged.clone();
existing.timestamp = new_entry.timestamp.clone();
// Keep original playbook_id. Refresh embedding only
// if the caller passed a non-None one (indicates
// the text shape changed).
if new_entry.embedding.is_some() {
existing.embedding = new_entry.embedding.clone();
}
// Lifecycle / drift-signal fields — refresh only
// when the caller provides a new value. Each was
// previously either ignored entirely (valid_until,
// doc_refs) or only partially handled. Caught by
// the auditor's hybrid fixture on 2026-04-22 when
// re-seeding with fresh doc_refs silently dropped
// them on the UPDATE path.
if new_entry.schema_fingerprint.is_some() {
existing.schema_fingerprint = new_entry.schema_fingerprint.clone();
}
if new_entry.valid_until.is_some() {
existing.valid_until = new_entry.valid_until.clone();
}
if !new_entry.doc_refs.is_empty() {
// Merge by tool (case-insensitive). A new ref
// for the same tool supersedes the old one —
// caller is asserting "this is what I used
// now." Refs for tools the existing entry
// didn't track are appended. This keeps the
// cumulative tool history without letting a
// single re-seed erase it.
let mut merged_refs = existing.doc_refs.clone();
for new_ref in &new_entry.doc_refs {
let hit = merged_refs.iter().position(|r| {
r.tool.eq_ignore_ascii_case(&new_ref.tool)
});
match hit {
Some(idx) => merged_refs[idx] = new_ref.clone(),
None => merged_refs.push(new_ref.clone()),
}
}
existing.doc_refs = merged_refs;
}
let pid = existing.playbook_id.clone();
drop(state);
self.persist().await?;
self.rebuild_geo_index().await;
Ok(UpsertOutcome::Updated { playbook_id: pid, merged_names: merged })
}
}
}
}
pub async fn entry_count(&self) -> usize {
self.state.read().await.entries.len()
}
pub async fn snapshot(&self) -> Vec<PlaybookEntry> {
self.state.read().await.entries.clone()
}
/// Record failure(s). Each added `FailureRecord` is an additional
/// penalty against that worker's positive boost for the same geo.
pub async fn mark_failures(&self, new_failures: Vec<FailureRecord>) -> Result<usize, String> {
if new_failures.is_empty() { return Ok(0); }
let added = new_failures.len();
let mut s = self.state.write().await;
s.failures.extend(new_failures);
drop(s);
self.persist().await?;
Ok(added)
}
/// Count failures per (city, state, name) key. Used by compute_boost_for
/// to dampen positive boost.
pub async fn failure_counts(&self) -> HashMap<(String, String, String), usize> {
let s = self.state.read().await;
let mut counts: HashMap<(String, String, String), usize> = HashMap::new();
for f in &s.failures {
*counts.entry((f.city.clone(), f.state.clone(), f.name.clone())).or_insert(0) += 1;
}
counts
}
/// Given an operation's embedding, find the top-K most similar past
/// playbooks (by cosine similarity) and return a per-worker boost map
/// keyed by (city, state, name). Worker is matched by the tuple so a
/// shared name across cities doesn't cross-pollinate.
///
/// Boost formula: each qualifying playbook contributes
/// `similarity * base_weight / n_workers` to each worker it endorsed,
/// where `base_weight` is tuned to keep the cap realistic without
/// forcing every result to saturate. Total per worker is capped at
/// `MAX_BOOST_PER_WORKER`.
pub async fn compute_boost_for(
&self,
query_embedding: &[f32],
top_k_playbooks: usize,
base_weight: f32,
) -> HashMap<(String, String, String), BoostEntry> {
self.compute_boost_for_filtered(query_embedding, top_k_playbooks, base_weight, None).await
}
/// Same as `compute_boost_for` but only considers playbooks whose
/// (city, state) matches the caller's target. This is the honest
/// fix for the "boosts=170 matched=0" pathology: globally-ranked
/// semantic neighbors include playbooks from every city the query
/// could never reach via its SQL filter. When the caller knows the
/// target geo, restricting here collapses noise and raises the
/// endorsed-worker hit rate. Pass None for the original behavior.
///
/// 2026-04-21 — added after a corpus-density batch of 25 runs
/// showed only 6/40 successful (role, city) combos ever triggered
/// a citation on subsequent runs. Diagnostic logging proved the
/// boost map had 170 keys but the 50-candidate pool matched 0.
pub async fn compute_boost_for_filtered(
&self,
query_embedding: &[f32],
top_k_playbooks: usize,
base_weight: f32,
target_geo: Option<(&str, &str)>,
) -> HashMap<(String, String, String), BoostEntry> {
self.compute_boost_for_filtered_with_role(query_embedding, top_k_playbooks, base_weight, target_geo, None).await
}
/// Variant that also accepts a target role for pre-filtering.
/// Multi-strategy retrieval: exact (role, city, state) matches skip
/// cosine entirely and earn the maximum boost, since identity on
/// those three fields is the strongest possible similarity signal.
/// Remaining entries (within the same city+state but different
/// role, or unknown role) go through the normal cosine path as a
/// fallback. This addresses the 2026 agent-memory finding that
/// multi-strategy parallel retrieval with rerank outperforms
/// single-strategy semantic search.
pub async fn compute_boost_for_filtered_with_role(
&self,
query_embedding: &[f32],
top_k_playbooks: usize,
base_weight: f32,
target_geo: Option<(&str, &str)>,
target_role: Option<&str>,
) -> HashMap<(String, String, String), BoostEntry> {
let state = self.state.read().await;
let entries = state.entries.clone();
// Build failure map once before dropping the lock so we don't
// hold the read lock across the full scoring loop.
let mut failure_counts: HashMap<(String, String, String), usize> = HashMap::new();
for f in &state.failures {
*failure_counts.entry((f.city.clone(), f.state.clone(), f.name.clone())).or_insert(0) += 1;
}
drop(state);
// Phase 25 validity-window filtering. Happens before geo+cosine
// so retired/expired entries never reach the ranking pool. We
// don't mutate the state here (can't grab a write lock inside
// a read-heavy hot path); retirement_at auto-retirement is a
// separate background pass. Here we just skip anything already
// retired, and entries whose valid_until has elapsed.
let now = chrono::Utc::now();
let active_entries: Vec<&PlaybookEntry> = entries
.iter()
.filter(|e| {
if e.retired_at.is_some() { return false; }
if e.superseded_at.is_some() { return false; }
// Phase 45 slice 3 — entries flagged for doc drift and
// not yet human-reviewed are excluded from boost. The
// flag is the same strength of exclusion as retirement
// for this purpose; reviewing (via /resolve) re-admits.
if e.doc_drift_flagged_at.is_some() && e.doc_drift_reviewed_at.is_none() {
return false;
}
if let Some(vu) = &e.valid_until {
if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(vu) {
if now > parsed.with_timezone(&chrono::Utc) { return false; }
}
}
true
})
.collect();
// Pre-filter by target_geo (city, state) before cosine. Phase 26
// hot cache — use the geo index (O(1) key lookup) instead of a
// linear scan of all entries. Retired entries are excluded from
// the index; valid_until is still checked here since it can
// elapse between index rebuilds.
//
// Owned entries (not references) because the state read-lock is
// released between here and the cosine step — we don't want to
// hold a read lock across the scoring work.
let geo_filtered: Vec<PlaybookEntry> = if let Some((tc, ts)) = target_geo {
let key = (tc.to_ascii_lowercase(), ts.to_ascii_uppercase());
let index = self.geo_index.read().await;
let Some(idxs) = index.get(&key) else { return HashMap::new(); };
let idxs = idxs.clone();
drop(index);
let state = self.state.read().await;
idxs.into_iter()
.filter_map(|i| state.entries.get(i))
.filter(|e| {
if e.retired_at.is_some() { return false; }
if e.superseded_at.is_some() { return false; }
// Phase 45 slice 3 — same drift exclusion as the
// non-geo path above. Keeps the two filter paths
// consistent so a flagged entry is invisible to
// BOTH geo-targeted and global boost queries.
if e.doc_drift_flagged_at.is_some() && e.doc_drift_reviewed_at.is_none() {
return false;
}
if let Some(vu) = &e.valid_until {
if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(vu) {
if now > parsed.with_timezone(&chrono::Utc) { return false; }
}
}
true
})
.cloned()
.collect()
} else {
active_entries.into_iter().cloned().collect()
};
// Multi-strategy: split the geo-filtered pool into (exact role
// match) vs (other). Exact matches skip cosine — they're already
// the strongest signal possible. References into geo_filtered
// which owns the entries.
let mut exact_matches: Vec<&PlaybookEntry> = Vec::new();
let mut cosine_pool: Vec<(f32, &PlaybookEntry)> = Vec::new();
let role_needle = target_role
.map(|r| format!("fill: {} ", r).to_ascii_lowercase());
for e in geo_filtered.iter() {
let is_exact = role_needle.as_ref()
.map(|needle| e.operation.to_ascii_lowercase().contains(needle))
.unwrap_or(false);
if is_exact {
exact_matches.push(e);
} else if let Some(v) = &e.embedding {
cosine_pool.push((cosine(query_embedding, v), e));
}
}
cosine_pool.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
// Allocate top_k across the two pools — exact matches first,
// then cosine fills the rest. Identity beats similarity.
let exact_take = exact_matches.len().min(top_k_playbooks.max(1) / 2 + 1);
let cosine_take = top_k_playbooks.saturating_sub(exact_take);
let mut scored: Vec<(f32, &PlaybookEntry)> = exact_matches
.into_iter()
.take(exact_take)
.map(|e| (1.0_f32, e))
.collect();
scored.extend(cosine_pool.into_iter().take(cosine_take));
let now = chrono::Utc::now();
let mut boosts: HashMap<(String, String, String), BoostEntry> = HashMap::new();
for (similarity, pb) in &scored {
// Negative or near-zero similarity = not actually related;
// skip so we don't inject noise when the memory is sparse.
if *similarity <= 0.05 { continue; }
let Some(city) = &pb.city else { continue; };
let Some(state) = &pb.state else { continue; };
let n_workers = pb.endorsed_names.len().max(1);
// Path 1 — temporal decay. Older playbooks weight less. Failure
// to parse the timestamp degrades to "no decay" (treat as fresh)
// rather than dropping the entry entirely; keeps backward
// compatibility with seed payloads that omitted timestamp.
let decay = chrono::DateTime::parse_from_rfc3339(&pb.timestamp)
.ok()
.map(|t| {
let age_days = (now.signed_duration_since(t.with_timezone(&chrono::Utc))
.num_seconds() as f32) / 86400.0;
if age_days <= 0.0 { 1.0 }
else { (-age_days / BOOST_HALF_LIFE_DAYS).exp() }
})
.unwrap_or(1.0);
let per_worker = similarity * base_weight * decay / (n_workers as f32);
for name in &pb.endorsed_names {
let key = (city.clone(), state.clone(), name.clone());
// Path 1 negative signal — each recorded failure halves
// this worker's contribution. Three failures → 0.125x.
// Five failures → ~0.03x (effectively zero). Caps at
// 20 before we clamp to avoid degen cases.
let fail_count = failure_counts.get(&key).copied().unwrap_or(0).min(20);
let penalty = 0.5_f32.powi(fail_count as i32);
let entry = boosts.entry(key).or_insert(BoostEntry {
boost: 0.0,
citations: Vec::new(),
});
entry.boost = (entry.boost + per_worker * penalty).min(MAX_BOOST_PER_WORKER);
if !entry.citations.contains(&pb.playbook_id) {
entry.citations.push(pb.playbook_id.clone());
}
}
}
boosts
}
}
/// Cosine similarity — pulled out so rebuild/boost share one impl.
fn cosine(a: &[f32], b: &[f32]) -> f32 {
let (mut dot, mut na, mut nb) = (0.0_f32, 0.0_f32, 0.0_f32);
let n = a.len().min(b.len());
for i in 0..n {
dot += a[i] * b[i];
na += a[i] * a[i];
nb += b[i] * b[i];
}
if na == 0.0 || nb == 0.0 { return 0.0; }
dot / (na.sqrt() * nb.sqrt())
}
// ---------------- Pattern discovery (Path 2 — meta-index) ----------------
//
// Phase 19's boost path answers "for THIS exact city + role, which workers
// have we used before?" Pattern discovery answers a different question:
// "for queries like this one, what TRAITS have past successful fills had
// in common — even if no exact prior playbook covers this geo?"
//
// The discovered pattern surfaces signals the operator didn't query for:
// e.g. "every successful Welder fill we've seen carried OSHA-10 + lockout
// /tagout — you may want to filter on those." That's the meta-index
// dimension of the original PRD: identify things we didn't know about.
#[derive(Debug, Clone, Serialize)]
pub struct PatternReport {
pub query: String,
pub matched_playbooks: usize,
pub total_workers_examined: usize,
pub common_certifications: Vec<TraitFreq>,
pub common_skills: Vec<TraitFreq>,
pub modal_archetype: Option<String>,
pub reliability_p50: f64,
pub reliability_min: f64,
pub reliability_max: f64,
pub matched_playbook_ids: Vec<String>,
pub discovered_pattern: String,
pub duration_secs: f32,
}
#[derive(Debug, Clone, Serialize)]
pub struct TraitFreq {
pub name: String,
pub count: usize,
pub frequency: f32,
}
pub async fn discover_patterns(
memory: &PlaybookMemory,
ai_client: &AiClient,
catalog: &catalogd::registry::Registry,
buckets: &Arc<storaged::registry::BucketRegistry>,
query: &str,
top_k_playbooks: usize,
min_trait_frequency: f32,
) -> Result<PatternReport, String> {
let t0 = std::time::Instant::now();
// 1. Embed the query through the same nomic-embed-text model used
// for playbook embeddings, so cosine is meaningful.
let resp = ai_client
.embed(EmbedRequest { texts: vec![query.into()], model: None })
.await
.map_err(|e| format!("embed query: {e}"))?;
if resp.embeddings.is_empty() {
return Err("embed returned no vectors".into());
}
let qv: Vec<f32> = resp.embeddings[0].iter().map(|x| *x as f32).collect();
// 2. Find top-K most similar past playbooks (cosine over embeddings).
let entries = memory.snapshot().await;
let mut scored: Vec<(f32, &PlaybookEntry)> = entries
.iter()
.filter_map(|e| e.embedding.as_ref().map(|v| (cosine(&qv, v), e)))
.collect();
scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
scored.truncate(top_k_playbooks);
let matched: Vec<(f32, PlaybookEntry)> = scored
.into_iter()
.filter(|(s, _)| *s > 0.05)
.map(|(s, e)| (s, e.clone()))
.collect();
if matched.is_empty() {
return Ok(PatternReport {
query: query.into(),
matched_playbooks: 0,
total_workers_examined: 0,
common_certifications: vec![],
common_skills: vec![],
modal_archetype: None,
reliability_p50: 0.0, reliability_min: 0.0, reliability_max: 0.0,
matched_playbook_ids: vec![],
discovered_pattern: "No similar past playbooks found.".into(),
duration_secs: t0.elapsed().as_secs_f32(),
});
}
// 3. Pull each endorsed worker's full profile from workers_500k.
// Restrict by (name, city, state) tuple so cross-city homonyms
// don't pollute the aggregate.
let mut conditions: Vec<String> = Vec::new();
let mut matched_ids: Vec<String> = Vec::new();
for (_, pb) in &matched {
matched_ids.push(pb.playbook_id.clone());
let (Some(city), Some(state)) = (pb.city.as_ref(), pb.state.as_ref()) else { continue };
for name in &pb.endorsed_names {
let esc = |s: &str| s.replace('\'', "''");
conditions.push(format!(
"(name = '{}' AND city = '{}' AND state = '{}')",
esc(name), esc(city), esc(state)
));
}
}
if conditions.is_empty() {
return Ok(PatternReport {
query: query.into(),
matched_playbooks: matched.len(),
total_workers_examined: 0,
common_certifications: vec![], common_skills: vec![],
modal_archetype: None, reliability_p50: 0.0,
reliability_min: 0.0, reliability_max: 0.0,
matched_playbook_ids: matched_ids,
discovered_pattern: "Matched playbooks but no endorsed names with city/state to lookup.".into(),
duration_secs: t0.elapsed().as_secs_f32(),
});
}
let sql = format!(
"SELECT name, role, city, state, certifications, skills, archetype, \
CAST(reliability AS DOUBLE) as reliability \
FROM workers_500k WHERE {} LIMIT 500",
conditions.join(" OR ")
);
let engine = queryd::context::QueryEngine::new(
catalog.clone(), buckets.clone(), queryd::cache::MemCache::new(0),
);
let batches = engine.query(&sql).await.map_err(|e| format!("worker lookup: {e}"))?;
// 4. Aggregate. Pipe-separated cert/skill lists, single-string archetype,
// numeric reliability. Frequencies are share-of-workers.
use arrow::array::{Array, AsArray};
let mut cert_counts: HashMap<String, usize> = HashMap::new();
let mut skill_counts: HashMap<String, usize> = HashMap::new();
let mut arch_counts: HashMap<String, usize> = HashMap::new();
let mut reliabilities: Vec<f64> = Vec::new();
let mut total = 0usize;
let get_string = |b: &arrow::record_batch::RecordBatch, col: &str, row: usize| -> String {
let Some(c) = b.column_by_name(col) else { return String::new(); };
if let Some(arr) = c.as_string_view_opt() {
if arr.is_null(row) { return String::new(); }
return arr.value(row).to_string();
}
if let Some(arr) = c.as_string_opt::<i32>() {
if arr.is_null(row) { return String::new(); }
return arr.value(row).to_string();
}
String::new()
};
let get_f64 = |b: &arrow::record_batch::RecordBatch, col: &str, row: usize| -> f64 {
let Some(c) = b.column_by_name(col) else { return 0.0; };
if let Some(arr) = c.as_primitive_opt::<arrow::datatypes::Float64Type>() {
if arr.is_null(row) { return 0.0; }
return arr.value(row);
}
0.0
};
for b in &batches {
for row in 0..b.num_rows() {
total += 1;
let certs = get_string(b, "certifications", row);
for c in certs.split(['|', ',']).map(|s| s.trim()).filter(|s| !s.is_empty() && *s != "none") {
*cert_counts.entry(c.to_string()).or_insert(0) += 1;
}
let skills = get_string(b, "skills", row);
for s in skills.split(['|', ',']).map(|s| s.trim()).filter(|s| !s.is_empty()) {
*skill_counts.entry(s.to_string()).or_insert(0) += 1;
}
let arch = get_string(b, "archetype", row);
if !arch.is_empty() {
*arch_counts.entry(arch).or_insert(0) += 1;
}
let rel = get_f64(b, "reliability", row);
if rel > 0.0 { reliabilities.push(rel); }
}
}
let total_f = total.max(1) as f32;
let to_freq = |m: HashMap<String, usize>, min: f32| -> Vec<TraitFreq> {
let mut v: Vec<TraitFreq> = m.into_iter()
.map(|(name, count)| TraitFreq { name, count, frequency: count as f32 / total_f })
.filter(|t| t.frequency >= min)
.collect();
v.sort_by(|a, b| b.count.cmp(&a.count));
v.truncate(8);
v
};
let common_certifications = to_freq(cert_counts, min_trait_frequency);
let common_skills = to_freq(skill_counts, min_trait_frequency);
let modal_archetype = arch_counts.into_iter()
.max_by_key(|(_, c)| *c)
.map(|(name, _)| name);
reliabilities.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let p50 = if reliabilities.is_empty() { 0.0 } else { reliabilities[reliabilities.len() / 2] };
let rmin = reliabilities.first().copied().unwrap_or(0.0);
let rmax = reliabilities.last().copied().unwrap_or(0.0);
// Build a human-readable discovered-pattern summary
let mut parts: Vec<String> = vec![
format!("Across {} similar past playbooks ({} workers examined)", matched.len(), total),
];
if !common_certifications.is_empty() {
let head: Vec<String> = common_certifications.iter().take(3)
.map(|t| format!("{} ({:.0}%)", t.name, t.frequency * 100.0)).collect();
parts.push(format!("recurring certifications: {}", head.join(", ")));
}
if !common_skills.is_empty() {
let head: Vec<String> = common_skills.iter().take(3)
.map(|t| format!("{} ({:.0}%)", t.name, t.frequency * 100.0)).collect();
parts.push(format!("recurring skills: {}", head.join(", ")));
}
if let Some(a) = &modal_archetype { parts.push(format!("archetype mostly: {a}")); }
if !reliabilities.is_empty() {
parts.push(format!("reliability median {:.2} (range {:.2}{:.2})", p50, rmin, rmax));
}
let discovered_pattern = parts.join(" · ");
Ok(PatternReport {
query: query.into(),
matched_playbooks: matched.len(),
total_workers_examined: total,
common_certifications, common_skills,
modal_archetype, reliability_p50: p50,
reliability_min: rmin, reliability_max: rmax,
matched_playbook_ids: matched_ids,
discovered_pattern,
duration_secs: t0.elapsed().as_secs_f32(),
})
}
// ---------------- Persist memory → SQL (Path 2 foundation) ----------------
#[derive(Debug, Clone, Serialize)]
pub struct PersistReport {
pub rows_persisted: usize,
pub dataset_name: String,
pub fingerprint: String,
pub duration_secs: f32,
}
/// Dump current in-memory state to a queryable Parquet under
/// `successful_playbooks_live`. Registers fresh objects each call — safe
/// because in-memory state is the source of truth here, so REPLACING the
/// objects list reflects the real state, not destroying it.
///
/// Distinct from the existing `successful_playbooks` dataset (which is
/// read by `rebuild()`), so this never collides with operator imports of
/// historical playbook data. Recruiter-facing SQL surfaces should query
/// `successful_playbooks_live` for current operator activity.
pub async fn persist_to_sql(
memory: &PlaybookMemory,
catalog: &catalogd::registry::Registry,
) -> Result<PersistReport, String> {
use arrow::array::StringArray;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
let t0 = std::time::Instant::now();
let entries = memory.snapshot().await;
let schema = Arc::new(Schema::new(vec![
Field::new("timestamp", DataType::Utf8, true),
Field::new("operation", DataType::Utf8, true),
Field::new("approach", DataType::Utf8, true),
Field::new("result", DataType::Utf8, true),
Field::new("context", DataType::Utf8, true),
]));
let timestamps: Vec<&str> = entries.iter().map(|e| e.timestamp.as_str()).collect();
let operations: Vec<&str> = entries.iter().map(|e| e.operation.as_str()).collect();
let approaches: Vec<&str> = entries.iter().map(|e| e.approach.as_str()).collect();
let contexts: Vec<&str> = entries.iter().map(|e| e.context.as_str()).collect();
// Result column is reconstructed from endorsed_names so SQL queries
// against successful_playbooks_live see the same shape as the original
// CSV-fed successful_playbooks ("N/N filled → Name1, Name2").
let results: Vec<String> = entries.iter().map(|e| {
if e.endorsed_names.is_empty() {
String::new()
} else {
let n = e.endorsed_names.len();
format!("{}/{} filled → {}", n, n, e.endorsed_names.join(", "))
}
}).collect();
let result_refs: Vec<&str> = results.iter().map(|s| s.as_str()).collect();
let batch = RecordBatch::try_new(schema.clone(), vec![
Arc::new(StringArray::from(timestamps)),
Arc::new(StringArray::from(operations)),
Arc::new(StringArray::from(approaches)),
Arc::new(StringArray::from(result_refs)),
Arc::new(StringArray::from(contexts)),
]).map_err(|e| format!("build record batch: {e}"))?;
let parquet_bytes = shared::arrow_helpers::record_batch_to_parquet(&batch)?;
let fp = shared::arrow_helpers::fingerprint_schema(&schema);
let key = "datasets/successful_playbooks_live.parquet";
ops::put(&memory.store, key, parquet_bytes.clone()).await?;
let obj = shared::types::ObjectRef {
bucket: "primary".into(),
key: key.into(),
size_bytes: parquet_bytes.len() as u64,
created_at: chrono::Utc::now(),
};
let manifest = catalog.register(
"successful_playbooks_live".into(),
fp.clone(),
vec![obj],
).await?;
Ok(PersistReport {
rows_persisted: entries.len(),
dataset_name: manifest.name,
fingerprint: fp.0,
duration_secs: t0.elapsed().as_secs_f32(),
})
}
// ---------------- Rebuild (the core of Phase 19) ----------------
#[derive(Debug, Clone, Serialize)]
pub struct RebuildReport {
pub rows_scanned: usize,
pub entries_built: usize,
pub total_names_endorsed: usize,
pub duration_secs: f32,
}
/// Full rebuild: scan `successful_playbooks`, extract endorsements, embed
/// each row's operation+approach+context, replace the in-memory state.
///
/// Returns the report so callers can show operators what happened.
pub async fn rebuild(
memory: &PlaybookMemory,
ai_client: &AiClient,
catalog: &catalogd::registry::Registry,
buckets: &Arc<storaged::registry::BucketRegistry>,
) -> Result<RebuildReport, String> {
let t0 = std::time::Instant::now();
// 1. Pull every row of successful_playbooks through the query engine.
let sql = "SELECT timestamp, operation, approach, result, context \
FROM successful_playbooks";
let engine = queryd::context::QueryEngine::new(
catalog.clone(),
buckets.clone(),
queryd::cache::MemCache::new(0),
);
let batches = engine
.query(sql)
.await
.map_err(|e| format!("query successful_playbooks: {e}"))?;
let mut rows: Vec<(String, String, String, String, String)> = Vec::new();
for b in &batches {
let n = b.num_rows();
let get = |col: &str, row: usize| -> String {
use arrow::array::{Array, AsArray};
let Some(c) = b.column_by_name(col) else { return String::new(); };
if let Some(arr) = c.as_string_view_opt() {
if arr.is_null(row) { return String::new(); }
return arr.value(row).to_string();
}
if let Some(arr) = c.as_string_opt::<i32>() {
if arr.is_null(row) { return String::new(); }
return arr.value(row).to_string();
}
String::new()
};
for row in 0..n {
rows.push((
get("timestamp", row),
get("operation", row),
get("approach", row),
get("result", row),
get("context", row),
));
}
}
let rows_scanned = rows.len();
// 2. For each row, build a PlaybookEntry (no embedding yet). Parse
// the operation for (city, state) and the result for names.
let mut entries: Vec<PlaybookEntry> = rows
.into_iter()
.map(|(ts, op, approach, result, ctx)| {
let (city, state) = parse_city_state(&op);
let names = parse_names(&result);
PlaybookEntry {
playbook_id: stable_id(&ts, &op),
operation: op,
approach,
context: ctx,
timestamp: ts,
endorsed_names: names,
city,
state,
// Rebuild doesn't know fingerprints or doc_refs;
// historical entries get no drift signal until a seed
// supersedes them or /retire is called manually.
..Default::default()
}
})
.collect();
// 3. Embed in one batch. Sidecar's embed handles batching internally;
// chunk here to ~64 per request to keep memory flat.
const EMBED_BATCH: usize = 64;
for chunk_start in (0..entries.len()).step_by(EMBED_BATCH) {
let end = (chunk_start + EMBED_BATCH).min(entries.len());
let texts: Vec<String> = entries[chunk_start..end]
.iter()
.map(embed_text)
.collect();
let req = EmbedRequest { texts, model: None };
let resp = ai_client
.embed(req)
.await
.map_err(|e| format!("embed batch [{chunk_start}..{end}]: {e}"))?;
for (i, v) in resp.embeddings.iter().enumerate() {
let f32v: Vec<f32> = v.iter().map(|&x| x as f32).collect();
entries[chunk_start + i].embedding = Some(f32v);
}
}
let total_names_endorsed: usize = entries.iter().map(|e| e.endorsed_names.len()).sum();
let entries_built = entries.len();
memory.set_entries(entries).await?;
Ok(RebuildReport {
rows_scanned,
entries_built,
total_names_endorsed,
duration_secs: t0.elapsed().as_secs_f32(),
})
}
fn embed_text(e: &PlaybookEntry) -> String {
// Compact one-liner per playbook. Excludes timestamp (no semantic
// signal) and includes the fills as words (they're occasionally
// meaningful — "Luis Harris" might semantically correlate with
// Spanish-speaker names in future queries).
format!(
"{} | {} | {} | fills: {}",
e.operation,
e.approach,
e.context,
e.endorsed_names.join(", "),
)
}
/// Derive a stable id from (timestamp, operation). Two playbooks with
/// identical timestamp+operation collapse to one — benign dedup.
fn stable_id(ts: &str, op: &str) -> String {
use sha2::{Digest, Sha256};
let mut h = Sha256::new();
h.update(ts.as_bytes());
h.update(b"|");
h.update(op.as_bytes());
let bytes = h.finalize();
format!("pb-{}", hex_short(&bytes, 12))
}
fn hex_short(b: &[u8], n: usize) -> String {
let mut s = String::with_capacity(n * 2);
for byte in &b[..b.len().min(n)] {
s.push_str(&format!("{byte:02x}"));
}
s
}
/// Parse "fill: Welder x2 in Toledo, OH" → ("Toledo", "OH").
/// Returns None for malformed operations.
fn parse_city_state(op: &str) -> (Option<String>, Option<String>) {
// Split on " in " then parse "City, ST"
let after_in = match op.split(" in ").nth(1) {
Some(s) => s,
None => return (None, None),
};
let parts: Vec<&str> = after_in.splitn(2, ',').collect();
if parts.len() != 2 {
return (None, None);
}
let city = parts[0].trim().to_string();
// state might be followed by more context; take leading alpha chars
let state: String = parts[1].trim()
.chars()
.take_while(|c| c.is_ascii_alphabetic())
.collect();
if city.is_empty() || state.is_empty() {
return (None, None);
}
(Some(city), Some(state))
}
/// Parse "2/2 filled → Matthew Roberts, Amy Davis" → ["Matthew Roberts", "Amy Davis"].
fn parse_names(result: &str) -> Vec<String> {
// Everything after the arrow; split on ", ".
let after_arrow = match result.split('→').nth(1) {
Some(s) => s.trim(),
None => return Vec::new(),
};
// Strip trailing noise like "(and N more)" that some emitters add.
let cleaned = after_arrow.split(" (").next().unwrap_or(after_arrow);
cleaned
.split(',')
.map(|n| n.trim().to_string())
.filter(|n| !n.is_empty())
.collect()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_city_state_extracts_both() {
let (c, s) = parse_city_state("fill: Welder x2 in Toledo, OH");
assert_eq!(c.as_deref(), Some("Toledo"));
assert_eq!(s.as_deref(), Some("OH"));
}
#[test]
fn parse_city_state_handles_multiword_city() {
let (c, s) = parse_city_state("fill: Loader x1 in Grand Rapids, MI");
assert_eq!(c.as_deref(), Some("Grand Rapids"));
assert_eq!(s.as_deref(), Some("MI"));
}
#[test]
fn parse_city_state_malformed_returns_none() {
let (c, s) = parse_city_state("fill: something weird");
assert!(c.is_none());
assert!(s.is_none());
}
#[test]
fn parse_names_extracts_after_arrow() {
let ns = parse_names("2/2 filled → Matthew Roberts, Amy Davis");
assert_eq!(ns, vec!["Matthew Roberts".to_string(), "Amy Davis".to_string()]);
}
#[test]
fn parse_names_handles_single_fill() {
let ns = parse_names("1/1 filled → Jose Reed");
assert_eq!(ns, vec!["Jose Reed".to_string()]);
}
#[test]
fn parse_names_handles_no_arrow() {
let ns = parse_names("0/2 filled");
assert!(ns.is_empty());
}
#[test]
fn stable_id_is_deterministic() {
let a = stable_id("2026-04-20T00:00:00Z", "fill: Welder x2 in Toledo, OH");
let b = stable_id("2026-04-20T00:00:00Z", "fill: Welder x2 in Toledo, OH");
assert_eq!(a, b);
assert!(a.starts_with("pb-"));
}
#[test]
fn boost_caps_per_worker() {
// Even with 100 similar playbooks all endorsing the same name, the
// boost never exceeds MAX_BOOST_PER_WORKER.
let pm = PlaybookMemory::new(Arc::new(object_store::memory::InMemory::new()));
let entries: Vec<PlaybookEntry> = (0..100)
.map(|i| PlaybookEntry {
playbook_id: format!("pb-{i}"),
operation: "fill: Welder x1 in Toledo, OH".into(),
approach: "transfer".into(),
timestamp: "2026-04-20".into(),
endorsed_names: vec!["Deborah Powell".into()],
city: Some("Toledo".into()),
state: Some("OH".into()),
embedding: Some(vec![1.0, 0.0, 0.0]),
..Default::default()
})
.collect();
tokio::runtime::Runtime::new().unwrap().block_on(async {
pm.set_entries(entries).await.unwrap();
let boosts = pm.compute_boost_for(&[1.0, 0.0, 0.0], 100, 0.5).await;
let key = ("Toledo".into(), "OH".into(), "Deborah Powell".into());
let entry = boosts.get(&key).expect("boost entry present");
assert!(entry.boost <= MAX_BOOST_PER_WORKER + 1e-6,
"boost {} exceeded cap {}", entry.boost, MAX_BOOST_PER_WORKER);
});
}
}
#[cfg(test)]
mod validity_window_tests {
use super::*;
use object_store::memory::InMemory;
fn mkentry(id: &str, city: &str, state: &str, fingerprint: Option<String>, valid_until: Option<String>) -> PlaybookEntry {
PlaybookEntry {
playbook_id: id.into(),
operation: format!("fill: Welder x1 in {city}, {state}"),
approach: "hybrid".into(),
context: "test".into(),
timestamp: chrono::Utc::now().to_rfc3339(),
endorsed_names: vec!["Test Worker".into()],
city: Some(city.into()),
state: Some(state.into()),
embedding: Some(vec![1.0, 0.0, 0.0]),
schema_fingerprint: fingerprint,
valid_until,
..Default::default()
}
}
#[tokio::test]
async fn retire_one_marks_entry_and_persists() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
pm.set_entries(vec![mkentry("pb-1", "Nashville", "TN", None, None)]).await.unwrap();
let touched = pm.retire_one("pb-1", "manual test").await.unwrap();
assert!(touched);
let (total, retired, _, _) = pm.status_counts().await;
assert_eq!(total, 1);
assert_eq!(retired, 1);
// Second retirement is a no-op
let second = pm.retire_one("pb-1", "again").await.unwrap();
assert!(!second);
}
#[tokio::test]
async fn retired_entries_do_not_boost() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let e1 = mkentry("pb-active", "Nashville", "TN", None, None);
let mut e2 = mkentry("pb-retired", "Nashville", "TN", None, None);
e2.retired_at = Some(chrono::Utc::now().to_rfc3339());
pm.set_entries(vec![e1, e2]).await.unwrap();
let boosts = pm.compute_boost_for_filtered_with_role(
&[1.0, 0.0, 0.0], 100, 0.5,
Some(("Nashville", "TN")), Some("Welder")
).await;
// Only the active entry should surface. Both endorse the same
// name so we check citation count isn't doubled — presence of
// the retired playbook id in citations would mean it slipped
// through.
let entry = boosts.get(&("Nashville".into(), "TN".into(), "Test Worker".into())).unwrap();
assert!(!entry.citations.contains(&"pb-retired".to_string()));
assert!(entry.citations.contains(&"pb-active".to_string()));
}
#[tokio::test]
async fn expired_valid_until_is_skipped() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let past = (chrono::Utc::now() - chrono::Duration::days(1)).to_rfc3339();
let future = (chrono::Utc::now() + chrono::Duration::days(1)).to_rfc3339();
let e_expired = mkentry("pb-expired", "Nashville", "TN", None, Some(past));
let e_alive = mkentry("pb-alive", "Nashville", "TN", None, Some(future));
pm.set_entries(vec![e_expired, e_alive]).await.unwrap();
let boosts = pm.compute_boost_for_filtered_with_role(
&[1.0, 0.0, 0.0], 100, 0.5,
Some(("Nashville", "TN")), Some("Welder")
).await;
let entry = boosts.get(&("Nashville".into(), "TN".into(), "Test Worker".into())).unwrap();
assert!(!entry.citations.contains(&"pb-expired".to_string()));
assert!(entry.citations.contains(&"pb-alive".to_string()));
}
#[tokio::test]
async fn schema_drift_retires_mismatched_fingerprints_only() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let e_old = mkentry("pb-old-schema", "Nashville", "TN", Some("fp-v1".into()), None);
let e_new = mkentry("pb-new-schema", "Nashville", "TN", Some("fp-v2".into()), None);
let e_legacy = mkentry("pb-no-fp", "Nashville", "TN", None, None);
pm.set_entries(vec![e_old, e_new, e_legacy]).await.unwrap();
let retired = pm.retire_on_schema_drift("Nashville", "TN", "fp-v2", "test migration").await.unwrap();
// Only pb-old-schema should be retired — pb-new-schema matches,
// pb-no-fp has no fingerprint so it's legacy-safe.
assert_eq!(retired, 1);
let (_, total_retired, _, _) = pm.status_counts().await;
assert_eq!(total_retired, 1);
}
#[tokio::test]
async fn schema_drift_skips_other_cities() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let e_tn = mkentry("pb-tn", "Nashville", "TN", Some("fp-v1".into()), None);
let e_il = mkentry("pb-il", "Chicago", "IL", Some("fp-v1".into()), None);
pm.set_entries(vec![e_tn, e_il]).await.unwrap();
// Nashville migration shouldn't touch Chicago
let retired = pm.retire_on_schema_drift("Nashville", "TN", "fp-v2", "test").await.unwrap();
assert_eq!(retired, 1);
let (_, r, _, _) = pm.status_counts().await;
assert_eq!(r, 1);
}
}
#[cfg(test)]
mod upsert_tests {
use super::*;
use object_store::memory::InMemory;
fn mk(op: &str, day: &str, names: &[&str]) -> PlaybookEntry {
PlaybookEntry {
playbook_id: format!("pb-{}-{}", op.replace(' ', "_"), day),
operation: op.into(),
approach: "seed".into(),
context: "test".into(),
timestamp: format!("{day}T12:00:00Z"),
endorsed_names: names.iter().map(|s| s.to_string()).collect(),
city: Some("Nashville".into()),
state: Some("TN".into()),
embedding: Some(vec![1.0, 0.0, 0.0]),
..Default::default()
}
}
#[tokio::test]
async fn first_seed_is_add() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let e = mk("fill: Welder x2 in Nashville, TN", "2026-04-21", &["Alice Smith"]);
match pm.upsert_entry(e).await.unwrap() {
UpsertOutcome::Added { .. } => {}
other => panic!("expected Added, got {:?}", other),
}
assert_eq!(pm.entry_count().await, 1);
}
#[tokio::test]
async fn identical_reseed_is_noop() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let e1 = mk("fill: Welder x2 in Nashville, TN", "2026-04-21", &["Alice Smith", "Bob Jones"]);
let e2 = mk("fill: Welder x2 in Nashville, TN", "2026-04-21", &["Alice Smith", "Bob Jones"]);
pm.upsert_entry(e1).await.unwrap();
let outcome = pm.upsert_entry(e2).await.unwrap();
assert!(matches!(outcome, UpsertOutcome::Noop { .. }));
// Still exactly one entry, no duplicate from the re-seed.
assert_eq!(pm.entry_count().await, 1);
}
#[tokio::test]
async fn same_day_different_names_updates_and_merges() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let e1 = mk("fill: Welder x2 in Nashville, TN", "2026-04-21", &["Alice Smith"]);
let e2 = mk("fill: Welder x2 in Nashville, TN", "2026-04-21", &["Alice Smith", "Bob Jones"]);
let o1 = pm.upsert_entry(e1).await.unwrap();
let pid = match o1 {
UpsertOutcome::Added { playbook_id: p } => p,
other => panic!("expected Added, got {:?}", other),
};
let o2 = pm.upsert_entry(e2).await.unwrap();
match o2 {
UpsertOutcome::Updated { playbook_id, merged_names } => {
assert_eq!(playbook_id, pid, "Updated should keep original playbook_id");
assert_eq!(merged_names, vec!["Alice Smith".to_string(), "Bob Jones".to_string()]);
}
other => panic!("expected Updated, got {:?}", other),
}
assert_eq!(pm.entry_count().await, 1, "Updated must not create a duplicate");
}
#[tokio::test]
async fn different_day_same_op_is_add() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let e1 = mk("fill: Welder x2 in Nashville, TN", "2026-04-21", &["Alice Smith"]);
let e2 = mk("fill: Welder x2 in Nashville, TN", "2026-04-22", &["Alice Smith"]);
pm.upsert_entry(e1).await.unwrap();
let o2 = pm.upsert_entry(e2).await.unwrap();
assert!(matches!(o2, UpsertOutcome::Added { .. }), "different day → fresh ADD");
assert_eq!(pm.entry_count().await, 2);
}
#[tokio::test]
async fn retired_entry_doesnt_block_new_seed() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let mut e1 = mk("fill: Welder x2 in Nashville, TN", "2026-04-21", &["Alice Smith"]);
e1.retired_at = Some(chrono::Utc::now().to_rfc3339());
pm.set_entries(vec![e1]).await.unwrap();
// A new seed on same day should ADD, not merge into the retired one.
let e2 = mk("fill: Welder x2 in Nashville, TN", "2026-04-21", &["Carol Davis"]);
let o = pm.upsert_entry(e2).await.unwrap();
assert!(matches!(o, UpsertOutcome::Added { .. }));
assert_eq!(pm.entry_count().await, 2);
}
// Regression test for the auditor-discovered bug: UPDATE branch
// silently dropped doc_refs, valid_until, and (partially)
// schema_fingerprint from the incoming entry.
//
// Before the fix: a re-seed with different endorsed_names AND new
// doc_refs would merge names but discard the new doc_refs. After:
// both sets are unioned (doc_refs merged by tool, same-tool
// supersedes).
fn docref(tool: &str, version: &str) -> DocRef {
DocRef {
tool: tool.into(),
version_seen: version.into(),
snippet_hash: Some(format!("hash-{tool}-{version}")),
source_url: None,
seen_at: "2026-04-22T00:00:00Z".into(),
}
}
fn mk_with_docs(op: &str, day: &str, names: &[&str], docs: Vec<DocRef>) -> PlaybookEntry {
let mut e = mk(op, day, names);
e.doc_refs = docs;
e
}
#[tokio::test]
async fn update_merges_doc_refs_with_existing_ones() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let day = "2026-04-22";
let op = "fill: Welder x2 in Nashville, TN";
// First seed: Alice + Docker doc ref
let e1 = mk_with_docs(op, day, &["Alice"], vec![docref("docker", "24.0.7")]);
let o1 = pm.upsert_entry(e1).await.unwrap();
assert!(matches!(o1, UpsertOutcome::Added { .. }));
// Second seed on same (op, day) but new names AND new tool ref
let e2 = mk_with_docs(op, day, &["Bob"], vec![docref("terraform", "1.5.0")]);
let o2 = pm.upsert_entry(e2).await.unwrap();
assert!(matches!(o2, UpsertOutcome::Updated { .. }));
assert_eq!(pm.entry_count().await, 1); // still one entry, merged
let snap = pm.snapshot().await;
assert_eq!(snap.len(), 1);
let merged = &snap[0];
// Names unioned
assert!(merged.endorsed_names.contains(&"Alice".to_string()));
assert!(merged.endorsed_names.contains(&"Bob".to_string()));
// Both doc_refs preserved (different tools → union)
assert_eq!(merged.doc_refs.len(), 2);
let tools: Vec<_> = merged.doc_refs.iter().map(|r| r.tool.as_str()).collect();
assert!(tools.contains(&"docker"));
assert!(tools.contains(&"terraform"));
}
#[tokio::test]
async fn update_same_tool_supersedes_older_version() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let day = "2026-04-22";
let op = "fill: Welder x2 in Nashville, TN";
let e1 = mk_with_docs(op, day, &["Alice"], vec![docref("docker", "24.0.7")]);
pm.upsert_entry(e1).await.unwrap();
// Same day, different name, SAME tool newer version
let e2 = mk_with_docs(op, day, &["Bob"], vec![docref("docker", "25.0.1")]);
let o2 = pm.upsert_entry(e2).await.unwrap();
assert!(matches!(o2, UpsertOutcome::Updated { .. }));
let snap = pm.snapshot().await;
assert_eq!(snap[0].doc_refs.len(), 1, "same tool should supersede, not duplicate");
assert_eq!(snap[0].doc_refs[0].version_seen, "25.0.1");
}
#[tokio::test]
async fn update_preserves_existing_doc_refs_when_new_entry_has_none() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let day = "2026-04-22";
let op = "fill: Welder x2 in Nashville, TN";
let e1 = mk_with_docs(op, day, &["Alice"], vec![docref("docker", "24.0.7")]);
pm.upsert_entry(e1).await.unwrap();
// Second seed doesn't carry doc_refs — shouldn't wipe the existing
let e2 = mk(op, day, &["Bob"]); // no doc_refs
pm.upsert_entry(e2).await.unwrap();
let snap = pm.snapshot().await;
assert_eq!(snap[0].doc_refs.len(), 1, "empty new doc_refs shouldn't wipe existing");
assert_eq!(snap[0].doc_refs[0].tool, "docker");
}
// ─── Phase 45 slice 3 — doc_drift flag / resolve / boost exclusion ───
#[tokio::test]
async fn flag_doc_drift_stamps_timestamp_and_persists() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let e = mk("fill: Welder x1 in Nashville, TN", "2026-04-22", &["Alice"]);
pm.set_entries(vec![e.clone()]).await.unwrap();
let touched = pm.flag_doc_drift(&e.playbook_id).await.unwrap();
assert!(touched, "first flag should return true");
let fetched = pm.get_entry(&e.playbook_id).await.unwrap();
assert!(fetched.doc_drift_flagged_at.is_some(), "flag_doc_drift should stamp the timestamp");
assert!(fetched.doc_drift_reviewed_at.is_none(), "reviewed_at starts None on fresh flag");
}
#[tokio::test]
async fn flag_doc_drift_is_idempotent_on_already_flagged() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let mut e = mk("fill: Welder x1 in Nashville, TN", "2026-04-22", &["Alice"]);
e.doc_drift_flagged_at = Some("2026-04-01T00:00:00Z".into());
pm.set_entries(vec![e.clone()]).await.unwrap();
let touched = pm.flag_doc_drift(&e.playbook_id).await.unwrap();
assert!(!touched, "already-flagged, unreviewed entry shouldn't re-stamp");
}
#[tokio::test]
async fn resolve_doc_drift_clears_flag_admission_gate() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let mut e = mk("fill: Welder x1 in Nashville, TN", "2026-04-22", &["Alice"]);
e.doc_drift_flagged_at = Some("2026-04-21T00:00:00Z".into());
pm.set_entries(vec![e.clone()]).await.unwrap();
let resolved = pm.resolve_doc_drift(&e.playbook_id).await.unwrap();
assert!(resolved);
let fetched = pm.get_entry(&e.playbook_id).await.unwrap();
assert!(fetched.doc_drift_reviewed_at.is_some());
// Second resolve is a no-op
let again = pm.resolve_doc_drift(&e.playbook_id).await.unwrap();
assert!(!again);
}
#[tokio::test]
async fn boost_excludes_flagged_unreviewed_entries() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let e_clean = mk("fill: Welder x1 in Nashville, TN", "2026-04-22", &["Alice"]);
let mut e_flagged = mk("fill: Welder x1 in Nashville, TN", "2026-04-21", &["Bob"]);
e_flagged.doc_drift_flagged_at = Some("2026-04-22T00:00:00Z".into());
e_flagged.doc_drift_reviewed_at = None;
pm.set_entries(vec![e_clean, e_flagged]).await.unwrap();
let boosts = pm
.compute_boost_for_filtered_with_role(
&[1.0, 0.0, 0.0], 10, 0.5,
Some(("Nashville", "TN")), None,
).await;
let keys: Vec<_> = boosts.keys().map(|(_, _, n)| n.clone()).collect();
assert!(keys.contains(&"Alice".to_string()));
assert!(!keys.contains(&"Bob".to_string()), "flagged+unreviewed entry leaked into boost");
}
#[tokio::test]
async fn boost_re_admits_resolved_entries() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let mut e = mk("fill: Welder x1 in Nashville, TN", "2026-04-22", &["Alice"]);
e.doc_drift_flagged_at = Some("2026-04-22T00:00:00Z".into());
e.doc_drift_reviewed_at = Some("2026-04-22T00:01:00Z".into()); // human reviewed
pm.set_entries(vec![e]).await.unwrap();
let boosts = pm
.compute_boost_for_filtered_with_role(
&[1.0, 0.0, 0.0], 10, 0.5,
Some(("Nashville", "TN")), None,
).await;
let keys: Vec<_> = boosts.keys().map(|(_, _, n)| n.clone()).collect();
assert!(keys.contains(&"Alice".to_string()), "resolved entry should re-enter boost pool");
}
#[tokio::test]
async fn update_refreshes_valid_until_when_caller_provides_one() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let day = "2026-04-22";
let op = "fill: Welder x2 in Nashville, TN";
// First seed with no valid_until
let e1 = mk(op, day, &["Alice"]);
pm.upsert_entry(e1).await.unwrap();
// Second seed with a valid_until deadline
let mut e2 = mk(op, day, &["Bob"]);
e2.valid_until = Some("2026-05-01T00:00:00Z".into());
pm.upsert_entry(e2).await.unwrap();
let snap = pm.snapshot().await;
assert_eq!(snap[0].valid_until.as_deref(), Some("2026-05-01T00:00:00Z"));
}
}
#[cfg(test)]
mod version_tests {
use super::*;
use object_store::memory::InMemory;
fn mk(id: &str, city: &str, state: &str) -> PlaybookEntry {
PlaybookEntry {
playbook_id: id.into(),
operation: format!("fill: Welder x1 in {city}, {state}"),
approach: "hybrid".into(),
context: "test".into(),
timestamp: chrono::Utc::now().to_rfc3339(),
endorsed_names: vec!["Alice Smith".into()],
city: Some(city.into()),
state: Some(state.into()),
embedding: Some(vec![1.0, 0.0, 0.0]),
..Default::default()
}
}
#[tokio::test]
async fn revise_stamps_chain_metadata_on_both_ends() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
pm.set_entries(vec![mk("pb-v1", "Nashville", "TN")]).await.unwrap();
let outcome = pm.revise_entry("pb-v1", mk("pb-v2", "Nashville", "TN"))
.await
.expect("revise should succeed against active root");
assert_eq!(outcome.parent_id, "pb-v1");
assert_eq!(outcome.parent_version, 1);
assert_eq!(outcome.new_playbook_id, "pb-v2");
assert_eq!(outcome.new_version, 2);
assert!(!outcome.superseded_at.is_empty());
let snap = pm.snapshot().await;
let v1 = snap.iter().find(|e| e.playbook_id == "pb-v1").unwrap();
let v2 = snap.iter().find(|e| e.playbook_id == "pb-v2").unwrap();
assert_eq!(v1.superseded_by.as_deref(), Some("pb-v2"));
assert!(v1.superseded_at.is_some());
assert_eq!(v2.parent_id.as_deref(), Some("pb-v1"));
assert_eq!(v2.version, 2);
assert!(v2.superseded_at.is_none());
}
#[tokio::test]
async fn revise_rejects_retired_parent() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
let mut e1 = mk("pb-v1", "Nashville", "TN");
e1.retired_at = Some(chrono::Utc::now().to_rfc3339());
pm.set_entries(vec![e1]).await.unwrap();
let err = pm.revise_entry("pb-v1", mk("pb-v2", "Nashville", "TN")).await
.expect_err("revise on retired parent must error");
assert!(err.contains("retired"), "error should mention retirement: {err}");
}
#[tokio::test]
async fn revise_rejects_already_superseded_parent() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
pm.set_entries(vec![mk("pb-v1", "Nashville", "TN")]).await.unwrap();
pm.revise_entry("pb-v1", mk("pb-v2", "Nashville", "TN")).await.unwrap();
// pb-v1 is now superseded; revising it again must fail — caller
// should revise pb-v2 (the tip) instead.
let err = pm.revise_entry("pb-v1", mk("pb-v3-fake", "Nashville", "TN")).await
.expect_err("revise on superseded parent must error");
assert!(err.contains("superseded"), "error should mention supersession: {err}");
}
#[tokio::test]
async fn superseded_entries_excluded_from_boost() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
pm.set_entries(vec![mk("pb-v1", "Nashville", "TN")]).await.unwrap();
let mut v2 = mk("pb-v2", "Nashville", "TN");
v2.endorsed_names = vec!["Carol Davis".into()];
pm.revise_entry("pb-v1", v2).await.unwrap();
let boosts = pm.compute_boost_for_filtered_with_role(
&[1.0, 0.0, 0.0], 100, 0.5,
Some(("Nashville", "TN")), Some("Welder"),
).await;
// v1's endorsement (Alice Smith) should be absent — it was
// superseded. v2's endorsement (Carol Davis) should be present.
assert!(
!boosts.contains_key(&("Nashville".into(), "TN".into(), "Alice Smith".into())),
"superseded entry's endorsement must not boost"
);
let carol = boosts.get(&("Nashville".into(), "TN".into(), "Carol Davis".into()));
assert!(carol.is_some(), "tip version's endorsement must still boost");
assert!(carol.unwrap().citations.contains(&"pb-v2".to_string()));
}
#[tokio::test]
async fn history_walks_root_to_tip_from_any_node() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
pm.set_entries(vec![mk("pb-v1", "Nashville", "TN")]).await.unwrap();
pm.revise_entry("pb-v1", mk("pb-v2", "Nashville", "TN")).await.unwrap();
pm.revise_entry("pb-v2", mk("pb-v3", "Nashville", "TN")).await.unwrap();
// Starting from the root — same chain.
let chain_from_root = pm.history("pb-v1").await;
assert_eq!(chain_from_root.len(), 3);
assert_eq!(chain_from_root[0].playbook_id, "pb-v1");
assert_eq!(chain_from_root[1].playbook_id, "pb-v2");
assert_eq!(chain_from_root[2].playbook_id, "pb-v3");
// Starting from the tip — same chain, same order.
let chain_from_tip = pm.history("pb-v3").await;
assert_eq!(chain_from_tip.len(), 3);
assert_eq!(chain_from_tip[0].playbook_id, "pb-v1");
assert_eq!(chain_from_tip[2].playbook_id, "pb-v3");
// Starting from the middle — same chain.
let chain_from_mid = pm.history("pb-v2").await;
assert_eq!(chain_from_mid.len(), 3);
assert_eq!(chain_from_mid[0].playbook_id, "pb-v1");
assert_eq!(chain_from_mid[2].playbook_id, "pb-v3");
}
#[tokio::test]
async fn history_empty_for_unknown_id() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
pm.set_entries(vec![mk("pb-v1", "Nashville", "TN")]).await.unwrap();
assert!(pm.history("pb-nonexistent").await.is_empty());
}
#[tokio::test]
async fn status_counts_reports_superseded_separately() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
pm.set_entries(vec![mk("pb-v1", "Nashville", "TN")]).await.unwrap();
pm.revise_entry("pb-v1", mk("pb-v2", "Nashville", "TN")).await.unwrap();
let (total, retired, superseded, _) = pm.status_counts().await;
assert_eq!(total, 2);
assert_eq!(retired, 0);
assert_eq!(superseded, 1);
}
#[tokio::test]
async fn legacy_entries_without_version_default_to_v1() {
// Simulate state persisted before Phase 27 — no version field.
// Serde default kicks in; entries should be treated as roots.
let json = r#"{
"entries": [{
"playbook_id": "pb-legacy",
"operation": "fill: Welder x1 in Nashville, TN",
"approach": "hybrid",
"context": "",
"timestamp": "2026-04-21T00:00:00Z",
"endorsed_names": ["Alice"],
"city": "Nashville",
"state": "TN"
}],
"last_rebuilt_at": 0,
"failures": []
}"#;
let state: PlaybookMemoryState = serde_json::from_str(json).unwrap();
let legacy = &state.entries[0];
assert_eq!(legacy.version, 1);
assert!(legacy.parent_id.is_none());
assert!(legacy.superseded_at.is_none());
assert!(legacy.superseded_by.is_none());
}
}