lakehouse/crates/storaged/src/error_journal.rs
root dbe00d018f Federation foundation + HNSW trial system + Postgres streaming + PRD reframe
Four shipped features, each measured end-to-end, plus a PRD realignment:

HNSW trial system (Phase 15 horizon item → complete)
- vectord: EmbeddingCache, harness (eval sets + brute-force ground truth),
  TrialJournal, parameterized HnswConfig on build_index_with_config
- /vectors/hnsw/trial, /hnsw/trials/{idx}, /hnsw/trials/{idx}/best,
  /hnsw/evals/{name}/autogen, /hnsw/cache/stats
- Measured on resumes_100k_v2 (100K × 768d): brute-force 44ms → HNSW 873µs
  at 100% recall@10. ec=80 es=30 locked as HnswConfig::default()
- Lower ec values trade recall for build time: 20/30 = 0.96 recall in 8s,
  80/30 = 1.00 recall in 230s
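The recall figures above come from comparing HNSW results against brute-force ground truth. A minimal, self-contained sketch of how such a recall@k metric can be computed (the function name and shapes are illustrative, not the harness's actual API):

```rust
use std::collections::HashSet;

/// Fraction of the true top-k neighbour ids that the ANN result recovered.
/// `truth` is the brute-force ground-truth id list, `approx` the HNSW result.
fn recall_at_k(truth: &[u64], approx: &[u64], k: usize) -> f64 {
    let truth: HashSet<&u64> = truth.iter().take(k).collect();
    let hits = approx.iter().take(k).filter(|id| truth.contains(id)).count();
    hits as f64 / k as f64
}

fn main() {
    // 9 of the true top-10 ids recovered -> recall@10 = 0.9
    let truth: Vec<u64> = (0..10).collect();
    let approx: Vec<u64> = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 42];
    println!("{}", recall_at_k(&truth, &approx, 10)); // 0.9
}
```

A 100% recall@10 run means every query's approximate top-10 exactly covered the brute-force top-10.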

Catalog manifest repair
- catalogd: resync_from_parquet reads parquet footers to restore row_count
  and columns on drifted manifests
- POST /catalog/datasets/{name}/resync + POST /catalog/resync-missing
- All 7 staffing tables recovered to PRD-matching 2,469,278 rows

Federation foundation (ADR-017)
- shared::secrets: SecretsProvider trait + FileSecretsProvider (reads
  /etc/lakehouse/secrets.toml, enforces 0600 perms)
- storaged::registry::BucketRegistry — multi-bucket resolution with
  rescue_bucket read fallback and reachability probing
- storaged::error_journal — bucket op failures visible in one HTTP call
- storaged::append_log — write-once batched append pattern (fixes the RMW
  anti-pattern llms3.com calls out; errors and trial journals both use it)
- /storage/buckets, /storage/errors, /storage/bucket-health,
  /storage/errors/{flush,compact}
- Bucket-aware I/O at /storage/buckets/{bucket}/objects/{*key} with
  X-Lakehouse-Rescue-Used observability headers on fallback
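The rescue-fallback shape is: try the primary bucket, and only on failure read from the rescue bucket, reporting whether the fallback fired (that flag is what the X-Lakehouse-Rescue-Used header carries). A stdlib-only sketch of the control flow, with illustrative names rather than the BucketRegistry API:

```rust
/// Try the primary bucket first; on failure, fall back to the rescue bucket
/// and report whether the fallback was used. Illustrative names only.
fn read_with_rescue<T, E>(
    primary: impl FnOnce() -> Result<T, E>,
    rescue: impl FnOnce() -> Result<T, E>,
) -> Result<(T, bool), E> {
    match primary() {
        Ok(v) => Ok((v, false)), // primary healthy, no rescue needed
        Err(_) => rescue().map(|v| (v, true)), // rescued = true on fallback
    }
}

fn main() {
    let (body, rescued) =
        read_with_rescue(|| Err::<&str, _>("primary down"), || Ok("payload")).unwrap();
    println!("{body} rescued={rescued}"); // payload rescued=true
}
```

Only when both reads fail does the caller see an error; in the real service the primary failure is also recorded in the error journal before the rescue attempt.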

Postgres streaming ingest
- ingestd::pg_stream: DSN parser, batched ORDER BY + LIMIT/OFFSET pagination
  into ArrowWriter, lineage redacts password
- POST /ingest/db — verified against live knowledge_base.team_runs
  (586 rows × 13 cols, 6 batches, 196ms end-to-end)
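The pagination pattern is a stable ORDER BY plus per-batch LIMIT/OFFSET. A sketch of the query generation under assumed names (table and order column here are placeholders, not pg_stream's actual interface); 586 rows at batch size 100 yields the 6 batches reported above:

```rust
/// Build one SELECT per batch: stable ORDER BY, LIMIT = batch size,
/// OFFSET stepping through the table. Names are illustrative.
fn page_queries(table: &str, order_col: &str, batch: usize, total: usize) -> Vec<String> {
    (0..total)
        .step_by(batch)
        .map(|off| {
            format!("SELECT * FROM {table} ORDER BY {order_col} LIMIT {batch} OFFSET {off}")
        })
        .collect()
}

fn main() {
    let qs = page_queries("knowledge_base.team_runs", "id", 100, 586);
    println!("{}", qs.len()); // 6
    println!("{}", qs[5]); // last page: LIMIT 100 OFFSET 500
}
```

Each result page is appended to the in-flight ArrowWriter, so row batches map one-to-one onto these queries.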

PRD realignment (2026-04-16)
- Dual use case: staffing analytics + local LLM knowledge substrate
- Removed "multi-tenancy (single-owner system)" from non-goals
- Added invariants 8-11: indexes hot-swappable, per-reader profiles,
  trials-as-data, operational failures findable in one HTTP call
- New phases 16 (hot-swap generations), 17 (model profiles + dataset
  bindings), 18 (Lance vs Parquet+sidecar evaluation)
- Known ceilings table documents the 5M vector wall and escape hatches
- ADR-017 (federation), ADR-018 (append-log pattern) added
- EXECUTION_PLAN.md sequences phases B-E with success gates and
  decision rules

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-16 01:50:05 -05:00

201 lines
6.9 KiB
Rust

//! Bucket operation error journal.
//!
//! Every bucket op failure and every rescue fallback lands here. Goal:
//! answer "is anything broken?" with one HTTP call.
//!
//! Storage: batched write-once files under `_errors/bucket_errors/` in the
//! primary bucket. Uses the shared `AppendLog` helper so we never rewrite
//! existing files — see `append_log.rs` for the full pattern rationale.
//!
//! An in-memory ring buffer holds the last N events for fast responses; on
//! startup `load_recent` hydrates it from all batch files.

use chrono::{DateTime, Utc};
use object_store::ObjectStore;
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::sync::Arc;
use tokio::sync::RwLock;

use crate::append_log::{AppendLog, CompactStats};

const JOURNAL_PREFIX: &str = "_errors/bucket_errors";
const RING_CAPACITY: usize = 2000;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum BucketOp {
    Read,
    Write,
    Delete,
    List,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BucketErrorEvent {
    pub ts: DateTime<Utc>,
    pub op: BucketOp,
    pub target: String,
    pub key: String,
    pub error: String,
    #[serde(default)]
    pub rescued: bool,
}

impl BucketErrorEvent {
    pub fn new_read(target: &str, key: &str, error: &str) -> Self {
        Self { ts: Utc::now(), op: BucketOp::Read, target: target.into(), key: key.into(), error: error.into(), rescued: false }
    }

    pub fn new_write(target: &str, key: &str, error: &str) -> Self {
        Self { ts: Utc::now(), op: BucketOp::Write, target: target.into(), key: key.into(), error: error.into(), rescued: false }
    }

    pub fn new_delete(target: &str, key: &str, error: &str) -> Self {
        Self { ts: Utc::now(), op: BucketOp::Delete, target: target.into(), key: key.into(), error: error.into(), rescued: false }
    }

    pub fn new_list(target: &str, prefix: &str, error: &str) -> Self {
        Self { ts: Utc::now(), op: BucketOp::List, target: target.into(), key: prefix.into(), error: error.into(), rescued: false }
    }
}
#[derive(Clone)]
pub struct ErrorJournal {
    log: Arc<AppendLog>,
    ring: Arc<RwLock<VecDeque<BucketErrorEvent>>>,
}

#[derive(Debug, Clone, Serialize)]
pub struct HealthReport {
    pub period_minutes: i64,
    pub total_errors: usize,
    pub per_bucket: std::collections::HashMap<String, usize>,
    pub unhealthy_buckets: Vec<String>,
}
impl ErrorJournal {
    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
        // Keep flush threshold lowish — operators checking /storage/errors
        // after a recent incident want to see fresh rows even without an
        // explicit flush.
        let log = Arc::new(AppendLog::new(store, JOURNAL_PREFIX).with_flush_threshold(8));
        Self {
            log,
            ring: Arc::new(RwLock::new(VecDeque::with_capacity(RING_CAPACITY))),
        }
    }

    /// Hydrate the ring buffer from existing batch files. Call once at
    /// startup. Tolerates malformed lines (skipped with a warning) and
    /// missing files (returns 0).
    pub async fn load_recent(&self) -> Result<usize, String> {
        let lines = self.log.read_all().await.unwrap_or_default();
        let mut ring = self.ring.write().await;
        for line in lines {
            match serde_json::from_slice::<BucketErrorEvent>(&line) {
                Ok(ev) => {
                    if ring.len() >= RING_CAPACITY {
                        ring.pop_front();
                    }
                    ring.push_back(ev);
                }
                Err(e) => tracing::warn!("error journal: skip malformed line ({e})"),
            }
        }
        Ok(ring.len())
    }

    /// Append an event. In-memory ring updated immediately; persistence
    /// happens in batches via the underlying AppendLog.
    pub async fn append(&self, event: BucketErrorEvent) {
        {
            let mut ring = self.ring.write().await;
            if ring.len() >= RING_CAPACITY {
                ring.pop_front();
            }
            ring.push_back(event.clone());
        }
        match serde_json::to_vec(&event) {
            Ok(line) => {
                if let Err(e) = self.log.append(line).await {
                    tracing::error!("error journal persist failed: {e}");
                }
            }
            Err(e) => tracing::error!("error journal serialize failed: {e}"),
        }
    }

    /// Mark the most recent matching in-memory event as rescued.
    /// Only updates the ring buffer — the JSONL line already persisted
    /// records the failure fact; rescue status travels in response headers.
    pub async fn mark_rescued_last(&self, target: &str, key: &str) {
        let mut ring = self.ring.write().await;
        for ev in ring.iter_mut().rev() {
            if matches!(ev.op, BucketOp::Read)
                && ev.target == target
                && ev.key == key
                && !ev.rescued
            {
                ev.rescued = true;
                break;
            }
        }
    }

    pub async fn recent(&self, limit: usize) -> Vec<BucketErrorEvent> {
        let ring = self.ring.read().await;
        let start = ring.len().saturating_sub(limit);
        ring.iter().skip(start).cloned().collect()
    }

    pub async fn filter(
        &self,
        bucket: Option<&str>,
        since: Option<DateTime<Utc>>,
        limit: usize,
    ) -> Vec<BucketErrorEvent> {
        let ring = self.ring.read().await;
        ring.iter()
            .rev()
            .filter(|ev| bucket.map_or(true, |b| ev.target == b))
            .filter(|ev| since.map_or(true, |s| ev.ts >= s))
            .take(limit)
            .cloned()
            .collect::<Vec<_>>()
            .into_iter()
            .rev()
            .collect()
    }

    /// Summarize errors in the last N minutes.
    pub async fn health(&self, period_minutes: i64) -> HealthReport {
        use std::collections::HashMap;
        let cutoff = Utc::now() - chrono::Duration::minutes(period_minutes);
        let ring = self.ring.read().await;
        let recent: Vec<_> = ring.iter().filter(|ev| ev.ts >= cutoff).collect();
        let mut per_bucket: HashMap<String, usize> = HashMap::new();
        for ev in &recent {
            *per_bucket.entry(ev.target.clone()).or_insert(0) += 1;
        }
        let unhealthy_buckets: Vec<String> = per_bucket
            .iter()
            .filter(|(_, c)| **c >= 3)
            .map(|(k, _)| k.clone())
            .collect();
        HealthReport {
            period_minutes,
            total_errors: recent.len(),
            per_bucket,
            unhealthy_buckets,
        }
    }

    /// Force an immediate flush of buffered events to object storage.
    pub async fn flush(&self) -> Result<(), String> {
        self.log.flush().await
    }

    /// Consolidate all batch files into one. Operator cleanup.
    pub async fn compact(&self) -> Result<CompactStats, String> {
        self.log.compact().await
    }

    /// How many JSONL batch files currently exist.
    pub async fn file_count(&self) -> Result<usize, String> {
        self.log.file_count().await
    }
}