use catalogd::registry::Registry; use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl}; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::prelude::*; use object_store::ObjectStore; use std::sync::Arc; use url::Url; const STORE_SCHEME: &str = "lakehouse"; /// Query engine that runs DataFusion over catalog-registered Parquet datasets. #[derive(Clone)] pub struct QueryEngine { registry: Registry, store: Arc, } impl QueryEngine { pub fn new(registry: Registry, store: Arc) -> Self { Self { registry, store } } /// Execute a SQL query. Resolves all catalog datasets as tables before execution. pub async fn query(&self, sql: &str) -> Result, String> { let ctx = self.build_context().await?; let df = ctx.sql(sql).await.map_err(|e| format!("SQL error: {e}"))?; let batches = df.collect().await.map_err(|e| format!("execution error: {e}"))?; Ok(batches) } /// Build a SessionContext with all catalog datasets registered as tables. async fn build_context(&self) -> Result { let ctx = SessionContext::new(); // Register the object store under a custom scheme to avoid path doubling. // The store already has the root prefix (e.g. ./data), so paths are relative keys. let base_url = Url::parse(&format!("{STORE_SCHEME}://data/")) .map_err(|e| format!("invalid store url: {e}"))?; ctx.runtime_env().register_object_store(&base_url, self.store.clone()); // Register each catalog dataset as a table let datasets = self.registry.list().await; for dataset in &datasets { if dataset.objects.is_empty() { continue; } let opts = ListingOptions::new(Arc::new(ParquetFormat::default())); let table_paths: Vec = dataset.objects.iter() .filter_map(|o| { let url_str = format!("{STORE_SCHEME}://data/{}", o.key); ListingTableUrl::parse(&url_str).ok() }) .collect(); if table_paths.is_empty() { tracing::warn!("dataset {} has no valid paths, skipping", dataset.name); continue; } // Infer schema from the first file let schema = opts.infer_schema(&ctx.state(), &table_paths[0]).await .map_err(|e| format!("schema inference failed for {}: {e}", dataset.name))?; let config = ListingTableConfig::new_with_multi_paths(table_paths) .with_listing_options(opts) .with_schema(schema); let table = ListingTable::try_new(config) .map_err(|e| format!("table creation failed for {}: {e}", dataset.name))?; ctx.register_table(&dataset.name, Arc::new(table)) .map_err(|e| format!("table registration failed for {}: {e}", dataset.name))?; tracing::debug!("registered table: {}", dataset.name); } Ok(ctx) } }