Architectural snapshot of the lakehouse codebase at the point where the
full matrix-driven agent loop with Mem0 versioning + deletion was
validated end-to-end.
WHAT THIS REPO IS
A clean single-commit snapshot of the lakehouse code. Heavy test data
(.parquet datasets, vector indexes) excluded — see REPLICATION.md for
regen path. Full lakehouse history at git.agentview.dev/profit/lakehouse.
WHAT WAS PROVEN
- Vector retrieval across the multi-corpus matrix (chicago_permits + entity
briefs + sec_tickers + distilled procedural + llm_team runs)
- Observer hand-review (cloud + heuristic fallback) gating each candidate
- Local-model agent loop (qwen3.5:latest) with tool use + scratchpad
- Playbook seal on success → next-iter retrieval surfaces it as preamble
- Mem0 versioning + deletion in pathway_memory (sketched below):
* UPSERT: ADD on new workflow, UPDATE bumps replay_count on identical
* REVISE: chains versions, parent.superseded_at + superseded_by stamped
* RETIRE: marks specific trace retired with reason, excluded from retrieval
* HISTORY: walks chain root→tip, cycle-safe
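A minimal Rust sketch of these ops, for orientation only: the record shape
the four verbs mutate, plus the cycle-safe HISTORY walk. PathwayTrace and
its field names are illustrative stand-ins, not the actual schema; the real
ops live in crates/vectord/src/pathway_memory.rs.

    // Hypothetical record shape; real schema is in pathway_memory.rs.
    struct PathwayTrace {
        id: String,
        workflow_hash: String, // identical hash: UPSERT bumps replay_count
        replay_count: u64,
        superseded_at: Option<chrono::DateTime<chrono::Utc>>, // REVISE stamps parent
        superseded_by: Option<String>,  // ...and links it to the new version
        retired_reason: Option<String>, // RETIRE: set => excluded from retrieval
    }

    // HISTORY: walk the version chain root→tip without looping on a cycle.
    fn history(mut cur: &PathwayTrace,
               all: &std::collections::HashMap<String, PathwayTrace>) -> Vec<String> {
        let mut seen = std::collections::HashSet::from([cur.id.clone()]);
        let mut chain = vec![cur.id.clone()];
        while let Some(next_id) = &cur.superseded_by {
            if !seen.insert(next_id.clone()) { break; } // cycle guard
            match all.get(next_id) {
                Some(next) => { chain.push(next_id.clone()); cur = next; }
                None => break,
            }
        }
        chain
    }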
KEY DIRECTORIES
- crates/vectord/src/pathway_memory.rs — Mem0 ops live here
- crates/vectord/src/playbook_memory.rs — original Mem0 reference
- tests/agent_test/ — local-model agent harness + PRD + session archives
- scripts/dump_raw_corpus.sh — MinIO bucket dump (raw test corpus)
- scripts/vectorize_raw_corpus.ts — corpus → vector indexes
- scripts/analyze_chicago_contracts.ts — real inference pipeline
- scripts/seal_agent_playbook.ts — Mem0 upsert from agent traces
Replication: see REPLICATION.md for Debian 13 clean install + cloud-only
adaptation (no local Ollama).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
//! Bucket operation error journal.
//!
//! Every bucket op failure and every rescue fallback lands here. Goal:
//! answering "is anything broken?" with one HTTP call.
//!
//! Storage: batched write-once files under `_errors/bucket_errors/` in the
//! primary bucket. Uses the shared `AppendLog` helper so we never rewrite
//! existing files — see `append_log.rs` for the full pattern rationale.
//!
//! In-memory ring buffer holds the last N events for fast response; on
//! startup `load_recent` hydrates it from all batch files.
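//!
//! Illustrative persisted line (the shape follows the serde derives below;
//! the values are invented):
//! `{"ts":"2025-06-01T12:00:00Z","op":"read","target":"tenant_a","key":"k1","error":"timeout","rescued":false}`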
use chrono::{DateTime, Utc};
use object_store::ObjectStore;
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::sync::Arc;
use tokio::sync::RwLock;

use crate::append_log::{AppendLog, CompactStats};

const JOURNAL_PREFIX: &str = "_errors/bucket_errors";
const RING_CAPACITY: usize = 2000;

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum BucketOp {
    Read,
    Write,
    Delete,
    List,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BucketErrorEvent {
    pub ts: DateTime<Utc>,
    pub op: BucketOp,
    pub target: String,
    pub key: String,
    pub error: String,
    #[serde(default)]
    pub rescued: bool,
}

impl BucketErrorEvent {
    /// Shared constructor; the public helpers below pin the op variant.
    fn event(op: BucketOp, target: &str, key: &str, error: &str) -> Self {
        Self { ts: Utc::now(), op, target: target.into(), key: key.into(), error: error.into(), rescued: false }
    }

    pub fn new_read(target: &str, key: &str, error: &str) -> Self {
        Self::event(BucketOp::Read, target, key, error)
    }

    pub fn new_write(target: &str, key: &str, error: &str) -> Self {
        Self::event(BucketOp::Write, target, key, error)
    }

    pub fn new_delete(target: &str, key: &str, error: &str) -> Self {
        Self::event(BucketOp::Delete, target, key, error)
    }

    pub fn new_list(target: &str, prefix: &str, error: &str) -> Self {
        Self::event(BucketOp::List, target, prefix, error)
    }
}

#[derive(Clone)]
pub struct ErrorJournal {
    log: Arc<AppendLog>,
    ring: Arc<RwLock<VecDeque<BucketErrorEvent>>>,
}

#[derive(Debug, Clone, Serialize)]
pub struct HealthReport {
    pub period_minutes: i64,
    pub total_errors: usize,
    pub per_bucket: std::collections::HashMap<String, usize>,
    pub unhealthy_buckets: Vec<String>,
}
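
// Illustrative serialized HealthReport, the "one HTTP call" answer from the
// module docs (values invented; map key order varies):
// {"period_minutes":60,"total_errors":4,"per_bucket":{"broken":3,"flaky":1},
//  "unhealthy_buckets":["broken"]}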

impl ErrorJournal {
    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
        // Keep flush threshold lowish — operators checking /storage/errors
        // after a recent incident want to see fresh rows even without an
        // explicit flush.
        let log = Arc::new(
            AppendLog::new(store, JOURNAL_PREFIX).with_flush_threshold(8),
        );
        Self {
            log,
            ring: Arc::new(RwLock::new(VecDeque::with_capacity(RING_CAPACITY))),
        }
    }

    /// Hydrate the ring buffer from existing batch files. Call once at
    /// startup. Tolerates malformed lines (skipped with a warning); a failed
    /// or empty read hydrates nothing, so the call still returns Ok(0).
    pub async fn load_recent(&self) -> Result<usize, String> {
        // read_all errors (including "no batch files yet") are swallowed
        // here: hydration is best-effort and must not block startup.
        let lines = self.log.read_all().await.unwrap_or_default();
        let mut ring = self.ring.write().await;
        for line in lines {
            match serde_json::from_slice::<BucketErrorEvent>(&line) {
                Ok(ev) => {
                    if ring.len() >= RING_CAPACITY { ring.pop_front(); }
                    ring.push_back(ev);
                }
                Err(e) => tracing::warn!("error journal: skip malformed line ({e})"),
            }
        }
        Ok(ring.len())
    }

    /// Append an event. In-memory ring updated immediately; persistence
    /// happens in batches via the underlying AppendLog.
    pub async fn append(&self, event: BucketErrorEvent) {
        {
            let mut ring = self.ring.write().await;
            if ring.len() >= RING_CAPACITY { ring.pop_front(); }
            ring.push_back(event.clone());
        }

        match serde_json::to_vec(&event) {
            Ok(line) => {
                if let Err(e) = self.log.append(line).await {
                    tracing::error!("error journal persist failed: {e}");
                }
            }
            Err(e) => tracing::error!("error journal serialize failed: {e}"),
        }
    }

    /// Mark the most recent matching in-memory read event as rescued.
    /// Only updates the ring buffer — the JSONL line already persisted
    /// records the failure fact; rescue status travels in response headers.
    pub async fn mark_rescued_last(&self, target: &str, key: &str) {
        let mut ring = self.ring.write().await;
        for ev in ring.iter_mut().rev() {
            if matches!(ev.op, BucketOp::Read) && ev.target == target && ev.key == key && !ev.rescued {
                ev.rescued = true;
                break;
            }
        }
    }

    pub async fn recent(&self, limit: usize) -> Vec<BucketErrorEvent> {
        let ring = self.ring.read().await;
        let start = ring.len().saturating_sub(limit);
        ring.iter().skip(start).cloned().collect()
    }

    pub async fn filter(
        &self,
        bucket: Option<&str>,
        since: Option<DateTime<Utc>>,
        limit: usize,
    ) -> Vec<BucketErrorEvent> {
        let ring = self.ring.read().await;
        // Scan newest-first so take(limit) keeps the most recent matches,
        // then reverse again to hand results back in chronological order.
        ring.iter()
            .rev()
            .filter(|ev| bucket.map_or(true, |b| ev.target == b))
            .filter(|ev| since.map_or(true, |s| ev.ts >= s))
            .take(limit)
            .cloned()
            .collect::<Vec<_>>()
            .into_iter()
            .rev()
            .collect()
    }

    /// Summarize errors in the last N minutes.
    pub async fn health(&self, period_minutes: i64) -> HealthReport {
        use std::collections::HashMap;
        let cutoff = Utc::now() - chrono::Duration::minutes(period_minutes);
        let ring = self.ring.read().await;
        let recent: Vec<_> = ring.iter().filter(|ev| ev.ts >= cutoff).collect();
        let mut per_bucket: HashMap<String, usize> = HashMap::new();
        for ev in &recent {
            *per_bucket.entry(ev.target.clone()).or_insert(0) += 1;
        }
        // Three or more errors inside the window flags a bucket as unhealthy.
        let unhealthy_buckets: Vec<String> = per_bucket
            .iter()
            .filter(|(_, c)| **c >= 3)
            .map(|(k, _)| k.clone())
            .collect();
        HealthReport {
            period_minutes,
            total_errors: recent.len(),
            per_bucket,
            unhealthy_buckets,
        }
    }

    /// Force an immediate flush of buffered events to object storage.
    pub async fn flush(&self) -> Result<(), String> {
        self.log.flush().await
    }

    /// Consolidate all batch files into one. Operator cleanup.
    pub async fn compact(&self) -> Result<CompactStats, String> {
        self.log.compact().await
    }

    /// How many JSONL batch files currently exist.
    pub async fn file_count(&self) -> Result<usize, String> {
        self.log.file_count().await
    }
}
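
// Illustrative wiring sketch (not part of the original module): construct the
// journal at startup, hydrate the ring, record a failure plus its rescue, and
// pull one health report: the /storage/errors operator flow the module docs
// describe. The in-memory store stands in for the real primary-bucket store.
#[allow(dead_code)]
async fn usage_sketch() {
    let store: Arc<dyn ObjectStore> = Arc::new(object_store::memory::InMemory::new());
    let journal = ErrorJournal::new(store);

    // Startup: rebuild the ring from whatever batch files already exist.
    let hydrated = journal.load_recent().await.unwrap_or(0);
    tracing::info!("error journal hydrated {hydrated} events");

    // A read against tenant_a failed, then the rescue fallback recovered it.
    journal.append(BucketErrorEvent::new_read("tenant_a", "obj/key", "timeout")).await;
    journal.mark_rescued_last("tenant_a", "obj/key").await;

    // One call answers "is anything broken?" for the last hour.
    let report = journal.health(60).await;
    assert_eq!(report.total_errors, 1);
}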

#[cfg(test)]
mod tests {
    use super::*;
    use object_store::memory::InMemory;

    fn mk() -> ErrorJournal {
        ErrorJournal::new(Arc::new(InMemory::new()))
    }

    #[tokio::test]
    async fn append_updates_ring_immediately() {
        let j = mk();
        j.append(BucketErrorEvent::new_read("tenant_a", "k1", "timeout")).await;
        j.append(BucketErrorEvent::new_write("tenant_b", "k2", "denied")).await;

        let recent = j.recent(10).await;
        assert_eq!(recent.len(), 2);
        assert_eq!(recent[0].target, "tenant_a");
        assert_eq!(recent[1].target, "tenant_b");
    }

    #[tokio::test]
    async fn load_recent_rehydrates_ring_from_disk() {
        // Use a shared InMemory store so the second journal reads the first
        // journal's writes — mirrors gateway restart semantics.
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        {
            let j1 = ErrorJournal {
                log: Arc::new(AppendLog::new(store.clone(), JOURNAL_PREFIX).with_flush_threshold(1)),
                ring: Arc::new(RwLock::new(VecDeque::with_capacity(RING_CAPACITY))),
            };
            j1.append(BucketErrorEvent::new_read("t", "k", "err")).await;
            j1.flush().await.unwrap();
        }

        let j2 = ErrorJournal {
            log: Arc::new(AppendLog::new(store.clone(), JOURNAL_PREFIX)),
            ring: Arc::new(RwLock::new(VecDeque::with_capacity(RING_CAPACITY))),
        };
        assert_eq!(j2.recent(10).await.len(), 0, "fresh ring is empty");

        let loaded = j2.load_recent().await.unwrap();
        assert_eq!(loaded, 1);
        assert_eq!(j2.recent(10).await[0].target, "t");
    }

    #[tokio::test]
    async fn mark_rescued_updates_most_recent_match() {
        let j = mk();
        j.append(BucketErrorEvent::new_read("t1", "k", "err")).await;
        j.append(BucketErrorEvent::new_read("t2", "k", "err")).await;
        j.append(BucketErrorEvent::new_read("t1", "k", "err")).await;

        j.mark_rescued_last("t1", "k").await;

        let recent = j.recent(10).await;
        assert!(!recent[0].rescued, "older t1 event not touched");
        assert!(!recent[1].rescued, "t2 not touched");
        assert!(recent[2].rescued, "most recent t1 was rescued");
    }

    #[tokio::test]
    async fn health_flags_buckets_with_three_or_more_errors() {
        let j = mk();
        for _ in 0..3 { j.append(BucketErrorEvent::new_read("broken", "k", "e")).await; }
        j.append(BucketErrorEvent::new_read("flaky", "k", "e")).await;

        let h = j.health(60).await;
        assert_eq!(h.total_errors, 4);
        assert_eq!(h.per_bucket.get("broken"), Some(&3));
        assert_eq!(h.per_bucket.get("flaky"), Some(&1));
        assert!(h.unhealthy_buckets.contains(&"broken".to_string()));
        assert!(!h.unhealthy_buckets.contains(&"flaky".to_string()));
    }

    #[tokio::test]
    async fn filter_by_bucket_returns_chronological_order() {
        let j = mk();
        j.append(BucketErrorEvent::new_read("a", "k1", "e")).await;
        j.append(BucketErrorEvent::new_read("b", "k2", "e")).await;
        j.append(BucketErrorEvent::new_read("a", "k3", "e")).await;

        let only_a = j.filter(Some("a"), None, 10).await;
        assert_eq!(only_a.len(), 2);
        assert_eq!(only_a[0].key, "k1", "oldest first");
        assert_eq!(only_a[1].key, "k3");
    }
}