//! Delta store for incremental updates.
//!
//! Instead of rewriting an entire Parquet file to change one row,
//! we write small delta files. At query time, deltas are merged with the base.
//! Periodic compaction merges deltas into the base file.

use arrow::array::{Array, BooleanArray, Int32Array, Int64Array, RecordBatch, StringArray};
use arrow::compute::filter_record_batch;
use bytes::Bytes;
use object_store::ObjectStore;
use parquet::arrow::ArrowWriter;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use shared::arrow_helpers::{parquet_to_record_batches, record_batch_to_parquet};
use shared::types::Tombstone;
use std::collections::HashSet;
use std::sync::Arc;
use storaged::ops;

/// Write a delta file for a dataset (new/updated rows).
pub async fn write_delta(
    store: &Arc<dyn ObjectStore>,
    dataset_name: &str,
    batch: &RecordBatch,
) -> Result<String, String> {
    let ts = chrono::Utc::now().timestamp_millis();
    let key = format!("deltas/{dataset_name}/{ts}.parquet");
    let parquet = record_batch_to_parquet(batch)?;
    ops::put(store, &key, parquet).await?;
    tracing::info!(
        "wrote delta for '{}': {} rows at {}",
        dataset_name,
        batch.num_rows(),
        key
    );
    Ok(key)
}

/// List all delta files for a dataset.
pub async fn list_deltas(
    store: &Arc<dyn ObjectStore>,
    dataset_name: &str,
) -> Result<Vec<String>, String> {
    let prefix = format!("deltas/{dataset_name}/");
    ops::list(store, Some(&prefix)).await
}

/// Load all delta batches for a dataset.
pub async fn load_deltas(
    store: &Arc<dyn ObjectStore>,
    dataset_name: &str,
) -> Result<Vec<RecordBatch>, String> {
    let keys = list_deltas(store, dataset_name).await?;
    let mut all_batches = Vec::new();
    for key in &keys {
        let data = ops::get(store, key).await?;
        let (_, batches) = parquet_to_record_batches(&data)?;
        all_batches.extend(batches);
    }
    if !all_batches.is_empty() {
        let total_rows: usize = all_batches.iter().map(|b| b.num_rows()).sum();
        tracing::debug!(
            "loaded {} delta files ({} rows) for '{}'",
            keys.len(),
            total_rows,
            dataset_name
        );
    }
    Ok(all_batches)
}

/// Compact: merge the base Parquet file + all deltas into a single new base file.
///
/// Optionally filters out tombstoned rows — this is the physical-deletion
/// half of Phase E (the query-time filter handles the "immediate hide",
/// compaction handles the "actually remove from disk" requirement).
///
/// Tombstones must share one `row_key_column` (enforced at tombstone write).
/// If non-empty, every row in the merged output whose `row_key_column` value
/// matches a tombstone is dropped before the new base is written.
pub async fn compact(
    store: &Arc<dyn ObjectStore>,
    dataset_name: &str,
    base_key: &str,
    primary_key_col: Option<&str>,
    tombstones: &[Tombstone],
) -> Result<CompactResult, String> {
    // Load the base file.
    let base_data = ops::get(store, base_key).await?;
    let (_schema, mut base_batches) = parquet_to_record_batches(&base_data)?;

    // Load the deltas.
    let delta_batches = load_deltas(store, dataset_name).await?;
    let delta_count = delta_batches.len();
    let delta_rows: usize = delta_batches.iter().map(|b| b.num_rows()).sum();

    let has_tombstones = !tombstones.is_empty();
    let nothing_to_do = delta_batches.is_empty() && !has_tombstones;
    if nothing_to_do {
        let base_rows: usize = base_batches.iter().map(|b| b.num_rows()).sum();
        return Ok(CompactResult {
            base_rows,
            delta_rows: 0,
            final_rows: base_rows,
            deltas_merged: 0,
            tombstones_applied: 0,
            rows_dropped_by_tombstones: 0,
        });
    }

    base_batches.extend(delta_batches);
    let pre_filter_rows: usize = base_batches.iter().map(|b| b.num_rows()).sum();

    // If a primary key is specified, deduplicate (keep the last occurrence).
    let merged_batches = if let Some(_pk) = primary_key_col {
        // Current simplification — full dedup requires a sort by PK; leave
        // that work for a follow-up. The tombstone filter still runs below.
        base_batches
    } else {
        base_batches
    };

    // Tombstone filter: drop rows whose row_key matches.
    let (final_batches, dropped) = if has_tombstones {
        apply_tombstone_filter(merged_batches, tombstones)?
    } else {
        (merged_batches, 0)
    };
    let final_rows: usize = final_batches.iter().map(|b| b.num_rows()).sum();

    // Write the merged base atomically: build ONE Parquet file (single
    // footer!) via ArrowWriter, stage it under a temporary key, read it
    // back to verify the row count matches what we intended, THEN swap
    // it into base_key. If anything goes wrong between build and swap,
    // the original base_key is untouched.
    //
    // Why this matters: concatenating N record_batch_to_parquet()
    // outputs produces N independent Parquet files glued together —
    // readers see only the first footer. That silently corrupted the
    // candidates table before this code was fixed.
    let schema_ref = final_batches
        .first()
        .map(|b| b.schema())
        .ok_or_else(|| "no batches to write".to_string())?;

    // Use Snappy to match the compression of files written by ingest —
    // otherwise compaction would inflate the file size 3× on each rewrite.
    let props = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .build();

    let mut buf: Vec<u8> = Vec::with_capacity(16 * 1024 * 1024);
    {
        let mut writer = ArrowWriter::try_new(&mut buf, schema_ref.clone(), Some(props))
            .map_err(|e| format!("ArrowWriter init: {e}"))?;
        for batch in &final_batches {
            writer.write(batch).map_err(|e| format!("write batch: {e}"))?;
        }
        writer.close().map_err(|e| format!("close writer: {e}"))?;
    }

    // Verify before committing — if the written bytes don't parse as
    // the row count we expect, DON'T overwrite the base file.
    let (_, verify_batches) =
        parquet_to_record_batches(&buf).map_err(|e| format!("verify parse: {e}"))?;
    let verify_rows: usize = verify_batches.iter().map(|b| b.num_rows()).sum();
    if verify_rows != final_rows {
        return Err(format!(
            "compact verification failed: wrote {} bytes that parse as {} rows (expected {}); base_key '{}' untouched",
            buf.len(),
            verify_rows,
            final_rows,
            base_key,
        ));
    }

    // Stage under a temp key first, then promote via a second put over
    // base_key. Object stores don't have atomic rename across arbitrary
    // backends; this is "write twice + delete temp", which is the
    // widely-adopted pattern.
    let temp_key = format!(
        "{base_key}.compact-{}.tmp",
        chrono::Utc::now().timestamp_millis()
    );
    ops::put(store, &temp_key, Bytes::from(buf.clone())).await?;
    ops::put(store, base_key, Bytes::from(buf)).await?;
    let _ = ops::delete(store, &temp_key).await;

    // Only now that the base is durably updated, delete the delta files.
    let delta_keys = list_deltas(store, dataset_name).await?;
    for key in &delta_keys {
        let _ = ops::delete(store, key).await;
    }

    tracing::info!(
        "compacted '{}': {} deltas merged, {} tombstones applied ({} rows dropped), {} → {} rows",
        dataset_name,
        delta_count,
        tombstones.len(),
        dropped,
        pre_filter_rows,
        final_rows,
    );

    Ok(CompactResult {
        base_rows: pre_filter_rows - delta_rows, // base rows before deltas were stacked on
        delta_rows,
        final_rows,
        deltas_merged: delta_count,
        tombstones_applied: tombstones.len(),
        rows_dropped_by_tombstones: dropped,
    })
}

/// Filter every batch in `batches` to exclude rows whose `row_key_column`
/// value appears in the tombstone set. All tombstones must share one
/// `row_key_column` — enforced at tombstone write time.
fn apply_tombstone_filter(
    batches: Vec<RecordBatch>,
    tombstones: &[Tombstone],
) -> Result<(Vec<RecordBatch>, usize), String> {
    let col_name = &tombstones[0].row_key_column;
    let bad_values: HashSet<String> = tombstones
        .iter()
        .map(|t| t.row_key_value.clone())
        .collect();

    let mut out = Vec::with_capacity(batches.len());
    let mut total_dropped = 0usize;

    for batch in batches {
        let col_idx = match batch.schema().index_of(col_name) {
            Ok(i) => i,
            Err(_) => {
                // Column not in this batch — nothing to drop, pass through.
                out.push(batch);
                continue;
            }
        };
        let col = batch.column(col_idx);

        // Build a "keep" mask: true where the row is NOT tombstoned.
        let mask: Vec<bool> = if let Some(s) = col.as_any().downcast_ref::<StringArray>() {
            (0..s.len())
                .map(|i| s.is_null(i) || !bad_values.contains(s.value(i)))
                .collect()
        } else if let Some(a) = col.as_any().downcast_ref::<Int32Array>() {
            (0..a.len())
                .map(|i| a.is_null(i) || !bad_values.contains(&a.value(i).to_string()))
                .collect()
        } else if let Some(a) = col.as_any().downcast_ref::<Int64Array>() {
            (0..a.len())
                .map(|i| a.is_null(i) || !bad_values.contains(&a.value(i).to_string()))
                .collect()
        } else {
            // Key column of an unsupported type — log and pass through
            // rather than panic. The operator has to handle this manually.
            tracing::warn!(
                "tombstone filter: column '{}' has unsupported type {:?}; passing rows through",
                col_name,
                col.data_type(),
            );
            out.push(batch);
            continue;
        };

        let kept = mask.iter().filter(|b| **b).count();
        let dropped = mask.len() - kept;
        total_dropped += dropped;
        if dropped == 0 {
            out.push(batch);
            continue;
        }

        let bool_arr = BooleanArray::from(mask);
        let filtered =
            filter_record_batch(&batch, &bool_arr).map_err(|e| format!("filter batch: {e}"))?;
        out.push(filtered);
    }

    Ok((out, total_dropped))
}

#[derive(Debug, Clone, serde::Serialize)]
pub struct CompactResult {
    pub base_rows: usize,
    pub delta_rows: usize,
    pub final_rows: usize,
    pub deltas_merged: usize,
    pub tombstones_applied: usize,
    pub rows_dropped_by_tombstones: usize,
}
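
// A minimal usage sketch of the write-delta-then-compact flow, kept behind
// cfg(test). Assumptions (not guaranteed by this crate): `tokio` with the
// "macros" feature is available as a dev-dependency, the in-memory
// `object_store` backend is enabled, and `ops::put`/`record_batch_to_parquet`
// accept the argument types they are called with above. The dataset name
// "events" and the base key are hypothetical placeholders.
#[cfg(test)]
mod delta_store_example {
    use super::*;
    use arrow::array::ArrayRef;
    use arrow::datatypes::{DataType, Field, Schema};
    use object_store::memory::InMemory;

    /// Build a tiny two-column batch: id (Int64) + name (Utf8).
    fn sample_batch(ids: &[i64], names: &[&str]) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
        ]));
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(ids.to_vec())),
            Arc::new(StringArray::from(names.to_vec())),
        ];
        RecordBatch::try_new(schema, columns).expect("schema and columns agree")
    }

    #[tokio::test]
    async fn write_delta_then_compact() -> Result<(), String> {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());

        // Seed a base file, then stack one delta on top of it.
        let base_key = "datasets/events/base.parquet";
        let base = sample_batch(&[1, 2], &["a", "b"]);
        ops::put(&store, base_key, record_batch_to_parquet(&base)?).await?;
        write_delta(&store, "events", &sample_batch(&[3], &["c"])).await?;

        // Compact with no primary key and no tombstones: all rows survive
        // and the delta files are cleaned up afterwards.
        let result = compact(&store, "events", base_key, None, &[]).await?;
        assert_eq!(result.final_rows, 3);
        assert_eq!(result.rows_dropped_by_tombstones, 0);
        assert!(list_deltas(&store, "events").await?.is_empty());
        Ok(())
    }
}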