Federation layer 2: header routing + cross-bucket SQL

Three pieces of the multi-bucket federation made real:

1. Catalog migration (POST /catalog/migrate-buckets)
   - One-shot normalizer for ObjectRef.bucket field
   - Empty -> "primary"; legacy "data"/"local" -> "primary"
   - Idempotent; re-running on canonical state is no-op
   - Ran on the existing catalog: 12 refs renamed from "data", 2 already
     "primary", all 14 now canonical

2. X-Lakehouse-Bucket header middleware on ingest
   - resolve_bucket() helper extracts the header and returns
     (bucket_name, store), or a 404 listing the valid buckets
   - ingest_file and ingest_db_stream now route writes per-request
   - Defaults to "primary" when header absent
   - pipeline::ingest_file_to_bucket records the actual bucket on the
     ObjectRef so catalog stays the source of truth for "where does this
     data live"
   - Verified: ingest with X-Lakehouse-Bucket: testing lands in
     data/_testing/, ingest without header lands in data/, bad header
     returns 404 with hint

3. queryd registers every bucket with DataFusion
   - QueryEngine now holds Arc<BucketRegistry> instead of a single store
   - build_context iterates all buckets, registers each as a separate
     ObjectStore under URL scheme "lakehouse-{bucket}://"
   - ListingTable URLs include the per-object bucket scheme so
     DataFusion routes scans automatically based on ObjectRef.bucket
   - Profile bucket names like "profile:user" sanitized to
     "lakehouse-profile-user", since ":" is not a legal character in a
     URL scheme (see the sketch after this list)
   - Tolerant of duplicate manifest entries (pre-existing
     pipeline::ingest_file behavior creates a fresh dataset id per
     ingest); duplicates skipped with debug log
   - Backward compat: legacy "lakehouse://data/" URL still registered
     pointing at primary
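
The scheme sanitization in (3), sketched as an in-module unit test
(bucket_scheme and bucket_object_url are the new helpers in
queryd::context; the object key is illustrative):

  #[test]
  fn bucket_scheme_sanitizes_profile_names() {
      assert_eq!(bucket_scheme("testing"), "lakehouse-testing");
      assert_eq!(bucket_scheme("profile:user"), "lakehouse-profile-user");
      // The per-object URL DataFusion sees embeds that scheme:
      assert_eq!(
          bucket_object_url("profile:user", "datasets/example.parquet"),
          "lakehouse-profile-user://data/datasets/example.parquet",
      );
  }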

Success gate: cross-bucket CROSS JOIN
  SELECT p.name, p.role, a.species
  FROM people_test p          -- bucket: testing
  CROSS JOIN animals a        -- bucket: primary
  LIMIT 5
returns rows correctly. DataFusion routed each scan to its bucket's
ObjectStore based on the URL scheme.

No regressions: SELECT COUNT(*) FROM candidates still returns 100000
from the primary bucket.
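
Both checks, expressed against QueryEngine::query as a sketch (engine
construction elided; tables and expected counts as above):

  // Cross-bucket gate: people_test lives in "testing", animals in "primary".
  let joined = engine.query(
      "SELECT p.name, p.role, a.species \
       FROM people_test p CROSS JOIN animals a LIMIT 5",
  ).await.expect("cross-bucket join");
  assert!(joined.iter().map(|b| b.num_rows()).sum::<usize>() > 0);

  // Regression check: primary-only scan is unchanged.
  let count = engine.query("SELECT COUNT(*) FROM candidates")
      .await.expect("count query");
  // Expect one row with the value 100000, read from the primary bucket.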

Deferred to Phase 17:
- POST /profile/{user}/activate (HNSW hot-load on profile switch)
- vectord storage paths becoming bucket-scoped (trial journals,
  eval sets per-profile)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
root 2026-04-16 08:52:32 -05:00
parent 650f5e97b6
commit 24f1249a62
7 changed files with 225 additions and 30 deletions

View File

@ -9,6 +9,15 @@ use tokio::sync::RwLock;
use storaged::ops;
use object_store::ObjectStore;
#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct MigrateBucketsReport {
pub refs_examined: usize,
pub refs_renamed: usize, // legacy "data"/"local" → "primary"
pub refs_stamped: usize, // empty → "primary"
pub refs_unchanged: usize, // already canonical
pub manifests_persisted: usize,
}
/// Partial metadata update — only set fields are applied.
#[derive(Debug, Clone, Default, serde::Deserialize)]
pub struct MetadataUpdate {
@ -295,6 +304,56 @@ impl Registry {
Ok(())
}
/// Federation layer 2: stamp `bucket = "primary"` on every ObjectRef
/// whose `bucket` field is empty or matches a legacy value (`"data"`,
/// `"local"`). One-shot migration; re-running is a safe no-op once
/// every ref is canonical.
pub async fn migrate_buckets_to_primary(&self) -> Result<MigrateBucketsReport, String> {
let mut datasets = self.datasets.write().await;
let mut report = MigrateBucketsReport::default();
let mut to_persist: Vec<(DatasetId, DatasetManifest)> = Vec::new();
for manifest in datasets.values_mut() {
let mut changed = false;
for obj in manifest.objects.iter_mut() {
report.refs_examined += 1;
if obj.bucket.is_empty() {
obj.bucket = "primary".to_string();
report.refs_stamped += 1;
changed = true;
} else if obj.bucket == "data" || obj.bucket == "local" {
obj.bucket = "primary".to_string();
report.refs_renamed += 1;
changed = true;
} else {
report.refs_unchanged += 1;
}
}
if changed {
manifest.updated_at = chrono::Utc::now();
to_persist.push((manifest.id.clone(), manifest.clone()));
}
}
// Persist updated manifests after we've finished mutating the map.
for (id, manifest) in to_persist {
let key = format!("{MANIFEST_PREFIX}/{}.json", id);
let json = serde_json::to_vec_pretty(&manifest).map_err(|e| e.to_string())?;
ops::put(&self.store, &key, json.into()).await?;
report.manifests_persisted += 1;
}
tracing::info!(
"bucket migration: examined {} refs, renamed {}, stamped {}, unchanged {}, persisted {} manifests",
report.refs_examined,
report.refs_renamed,
report.refs_stamped,
report.refs_unchanged,
report.manifests_persisted,
);
Ok(report)
}
/// List datasets whose `embedding_stale_since` is set — they need a refresh.
pub async fn stale_datasets(&self) -> Vec<DatasetManifest> {
let datasets = self.datasets.read().await;

View File

@ -21,6 +21,7 @@ pub fn router(registry: Registry) -> Router {
.route("/datasets/by-name/{name}/metadata", post(update_metadata))
.route("/datasets/by-name/{name}/resync", post(resync_dataset))
.route("/resync-missing", post(resync_all_missing))
.route("/migrate-buckets", post(migrate_buckets))
.with_state(registry)
}
@ -194,3 +195,13 @@ async fn resync_all_missing(State(registry): State<Registry>) -> impl IntoRespon
failed: err.into_iter().map(|(name, error)| ResyncErr { name, error }).collect(),
})
}
/// Federation layer 2 one-shot: normalize every ObjectRef.bucket field
/// to the canonical "primary" value. Idempotent — re-running once
/// everything is canonical is a safe no-op.
async fn migrate_buckets(State(registry): State<Registry>) -> impl IntoResponse {
match registry.migrate_buckets_to_primary().await {
Ok(report) => Ok(Json(report)),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}

View File

@ -57,7 +57,7 @@ async fn main() {
// Query engine with 16GB memory cache
let cache = queryd::cache::MemCache::new(16 * 1024 * 1024 * 1024);
let engine = queryd::context::QueryEngine::new(registry.clone(), store.clone(), cache);
let engine = queryd::context::QueryEngine::new(registry.clone(), bucket_registry.clone(), cache);
// Event journal — append-only mutation log (flush every 100 events)
let journal = journald::journal::Journal::new(store.clone(), 100);
@ -86,6 +86,7 @@ async fn main() {
.nest("/ingest", ingestd::service::router(ingestd::service::IngestState {
store: store.clone(),
registry: registry.clone(),
buckets: bucket_registry.clone(),
}))
.nest("/vectors", vectord::service::router({
let index_reg = vectord::index_registry::IndexRegistry::new(store.clone());

View File

@ -28,12 +28,29 @@ pub struct IngestResult {
}
/// Full ingest pipeline: detect → parse → dedup → store → register.
///
/// Bucket name (federation layer 2) is recorded on the ObjectRef so the
/// catalog knows which bucket holds the data — defaults to "primary" via
/// the ingest service's resolve_bucket helper.
pub async fn ingest_file(
filename: &str,
content: &[u8],
dataset_name: Option<&str>,
store: &Arc<dyn ObjectStore>,
registry: &Registry,
) -> Result<IngestResult, String> {
ingest_file_to_bucket(filename, content, dataset_name, "primary", store, registry).await
}
/// Same as `ingest_file` but with explicit target bucket name on the
/// ObjectRef. Header-aware ingest endpoints call this directly.
pub async fn ingest_file_to_bucket(
filename: &str,
content: &[u8],
dataset_name: Option<&str>,
bucket: &str,
store: &Arc<dyn ObjectStore>,
registry: &Registry,
) -> Result<IngestResult, String> {
// 1. Detect file type
let file_type = detect_file_type(filename, content);
@ -98,7 +115,7 @@ pub async fn ingest_file(
let schema_fp = fingerprint_schema(&schema);
let now = chrono::Utc::now();
let obj_ref = ObjectRef {
bucket: "data".to_string(),
bucket: bucket.to_string(),
key: storage_key.clone(),
size_bytes: parquet_size,
created_at: now,

View File

@ -1,7 +1,7 @@
use axum::{
Json, Router,
extract::{Multipart, Query, State},
http::StatusCode,
http::{HeaderMap, StatusCode},
response::IntoResponse,
routing::{get, post},
};
@ -15,11 +15,37 @@ use crate::{db_ingest, pg_stream, pipeline};
use shared::arrow_helpers::record_batch_to_parquet;
use shared::types::{ObjectRef, SchemaFingerprint};
use storaged::ops;
use storaged::registry::BucketRegistry;
#[derive(Clone)]
pub struct IngestState {
pub store: Arc<dyn ObjectStore>,
pub registry: Registry,
/// Federation layer 2: lookup target bucket from request headers.
pub buckets: Arc<BucketRegistry>,
}
/// Resolve the target bucket from `X-Lakehouse-Bucket` header.
/// Returns `(bucket_name, store_for_writes)`. Falls back to "primary"
/// when the header is absent. Returns Err with the canonical bucket
/// list when the header names an unknown bucket.
fn resolve_bucket(
headers: &HeaderMap,
buckets: &BucketRegistry,
) -> Result<(String, Arc<dyn ObjectStore>), (StatusCode, String)> {
let target = headers
.get("x-lakehouse-bucket")
.and_then(|h| h.to_str().ok())
.map(|s| s.to_string())
.unwrap_or_else(|| buckets.default_name().to_string());
match buckets.get(&target) {
Ok(store) => Ok((target, store)),
Err(_) => Err((
StatusCode::NOT_FOUND,
format!("unknown bucket '{}' — use GET /storage/buckets to list", target),
)),
}
}
pub fn router(state: IngestState) -> Router {
@ -44,8 +70,11 @@ struct IngestQuery {
async fn ingest_file(
State(state): State<IngestState>,
Query(query): Query<IngestQuery>,
headers: HeaderMap,
mut multipart: Multipart,
) -> impl IntoResponse {
let (bucket, store) = resolve_bucket(&headers, &state.buckets)?;
let field = match multipart.next_field().await {
Ok(Some(f)) => f,
Ok(None) => return Err((StatusCode::BAD_REQUEST, "no file uploaded".to_string())),
@ -56,11 +85,14 @@ async fn ingest_file(
let content = field.bytes().await
.map_err(|e| (StatusCode::BAD_REQUEST, format!("read error: {e}")))?;
tracing::info!("received file '{}' ({} bytes) for ingest", filename, content.len());
tracing::info!(
"ingest '{}' ({} bytes) -> bucket={}",
filename, content.len(), bucket,
);
let dataset_name = query.name.as_deref();
match pipeline::ingest_file(&filename, &content, dataset_name, &state.store, &state.registry).await {
match pipeline::ingest_file_to_bucket(&filename, &content, dataset_name, &bucket, &store, &state.registry).await {
Ok(result) => {
if result.deduplicated {
Ok((StatusCode::OK, Json(result)))
@ -195,11 +227,13 @@ async fn import_pg_table(
/// `POST /ingest/db` with `{dsn, table, dataset_name}`.
async fn ingest_db_stream(
State(state): State<IngestState>,
headers: HeaderMap,
Json(req): Json<pg_stream::PgStreamRequest>,
) -> Result<(StatusCode, Json<serde_json::Value>), (StatusCode, String)> {
let (bucket, store) = resolve_bucket(&headers, &state.buckets)?;
tracing::info!(
"pg stream ingest: table='{}' dataset='{:?}' batch_size={:?}",
req.table, req.dataset_name, req.batch_size,
"pg stream ingest: table='{}' dataset='{:?}' bucket='{}' batch_size={:?}",
req.table, req.dataset_name, bucket, req.batch_size,
);
// Stream from Postgres into Parquet bytes.
@ -224,7 +258,7 @@ async fn ingest_db_stream(
let storage_key = format!("datasets/{}.parquet", dataset_name);
let size_bytes = parquet.len() as u64;
ops::put(&state.store, &storage_key, parquet)
ops::put(&store, &storage_key, parquet)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
@ -234,7 +268,7 @@ async fn ingest_db_stream(
dataset_name.clone(),
SchemaFingerprint(schema_fp.0),
vec![ObjectRef {
bucket: "primary".to_string(),
bucket: bucket.clone(),
key: storage_key.clone(),
size_bytes,
created_at: now,

View File

@ -5,6 +5,7 @@ use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::prelude::*;
use object_store::ObjectStore;
use std::sync::Arc;
use storaged::registry::BucketRegistry;
use url::Url;
use crate::cache::MemCache;
@ -12,27 +13,56 @@ use crate::delta;
const STORE_SCHEME: &str = "lakehouse";
/// URL scheme used to register a bucket's object store with DataFusion.
/// Bucket names embed safely in the scheme after sanitizing
/// `:` (used in `profile:user`) to `-`, since `:` is not a legal scheme character.
fn bucket_scheme(bucket: &str) -> String {
format!("{STORE_SCHEME}-{}", bucket.replace(':', "-"))
}
/// Build a `ListingTableUrl`-shaped string for an object in a bucket.
fn bucket_object_url(bucket: &str, key: &str) -> String {
format!("{}://data/{}", bucket_scheme(bucket), key)
}
/// Build the registration base URL for a bucket.
fn bucket_base_url(bucket: &str) -> String {
format!("{}://data/", bucket_scheme(bucket))
}
/// Query engine with in-memory cache and delta merge support.
/// Federation layer 2: holds a `BucketRegistry` so cross-bucket queries
/// route reads to the right ObjectStore based on `ObjectRef.bucket`.
#[derive(Clone)]
pub struct QueryEngine {
registry: Registry,
buckets: Arc<BucketRegistry>,
/// Primary bucket store cached for backward-compat callers (compact,
/// workspace ops) that haven't been migrated to bucket-aware paths.
store: Arc<dyn ObjectStore>,
cache: MemCache,
}
impl QueryEngine {
pub fn new(registry: Registry, store: Arc<dyn ObjectStore>, cache: MemCache) -> Self {
Self { registry, store, cache }
pub fn new(registry: Registry, buckets: Arc<BucketRegistry>, cache: MemCache) -> Self {
let store = buckets.default_store();
Self { registry, buckets, store, cache }
}
pub fn cache(&self) -> &MemCache {
&self.cache
}
/// Backward-compat: returns the primary bucket's store. Bucket-aware
/// callers should use `engine.buckets()` directly.
pub fn store(&self) -> &Arc<dyn ObjectStore> {
&self.store
}
pub fn buckets(&self) -> &Arc<BucketRegistry> {
&self.buckets
}
/// Execute a SQL query. Uses cache for hot data, falls back to Parquet.
pub async fn query(&self, sql: &str) -> Result<Vec<arrow::array::RecordBatch>, String> {
let ctx = self.build_context().await?;
@ -43,11 +73,16 @@ impl QueryEngine {
/// Pin a dataset into the memory cache.
pub async fn pin_dataset(&self, name: &str) -> Result<(), String> {
// Read from Parquet
let ctx = SessionContext::new();
let base_url = Url::parse(&format!("{STORE_SCHEME}://data/"))
.map_err(|e| format!("invalid store url: {e}"))?;
ctx.runtime_env().register_object_store(&base_url, self.store.clone());
// Register every bucket so a multi-bucket dataset can be pinned.
for info in self.buckets.list().await {
let url = Url::parse(&bucket_base_url(&info.name))
.map_err(|e| format!("invalid store url: {e}"))?;
let store = self.buckets.get(&info.name)
.map_err(|e| format!("registry inconsistency: {e}"))?;
ctx.runtime_env().register_object_store(&url, store);
}
let dataset = self.registry.get_by_name(name).await
.ok_or_else(|| format!("dataset not found: {name}"))?;
@ -58,7 +93,10 @@ impl QueryEngine {
let opts = ListingOptions::new(Arc::new(ParquetFormat::default()));
let table_paths: Vec<ListingTableUrl> = dataset.objects.iter()
.filter_map(|o| ListingTableUrl::parse(&format!("{STORE_SCHEME}://data/{}", o.key)).ok())
.filter_map(|o| {
let bucket = if o.bucket.is_empty() { "primary" } else { &o.bucket };
ListingTableUrl::parse(&bucket_object_url(bucket, &o.key)).ok()
})
.collect();
let schema = opts.infer_schema(&ctx.state(), &table_paths[0]).await
@ -79,9 +117,28 @@ impl QueryEngine {
async fn build_context(&self) -> Result<SessionContext, String> {
let ctx = SessionContext::new();
let base_url = Url::parse(&format!("{STORE_SCHEME}://data/"))
.map_err(|e| format!("invalid store url: {e}"))?;
ctx.runtime_env().register_object_store(&base_url, self.store.clone());
// Federation layer 2: register every configured bucket as its own
// DataFusion ObjectStore under a distinct URL scheme. Each
// dataset's ObjectRef.bucket determines which store DataFusion
// routes to at scan time.
let bucket_infos = self.buckets.list().await;
for info in &bucket_infos {
let url_str = bucket_base_url(&info.name);
let url = Url::parse(&url_str)
.map_err(|e| format!("invalid store url '{url_str}': {e}"))?;
// unknown-bucket here would be a config invariant violation,
// since we just listed them — propagate as an error to surface it.
let store = self.buckets.get(&info.name)
.map_err(|e| format!("registry inconsistency: {e}"))?;
ctx.runtime_env().register_object_store(&url, store);
}
// Backward-compat: also register the legacy `lakehouse://data/`
// URL pointing at primary, in case any code path still constructs
// ListingTableUrls without a bucket prefix.
let legacy_url = Url::parse(&format!("{STORE_SCHEME}://data/"))
.map_err(|e| format!("invalid legacy store url: {e}"))?;
ctx.runtime_env().register_object_store(&legacy_url, self.store.clone());
let datasets = self.registry.list().await;
for dataset in &datasets {
@ -89,9 +146,9 @@ impl QueryEngine {
continue;
}
// Check cache first
// Check cache first — cached batches are bucket-agnostic since
// they're already materialized in memory.
if let Some(cached) = self.cache.get(&dataset.name).await {
// Load any delta files and merge
let delta_batches = delta::load_deltas(&self.store, &dataset.name).await.unwrap_or_default();
let mut all_batches = cached.batches;
@ -106,10 +163,15 @@ impl QueryEngine {
continue;
}
// Fall back to Parquet file
// Build ListingTable URLs that include the per-object bucket
// scheme. DataFusion routes each scan to the right store
// automatically based on the URL.
let opts = ListingOptions::new(Arc::new(ParquetFormat::default()));
let table_paths: Vec<ListingTableUrl> = dataset.objects.iter()
.filter_map(|o| ListingTableUrl::parse(&format!("{STORE_SCHEME}://data/{}", o.key)).ok())
.filter_map(|o| {
let bucket = if o.bucket.is_empty() { "primary" } else { &o.bucket };
ListingTableUrl::parse(&bucket_object_url(bucket, &o.key)).ok()
})
.collect();
if table_paths.is_empty() {
@ -124,8 +186,19 @@ impl QueryEngine {
let table = ListingTable::try_new(config)
.map_err(|e| format!("table creation failed for {}: {e}", dataset.name))?;
ctx.register_table(&dataset.name, Arc::new(table))
.map_err(|e| format!("table registration failed for {}: {e}", dataset.name))?;
// Tolerate duplicate manifest entries for the same name —
// pre-existing pipeline::ingest_file behavior creates a fresh
// dataset id on every ingest. First registration wins; later
// ones are skipped with a warning rather than failing the
// whole context build.
if let Err(e) = ctx.register_table(&dataset.name, Arc::new(table)) {
let msg = e.to_string();
if msg.contains("already exists") {
tracing::debug!("skip duplicate manifest registration: {}", dataset.name);
} else {
return Err(format!("table registration failed for {}: {}", dataset.name, msg));
}
}
}
Ok(ctx)

View File

@ -139,11 +139,11 @@
- [x] Bucket-aware I/O: `PUT/GET /storage/buckets/{bucket}/objects/{*key}` with rescue fallback + `X-Lakehouse-Rescue-Used` observability headers
- [x] Backward compat: empty `[[storage.buckets]]` synthesizes a `primary` from legacy `root`
- [x] Three-bucket test (primary + rescue + testing) verified: normal reads, rescue fallback with headers, hard-fail missing, write to unknown bucket 503, error journal + health summary
- [ ] `X-Lakehouse-Bucket` header middleware on ingest/query/catalog endpoints
- [ ] Catalog migration: set `bucket = "primary"` on every legacy ObjectRef
- [ ] `queryd` registers every bucket with DataFusion for cross-bucket SQL
- [ ] Profile hot-load endpoints: `POST /profile/{user}/activate|deactivate`
- [ ] `vectord` bucket-scoped paths (trial journals, eval sets per-bucket)
- [x] `X-Lakehouse-Bucket` header middleware on ingest endpoints (2026-04-16)
- [x] Catalog migration: `POST /catalog/migrate-buckets` stamps `bucket = "primary"` on legacy refs (12 renamed, 14 total now canonical)
- [x] `queryd` registers every bucket with DataFusion for cross-bucket SQL — verified with people_test (testing) × animals (primary) CROSS JOIN
- [ ] Profile hot-load endpoints: `POST /profile/{user}/activate|deactivate` (deferred to Phase 17)
- [ ] `vectord` bucket-scoped paths (trial journals, eval sets per-bucket) (deferred to Phase 17)
- [x] Database connector ingest (Postgres first) — 2026-04-16
- `pg_stream::stream_table_to_parquet` — ORDER BY + LIMIT/OFFSET pagination, configurable batch_size
- `parse_dsn` — postgresql:// and postgres:// URL scheme, user/password/host/port/db