Architectural snapshot of the lakehouse codebase at the point where the
full matrix-driven agent loop with Mem0 versioning + deletion was
validated end-to-end.
WHAT THIS REPO IS
A clean single-commit snapshot of the lakehouse code. Heavy test data
(.parquet datasets, vector indexes) excluded — see REPLICATION.md for
regen path. Full lakehouse history at git.agentview.dev/profit/lakehouse.
WHAT WAS PROVEN
- Vector retrieval across the multi-corpus matrix (chicago_permits + entity
  briefs + sec_tickers + distilled procedural + llm_team runs)
- Observer hand-review (cloud + heuristic fallback) gating each candidate
- Local-model agent loop (qwen3.5:latest) with tool use + scratchpad
- Playbook seal on success → next-iter retrieval surfaces it as preamble
- Mem0 versioning + deletion in pathway_memory (record shape sketched below):
* UPSERT: ADD on new workflow, UPDATE bumps replay_count on identical
* REVISE: chains versions, parent.superseded_at + superseded_by stamped
* RETIRE: marks specific trace retired with reason, excluded from retrieval
* HISTORY: walks chain root→tip, cycle-safe
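
  To make the four ops concrete, a hypothetical sketch of the per-trace
  record they maintain (field names beyond those listed above are
  illustrative, not the actual pathway_memory schema):

      struct PathwayTrace {
          trace_id: String,
          version: u32,                    // REVISE chains versions
          replay_count: u64,               // UPDATE path of UPSERT bumps this
          superseded_at: Option<i64>,      // stamped on the parent by REVISE
          superseded_by: Option<String>,   // child trace id; HISTORY walks this
          retired_reason: Option<String>,  // RETIRE; excluded from retrieval
      }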
KEY DIRECTORIES
- crates/vectord/src/pathway_memory.rs — Mem0 ops live here
- crates/vectord/src/playbook_memory.rs — original Mem0 reference
- tests/agent_test/ — local-model agent harness + PRD + session archives
- scripts/dump_raw_corpus.sh — MinIO bucket dump (raw test corpus)
- scripts/vectorize_raw_corpus.ts — corpus → vector indexes
- scripts/analyze_chicago_contracts.ts — real inference pipeline
- scripts/seal_agent_playbook.ts — Mem0 upsert from agent traces
Replication: see REPLICATION.md for Debian 13 clean install + cloud-only
adaptation (no local Ollama).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
//! Write-once batched append log.
//!
//! Problem we're fixing: the error journal and HNSW trial journal both
//! previously did read-modify-write of their whole JSONL file on every
//! event. That's O(N²) cumulative work and generates huge churn at scale.
//! It's exactly the pattern llms3.com flags as the "small-file /
//! rewrite-amplification" anti-pattern.
//!
//! This helper implements the pattern object storage actually wants:
//!
//! - Events accumulate in an **in-memory buffer** (reads see them immediately).
//! - When the buffer hits a threshold, or `flush()` is called, the buffer is
//!   written **as one new object** with a timestamp-sorted key.
//! - Existing objects are never rewritten.
//! - Reads enumerate all batch files, sort by key, and concat in order.
//! - An explicit `compact()` reads every batch file, writes one consolidated
//!   file, and deletes the originals — the LSM-tree compaction idea applied
//!   to small JSONL events.
//!
//! Storage layout:
//! ```text
//! {prefix}/
//!   batch_0001776319628000123.jsonl
//!   batch_0001776319745987654.jsonl
//!   batch_0001776319800000000.jsonl    (written by compact(); same format)
//! ```
//! Key format: the zero-padded epoch microsecond of the write, so lexical
//! sort == chronological sort.
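//!
//! Minimal usage sketch (illustrative, not from the shipped tests; assumes
//! the same `ops` wrapper and an `object_store::memory::InMemory` backend
//! standing in for the real bucket):
//!
//! ```ignore
//! let log = AppendLog::new(Arc::new(InMemory::new()), "journals/errors")
//!     .with_flush_threshold(64);
//!
//! log.append(br#"{"kind":"timeout","attempt":1}"#.to_vec()).await?;
//! let events = log.read_all().await?; // sees the buffered event immediately
//! log.flush().await?;                 // force the buffer out as one object
//! let stats = log.compact().await?;   // merge all batch files into one
//! ```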

use bytes::Bytes;
use chrono::Utc;
use object_store::ObjectStore;
use std::sync::Arc;
use tokio::sync::Mutex;

use crate::ops;

const DEFAULT_FLUSH_THRESHOLD: usize = 32;

pub struct AppendLog {
    store: Arc<dyn ObjectStore>,
    prefix: String,
    buffer: Mutex<Vec<Vec<u8>>>,
    flush_threshold: usize,
}

impl AppendLog {
    /// Create a new append log rooted at `prefix` in the given object store.
    /// Events auto-flush when the buffer reaches `flush_threshold` (default 32).
    pub fn new(store: Arc<dyn ObjectStore>, prefix: impl Into<String>) -> Self {
        Self {
            store,
            prefix: prefix.into(),
            buffer: Mutex::new(Vec::new()),
            flush_threshold: DEFAULT_FLUSH_THRESHOLD,
        }
    }

    pub fn with_flush_threshold(mut self, threshold: usize) -> Self {
        self.flush_threshold = threshold.max(1);
        self
    }

    /// Add an event. The returned future completes either immediately
    /// (buffered) or after a flush, depending on whether the buffer hit the
    /// threshold. Callers don't need to care either way.
    pub async fn append(&self, line: Vec<u8>) -> Result<(), String> {
        let should_flush = {
            let mut buf = self.buffer.lock().await;
            buf.push(line);
            buf.len() >= self.flush_threshold
        };
        if should_flush {
            self.flush().await?;
        }
        Ok(())
    }

    /// Force-flush the in-memory buffer to object storage as a single new
    /// batch file. Safe to call anytime (idempotent no-op when buffer empty).
    pub async fn flush(&self) -> Result<(), String> {
        let batch = {
            let mut buf = self.buffer.lock().await;
            if buf.is_empty() { return Ok(()); }
            std::mem::take(&mut *buf)
        };

        let ts_us = Utc::now().timestamp_micros().max(0) as u128;
        let key = format!("{}/batch_{:019}.jsonl", self.prefix, ts_us);

        let mut body = Vec::with_capacity(batch.iter().map(|b| b.len() + 1).sum());
        for line in batch {
            body.extend_from_slice(&line);
            if !body.ends_with(b"\n") { body.push(b'\n'); }
        }
        ops::put(&self.store, &key, Bytes::from(body)).await
    }

    /// Read every event across all batch files plus the unflushed in-memory
    /// buffer. Events are returned in chronological order.
    pub async fn read_all(&self) -> Result<Vec<Vec<u8>>, String> {
        let mut keys = self.list_batch_keys().await?;
        keys.sort();

        let mut out = Vec::new();
        for key in keys {
            let bytes = ops::get(&self.store, &key).await?;
            for line in bytes.split(|b| *b == b'\n') {
                if !line.is_empty() {
                    out.push(line.to_vec());
                }
            }
        }

        // Include unflushed events so callers see the latest state
        // whether or not someone ran flush() recently.
        let buf = self.buffer.lock().await;
        for line in buf.iter() {
            out.push(line.clone());
        }

        Ok(out)
    }

    /// Consolidate all current batch files into one compacted file, then
    /// delete the originals. Safe to call while appends are in flight:
    /// new batches written during compaction get a higher timestamp and
    /// survive. Fails closed — if anything goes wrong mid-delete, the
    /// compacted file coexists with the originals and the next read sees
    /// duplicates (which the caller must dedup) rather than losing data.
    pub async fn compact(&self) -> Result<CompactStats, String> {
        // Snapshot which files to compact BEFORE we write the new one.
        let mut originals = self.list_batch_keys().await?;
        originals.sort();

        if originals.len() < 2 {
            return Ok(CompactStats { merged_files: originals.len(), events: 0, new_key: None });
        }

        // Gather all existing events.
        let mut events = Vec::new();
        for key in &originals {
            let bytes = ops::get(&self.store, key).await?;
            for line in bytes.split(|b| *b == b'\n') {
                if !line.is_empty() {
                    events.push(line.to_vec());
                }
            }
        }

        let total_events = events.len();
        if total_events == 0 {
            // Clean up empty files without writing a new one.
            for key in &originals {
                let _ = ops::delete(&self.store, key).await;
            }
            return Ok(CompactStats { merged_files: originals.len(), events: 0, new_key: None });
        }

        // Name the compacted file with the SAME `batch_{ts}` format so it
        // sorts chronologically with future batches. Using a distinct prefix
        // ("batch_compacted_") would break lex ordering: later `batch_N`
        // files would sort BEFORE the compacted file because 'c' > digits.
        // Timestamp = now, so any appends arriving during compaction (which
        // get the current wall-clock time) sort AFTER this file.
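        // (Added illustration, not original code: a lexical comparison shows
        // why the distinct prefix would sort after every plain batch file.)
        debug_assert!("batch_compacted_x" > "batch_0001776319628000123");
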
        let ts_us = Utc::now().timestamp_micros().max(0) as u128;
        let new_key = format!("{}/batch_{:019}.jsonl", self.prefix, ts_us);

        let mut body = Vec::new();
        for line in &events {
            body.extend_from_slice(line);
            body.push(b'\n');
        }
        ops::put(&self.store, &new_key, Bytes::from(body)).await?;

        // Only delete originals once the consolidated file is persisted.
        let mut failures = 0;
        for key in &originals {
            if ops::delete(&self.store, key).await.is_err() {
                failures += 1;
            }
        }
        if failures > 0 {
            tracing::warn!(
                "compact '{}': {} original files failed to delete — consolidated file {} has the data",
                self.prefix, failures, new_key,
            );
        }

        Ok(CompactStats {
            merged_files: originals.len(),
            events: total_events,
            new_key: Some(new_key),
        })
    }

    /// How many batch files exist in object storage right now.
    pub async fn file_count(&self) -> Result<usize, String> {
        Ok(self.list_batch_keys().await?.len())
    }

    async fn list_batch_keys(&self) -> Result<Vec<String>, String> {
        let prefix_with_slash = format!("{}/", self.prefix);
        // List the prefix, then filter for keys that match our naming scheme;
        // unrelated files at the same prefix won't be touched.
        let raw = ops::list(&self.store, Some(&prefix_with_slash)).await?;
        Ok(raw
            .into_iter()
            .filter(|k| {
                let basename = k.rsplit('/').next().unwrap_or(k);
                basename.starts_with("batch_") && basename.ends_with(".jsonl")
            })
            .collect())
    }
}

#[derive(Debug, Clone, serde::Serialize)]
pub struct CompactStats {
    pub merged_files: usize,
    pub events: usize,
    pub new_key: Option<String>,
}

// Log unflushed-buffer size on drop. We can't `.await` from a sync `Drop`,
// so a real flush isn't possible here — callers are responsible for calling
// `.flush()` before dropping if durability matters. These journals are
// observability hints; a few lost buffered events at shutdown are
// acceptable per ADR-018.
impl Drop for AppendLog {
    fn drop(&mut self) {
        let buf_len = self.buffer.try_lock().map(|b| b.len()).unwrap_or(0);
        if buf_len == 0 { return; }
        tracing::debug!(
            "append_log '{}' dropping with {} unflushed events",
            self.prefix, buf_len,
        );
    }
}
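
// Illustrative shutdown pattern (added note, not original code): because
// `Drop` can't await, a caller that cares about the tail of the journal
// flushes explicitly before letting the log go out of scope:
//
//     journal.flush().await?; // buffer now durable in object storage
//     drop(journal);          // Drop sees an empty buffer, logs nothing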

#[cfg(test)]
mod tests {
    use super::*;
    use object_store::memory::InMemory;

    fn mk(threshold: usize) -> AppendLog {
        AppendLog::new(Arc::new(InMemory::new()), "prefix")
            .with_flush_threshold(threshold)
    }

    #[tokio::test]
    async fn append_stays_buffered_below_threshold() {
        let log = mk(5);
        log.append(b"one".to_vec()).await.unwrap();
        log.append(b"two".to_vec()).await.unwrap();
        assert_eq!(log.file_count().await.unwrap(), 0, "no files until threshold");
        let all = log.read_all().await.unwrap();
        assert_eq!(all, vec![b"one".to_vec(), b"two".to_vec()],
            "read_all surfaces unflushed buffer");
    }

    #[tokio::test]
    async fn append_auto_flushes_on_threshold() {
        let log = mk(3);
        for i in 0..3 {
            log.append(format!("evt{i}").into_bytes()).await.unwrap();
        }
        assert_eq!(log.file_count().await.unwrap(), 1, "threshold triggered one flush");

        // A fourth append stays buffered until the next threshold.
        log.append(b"evt3".to_vec()).await.unwrap();
        assert_eq!(log.file_count().await.unwrap(), 1, "below threshold again");
    }

    #[tokio::test]
    async fn flush_empty_is_noop() {
        let log = mk(32);
        log.flush().await.unwrap();
        log.flush().await.unwrap();
        assert_eq!(log.file_count().await.unwrap(), 0);
    }

    #[tokio::test]
    async fn read_all_orders_events_across_flushes() {
        let log = mk(1); // flush-on-every-append
        for i in 0..5 {
            log.append(format!("e{i}").into_bytes()).await.unwrap();
            // Spread writes out so timestamps sort strictly.
            tokio::time::sleep(std::time::Duration::from_millis(2)).await;
        }
        let all = log.read_all().await.unwrap();
        let strs: Vec<String> = all.into_iter()
            .map(|v| String::from_utf8(v).unwrap())
            .collect();
        assert_eq!(strs, vec!["e0", "e1", "e2", "e3", "e4"],
            "lex sort of batch keys == chronological event order");
    }

    #[tokio::test]
    async fn compact_merges_multiple_files_into_one() {
        let log = mk(1); // force file-per-append
        for i in 0..4 {
            log.append(format!("e{i}").into_bytes()).await.unwrap();
            tokio::time::sleep(std::time::Duration::from_millis(2)).await;
        }
        assert_eq!(log.file_count().await.unwrap(), 4);

        let stats = log.compact().await.unwrap();
        assert_eq!(stats.merged_files, 4);
        assert_eq!(stats.events, 4);
        assert!(stats.new_key.is_some());

        assert_eq!(log.file_count().await.unwrap(), 1, "originals deleted, 1 survivor");
        let all = log.read_all().await.unwrap();
        assert_eq!(all.len(), 4, "no events lost in compaction");
    }

    #[tokio::test]
    async fn compact_with_single_file_is_noop() {
        let log = mk(1);
        log.append(b"only".to_vec()).await.unwrap();
        assert_eq!(log.file_count().await.unwrap(), 1);

        let stats = log.compact().await.unwrap();
        assert_eq!(stats.merged_files, 1);
        assert_eq!(stats.events, 0, "nothing to consolidate");
        assert!(stats.new_key.is_none(), "no new file written");
        assert_eq!(log.file_count().await.unwrap(), 1, "original untouched");
    }
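
    // Added sketch (not in the original suite): compact() can leave
    // duplicates behind if deletes fail mid-way, and its doc comment says the
    // caller must dedup. One way to do that over read_all(), with the
    // duplicate state simulated here by appending the same payload twice:
    #[tokio::test]
    async fn caller_side_dedup_over_read_all() {
        let log = mk(1); // file per append
        log.append(b"same".to_vec()).await.unwrap();
        tokio::time::sleep(std::time::Duration::from_millis(2)).await;
        log.append(b"same".to_vec()).await.unwrap();

        let mut seen = std::collections::HashSet::new();
        let unique: Vec<Vec<u8>> = log.read_all().await.unwrap()
            .into_iter()
            .filter(|e| seen.insert(e.clone()))
            .collect();
        assert_eq!(unique, vec![b"same".to_vec()], "duplicates collapsed");
    }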
}