lakehouse/crates/queryd/src/context.rs
root 4e1c400f5d Phase E.2: Compaction integrates tombstones — physical deletion closes GDPR loop
Phase E gave us soft-delete at query time (tombstones hide rows via a
DataFusion filter view). This completes the invariant: after compact,
tombstoned rows are PHYSICALLY absent from the Parquet file on disk.

delta::compact changes:
- Signature adds tombstones: &[Tombstone]
- After merging base + deltas, apply_tombstone_filter builds a
  BooleanArray keep-mask per batch (True where row_key_value is NOT
  in the tombstone set) and applies arrow::compute::filter_record_batch
- Supports Utf8, Int32, Int64 key columns (matches refresh.rs coverage
  for pg- and csv-derived schemas)
- CompactResult gains tombstones_applied + rows_dropped_by_tombstones
- Caller clears tombstone store on success
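The keep-mask step can be sketched independently of the arrow types (the helper name build_keep_mask is illustrative, not the actual code). The real apply_tombstone_filter builds an arrow BooleanArray over the key column and hands it to arrow::compute::filter_record_batch; the per-row logic is the same:

```rust
use std::collections::HashSet;

/// Illustrative sketch: given one batch's key values and the set of
/// tombstoned keys, build the boolean keep-mask that compact applies.
fn build_keep_mask(batch_keys: &[&str], tombstoned: &HashSet<String>) -> Vec<bool> {
    // true = keep the row; false = row is tombstoned and gets dropped.
    batch_keys.iter().map(|k| !tombstoned.contains(*k)).collect()
}

fn main() {
    let tombstoned: HashSet<String> = ["CAND-000002".to_string()].into_iter().collect();
    let keys = ["CAND-000001", "CAND-000002", "CAND-000003"];
    let mask = build_keep_mask(&keys, &tombstoned);
    assert_eq!(mask, vec![true, false, true]);
    // Applying the mask keeps exactly the non-tombstoned rows.
    let kept: Vec<&str> = keys.iter().zip(&mask).filter(|(_, m)| **m).map(|(k, _)| *k).collect();
    assert_eq!(kept, vec!["CAND-000001", "CAND-000003"]);
}
```

In the real code the mask is built per record batch, so filtering can yield multiple output batches per compaction.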

Critical correctness fix surfaced during E2E testing:
The original Phase 8 compact concatenated N independent Parquet byte
streams from record_batch_to_parquet(), each with its own footer.
Parquet readers only see the FIRST footer's data; the rest is invisible.
The bug had been latent since Phase 8 shipped and only triggered once
tombstone filtering produced multiple batches. It corrupted
candidates.parquet on the first test run (restored from a UI fixture
copy; a good argument for keeping test data in the repo).

Fix:
- Single ArrowWriter per compaction, writes every batch into one
  properly-footered Parquet
- Snappy compression to match ingest defaults (otherwise the rewrite
  inflated the file 3×, 10.5 MB → 34 MB, because no compression was set)
- Verify-before-swap: parse written buf back to confirm row count
  matches expected; refuses to overwrite base_key if verification fails
- Write to {base_key}.compact-{ts}.tmp first, then to base_key; delete
  temp; only then delete delta files. Any error along the way leaves
  the original base intact.
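The write ordering can be sketched with plain files standing in for object-store keys, and a length check standing in for the real parse-the-Parquet row-count verification (swap_in_compacted is a hypothetical name, not the actual function):

```rust
use std::fs;
use std::io::Write;
use std::path::Path;

/// Sketch of verify-before-swap: any failure before step 3 leaves the
/// original base file untouched.
fn swap_in_compacted(base: &Path, compacted: &[u8], expected_len: usize) -> std::io::Result<()> {
    // 1. Write the compacted bytes to a temp object next to the base.
    let tmp = base.with_extension("compact.tmp");
    fs::File::create(&tmp)?.write_all(compacted)?;
    // 2. Read it back and verify before touching the base.
    let written = fs::read(&tmp)?;
    if written.len() != expected_len {
        let _ = fs::remove_file(&tmp);
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            "verification failed; base left intact",
        ));
    }
    // 3. Only after verification, overwrite the base and drop the temp.
    fs::write(base, &written)?;
    fs::remove_file(&tmp)?;
    // 4. The real compact deletes delta files (and clears tombstones)
    //    only after this point.
    Ok(())
}

fn main() -> std::io::Result<()> {
    let base = std::env::temp_dir().join("compact_sketch.parquet");
    fs::write(&base, b"old-base")?;
    // Failed verification: base survives unchanged.
    assert!(swap_in_compacted(&base, b"new", 999).is_err());
    assert_eq!(fs::read(&base)?, b"old-base");
    // Successful verification: base replaced, temp gone.
    swap_in_compacted(&base, b"new-base", 8)?;
    assert_eq!(fs::read(&base)?, b"new-base");
    fs::remove_file(&base)
}
```

In the real path, step 3 is a put to base_key and the check parses the written buffer back as Parquet to compare row counts.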

TombstoneStore::clear(dataset) drops all tombstone batch files and
evicts the per-dataset AppendLog from cache. Called after successful
compact.

QueryEngine::catalog() accessor exposes the Registry so queryd
handlers can reach the tombstone store without routing through gateway
state.

E2E on candidates (100K rows, 15 cols):
- Baseline: 10.59 MB, 100000 rows
- Tombstone CAND-000001/2/3 (soft-delete): 99997 visible, 100000 raw
- Compact: tombstones_applied=3, rows_dropped=3, final_rows=99997
- Post: 10.72 MB (Snappy), valid parquet (1 row_group), 99997 rows
- Restart: persists, tombstones list empty, __raw__candidates also
  99997 (the 3 IDs are physically gone from disk)

PRD invariant closed: deletion is now actually deletion, not just
masking. GDPR erasure request → tombstone + schedule compact → data
gone.

Deferred:
- Compact-all-datasets cron (currently manual per-dataset via
  POST /query/compact)
- Compaction of tombstone batch files themselves (they grow at
  flush_threshold=1 per tombstone; TombstoneStore::compact exists
  but not auto-called)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-16 10:38:30 -05:00


use catalogd::registry::Registry;
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl};
use datafusion::datasource::MemTable;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::prelude::*;
use object_store::ObjectStore;
use std::sync::Arc;
use storaged::registry::BucketRegistry;
use url::Url;

use crate::cache::MemCache;
use crate::delta;

const STORE_SCHEME: &str = "lakehouse";

/// URL scheme used to register a bucket's object store with DataFusion.
/// Bucket names safely embed in URL host components after sanitizing
/// `:` (used in `profile:user`) to `-`.
fn bucket_scheme(bucket: &str) -> String {
    format!("{STORE_SCHEME}-{}", bucket.replace(':', "-"))
}

/// Build a `ListingTableUrl`-shaped string for an object in a bucket.
fn bucket_object_url(bucket: &str, key: &str) -> String {
    format!("{}://data/{}", bucket_scheme(bucket), key)
}

/// Build the registration base URL for a bucket.
fn bucket_base_url(bucket: &str) -> String {
    format!("{}://data/", bucket_scheme(bucket))
}

/// Query engine with in-memory cache and delta merge support.
/// Federation layer 2: holds a `BucketRegistry` so cross-bucket queries
/// route reads to the right ObjectStore based on `ObjectRef.bucket`.
#[derive(Clone)]
pub struct QueryEngine {
    registry: Registry,
    buckets: Arc<BucketRegistry>,
    /// Primary bucket store cached for backward-compat callers (compact,
    /// workspace ops) that haven't been migrated to bucket-aware paths.
    store: Arc<dyn ObjectStore>,
    cache: MemCache,
}

impl QueryEngine {
    pub fn new(registry: Registry, buckets: Arc<BucketRegistry>, cache: MemCache) -> Self {
        let store = buckets.default_store();
        Self { registry, buckets, store, cache }
    }

    pub fn cache(&self) -> &MemCache {
        &self.cache
    }

    /// Backward-compat: returns the primary bucket's store. Bucket-aware
    /// callers should use `engine.buckets()` directly.
    pub fn store(&self) -> &Arc<dyn ObjectStore> {
        &self.store
    }

    pub fn buckets(&self) -> &Arc<BucketRegistry> {
        &self.buckets
    }

    /// Catalog registry — used by compact/tombstone integration.
    pub fn catalog(&self) -> &Registry {
        &self.registry
    }

    /// Execute a SQL query. Uses cache for hot data, falls back to Parquet.
    pub async fn query(&self, sql: &str) -> Result<Vec<arrow::array::RecordBatch>, String> {
        let ctx = self.build_context().await?;
        let df = ctx.sql(sql).await.map_err(|e| format!("SQL error: {e}"))?;
        let batches = df.collect().await.map_err(|e| format!("execution error: {e}"))?;
        Ok(batches)
    }

    /// Pin a dataset into the memory cache.
    pub async fn pin_dataset(&self, name: &str) -> Result<(), String> {
        let ctx = SessionContext::new();
        // Register every bucket so a multi-bucket dataset can be pinned.
        for info in self.buckets.list().await {
            let url = Url::parse(&bucket_base_url(&info.name))
                .map_err(|e| format!("invalid store url: {e}"))?;
            let store = self.buckets.get(&info.name)
                .map_err(|e| format!("registry inconsistency: {e}"))?;
            ctx.runtime_env().register_object_store(&url, store);
        }
        let dataset = self.registry.get_by_name(name).await
            .ok_or_else(|| format!("dataset not found: {name}"))?;
        if dataset.objects.is_empty() {
            return Err(format!("dataset '{name}' has no objects"));
        }
        let opts = ListingOptions::new(Arc::new(ParquetFormat::default()));
        let table_paths: Vec<ListingTableUrl> = dataset.objects.iter()
            .filter_map(|o| {
                let bucket = if o.bucket.is_empty() { "primary" } else { &o.bucket };
                ListingTableUrl::parse(&bucket_object_url(bucket, &o.key)).ok()
            })
            .collect();
        // Guard against every object URL failing to parse: indexing
        // table_paths[0] below would otherwise panic.
        if table_paths.is_empty() {
            return Err(format!("dataset '{name}' has no parseable object URLs"));
        }
        let schema = opts.infer_schema(&ctx.state(), &table_paths[0]).await
            .map_err(|e| format!("schema inference: {e}"))?;
        let config = ListingTableConfig::new_with_multi_paths(table_paths)
            .with_listing_options(opts)
            .with_schema(schema.clone());
        let table = ListingTable::try_new(config).map_err(|e| format!("table: {e}"))?;
        ctx.register_table(name, Arc::new(table)).map_err(|e| format!("register: {e}"))?;
        let df = ctx.sql(&format!("SELECT * FROM {name}")).await.map_err(|e| format!("read: {e}"))?;
        let batches = df.collect().await.map_err(|e| format!("collect: {e}"))?;
        self.cache.put(name, schema, batches).await;
        Ok(())
    }

    async fn build_context(&self) -> Result<SessionContext, String> {
        let ctx = SessionContext::new();
        // Phase E: snapshot tombstones by dataset before registering tables
        // so we can wrap tombstoned tables in a filter view. The underlying
        // base table is registered under an internal name `__raw__{dataset}`
        // and the public `{dataset}` name becomes the filtered view.
        let all_dataset_names: Vec<String> = self.registry.list().await
            .iter().map(|d| d.name.clone()).collect();
        let tombstones_by_dataset = self
            .registry
            .tombstones()
            .all_grouped(&all_dataset_names)
            .await
            .unwrap_or_default();
        // Federation layer 2: register every configured bucket as its own
        // DataFusion ObjectStore under a distinct URL scheme. Each
        // dataset's ObjectRef.bucket determines which store DataFusion
        // routes to at scan time.
        let bucket_infos = self.buckets.list().await;
        for info in &bucket_infos {
            let url_str = bucket_base_url(&info.name);
            let url = Url::parse(&url_str)
                .map_err(|e| format!("invalid store url '{url_str}': {e}"))?;
            // An unknown bucket here would be a config invariant violation,
            // since we just listed them — propagate as an error to surface it.
            let store = self.buckets.get(&info.name)
                .map_err(|e| format!("registry inconsistency: {e}"))?;
            ctx.runtime_env().register_object_store(&url, store);
        }
        // Backward-compat: also register the legacy `lakehouse://data/`
        // URL pointing at primary, in case any code path still constructs
        // ListingTableUrls without a bucket prefix.
        let legacy_url = Url::parse(&format!("{STORE_SCHEME}://data/"))
            .map_err(|e| format!("invalid legacy store url: {e}"))?;
        ctx.runtime_env().register_object_store(&legacy_url, self.store.clone());
        let datasets = self.registry.list().await;
        for dataset in &datasets {
            if dataset.objects.is_empty() {
                continue;
            }
            // Check cache first — cached batches are bucket-agnostic since
            // they're already materialized in memory.
            if let Some(cached) = self.cache.get(&dataset.name).await {
                let delta_batches = delta::load_deltas(&self.store, &dataset.name).await.unwrap_or_default();
                let mut all_batches = cached.batches;
                all_batches.extend(delta_batches);
                let mem_table = MemTable::try_new(cached.schema, vec![all_batches])
                    .map_err(|e| format!("MemTable error for {}: {e}", dataset.name))?;
                ctx.register_table(&dataset.name, Arc::new(mem_table))
                    .map_err(|e| format!("register cached {}: {e}", dataset.name))?;
                tracing::debug!("using cached table: {}", dataset.name);
                continue;
            }
            // Build ListingTable URLs that include the per-object bucket
            // scheme. DataFusion routes each scan to the right store
            // automatically based on the URL.
            let opts = ListingOptions::new(Arc::new(ParquetFormat::default()));
            let table_paths: Vec<ListingTableUrl> = dataset.objects.iter()
                .filter_map(|o| {
                    let bucket = if o.bucket.is_empty() { "primary" } else { &o.bucket };
                    ListingTableUrl::parse(&bucket_object_url(bucket, &o.key)).ok()
                })
                .collect();
            if table_paths.is_empty() {
                continue;
            }
            let schema = opts.infer_schema(&ctx.state(), &table_paths[0]).await
                .map_err(|e| format!("schema inference failed for {}: {e}", dataset.name))?;
            let config = ListingTableConfig::new_with_multi_paths(table_paths)
                .with_listing_options(opts)
                .with_schema(schema);
            let table = ListingTable::try_new(config)
                .map_err(|e| format!("table creation failed for {}: {e}", dataset.name))?;
            // Decide the registration name: if this dataset has any
            // tombstones, the raw table gets an internal name and the
            // public name becomes a filtered view.
            let tombstone_entry = tombstones_by_dataset.get(&dataset.name);
            let register_name = if tombstone_entry.is_some() {
                format!("__raw__{}", dataset.name)
            } else {
                dataset.name.clone()
            };
            if let Err(e) = ctx.register_table(register_name.as_str(), Arc::new(table)) {
                let msg = e.to_string();
                if msg.contains("already exists") {
                    tracing::debug!("skip duplicate manifest registration: {}", register_name);
                    continue;
                } else {
                    return Err(format!("table registration failed for {}: {}", register_name, msg));
                }
            }
            // If there are tombstones, register the public name as a
            // filtered view that excludes tombstoned row_key_values.
            if let Some((key_col, values)) = tombstone_entry {
                // Build WHERE NOT IN (...) — quote values to be SQL-safe.
                // For string keys this is a literal list; for integer keys
                // CAST(col AS VARCHAR) makes the comparison unambiguous.
                let quoted: Vec<String> = values.iter()
                    .map(|v| format!("'{}'", v.replace('\'', "''")))
                    .collect();
                let sql = format!(
                    "SELECT * FROM \"{}\" WHERE CAST(\"{}\" AS VARCHAR) NOT IN ({})",
                    register_name, key_col, quoted.join(", "),
                );
                tracing::debug!(
                    "tombstone filter for '{}': {} row_keys excluded",
                    dataset.name, values.len(),
                );
                match ctx.sql(&sql).await {
                    Ok(df) => {
                        if let Err(e) = ctx.register_table(dataset.name.as_str(), df.into_view()) {
                            let msg = e.to_string();
                            if msg.contains("already exists") {
                                tracing::debug!("skip duplicate tombstone view: {}", dataset.name);
                            } else {
                                tracing::warn!("tombstone view registration failed for {}: {msg}", dataset.name);
                            }
                        }
                    }
                    Err(e) => {
                        tracing::warn!("tombstone view SQL failed for '{}': {e}", dataset.name);
                    }
                }
            }
        }
        // Phase D: register every AiView as a DataFusion view that wraps
        // the base table with the safe projection + filter + redactions.
        for view in self.registry.list_views().await {
            // Build the SELECT clause with column whitelist and any
            // redaction expressions. Each column is either the raw column
            // name or an expression aliased back to the column name so
            // downstream queries see the same shape.
            let select_cols: Vec<String> = view.columns.iter().map(|col| {
                match view.column_redactions.get(col) {
                    None => format!("\"{}\"", col),
                    Some(shared::types::Redaction::Null) => {
                        format!("CAST(NULL AS VARCHAR) AS \"{}\"", col)
                    }
                    Some(shared::types::Redaction::Hash) => {
                        // DataFusion has digest(value, 'sha256') — but the
                        // column type might not be a string. Cast first to be safe.
                        format!("digest(CAST(\"{}\" AS VARCHAR), 'sha256') AS \"{}\"", col, col)
                    }
                    Some(shared::types::Redaction::Mask { keep_prefix, keep_suffix }) => {
                        // Keep first N + last M chars, replace the middle with
                        // stars. Pass NULL through via CASE WHEN for null-safe
                        // behavior.
                        format!(
                            "CASE WHEN \"{c}\" IS NULL THEN NULL ELSE \
                             concat(\
                                 substr(CAST(\"{c}\" AS VARCHAR), 1, {p}), \
                                 repeat('*', greatest(0, length(CAST(\"{c}\" AS VARCHAR)) - {p} - {s})), \
                                 substr(CAST(\"{c}\" AS VARCHAR), greatest(1, length(CAST(\"{c}\" AS VARCHAR)) - {s} + 1))\
                             ) END AS \"{c}\"",
                            c = col, p = keep_prefix, s = keep_suffix,
                        )
                    }
                }
            }).collect();
            let where_clause = view.row_filter
                .as_deref()
                .map(|f| format!(" WHERE {}", f))
                .unwrap_or_default();
            let view_sql = format!(
                "SELECT {} FROM \"{}\"{}",
                select_cols.join(", "),
                view.base_dataset,
                where_clause,
            );
            tracing::debug!("registering AiView '{}': {}", view.name, view_sql);
            let df = match ctx.sql(&view_sql).await {
                Ok(df) => df,
                Err(e) => {
                    tracing::warn!("view '{}' SQL invalid, skipping: {e}", view.name);
                    continue;
                }
            };
            if let Err(e) = ctx.register_table(view.name.as_str(), df.into_view()) {
                let msg = e.to_string();
                if msg.contains("already exists") {
                    tracing::debug!("skip duplicate view registration: {}", view.name);
                } else {
                    tracing::warn!("view '{}' registration failed: {e}", view.name);
                }
            }
        }
        Ok(ctx)
    }
}