lakehouse/crates/catalogd/src/tombstones.rs
root 4e1c400f5d Phase E.2: Compaction integrates tombstones — physical deletion closes GDPR loop
Phase E gave us soft-delete at query time (tombstones hide rows via a
DataFusion filter view). This completes the invariant: after compact,
tombstoned rows are PHYSICALLY absent from the parquet on disk.

delta::compact changes:
- Signature adds tombstones: &[Tombstone]
- After merging base + deltas, apply_tombstone_filter builds a
  BooleanArray keep-mask per batch (True where row_key_value is NOT
  in the tombstone set) and applies arrow::compute::filter_record_batch
  (see the sketch after this list)
- Supports Utf8, Int32, Int64 key columns (matches refresh.rs coverage
  for pg- and csv-derived schemas)
- CompactResult gains tombstones_applied + rows_dropped_by_tombstones
- Caller clears tombstone store on success
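
A minimal sketch of the keep-mask step for the Utf8 case. The helper
name and exact signature here are illustrative; the real code also
covers Int32/Int64 keys:

  use std::collections::HashSet;
  use arrow::array::{BooleanArray, StringArray};
  use arrow::compute::filter_record_batch;
  use arrow::record_batch::RecordBatch;

  /// Keep every row whose key is NOT in the tombstone set.
  fn apply_tombstone_filter(
      batch: &RecordBatch,
      key_column: &str,
      tombstoned: &HashSet<String>,
  ) -> Result<RecordBatch, String> {
      let idx = batch.schema().index_of(key_column).map_err(|e| e.to_string())?;
      let keys = batch
          .column(idx)
          .as_any()
          .downcast_ref::<StringArray>()
          .ok_or("key column is not Utf8")?;
      // True => keep. Null keys are kept (an assumption of this sketch).
      let keep: BooleanArray = keys
          .iter()
          .map(|k| Some(k.map_or(true, |v| !tombstoned.contains(v))))
          .collect();
      filter_record_batch(batch, &keep).map_err(|e| e.to_string())
  }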

Critical correctness fix surfaced during E2E testing:
The original Phase 8 compact concatenated N independent Parquet byte
streams from record_batch_to_parquet() — each with its own footer.
Parquet readers only see the FIRST footer's data; the rest is invisible.
Latent since Phase 8 shipped; triggered by tombstone-filtering
producing multiple batches. It corrupted candidates.parquet on the
first test run (restored from a UI fixture copy — a good argument for
keeping test data in the repo).

Fix:
- One ArrowWriter per compaction writes every batch into a single,
  properly-footered Parquet file (see the sketch after this list)
- Snappy compression to match ingest defaults (otherwise the rewrite
  inflated the file 3× — 10.5 MB → 34 MB — because no compression was set)
- Verify-before-swap: parse written buf back to confirm row count
  matches expected; refuses to overwrite base_key if verification fails
- Write to {base_key}.compact-{ts}.tmp first, then to base_key; delete
  temp; only then delete delta files. Any error along the way leaves
  the original base intact.
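
A hedged sketch of the single-writer + verify flow (the function name is
illustrative, and the tmp-write/swap against object storage is elided):

  use arrow::record_batch::RecordBatch;
  use bytes::Bytes;
  use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
  use parquet::arrow::ArrowWriter;
  use parquet::basic::Compression;
  use parquet::file::properties::WriterProperties;

  fn write_and_verify(batches: &[RecordBatch]) -> Result<Vec<u8>, String> {
      let schema = batches.first().ok_or("no batches")?.schema();
      // Snappy to match ingest defaults; the parquet crate's default is
      // uncompressed, which is what inflated the rewrite 3x.
      let props = WriterProperties::builder()
          .set_compression(Compression::SNAPPY)
          .build();
      // One writer => one footer. Concatenating per-batch Parquet byte
      // streams leaves readers seeing only the first footer's data.
      let mut buf = Vec::new();
      let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(props))
          .map_err(|e| e.to_string())?;
      for batch in batches {
          writer.write(batch).map_err(|e| e.to_string())?;
      }
      writer.close().map_err(|e| e.to_string())?;

      // Verify-before-swap: parse the buffer back and refuse to proceed
      // if the footer's row count doesn't match what we wrote.
      let expected: i64 = batches.iter().map(|b| b.num_rows() as i64).sum();
      let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buf.clone()))
          .map_err(|e| e.to_string())?;
      let actual = builder.metadata().file_metadata().num_rows();
      if actual != expected {
          return Err(format!("verify failed: wrote {actual} rows, expected {expected}"));
      }
      Ok(buf)
  }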

TombstoneStore::clear(dataset) drops all tombstone batch files and
evicts the per-dataset AppendLog from cache. Called after successful
compact.
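
From the compaction caller's side, clearing is one call, roughly
(dataset name illustrative):

  let removed = tombstone_store.clear("candidates").await?;
  tracing::info!("cleared {removed} tombstone batch files after compact");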

QueryEngine::catalog() accessor exposes the Registry so queryd
handlers can reach the tombstone store without routing through gateway
state.
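
A minimal sketch of the accessor, assuming the engine holds the
Registry in a `catalog` field:

  impl QueryEngine {
      /// Borrow the catalog Registry so handlers can reach, e.g., the
      /// tombstone store directly.
      pub fn catalog(&self) -> &Registry {
          &self.catalog
      }
  }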

E2E on candidates (100K rows, 15 cols):
- Baseline: 10.59 MB, 100000 rows
- Tombstone CAND-000001/2/3 (soft-delete): 99997 visible, 100000 raw
- Compact: tombstones_applied=3, rows_dropped=3, final_rows=99997
- Post: 10.72 MB (Snappy), valid parquet (1 row_group), 99997 rows
- Restart: persists, tombstones list empty, __raw__candidates also
  99997 (the 3 IDs are physically gone from disk)

PRD invariant closed: deletion is now actually deletion, not just
masking. GDPR erasure request → tombstone + schedule compact → data
gone.

Deferred:
- Compact-all-datasets cron (currently manual per-dataset via
  POST /query/compact)
- Compaction of tombstone batch files themselves (they grow at
  flush_threshold=1 per tombstone; TombstoneStore::compact exists
  but not auto-called)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-16 10:38:30 -05:00

//! Soft-delete tombstone storage (Phase E).
//!
//! One append-log per dataset at `_catalog/tombstones/{dataset}/batch_*.jsonl`.
//! Uses the shared `storaged::append_log::AppendLog` pattern so appends
//! are write-once (never rewrites existing files) and can be compacted.
//!
//! The store keeps a per-dataset cache of `AppendLog` handles for the
//! hot path (queryd filter construction), so repeated lookups reuse a
//! cached log instead of re-listing object storage on every call.
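//!
//! Each line in a batch file is one JSON-encoded `Tombstone`; roughly
//! (field names per `shared::types::Tombstone`, values illustrative):
//! `{"dataset":"candidates","row_key_column":"candidate_id","row_key_value":"CAND-000001"}`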

use object_store::ObjectStore;
use shared::types::Tombstone;
use std::collections::HashMap;
use std::sync::Arc;
use storaged::append_log::{AppendLog, CompactStats};
use tokio::sync::RwLock;

const TOMBSTONE_PREFIX: &str = "_catalog/tombstones";

#[derive(Clone)]
pub struct TombstoneStore {
    store: Arc<dyn ObjectStore>,
    logs: Arc<RwLock<HashMap<String, Arc<AppendLog>>>>,
}

impl TombstoneStore {
    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
        Self {
            store,
            logs: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    fn prefix_for(dataset: &str) -> String {
        // Sanitize dataset name for filesystem safety.
        let safe: String = dataset
            .chars()
            .map(|c| if c.is_ascii_alphanumeric() || c == '_' || c == '-' { c } else { '_' })
            .collect();
        format!("{TOMBSTONE_PREFIX}/{}", safe)
    }

    async fn log_for(&self, dataset: &str) -> Arc<AppendLog> {
        if let Some(log) = self.logs.read().await.get(dataset) {
            return log.clone();
        }
        let mut guard = self.logs.write().await;
        // Re-check under the write lock: another task may have inserted
        // the log between releasing the read lock and acquiring this one.
        if let Some(log) = guard.get(dataset) {
            return log.clone();
        }
        // Threshold of 1 — every tombstone is high-value. Compliance/audit
        // doesn't tolerate "lost on restart"; we trade a small file count
        // for guaranteed durability. Compaction merges later if volume grows.
        let log = Arc::new(
            AppendLog::new(self.store.clone(), Self::prefix_for(dataset))
                .with_flush_threshold(1),
        );
        guard.insert(dataset.to_string(), log.clone());
        log
    }

    /// Append one tombstone. Validates that the `row_key_column` matches
    /// the column already used for this dataset (all tombstones for a
    /// dataset share one key column so the query filter is well-defined).
    /// Forces a flush so the tombstone is durable before this call returns.
    pub async fn append(&self, ts: &Tombstone) -> Result<(), String> {
        let existing = self.list(&ts.dataset).await?;
        if let Some(prior) = existing.first() {
            if prior.row_key_column != ts.row_key_column {
                return Err(format!(
                    "dataset '{}' already uses '{}' as tombstone key; cannot mix with '{}'",
                    ts.dataset, prior.row_key_column, ts.row_key_column,
                ));
            }
        }
        let line = serde_json::to_vec(ts).map_err(|e| e.to_string())?;
        let log = self.log_for(&ts.dataset).await;
        log.append(line).await?;
        // Belt-and-suspenders: explicit flush in case the threshold is
        // ever raised. Tombstones must be durable on return.
        log.flush().await
    }

    /// All tombstones for a dataset (chronological).
    pub async fn list(&self, dataset: &str) -> Result<Vec<Tombstone>, String> {
        let log = self.log_for(dataset).await;
        let lines = log.read_all().await?;
        let mut out = Vec::with_capacity(lines.len());
        for line in lines {
            match serde_json::from_slice::<Tombstone>(&line) {
                Ok(t) => out.push(t),
                Err(e) => tracing::warn!("tombstones/{}: skip malformed entry: {e}", dataset),
            }
        }
        Ok(out)
    }

    /// Per-dataset grouped view used by queryd — returns a map of
    /// `{dataset -> (row_key_column, values)}` for every dataset that
    /// has any tombstones.
    pub async fn all_grouped(
        &self,
        datasets: &[String],
    ) -> Result<HashMap<String, (String, Vec<String>)>, String> {
        let mut grouped = HashMap::new();
        for dataset in datasets {
            let ts = match self.list(dataset).await {
                Ok(ts) => ts,
                Err(_) => continue,
            };
            if ts.is_empty() {
                continue;
            }
            let col = ts[0].row_key_column.clone();
            let values: Vec<String> = ts.iter().map(|t| t.row_key_value.clone()).collect();
            grouped.insert(dataset.clone(), (col, values));
        }
        Ok(grouped)
    }

    /// Merge this dataset's tombstone batch files into fewer files via
    /// the underlying `AppendLog` compaction.
    pub async fn compact(&self, dataset: &str) -> Result<CompactStats, String> {
        let log = self.log_for(dataset).await;
        log.compact().await
    }

    /// Remove every tombstone for a dataset. Called after a successful
    /// parquet compaction has physically deleted those rows — the
    /// tombstones have done their job and the journal can be cleared.
    ///
    /// Implementation: drop the per-dataset AppendLog from the cache and
    /// delete all its batch files. Next write starts fresh.
    pub async fn clear(&self, dataset: &str) -> Result<usize, String> {
        let prefix = format!("{}/", Self::prefix_for(dataset));
        let keys = storaged::ops::list(&self.store, Some(&prefix)).await?;
        let matching: Vec<String> = keys
            .into_iter()
            .filter(|k| {
                let basename = k.rsplit('/').next().unwrap_or(k);
                basename.starts_with("batch_") && basename.ends_with(".jsonl")
            })
            .collect();
        let count = matching.len();
        for key in &matching {
            let _ = storaged::ops::delete(&self.store, key).await;
        }
        self.logs.write().await.remove(dataset);
        if count > 0 {
            tracing::info!("cleared {count} tombstone batch files for '{}'", dataset);
        }
        Ok(count)
    }
}