diff --git a/crates/catalogd/src/registry.rs b/crates/catalogd/src/registry.rs
index e94f19b..fe9b09e 100644
--- a/crates/catalogd/src/registry.rs
+++ b/crates/catalogd/src/registry.rs
@@ -9,6 +9,15 @@ use tokio::sync::RwLock;
 use storaged::ops;
 use object_store::ObjectStore;
 
+#[derive(Debug, Clone, Default, serde::Serialize)]
+pub struct MigrateBucketsReport {
+    pub refs_examined: usize,
+    pub refs_renamed: usize,   // legacy "data"/"local" → "primary"
+    pub refs_stamped: usize,   // empty → "primary"
+    pub refs_unchanged: usize, // already canonical
+    pub manifests_persisted: usize,
+}
+
 /// Partial metadata update — only set fields are applied.
 #[derive(Debug, Clone, Default, serde::Deserialize)]
 pub struct MetadataUpdate {
@@ -295,6 +304,56 @@ impl Registry {
         Ok(())
     }
 
+    /// Federation layer 2: stamp `bucket = "primary"` on every ObjectRef
+    /// whose `bucket` field is empty or matches a legacy value (`"data"`,
+    /// `"local"`). One-shot migration; re-running is a safe no-op once
+    /// every ref is canonical.
+    pub async fn migrate_buckets_to_primary(&self) -> Result<MigrateBucketsReport, String> {
+        let mut datasets = self.datasets.write().await;
+        let mut report = MigrateBucketsReport::default();
+        let mut to_persist: Vec<(DatasetId, DatasetManifest)> = Vec::new();
+
+        for manifest in datasets.values_mut() {
+            let mut changed = false;
+            for obj in manifest.objects.iter_mut() {
+                report.refs_examined += 1;
+                if obj.bucket.is_empty() {
+                    obj.bucket = "primary".to_string();
+                    report.refs_stamped += 1;
+                    changed = true;
+                } else if obj.bucket == "data" || obj.bucket == "local" {
+                    obj.bucket = "primary".to_string();
+                    report.refs_renamed += 1;
+                    changed = true;
+                } else {
+                    report.refs_unchanged += 1;
+                }
+            }
+            if changed {
+                manifest.updated_at = chrono::Utc::now();
+                to_persist.push((manifest.id.clone(), manifest.clone()));
+            }
+        }
+
+        // Persist updated manifests after we've finished mutating the map.
+        for (id, manifest) in to_persist {
+            let key = format!("{MANIFEST_PREFIX}/{}.json", id);
+            let json = serde_json::to_vec_pretty(&manifest).map_err(|e| e.to_string())?;
+            ops::put(&self.store, &key, json.into()).await?;
+            report.manifests_persisted += 1;
+        }
+
+        tracing::info!(
+            "bucket migration: examined {} refs, renamed {}, stamped {}, unchanged {}, persisted {} manifests",
+            report.refs_examined,
+            report.refs_renamed,
+            report.refs_stamped,
+            report.refs_unchanged,
+            report.manifests_persisted,
+        );
+        Ok(report)
+    }
+
     /// List datasets whose `embedding_stale_since` is set — they need a refresh.
     pub async fn stale_datasets(&self) -> Vec<DatasetManifest> {
         let datasets = self.datasets.read().await;
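A quick reference for what `POST /catalog/migrate-buckets` (wired up in the service change below) will return: the handler serializes this same report struct as JSON. The sketch is illustrative only, not part of the patch; it assumes `catalogd::registry` and `serde_json` are reachable from a test or example, and the counts are made up.

    // Sketch only: field values are invented, not real migration output.
    use catalogd::registry::MigrateBucketsReport;

    fn main() {
        let report = MigrateBucketsReport {
            refs_examined: 14,
            refs_renamed: 12,  // e.g. legacy "data"/"local" refs rewritten
            refs_unchanged: 2, // already "primary"
            ..Default::default()
        };
        // The axum handler wraps the same struct in Json(report), so an HTTP
        // client sees exactly this shape.
        println!("{}", serde_json::to_string_pretty(&report).unwrap());
        // {
        //   "refs_examined": 14,
        //   "refs_renamed": 12,
        //   "refs_stamped": 0,
        //   "refs_unchanged": 2,
        //   "manifests_persisted": 0
        // }
    }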
diff --git a/crates/catalogd/src/service.rs b/crates/catalogd/src/service.rs
index 19c4b6c..b1aa01b 100644
--- a/crates/catalogd/src/service.rs
+++ b/crates/catalogd/src/service.rs
@@ -21,6 +21,7 @@ pub fn router(registry: Registry) -> Router {
         .route("/datasets/by-name/{name}/metadata", post(update_metadata))
         .route("/datasets/by-name/{name}/resync", post(resync_dataset))
         .route("/resync-missing", post(resync_all_missing))
+        .route("/migrate-buckets", post(migrate_buckets))
         .with_state(registry)
 }
 
@@ -194,3 +195,13 @@ async fn resync_all_missing(State(registry): State<Registry>) -> impl IntoRespon
         failed: err.into_iter().map(|(name, error)| ResyncErr { name, error }).collect(),
     })
 }
+
+/// Federation layer 2 one-shot: normalize every ObjectRef.bucket field
+/// to the canonical "primary" value. Idempotent — re-running once
+/// everything is canonical is a safe no-op.
+async fn migrate_buckets(State(registry): State<Registry>) -> impl IntoResponse {
+    match registry.migrate_buckets_to_primary().await {
+        Ok(report) => Ok(Json(report)),
+        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
+    }
+}
diff --git a/crates/gateway/src/main.rs b/crates/gateway/src/main.rs
index 256732d..eb2cafc 100644
--- a/crates/gateway/src/main.rs
+++ b/crates/gateway/src/main.rs
@@ -57,7 +57,7 @@ async fn main() {
 
     // Query engine with 16GB memory cache
     let cache = queryd::cache::MemCache::new(16 * 1024 * 1024 * 1024);
-    let engine = queryd::context::QueryEngine::new(registry.clone(), store.clone(), cache);
+    let engine = queryd::context::QueryEngine::new(registry.clone(), bucket_registry.clone(), cache);
 
     // Event journal — append-only mutation log (flush every 100 events)
     let journal = journald::journal::Journal::new(store.clone(), 100);
@@ -86,6 +86,7 @@ async fn main() {
         .nest("/ingest", ingestd::service::router(ingestd::service::IngestState {
             store: store.clone(),
             registry: registry.clone(),
+            buckets: bucket_registry.clone(),
         }))
         .nest("/vectors", vectord::service::router({
             let index_reg = vectord::index_registry::IndexRegistry::new(store.clone());
diff --git a/crates/ingestd/src/pipeline.rs b/crates/ingestd/src/pipeline.rs
index c28b518..a8d90df 100644
--- a/crates/ingestd/src/pipeline.rs
+++ b/crates/ingestd/src/pipeline.rs
@@ -28,12 +28,29 @@ pub struct IngestResult {
 }
 
 /// Full ingest pipeline: detect → parse → dedup → store → register.
+///
+/// The bucket name (federation layer 2) is recorded on the ObjectRef so the
+/// catalog knows which bucket holds the data. This wrapper pins it to
+/// "primary"; header-aware endpoints call `ingest_file_to_bucket` directly.
 pub async fn ingest_file(
     filename: &str,
     content: &[u8],
     dataset_name: Option<&str>,
     store: &Arc<dyn ObjectStore>,
     registry: &Registry,
+) -> Result<IngestResult, String> {
+    ingest_file_to_bucket(filename, content, dataset_name, "primary", store, registry).await
+}
+
+/// Same as `ingest_file` but with an explicit target bucket name on the
+/// ObjectRef. Header-aware ingest endpoints call this directly.
+pub async fn ingest_file_to_bucket(
+    filename: &str,
+    content: &[u8],
+    dataset_name: Option<&str>,
+    bucket: &str,
+    store: &Arc<dyn ObjectStore>,
+    registry: &Registry,
 ) -> Result<IngestResult, String> {
     // 1. Detect file type
     let file_type = detect_file_type(filename, content);
@@ -98,7 +115,7 @@ pub async fn ingest_file(
     let schema_fp = fingerprint_schema(&schema);
     let now = chrono::Utc::now();
     let obj_ref = ObjectRef {
-        bucket: "data".to_string(),
+        bucket: bucket.to_string(),
         key: storage_key.clone(),
         size_bytes: parquet_size,
         created_at: now,
diff --git a/crates/ingestd/src/service.rs b/crates/ingestd/src/service.rs
index ddb946c..3dcef67 100644
--- a/crates/ingestd/src/service.rs
+++ b/crates/ingestd/src/service.rs
@@ -1,7 +1,7 @@
 use axum::{
     Json, Router,
     extract::{Multipart, Query, State},
-    http::StatusCode,
+    http::{HeaderMap, StatusCode},
     response::IntoResponse,
     routing::{get, post},
 };
@@ -15,11 +15,37 @@ use crate::{db_ingest, pg_stream, pipeline};
 use shared::arrow_helpers::record_batch_to_parquet;
 use shared::types::{ObjectRef, SchemaFingerprint};
 use storaged::ops;
+use storaged::registry::BucketRegistry;
 
 #[derive(Clone)]
 pub struct IngestState {
     pub store: Arc<dyn ObjectStore>,
     pub registry: Registry,
+    /// Federation layer 2: lookup the target bucket from request headers.
+    pub buckets: Arc<BucketRegistry>,
+}
+
+/// Resolve the target bucket from the `X-Lakehouse-Bucket` header.
+/// Returns `(bucket_name, store_for_writes)`. Falls back to the default
+/// bucket ("primary") when the header is absent; returns a 404 Err pointing
+/// at `GET /storage/buckets` when the header names an unknown bucket.
+fn resolve_bucket(
+    headers: &HeaderMap,
+    buckets: &BucketRegistry,
+) -> Result<(String, Arc<dyn ObjectStore>), (StatusCode, String)> {
+    let target = headers
+        .get("x-lakehouse-bucket")
+        .and_then(|h| h.to_str().ok())
+        .map(|s| s.to_string())
+        .unwrap_or_else(|| buckets.default_name().to_string());
+
+    match buckets.get(&target) {
+        Ok(store) => Ok((target, store)),
+        Err(_) => Err((
+            StatusCode::NOT_FOUND,
+            format!("unknown bucket '{}' — use GET /storage/buckets to list", target),
+        )),
+    }
 }
 
 pub fn router(state: IngestState) -> Router {
@@ -44,8 +70,11 @@ struct IngestQuery {
 async fn ingest_file(
     State(state): State<IngestState>,
     Query(query): Query<IngestQuery>,
+    headers: HeaderMap,
     mut multipart: Multipart,
 ) -> impl IntoResponse {
+    let (bucket, store) = resolve_bucket(&headers, &state.buckets)?;
+
     let field = match multipart.next_field().await {
         Ok(Some(f)) => f,
         Ok(None) => return Err((StatusCode::BAD_REQUEST, "no file uploaded".to_string())),
@@ -56,11 +85,14 @@ async fn ingest_file(
     let content = field.bytes().await
         .map_err(|e| (StatusCode::BAD_REQUEST, format!("read error: {e}")))?;
 
-    tracing::info!("received file '{}' ({} bytes) for ingest", filename, content.len());
+    tracing::info!(
+        "ingest '{}' ({} bytes) -> bucket={}",
+        filename, content.len(), bucket,
+    );
 
     let dataset_name = query.name.as_deref();
 
-    match pipeline::ingest_file(&filename, &content, dataset_name, &state.store, &state.registry).await {
+    match pipeline::ingest_file_to_bucket(&filename, &content, dataset_name, &bucket, &store, &state.registry).await {
         Ok(result) => {
             if result.deduplicated {
                 Ok((StatusCode::OK, Json(result)))
@@ -195,11 +227,13 @@ async fn import_pg_table(
 /// `POST /ingest/db` with `{dsn, table, dataset_name}`.
 async fn ingest_db_stream(
     State(state): State<IngestState>,
+    headers: HeaderMap,
     Json(req): Json,
 ) -> Result<(StatusCode, Json), (StatusCode, String)> {
+    let (bucket, store) = resolve_bucket(&headers, &state.buckets)?;
     tracing::info!(
-        "pg stream ingest: table='{}' dataset='{:?}' batch_size={:?}",
-        req.table, req.dataset_name, req.batch_size,
+        "pg stream ingest: table='{}' dataset='{:?}' bucket='{}' batch_size={:?}",
+        req.table, req.dataset_name, bucket, req.batch_size,
     );
 
     // Stream from Postgres into Parquet bytes.
@@ -224,7 +258,7 @@ async fn ingest_db_stream(
     let storage_key = format!("datasets/{}.parquet", dataset_name);
     let size_bytes = parquet.len() as u64;
 
-    ops::put(&state.store, &storage_key, parquet)
+    ops::put(&store, &storage_key, parquet)
         .await
         .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
 
@@ -234,7 +268,7 @@ async fn ingest_db_stream(
         dataset_name.clone(),
         SchemaFingerprint(schema_fp.0),
         vec![ObjectRef {
-            bucket: "primary".to_string(),
+            bucket: bucket.clone(),
             key: storage_key.clone(),
             size_bytes,
             created_at: now,
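One note on casing, since `resolve_bucket` looks up the all-lowercase "x-lakehouse-bucket" while the docs advertise `X-Lakehouse-Bucket`: the `http` crate that backs axum's `HeaderMap` normalizes header names to lowercase when it parses them, so any client capitalization matches. A small self-contained sketch (illustrative, not part of the patch):

    use axum::http::{HeaderMap, HeaderName, HeaderValue};

    fn main() {
        let mut headers = HeaderMap::new();
        // What a client sends: "X-Lakehouse-Bucket: testing". The name is
        // normalized to lowercase as it is parsed into the map.
        headers.insert(
            "X-Lakehouse-Bucket".parse::<HeaderName>().unwrap(),
            HeaderValue::from_static("testing"),
        );
        // Essentially the lookup resolve_bucket performs, with the default
        // hard-coded here instead of coming from the BucketRegistry.
        let target = headers
            .get("x-lakehouse-bucket")
            .and_then(|h| h.to_str().ok())
            .unwrap_or("primary");
        assert_eq!(target, "testing");
    }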
diff --git a/crates/queryd/src/context.rs b/crates/queryd/src/context.rs
index 76ba127..ac54c1c 100644
--- a/crates/queryd/src/context.rs
+++ b/crates/queryd/src/context.rs
@@ -5,6 +5,7 @@ use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::prelude::*;
 use object_store::ObjectStore;
 use std::sync::Arc;
+use storaged::registry::BucketRegistry;
 use url::Url;
 
 use crate::cache::MemCache;
@@ -12,27 +13,56 @@ use crate::delta;
 
 const STORE_SCHEME: &str = "lakehouse";
 
+/// URL scheme used to register a bucket's object store with DataFusion.
+/// The bucket name is embedded in the scheme itself, with `:` (used in
+/// `profile:user` names) sanitized to `-` since `:` is not valid there.
+fn bucket_scheme(bucket: &str) -> String {
+    format!("{STORE_SCHEME}-{}", bucket.replace(':', "-"))
+}
+
+/// Build a `ListingTableUrl`-shaped string for an object in a bucket.
+fn bucket_object_url(bucket: &str, key: &str) -> String {
+    format!("{}://data/{}", bucket_scheme(bucket), key)
+}
+
+/// Build the registration base URL for a bucket.
+fn bucket_base_url(bucket: &str) -> String {
+    format!("{}://data/", bucket_scheme(bucket))
+}
+
 /// Query engine with in-memory cache and delta merge support.
+/// Federation layer 2: holds a `BucketRegistry` so cross-bucket queries
+/// route reads to the right ObjectStore based on `ObjectRef.bucket`.
 #[derive(Clone)]
 pub struct QueryEngine {
     registry: Registry,
+    buckets: Arc<BucketRegistry>,
+    /// Primary bucket store cached for backward-compat callers (compact,
+    /// workspace ops) that haven't been migrated to bucket-aware paths.
     store: Arc<dyn ObjectStore>,
     cache: MemCache,
 }
 
 impl QueryEngine {
-    pub fn new(registry: Registry, store: Arc<dyn ObjectStore>, cache: MemCache) -> Self {
-        Self { registry, store, cache }
+    pub fn new(registry: Registry, buckets: Arc<BucketRegistry>, cache: MemCache) -> Self {
+        let store = buckets.default_store();
+        Self { registry, buckets, store, cache }
     }
 
     pub fn cache(&self) -> &MemCache {
         &self.cache
     }
 
+    /// Backward-compat: returns the primary bucket's store. Bucket-aware
+    /// callers should use `engine.buckets()` directly.
     pub fn store(&self) -> &Arc<dyn ObjectStore> {
         &self.store
     }
 
+    pub fn buckets(&self) -> &Arc<BucketRegistry> {
+        &self.buckets
+    }
+
     /// Execute a SQL query. Uses cache for hot data, falls back to Parquet.
     pub async fn query(&self, sql: &str) -> Result<Vec<RecordBatch>, String> {
         let ctx = self.build_context().await?;
@@ -43,11 +73,16 @@ impl QueryEngine {
 
     /// Pin a dataset into the memory cache.
     pub async fn pin_dataset(&self, name: &str) -> Result<(), String> {
-        // Read from Parquet
         let ctx = SessionContext::new();
-        let base_url = Url::parse(&format!("{STORE_SCHEME}://data/"))
-            .map_err(|e| format!("invalid store url: {e}"))?;
-        ctx.runtime_env().register_object_store(&base_url, self.store.clone());
+
+        // Register every bucket so a multi-bucket dataset can be pinned.
+        for info in self.buckets.list().await {
+            let url = Url::parse(&bucket_base_url(&info.name))
+                .map_err(|e| format!("invalid store url: {e}"))?;
+            let store = self.buckets.get(&info.name)
+                .map_err(|e| format!("registry inconsistency: {e}"))?;
+            ctx.runtime_env().register_object_store(&url, store);
+        }
 
         let dataset = self.registry.get_by_name(name).await
             .ok_or_else(|| format!("dataset not found: {name}"))?;
@@ -58,7 +93,10 @@ impl QueryEngine {
 
         let opts = ListingOptions::new(Arc::new(ParquetFormat::default()));
         let table_paths: Vec<ListingTableUrl> = dataset.objects.iter()
-            .filter_map(|o| ListingTableUrl::parse(&format!("{STORE_SCHEME}://data/{}", o.key)).ok())
+            .filter_map(|o| {
+                let bucket = if o.bucket.is_empty() { "primary" } else { &o.bucket };
+                ListingTableUrl::parse(&bucket_object_url(bucket, &o.key)).ok()
+            })
             .collect();
 
         let schema = opts.infer_schema(&ctx.state(), &table_paths[0]).await
@@ -79,9 +117,28 @@
 
     async fn build_context(&self) -> Result<SessionContext, String> {
         let ctx = SessionContext::new();
-        let base_url = Url::parse(&format!("{STORE_SCHEME}://data/"))
-            .map_err(|e| format!("invalid store url: {e}"))?;
-        ctx.runtime_env().register_object_store(&base_url, self.store.clone());
+        // Federation layer 2: register every configured bucket as its own
+        // DataFusion ObjectStore under a distinct URL scheme. Each
+        // dataset's ObjectRef.bucket determines which store DataFusion
+        // routes to at scan time.
+        let bucket_infos = self.buckets.list().await;
+        for info in &bucket_infos {
+            let url_str = bucket_base_url(&info.name);
+            let url = Url::parse(&url_str)
+                .map_err(|e| format!("invalid store url '{url_str}': {e}"))?;
+            // An unknown bucket here would be a config invariant violation,
+            // since we just listed them — propagate as an error to surface it.
+            let store = self.buckets.get(&info.name)
+                .map_err(|e| format!("registry inconsistency: {e}"))?;
+            ctx.runtime_env().register_object_store(&url, store);
+        }
+
+        // Backward-compat: also register the legacy `lakehouse://data/`
+        // URL pointing at primary, in case any code path still constructs
+        // ListingTableUrls without a bucket prefix.
+        let legacy_url = Url::parse(&format!("{STORE_SCHEME}://data/"))
+            .map_err(|e| format!("invalid legacy store url: {e}"))?;
+        ctx.runtime_env().register_object_store(&legacy_url, self.store.clone());
 
         let datasets = self.registry.list().await;
         for dataset in &datasets {
@@ -89,9 +146,9 @@
                 continue;
             }
 
-            // Check cache first
+            // Check cache first — cached batches are bucket-agnostic since
+            // they're already materialized in memory.
             if let Some(cached) = self.cache.get(&dataset.name).await {
-                // Load any delta files and merge
                 let delta_batches = delta::load_deltas(&self.store, &dataset.name).await.unwrap_or_default();
                 let mut all_batches = cached.batches;
 
@@ -106,10 +163,15 @@
                 continue;
             }
 
-            // Fall back to Parquet file
+            // Build ListingTable URLs that include the per-object bucket
+            // scheme. DataFusion routes each scan to the right store
+            // automatically based on the URL.
             let opts = ListingOptions::new(Arc::new(ParquetFormat::default()));
             let table_paths: Vec<ListingTableUrl> = dataset.objects.iter()
-                .filter_map(|o| ListingTableUrl::parse(&format!("{STORE_SCHEME}://data/{}", o.key)).ok())
+                .filter_map(|o| {
+                    let bucket = if o.bucket.is_empty() { "primary" } else { &o.bucket };
+                    ListingTableUrl::parse(&bucket_object_url(bucket, &o.key)).ok()
+                })
                 .collect();
 
             if table_paths.is_empty() {
@@ -124,8 +186,19 @@
 
             let table = ListingTable::try_new(config)
                 .map_err(|e| format!("table creation failed for {}: {e}", dataset.name))?;
-            ctx.register_table(&dataset.name, Arc::new(table))
-                .map_err(|e| format!("table registration failed for {}: {e}", dataset.name))?;
+            // Tolerate duplicate manifest entries for the same name —
+            // pre-existing pipeline::ingest_file behavior creates a fresh
+            // dataset id on every ingest. First registration wins; later
+            // ones are skipped with a debug log rather than failing the
+            // whole context build.
+            if let Err(e) = ctx.register_table(&dataset.name, Arc::new(table)) {
+                let msg = e.to_string();
+                if msg.contains("already exists") {
+                    tracing::debug!("skip duplicate manifest registration: {}", dataset.name);
+                } else {
+                    return Err(format!("table registration failed for {}: {}", dataset.name, msg));
+                }
+            }
         }
 
         Ok(ctx)
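To make the routing concrete, here is a test-style sketch of what the `bucket_scheme` / `bucket_object_url` / `bucket_base_url` helpers at the top of context.rs produce. It is illustrative only and not part of the patch; since the helpers are private, picture it as a `#[cfg(test)]` module inside the same file.

    #[cfg(test)]
    mod bucket_url_sketch {
        use super::{bucket_base_url, bucket_object_url, bucket_scheme};

        #[test]
        fn bucket_names_become_distinct_url_schemes() {
            // Plain bucket names pass straight into the scheme.
            assert_eq!(bucket_scheme("primary"), "lakehouse-primary");
            assert_eq!(bucket_base_url("testing"), "lakehouse-testing://data/");
            // `:` from profile-style names is not legal in a URL scheme,
            // so it is rewritten to `-`.
            assert_eq!(
                bucket_object_url("profile:alice", "datasets/people.parquet"),
                "lakehouse-profile-alice://data/datasets/people.parquet"
            );
        }
    }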
diff --git a/docs/PHASES.md b/docs/PHASES.md
index d7ae364..9287274 100644
--- a/docs/PHASES.md
+++ b/docs/PHASES.md
@@ -139,11 +139,11 @@
   - [x] Bucket-aware I/O: `PUT/GET /storage/buckets/{bucket}/objects/{*key}` with rescue fallback + `X-Lakehouse-Rescue-Used` observability headers
   - [x] Backward compat: empty `[[storage.buckets]]` synthesizes a `primary` from legacy `root`
   - [x] Three-bucket test (primary + rescue + testing) verified: normal reads, rescue fallback with headers, hard-fail missing, write to unknown bucket 503, error journal + health summary
-  - [ ] `X-Lakehouse-Bucket` header middleware on ingest/query/catalog endpoints
-  - [ ] Catalog migration: set `bucket = "primary"` on every legacy ObjectRef
-  - [ ] `queryd` registers every bucket with DataFusion for cross-bucket SQL
-  - [ ] Profile hot-load endpoints: `POST /profile/{user}/activate|deactivate`
-  - [ ] `vectord` bucket-scoped paths (trial journals, eval sets per-bucket)
+  - [x] `X-Lakehouse-Bucket` header middleware on ingest endpoints (2026-04-16)
+  - [x] Catalog migration: `POST /catalog/migrate-buckets` stamps `bucket = "primary"` on legacy refs (12 renamed, 14 total now canonical)
+  - [x] `queryd` registers every bucket with DataFusion for cross-bucket SQL — verified with people_test (testing) × animals (primary) CROSS JOIN
+  - [ ] Profile hot-load endpoints: `POST /profile/{user}/activate|deactivate` (deferred to Phase 17)
+  - [ ] `vectord` bucket-scoped paths (trial journals, eval sets per-bucket) (deferred to Phase 17)
 - [x] Database connector ingest (Postgres first) — 2026-04-16
   - `pg_stream::stream_table_to_parquet` — ORDER BY + LIMIT/OFFSET pagination, configurable batch_size
   - `parse_dsn` — postgresql:// and postgres:// URL scheme, user/password/host/port/db
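The cross-bucket SQL checklist item is the observable payoff of the queryd changes above. A hedged sketch of that kind of check, assuming an `engine` built the way gateway main.rs builds it and that the `people_test` (testing bucket) and `animals` (primary bucket) datasets exist with at least one row each:

    use queryd::context::QueryEngine;

    // Hypothetical smoke check, not part of the patch.
    async fn cross_bucket_smoke(engine: &QueryEngine) -> Result<(), String> {
        // Each table's ObjectRef.bucket maps to its own registered URL scheme,
        // so a single SQL statement can read from both buckets.
        let batches = engine
            .query("SELECT * FROM people_test CROSS JOIN animals LIMIT 5")
            .await?;
        assert!(batches.iter().any(|b| b.num_rows() > 0));
        Ok(())
    }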