root 9e6002c4d4 S3 backend for Lance — hybrid operates on real MinIO object storage
Enabled lance feature "aws" for S3-compatible storage via opendal.
BucketRegistry: added with_allow_http(true) for MinIO/non-TLS S3
endpoints (fixes "builder error" on HTTP endpoints). lakehouse.toml
gains [[storage.buckets]] name="s3:lakehouse" with S3 backend config.

lance_backend.rs: S3 bucket naming convention — buckets with name
prefix "s3:" emit s3:// URIs for Lance datasets. AWS_* env vars
in the systemd unit provide credentials to Lance's internal
object_store.

Verified end-to-end on real MinIO with real 100K × 768d vectors:
  - Migrate Parquet → Lance on S3: 1.7s (vs 0.57s local)
  - Build IVF_PQ: 16.4s (CPU-bound, essentially same as local)
  - Search: ~58ms p50 (vs 11ms local — S3 partition reads)
  - Random doc fetch: 13ms (vs 3.5ms local)
  - Recall@10: 0.835 (randomized IVF_PQ, consistent with local 0.805)
  - Total S3 footprint: 637 MiB (vectors + index + lance metadata)

The "public storage" claim from the PRD is now proven: the hybrid
Parquet+HNSW ⊕ Lance architecture works on S3-compatible object
storage, not just local filesystem.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-16 21:09:42 -05:00

378 lines
15 KiB
Rust

/// Multi-backend bucket registry — the federation foundation.
///
/// Federation rule: every `ObjectRef` belongs to exactly one named bucket.
/// The registry resolves bucket names to `object_store` backends, handles
/// rescue-bucket fallback on read failure, writes every failure to the
/// error journal, and exposes a health summary for operators.
///
/// Existing call sites can keep using `ops::*` with `registry.get(name)`.
/// New bucket-aware call sites use `registry.read_smart` / `write_smart`
/// which handle fallback + journaling automatically.
use object_store::ObjectStore;
use object_store::local::LocalFileSystem;
use serde::Serialize;
use shared::config::{BucketConfig, StorageConfig};
use shared::secrets::{BucketCredentials, SecretsProvider};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use crate::error_journal::{BucketErrorEvent, ErrorJournal};
/// A registered bucket — the store handle + its configuration.
pub struct BucketEntry {
/// Unique registry key (e.g. "primary", or prefixed names like "s3:lakehouse").
pub name: String,
/// Backend discriminator as configured — "local" or "s3" (see `build_store`).
pub backend: String,
/// Live `object_store` handle used for all reads/writes against this bucket.
pub store: Arc<dyn ObjectStore>,
/// The configuration this entry was built from — retained so runtime
/// introspection can see root/region/endpoint/secret_ref after provisioning.
pub config: BucketConfig,
}
/// Read outcome — may have been rescued.
#[derive(Debug, Clone)]
pub struct ReadOutcome {
/// The bytes that were read.
pub data: bytes::Bytes,
/// True when the read failed on the target bucket and was served from the
/// rescue bucket instead.
pub rescued: bool,
/// The bucket the caller originally asked for.
pub original_bucket: String,
/// The bucket that actually served the bytes — equals `original_bucket`
/// unless `rescued` is true.
pub served_by: String,
}
/// Summary entry for GET /storage/buckets.
#[derive(Debug, Clone, Serialize)]
pub struct BucketInfo {
/// Bucket name (registry key).
pub name: String,
/// Configured backend ("local" or "s3").
pub backend: String,
/// Result of a live reachability probe taken at listing/registration time —
/// a snapshot, not a guarantee for subsequent operations.
pub reachable: bool,
/// Federation role derived from the name (see `BucketRegistry::classify`).
pub role: BucketRole,
}
/// Federation role of a bucket, derived from its name by
/// `BucketRegistry::classify`. Serialized lowercase for the HTTP API.
///
/// `Eq` accompanies `PartialEq` (the equality is total — clippy
/// `derive_partial_eq_without_eq`), and the fieldless enum is `Copy` so
/// callers never need to clone it.
#[derive(Debug, Clone, Copy, Serialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum BucketRole {
    /// The default bucket ("primary") — home of the error journal.
    Primary,
    /// The configured read-fallback bucket.
    Rescue,
    /// Any bucket whose name starts with "profile:".
    Profile,
    /// Everything else.
    Tenant,
}
/// Registry state: the bucket map plus the fixed roles resolved at startup.
pub struct BucketRegistry {
/// RwLock because federation layer 2 adds runtime bucket lifecycle
/// (`POST /storage/buckets` / `DELETE /storage/buckets/{name}`).
/// Almost all accesses are reads — writes happen at provision /
/// deactivate time only.
buckets: RwLock<HashMap<String, Arc<BucketEntry>>>,
/// Name of the default bucket — always "primary" (set in `from_config`).
default: String,
/// Name of the read-fallback bucket, if one is configured.
rescue: Option<String>,
/// Root prefix for profile storage — presumably a path/key prefix under
/// which "profile:*" buckets live; TODO(review) confirm against callers.
profile_root: String,
/// Held so runtime `add_bucket` can resolve secret_ref handles.
secrets: Arc<dyn SecretsProvider>,
/// Failure journal, persisted via the primary bucket's store.
journal: ErrorJournal,
}
impl BucketRegistry {
    /// Build the registry from storage config + secrets provider.
    ///
    /// Back-compat: if `cfg.buckets` is empty, synthesize a `primary` bucket
    /// from the legacy `root` field so pre-federation configs keep working.
    ///
    /// # Errors
    /// * any bucket's backend fails to build (bad config, unresolvable secret)
    /// * no bucket named `primary` is configured (it hosts the error journal)
    /// * `rescue_bucket` names a bucket that was not configured
    pub async fn from_config(
        cfg: &StorageConfig,
        secrets: Arc<dyn SecretsProvider>,
    ) -> Result<Self, String> {
        let mut buckets: HashMap<String, Arc<BucketEntry>> = HashMap::new();
        let bucket_configs: Vec<BucketConfig> = if cfg.buckets.is_empty() {
            // Legacy single-store config: synthesize `primary` from `root`.
            vec![BucketConfig {
                name: "primary".to_string(),
                backend: "local".to_string(),
                root: Some(cfg.root.clone()),
                bucket: None,
                region: None,
                endpoint: None,
                secret_ref: None,
            }]
        } else {
            cfg.buckets.clone()
        };
        for bc in bucket_configs {
            let store = build_store(&bc, secrets.as_ref()).await?;
            let entry = Arc::new(BucketEntry {
                name: bc.name.clone(),
                backend: bc.backend.clone(),
                store,
                config: bc.clone(),
            });
            buckets.insert(bc.name.clone(), entry);
        }
        // Ensure `primary` always exists — it's where error journals live.
        if !buckets.contains_key("primary") {
            return Err("no bucket named 'primary' configured — required as error-journal home".into());
        }
        // Rescue bucket is optional but, if named, must exist.
        if let Some(r) = &cfg.rescue_bucket {
            if !buckets.contains_key(r) {
                return Err(format!("rescue_bucket '{r}' not found among configured buckets"));
            }
        }
        let journal = ErrorJournal::new(buckets.get("primary").unwrap().store.clone());
        // Best-effort preload of recent journal entries — a missing or
        // unreadable journal must not block startup.
        let _ = journal.load_recent().await;
        Ok(Self {
            buckets: RwLock::new(buckets),
            default: "primary".to_string(),
            rescue: cfg.rescue_bucket.clone(),
            profile_root: cfg.profile_root.clone(),
            secrets,
            journal,
        })
    }

    /// Name of the default bucket (always `"primary"`).
    pub fn default_name(&self) -> &str { &self.default }

    /// Name of the configured rescue bucket, if any.
    pub fn rescue_name(&self) -> Option<&str> { self.rescue.as_deref() }

    /// Root prefix for profile storage (see `BucketRole::Profile`).
    pub fn profile_root(&self) -> &str { &self.profile_root }

    /// The shared error journal (stored in the `primary` bucket).
    pub fn journal(&self) -> &ErrorJournal { &self.journal }

    /// Resolve a bucket name to its object store. Existing call sites use
    /// this as a drop-in replacement for the old single-store pattern.
    ///
    /// Uses `std::sync::RwLock` — every caller holds the guard for
    /// microseconds (clones an Arc and drops), so there's no async-
    /// blocking concern. Never hold across an await.
    pub fn get(&self, bucket: &str) -> Result<Arc<dyn ObjectStore>, String> {
        let guard = self.buckets.read().expect("bucket registry lock poisoned");
        guard
            .get(bucket)
            .map(|e| e.store.clone())
            .ok_or_else(|| format!("unknown bucket: {bucket}"))
    }

    /// The default bucket's store — use for code paths that don't yet know
    /// about buckets. `from_config` guarantees `primary` exists, so the
    /// unwrap cannot fire.
    pub fn default_store(&self) -> Arc<dyn ObjectStore> {
        let guard = self.buckets.read().expect("bucket registry lock poisoned");
        guard.get(&self.default).unwrap().store.clone()
    }

    /// True if this bucket name is registered.
    pub fn contains(&self, bucket: &str) -> bool {
        let guard = self.buckets.read().expect("bucket registry lock poisoned");
        guard.contains_key(bucket)
    }

    /// List all registered buckets, sorted by name. Checks reachability by
    /// probing each store (see `probe`).
    pub async fn list(&self) -> Vec<BucketInfo> {
        // Snapshot (name, backend, store_clone) under the lock, then probe
        // outside — probing can be slow (network), don't hold the lock.
        let snapshot: Vec<(String, String, Arc<dyn ObjectStore>)> = {
            let guard = self.buckets.read().expect("bucket registry lock poisoned");
            guard
                .iter()
                .map(|(n, e)| (n.clone(), e.backend.clone(), e.store.clone()))
                .collect()
        };
        let mut out = Vec::with_capacity(snapshot.len());
        for (name, backend, store) in snapshot {
            let reachable = probe(&store).await;
            let role = self.classify(&name);
            out.push(BucketInfo { name, backend, reachable, role });
        }
        out.sort_by(|a, b| a.name.cmp(&b.name));
        out
    }

    /// Map a bucket name to its federation role. Order matters: primary and
    /// rescue take precedence over the "profile:" name-prefix rule.
    fn classify(&self, name: &str) -> BucketRole {
        if name == self.default { BucketRole::Primary }
        else if Some(name) == self.rescue.as_deref() { BucketRole::Rescue }
        else if name.starts_with("profile:") { BucketRole::Profile }
        else { BucketRole::Tenant }
    }

    /// Provision + register a bucket at runtime (federation layer 2).
    /// Returns Err if the name is already registered. Local backends get
    /// their root directory created automatically.
    pub async fn add_bucket(&self, bc: BucketConfig) -> Result<BucketInfo, String> {
        // Fast-path rejection before the (potentially slow) backend build
        // and network probe below.
        if self.contains(&bc.name) {
            return Err(format!("bucket '{}' already registered", bc.name));
        }
        let store = build_store(&bc, self.secrets.as_ref()).await?;
        let reachable = probe(&store).await;
        let entry = Arc::new(BucketEntry {
            name: bc.name.clone(),
            backend: bc.backend.clone(),
            store,
            config: bc.clone(),
        });
        let role = self.classify(&bc.name);
        let info = BucketInfo {
            name: bc.name.clone(),
            backend: bc.backend.clone(),
            reachable,
            role,
        };
        {
            let mut guard = self.buckets.write().expect("bucket registry lock poisoned");
            // Re-check under the write lock: two concurrent add_bucket calls
            // for the same name can both pass the fast-path check above; the
            // later insert would otherwise silently replace the earlier entry.
            if guard.contains_key(&bc.name) {
                return Err(format!("bucket '{}' already registered", bc.name));
            }
            guard.insert(bc.name.clone(), entry);
        }
        tracing::info!("registered bucket '{}' backend={} reachable={}",
            bc.name, bc.backend, reachable);
        Ok(info)
    }

    /// Unregister a bucket. Refuses to remove `primary` or the configured
    /// rescue bucket — those are load-bearing. Caller is responsible for
    /// any final flush of bucket-local state before calling this.
    pub fn remove_bucket(&self, name: &str) -> Result<(), String> {
        if name == self.default {
            return Err("cannot remove primary bucket".into());
        }
        if Some(name) == self.rescue.as_deref() {
            return Err(format!("cannot remove rescue bucket '{name}'"));
        }
        let removed = {
            let mut guard = self.buckets.write().expect("bucket registry lock poisoned");
            guard.remove(name)
        };
        if removed.is_none() {
            return Err(format!("bucket '{name}' not registered"));
        }
        tracing::info!("unregistered bucket '{}'", name);
        Ok(())
    }

    /// Read with rescue-bucket fallback. If the target bucket fails and a
    /// rescue is configured, retries against rescue. Records every failure
    /// in the error journal.
    pub async fn read_smart(&self, bucket: &str, key: &str) -> Result<ReadOutcome, String> {
        // Clone the store handle out of the lock scope — never hold the
        // guard across an await.
        let target_store = {
            let guard = self.buckets.read().expect("bucket registry lock poisoned");
            guard.get(bucket).map(|e| e.store.clone())
                .ok_or_else(|| format!("unknown bucket: {bucket}"))?
        };
        match crate::ops::get(&target_store, key).await {
            Ok(data) => Ok(ReadOutcome {
                data, rescued: false,
                original_bucket: bucket.to_string(),
                served_by: bucket.to_string(),
            }),
            Err(err) => {
                // Record failure regardless of what happens next.
                self.journal.append(BucketErrorEvent::new_read(bucket, key, &err)).await;
                // Try rescue, if any — skipped when the failing bucket IS
                // the rescue (retrying the same store is pointless).
                if let Some(rescue_name) = &self.rescue {
                    if rescue_name != bucket {
                        let rescue_store = {
                            let guard = self.buckets.read().expect("bucket registry lock poisoned");
                            guard.get(rescue_name).map(|e| e.store.clone())
                        };
                        if let Some(rescue_store) = rescue_store {
                            match crate::ops::get(&rescue_store, key).await {
                                Ok(data) => {
                                    self.journal.mark_rescued_last(bucket, key).await;
                                    return Ok(ReadOutcome {
                                        data, rescued: true,
                                        original_bucket: bucket.to_string(),
                                        served_by: rescue_name.clone(),
                                    });
                                }
                                Err(rescue_err) => {
                                    return Err(format!(
                                        "read '{key}' failed in '{bucket}' ({err}); rescue '{rescue_name}' also failed ({rescue_err})"
                                    ));
                                }
                            }
                        }
                    }
                }
                Err(format!("read '{key}' failed in '{bucket}': {err}"))
            }
        }
    }

    /// Write always goes to target. No rescue fallback for writes — writes
    /// that silently vanish are the worst possible failure.
    pub async fn write_smart(
        &self,
        bucket: &str,
        key: &str,
        data: bytes::Bytes,
    ) -> Result<(), String> {
        let target_store = {
            let guard = self.buckets.read().expect("bucket registry lock poisoned");
            guard.get(bucket).map(|e| e.store.clone())
                .ok_or_else(|| format!("unknown bucket: {bucket}"))?
        };
        match crate::ops::put(&target_store, key, data).await {
            Ok(()) => Ok(()),
            Err(err) => {
                self.journal.append(BucketErrorEvent::new_write(bucket, key, &err)).await;
                Err(format!("write '{key}' failed in '{bucket}': {err}"))
            }
        }
    }
}
/// Trivial reachability check — start listing the bucket and poll the
/// stream exactly once. Any successful poll counts as reachable; reaching
/// end-of-stream immediately (empty bucket) is still "reachable".
async fn probe(store: &Arc<dyn ObjectStore>) -> bool {
    use futures::StreamExt;
    let mut listing = store.list(None);
    // Only an Err on the very first item means the backend itself is
    // unhappy (auth, network, missing bucket) — report unreachable.
    !matches!(listing.next().await, Some(Err(_)))
}
/// Build a concrete ObjectStore from a BucketConfig.
///
/// Supported backends: `"local"` (filesystem, root directory auto-created)
/// and `"s3"` (credentials resolved through the secrets provider). Any
/// other backend string is an error.
async fn build_store(
    bc: &BucketConfig,
    secrets: &dyn SecretsProvider,
) -> Result<Arc<dyn ObjectStore>, String> {
    match bc.backend.as_str() {
        "local" => {
            let root = match bc.root.as_deref() {
                Some(r) => r,
                None => return Err(format!("bucket '{}' is backend=local but has no root", bc.name)),
            };
            // Provision the directory up front so first write can't fail on
            // a missing path.
            std::fs::create_dir_all(root)
                .map_err(|e| format!("create bucket dir '{root}': {e}"))?;
            LocalFileSystem::new_with_prefix(root)
                .map(|fs| Arc::new(fs) as Arc<dyn ObjectStore>)
                .map_err(|e| format!("init local bucket '{}': {e}", bc.name))
        }
        "s3" => {
            let handle = match bc.secret_ref.as_deref() {
                Some(h) => h,
                None => return Err(format!("s3 bucket '{}' has no secret_ref", bc.name)),
            };
            let creds: BucketCredentials = secrets.resolve(handle).await?;
            let s3_bucket = match bc.bucket.as_deref() {
                Some(b) => b,
                None => return Err(format!("s3 bucket '{}' has no `bucket` name", bc.name)),
            };
            let mut builder = object_store::aws::AmazonS3Builder::new()
                .with_bucket_name(s3_bucket)
                .with_region(bc.region.as_deref().unwrap_or("us-east-1"))
                .with_access_key_id(&creds.access_key)
                .with_secret_access_key(&creds.secret_key);
            if let Some(endpoint) = bc.endpoint.as_deref() {
                builder = builder.with_endpoint(endpoint);
                // MinIO and other S3-compatible services often run on plain
                // HTTP. object_store refuses HTTP by default — opt in when
                // a custom endpoint is configured (TLS endpoints work either way).
                if endpoint.starts_with("http://") {
                    builder = builder.with_allow_http(true);
                }
            }
            builder
                .build()
                .map(|s3| Arc::new(s3) as Arc<dyn ObjectStore>)
                .map_err(|e| format!("init s3 bucket '{}': {e}", bc.name))
        }
        other => Err(format!("unknown backend '{other}' for bucket '{}'", bc.name)),
    }
}