profit 5b1fcf6d27 Phase 28-36 body of work
Accumulated since a6f12e2 (Phase 21 Rust port + Phase 27 versioning):

- Phase 36: embed_semaphore on VectorState (permits=1) serializes
  seed embed calls — prevents sidecar socket collisions under
  concurrent /seed stress load
- Phase 31+: run_stress.ts 6-task diverse stress scaffolding;
  run_e2e_rated.ts + orchestrator.ts tightening
- Catalog dedupe cleanup: 16 duplicate manifests removed; canonical
  candidates.parquet (10.5MB -> 76KB) + placements.parquet (1.2MB ->
  11KB) regenerated post-dedupe; fresh manifests for active datasets
- vectord: harness EvalSet refinements (+181), agent portfolio
  rotation + ingest triggers (+158), autotune + rag adjustments
- catalogd/storaged/ingestd/mcp-server: misc tightening
- docs: Phase 28-36 PRD entries + DECISIONS ADR additions;
  control-plane pivot banner added to top of docs/PRD.md (pointing
  at docs/CONTROL_PLANE_PRD.md which lands in next commit)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 02:41:15 -05:00

606 lines
23 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! Production Lance vector backend (ADR-019 — hybrid architecture).
//!
//! This is the firewall crate. It owns its own Arrow 57 / DataFusion 52
//! / Lance 4 dependency tree. The public API uses only std types
//! (`Vec<f32>`, `Vec<String>`, `String`, `bool`) so nothing Arrow-shaped
//! crosses the crate boundary. That keeps vectord (Arrow 55) from
//! picking up an incompatible dep.
//!
//! Responsibilities:
//! - Migrate an existing binary-blob Parquet vector file into a Lance
//! dataset with `FixedSizeList<Float32, dims>`. One-time cost.
//! - Append new rows natively (no full-file rewrite — Lance's structural win).
//! - Build an IVF_PQ ANN index on the vector column.
//! - Vector search (`search`) using the IVF_PQ index when present,
//! falling back to full scan otherwise.
//! - Random-access row fetch by `doc_id` (`get_by_doc_id`) — the O(1)
//! lookup that Parquet-on-object-store can't cheaply do.
//! - Cheap count + stats introspection.
use arrow_array::{
Array, ArrayRef, BinaryArray, FixedSizeListArray, Float32Array, Int32Array,
RecordBatch, RecordBatchIterator, StringArray,
};
use arrow_schema::{DataType, Field, Schema};
use futures::StreamExt;
use serde::Serialize;
use std::sync::Arc;
use std::time::Instant;
// ================= Public types =================
/// One search result. Mirrors vectord's existing `SearchResult` shape
/// structurally but carries simpler types so this crate stays firewalled.
#[derive(Debug, Clone, Serialize)]
pub struct Hit {
/// Identifier from the dataset's `doc_id` column.
pub doc_id: String,
/// Chunk body; empty string when the projection lacked `chunk_text`.
pub chunk_text: String,
/// Similarity derived as `1.0 - distance` (cosine); 0.0 when Lance
/// returned no `_distance` column.
pub score: f32,
/// Raw distance from Lance's `_distance` column, when present.
pub distance: Option<f32>,
}
/// A fully-hydrated row fetched by `get_by_doc_id` — includes the vector
/// so callers can do downstream work (rerank, cite, etc.) without a
/// second round trip.
#[derive(Debug, Clone, Serialize)]
pub struct Row {
/// Identifier from the `doc_id` column.
pub doc_id: String,
/// Chunk body; empty string when the column was absent.
pub chunk_text: String,
/// Embedding, decoded from the `FixedSizeList<Float32>` column.
pub vector: Vec<f32>,
/// Value of the nullable `source` column, if non-null for this row.
pub source: Option<String>,
/// Value of the nullable `chunk_idx` column, if non-null for this row.
pub chunk_idx: Option<i32>,
}
/// Outcome of `migrate_from_parquet_bytes`.
#[derive(Debug, Clone, Serialize)]
pub struct MigrationStats {
/// Total rows copied into the new Lance dataset.
pub rows_written: usize,
/// Vector dimensionality detected from the source data.
pub dimensions: usize,
/// On-disk size of the resulting dataset directory.
pub disk_bytes: u64,
/// Wall-clock migration time.
pub duration_secs: f32,
}
/// Outcome of `append`.
#[derive(Debug, Clone, Serialize)]
pub struct AppendStats {
/// Rows written in this append (0 for an empty input).
pub rows_appended: usize,
/// Growth of the dataset directory, measured before/after the write.
pub disk_bytes_added: u64,
/// Wall-clock append time.
pub duration_secs: f32,
}
/// Outcome of `build_index` — echoes the requested IVF_PQ parameters
/// plus the measured build cost.
#[derive(Debug, Clone, Serialize)]
pub struct IndexStats {
/// Index name (always "vec_idx").
pub name: String,
pub num_partitions: u32,
pub num_bits: u32,
pub num_sub_vectors: u32,
/// Wall-clock index build time.
pub build_time_secs: f32,
/// Growth of the dataset directory attributable to the index.
pub disk_bytes_added: u64,
}
/// Outcome of `build_scalar_index`.
#[derive(Debug, Clone, Serialize)]
pub struct ScalarIndexStats {
/// Index name, derived as `{column}_btree`.
pub name: String,
/// Column the index was built on.
pub column: String,
/// Wall-clock index build time.
pub build_time_secs: f32,
/// Growth of the dataset directory attributable to the index.
pub disk_bytes_added: u64,
}
/// Snapshot returned by `stats` — every probe is best-effort, so
/// failures surface as zero/false rather than an error.
#[derive(Debug, Clone, Serialize)]
pub struct DatasetStats {
/// The store's configured path/URI.
pub path: String,
/// Row count; 0 when the dataset can't be opened.
pub rows: usize,
/// Recursive on-disk size of the dataset directory (local paths only).
pub disk_bytes: u64,
/// Whether any index covers the `vector` column; false on error.
pub has_vector_index: bool,
/// Whether any index covers the `doc_id` column; false on error.
pub has_doc_id_index: bool,
}
// ================= The backend =================
/// Thin wrapper around a Lance dataset path. Lance handles the heavy
/// lifting — we just expose a narrow API.
///
/// Cloning is cheap (one `String`); no connection or handle is held
/// open — every method re-opens the dataset.
#[derive(Clone)]
pub struct LanceVectorStore {
/// Local filesystem path or object-store URI (file:///..., s3://...).
/// Lance's internal URI parsing handles both.
path: String,
}
impl LanceVectorStore {
/// Construct a store pointing at `path` (local path or URI).
/// No I/O happens here — the dataset is opened lazily per call.
pub fn new(path: impl Into<String>) -> Self {
    let path = path.into();
    Self { path }
}
pub fn path(&self) -> &str { &self.path }
/// Row count via Lance's fast metadata path (no scan).
pub async fn count(&self) -> Result<usize, String> {
    open_or_err(&self.path)
        .await?
        .count_rows(None)
        .await
        .map_err(e)
}
/// True if the on-disk dataset exists AND has at least one `vector`
/// column index attached.
pub async fn has_vector_index(&self) -> Result<bool, String> {
use lance_index::DatasetIndexExt;
let dataset = match lance::dataset::Dataset::open(&self.path).await {
Ok(d) => d,
Err(_) => return Ok(false),
};
let indexes = dataset.load_indices().await.map_err(e)?;
Ok(indexes.iter().any(|ix| {
ix.fields.iter().any(|fid| {
dataset.schema().field_by_id(*fid)
.map(|f| f.name == "vector")
.unwrap_or(false)
})
}))
}
/// One-shot introspection snapshot of the dataset.
///
/// Every probe is best-effort: a missing or unreadable dataset
/// reports zeros/false instead of failing the whole stats call.
pub async fn stats(&self) -> Result<DatasetStats, String> {
    Ok(DatasetStats {
        path: self.path.clone(),
        rows: self.count().await.unwrap_or(0),
        disk_bytes: dir_size_bytes(&strip_file_uri(&self.path)),
        has_vector_index: self.has_vector_index().await.unwrap_or(false),
        has_doc_id_index: self.has_scalar_index("doc_id").await.unwrap_or(false),
    })
}
/// Migrate a vectord-format Parquet file into a Lance dataset.
///
/// Input schema (vectord's binary-blob format):
/// - source : Utf8
/// - doc_id : Utf8
/// - chunk_idx : Int32
/// - chunk_text : Utf8
/// - vector : Binary (raw f32 little-endian bytes)
///
/// Output schema (Lance-friendly):
/// - source : Utf8
/// - doc_id : Utf8
/// - chunk_idx : Int32
/// - chunk_text : Utf8
/// - vector : FixedSizeList<Float32, dims>
///
/// Idempotent at the file level — if the target exists, it's
/// overwritten. Caller must manage destination paths.
pub async fn migrate_from_parquet_bytes(
    &self,
    parquet_bytes: &[u8],
) -> Result<MigrationStats, String> {
    let started = Instant::now();
    let (schema, batches, rows_written) = read_parquet(parquet_bytes)?;
    let dimensions = detect_vector_dims(&batches)?;
    let (lance_schema, lance_batches) =
        convert_to_fixed_size_list(&schema, batches, dimensions)?;
    // Overwrite semantics: blow away any prior dataset at this path.
    // Ignore the error — the directory may simply not exist yet.
    let local_path = strip_file_uri(&self.path);
    let _ = std::fs::remove_dir_all(&local_path);
    write_dataset(&self.path, lance_schema, lance_batches).await?;
    Ok(MigrationStats {
        rows_written,
        dimensions,
        disk_bytes: dir_size_bytes(&local_path),
        duration_secs: started.elapsed().as_secs_f32(),
    })
}
/// Native Lance append — does NOT rewrite existing files. New rows
/// land as a separate fragment; readers union across fragments at
/// query time. Contrast: our Parquet path requires rewriting the
/// entire vector file to add rows.
///
/// All column vectors must be the same length, and every embedding
/// must share one dimensionality (required by FixedSizeList).
pub async fn append(
    &self,
    source: Option<String>,
    doc_ids: Vec<String>,
    chunk_idxs: Vec<i32>,
    chunk_texts: Vec<String>,
    vectors: Vec<Vec<f32>>,
) -> Result<AppendStats, String> {
    use lance::dataset::{Dataset, WriteMode, WriteParams};
    let n = doc_ids.len();
    if n == 0 {
        return Ok(AppendStats { rows_appended: 0, disk_bytes_added: 0, duration_secs: 0.0 });
    }
    if chunk_idxs.len() != n || chunk_texts.len() != n || vectors.len() != n {
        return Err(format!(
            "append: length mismatch (doc_ids={n}, chunk_idxs={}, chunk_texts={}, vectors={})",
            chunk_idxs.len(), chunk_texts.len(), vectors.len(),
        ));
    }
    let dims = vectors[0].len();
    if let Some((i, v)) = vectors.iter().enumerate().find(|(_, v)| v.len() != dims) {
        return Err(format!("append: row {i} has {} dims, expected {}", v.len(), dims));
    }
    let started = Instant::now();
    let bytes_before = dir_size_bytes(&strip_file_uri(&self.path));
    // Column arrays. The single optional `source` is broadcast to every row.
    let src_arr = StringArray::from(vec![source; n]);
    let doc_id_arr = StringArray::from(doc_ids);
    let chunk_idx_arr = Int32Array::from(chunk_idxs);
    let chunk_text_arr = StringArray::from(chunk_texts);
    // Flatten row vectors into one contiguous f32 buffer for the FSL column.
    let mut flat: Vec<f32> = Vec::with_capacity(n * dims);
    for row in &vectors {
        flat.extend_from_slice(row);
    }
    let item_field = Arc::new(Field::new("item", DataType::Float32, true));
    let vec_arr = FixedSizeListArray::try_new(
        item_field.clone(), dims as i32, Arc::new(Float32Array::from(flat)), None,
    ).map_err(e)?;
    let schema = Arc::new(Schema::new(vec![
        Field::new("source", DataType::Utf8, true),
        Field::new("doc_id", DataType::Utf8, false),
        Field::new("chunk_idx", DataType::Int32, true),
        Field::new("chunk_text", DataType::Utf8, true),
        Field::new("vector", DataType::FixedSizeList(item_field, dims as i32), false),
    ]));
    let columns: Vec<ArrayRef> = vec![
        Arc::new(src_arr),
        Arc::new(doc_id_arr),
        Arc::new(chunk_idx_arr),
        Arc::new(chunk_text_arr),
        Arc::new(vec_arr),
    ];
    let batch = RecordBatch::try_new(schema.clone(), columns).map_err(e)?;
    let reader = RecordBatchIterator::new(std::iter::once(Ok(batch)), schema);
    let params = WriteParams { mode: WriteMode::Append, ..Default::default() };
    Dataset::write(reader, &self.path, Some(params)).await.map_err(e)?;
    Ok(AppendStats {
        rows_appended: n,
        disk_bytes_added: dir_size_bytes(&strip_file_uri(&self.path)).saturating_sub(bytes_before),
        duration_secs: started.elapsed().as_secs_f32(),
    })
}
/// Build an IVF_PQ vector index. Replaces any prior index with the
/// same name. Callers pass explicit params — sensible defaults for
/// ~100K × 768d: num_partitions=316 (≈√N), num_bits=8, num_sub_vectors=48.
pub async fn build_index(
    &self,
    num_partitions: u32,
    num_bits: u32,
    num_sub_vectors: u32,
) -> Result<IndexStats, String> {
    use lance::dataset::Dataset;
    use lance::index::vector::VectorIndexParams;
    use lance_index::{DatasetIndexExt, IndexType};
    use lance_linalg::distance::MetricType;
    let bytes_before = dir_size_bytes(&strip_file_uri(&self.path));
    let started = Instant::now();
    let mut dataset = Dataset::open(&self.path).await.map_err(e)?;
    // 50 k-means max_iterations — same as the benchmark configuration.
    let params = VectorIndexParams::ivf_pq(
        num_partitions as usize,
        num_bits as u8,
        num_sub_vectors as usize,
        MetricType::Cosine,
        50,
    );
    dataset
        .create_index(
            &["vector"],
            IndexType::Vector,
            Some("vec_idx".into()),
            &params,
            true, // replace any existing index of this name
        )
        .await
        .map_err(e)?;
    Ok(IndexStats {
        name: "vec_idx".into(),
        num_partitions,
        num_bits,
        num_sub_vectors,
        build_time_secs: started.elapsed().as_secs_f32(),
        disk_bytes_added: dir_size_bytes(&strip_file_uri(&self.path)).saturating_sub(bytes_before),
    })
}
/// Build a scalar btree index on `column`. Makes `get_by_doc_id`
/// O(log N) instead of a filter-scan of every fragment. Small index
/// footprint — a few hundred KB on 100K rows.
pub async fn build_scalar_index(&self, column: &str) -> Result<ScalarIndexStats, String> {
    use lance::dataset::Dataset;
    use lance_index::scalar::ScalarIndexParams;
    use lance_index::{DatasetIndexExt, IndexType};
    let bytes_before = dir_size_bytes(&strip_file_uri(&self.path));
    let started = Instant::now();
    let mut dataset = Dataset::open(&self.path).await.map_err(e)?;
    let index_name = format!("{column}_btree");
    dataset
        .create_index(
            &[column],
            IndexType::Scalar,
            Some(index_name.clone()),
            &ScalarIndexParams::default(),
            true, // replace any existing index of this name
        )
        .await
        .map_err(e)?;
    Ok(ScalarIndexStats {
        name: index_name,
        column: column.to_string(),
        build_time_secs: started.elapsed().as_secs_f32(),
        disk_bytes_added: dir_size_bytes(&strip_file_uri(&self.path)).saturating_sub(bytes_before),
    })
}
/// True if any index exists covering the named column.
/// A dataset that can't be opened simply has no indexes.
pub async fn has_scalar_index(&self, column: &str) -> Result<bool, String> {
    use lance_index::DatasetIndexExt;
    let dataset = if let Ok(d) = lance::dataset::Dataset::open(&self.path).await {
        d
    } else {
        return Ok(false);
    };
    let indices = dataset.load_indices().await.map_err(e)?;
    let schema = dataset.schema();
    // Flatten every index's field ids and look for one whose schema
    // field carries the requested column name.
    Ok(indices
        .iter()
        .flat_map(|ix| ix.fields.iter())
        .any(|fid| {
            schema
                .field_by_id(*fid)
                .map(|f| f.name == column)
                .unwrap_or(false)
        }))
}
/// Search for top_k nearest neighbors of `query`. Uses the IVF_PQ
/// index if one exists; otherwise does a full scan (slow but
/// correct — useful during development before index build).
///
/// `nprobes` tells Lance how many IVF partitions to probe per query.
/// Lance's built-in default is **1**, which caps recall well below
/// what the index is actually capable of. Passing `None` keeps that
/// default (only sensible when latency trumps recall); for real
/// workloads set `nprobes` to 5-10% of `num_partitions` (e.g. 20-30
/// on a 316-partition index).
///
/// `refine_factor` re-ranks the PQ-approximate top-k by computing
/// exact distances on `top_k * refine_factor` candidates, then
/// trimming to `top_k`. Cheap way to buy back recall lost to product
/// quantization. `None` skips the re-rank pass.
pub async fn search(
    &self,
    query: &[f32],
    top_k: usize,
    nprobes: Option<usize>,
    refine_factor: Option<u32>,
) -> Result<Vec<Hit>, String> {
    use lance::dataset::Dataset;
    let dataset = Dataset::open(&self.path).await.map_err(e)?;
    let qarr = Float32Array::from(query.to_vec());
    let mut scanner = dataset.scan();
    scanner.nearest("vector", &qarr, top_k).map_err(e)?;
    if let Some(n) = nprobes { scanner.nprobes(n); }
    if let Some(f) = refine_factor { scanner.refine(f); }
    scanner.project(&["doc_id", "chunk_text"]).map_err(e)?;
    let mut stream = scanner.try_into_stream().await.map_err(e)?;
    let mut hits: Vec<Hit> = Vec::with_capacity(top_k);
    while let Some(batch) = stream.next().await {
        let batch = batch.map_err(e)?;
        let doc_ids = batch.column_by_name("doc_id")
            .ok_or_else(|| "no doc_id column in search result".to_string())?
            .as_any().downcast_ref::<StringArray>()
            .ok_or_else(|| "doc_id is not StringArray".to_string())?;
        let chunk_texts = batch.column_by_name("chunk_text")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
        // Lance tacks on a `_distance` column for nearest() queries.
        let distances = batch.column_by_name("_distance")
            .and_then(|c| c.as_any().downcast_ref::<Float32Array>());
        for row in 0..batch.num_rows() {
            let d = distances.map(|a| a.value(row));
            hits.push(Hit {
                doc_id: doc_ids.value(row).to_string(),
                chunk_text: chunk_texts.map(|a| a.value(row).to_string()).unwrap_or_default(),
                // Cosine distance → similarity; 0.0 when no distance column.
                score: d.map(|d| 1.0 - d).unwrap_or(0.0),
                distance: d,
            });
        }
    }
    Ok(hits)
}
/// Fetch one row by doc_id via a Lance filter-pushdown scan.
///
/// Cost depends on indexing: with the `doc_id` btree built by
/// `build_scalar_index`, the filter resolves through the index;
/// without one, Lance falls back to scanning fragments — still far
/// cheaper than reading the whole Parquet file.
pub async fn get_by_doc_id(&self, doc_id: &str) -> Result<Option<Row>, String> {
use lance::dataset::Dataset;
let dataset = Dataset::open(&self.path).await.map_err(e)?;
// Escape embedded single quotes so the id can't break the filter expression.
let filter = format!("doc_id = '{}'", doc_id.replace('\'', "''"));
let mut scanner = dataset.scan();
scanner.filter(&filter).map_err(e)?;
// Only one row is needed — stop after the first match.
scanner.limit(Some(1), None).map_err(e)?;
let mut stream = scanner.try_into_stream().await.map_err(e)?;
while let Some(batch) = stream.next().await {
let batch = batch.map_err(e)?;
// Streams may yield empty batches; skip until one carries a row.
if batch.num_rows() == 0 { continue; }
return Ok(Some(row_from_batch(&batch, 0)?));
}
Ok(None)
}
}
// ================= Internal helpers =================
fn e<T: std::fmt::Display>(err: T) -> String { err.to_string() }
/// `file:///abs/path` → `/abs/path`. Any other scheme (or a plain
/// path) passes through untouched — this exists for helpers that only
/// work on local paths (dir_size_bytes, remove_dir_all).
fn strip_file_uri(uri: &str) -> String {
    match uri.strip_prefix("file://") {
        Some(local) => local.to_string(),
        None => uri.to_string(),
    }
}
async fn open_or_err(path: &str) -> Result<lance::dataset::Dataset, String> {
lance::dataset::Dataset::open(path).await.map_err(e)
}
/// Recursive on-disk size of `path` in bytes. Anything unreadable
/// (missing path, permission error) contributes 0 — this is a
/// best-effort metric, not an audit.
fn dir_size_bytes(path: &str) -> u64 {
    fn walk(p: &std::path::Path) -> u64 {
        match std::fs::metadata(p) {
            Err(_) => 0,
            Ok(meta) if meta.is_file() => meta.len(),
            Ok(_) => match std::fs::read_dir(p) {
                Err(_) => 0,
                Ok(entries) => entries
                    .flatten()
                    .map(|entry| walk(&entry.path()))
                    .sum(),
            },
        }
    }
    walk(std::path::Path::new(path))
}
fn read_parquet(bytes: &[u8]) -> Result<(Arc<Schema>, Vec<RecordBatch>, usize), String> {
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
let builder = ParquetRecordBatchReaderBuilder::try_new(bytes::Bytes::copy_from_slice(bytes))
.map_err(e)?;
let schema = builder.schema().clone();
let reader = builder.build().map_err(e)?;
let batches: Vec<RecordBatch> = reader.collect::<Result<_, _>>().map_err(e)?;
let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
Ok((schema, batches, rows))
}
/// Infer embedding dimensionality from the first usable `vector` value.
///
/// Binary columns: first non-null blob's byte length / 4 (raw f32s).
/// FixedSizeList columns: the declared list length.
fn detect_vector_dims(batches: &[RecordBatch]) -> Result<usize, String> {
    for batch in batches {
        let col_idx = batch
            .schema()
            .index_of("vector")
            .map_err(|_| String::from("no 'vector' column"))?;
        let col = batch.column(col_idx);
        if let Some(fsl) = col.as_any().downcast_ref::<FixedSizeListArray>() {
            return Ok(fsl.value_length() as usize);
        }
        if let Some(binary) = col.as_any().downcast_ref::<BinaryArray>() {
            if let Some(i) = (0..binary.len()).find(|&i| !binary.is_null(i)) {
                return Ok(binary.value(i).len() / 4);
            }
            // All-null batch: keep looking in later batches.
        }
    }
    Err("could not determine vector dimensions".into())
}
/// Rewrite the `vector` column of every batch from Binary blobs to
/// `FixedSizeList<Float32, dims>`; all other columns pass through.
/// Batches whose vector column is already a FixedSizeList are kept
/// as-is.
fn convert_to_fixed_size_list(
    schema: &Arc<Schema>,
    batches: Vec<RecordBatch>,
    dims: usize,
) -> Result<(Arc<Schema>, Vec<RecordBatch>), String> {
    // New schema: only the `vector` field changes shape.
    let fsl_type = DataType::FixedSizeList(
        Arc::new(Field::new("item", DataType::Float32, true)),
        dims as i32,
    );
    let fields: Vec<Arc<Field>> = schema
        .fields()
        .iter()
        .map(|f| {
            if f.name() == "vector" {
                Arc::new(Field::new("vector", fsl_type.clone(), false))
            } else {
                f.clone()
            }
        })
        .collect();
    let new_schema = Arc::new(Schema::new(fields));
    let mut converted = Vec::with_capacity(batches.len());
    for batch in batches {
        let vec_idx = batch.schema().index_of("vector").map_err(e)?;
        let mut cols: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns());
        for (i, col) in batch.columns().iter().enumerate() {
            if i != vec_idx {
                cols.push(col.clone());
            } else if let Some(bin) = col.as_any().downcast_ref::<BinaryArray>() {
                cols.push(Arc::new(binary_to_fsl(bin, dims)?));
            } else if col.as_any().is::<FixedSizeListArray>() {
                // Already the target layout — pass through untouched.
                cols.push(col.clone());
            } else {
                return Err("vector column is neither Binary nor FixedSizeList".into());
            }
        }
        converted.push(RecordBatch::try_new(new_schema.clone(), cols).map_err(e)?);
    }
    Ok((new_schema, converted))
}
/// Decode a Binary column of raw little-endian f32 blobs into a
/// `FixedSizeList<Float32, dims>` array. Errors if any blob's byte
/// length disagrees with `dims`.
///
/// NOTE(review): null rows become all-zero vectors with no validity
/// mask, so downstream can't distinguish them from genuine zero
/// embeddings — confirm this is the intended semantics.
fn binary_to_fsl(bin: &BinaryArray, dims: usize) -> Result<FixedSizeListArray, String> {
    let rows = bin.len();
    let mut flat: Vec<f32> = Vec::with_capacity(rows * dims);
    for i in 0..rows {
        if bin.is_null(i) {
            flat.resize(flat.len() + dims, 0.0);
            continue;
        }
        let raw = bin.value(i);
        if raw.len() != dims * 4 {
            return Err(format!(
                "row {i}: {} bytes vs expected {} ({} × f32)",
                raw.len(), dims * 4, dims,
            ));
        }
        flat.extend(
            raw.chunks_exact(4)
                .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]])),
        );
    }
    let item_field = Arc::new(Field::new("item", DataType::Float32, true));
    FixedSizeListArray::try_new(
        item_field, dims as i32, Arc::new(Float32Array::from(flat)), None,
    ).map_err(e)
}
/// Write `batches` as a brand-new Lance dataset at `path`
/// (default write params — create/overwrite semantics are the
/// caller's responsibility).
async fn write_dataset(
    path: &str,
    schema: Arc<Schema>,
    batches: Vec<RecordBatch>,
) -> Result<(), String> {
    use lance::dataset::{Dataset, WriteParams};
    let ok_batches = batches.into_iter().map(Ok);
    let reader = RecordBatchIterator::new(ok_batches, schema);
    Dataset::write(reader, path, Some(WriteParams::default()))
        .await
        .map_err(e)
        .map(|_| ())
}
/// Hydrate one `Row` from `batch` at index `row`.
///
/// `doc_id` and a FixedSizeList `vector` are mandatory; `chunk_text`
/// defaults to empty, and `source`/`chunk_idx` become `None` when the
/// column is absent or null for this row.
fn row_from_batch(batch: &RecordBatch, row: usize) -> Result<Row, String> {
    // Borrow a column as a StringArray, if present and string-typed.
    let string_col = |name: &str| {
        batch
            .column_by_name(name)
            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
    };
    let doc_id = string_col("doc_id")
        .map(|a| a.value(row).to_string())
        .ok_or_else(|| "missing doc_id".to_string())?;
    let chunk_text = string_col("chunk_text")
        .map(|a| a.value(row).to_string())
        .unwrap_or_default();
    let source = string_col("source")
        .and_then(|a| (!a.is_null(row)).then(|| a.value(row).to_string()));
    let chunk_idx = batch
        .column_by_name("chunk_idx")
        .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
        .and_then(|a| (!a.is_null(row)).then(|| a.value(row)));
    let fsl = batch
        .column_by_name("vector")
        .ok_or_else(|| "no vector column in row fetch".to_string())?
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .ok_or_else(|| "vector column is not FixedSizeList".to_string())?;
    let inner = fsl.value(row);
    let floats = inner
        .as_any()
        .downcast_ref::<Float32Array>()
        .ok_or_else(|| "vector inner not Float32".to_string())?;
    let vector: Vec<f32> = (0..floats.len()).map(|i| floats.value(i)).collect();
    Ok(Row { doc_id, chunk_text, vector, source, chunk_idx })
}