Phase E.2: Compaction integrates tombstones — physical deletion closes GDPR loop

Phase E gave us soft-delete at query time (tombstones hide rows via a
DataFusion filter view). This completes the invariant: after compact,
tombstoned rows are PHYSICALLY absent from the parquet on disk.

delta::compact changes:
- Signature adds tombstones: &[Tombstone]
- After merging base + deltas, apply_tombstone_filter builds a
  BooleanArray keep-mask per batch (true where row_key_value is NOT
  in the tombstone set) and applies arrow::compute::filter_record_batch
  (condensed sketch after this list)
- Supports Utf8, Int32, Int64 key columns (matches refresh.rs coverage
  for pg- and csv-derived schemas)
- CompactResult gains tombstones_applied + rows_dropped_by_tombstones
- Caller clears tombstone store on success
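
A condensed sketch of the mask-and-filter step for a Utf8 key column,
using only calls that appear in the delta.rs hunk below; the helper
name drop_tombstoned_utf8 is illustrative, and the committed
apply_tombstone_filter additionally handles Int32/Int64 keys and
passes unsupported key types through:

use std::collections::HashSet;
use arrow::array::{Array, BooleanArray, RecordBatch, StringArray};
use arrow::compute::filter_record_batch;

// Illustrative helper, not the committed code: drop rows whose Utf8 key
// value appears in the tombstone set; null keys are kept.
fn drop_tombstoned_utf8(
    batch: &RecordBatch,
    key_col_idx: usize,
    bad_values: &HashSet<String>,
) -> Result<RecordBatch, String> {
    let keys = batch
        .column(key_col_idx)
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| "expected a Utf8 key column".to_string())?;
    // Keep-mask: true where the row is NOT tombstoned.
    let keep: BooleanArray = (0..keys.len())
        .map(|i| Some(keys.is_null(i) || !bad_values.contains(keys.value(i))))
        .collect();
    filter_record_batch(batch, &keep).map_err(|e| format!("filter batch: {e}"))
}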

Critical correctness fix surfaced during E2E testing:
The original Phase 8 compact concatenated N independent Parquet byte
streams from record_batch_to_parquet() — each with its own footer.
A Parquet reader resolves a single footer for the whole byte stream,
so only one segment's rows are visible; the rest is invisible.
Latent since Phase 8 shipped; triggered by tombstone filtering
producing multiple batches. Corrupted candidates.parquet on the first
test run (restored from a UI fixture copy; a good argument for keeping
test data in the repo).

Fix:
- Single ArrowWriter per compaction writes every batch into one
  properly-footered Parquet (condensed sketch after this list)
- Snappy compression to match ingest defaults (otherwise rewrite
  inflated file 3× — 10.5MB → 34MB — because no compression was set)
- Verify-before-swap: parse written buf back to confirm row count
  matches expected; refuses to overwrite base_key if verification fails
- Write to {base_key}.compact-{ts}.tmp first, then to base_key; delete
  temp; only then delete delta files. Any error along the way leaves
  the original base intact.
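
A condensed sketch of the write-and-verify step, pulled out of the
delta.rs hunk below as a free function for readability; the name
write_and_verify and the expected_rows parameter are illustrative, and
shared::arrow_helpers::parquet_to_record_batches is the same helper
the committed code uses for the verify parse:

use arrow::array::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use shared::arrow_helpers::parquet_to_record_batches;

// Illustrative extraction of the inline logic in compact(): build ONE
// Snappy-compressed Parquet file (single footer) and refuse to return
// bytes that don't re-parse to the expected row count.
fn write_and_verify(
    batches: &[RecordBatch],
    expected_rows: usize,
) -> Result<Vec<u8>, String> {
    let schema = batches
        .first()
        .map(|b| b.schema())
        .ok_or_else(|| "no batches to write".to_string())?;
    let props = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .build();
    let mut buf: Vec<u8> = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(props))
        .map_err(|e| format!("ArrowWriter init: {e}"))?;
    for batch in batches {
        writer.write(batch).map_err(|e| format!("write batch: {e}"))?;
    }
    writer.close().map_err(|e| format!("close writer: {e}"))?;
    let (_, check) =
        parquet_to_record_batches(&buf).map_err(|e| format!("verify parse: {e}"))?;
    let rows: usize = check.iter().map(|b| b.num_rows()).sum();
    if rows != expected_rows {
        return Err(format!(
            "verification failed: parsed {rows} rows, expected {expected_rows}"
        ));
    }
    // The caller stages this buffer under a temp key, overwrites
    // base_key, deletes the temp key, and only then deletes delta files.
    Ok(buf)
}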

TombstoneStore::clear(dataset) drops all tombstone batch files and
evicts the per-dataset AppendLog from cache. Called after successful
compact.

QueryEngine::catalog() accessor exposes the Registry so queryd
handlers can reach the tombstone store without routing through gateway
state.

E2E on candidates (100K rows, 15 cols):
- Baseline: 10.59 MB, 100000 rows
- Tombstone CAND-000001/2/3 (soft-delete): 99997 visible, 100000 raw
- Compact: tombstones_applied=3, rows_dropped=3, final_rows=99997
- Post: 10.72 MB (Snappy), valid parquet (1 row_group), 99997 rows
- Restart: persists, tombstones list empty, __raw__candidates also
  99997 (the 3 IDs are physically gone from disk)

PRD invariant close: deletion is now actually deletion, not just
masking. GDPR erasure request → tombstone + schedule compact → data
gone.

Deferred:
- Compact-all-datasets cron (currently manual per-dataset via
  POST /query/compact)
- Compaction of tombstone batch files themselves (with
  flush_threshold=1 they grow by one file per tombstone;
  TombstoneStore::compact exists but is not auto-called)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

@ -121,4 +121,31 @@ impl TombstoneStore {
let log = self.log_for(dataset).await;
log.compact().await
}
/// Remove every tombstone for a dataset. Called after a successful
/// parquet compaction has physically deleted those rows — the
/// tombstones have done their job and the journal can be cleared.
///
/// Implementation: drop the per-dataset AppendLog from the cache and
/// delete all its batch files. Next write starts fresh.
pub async fn clear(&self, dataset: &str) -> Result<usize, String> {
let prefix = format!("{}/", Self::prefix_for(dataset));
let keys = storaged::ops::list(&self.store, Some(&prefix)).await?;
let matching: Vec<String> = keys
.into_iter()
.filter(|k| {
let basename = k.rsplit('/').next().unwrap_or(k);
basename.starts_with("batch_") && basename.ends_with(".jsonl")
})
.collect();
let count = matching.len();
for key in &matching {
let _ = storaged::ops::delete(&self.store, key).await;
}
self.logs.write().await.remove(dataset);
if count > 0 {
tracing::info!("cleared {count} tombstone batch files for '{}'", dataset);
}
Ok(count)
}
}


@ -15,6 +15,7 @@ tracing = { workspace = true }
datafusion = { workspace = true }
object_store = { workspace = true }
arrow = { workspace = true }
parquet = { workspace = true }
bytes = { workspace = true }
futures = { workspace = true }
url = { workspace = true }


@ -63,6 +63,11 @@ impl QueryEngine {
&self.buckets
}
/// Catalog registry — used by compact/tombstone integration.
pub fn catalog(&self) -> &Registry {
&self.registry
}
/// Execute a SQL query. Uses cache for hot data, falls back to Parquet.
pub async fn query(&self, sql: &str) -> Result<Vec<arrow::array::RecordBatch>, String> {
let ctx = self.build_context().await?;


@ -3,10 +3,15 @@
/// we write small delta files. At query time, deltas are merged with the base.
/// Periodic compaction merges deltas into the base file.
use arrow::array::RecordBatch;
use arrow::datatypes::SchemaRef;
use arrow::array::{Array, BooleanArray, Int32Array, Int64Array, RecordBatch, StringArray};
use arrow::compute::filter_record_batch;
use bytes::Bytes;
use object_store::ObjectStore;
use parquet::arrow::ArrowWriter;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use shared::types::Tombstone;
use std::collections::HashSet;
use std::sync::Arc;
use shared::arrow_helpers::{parquet_to_record_batches, record_batch_to_parquet};
@ -58,74 +63,207 @@ pub async fn load_deltas(
}
/// Compact: merge base Parquet + all deltas into a single new base file.
/// Optionally deduplicates by a primary key column.
/// Optionally filters out tombstoned rows — this is the physical-deletion
/// half of Phase E (the query-time filter handles the "immediate hide",
/// compaction handles the "actually remove from disk" requirement).
///
/// Tombstones must share one `row_key_column` (enforced at tombstone write).
/// If non-empty, every row in the merged output whose row_key_column value
/// matches a tombstone is dropped before the new base is written.
pub async fn compact(
store: &Arc<dyn ObjectStore>,
dataset_name: &str,
base_key: &str,
primary_key_col: Option<&str>,
tombstones: &[Tombstone],
) -> Result<CompactResult, String> {
// Load base
let base_data = ops::get(store, base_key).await?;
let (schema, mut base_batches) = parquet_to_record_batches(&base_data)?;
let (_schema, mut base_batches) = parquet_to_record_batches(&base_data)?;
// Load deltas
let delta_batches = load_deltas(store, dataset_name).await?;
let delta_count = delta_batches.len();
if delta_batches.is_empty() {
let has_tombstones = !tombstones.is_empty();
let nothing_to_do = delta_batches.is_empty() && !has_tombstones;
if nothing_to_do {
return Ok(CompactResult {
base_rows: base_batches.iter().map(|b| b.num_rows()).sum(),
delta_rows: 0,
final_rows: base_batches.iter().map(|b| b.num_rows()).sum(),
deltas_merged: 0,
tombstones_applied: 0,
rows_dropped_by_tombstones: 0,
});
}
base_batches.extend(delta_batches);
let base_rows: usize = base_batches.iter().map(|b| b.num_rows()).sum();
let pre_filter_rows: usize = base_batches.iter().map(|b| b.num_rows()).sum();
// If primary key specified, deduplicate (keep last occurrence)
let final_batches = if let Some(_pk) = primary_key_col {
// For now, just concatenate. Full dedup requires sorting by PK
// and keeping the last row per key — this is a simplification.
// TODO: implement proper merge with dedup
let merged_batches = if let Some(_pk) = primary_key_col {
// Current simplification — full dedup requires sort by PK; leave
// that work for a follow-up. Tombstone filter still runs below.
base_batches
} else {
base_batches
};
// Tombstone filter: drop rows whose row_key matches.
let (final_batches, dropped) = if has_tombstones {
apply_tombstone_filter(merged_batches, tombstones)?
} else {
(merged_batches, 0)
};
let final_rows: usize = final_batches.iter().map(|b| b.num_rows()).sum();
// Write merged base
let mut merged_parquet = Vec::new();
for batch in &final_batches {
let pq = record_batch_to_parquet(batch)?;
merged_parquet.extend_from_slice(&pq);
}
ops::put(store, base_key, Bytes::from(merged_parquet)).await?;
// Write merged base atomically: build ONE Parquet file (single
// footer!) via ArrowWriter, stage it under a temporary key, read
// back to verify row count matches what we intended, THEN swap
// into base_key. If anything goes wrong between build and swap,
// the original base_key is untouched.
//
// Why this matters: concatenating N record_batch_to_parquet()
// outputs produces N independent Parquet files glued together —
// readers see only the first footer. That silently corrupted the
// candidates table before this code was fixed.
let schema_ref = final_batches
.first()
.map(|b| b.schema())
.ok_or_else(|| "no batches to write".to_string())?;
// Delete delta files
// Use Snappy to match the compression of files written by ingest —
// otherwise compaction would inflate file size 3× on each rewrite.
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();
let mut buf: Vec<u8> = Vec::with_capacity(16 * 1024 * 1024);
{
let mut writer = ArrowWriter::try_new(&mut buf, schema_ref.clone(), Some(props))
.map_err(|e| format!("ArrowWriter init: {e}"))?;
for batch in &final_batches {
writer.write(batch).map_err(|e| format!("write batch: {e}"))?;
}
writer.close().map_err(|e| format!("close writer: {e}"))?;
}
// Verify before committing — if the written bytes don't parse as
// the row count we expect, DON'T overwrite the base file.
let (_, verify_batches) = parquet_to_record_batches(&buf)
.map_err(|e| format!("verify parse: {e}"))?;
let verify_rows: usize = verify_batches.iter().map(|b| b.num_rows()).sum();
if verify_rows != final_rows {
return Err(format!(
"compact verification failed: wrote {} bytes that parse as {} rows (expected {}); base_key '{}' untouched",
buf.len(), verify_rows, final_rows, base_key,
));
}
// Stage under a temp key first, then promote via a second put
// over base_key. Object stores don't have atomic rename across
// arbitrary backends; this is "write twice + delete temp" which
// is the widely-adopted pattern.
let temp_key = format!("{base_key}.compact-{}.tmp", chrono::Utc::now().timestamp_millis());
ops::put(store, &temp_key, Bytes::from(buf.clone())).await?;
ops::put(store, base_key, Bytes::from(buf)).await?;
let _ = ops::delete(store, &temp_key).await;
// Only now that base is durably updated, delete delta files.
let delta_keys = list_deltas(store, dataset_name).await?;
for key in &delta_keys {
let _ = ops::delete(store, key).await;
}
tracing::info!("compacted '{}': {} deltas merged, {} → {} rows", dataset_name, delta_count, base_rows, final_rows);
tracing::info!(
"compacted '{}': {} deltas merged, {} tombstones applied ({} rows dropped), {} → {} rows",
dataset_name, delta_count, tombstones.len(), dropped, pre_filter_rows, final_rows,
);
Ok(CompactResult {
base_rows,
delta_rows: final_rows - base_rows + delta_count, // approximate
base_rows: pre_filter_rows - delta_count, // rough base-before-deltas
delta_rows: delta_count,
final_rows,
deltas_merged: delta_count,
tombstones_applied: tombstones.len(),
rows_dropped_by_tombstones: dropped,
})
}
/// Filter every batch in `batches` to exclude rows whose `row_key_column`
/// value appears in the tombstone set. All tombstones must share one
/// row_key_column — enforced at tombstone write time.
fn apply_tombstone_filter(
batches: Vec<RecordBatch>,
tombstones: &[Tombstone],
) -> Result<(Vec<RecordBatch>, usize), String> {
let col_name = &tombstones[0].row_key_column;
let bad_values: HashSet<String> = tombstones
.iter()
.map(|t| t.row_key_value.clone())
.collect();
let mut out = Vec::with_capacity(batches.len());
let mut total_dropped = 0usize;
for batch in batches {
let col_idx = match batch.schema().index_of(col_name) {
Ok(i) => i,
Err(_) => {
// Column not in this batch — nothing to drop, pass through.
out.push(batch);
continue;
}
};
let col = batch.column(col_idx);
// Build a "keep" mask: true where row is NOT tombstoned.
let mask: Vec<bool> = if let Some(s) = col.as_any().downcast_ref::<StringArray>() {
(0..s.len()).map(|i| {
s.is_null(i) || !bad_values.contains(s.value(i))
}).collect()
} else if let Some(a) = col.as_any().downcast_ref::<Int32Array>() {
(0..a.len()).map(|i| {
a.is_null(i) || !bad_values.contains(&a.value(i).to_string())
}).collect()
} else if let Some(a) = col.as_any().downcast_ref::<Int64Array>() {
(0..a.len()).map(|i| {
a.is_null(i) || !bad_values.contains(&a.value(i).to_string())
}).collect()
} else {
// Key column of an unsupported type — log and pass through
// rather than panic. Operator has to handle this manually.
tracing::warn!(
"tombstone filter: column '{}' has unsupported type {:?}; passing rows through",
col_name, col.data_type(),
);
out.push(batch);
continue;
};
let kept = mask.iter().filter(|b| **b).count();
let dropped = mask.len() - kept;
total_dropped += dropped;
if dropped == 0 {
out.push(batch);
continue;
}
let bool_arr = BooleanArray::from(mask);
let filtered = filter_record_batch(&batch, &bool_arr)
.map_err(|e| format!("filter batch: {e}"))?;
out.push(filtered);
}
Ok((out, total_dropped))
}
#[derive(Debug, Clone, serde::Serialize)]
pub struct CompactResult {
pub base_rows: usize,
pub delta_rows: usize,
pub final_rows: usize,
pub deltas_merged: usize,
pub tombstones_applied: usize,
pub rows_dropped_by_tombstones: usize,
}


@ -184,13 +184,31 @@ async fn compact_dataset(
State(state): State<QueryState>,
Json(req): Json<CompactRequest>,
) -> impl IntoResponse {
// Phase E: pull tombstones for this dataset and let compact physically
// drop those rows. After a successful rewrite, clear the tombstone log
// — the rows are gone from disk, the tombstones have done their job.
let tombstones = state
.engine
.catalog()
.list_tombstones(&req.dataset)
.await
.unwrap_or_default();
match delta::compact(
state.engine.store(),
&req.dataset,
&req.base_key,
req.primary_key.as_deref(),
&tombstones,
).await {
Ok(result) => Ok(Json(result)),
Ok(result) => {
if result.rows_dropped_by_tombstones > 0 {
if let Err(e) = state.engine.catalog().tombstones().clear(&req.dataset).await {
tracing::warn!("post-compact tombstone clear failed: {e}");
}
}
Ok(Json(result))
}
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}


@ -154,6 +154,14 @@
- `crates/lance-bench` standalone pilot (Lance 4.0) avoids DataFusion/Arrow version conflict with main stack
- 8-dimension benchmark on resumes_100k_v2 — see docs/ADR-019-vector-storage.md for scorecard
- Decision: hybrid architecture. Parquet+HNSW stays primary (2.55× faster search at 100K in-RAM). Lance added as per-profile second backend for random access (112× faster), append (0.08s vs full rewrite), hot-swap (14× faster index builds), and scale past 5M RAM ceiling.
- [x] Phase E.2 — Compaction integrates tombstones (physical deletion) — 2026-04-16
- `delta::compact` accepts `tombstones: &[Tombstone]` param, filters rows at merge time via arrow `filter_record_batch`
- CompactResult gains `tombstones_applied` + `rows_dropped_by_tombstones`
- Atomic write: ArrowWriter → single Parquet file (fixes latent bug where concatenated Parquet byte streams produced garbage, with only one segment's rows visible), verify-parse before overwrite, temp_key staging, delete delta files AFTER base write succeeds
- Snappy compression on output matches ingest defaults (avoids 3× size inflation on every compact)
- `TombstoneStore::clear` drops all batch files for a dataset; called by queryd after successful compact
- Query engine exposes `catalog()` accessor so service handler can reach the tombstone store
- E2E verified on candidates (100K rows): tombstone 3 IDs → compact → 99,997 rows physically in parquet, tombstones empty, IDs gone from `__raw__candidates` too; file size 10.59 MB → 10.72 MB (proportional to data, not inflated)
- [x] Phase 16: Hot-swap generations + autotune agent — 2026-04-16
- `vectord::promotion::PromotionRegistry` — per-index current config + history at `_hnsw_promotions/{index}.json`, cap 50 history entries
- Endpoints: `POST /vectors/hnsw/promote/{index}/{trial_id}`, `POST /vectors/hnsw/rollback/{index}`, `GET /vectors/hnsw/promoted/{index}`