//! Soft-delete tombstone storage (Phase E). //! //! One append-log per dataset at `_catalog/tombstones/{dataset}/batch_*.jsonl`. //! Uses the shared `storaged::append_log::AppendLog` pattern so appends //! are write-once (never rewrites existing files) and can be compacted. //! //! The store exposes a per-dataset cache of active tombstones for the //! hot path (queryd filter construction) — that's why events are pulled //! into an in-memory map on every call rather than scanning object //! storage repeatedly. use object_store::ObjectStore; use shared::types::Tombstone; use std::collections::HashMap; use std::sync::Arc; use storaged::append_log::{AppendLog, CompactStats}; use tokio::sync::RwLock; const TOMBSTONE_PREFIX: &str = "_catalog/tombstones"; #[derive(Clone)] pub struct TombstoneStore { store: Arc, logs: Arc>>>, } impl TombstoneStore { pub fn new(store: Arc) -> Self { Self { store, logs: Arc::new(RwLock::new(HashMap::new())), } } fn prefix_for(dataset: &str) -> String { // Sanitize dataset name for filesystem safety. let safe: String = dataset .chars() .map(|c| if c.is_ascii_alphanumeric() || c == '_' || c == '-' { c } else { '_' }) .collect(); format!("{TOMBSTONE_PREFIX}/{}", safe) } async fn log_for(&self, dataset: &str) -> Arc { if let Some(log) = self.logs.read().await.get(dataset) { return log.clone(); } let mut guard = self.logs.write().await; if let Some(log) = guard.get(dataset) { return log.clone(); } // Threshold of 1 — every tombstone is high-value. Compliance/audit // doesn't tolerate "lost on restart"; we trade a small file count // for guaranteed durability. Compaction merges later if volume grows. let log = Arc::new( AppendLog::new(self.store.clone(), Self::prefix_for(dataset)) .with_flush_threshold(1), ); guard.insert(dataset.to_string(), log.clone()); log } /// Append one tombstone. Validates that the `row_key_column` matches /// the column already used for this dataset (all tombstones for a /// dataset share one key column so the query filter is well-defined). /// Forces a flush so the tombstone is durable before this call returns. pub async fn append(&self, ts: &Tombstone) -> Result<(), String> { let existing = self.list(&ts.dataset).await?; if let Some(prior) = existing.first() { if prior.row_key_column != ts.row_key_column { return Err(format!( "dataset '{}' already uses '{}' as tombstone key; cannot mix with '{}'", ts.dataset, prior.row_key_column, ts.row_key_column, )); } } let line = serde_json::to_vec(ts).map_err(|e| e.to_string())?; let log = self.log_for(&ts.dataset).await; log.append(line).await?; // Belt-and-suspenders: explicit flush in case the threshold is // ever raised. Tombstones must be durable on return. log.flush().await } /// All tombstones for a dataset (chronological). pub async fn list(&self, dataset: &str) -> Result, String> { let log = self.log_for(dataset).await; let lines = log.read_all().await?; let mut out = Vec::with_capacity(lines.len()); for line in lines { match serde_json::from_slice::(&line) { Ok(t) => out.push(t), Err(e) => tracing::warn!("tombstones/{}: skip malformed entry: {e}", dataset), } } Ok(out) } /// Per-dataset grouped view used by queryd — returns a map of /// `{dataset -> (row_key_column, set_of_values)}` for every dataset /// that has any tombstones. pub async fn all_grouped( &self, datasets: &[String], ) -> Result)>, String> { let mut grouped = HashMap::new(); for dataset in datasets { let ts = match self.list(dataset).await { Ok(ts) => ts, Err(_) => continue, }; if ts.is_empty() { continue; } let col = ts[0].row_key_column.clone(); let values: Vec = ts.iter().map(|t| t.row_key_value.clone()).collect(); grouped.insert(dataset.clone(), (col, values)); } Ok(grouped) } pub async fn compact(&self, dataset: &str) -> Result { let log = self.log_for(dataset).await; log.compact().await } /// Remove every tombstone for a dataset. Called after a successful /// parquet compaction has physically deleted those rows — the /// tombstones have done their job and the journal can be cleared. /// /// Implementation: drop the per-dataset AppendLog from the cache and /// delete all its batch files. Next write starts fresh. pub async fn clear(&self, dataset: &str) -> Result { let prefix = format!("{}/", Self::prefix_for(dataset)); let keys = storaged::ops::list(&self.store, Some(&prefix)).await?; let matching: Vec = keys .into_iter() .filter(|k| { let basename = k.rsplit('/').next().unwrap_or(k); basename.starts_with("batch_") && basename.ends_with(".jsonl") }) .collect(); let count = matching.len(); for key in &matching { let _ = storaged::ops::delete(&self.store, key).await; } self.logs.write().await.remove(dataset); if count > 0 { tracing::info!("cleared {count} tombstone batch files for '{}'", dataset); } Ok(count) } }