use catalogd::registry::Registry;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::datasource::MemTable;
use datafusion::prelude::*;
use object_store::ObjectStore;
use std::sync::Arc;
use storaged::registry::BucketRegistry;
use url::Url;

use crate::cache::MemCache;
use crate::delta;

const STORE_SCHEME: &str = "lakehouse";

/// URL scheme used to register a bucket's object store with DataFusion.
/// Bucket names safely embed in URL host components after sanitizing
/// `:` (used in `profile:user`) to `-`.
fn bucket_scheme(bucket: &str) -> String {
    format!("{STORE_SCHEME}-{}", bucket.replace(':', "-"))
}

/// Build a `ListingTableUrl`-shaped string for an object in a bucket.
fn bucket_object_url(bucket: &str, key: &str) -> String {
    format!("{}://data/{}", bucket_scheme(bucket), key)
}

/// Build the registration base URL for a bucket.
fn bucket_base_url(bucket: &str) -> String {
    format!("{}://data/", bucket_scheme(bucket))
}
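
// Illustrative URL shapes produced by the helpers above (hypothetical bucket
// and key, shown only to document the scheme layout): for bucket
// `profile:alice` and key `datasets/events/part-0.parquet`:
//
//   bucket_scheme      -> lakehouse-profile-alice
//   bucket_base_url    -> lakehouse-profile-alice://data/
//   bucket_object_url  -> lakehouse-profile-alice://data/datasets/events/part-0.parquet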

/// Query engine with in-memory cache and delta merge support.
/// Federation layer 2: holds a `BucketRegistry` so cross-bucket queries
/// route reads to the right ObjectStore based on `ObjectRef.bucket`.
#[derive(Clone)]
pub struct QueryEngine {
    registry: Registry,
    buckets: Arc<BucketRegistry>,
    /// Primary bucket store cached for backward-compat callers (compact,
    /// workspace ops) that haven't been migrated to bucket-aware paths.
    store: Arc<dyn ObjectStore>,
    cache: MemCache,
}

impl QueryEngine {
    pub fn new(registry: Registry, buckets: Arc<BucketRegistry>, cache: MemCache) -> Self {
        let store = buckets.default_store();
        Self { registry, buckets, store, cache }
    }

    pub fn cache(&self) -> &MemCache {
        &self.cache
    }

    /// Backward-compat: returns the primary bucket's store. Bucket-aware
    /// callers should use `engine.buckets()` directly.
    pub fn store(&self) -> &Arc<dyn ObjectStore> {
        &self.store
    }

    pub fn buckets(&self) -> &Arc<BucketRegistry> {
        &self.buckets
    }

    /// Execute a SQL query. Uses cache for hot data, falls back to Parquet.
    pub async fn query(&self, sql: &str) -> Result<Vec<RecordBatch>, String> {
        let ctx = self.build_context().await?;
        let df = ctx.sql(sql).await.map_err(|e| format!("SQL error: {e}"))?;
        let batches = df.collect().await.map_err(|e| format!("execution error: {e}"))?;
        Ok(batches)
    }

    /// Pin a dataset into the memory cache.
    pub async fn pin_dataset(&self, name: &str) -> Result<(), String> {
        let ctx = SessionContext::new();
        // Register every bucket so a multi-bucket dataset can be pinned.
        for info in self.buckets.list().await {
            let url = Url::parse(&bucket_base_url(&info.name))
                .map_err(|e| format!("invalid store url: {e}"))?;
            let store = self.buckets.get(&info.name)
                .map_err(|e| format!("registry inconsistency: {e}"))?;
            ctx.runtime_env().register_object_store(&url, store);
        }

        let dataset = self.registry.get_by_name(name).await
            .ok_or_else(|| format!("dataset not found: {name}"))?;
        if dataset.objects.is_empty() {
            return Err(format!("dataset '{name}' has no objects"));
        }

        let opts = ListingOptions::new(Arc::new(ParquetFormat::default()));
        let table_paths: Vec<ListingTableUrl> = dataset.objects.iter()
            .filter_map(|o| {
                let bucket = if o.bucket.is_empty() { "primary" } else { &o.bucket };
                ListingTableUrl::parse(&bucket_object_url(bucket, &o.key)).ok()
            })
            .collect();
        // Guard against every object URL failing to parse; indexing
        // `table_paths[0]` below would otherwise panic.
        if table_paths.is_empty() {
            return Err(format!("dataset '{name}' has no parseable object urls"));
        }
        let schema = opts.infer_schema(&ctx.state(), &table_paths[0]).await
            .map_err(|e| format!("schema inference: {e}"))?;
        let config = ListingTableConfig::new_with_multi_paths(table_paths)
            .with_listing_options(opts)
            .with_schema(schema.clone());
        let table = ListingTable::try_new(config).map_err(|e| format!("table: {e}"))?;
        ctx.register_table(name, Arc::new(table)).map_err(|e| format!("register: {e}"))?;

        let df = ctx.sql(&format!("SELECT * FROM {name}")).await
            .map_err(|e| format!("read: {e}"))?;
        let batches = df.collect().await.map_err(|e| format!("collect: {e}"))?;
        self.cache.put(name, schema, batches).await;
        Ok(())
    }
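
    // Usage sketch (hypothetical caller; construction of `registry`,
    // `buckets`, and `cache` is elided): pin a hot dataset once, then
    // serve subsequent reads through the cached MemTable path.
    //
    //   let engine = QueryEngine::new(registry, buckets, cache);
    //   engine.pin_dataset("events").await?;
    //   let batches = engine.query("SELECT count(*) FROM events").await?;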

    async fn build_context(&self) -> Result<SessionContext, String> {
        let ctx = SessionContext::new();

        // Phase E: snapshot tombstones by dataset before registering tables
        // so we can wrap tombstoned tables in a filter view. The underlying
        // base table is registered under an internal name `__raw__{dataset}`
        // and the public `{dataset}` name becomes the filtered view.
        let all_dataset_names: Vec<String> = self.registry.list().await
            .iter().map(|d| d.name.clone()).collect();
        let tombstones_by_dataset = self
            .registry
            .tombstones()
            .all_grouped(&all_dataset_names)
            .await
            .unwrap_or_default();

        // Federation layer 2: register every configured bucket as its own
        // DataFusion ObjectStore under a distinct URL scheme. Each
        // dataset's ObjectRef.bucket determines which store DataFusion
        // routes to at scan time.
        let bucket_infos = self.buckets.list().await;
        for info in &bucket_infos {
            let url_str = bucket_base_url(&info.name);
            let url = Url::parse(&url_str)
                .map_err(|e| format!("invalid store url '{url_str}': {e}"))?;
            // An unknown bucket here would be a config invariant violation,
            // since we just listed them — propagate as an error to surface it.
            let store = self.buckets.get(&info.name)
                .map_err(|e| format!("registry inconsistency: {e}"))?;
            ctx.runtime_env().register_object_store(&url, store);
        }

        // Backward-compat: also register the legacy `lakehouse://data/`
        // URL pointing at primary, in case any code path still constructs
        // ListingTableUrls without a bucket prefix.
        let legacy_url = Url::parse(&format!("{STORE_SCHEME}://data/"))
            .map_err(|e| format!("invalid legacy store url: {e}"))?;
        ctx.runtime_env().register_object_store(&legacy_url, self.store.clone());

        let datasets = self.registry.list().await;
        for dataset in &datasets {
            if dataset.objects.is_empty() {
                continue;
            }

            // Check cache first — cached batches are bucket-agnostic since
            // they're already materialized in memory.
            if let Some(cached) = self.cache.get(&dataset.name).await {
                let delta_batches = delta::load_deltas(&self.store, &dataset.name)
                    .await
                    .unwrap_or_default();
                let mut all_batches = cached.batches;
                all_batches.extend(delta_batches);
                let mem_table = MemTable::try_new(cached.schema, vec![all_batches])
                    .map_err(|e| format!("MemTable error for {}: {e}", dataset.name))?;
                ctx.register_table(dataset.name.as_str(), Arc::new(mem_table))
                    .map_err(|e| format!("register cached {}: {e}", dataset.name))?;
                tracing::debug!("using cached table: {}", dataset.name);
                continue;
            }

            // Build ListingTable URLs that include the per-object bucket
            // scheme. DataFusion routes each scan to the right store
            // automatically based on the URL.
            let opts = ListingOptions::new(Arc::new(ParquetFormat::default()));
            let table_paths: Vec<ListingTableUrl> = dataset.objects.iter()
                .filter_map(|o| {
                    let bucket = if o.bucket.is_empty() { "primary" } else { &o.bucket };
                    ListingTableUrl::parse(&bucket_object_url(bucket, &o.key)).ok()
                })
                .collect();
            if table_paths.is_empty() {
                continue;
            }
            let schema = opts.infer_schema(&ctx.state(), &table_paths[0]).await
                .map_err(|e| format!("schema inference failed for {}: {e}", dataset.name))?;
            let config = ListingTableConfig::new_with_multi_paths(table_paths)
                .with_listing_options(opts)
                .with_schema(schema);
            let table = ListingTable::try_new(config)
                .map_err(|e| format!("table creation failed for {}: {e}", dataset.name))?;

            // Decide the registration name: if this dataset has any
            // tombstones, the raw table gets an internal name and the
            // public name becomes a filtered view.
            let tombstone_entry = tombstones_by_dataset.get(&dataset.name);
            let register_name = if tombstone_entry.is_some() {
                format!("__raw__{}", dataset.name)
            } else {
                dataset.name.clone()
            };

            if let Err(e) = ctx.register_table(register_name.as_str(), Arc::new(table)) {
                let msg = e.to_string();
                if msg.contains("already exists") {
                    tracing::debug!("skip duplicate manifest registration: {}", register_name);
                    continue;
                } else {
                    return Err(format!(
                        "table registration failed for {register_name}: {msg}"
                    ));
                }
            }

            // If there are tombstones, register the public name as a
            // filtered view that excludes tombstoned row_key_values.
            if let Some((key_col, values)) = tombstone_entry {
                // Build WHERE ... NOT IN (...) — quote values to be SQL-safe
                // (embedded single quotes are doubled). For string keys this
                // is a plain literal list; for integer keys the
                // CAST(col AS VARCHAR) makes the comparison unambiguous.
                let quoted: Vec<String> = values.iter()
                    .map(|v| format!("'{}'", v.replace('\'', "''")))
                    .collect();
                let sql = format!(
                    "SELECT * FROM \"{}\" WHERE CAST(\"{}\" AS VARCHAR) NOT IN ({})",
                    register_name,
                    key_col,
                    quoted.join(", "),
                );
                tracing::debug!(
                    "tombstone filter for '{}': {} row_keys excluded",
                    dataset.name,
                    values.len(),
                );
                match ctx.sql(&sql).await {
                    Ok(df) => {
                        if let Err(e) = ctx.register_table(dataset.name.as_str(), df.into_view()) {
                            let msg = e.to_string();
                            if msg.contains("already exists") {
                                tracing::debug!("skip duplicate tombstone view: {}", dataset.name);
                            } else {
                                tracing::warn!(
                                    "tombstone view registration failed for {}: {msg}",
                                    dataset.name
                                );
                            }
                        }
                    }
                    Err(e) => {
                        tracing::warn!("tombstone view SQL failed for '{}': {e}", dataset.name);
                    }
                }
            }
        }
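
        // Illustrative expansion of the tombstone wrapping above, for a
        // hypothetical dataset `orders` with key column `order_id` and
        // tombstoned keys "17" and "o'brien": the base table registers as
        // `__raw__orders`, and the public `orders` name becomes the view
        //
        //   SELECT * FROM "__raw__orders"
        //   WHERE CAST("order_id" AS VARCHAR) NOT IN ('17', 'o''brien')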

        // Phase D: register every AiView as a DataFusion view that wraps
        // the base table with the safe projection + filter + redactions.
        for view in self.registry.list_views().await {
            // Build the SELECT clause with the column whitelist and any
            // redaction expressions. Each column is either the raw column
            // name or an expression aliased back to the column name so
            // downstream queries see the same shape.
            let select_cols: Vec<String> = view.columns.iter().map(|col| {
                match view.column_redactions.get(col) {
                    None => format!("\"{}\"", col),
                    Some(shared::types::Redaction::Null) => {
                        format!("CAST(NULL AS VARCHAR) AS \"{}\"", col)
                    }
                    Some(shared::types::Redaction::Hash) => {
                        // DataFusion has digest(value, 'sha256') — but the
                        // column type might not be string. Cast first to be safe.
                        format!("digest(CAST(\"{}\" AS VARCHAR), 'sha256') AS \"{}\"", col, col)
                    }
                    Some(shared::types::Redaction::Mask { keep_prefix, keep_suffix }) => {
                        // Keep the first N and last M chars, replacing the
                        // middle with stars. The CASE guard passes NULL inputs
                        // through as NULL instead of masking them.
                        format!(
                            "CASE WHEN \"{c}\" IS NULL THEN NULL ELSE \
                             concat(\
                             substr(CAST(\"{c}\" AS VARCHAR), 1, {p}), \
                             repeat('*', greatest(0, length(CAST(\"{c}\" AS VARCHAR)) - {p} - {s})), \
                             substr(CAST(\"{c}\" AS VARCHAR), greatest(1, length(CAST(\"{c}\" AS VARCHAR)) - {s} + 1))\
                             ) END AS \"{c}\"",
                            c = col,
                            p = keep_prefix,
                            s = keep_suffix,
                        )
                    }
                }
            }).collect();

            let where_clause = view.row_filter
                .as_deref()
                .map(|f| format!(" WHERE {}", f))
                .unwrap_or_default();

            let view_sql = format!(
                "SELECT {} FROM \"{}\"{}",
                select_cols.join(", "),
                view.base_dataset,
                where_clause,
            );
            tracing::debug!("registering AiView '{}': {}", view.name, view_sql);

            let df = match ctx.sql(&view_sql).await {
                Ok(df) => df,
                Err(e) => {
                    tracing::warn!("view '{}' SQL invalid, skipping: {e}", view.name);
                    continue;
                }
            };
            if let Err(e) = ctx.register_table(view.name.as_str(), df.into_view()) {
                let msg = e.to_string();
                if msg.contains("already exists") {
                    tracing::debug!("skip duplicate view registration: {}", view.name);
                } else {
                    tracing::warn!("view '{}' registration failed: {e}", view.name);
                }
            }
        }

        Ok(ctx)
    }
}
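
// A minimal sanity-check sketch for the pure URL helpers; the bucket and key
// values are illustrative, not taken from real configuration.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn bucket_scheme_sanitizes_colons() {
        assert_eq!(bucket_scheme("primary"), "lakehouse-primary");
        assert_eq!(bucket_scheme("profile:alice"), "lakehouse-profile-alice");
    }

    #[test]
    fn bucket_urls_compose_scheme_and_key() {
        assert_eq!(bucket_base_url("primary"), "lakehouse-primary://data/");
        assert_eq!(
            bucket_object_url("primary", "datasets/events/part-0.parquet"),
            "lakehouse-primary://data/datasets/events/part-0.parquet"
        );
    }
}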