lakehouse/crates/storaged/src/append_log.rs
profit 5b1fcf6d27 Phase 28-36 body of work
Accumulated since a6f12e2 (Phase 21 Rust port + Phase 27 versioning):

- Phase 36: embed_semaphore on VectorState (permits=1) serializes
  seed embed calls — prevents sidecar socket collisions under
  concurrent /seed stress load
- Phase 31+: run_stress.ts 6-task diverse stress scaffolding;
  run_e2e_rated.ts + orchestrator.ts tightening
- Catalog dedupe cleanup: 16 duplicate manifests removed; canonical
  candidates.parquet (10.5MB -> 76KB) + placements.parquet (1.2MB ->
  11KB) regenerated post-dedupe; fresh manifests for active datasets
- vectord: harness EvalSet refinements (+181), agent portfolio
  rotation + ingest triggers (+158), autotune + rag adjustments
- catalogd/storaged/ingestd/mcp-server: misc tightening
- docs: Phase 28-36 PRD entries + DECISIONS ADR additions;
  control-plane pivot banner added to top of docs/PRD.md (pointing
  at docs/CONTROL_PLANE_PRD.md which lands in next commit)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 02:41:15 -05:00

//! Write-once batched append log.
//!
//! Problem we're fixing: the error journal and HNSW trial journal both
//! previously did read-modify-write of their whole JSONL file on every
//! event. That's O(N²) cumulative work and generates huge churn at scale.
//! It's exactly the pattern llms3.com flags as the "small-file /
//! rewrite-amplification" anti-pattern.
//!
//! This helper implements the pattern object storage actually wants:
//!
//! - Events accumulate in an **in-memory buffer** (reads see them immediately).
//! - When the buffer hits a threshold, or `flush()` is called, the buffer is
//!   written **as one new object** with a timestamp-sorted key.
//! - Existing objects are never rewritten.
//! - Reads enumerate all batch files, sort by key, and concat in order.
//! - An explicit `compact()` reads every batch file, writes one consolidated
//!   file, and deletes the originals — the LSM-tree compaction idea applied
//!   to small JSONL events.
//!
//! Storage layout:
//! ```text
//! {prefix}/
//!     batch_0001776319628000123.jsonl
//!     batch_0001776319745987654.jsonl
//!     batch_0001776319800000000.jsonl   (written by compact(); same naming scheme)
//! ```
//! Key format: the zero-padded epoch microsecond of the write, so lexical
//! sort == chronological sort.
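//!
//! Illustrative usage sketch (added here, not part of the original docs; the
//! in-memory backend, prefix, and payload are placeholders):
//!
//! ```ignore
//! use object_store::memory::InMemory;
//! use std::sync::Arc;
//!
//! let log = AppendLog::new(Arc::new(InMemory::new()), "journals/errors")
//!     .with_flush_threshold(64);
//! log.append(br#"{"event":"timeout"}"#.to_vec()).await?;
//! log.flush().await?;                   // persist the buffer as one new batch object
//! let events = log.read_all().await?;   // chronological: flushed batches + buffer
//! let stats = log.compact().await?;     // merge all batch files into one
//! ```
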
use bytes::Bytes;
use chrono::Utc;
use object_store::ObjectStore;
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::ops;
const DEFAULT_FLUSH_THRESHOLD: usize = 32;

pub struct AppendLog {
    store: Arc<dyn ObjectStore>,
    prefix: String,
    buffer: Mutex<Vec<Vec<u8>>>,
    flush_threshold: usize,
}

impl AppendLog {
    /// Create a new append log rooted at `prefix` in the given object store.
    /// Events auto-flush when buffer reaches `flush_threshold` (default 32).
    pub fn new(store: Arc<dyn ObjectStore>, prefix: impl Into<String>) -> Self {
        Self {
            store,
            prefix: prefix.into(),
            buffer: Mutex::new(Vec::new()),
            flush_threshold: DEFAULT_FLUSH_THRESHOLD,
        }
    }

    pub fn with_flush_threshold(mut self, threshold: usize) -> Self {
        self.flush_threshold = threshold.max(1);
        self
    }

    /// Add an event. The returned future completes either immediately
    /// (buffered) or after a flush, depending on whether the buffer hit the
    /// threshold. Callers don't need to care either way.
    pub async fn append(&self, line: Vec<u8>) -> Result<(), String> {
        let should_flush = {
            let mut buf = self.buffer.lock().await;
            buf.push(line);
            buf.len() >= self.flush_threshold
        };
        if should_flush {
            self.flush().await?;
        }
        Ok(())
    }

    /// Force-flush the in-memory buffer to object storage as a single new
    /// batch file. Safe to call anytime (idempotent no-op when buffer empty).
    pub async fn flush(&self) -> Result<(), String> {
        let batch = {
            let mut buf = self.buffer.lock().await;
            if buf.is_empty() { return Ok(()); }
            std::mem::take(&mut *buf)
        };
        let ts_us = Utc::now().timestamp_micros().max(0) as u128;
        let key = format!("{}/batch_{:019}.jsonl", self.prefix, ts_us);
        let mut body = Vec::with_capacity(batch.iter().map(|b| b.len() + 1).sum());
        for line in batch {
            body.extend_from_slice(&line);
            if !body.ends_with(b"\n") { body.push(b'\n'); }
        }
        ops::put(&self.store, &key, Bytes::from(body)).await
    }

    /// Read every event across all batch files + unflushed in-memory buffer.
    /// Events are returned in chronological order.
    pub async fn read_all(&self) -> Result<Vec<Vec<u8>>, String> {
        let mut keys = self.list_batch_keys().await?;
        keys.sort();
        let mut out = Vec::new();
        for key in keys {
            let bytes = ops::get(&self.store, &key).await?;
            for line in bytes.split(|b| *b == b'\n') {
                if !line.is_empty() {
                    out.push(line.to_vec());
                }
            }
        }
        // Include unflushed events so callers see the latest state
        // whether or not someone ran flush() recently.
        let buf = self.buffer.lock().await;
        for line in buf.iter() {
            out.push(line.clone());
        }
        Ok(out)
    }

    /// Consolidate all current batch files into one compacted file, then
    /// delete the originals. Safe to call while appends are in flight:
    /// batches flushed during compaction are not in the snapshot of
    /// originals, so they are never deleted. Fails closed — if anything goes
    /// wrong mid-delete, the compacted file coexists with originals and the
    /// next read sees duplicates (which the dedup caller must handle) rather
    /// than data loss.
    pub async fn compact(&self) -> Result<CompactStats, String> {
        // Snapshot which files to compact BEFORE we write the new one.
        let mut originals = self.list_batch_keys().await?;
        originals.sort();
        if originals.len() < 2 {
            return Ok(CompactStats { merged_files: originals.len(), events: 0, new_key: None });
        }
        // Gather all existing events.
        let mut events = Vec::new();
        for key in &originals {
            let bytes = ops::get(&self.store, key).await?;
            for line in bytes.split(|b| *b == b'\n') {
                if !line.is_empty() {
                    events.push(line.to_vec());
                }
            }
        }
        let total_events = events.len();
        if total_events == 0 {
            // Clean up empty files without writing a new one.
            for key in &originals {
                let _ = ops::delete(&self.store, key).await;
            }
            return Ok(CompactStats { merged_files: originals.len(), events: 0, new_key: None });
        }
        // Name the compacted file with the SAME `batch_{ts}` format so it
        // sorts chronologically with future batches. Using a distinct prefix
        // ("batch_compacted_") would break lex ordering: later `batch_N`
        // files would sort BEFORE the compacted file because 'c' > digits.
        // Timestamp = now, so batches flushed after this point sort AFTER
        // this file; a batch flushed while we were still reading the
        // originals isn't in the snapshot, so it survives deletion but sorts
        // just ahead of the compacted data.
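        // Worked example (illustrative; keys as in the module-docs layout):
        //   "batch_0001776319628000123.jsonl" < "batch_0001776319800000000.jsonl"
        // compares byte-by-byte on the zero-padded digits, i.e. by timestamp,
        // whereas a "batch_compacted_..." key would compare 'c' (0x63) against
        // a digit (0x30..=0x39) and always sort after every plain batch key.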
        let ts_us = Utc::now().timestamp_micros().max(0) as u128;
        let new_key = format!("{}/batch_{:019}.jsonl", self.prefix, ts_us);
        let mut body = Vec::new();
        for line in &events {
            body.extend_from_slice(line);
            body.push(b'\n');
        }
        ops::put(&self.store, &new_key, Bytes::from(body)).await?;
        // Only delete originals once the consolidated file is persisted.
        let mut failures = 0;
        for key in &originals {
            if ops::delete(&self.store, key).await.is_err() {
                failures += 1;
            }
        }
        if failures > 0 {
            tracing::warn!(
                "compact '{}': {} original files failed to delete — consolidated file {} has the data",
                self.prefix, failures, new_key,
            );
        }
        Ok(CompactStats {
            merged_files: originals.len(),
            events: total_events,
            new_key: Some(new_key),
        })
    }

    /// How many batch files exist on disk right now.
    pub async fn file_count(&self) -> Result<usize, String> {
        Ok(self.list_batch_keys().await?.len())
    }

    async fn list_batch_keys(&self) -> Result<Vec<String>, String> {
        let prefix_with_slash = format!("{}/", self.prefix);
        // list the prefix then filter for keys that match our naming scheme;
        // unrelated files at the same prefix won't be touched.
        let raw = ops::list(&self.store, Some(&prefix_with_slash)).await?;
        Ok(raw
            .into_iter()
            .filter(|k| {
                let basename = k.rsplit('/').next().unwrap_or(k);
                basename.starts_with("batch_") && basename.ends_with(".jsonl")
            })
            .collect())
    }
}

#[derive(Debug, Clone, serde::Serialize)]
pub struct CompactStats {
    pub merged_files: usize,
    pub events: usize,
    pub new_key: Option<String>,
}

// Log unflushed-buffer size on drop. We can't `.await` from a sync `Drop`,
// so a real flush isn't possible here — callers are responsible for calling
// `.flush()` before dropping if durability matters. These journals are
// observability hints; a few lost buffered events at shutdown are
// acceptable per ADR-018.
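//
// Shutdown sketch (illustrative; `journal` is a hypothetical handle, not a
// name from this crate):
//
//     journal.flush().await.ok();   // best-effort final flush before drop
//     drop(journal);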
impl Drop for AppendLog {
    fn drop(&mut self) {
        let buf_len = self.buffer.try_lock().map(|b| b.len()).unwrap_or(0);
        if buf_len == 0 { return; }
        tracing::debug!(
            "append_log '{}' dropping with {} unflushed events",
            self.prefix, buf_len,
        );
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use object_store::memory::InMemory;

    fn mk(threshold: usize) -> AppendLog {
        AppendLog::new(Arc::new(InMemory::new()), "prefix")
            .with_flush_threshold(threshold)
    }

    #[tokio::test]
    async fn append_stays_buffered_below_threshold() {
        let log = mk(5);
        log.append(b"one".to_vec()).await.unwrap();
        log.append(b"two".to_vec()).await.unwrap();
        assert_eq!(log.file_count().await.unwrap(), 0, "no files until threshold");
        let all = log.read_all().await.unwrap();
        assert_eq!(all, vec![b"one".to_vec(), b"two".to_vec()],
            "read_all surfaces unflushed buffer");
    }

    #[tokio::test]
    async fn append_auto_flushes_on_threshold() {
        let log = mk(3);
        for i in 0..3 {
            log.append(format!("evt{i}").into_bytes()).await.unwrap();
        }
        assert_eq!(log.file_count().await.unwrap(), 1, "threshold triggered one flush");
        // A fourth append stays buffered until the next threshold.
        log.append(b"evt3".to_vec()).await.unwrap();
        assert_eq!(log.file_count().await.unwrap(), 1, "below threshold again");
    }

    #[tokio::test]
    async fn flush_empty_is_noop() {
        let log = mk(32);
        log.flush().await.unwrap();
        log.flush().await.unwrap();
        assert_eq!(log.file_count().await.unwrap(), 0);
    }

    #[tokio::test]
    async fn read_all_orders_events_across_flushes() {
        let log = mk(1); // flush-on-every-append
        for i in 0..5 {
            log.append(format!("e{i}").into_bytes()).await.unwrap();
            // Spread writes out so timestamps sort strictly.
            tokio::time::sleep(std::time::Duration::from_millis(2)).await;
        }
        let all = log.read_all().await.unwrap();
        let strs: Vec<String> = all.into_iter()
            .map(|v| String::from_utf8(v).unwrap())
            .collect();
        assert_eq!(strs, vec!["e0", "e1", "e2", "e3", "e4"],
            "lex sort of batch keys == chronological event order");
    }

    #[tokio::test]
    async fn compact_merges_multiple_files_into_one() {
        let log = mk(1); // force file-per-append
        for i in 0..4 {
            log.append(format!("e{i}").into_bytes()).await.unwrap();
            tokio::time::sleep(std::time::Duration::from_millis(2)).await;
        }
        assert_eq!(log.file_count().await.unwrap(), 4);
        let stats = log.compact().await.unwrap();
        assert_eq!(stats.merged_files, 4);
        assert_eq!(stats.events, 4);
        assert!(stats.new_key.is_some());
        assert_eq!(log.file_count().await.unwrap(), 1, "originals deleted, 1 survivor");
        let all = log.read_all().await.unwrap();
        assert_eq!(all.len(), 4, "no events lost in compaction");
    }

    #[tokio::test]
    async fn compact_with_single_file_is_noop() {
        let log = mk(1);
        log.append(b"only".to_vec()).await.unwrap();
        assert_eq!(log.file_count().await.unwrap(), 1);
        let stats = log.compact().await.unwrap();
        assert_eq!(stats.merged_files, 1);
        assert_eq!(stats.events, 0, "nothing to consolidate");
        assert!(stats.new_key.is_none(), "no new file written");
        assert_eq!(log.file_count().await.unwrap(), 1, "original untouched");
    }
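
    // Illustrative addition (not in the original suite): events appended after
    // a compaction keep their chronological position — the new batch file gets
    // a later timestamp than the compacted file and so reads back after it.
    #[tokio::test]
    async fn append_after_compact_reads_in_order() {
        let log = mk(1);
        for i in 0..3 {
            log.append(format!("old{i}").into_bytes()).await.unwrap();
            tokio::time::sleep(std::time::Duration::from_millis(2)).await;
        }
        log.compact().await.unwrap();
        tokio::time::sleep(std::time::Duration::from_millis(2)).await;
        log.append(b"new0".to_vec()).await.unwrap();
        let all = log.read_all().await.unwrap();
        let strs: Vec<String> = all.into_iter()
            .map(|v| String::from_utf8(v).unwrap())
            .collect();
        assert_eq!(strs, vec!["old0", "old1", "old2", "new0"],
            "events appended after compact() read back after the compacted data");
    }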
}