Path 1 negative signal + Path 2 pattern discovery + name validation

New:
- /vectors/playbook_memory/patterns: meta-index pattern discovery.
  Given a query, finds top-K similar playbooks, pulls each endorsed
  worker's full workers_500k profile, aggregates shared traits (cert
  frequencies, skill frequencies, modal archetype, reliability
  distribution), returns a human-readable discovered_pattern. Surfaces
  signals operators didn't explicitly query — the original PRD's
  "identify things we didn't know" dimension.
- /vectors/playbook_memory/mark_failed: records worker failures per
  (city, state, name). compute_boost_for applies a 0.5^n penalty per
  recorded failure, so three failures cut a worker's positive boost to
  an eighth and five effectively zero it. Path 1 negative signal —
  recruiter trust depends on the system NOT recommending people who
  no-showed.
- Bun /log_failure: validates failed_names against workers_500k
  (same ghost-guard as /log), forwards to /mark_failed.
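The 0.5^n dampening above can be sketched in a few lines (a minimal illustration; `failurePenalty` is a hypothetical helper, the shipped logic lives in compute_boost_for):

```typescript
// Each recorded failure for a (city, state, name) key halves that
// worker's positive boost. Capped at 20 failures, mirroring the Rust
// side, to avoid degenerate exponents.
function failurePenalty(failures: number): number {
  return Math.pow(0.5, Math.min(failures, 20));
}

console.log(failurePenalty(0)); // 1
console.log(failurePenalty(3)); // 0.125
console.log(failurePenalty(5)); // 0.03125
```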

Improved:
- /log now validates endorsed_names against workers_500k for the
  contract's city+state before seeding. Ghost names (names that don't
  correspond to real workers) are rejected in the response and excluded
  from the seed, preventing silent boost failures.
- Bun /search auto-appends `CAST(availability AS DOUBLE) > 0.5` to
  sql_filter when the caller didn't constrain availability. Opt out
  with `include_unavailable: true`. Fixes a recruiter-trust bug:
  surfacing already-placed workers breaks trust on the very first call.
- DEFAULT_TOP_K_PLAYBOOKS 25 → 100. Direct cosine measurement showed
  similarities cluster 0.55-0.67 across all playbooks regardless of
  geo, so k=25 missed relevant geo-matched playbooks. Brute-force is
  still sub-ms at this size.
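The /search guard behaves roughly like this sketch (assumptions: the filter is a plain SQL string and a substring check on `availability` is how "caller constrained it" is detected; the real Bun handler may differ):

```typescript
// Append the availability guard unless the caller already constrained
// availability or explicitly opted out with include_unavailable.
function withAvailabilityGuard(sqlFilter: string, includeUnavailable = false): string {
  const guard = "CAST(availability AS DOUBLE) > 0.5";
  if (includeUnavailable) return sqlFilter;           // explicit opt-out
  if (/availability/i.test(sqlFilter)) return sqlFilter; // caller constrained it
  return sqlFilter ? `(${sqlFilter}) AND ${guard}` : guard;
}
```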

Verified end-to-end on live data:
- Ghost names rejected on /log + /log_failure
- Availability filter drops unavailable workers from candidate pool
- Pattern discovery on unseen Cleveland OH Welder query returned
  recurring skills (first aid 43%, grinder 43%, blueprint 43%) and
  modal archetype (specialist) across 20 semantically similar past
  playbooks in 0.24s
- Negative signal: Helen Sanchez boost dropped +0.250 → +0.163 after
  3 failures recorded via /log_failure (34% reduction)
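Worth noting on the Helen Sanchez figure: three failures multiply each playbook contribution by 0.125, yet the observed drop is only about a third, because the uncapped sum of contributions exceeded MAX_BOOST_PER_WORKER and the 0.25 clamp absorbed most of the penalty. A back-of-envelope sketch (the ~1.3 uncapped sum is inferred from the reported numbers, not logged):

```typescript
// Why 3 failures gave -34% instead of -87.5%: the pre-penalty sum of
// contributions was far above the cap, so clamping hid most of the drop.
const MAX_BOOST_PER_WORKER = 0.25;
const penalty = Math.pow(0.5, 3);       // 0.125 after three failures
const uncappedSum = 0.163 / penalty;    // ≈ 1.304, inferred from the logs
const before = Math.min(uncappedSum, MAX_BOOST_PER_WORKER);          // clamped to 0.25
const after = Math.min(uncappedSum * penalty, MAX_BOOST_PER_WORKER); // 0.163
console.log(before, after);
```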
root 2026-04-20 14:55:46 -05:00
parent 20b0289aa9
commit 95c26f04f8
3 changed files with 149 additions and 7 deletions

View File

@@ -47,11 +47,15 @@ const STATE_KEY: &str = "_playbook_memory/state.json";
 pub const MAX_BOOST_PER_WORKER: f32 = 0.25;
 
 /// Default number of past playbooks to consider when ranking the current
-/// operation. Bumped 5 → 25 on 2026-04-20 because at >100 entries in
-/// memory the old default missed too many relevant playbooks — boost
-/// silently failed even when the seeded workers were ideal matches.
-/// 25 is brute-force-cheap (sub-ms) and covers most live operator memory.
-pub const DEFAULT_TOP_K_PLAYBOOKS: usize = 25;
+/// operation. Bumped 25 → 100 on 2026-04-20 (second revision) after
+/// direct measurement showed cosine similarities cluster in a narrow band
+/// (0.55-0.67) across all playbooks regardless of geo — the embedding
+/// model doesn't discriminate city/role strongly enough. k=25 missed
+/// relevant Toledo Welder playbooks even when they existed; k=100
+/// includes them comfortably. Brute-force remains sub-ms at this size.
+/// Deeper fix: filter playbooks by (target_city, target_state) in the
+/// request before similarity ranking — deferred.
+pub const DEFAULT_TOP_K_PLAYBOOKS: usize = 100;
 
 /// Half-life of a playbook's contribution to boost, in days. A playbook
 /// 30 days old contributes half what a fresh one would; 60 days old, a
@@ -83,6 +87,18 @@ pub struct PlaybookEntry {
     pub embedding: Option<Vec<f32>>,
 }
 
+/// A recorded failure — worker who didn't deliver on a contract.
+/// Tracked per (city, state, name) so a single worker's failures on
+/// Toledo Welder contracts don't penalize the same name in Chicago.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct FailureRecord {
+    pub city: String,
+    pub state: String,
+    pub name: String,
+    pub reason: String,
+    pub timestamp: String,
+}
+
 /// Persisted / in-memory state.
 #[derive(Debug, Clone, Serialize, Deserialize, Default)]
 struct PlaybookMemoryState {
@@ -90,6 +106,11 @@ struct PlaybookMemoryState {
     /// Unix epoch millis when the last rebuild completed. Caller can
     /// use this to gate "stale > N hours → trigger rebuild" behavior.
     last_rebuilt_at: i64,
+    /// Failed-fill records. Path 1 negative signal — every entry here
+    /// dampens the positive boost for its (city, state, name) key by
+    /// half per recorded failure, so three failures cut it to an eighth.
+    #[serde(default)]
+    failures: Vec<FailureRecord>,
 }
 
 /// Per-worker boost payload. `citations` lets the response layer show
@@ -154,6 +175,29 @@ impl PlaybookMemory {
         self.state.read().await.entries.clone()
     }
 
+    /// Record failure(s). Each added `FailureRecord` is an additional
+    /// penalty against that worker's positive boost for the same geo.
+    pub async fn mark_failures(&self, new_failures: Vec<FailureRecord>) -> Result<usize, String> {
+        if new_failures.is_empty() { return Ok(0); }
+        let added = new_failures.len();
+        let mut s = self.state.write().await;
+        s.failures.extend(new_failures);
+        drop(s);
+        self.persist().await?;
+        Ok(added)
+    }
+
+    /// Count failures per (city, state, name) key. Used by compute_boost_for
+    /// to dampen positive boost.
+    pub async fn failure_counts(&self) -> HashMap<(String, String, String), usize> {
+        let s = self.state.read().await;
+        let mut counts: HashMap<(String, String, String), usize> = HashMap::new();
+        for f in &s.failures {
+            *counts.entry((f.city.clone(), f.state.clone(), f.name.clone())).or_insert(0) += 1;
+        }
+        counts
+    }
+
     /// Given an operation's embedding, find the top-K most similar past
     /// playbooks (by cosine similarity) and return a per-worker boost map
     /// keyed by (city, state, name). Worker is matched by the tuple so a
@@ -170,7 +214,15 @@ impl PlaybookMemory {
         top_k_playbooks: usize,
         base_weight: f32,
     ) -> HashMap<(String, String, String), BoostEntry> {
-        let entries = self.state.read().await.entries.clone();
+        let state = self.state.read().await;
+        let entries = state.entries.clone();
+        // Build failure map once before dropping the lock so we don't
+        // hold the read lock across the full scoring loop.
+        let mut failure_counts: HashMap<(String, String, String), usize> = HashMap::new();
+        for f in &state.failures {
+            *failure_counts.entry((f.city.clone(), f.state.clone(), f.name.clone())).or_insert(0) += 1;
+        }
+        drop(state);
 
         // Brute-force cosine. Empty / missing embeddings just skip.
         let mut scored: Vec<(f32, &PlaybookEntry)> = entries
@@ -205,11 +257,17 @@
             let per_worker = similarity * base_weight * decay / (n_workers as f32);
             for name in &pb.endorsed_names {
                 let key = (city.clone(), state.clone(), name.clone());
+                // Path 1 negative signal — each recorded failure halves
+                // this worker's contribution. Three failures → 0.125x.
+                // Five failures → ~0.03x (effectively zero). Caps at
+                // 20 before we clamp to avoid degen cases.
+                let fail_count = failure_counts.get(&key).copied().unwrap_or(0).min(20);
+                let penalty = 0.5_f32.powi(fail_count as i32);
                 let entry = boosts.entry(key).or_insert(BoostEntry {
                     boost: 0.0,
                     citations: Vec::new(),
                 });
-                entry.boost = (entry.boost + per_worker).min(MAX_BOOST_PER_WORKER);
+                entry.boost = (entry.boost + per_worker * penalty).min(MAX_BOOST_PER_WORKER);
                 if !entry.citations.contains(&pb.playbook_id) {
                     entry.citations.push(pb.playbook_id.clone());
                 }

View File

@@ -124,6 +124,7 @@ pub fn router(state: VectorState) -> Router {
         .route("/playbook_memory/seed", post(seed_playbook_memory))
         .route("/playbook_memory/persist_sql", post(persist_playbook_memory_sql))
         .route("/playbook_memory/patterns", post(discover_playbook_patterns))
+        .route("/playbook_memory/mark_failed", post(mark_playbook_failed))
         .with_state(state)
 }
@@ -2242,6 +2243,47 @@
     }
 }
 
+#[derive(Deserialize)]
+struct MarkFailedRequest {
+    /// Operation text, same shape as seed: "fill: Role xN in City, ST"
+    operation: String,
+    /// Names of workers who didn't deliver on the fill.
+    failed_names: Vec<String>,
+    /// Short reason (no-show, fired, unreliable). Stored verbatim.
+    #[serde(default)]
+    reason: String,
+}
+
+async fn mark_playbook_failed(
+    State(state): State<VectorState>,
+    Json(req): Json<MarkFailedRequest>,
+) -> impl IntoResponse {
+    // Parse city + state from the operation — mirrors seed's parser.
+    let after_in = req.operation.split(" in ").nth(1).unwrap_or("");
+    let mut parts = after_in.splitn(2, ',');
+    let city = parts.next().map(|s| s.trim().to_string()).filter(|s| !s.is_empty());
+    let state_ = parts.next().map(|s|
+        s.trim().chars().take_while(|c| c.is_ascii_alphabetic()).collect::<String>()
+    ).filter(|s| !s.is_empty());
+    let (Some(city), Some(state_code)) = (city, state_) else {
+        return Err((StatusCode::BAD_REQUEST,
+            "operation must match 'fill: Role xN in City, ST' shape".into()));
+    };
+    let ts = chrono::Utc::now().to_rfc3339();
+    let records: Vec<playbook_memory::FailureRecord> = req.failed_names.iter()
+        .map(|n| playbook_memory::FailureRecord {
+            city: city.clone(), state: state_code.clone(), name: n.clone(),
+            reason: req.reason.clone(), timestamp: ts.clone(),
+        })
+        .collect();
+    match state.playbook_memory.mark_failures(records).await {
+        Ok(added) => Ok(Json(serde_json::json!({ "added": added, "city": city, "state": state_code }))),
+        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
+    }
+}
+
 async fn playbook_memory_stats(
     State(state): State<VectorState>,
 ) -> impl IntoResponse {

View File

@@ -514,6 +514,48 @@ async function main() {
         });
     }
 
+    // Tool: log FAILED fill — negative signal for Phase 19 boost.
+    // Workers named here get a 0.5^n penalty on future positive
+    // boosts in the same (city, state). Three failures cut the boost to
+    // 0.125x; five (~0.03x) make the worker invisible to the re-rank.
+    // Names are validated against workers_500k same as /log.
+    if (url.pathname === "/log_failure") {
+        const b = await json();
+        const opMatch = String(b.operation || "").match(/ in ([^,]+),\s*([A-Za-z]+)/);
+        const city = opMatch ? opMatch[1].trim() : "";
+        const state = opMatch ? opMatch[2].trim() : "";
+        const rawNames: string[] = Array.isArray(b.failed_names) ? b.failed_names : [];
+        if (!city || !state) {
+            return err("operation must be 'fill: Role xN in City, ST'", 400);
+        }
+        if (rawNames.length === 0) return err("failed_names must be a non-empty array", 400);
+        const quoted = rawNames.map((n: string) => `'${n.replace(/'/g, "''")}'`).join(",");
+        const sql = `SELECT DISTINCT name FROM workers_500k `
+            + `WHERE name IN (${quoted}) AND city = '${city.replace(/'/g, "''")}' `
+            + `AND state = '${state.replace(/'/g, "''")}'`;
+        const vr = await api("POST", "/query/sql", { sql }).catch(() => ({ rows: [] as any[] })) as any;
+        const found = new Set((vr.rows ?? []).map((r: any) => r.name));
+        const failed_names = rawNames.filter((n: string) => found.has(n));
+        const rejected = rawNames.filter((n: string) => !found.has(n));
+        if (failed_names.length === 0) {
+            return ok({ marked: 0, rejected_ghost_names: rejected,
+                note: "no failed_names matched workers_500k for this geo" });
+        }
+        const mr = await api("POST", "/vectors/playbook_memory/mark_failed", {
+            operation: b.operation,
+            failed_names,
+            reason: b.reason || "",
+        });
+        return ok({
+            marked: mr?.added ?? 0,
+            rejected_ghost_names: rejected,
+            city, state,
+            note: `Each marked worker's positive boost in ${city}, ${state} is halved per recorded failure.`,
+        });
+    }
+
     // Tool: get playbooks
     if (url.pathname === "/playbooks") {
         const kw = url.searchParams.get("keyword");