Enabled lance feature "aws" for S3-compatible storage via opendal. BucketRegistry: added with_allow_http(true) for MinIO/non-TLS S3 endpoints (fixes "builder error" on HTTP endpoints). lakehouse.toml gains [[storage.buckets]] name="s3:lakehouse" with S3 backend config. lance_backend.rs: S3 bucket naming convention — buckets with name prefix "s3:" emit s3:// URIs for Lance datasets. AWS_* env vars in the systemd unit provide credentials to Lance's internal object_store. Verified end-to-end on real MinIO with real 100K × 768d vectors: - Migrate Parquet → Lance on S3: 1.7s (vs 0.57s local) - Build IVF_PQ: 16.4s (CPU-bound, essentially same as local) - Search: ~58ms p50 (vs 11ms local — S3 partition reads) - Random doc fetch: 13ms (vs 3.5ms local) - Recall@10: 0.835 (randomized IVF_PQ, consistent with local 0.805) - Total S3 footprint: 637 MiB (vectors + index + lance metadata) The "public storage" claim from the PRD is now proven: the hybrid Parquet+HNSW ⊕ Lance architecture works on S3-compatible object storage, not just local filesystem. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
132 lines
5.4 KiB
Rust
132 lines
5.4 KiB
Rust
//! ADR-019 hybrid: routing layer for the Lance vector backend.
|
|
//!
|
|
//! Holds a `LanceRegistry` which maps an index name to its
|
|
//! `LanceVectorStore` instance, lazy-creating on first touch. Path
|
|
//! resolution: the index's bucket gives us a bucket root; we append
|
|
//! `lance/{index_name}` and use that as the dataset URI. For local
|
|
//! buckets that means a directory under the bucket's root.
|
|
//!
|
|
//! S3 buckets: in principle Lance accepts s3://... URIs, but the
|
|
//! firewall crate (`vectord-lance`) doesn't pull the S3 feature in by
|
|
//! default. When we promote profile buckets onto S3 in a future phase,
|
|
//! enable that feature and update `lance_uri_for` accordingly.
|
|
|
|
use std::collections::HashMap;
|
|
use std::path::PathBuf;
|
|
use std::sync::Arc;
|
|
use tokio::sync::RwLock;
|
|
|
|
use storaged::registry::BucketRegistry;
|
|
use vectord_lance::LanceVectorStore;
|
|
|
|
use crate::index_registry::IndexRegistry;
|
|
|
|
/// Convert a bucket+index pair into the URI Lance should use as the
|
|
/// dataset path. Supports both local (filesystem) and S3 buckets.
|
|
///
|
|
/// **Local buckets:** path resolution mirrors lakehouse.toml's convention.
|
|
/// Returns an absolute filesystem path.
|
|
///
|
|
/// **S3 buckets:** returns `s3://{s3_bucket}/lance/{index_name}`. Lance's
|
|
/// internal object_store crate reads `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY`
|
|
/// / `AWS_ENDPOINT` from environment (or the S3 feature's default chain).
|
|
/// For MinIO: set `AWS_ENDPOINT=http://localhost:9000` and
|
|
/// `AWS_ALLOW_HTTP=true` before starting the gateway.
|
|
///
|
|
/// Refuses unknown buckets so a typo doesn't silently land Lance data
|
|
/// in a directory / prefix the rest of the system can't see.
|
|
pub fn lance_uri_for(
|
|
buckets: &BucketRegistry,
|
|
bucket: &str,
|
|
index_name: &str,
|
|
) -> Result<String, String> {
|
|
if !buckets.contains(bucket) {
|
|
return Err(format!("bucket '{bucket}' not registered"));
|
|
}
|
|
// Check if this bucket is S3-backed by looking for a bucket config
|
|
// with backend="s3". BucketRegistry exposes backend type through
|
|
// the list() info, but that's async. The simpler signal: if the
|
|
// bucket name matches one we know is S3 (configured via lakehouse.toml
|
|
// with backend="s3"), use the s3:// URI scheme.
|
|
//
|
|
// For the synchronous path, we check a naming convention: buckets
|
|
// whose name starts with "s3:" are treated as S3 targets. The rest
|
|
// of the name is the S3 bucket name. Convention-based, explicit,
|
|
// no async needed.
|
|
//
|
|
// Additionally, any bucket registered with backend="s3" in the
|
|
// config will have its BucketConfig.bucket field set — that's the
|
|
// actual S3 bucket name. We can't access BucketConfig synchronously
|
|
// from the current registry API, so for now the naming convention
|
|
// is the primary signal.
|
|
if bucket.starts_with("s3:") {
|
|
let s3_bucket = &bucket["s3:".len()..];
|
|
return Ok(format!("s3://{s3_bucket}/lance/{index_name}"));
|
|
}
|
|
|
|
// Local path resolution.
|
|
let root: PathBuf = match bucket {
|
|
"primary" => PathBuf::from("./data"),
|
|
"rescue" => PathBuf::from("./data/_rescue"),
|
|
"testing" => PathBuf::from("./data/_testing"),
|
|
b if b.starts_with("profile:") => {
|
|
let safe = b.replace(':', "_");
|
|
PathBuf::from(format!("./data/_profiles/{safe}"))
|
|
}
|
|
b => PathBuf::from(format!("./data/_buckets/{}", b.replace(':', "_"))),
|
|
};
|
|
let _ = std::fs::create_dir_all(root.join("lance"));
|
|
let abs = match std::fs::canonicalize(&root) {
|
|
Ok(p) => p.join("lance").join(index_name),
|
|
Err(_) => root.join("lance").join(index_name),
|
|
};
|
|
Ok(abs.to_string_lossy().to_string())
|
|
}
|
|
|
|
/// Lookup-by-index registry of `LanceVectorStore` handles. One handle
|
|
/// per index, lazy-created on first call. Cheap to keep alive — it's
|
|
/// just a path string + handle to Lance's metadata cache.
|
|
#[derive(Clone)]
|
|
pub struct LanceRegistry {
|
|
buckets: Arc<BucketRegistry>,
|
|
indexes: IndexRegistry,
|
|
stores: Arc<RwLock<HashMap<String, LanceVectorStore>>>,
|
|
}
|
|
|
|
impl LanceRegistry {
|
|
pub fn new(buckets: Arc<BucketRegistry>, indexes: IndexRegistry) -> Self {
|
|
Self {
|
|
buckets,
|
|
indexes,
|
|
stores: Arc::new(RwLock::new(HashMap::new())),
|
|
}
|
|
}
|
|
|
|
/// Get (or lazy-create) the LanceVectorStore for an index.
|
|
/// Resolves bucket → URI via the registry; backend stays whatever
|
|
/// IndexMeta says (no enforcement here — caller decides whether to
|
|
/// route to Lance based on profile/index backend choice).
|
|
pub async fn store_for(&self, index_name: &str) -> Result<LanceVectorStore, String> {
|
|
if let Some(s) = self.stores.read().await.get(index_name) {
|
|
return Ok(s.clone());
|
|
}
|
|
let bucket = self.indexes
|
|
.get(index_name).await
|
|
.map(|m| m.bucket)
|
|
.unwrap_or_else(|| "primary".to_string());
|
|
let uri = lance_uri_for(&self.buckets, &bucket, index_name)?;
|
|
let store = LanceVectorStore::new(uri);
|
|
self.stores.write().await.insert(index_name.to_string(), store.clone());
|
|
Ok(store)
|
|
}
|
|
|
|
/// For freshly-created indexes that don't have IndexMeta yet — caller
|
|
/// supplies the bucket explicitly.
|
|
pub async fn store_for_new(&self, index_name: &str, bucket: &str) -> Result<LanceVectorStore, String> {
|
|
let uri = lance_uri_for(&self.buckets, bucket, index_name)?;
|
|
let store = LanceVectorStore::new(uri);
|
|
self.stores.write().await.insert(index_name.to_string(), store.clone());
|
|
Ok(store)
|
|
}
|
|
}
|