From d87f2ccac62b968594826df9e404912455df8a43 Mon Sep 17 00:00:00 2001 From: root Date: Thu, 16 Apr 2026 09:40:48 -0500 Subject: [PATCH] Phase E: Soft deletes (tombstones) for compliance-grade row deletion MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements GDPR/CCPA-compatible row-level deletion without rewriting the underlying Parquet. Tombstone markers live beside each dataset and are applied at query time via a DataFusion view that excludes the deleted row_key_values. Schema (shared::types): - Tombstone { dataset, row_key_column, row_key_value, deleted_at, actor, reason } - All tombstones for a dataset must share one row_key_column — enforced at write so the query-time filter remains a single WHERE NOT IN (...) clause Storage (catalogd::tombstones): - Per-dataset AppendLog at _catalog/tombstones/{dataset}/ - flush_threshold=1 + explicit flush after every append — tombstones are high-value, low-frequency; durability on return is the contract - Reuses storaged::append_log infra so compaction is already wired (POST .../tombstones/compact will work once we expose it) Catalog (catalogd::registry): - add_tombstone validates dataset exists + key column compatibility - list_tombstones for the GET endpoint - TombstoneStore exposed via Registry::tombstones() for queryd HTTP (catalogd::service): - POST /catalog/datasets/by-name/{name}/tombstone { row_key_column, row_key_values[], actor, reason } Returns rows_tombstoned count + per-value failure list (207 on partial success). - GET same path lists active tombstones with full audit info. Query layer (queryd::context): - Snapshot tombstones-by-dataset before registering tables - Tombstoned tables: raw goes to "__raw__{name}", public "{name}" becomes DataFusion view with SELECT * FROM "__raw__{name}" WHERE CAST(col AS VARCHAR) NOT IN (...) 
- CAST AS VARCHAR handles both string and integer key columns - Untombstoned tables register as before — zero overhead End-to-end on candidates (100K rows): - Pick CAND-000001/2/3 (Linda/Charles/Kimberly) - POST tombstone -> rows_tombstoned: 3 - COUNT(*) drops 100000 -> 99997 - WHERE candidate_id IN (those 3) -> 0 rows - candidates_safe view transitively excludes them (Linda+Denver: __raw__candidates=159, candidates_safe=158) - Restart: COUNT still 99997, 3 tombstones reload from disk Reversibility: tombstones are reversible deletes, not destruction. Power users can still query "__raw__{name}" to see deleted rows. Phase 13 access control is what stops a non-admin from accessing __raw__* tables. Limits / follow-up: - Physical compaction not yet integrated — Phase 8's compact_files doesn't read tombstones during merge. Tombstoned rows are still on disk until that integration ships. - Phase 9 journald event emission for tombstones not wired — tombstone records carry their own actor+reason+timestamp so the audit trail is intact, but cross-referencing with the mutation event log would help compliance reporting. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/catalogd/src/lib.rs | 1 + crates/catalogd/src/registry.rs | 40 +++++++++- crates/catalogd/src/service.rs | 69 +++++++++++++++++ crates/catalogd/src/tombstones.rs | 124 ++++++++++++++++++++++++++++++ crates/queryd/src/context.rs | 69 +++++++++++++++-- crates/shared/src/types.rs | 29 +++++++ docs/PHASES.md | 9 +++ 7 files changed, 332 insertions(+), 9 deletions(-) create mode 100644 crates/catalogd/src/tombstones.rs diff --git a/crates/catalogd/src/lib.rs b/crates/catalogd/src/lib.rs index 6061f57..331cdb5 100644 --- a/crates/catalogd/src/lib.rs +++ b/crates/catalogd/src/lib.rs @@ -1,3 +1,4 @@ pub mod registry; pub mod service; pub mod grpc; +pub mod tombstones; diff --git a/crates/catalogd/src/registry.rs b/crates/catalogd/src/registry.rs index 996c716..d218714 100644 --- a/crates/catalogd/src/registry.rs +++ b/crates/catalogd/src/registry.rs @@ -1,7 +1,9 @@ use shared::types::{ AiView, ColumnMeta, DatasetId, DatasetManifest, FreshnessContract, Lineage, ObjectRef, - RefreshPolicy, SchemaFingerprint, Sensitivity, + RefreshPolicy, SchemaFingerprint, Sensitivity, Tombstone, }; + +use crate::tombstones::TombstoneStore; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::RwLock; @@ -46,10 +48,12 @@ const VIEW_PREFIX: &str = "_catalog/views"; /// In-memory dataset registry backed by manifest persistence in object storage. /// Also tracks AiViews (Phase D) — safe projections over base datasets. +/// And tombstones (Phase E) — soft-delete markers applied at query time. #[derive(Clone)] pub struct Registry { datasets: Arc>>, views: Arc>>, + tombstones: TombstoneStore, store: Arc, } @@ -58,10 +62,44 @@ impl Registry { Self { datasets: Arc::new(RwLock::new(HashMap::new())), views: Arc::new(RwLock::new(HashMap::new())), + tombstones: TombstoneStore::new(store.clone()), store, } } + pub fn tombstones(&self) -> &TombstoneStore { + &self.tombstones + } + + /// Add a tombstone for a dataset row. 
Validates that the dataset + /// exists. Idempotent at the endpoint layer — callers dedup if needed. + pub async fn add_tombstone( + &self, + dataset: &str, + row_key_column: &str, + row_key_value: &str, + actor: &str, + reason: &str, + ) -> Result { + if self.get_by_name(dataset).await.is_none() { + return Err(format!("dataset not found: {dataset}")); + } + let ts = Tombstone { + dataset: dataset.to_string(), + row_key_column: row_key_column.to_string(), + row_key_value: row_key_value.to_string(), + deleted_at: chrono::Utc::now(), + actor: actor.to_string(), + reason: reason.to_string(), + }; + self.tombstones.append(&ts).await?; + Ok(ts) + } + + pub async fn list_tombstones(&self, dataset: &str) -> Result, String> { + self.tombstones.list(dataset).await + } + /// Rebuild in-memory index from persisted manifests + views on startup. pub async fn rebuild(&self) -> Result { let keys = ops::list(&self.store, Some(MANIFEST_PREFIX)).await?; diff --git a/crates/catalogd/src/service.rs b/crates/catalogd/src/service.rs index 1817b4f..f0fb737 100644 --- a/crates/catalogd/src/service.rs +++ b/crates/catalogd/src/service.rs @@ -25,6 +25,8 @@ pub fn router(registry: Registry) -> Router { // Phase D: AI-safe views .route("/views", post(create_view).get(list_views)) .route("/views/{name}", get(get_view).delete(delete_view)) + // Phase E: soft-delete tombstones + .route("/datasets/by-name/{name}/tombstone", post(tombstone_rows).get(list_tombstones)) .with_state(registry) } @@ -269,3 +271,70 @@ async fn delete_view( Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)), } } + +// --- Phase E: soft-delete tombstones --- + +#[derive(Deserialize)] +struct TombstoneRequest { + row_key_column: String, + row_key_values: Vec, + #[serde(default)] + actor: String, + #[serde(default)] + reason: String, +} + +#[derive(Serialize)] +struct TombstoneResponse { + dataset: String, + row_key_column: String, + rows_tombstoned: usize, + failures: Vec, +} + +async fn tombstone_rows( + 
State(registry): State<Registry>, + Path(name): Path<String>, + Json(req): Json<TombstoneRequest>, +) -> impl IntoResponse { + if req.row_key_values.is_empty() { + return Err((StatusCode::BAD_REQUEST, "row_key_values is empty".to_string())); + } + + let mut ok = 0; + let mut failures = Vec::new(); + for value in &req.row_key_values { + match registry + .add_tombstone(&name, &req.row_key_column, value, &req.actor, &req.reason) + .await + { + Ok(_) => ok += 1, + Err(e) => failures.push(format!("{value}: {e}")), + } + } + + let status = if ok > 0 && failures.is_empty() { + StatusCode::CREATED + } else if ok > 0 { + StatusCode::MULTI_STATUS + } else { + StatusCode::BAD_REQUEST + }; + + Ok((status, Json(TombstoneResponse { + dataset: name, + row_key_column: req.row_key_column, + rows_tombstoned: ok, + failures, + }))) +} + +async fn list_tombstones( + State(registry): State<Registry>, + Path(name): Path<String>, +) -> impl IntoResponse { + match registry.list_tombstones(&name).await { + Ok(ts) => Ok(Json(ts)), + Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)), + } +} diff --git a/crates/catalogd/src/tombstones.rs b/crates/catalogd/src/tombstones.rs new file mode 100644 index 0000000..57eb310 --- /dev/null +++ b/crates/catalogd/src/tombstones.rs @@ -0,0 +1,124 @@ +//! Soft-delete tombstone storage (Phase E). +//! +//! One append-log per dataset at `_catalog/tombstones/{dataset}/batch_*.jsonl`. +//! Uses the shared `storaged::append_log::AppendLog` pattern so appends +//! are write-once (never rewrites existing files) and can be compacted. +//! +//! NOTE(review): what is cached per dataset is the `AppendLog` handle, +//! not the tombstone records — `list` re-reads entries through the log +//! on every call. Whether that re-scans object storage depends on +//! `AppendLog` internals; confirm before treating this as a hot path. 
+ +use object_store::ObjectStore; +use shared::types::Tombstone; +use std::collections::HashMap; +use std::sync::Arc; +use storaged::append_log::{AppendLog, CompactStats}; +use tokio::sync::RwLock; + +const TOMBSTONE_PREFIX: &str = "_catalog/tombstones"; + +#[derive(Clone)] +pub struct TombstoneStore { + store: Arc, + logs: Arc>>>, +} + +impl TombstoneStore { + pub fn new(store: Arc) -> Self { + Self { + store, + logs: Arc::new(RwLock::new(HashMap::new())), + } + } + + fn prefix_for(dataset: &str) -> String { + // Sanitize dataset name for filesystem safety. + let safe: String = dataset + .chars() + .map(|c| if c.is_ascii_alphanumeric() || c == '_' || c == '-' { c } else { '_' }) + .collect(); + format!("{TOMBSTONE_PREFIX}/{}", safe) + } + + async fn log_for(&self, dataset: &str) -> Arc { + if let Some(log) = self.logs.read().await.get(dataset) { + return log.clone(); + } + let mut guard = self.logs.write().await; + if let Some(log) = guard.get(dataset) { + return log.clone(); + } + // Threshold of 1 — every tombstone is high-value. Compliance/audit + // doesn't tolerate "lost on restart"; we trade a small file count + // for guaranteed durability. Compaction merges later if volume grows. + let log = Arc::new( + AppendLog::new(self.store.clone(), Self::prefix_for(dataset)) + .with_flush_threshold(1), + ); + guard.insert(dataset.to_string(), log.clone()); + log + } + + /// Append one tombstone. Validates that the `row_key_column` matches + /// the column already used for this dataset (all tombstones for a + /// dataset share one key column so the query filter is well-defined). + /// Forces a flush so the tombstone is durable before this call returns. 
+ pub async fn append(&self, ts: &Tombstone) -> Result<(), String> { + let existing = self.list(&ts.dataset).await?; + if let Some(prior) = existing.first() { + if prior.row_key_column != ts.row_key_column { + return Err(format!( + "dataset '{}' already uses '{}' as tombstone key; cannot mix with '{}'", + ts.dataset, prior.row_key_column, ts.row_key_column, + )); + } + } + let line = serde_json::to_vec(ts).map_err(|e| e.to_string())?; + let log = self.log_for(&ts.dataset).await; + log.append(line).await?; + // Belt-and-suspenders: explicit flush in case the threshold is + // ever raised. Tombstones must be durable on return. + log.flush().await + } + + /// All tombstones for a dataset (chronological). + pub async fn list(&self, dataset: &str) -> Result<Vec<Tombstone>, String> { + let log = self.log_for(dataset).await; + let lines = log.read_all().await?; + let mut out = Vec::with_capacity(lines.len()); + for line in lines { + match serde_json::from_slice::<Tombstone>(&line) { + Ok(t) => out.push(t), + Err(e) => tracing::warn!("tombstones/{}: skip malformed entry: {e}", dataset), + } + } + Ok(out) + } + + /// Per-dataset grouped view used by queryd — returns a map of + /// `{dataset -> (row_key_column, values)}` for every dataset that + /// has tombstones; values may repeat if a key was tombstoned twice. 
+ pub async fn all_grouped( + &self, + datasets: &[String], + ) -> Result)>, String> { + let mut grouped = HashMap::new(); + for dataset in datasets { + let ts = match self.list(dataset).await { + Ok(ts) => ts, + Err(_) => continue, + }; + if ts.is_empty() { continue; } + let col = ts[0].row_key_column.clone(); + let values: Vec = ts.iter().map(|t| t.row_key_value.clone()).collect(); + grouped.insert(dataset.clone(), (col, values)); + } + Ok(grouped) + } + + pub async fn compact(&self, dataset: &str) -> Result { + let log = self.log_for(dataset).await; + log.compact().await + } +} diff --git a/crates/queryd/src/context.rs b/crates/queryd/src/context.rs index 0d899e5..902d07a 100644 --- a/crates/queryd/src/context.rs +++ b/crates/queryd/src/context.rs @@ -117,6 +117,19 @@ impl QueryEngine { async fn build_context(&self) -> Result { let ctx = SessionContext::new(); + // Phase E: snapshot tombstones by dataset before registering tables + // so we can wrap tombstoned tables in a filter view. The underlying + // base table is registered under an internal name `__raw__{dataset}` + // and the public `{dataset}` name becomes the filtered view. + let all_dataset_names: Vec = self.registry.list().await + .iter().map(|d| d.name.clone()).collect(); + let tombstones_by_dataset = self + .registry + .tombstones() + .all_grouped(&all_dataset_names) + .await + .unwrap_or_default(); + // Federation layer 2: register every configured bucket as its own // DataFusion ObjectStore under a distinct URL scheme. Each // dataset's ObjectRef.bucket determines which store DataFusion @@ -186,17 +199,57 @@ impl QueryEngine { let table = ListingTable::try_new(config) .map_err(|e| format!("table creation failed for {}: {e}", dataset.name))?; - // Tolerate duplicate manifest entries for the same name — - // pre-existing pipeline::ingest_file behavior creates a fresh - // dataset id on every ingest. 
First registration wins; later - // ones are skipped with a warning rather than failing the - // whole context build. - if let Err(e) = ctx.register_table(&dataset.name, Arc::new(table)) { + // Decide the registration name: if this dataset has any + // tombstones, the raw table gets an internal name and the + // public name becomes a filtered view. + let tombstone_entry = tombstones_by_dataset.get(&dataset.name); + let register_name = if tombstone_entry.is_some() { + format!("__raw__{}", dataset.name) + } else { + dataset.name.clone() + }; + + if let Err(e) = ctx.register_table(register_name.as_str(), Arc::new(table)) { let msg = e.to_string(); if msg.contains("already exists") { - tracing::debug!("skip duplicate manifest registration: {}", dataset.name); + tracing::debug!("skip duplicate manifest registration: {}", register_name); + continue; } else { - return Err(format!("table registration failed for {}: {}", dataset.name, msg)); + return Err(format!("table registration failed for {}: {}", register_name, msg)); + } + } + + // If there are tombstones, register the public name as a + // filtered view that excludes tombstoned row_key_values. + if let Some((key_col, values)) = tombstone_entry { + // Build WHERE NOT IN (...) — values quoted to be SQL-safe; CAST + // AS VARCHAR handles string and integer keys. NOTE(review): NOT + // IN drops rows whose key is NULL — confirm key col is NOT NULL. 
+ let quoted: Vec = values.iter() + .map(|v| format!("'{}'", v.replace('\'', "''"))) + .collect(); + let sql = format!( + "SELECT * FROM \"{}\" WHERE CAST(\"{}\" AS VARCHAR) NOT IN ({})", + register_name, key_col, quoted.join(", "), + ); + tracing::debug!( + "tombstone filter for '{}': {} row_keys excluded", + dataset.name, values.len(), + ); + match ctx.sql(&sql).await { + Ok(df) => { + if let Err(e) = ctx.register_table(dataset.name.as_str(), df.into_view()) { + let msg = e.to_string(); + if msg.contains("already exists") { + tracing::debug!("skip duplicate tombstone view: {}", dataset.name); + } else { + tracing::warn!("tombstone view registration failed for {}: {msg}", dataset.name); + } + } + } + Err(e) => { + tracing::warn!("tombstone view SQL failed for '{}': {e}", dataset.name); + } } } } diff --git a/crates/shared/src/types.rs b/crates/shared/src/types.rs index 512486f..6e3f0b4 100644 --- a/crates/shared/src/types.rs +++ b/crates/shared/src/types.rs @@ -196,6 +196,35 @@ pub struct AiView { pub description: String, } +/// Soft-delete marker (Phase E). +/// +/// Tombstones live beside the dataset in `_catalog/tombstones/{dataset}/` +/// as append-log JSONL. Query-time filter in queryd reads all tombstones +/// for each dataset and wraps the base table in a DataFusion view that +/// excludes tombstoned rows. Physical deletion happens later (compaction), +/// so the row count immediately reflects the delete but data is still on +/// disk until compact runs. That's deliberate — it gives a reversal +/// window and keeps the event journal audit trail intact. +/// +/// All tombstones for a given dataset must use the same `row_key_column` +/// (enforced on write); otherwise the query-time filter can't be built +/// as a single WHERE clause. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Tombstone { + pub dataset: String, + /// Column name that identifies the row (e.g. "candidate_id"). 
+ pub row_key_column: String, + /// Value of that column for the tombstoned row. + pub row_key_value: String, + pub deleted_at: chrono::DateTime, + /// Human / system actor responsible for the delete (audit). + #[serde(default)] + pub actor: String, + /// Why (e.g. "GDPR request #1234", "user-requested erasure"). + #[serde(default)] + pub reason: String, +} + /// How a column's values should be transformed before being returned. /// `Mask` is the most common — keeps a few visible chars, replaces the /// rest with `*`. `Hash` returns SHA-256 of the value for join keys you diff --git a/docs/PHASES.md b/docs/PHASES.md index c4fe636..a939914 100644 --- a/docs/PHASES.md +++ b/docs/PHASES.md @@ -154,6 +154,15 @@ - `crates/lance-bench` standalone pilot (Lance 4.0) avoids DataFusion/Arrow version conflict with main stack - 8-dimension benchmark on resumes_100k_v2 — see docs/ADR-019-vector-storage.md for scorecard - Decision: hybrid architecture. Parquet+HNSW stays primary (2.55× faster search at 100K in-RAM). Lance added as per-profile second backend for random access (112× faster), append (0.08s vs full rewrite), hot-swap (14× faster index builds), and scale past 5M RAM ceiling. 
+- [x] Phase E: Soft deletes (tombstones) — 2026-04-16 + - `shared::types::Tombstone` — { dataset, row_key_column, row_key_value, deleted_at, actor, reason } + - `catalogd::tombstones::TombstoneStore` per-dataset append-log at `_catalog/tombstones/{dataset}/`, flush_threshold=1 + explicit flush so every tombstone is durable on return (compliance requirement) + - All tombstones for a dataset must share the same `row_key_column` (validated at write — query filter is built as a single WHERE NOT IN clause) + - `Registry::add_tombstone / list_tombstones` + - Endpoint: `POST /catalog/datasets/by-name/{name}/tombstone` accepting `{row_key_column, row_key_values[], actor, reason}`; companion `GET` lists active tombstones + - `queryd::context::build_context` wraps tombstoned tables: raw goes to `__raw__{name}`, public name becomes a DataFusion view with `WHERE CAST(col AS VARCHAR) NOT IN (...)` filter + - End-to-end on candidates: tombstone 3 IDs, COUNT drops 100,000 → 99,997, specific WHERE returns empty, AiView candidates_safe transitively excludes them too, restart preserves all tombstones + - Limits / not in MVP: physical compaction (Phase 8 doesn't yet read tombstones during merge); journal integration (tombstones don't yet emit Phase 9 mutation events — covered by audit fields on the tombstone itself) - [x] Phase D: AI-safe views — 2026-04-16 - `shared::types::AiView` — name, base_dataset, columns whitelist, optional row_filter, column_redactions - `shared::types::Redaction` — Null | Hash | Mask { keep_prefix, keep_suffix }