//! ADR-019 hybrid: routing layer for the Lance vector backend. //! //! Holds a `LanceRegistry` which maps an index name to its //! `LanceVectorStore` instance, lazy-creating on first touch. Path //! resolution: the index's bucket gives us a bucket root; we append //! `lance/{index_name}` and use that as the dataset URI. For local //! buckets that means a directory under the bucket's root. //! //! S3 buckets: in principle Lance accepts s3://... URIs, but the //! firewall crate (`vectord-lance`) doesn't pull the S3 feature in by //! default. When we promote profile buckets onto S3 in a future phase, //! enable that feature and update `lance_uri_for` accordingly. use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; use tokio::sync::RwLock; use storaged::registry::BucketRegistry; use vectord_lance::LanceVectorStore; use crate::index_registry::IndexRegistry; /// Convert a bucket+index pair into the URI Lance should use as the /// dataset path. Supports both local (filesystem) and S3 buckets. /// /// **Local buckets:** path resolution mirrors lakehouse.toml's convention. /// Returns an absolute filesystem path. /// /// **S3 buckets:** returns `s3://{s3_bucket}/lance/{index_name}`. Lance's /// internal object_store crate reads `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` /// / `AWS_ENDPOINT` from environment (or the S3 feature's default chain). /// For MinIO: set `AWS_ENDPOINT=http://localhost:9000` and /// `AWS_ALLOW_HTTP=true` before starting the gateway. /// /// Refuses unknown buckets so a typo doesn't silently land Lance data /// in a directory / prefix the rest of the system can't see. pub fn lance_uri_for( buckets: &BucketRegistry, bucket: &str, index_name: &str, ) -> Result { if !buckets.contains(bucket) { return Err(format!("bucket '{bucket}' not registered")); } // Check if this bucket is S3-backed by looking for a bucket config // with backend="s3". BucketRegistry exposes backend type through // the list() info, but that's async. The simpler signal: if the // bucket name matches one we know is S3 (configured via lakehouse.toml // with backend="s3"), use the s3:// URI scheme. // // For the synchronous path, we check a naming convention: buckets // whose name starts with "s3:" are treated as S3 targets. The rest // of the name is the S3 bucket name. Convention-based, explicit, // no async needed. // // Additionally, any bucket registered with backend="s3" in the // config will have its BucketConfig.bucket field set — that's the // actual S3 bucket name. We can't access BucketConfig synchronously // from the current registry API, so for now the naming convention // is the primary signal. if bucket.starts_with("s3:") { let s3_bucket = &bucket["s3:".len()..]; return Ok(format!("s3://{s3_bucket}/lance/{index_name}")); } // Local path resolution. let root: PathBuf = match bucket { "primary" => PathBuf::from("./data"), "rescue" => PathBuf::from("./data/_rescue"), "testing" => PathBuf::from("./data/_testing"), b if b.starts_with("profile:") => { let safe = b.replace(':', "_"); PathBuf::from(format!("./data/_profiles/{safe}")) } b => PathBuf::from(format!("./data/_buckets/{}", b.replace(':', "_"))), }; let _ = std::fs::create_dir_all(root.join("lance")); let abs = match std::fs::canonicalize(&root) { Ok(p) => p.join("lance").join(index_name), Err(_) => root.join("lance").join(index_name), }; Ok(abs.to_string_lossy().to_string()) } /// Lookup-by-index registry of `LanceVectorStore` handles. One handle /// per index, lazy-created on first call. Cheap to keep alive — it's /// just a path string + handle to Lance's metadata cache. #[derive(Clone)] pub struct LanceRegistry { buckets: Arc, indexes: IndexRegistry, stores: Arc>>, } impl LanceRegistry { pub fn new(buckets: Arc, indexes: IndexRegistry) -> Self { Self { buckets, indexes, stores: Arc::new(RwLock::new(HashMap::new())), } } /// Get (or lazy-create) the LanceVectorStore for an index. /// Resolves bucket → URI via the registry; backend stays whatever /// IndexMeta says (no enforcement here — caller decides whether to /// route to Lance based on profile/index backend choice). pub async fn store_for(&self, index_name: &str) -> Result { if let Some(s) = self.stores.read().await.get(index_name) { return Ok(s.clone()); } let bucket = self.indexes .get(index_name).await .map(|m| m.bucket) .unwrap_or_else(|| "primary".to_string()); let uri = lance_uri_for(&self.buckets, &bucket, index_name)?; let store = LanceVectorStore::new(uri); self.stores.write().await.insert(index_name.to_string(), store.clone()); Ok(store) } /// For freshly-created indexes that don't have IndexMeta yet — caller /// supplies the bucket explicitly. pub async fn store_for_new(&self, index_name: &str, bucket: &str) -> Result { let uri = lance_uri_for(&self.buckets, bucket, index_name)?; let store = LanceVectorStore::new(uri); self.stores.write().await.insert(index_name.to_string(), store.clone()); Ok(store) } }