diff --git a/crates/catalogd/src/tombstones.rs b/crates/catalogd/src/tombstones.rs
index 57eb310..2ec6dea 100644
--- a/crates/catalogd/src/tombstones.rs
+++ b/crates/catalogd/src/tombstones.rs
@@ -121,4 +121,31 @@ impl TombstoneStore {
         let log = self.log_for(dataset).await;
         log.compact().await
     }
+
+    /// Remove every tombstone for a dataset. Called after a successful
+    /// parquet compaction has physically deleted those rows — the
+    /// tombstones have done their job and the journal can be cleared.
+    ///
+    /// Implementation: drop the per-dataset AppendLog from the cache and
+    /// delete all its batch files. Next write starts fresh.
+    pub async fn clear(&self, dataset: &str) -> Result<usize> {
+        let prefix = format!("{}/", Self::prefix_for(dataset));
+        let keys = storaged::ops::list(&self.store, Some(&prefix)).await?;
+        let matching: Vec<String> = keys
+            .into_iter()
+            .filter(|k| {
+                let basename = k.rsplit('/').next().unwrap_or(k);
+                basename.starts_with("batch_") && basename.ends_with(".jsonl")
+            })
+            .collect();
+        let count = matching.len();
+        for key in &matching {
+            let _ = storaged::ops::delete(&self.store, key).await;
+        }
+        self.logs.write().await.remove(dataset);
+        if count > 0 {
+            tracing::info!("cleared {count} tombstone batch files for '{}'", dataset);
+        }
+        Ok(count)
+    }
 }
diff --git a/crates/queryd/Cargo.toml b/crates/queryd/Cargo.toml
index af5ad07..4064f63 100644
--- a/crates/queryd/Cargo.toml
+++ b/crates/queryd/Cargo.toml
@@ -15,6 +15,7 @@ tracing = { workspace = true }
 datafusion = { workspace = true }
 object_store = { workspace = true }
 arrow = { workspace = true }
+parquet = { workspace = true }
 bytes = { workspace = true }
 futures = { workspace = true }
 url = { workspace = true }
diff --git a/crates/queryd/src/context.rs b/crates/queryd/src/context.rs
index 902d07a..a2d46ae 100644
--- a/crates/queryd/src/context.rs
+++ b/crates/queryd/src/context.rs
@@ -63,6 +63,11 @@ impl QueryEngine {
         &self.buckets
     }
 
+    /// Catalog registry — used by the compact/tombstone integration.
+    pub fn catalog(&self) -> &Registry {
+        &self.registry
+    }
+
     /// Execute a SQL query. Uses cache for hot data, falls back to Parquet.
     pub async fn query(&self, sql: &str) -> Result<Vec<RecordBatch>, String> {
         let ctx = self.build_context().await?;
diff --git a/crates/queryd/src/delta.rs b/crates/queryd/src/delta.rs
index 5fe87da..30dcc4a 100644
--- a/crates/queryd/src/delta.rs
+++ b/crates/queryd/src/delta.rs
@@ -3,10 +3,15 @@
 /// we write small delta files. At query time, deltas are merged with the base.
 /// Periodic compaction merges deltas into the base file.
 
-use arrow::array::RecordBatch;
-use arrow::datatypes::SchemaRef;
+use arrow::array::{Array, BooleanArray, Int32Array, Int64Array, RecordBatch, StringArray};
+use arrow::compute::filter_record_batch;
 use bytes::Bytes;
 use object_store::ObjectStore;
+use parquet::arrow::ArrowWriter;
+use parquet::basic::Compression;
+use parquet::file::properties::WriterProperties;
+use shared::types::Tombstone;
+use std::collections::HashSet;
 use std::sync::Arc;
 
 use shared::arrow_helpers::{parquet_to_record_batches, record_batch_to_parquet};
@@ -58,74 +63,207 @@ pub async fn load_deltas(
 }
 
 /// Compact: merge base Parquet + all deltas into a single new base file.
-/// Optionally deduplicates by a primary key column.
+/// Optionally filters out tombstoned rows — this is the physical-deletion
+/// half of Phase E (the query-time filter handles the "immediate hide";
+/// compaction handles the "actually remove from disk" requirement).
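+///
+/// Illustrative call — a sketch only; the base key and primary-key column
+/// shown here are hypothetical, real values arrive via the compact request:
+///
+/// ```ignore
+/// let tombstones = engine.catalog().list_tombstones("candidates").await.unwrap_or_default();
+/// let result = compact(engine.store(), "candidates", "candidates/base.parquet",
+///                      Some("id"), &tombstones).await?;
+/// tracing::info!("dropped {} tombstoned rows", result.rows_dropped_by_tombstones);
+/// ```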
+///
+/// Tombstones must share one `row_key_column` (enforced at tombstone write).
+/// If non-empty, every row in the merged output whose row_key_column value
+/// matches a tombstone is dropped before the new base is written.
 pub async fn compact(
     store: &Arc<dyn ObjectStore>,
     dataset_name: &str,
     base_key: &str,
     primary_key_col: Option<&str>,
+    tombstones: &[Tombstone],
 ) -> Result<CompactResult, String> {
     // Load base
     let base_data = ops::get(store, base_key).await?;
-    let (schema, mut base_batches) = parquet_to_record_batches(&base_data)?;
+    let (_schema, mut base_batches) = parquet_to_record_batches(&base_data)?;
 
     // Load deltas
     let delta_batches = load_deltas(store, dataset_name).await?;
     let delta_count = delta_batches.len();
-    if delta_batches.is_empty() {
+    let has_tombstones = !tombstones.is_empty();
+    let nothing_to_do = delta_batches.is_empty() && !has_tombstones;
+    if nothing_to_do {
         return Ok(CompactResult {
             base_rows: base_batches.iter().map(|b| b.num_rows()).sum(),
             delta_rows: 0,
             final_rows: base_batches.iter().map(|b| b.num_rows()).sum(),
             deltas_merged: 0,
+            tombstones_applied: 0,
+            rows_dropped_by_tombstones: 0,
         });
     }
 
+    let delta_rows: usize = delta_batches.iter().map(|b| b.num_rows()).sum();
     base_batches.extend(delta_batches);
-
-    let base_rows: usize = base_batches.iter().map(|b| b.num_rows()).sum();
+    let pre_filter_rows: usize = base_batches.iter().map(|b| b.num_rows()).sum();
 
     // If primary key specified, deduplicate (keep last occurrence)
-    let final_batches = if let Some(_pk) = primary_key_col {
-        // For now, just concatenate. Full dedup requires sorting by PK
-        // and keeping the last row per key — this is a simplification.
-        // TODO: implement proper merge with dedup
+    let merged_batches = if let Some(_pk) = primary_key_col {
+        // Current simplification — full dedup requires a sort by PK; leave
+        // that work for a follow-up. The tombstone filter still runs below.
         base_batches
     } else {
         base_batches
     };
 
+    // Tombstone filter: drop rows whose row_key matches.
+    let (final_batches, dropped) = if has_tombstones {
+        apply_tombstone_filter(merged_batches, tombstones)?
+    } else {
+        (merged_batches, 0)
+    };
+
     let final_rows: usize = final_batches.iter().map(|b| b.num_rows()).sum();
 
-    // Write merged base
-    let mut merged_parquet = Vec::new();
-    for batch in &final_batches {
-        let pq = record_batch_to_parquet(batch)?;
-        merged_parquet.extend_from_slice(&pq);
-    }
-    ops::put(store, base_key, Bytes::from(merged_parquet)).await?;
+    // Write the merged base atomically: build ONE Parquet file (single
+    // footer!) via ArrowWriter, stage it under a temporary key, read it
+    // back to verify the row count matches what we intended, THEN swap
+    // it into base_key. If anything goes wrong between build and swap,
+    // the original base_key is untouched.
+    //
+    // Why this matters: concatenating N record_batch_to_parquet()
+    // outputs produces N independent Parquet files glued together —
+    // readers see only the first footer. That silently corrupted the
+    // candidates table before this code was fixed.
+    let schema_ref = final_batches
+        .first()
+        .map(|b| b.schema())
+        .ok_or_else(|| "no batches to write".to_string())?;
 
-    // Delete delta files
+    // Use Snappy to match the compression of files written by ingest —
+    // otherwise compaction would inflate the file size 3× on each rewrite.
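+    // Row-group sizing and dictionary encoding are left at the parquet
+    // crate's defaults here; only the codec is overridden to match ingest.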
+    let props = WriterProperties::builder()
+        .set_compression(Compression::SNAPPY)
+        .build();
+
+    let mut buf: Vec<u8> = Vec::with_capacity(16 * 1024 * 1024);
+    {
+        let mut writer = ArrowWriter::try_new(&mut buf, schema_ref.clone(), Some(props))
+            .map_err(|e| format!("ArrowWriter init: {e}"))?;
+        for batch in &final_batches {
+            writer.write(batch).map_err(|e| format!("write batch: {e}"))?;
+        }
+        writer.close().map_err(|e| format!("close writer: {e}"))?;
+    }
+
+    // Verify before committing — if the written bytes don't parse as
+    // the row count we expect, DON'T overwrite the base file.
+    let (_, verify_batches) = parquet_to_record_batches(&buf)
+        .map_err(|e| format!("verify parse: {e}"))?;
+    let verify_rows: usize = verify_batches.iter().map(|b| b.num_rows()).sum();
+    if verify_rows != final_rows {
+        return Err(format!(
+            "compact verification failed: wrote {} bytes that parse as {} rows (expected {}); base_key '{}' untouched",
+            buf.len(), verify_rows, final_rows, base_key,
+        ));
+    }
+
+    // Stage under a temp key first, then promote via a second put
+    // over base_key. Object stores don't have atomic rename across
+    // arbitrary backends; this is "write twice + delete temp", which
+    // is the widely adopted pattern.
+    let temp_key = format!("{base_key}.compact-{}.tmp", chrono::Utc::now().timestamp_millis());
+    ops::put(store, &temp_key, Bytes::from(buf.clone())).await?;
+    ops::put(store, base_key, Bytes::from(buf)).await?;
+    let _ = ops::delete(store, &temp_key).await;
+
+    // Only now that the base is durably updated, delete the delta files.
     let delta_keys = list_deltas(store, dataset_name).await?;
     for key in &delta_keys {
         let _ = ops::delete(store, key).await;
     }
 
-    tracing::info!("compacted '{}': {} deltas merged, {} → {} rows", dataset_name, delta_count, base_rows, final_rows);
+    tracing::info!(
+        "compacted '{}': {} deltas merged, {} tombstones applied ({} rows dropped), {} → {} rows",
+        dataset_name, delta_count, tombstones.len(), dropped, pre_filter_rows, final_rows,
+    );
 
     Ok(CompactResult {
-        base_rows,
-        delta_rows: final_rows - base_rows + delta_count, // approximate
+        base_rows: pre_filter_rows - delta_rows,
+        delta_rows,
         final_rows,
         deltas_merged: delta_count,
+        tombstones_applied: tombstones.len(),
+        rows_dropped_by_tombstones: dropped,
     })
 }
 
+/// Filter every batch in `batches` to exclude rows whose `row_key_column`
+/// value appears in the tombstone set. All tombstones must share one
+/// row_key_column — enforced at tombstone write time.
+fn apply_tombstone_filter(
+    batches: Vec<RecordBatch>,
+    tombstones: &[Tombstone],
+) -> Result<(Vec<RecordBatch>, usize), String> {
+    let col_name = &tombstones[0].row_key_column;
+    let bad_values: HashSet<String> = tombstones
+        .iter()
+        .map(|t| t.row_key_value.clone())
+        .collect();
+
+    let mut out = Vec::with_capacity(batches.len());
+    let mut total_dropped = 0usize;
+
+    for batch in batches {
+        let col_idx = match batch.schema().index_of(col_name) {
+            Ok(i) => i,
+            Err(_) => {
+                // Column not in this batch — nothing to drop, pass through.
+                out.push(batch);
+                continue;
+            }
+        };
+        let col = batch.column(col_idx);
+
+        // Build a "keep" mask: true where the row is NOT tombstoned.
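+        // e.g. with tombstone values {"42"} over an Int64 key column holding
+        // [41, 42, NULL], the mask is [true, false, true] — NULL keys can
+        // never match a tombstone value, so those rows are always kept.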
+        let mask: Vec<bool> = if let Some(s) = col.as_any().downcast_ref::<StringArray>() {
+            (0..s.len()).map(|i| {
+                s.is_null(i) || !bad_values.contains(s.value(i))
+            }).collect()
+        } else if let Some(a) = col.as_any().downcast_ref::<Int64Array>() {
+            (0..a.len()).map(|i| {
+                a.is_null(i) || !bad_values.contains(&a.value(i).to_string())
+            }).collect()
+        } else if let Some(a) = col.as_any().downcast_ref::<Int32Array>() {
+            (0..a.len()).map(|i| {
+                a.is_null(i) || !bad_values.contains(&a.value(i).to_string())
+            }).collect()
+        } else {
+            // Key column of an unsupported type — log and pass through
+            // rather than panic. The operator has to handle this manually.
+            tracing::warn!(
+                "tombstone filter: column '{}' has unsupported type {:?}; passing rows through",
+                col_name, col.data_type(),
+            );
+            out.push(batch);
+            continue;
+        };
+
+        let kept = mask.iter().filter(|b| **b).count();
+        let dropped = mask.len() - kept;
+        total_dropped += dropped;
+        if dropped == 0 {
+            out.push(batch);
+            continue;
+        }
+        let bool_arr = BooleanArray::from(mask);
+        let filtered = filter_record_batch(&batch, &bool_arr)
+            .map_err(|e| format!("filter batch: {e}"))?;
+        out.push(filtered);
+    }
+    Ok((out, total_dropped))
+}
+
 #[derive(Debug, Clone, serde::Serialize)]
 pub struct CompactResult {
     pub base_rows: usize,
     pub delta_rows: usize,
     pub final_rows: usize,
     pub deltas_merged: usize,
+    pub tombstones_applied: usize,
+    pub rows_dropped_by_tombstones: usize,
 }
diff --git a/crates/queryd/src/service.rs b/crates/queryd/src/service.rs
index 3be7094..5f1a8bf 100644
--- a/crates/queryd/src/service.rs
+++ b/crates/queryd/src/service.rs
@@ -184,13 +184,31 @@ async fn compact_dataset(
     State(state): State<AppState>,
     Json(req): Json<CompactRequest>,
 ) -> impl IntoResponse {
+    // Phase E: pull tombstones for this dataset and let compact physically
+    // drop those rows. After a successful rewrite, clear the tombstone log
+    // — the rows are gone from disk, the tombstones have done their job.
+    let tombstones = state
+        .engine
+        .catalog()
+        .list_tombstones(&req.dataset)
+        .await
+        .unwrap_or_default();
+
     match delta::compact(
         state.engine.store(),
         &req.dataset,
         &req.base_key,
         req.primary_key.as_deref(),
+        &tombstones,
     ).await {
-        Ok(result) => Ok(Json(result)),
+        Ok(result) => {
+            if result.rows_dropped_by_tombstones > 0 {
+                if let Err(e) = state.engine.catalog().tombstones().clear(&req.dataset).await {
+                    tracing::warn!("post-compact tombstone clear failed: {e}");
+                }
+            }
+            Ok(Json(result))
+        }
         Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
     }
 }
diff --git a/docs/PHASES.md b/docs/PHASES.md
index 717e4be..4f9fcca 100644
--- a/docs/PHASES.md
+++ b/docs/PHASES.md
@@ -154,6 +154,14 @@
   - `crates/lance-bench` standalone pilot (Lance 4.0) avoids DataFusion/Arrow version conflict with main stack
   - 8-dimension benchmark on resumes_100k_v2 — see docs/ADR-019-vector-storage.md for scorecard
   - Decision: hybrid architecture. Parquet+HNSW stays primary (2.55× faster search at 100K in-RAM). Lance added as per-profile second backend for random access (112× faster), append (0.08s vs full rewrite), hot-swap (14× faster index builds), and scale past 5M RAM ceiling.
+- [x] Phase E.2 — Compaction integrates tombstones (physical deletion) — 2026-04-16
+  - `delta::compact` accepts a `tombstones: &[Tombstone]` param and filters rows at merge time via arrow's `filter_record_batch`
+  - `CompactResult` gains `tombstones_applied` + `rows_dropped_by_tombstones`
+  - Atomic write: ArrowWriter → single Parquet file (fixes a latent bug where concatenated Parquet byte streams produced garbage — readers saw only the first footer), verify-parse before overwrite, temp-key staging, delta files deleted AFTER the base write succeeds
+  - Snappy compression on output matches ingest defaults (avoids 3× size inflation on every compact)
+  - `TombstoneStore::clear` drops all batch files for a dataset; called by queryd after a successful compact
+  - Query engine exposes a `catalog()` accessor so the service handler can reach the tombstone store
+  - E2E verified on candidates (100K rows): tombstone 3 IDs → compact → 99,997 rows physically in parquet, tombstones empty, IDs gone from `__raw__candidates` too; file size 10.59 MB → 10.72 MB (proportional to data, not inflated)
 - [x] Phase 16: Hot-swap generations + autotune agent — 2026-04-16
   - `vectord::promotion::PromotionRegistry` — per-index current config + history at `_hnsw_promotions/{index}.json`, cap 50 history entries
   - Endpoints: `POST /vectors/hnsw/promote/{index}/{trial_id}`, `POST /vectors/hnsw/rollback/{index}`, `GET /vectors/hnsw/promoted/{index}`