/// Multi-backend bucket registry — the federation foundation. /// /// Federation rule: every `ObjectRef` belongs to exactly one named bucket. /// The registry resolves bucket names to `object_store` backends, handles /// rescue-bucket fallback on read failure, writes every failure to the /// error journal, and exposes a health summary for operators. /// /// Existing call sites can keep using `ops::*` with `registry.get(name)`. /// New bucket-aware call sites use `registry.read_smart` / `write_smart` /// which handle fallback + journaling automatically. use object_store::ObjectStore; use object_store::local::LocalFileSystem; use serde::Serialize; use shared::config::{BucketConfig, StorageConfig}; use shared::secrets::{BucketCredentials, SecretsProvider}; use std::collections::HashMap; use std::sync::{Arc, RwLock}; use crate::error_journal::{BucketErrorEvent, ErrorJournal}; /// A registered bucket — the store handle + its configuration. pub struct BucketEntry { pub name: String, pub backend: String, pub store: Arc, pub config: BucketConfig, } /// Read outcome — may have been rescued. #[derive(Debug, Clone)] pub struct ReadOutcome { pub data: bytes::Bytes, pub rescued: bool, pub original_bucket: String, pub served_by: String, } /// Summary entry for GET /storage/buckets. #[derive(Debug, Clone, Serialize)] pub struct BucketInfo { pub name: String, pub backend: String, pub reachable: bool, pub role: BucketRole, } #[derive(Debug, Clone, Serialize, PartialEq)] #[serde(rename_all = "lowercase")] pub enum BucketRole { Primary, Rescue, Profile, Tenant, } pub struct BucketRegistry { /// RwLock because federation layer 2 adds runtime bucket lifecycle /// (`POST /storage/buckets` / `DELETE /storage/buckets/{name}`). /// Almost all accesses are reads — writes happen at provision / /// deactivate time only. buckets: RwLock>>, default: String, rescue: Option, profile_root: String, /// Held so runtime `add_bucket` can resolve secret_ref handles. secrets: Arc, journal: ErrorJournal, } impl BucketRegistry { /// Build the registry from storage config + secrets provider. /// Back-compat: if `buckets` is empty, synthesize a `primary` bucket from /// the legacy `root` field so pre-federation configs keep working. pub async fn from_config( cfg: &StorageConfig, secrets: Arc, ) -> Result { let mut buckets: HashMap> = HashMap::new(); let bucket_configs: Vec = if cfg.buckets.is_empty() { vec![BucketConfig { name: "primary".to_string(), backend: "local".to_string(), root: Some(cfg.root.clone()), bucket: None, region: None, endpoint: None, secret_ref: None, }] } else { cfg.buckets.clone() }; for bc in bucket_configs { let store = build_store(&bc, secrets.as_ref()).await?; let entry = Arc::new(BucketEntry { name: bc.name.clone(), backend: bc.backend.clone(), store, config: bc.clone(), }); buckets.insert(bc.name.clone(), entry); } // Ensure `primary` always exists — it's where error journals live. if !buckets.contains_key("primary") { return Err("no bucket named 'primary' configured — required as error-journal home".into()); } // Rescue bucket is optional but, if named, must exist. if let Some(r) = &cfg.rescue_bucket { if !buckets.contains_key(r) { return Err(format!("rescue_bucket '{r}' not found among configured buckets")); } } let journal = ErrorJournal::new(buckets.get("primary").unwrap().store.clone()); let _ = journal.load_recent().await; Ok(Self { buckets: RwLock::new(buckets), default: "primary".to_string(), rescue: cfg.rescue_bucket.clone(), profile_root: cfg.profile_root.clone(), secrets, journal, }) } pub fn default_name(&self) -> &str { &self.default } pub fn rescue_name(&self) -> Option<&str> { self.rescue.as_deref() } pub fn profile_root(&self) -> &str { &self.profile_root } pub fn journal(&self) -> &ErrorJournal { &self.journal } /// Resolve a bucket name to its object store. Existing call sites use /// this as a drop-in replacement for the old single-store pattern. /// /// Uses `std::sync::RwLock` — every caller holds the guard for /// microseconds (clones an Arc and drops), so there's no async- /// blocking concern. Never hold across an await. pub fn get(&self, bucket: &str) -> Result, String> { let guard = self.buckets.read().expect("bucket registry lock poisoned"); guard .get(bucket) .map(|e| e.store.clone()) .ok_or_else(|| format!("unknown bucket: {bucket}")) } /// The default bucket's store — use for code paths that don't yet know /// about buckets. pub fn default_store(&self) -> Arc { let guard = self.buckets.read().expect("bucket registry lock poisoned"); guard.get(&self.default).unwrap().store.clone() } /// True if this bucket name is registered. pub fn contains(&self, bucket: &str) -> bool { let guard = self.buckets.read().expect("bucket registry lock poisoned"); guard.contains_key(bucket) } /// List all registered buckets. Checks reachability by doing a trivial /// `list` with limit 1 on each. pub async fn list(&self) -> Vec { // Snapshot (name, backend, store_clone) under the lock, then probe // outside — probing can be slow (network), don't hold the lock. let snapshot: Vec<(String, String, Arc)> = { let guard = self.buckets.read().expect("bucket registry lock poisoned"); guard .iter() .map(|(n, e)| (n.clone(), e.backend.clone(), e.store.clone())) .collect() }; let mut out = Vec::with_capacity(snapshot.len()); for (name, backend, store) in snapshot { let reachable = probe(&store).await; let role = self.classify(&name); out.push(BucketInfo { name, backend, reachable, role }); } out.sort_by(|a, b| a.name.cmp(&b.name)); out } fn classify(&self, name: &str) -> BucketRole { if name == self.default { BucketRole::Primary } else if Some(name) == self.rescue.as_deref() { BucketRole::Rescue } else if name.starts_with("profile:") { BucketRole::Profile } else { BucketRole::Tenant } } /// Provision + register a bucket at runtime (federation layer 2). /// Returns Err if the name is already registered. Local backends get /// their root directory created automatically. pub async fn add_bucket(&self, bc: BucketConfig) -> Result { if self.contains(&bc.name) { return Err(format!("bucket '{}' already registered", bc.name)); } let store = build_store(&bc, self.secrets.as_ref()).await?; let reachable = probe(&store).await; let entry = Arc::new(BucketEntry { name: bc.name.clone(), backend: bc.backend.clone(), store, config: bc.clone(), }); let role = self.classify(&bc.name); let info = BucketInfo { name: bc.name.clone(), backend: bc.backend.clone(), reachable, role, }; { let mut guard = self.buckets.write().expect("bucket registry lock poisoned"); guard.insert(bc.name.clone(), entry); } tracing::info!("registered bucket '{}' backend={} reachable={}", bc.name, bc.backend, reachable); Ok(info) } /// Unregister a bucket. Refuses to remove `primary` or the configured /// rescue bucket — those are load-bearing. Caller is responsible for /// any final flush of bucket-local state before calling this. pub fn remove_bucket(&self, name: &str) -> Result<(), String> { if name == self.default { return Err("cannot remove primary bucket".into()); } if Some(name) == self.rescue.as_deref() { return Err(format!("cannot remove rescue bucket '{name}'")); } let removed = { let mut guard = self.buckets.write().expect("bucket registry lock poisoned"); guard.remove(name) }; if removed.is_none() { return Err(format!("bucket '{name}' not registered")); } tracing::info!("unregistered bucket '{}'", name); Ok(()) } /// Read with rescue-bucket fallback. If the target bucket fails and a /// rescue is configured, retries against rescue. Records every failure /// in the error journal. pub async fn read_smart(&self, bucket: &str, key: &str) -> Result { let target_store = { let guard = self.buckets.read().expect("bucket registry lock poisoned"); guard.get(bucket).map(|e| e.store.clone()) .ok_or_else(|| format!("unknown bucket: {bucket}"))? }; match crate::ops::get(&target_store, key).await { Ok(data) => Ok(ReadOutcome { data, rescued: false, original_bucket: bucket.to_string(), served_by: bucket.to_string(), }), Err(err) => { // Record failure regardless of what happens next. self.journal.append(BucketErrorEvent::new_read(bucket, key, &err)).await; // Try rescue, if any. if let Some(rescue_name) = &self.rescue { if rescue_name != bucket { let rescue_store = { let guard = self.buckets.read().expect("bucket registry lock poisoned"); guard.get(rescue_name).map(|e| e.store.clone()) }; if let Some(rescue_store) = rescue_store { match crate::ops::get(&rescue_store, key).await { Ok(data) => { self.journal.mark_rescued_last(bucket, key).await; return Ok(ReadOutcome { data, rescued: true, original_bucket: bucket.to_string(), served_by: rescue_name.clone(), }); } Err(rescue_err) => { return Err(format!( "read '{key}' failed in '{bucket}' ({err}); rescue '{rescue_name}' also failed ({rescue_err})" )); } } } } } Err(format!("read '{key}' failed in '{bucket}': {err}")) } } } /// Write always goes to target. No rescue fallback for writes — writes /// that silently vanish are the worst possible failure. pub async fn write_smart( &self, bucket: &str, key: &str, data: bytes::Bytes, ) -> Result<(), String> { let target_store = { let guard = self.buckets.read().expect("bucket registry lock poisoned"); guard.get(bucket).map(|e| e.store.clone()) .ok_or_else(|| format!("unknown bucket: {bucket}"))? }; match crate::ops::put(&target_store, key, data).await { Ok(()) => Ok(()), Err(err) => { self.journal.append(BucketErrorEvent::new_write(bucket, key, &err)).await; Err(format!("write '{key}' failed in '{bucket}': {err}")) } } } } /// Trivial reachability check — try to list with limit 0. async fn probe(store: &Arc) -> bool { use futures::StreamExt; let mut stream = store.list(None); // Pulling the first item confirms the store responds. Empty bucket = ok. match stream.next().await { Some(Ok(_)) => true, None => true, // empty but reachable Some(Err(_)) => false, } } /// Build a concrete ObjectStore from a BucketConfig. async fn build_store( bc: &BucketConfig, secrets: &dyn SecretsProvider, ) -> Result, String> { match bc.backend.as_str() { "local" => { let root = bc.root.as_deref() .ok_or_else(|| format!("bucket '{}' is backend=local but has no root", bc.name))?; std::fs::create_dir_all(root) .map_err(|e| format!("create bucket dir '{root}': {e}"))?; let fs = LocalFileSystem::new_with_prefix(root) .map_err(|e| format!("init local bucket '{}': {e}", bc.name))?; Ok(Arc::new(fs)) } "s3" => { let handle = bc.secret_ref.as_deref() .ok_or_else(|| format!("s3 bucket '{}' has no secret_ref", bc.name))?; let creds: BucketCredentials = secrets.resolve(handle).await?; let s3_bucket = bc.bucket.as_deref() .ok_or_else(|| format!("s3 bucket '{}' has no `bucket` name", bc.name))?; let region = bc.region.as_deref().unwrap_or("us-east-1"); let mut builder = object_store::aws::AmazonS3Builder::new() .with_bucket_name(s3_bucket) .with_region(region) .with_access_key_id(&creds.access_key) .with_secret_access_key(&creds.secret_key); if let Some(endpoint) = &bc.endpoint { builder = builder.with_endpoint(endpoint); // MinIO and other S3-compatible services often run on plain // HTTP. object_store refuses HTTP by default — opt in when // a custom endpoint is configured (TLS endpoints work either way). if endpoint.starts_with("http://") { builder = builder.with_allow_http(true); } } let s3 = builder.build() .map_err(|e| format!("init s3 bucket '{}': {e}", bc.name))?; Ok(Arc::new(s3)) } other => Err(format!("unknown backend '{other}' for bucket '{}'", bc.name)), } }