//! Delta store for incremental updates.
//!
//! Instead of rewriting an entire Parquet file to change one row, we write small
//! delta files. At query time, deltas are merged with the base. Periodic
//! compaction merges deltas into the base file.

use arrow::array::RecordBatch;
use arrow::datatypes::SchemaRef;
use bytes::Bytes;
use object_store::ObjectStore;
use std::sync::Arc;

use shared::arrow_helpers::{parquet_to_record_batches, record_batch_to_parquet};
use storaged::ops;

/// Write a delta file for a dataset (new/updated rows).
pub async fn write_delta(
    store: &Arc<dyn ObjectStore>,
    dataset_name: &str,
    batch: &RecordBatch,
) -> Result<String, String> {
    let ts = chrono::Utc::now().timestamp_millis();
    let key = format!("deltas/{dataset_name}/{ts}.parquet");
    let parquet = record_batch_to_parquet(batch)?;
    ops::put(store, &key, parquet).await?;
    tracing::info!(
        "wrote delta for '{}': {} rows at {}",
        dataset_name,
        batch.num_rows(),
        key
    );
    Ok(key)
}

/// List all delta files for a dataset.
pub async fn list_deltas(
    store: &Arc<dyn ObjectStore>,
    dataset_name: &str,
) -> Result<Vec<String>, String> {
    let prefix = format!("deltas/{dataset_name}/");
    ops::list(store, Some(&prefix)).await
}

/// Load all delta batches for a dataset.
pub async fn load_deltas(
    store: &Arc<dyn ObjectStore>,
    dataset_name: &str,
) -> Result<Vec<RecordBatch>, String> {
    let keys = list_deltas(store, dataset_name).await?;
    let mut all_batches = Vec::new();
    for key in &keys {
        let data = ops::get(store, key).await?;
        let (_, batches) = parquet_to_record_batches(&data)?;
        all_batches.extend(batches);
    }
    if !all_batches.is_empty() {
        let total_rows: usize = all_batches.iter().map(|b| b.num_rows()).sum();
        tracing::debug!(
            "loaded {} delta files ({} rows) for '{}'",
            keys.len(),
            total_rows,
            dataset_name
        );
    }
    Ok(all_batches)
}

/// Compact: merge the base Parquet file and all deltas into a single new base file.
/// Optionally deduplicates by a primary key column.
pub async fn compact(
    store: &Arc<dyn ObjectStore>,
    dataset_name: &str,
    base_key: &str,
    primary_key_col: Option<&str>,
) -> Result<CompactResult, String> {
    // Load base
    let base_data = ops::get(store, base_key).await?;
    let (schema, mut base_batches) = parquet_to_record_batches(&base_data)?;
    let base_rows: usize = base_batches.iter().map(|b| b.num_rows()).sum();

    // Load deltas
    let delta_batches = load_deltas(store, dataset_name).await?;
    let delta_count = delta_batches.len();
    let delta_rows: usize = delta_batches.iter().map(|b| b.num_rows()).sum();

    if delta_batches.is_empty() {
        return Ok(CompactResult {
            base_rows,
            delta_rows: 0,
            final_rows: base_rows,
            deltas_merged: 0,
        });
    }

    base_batches.extend(delta_batches);

    // If a primary key is specified, deduplicate (keep the last occurrence).
    let final_batches = if let Some(_pk) = primary_key_col {
        // For now, just concatenate. Full dedup requires sorting by the PK and
        // keeping the last row per key; this is a simplification.
        // TODO: implement proper merge with dedup (see the `dedup_last_wins` sketch below)
        base_batches
    } else {
        base_batches
    };
    let final_rows: usize = final_batches.iter().map(|b| b.num_rows()).sum();

    // Write the merged base as a single Parquet file. Concatenating per-batch
    // Parquet output byte-by-byte would not produce a valid file, so merge the
    // record batches first and encode them once.
    let merged = arrow::compute::concat_batches(&schema, &final_batches)
        .map_err(|e| e.to_string())?;
    let merged_parquet = record_batch_to_parquet(&merged)?;
    ops::put(store, base_key, Bytes::from(merged_parquet)).await?;

    // Delete delta files
    let delta_keys = list_deltas(store, dataset_name).await?;
    for key in &delta_keys {
        let _ = ops::delete(store, key).await;
    }

    tracing::info!(
        "compacted '{}': {} deltas merged, {} → {} rows",
        dataset_name,
        delta_count,
        base_rows,
        final_rows
    );

    Ok(CompactResult {
        base_rows,
        delta_rows,
        final_rows,
        deltas_merged: delta_count,
    })
}

#[derive(Debug, Clone, serde::Serialize)]
pub struct CompactResult {
    pub base_rows: usize,
    pub delta_rows: usize,
    pub final_rows: usize,
    pub deltas_merged: usize,
}
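
// A minimal sketch of the last-wins dedup that the TODO in `compact` refers to.
// Assumptions: the primary key column is Utf8 (`StringArray`), and all batches
// share `schema`. It is not wired into `compact` yet; key types and error
// handling would need to be generalized before using it for real.
#[allow(dead_code)]
fn dedup_last_wins(
    schema: &SchemaRef,
    batches: &[RecordBatch],
    pk_col: &str,
) -> Result<RecordBatch, String> {
    use arrow::array::{Array, StringArray, UInt64Array};
    use std::collections::HashMap;

    // Concatenate so that row indices are global, then remember the last index per key.
    let merged = arrow::compute::concat_batches(schema, batches).map_err(|e| e.to_string())?;
    let keys = merged
        .column_by_name(pk_col)
        .ok_or_else(|| format!("missing primary key column '{pk_col}'"))?
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| format!("primary key column '{pk_col}' is not Utf8"))?;

    let mut last_seen: HashMap<&str, u64> = HashMap::new();
    for i in 0..keys.len() {
        last_seen.insert(keys.value(i), i as u64);
    }

    // Keep surviving rows in their original order.
    let mut indices: Vec<u64> = last_seen.into_values().collect();
    indices.sort_unstable();
    let indices = UInt64Array::from(indices);

    let columns = merged
        .columns()
        .iter()
        .map(|col| arrow::compute::take(col.as_ref(), &indices, None).map_err(|e| e.to_string()))
        .collect::<Result<Vec<_>, _>>()?;
    RecordBatch::try_new(merged.schema(), columns).map_err(|e| e.to_string())
}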
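
#[cfg(test)]
mod tests {
    // A minimal end-to-end sketch of the write_delta -> compact flow, assuming an
    // in-memory object store, tokio as a dev-dependency, and that the
    // `shared::arrow_helpers` / `storaged::ops` helpers behave as this module
    // already expects. Treat it as an illustration, not a verified test.
    use super::*;
    use arrow::array::Int64Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use object_store::memory::InMemory;

    fn batch(values: Vec<i64>) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(values))]).unwrap()
    }

    #[tokio::test]
    async fn delta_then_compact() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());

        // Seed a base file, then write one delta on top of it.
        let base = record_batch_to_parquet(&batch(vec![1, 2, 3])).unwrap();
        ops::put(&store, "base/example.parquet", base).await.unwrap();
        write_delta(&store, "example", &batch(vec![4, 5])).await.unwrap();

        let result = compact(&store, "example", "base/example.parquet", None)
            .await
            .unwrap();
        assert_eq!(result.final_rows, 5);
        assert_eq!(result.deltas_merged, 1);
    }
}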