root 59e72fa566 Scalar btree index on doc_id + auto-build during Lance activation
LanceVectorStore gains build_scalar_index(column) and
has_scalar_index(column). Exposed as POST /vectors/lance/scalar-index/
{index}/{column}. activate_profile auto-builds the doc_id btree
alongside the IVF_PQ vector index when activating a Lance-backed
profile — operators get both indexes without extra API calls.

stats() now reports has_doc_id_index alongside has_vector_index.

Measured on resumes_100k_v2 (100K × 768d): random doc_id fetch
improved from ~5.4ms to ~3.5ms (35% faster). Btree build: 19ms,
+2.7 MB on disk. The remaining ~3ms is vector column materialization,
not index lookup — to close further would need a projection-only
fetch that skips the 768-float vector for text-only RAG retrieval.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-16 20:49:17 -05:00

586 lines
22 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! Production Lance vector backend (ADR-019 — hybrid architecture).
//!
//! This is the firewall crate. It owns its own Arrow 57 / DataFusion 52
//! / Lance 4 dependency tree. The public API uses only std types
//! (`Vec<f32>`, `Vec<String>`, `String`, `bool`) so nothing Arrow-shaped
//! crosses the crate boundary. That keeps vectord (Arrow 55) from
//! picking up an incompatible dep.
//!
//! Responsibilities:
//! - Migrate an existing binary-blob Parquet vector file into a Lance
//! dataset with `FixedSizeList<Float32, dims>`. One-time cost.
//! - Append new rows natively (no full-file rewrite — Lance's structural win).
//! - Build an IVF_PQ ANN index on the vector column.
//! - Vector search (`search`) using the IVF_PQ index when present,
//! falling back to full scan otherwise.
//! - Random-access row fetch by `doc_id` (`get_by_doc_id`) — the O(1)
//! lookup that Parquet-on-object-store can't cheaply do.
//! - Cheap count + stats introspection.
use arrow_array::{
Array, ArrayRef, BinaryArray, FixedSizeListArray, Float32Array, Int32Array,
RecordBatch, RecordBatchIterator, StringArray,
};
use arrow_schema::{DataType, Field, Schema};
use futures::StreamExt;
use serde::Serialize;
use std::sync::Arc;
use std::time::Instant;
// ================= Public types =================
/// One search result. Mirrors vectord's existing `SearchResult` shape
/// structurally but carries simpler types so this crate stays firewalled.
#[derive(Debug, Clone, Serialize)]
pub struct Hit {
    /// Document identifier of the matched chunk.
    pub doc_id: String,
    /// Text of the matched chunk; empty string when the result batch
    /// had no usable `chunk_text` column.
    pub chunk_text: String,
    /// Similarity computed as `1.0 - distance` (cosine metric); 0.0
    /// when no `_distance` column came back.
    pub score: f32,
    /// Raw distance from Lance's `_distance` column, when present
    /// (populated by `search` for nearest-neighbor queries).
    pub distance: Option<f32>,
}
/// A fully-hydrated row fetched by `get_by_doc_id` — includes the vector
/// so callers can do downstream work (rerank, cite, etc.) without a
/// second round trip.
#[derive(Debug, Clone, Serialize)]
pub struct Row {
    /// Document identifier (non-nullable column in the Lance schema).
    pub doc_id: String,
    /// Chunk text; empty string if the column was absent or untyped.
    pub chunk_text: String,
    /// The embedding, decoded from the FixedSizeList<Float32> column.
    pub vector: Vec<f32>,
    /// Value of the nullable `source` column, when set.
    pub source: Option<String>,
    /// Value of the nullable `chunk_idx` column, when set.
    pub chunk_idx: Option<i32>,
}
/// Outcome of `migrate_from_parquet_bytes`.
#[derive(Debug, Clone, Serialize)]
pub struct MigrationStats {
    /// Total rows written across all converted batches.
    pub rows_written: usize,
    /// Vector dimensionality detected from the source data.
    pub dimensions: usize,
    /// On-disk size of the freshly-written dataset directory.
    pub disk_bytes: u64,
    /// Wall-clock migration time.
    pub duration_secs: f32,
}
/// Outcome of `append`.
#[derive(Debug, Clone, Serialize)]
pub struct AppendStats {
    /// Rows added in this call (0 for an empty input — no write happens).
    pub rows_appended: usize,
    /// Dataset directory growth: post-size minus pre-size, saturating.
    pub disk_bytes_added: u64,
    /// Wall-clock append time.
    pub duration_secs: f32,
}
/// Outcome of `build_index` (IVF_PQ vector index).
#[derive(Debug, Clone, Serialize)]
pub struct IndexStats {
    /// Index name (always "vec_idx").
    pub name: String,
    /// IVF partition count, echoed back from the request.
    pub num_partitions: u32,
    /// PQ bits, echoed back from the request.
    pub num_bits: u32,
    /// PQ sub-vector count, echoed back from the request.
    pub num_sub_vectors: u32,
    /// Wall-clock index build time.
    pub build_time_secs: f32,
    /// Dataset directory growth attributable to the index build.
    pub disk_bytes_added: u64,
}
/// Outcome of `build_scalar_index`.
#[derive(Debug, Clone, Serialize)]
pub struct ScalarIndexStats {
    /// Index name — formatted as `{column}_btree`.
    pub name: String,
    /// Column the index was built on.
    pub column: String,
    /// Wall-clock index build time.
    pub build_time_secs: f32,
    /// Dataset directory growth attributable to the index build.
    pub disk_bytes_added: u64,
}
/// Snapshot returned by `stats`. Best-effort: sub-queries that fail
/// degrade to 0 / false rather than failing the whole call.
#[derive(Debug, Clone, Serialize)]
pub struct DatasetStats {
    /// Configured dataset path/URI.
    pub path: String,
    /// Row count (0 if the dataset can't be opened).
    pub rows: usize,
    /// Total on-disk size of the dataset directory (local paths only;
    /// non-local URIs report 0).
    pub disk_bytes: u64,
    /// Whether any index is attached to the `vector` column.
    pub has_vector_index: bool,
    /// Whether any index is attached to the `doc_id` column.
    pub has_doc_id_index: bool,
}
// ================= The backend =================
/// Thin wrapper around a Lance dataset path. Lance handles the heavy
/// lifting — we just expose a narrow API.
///
/// Cheap to clone: holds only the path string. Every method re-opens
/// the dataset rather than caching a handle.
#[derive(Clone)]
pub struct LanceVectorStore {
    /// Local filesystem path or object-store URI (file:///..., s3://...).
    /// Lance's internal URI parsing handles both.
    path: String,
}
impl LanceVectorStore {
/// Construct a store pointing at `path` (local path or URI). No I/O —
/// the dataset is opened lazily per call.
pub fn new(path: impl Into<String>) -> Self {
    let path = path.into();
    Self { path }
}
/// Dataset path/URI exactly as configured at construction.
pub fn path(&self) -> &str {
    &self.path
}
/// Row count via Lance's fast metadata path (no scan).
pub async fn count(&self) -> Result<usize, String> {
    open_or_err(&self.path)
        .await?
        .count_rows(None)
        .await
        .map_err(e)
}
/// True if the on-disk dataset exists AND has at least one `vector`
/// column index attached.
pub async fn has_vector_index(&self) -> Result<bool, String> {
use lance_index::DatasetIndexExt;
let dataset = match lance::dataset::Dataset::open(&self.path).await {
Ok(d) => d,
Err(_) => return Ok(false),
};
let indexes = dataset.load_indices().await.map_err(e)?;
Ok(indexes.iter().any(|ix| {
ix.fields.iter().any(|fid| {
dataset.schema().field_by_id(*fid)
.map(|f| f.name == "vector")
.unwrap_or(false)
})
}))
}
/// Best-effort dataset snapshot: each sub-query that fails degrades to
/// its zero value (0 rows, false for index flags) instead of erroring.
pub async fn stats(&self) -> Result<DatasetStats, String> {
    let local = strip_file_uri(&self.path);
    Ok(DatasetStats {
        path: self.path.clone(),
        rows: self.count().await.unwrap_or(0),
        disk_bytes: dir_size_bytes(&local),
        has_vector_index: self.has_vector_index().await.unwrap_or(false),
        has_doc_id_index: self.has_scalar_index("doc_id").await.unwrap_or(false),
    })
}
/// Migrate a vectord-format Parquet file into a Lance dataset.
///
/// Input schema (vectord's binary-blob format):
/// - source     : Utf8
/// - doc_id     : Utf8
/// - chunk_idx  : Int32
/// - chunk_text : Utf8
/// - vector     : Binary (raw f32 little-endian bytes)
///
/// Output schema (Lance-friendly):
/// - source     : Utf8
/// - doc_id     : Utf8
/// - chunk_idx  : Int32
/// - chunk_text : Utf8
/// - vector     : FixedSizeList<Float32, dims>
///
/// Idempotent at the file level — if the target exists, it's
/// overwritten. Caller must manage destination paths.
pub async fn migrate_from_parquet_bytes(
    &self,
    parquet_bytes: &[u8],
) -> Result<MigrationStats, String> {
    let started = Instant::now();
    let (src_schema, src_batches, rows_written) = read_parquet(parquet_bytes)?;
    let dimensions = detect_vector_dims(&src_batches)?;
    let (lance_schema, lance_batches) =
        convert_to_fixed_size_list(&src_schema, src_batches, dimensions)?;
    // Overwrite semantics: drop any prior dataset at this path first.
    let local = strip_file_uri(&self.path);
    let _ = std::fs::remove_dir_all(&local);
    write_dataset(&self.path, lance_schema, lance_batches).await?;
    Ok(MigrationStats {
        rows_written,
        dimensions,
        disk_bytes: dir_size_bytes(&local),
        duration_secs: started.elapsed().as_secs_f32(),
    })
}
/// Native Lance append — does NOT rewrite existing files. New rows
/// land as a separate fragment; readers union across fragments at
/// query time. Contrast: our Parquet path requires rewriting the
/// entire vector file to add rows.
///
/// All vectors must share one non-zero dimensionality; `source`
/// (if given) is replicated onto every appended row. An empty
/// `doc_ids` is a no-op that writes nothing.
pub async fn append(
    &self,
    source: Option<String>,
    doc_ids: Vec<String>,
    chunk_idxs: Vec<i32>,
    chunk_texts: Vec<String>,
    vectors: Vec<Vec<f32>>,
) -> Result<AppendStats, String> {
    let n = doc_ids.len();
    if n == 0 {
        return Ok(AppendStats { rows_appended: 0, disk_bytes_added: 0, duration_secs: 0.0 });
    }
    if chunk_idxs.len() != n || chunk_texts.len() != n || vectors.len() != n {
        return Err(format!(
            "append: length mismatch (doc_ids={n}, chunk_idxs={}, chunk_texts={}, vectors={})",
            chunk_idxs.len(), chunk_texts.len(), vectors.len(),
        ));
    }
    // All rows must agree on dimensionality, and it must be non-zero —
    // a 0-wide FixedSizeList would poison the dataset schema.
    let dims = vectors[0].len();
    if dims == 0 {
        return Err("append: vectors must have non-zero dimensionality".into());
    }
    for (i, v) in vectors.iter().enumerate() {
        if v.len() != dims {
            return Err(format!("append: row {i} has {} dims, expected {}", v.len(), dims));
        }
    }
    let t0 = Instant::now();
    let pre_bytes = dir_size_bytes(&strip_file_uri(&self.path));
    // Column arrays. `source` is broadcast to every row.
    let src_arr = StringArray::from(
        (0..n).map(|_| source.clone()).collect::<Vec<_>>()
    );
    let doc_id_arr = StringArray::from(doc_ids);
    let chunk_idx_arr = Int32Array::from(chunk_idxs);
    let chunk_text_arr = StringArray::from(chunk_texts);
    // Flatten per-row vectors into one contiguous buffer for the FSL array.
    let mut flat: Vec<f32> = Vec::with_capacity(n * dims);
    for v in vectors { flat.extend(v); }
    let values = Float32Array::from(flat);
    let item_field = Arc::new(Field::new("item", DataType::Float32, true));
    let vec_arr = FixedSizeListArray::try_new(
        item_field.clone(), dims as i32, Arc::new(values), None,
    ).map_err(e)?;
    // Must match the migration-time schema exactly or Lance rejects
    // the append.
    let schema = Arc::new(Schema::new(vec![
        Field::new("source", DataType::Utf8, true),
        Field::new("doc_id", DataType::Utf8, false),
        Field::new("chunk_idx", DataType::Int32, true),
        Field::new("chunk_text", DataType::Utf8, true),
        Field::new("vector", DataType::FixedSizeList(item_field, dims as i32), false),
    ]));
    let arrays: Vec<ArrayRef> = vec![
        Arc::new(src_arr), Arc::new(doc_id_arr), Arc::new(chunk_idx_arr),
        Arc::new(chunk_text_arr), Arc::new(vec_arr),
    ];
    let batch = RecordBatch::try_new(schema.clone(), arrays).map_err(e)?;
    let reader = RecordBatchIterator::new(vec![Ok(batch)].into_iter(), schema);
    use lance::dataset::{Dataset, WriteMode, WriteParams};
    let params = WriteParams { mode: WriteMode::Append, ..Default::default() };
    Dataset::write(reader, &self.path, Some(params)).await.map_err(e)?;
    Ok(AppendStats {
        rows_appended: n,
        disk_bytes_added: dir_size_bytes(&strip_file_uri(&self.path)).saturating_sub(pre_bytes),
        duration_secs: t0.elapsed().as_secs_f32(),
    })
}
/// Build an IVF_PQ vector index. Replaces any prior index with the
/// same name. Callers pass explicit params — sensible defaults for
/// ~100K × 768d: num_partitions=316 (≈√N), num_bits=8, num_sub_vectors=48.
pub async fn build_index(
    &self,
    num_partitions: u32,
    num_bits: u32,
    num_sub_vectors: u32,
) -> Result<IndexStats, String> {
    use lance::dataset::Dataset;
    use lance::index::vector::VectorIndexParams;
    use lance_index::{DatasetIndexExt, IndexType};
    use lance_linalg::distance::MetricType;
    const INDEX_NAME: &str = "vec_idx";
    const MAX_ITERATIONS: usize = 50; // same as bench
    let local = strip_file_uri(&self.path);
    let bytes_before = dir_size_bytes(&local);
    let started = Instant::now();
    let mut dataset = Dataset::open(&self.path).await.map_err(e)?;
    let params = VectorIndexParams::ivf_pq(
        num_partitions as usize,
        num_bits as u8,
        num_sub_vectors as usize,
        MetricType::Cosine,
        MAX_ITERATIONS,
    );
    dataset
        .create_index(
            &["vector"],
            IndexType::Vector,
            Some(INDEX_NAME.to_string()),
            &params,
            true, // replace any existing index with this name
        )
        .await
        .map_err(e)?;
    Ok(IndexStats {
        name: INDEX_NAME.into(),
        num_partitions,
        num_bits,
        num_sub_vectors,
        build_time_secs: started.elapsed().as_secs_f32(),
        disk_bytes_added: dir_size_bytes(&local).saturating_sub(bytes_before),
    })
}
/// Build a scalar btree index on `column` (typically `doc_id`). Makes
/// `get_by_doc_id` an index lookup instead of a filter-scan of every
/// fragment. Measured on 100K × 768d: ~19ms build, ~2.7 MB on disk.
pub async fn build_scalar_index(&self, column: &str) -> Result<ScalarIndexStats, String> {
    use lance::dataset::Dataset;
    use lance_index::{DatasetIndexExt, IndexType};
    use lance_index::scalar::ScalarIndexParams;
    let pre_bytes = dir_size_bytes(&strip_file_uri(&self.path));
    let t0 = Instant::now();
    let mut dataset = Dataset::open(&self.path).await.map_err(e)?;
    // Deterministic name so rebuilds replace rather than accumulate.
    let idx_name = format!("{column}_btree");
    dataset.create_index(
        &[column],
        IndexType::Scalar,
        Some(idx_name.clone()),
        &ScalarIndexParams::default(),
        true, // replace any prior index with this name
    ).await.map_err(e)?;
    Ok(ScalarIndexStats {
        name: idx_name,
        column: column.to_string(),
        build_time_secs: t0.elapsed().as_secs_f32(),
        disk_bytes_added: dir_size_bytes(&strip_file_uri(&self.path)).saturating_sub(pre_bytes),
    })
}
/// True if a scalar index exists on the named column.
///
/// NOTE(review): this matches *any* index whose field set includes
/// `column`, without checking the index type — an IVF_PQ vector index
/// on the column would also count. Fine for `doc_id` today; confirm
/// before relying on it to distinguish scalar from vector indexes.
pub async fn has_scalar_index(&self, column: &str) -> Result<bool, String> {
    use lance_index::DatasetIndexExt;
    // A missing or unreadable dataset simply has no indexes.
    let dataset = match lance::dataset::Dataset::open(&self.path).await {
        Ok(d) => d,
        Err(_) => return Ok(false),
    };
    let indexes = dataset.load_indices().await.map_err(e)?;
    // Each index covers a set of field ids; resolve every id against
    // the schema and compare the field's name to `column`.
    Ok(indexes.iter().any(|ix| {
        ix.fields.iter().any(|fid| {
            dataset.schema().field_by_id(*fid)
                .map(|f| f.name == column)
                .unwrap_or(false)
        })
    }))
}
/// Search for the `top_k` nearest neighbors of `query`. Uses the
/// IVF_PQ index if one exists; otherwise does a full scan (slow but
/// correct — useful during development before index build).
///
/// `top_k == 0` short-circuits to an empty result set without opening
/// the dataset.
pub async fn search(&self, query: &[f32], top_k: usize) -> Result<Vec<Hit>, String> {
    use lance::dataset::Dataset;
    if top_k == 0 {
        return Ok(Vec::new());
    }
    let dataset = Dataset::open(&self.path).await.map_err(e)?;
    let qarr = Float32Array::from(query.to_vec());
    let mut scanner = dataset.scan();
    scanner.nearest("vector", &qarr, top_k).map_err(e)?;
    scanner.project(&["doc_id", "chunk_text"]).map_err(e)?;
    let mut stream = scanner.try_into_stream().await.map_err(e)?;
    let mut hits: Vec<Hit> = Vec::with_capacity(top_k);
    while let Some(batch) = stream.next().await {
        let batch = batch.map_err(e)?;
        // doc_id is mandatory in results; chunk_text and _distance are
        // tolerated as missing (empty text / no distance).
        let doc_ids = batch.column_by_name("doc_id")
            .ok_or_else(|| "no doc_id column in search result".to_string())?
            .as_any().downcast_ref::<StringArray>()
            .ok_or_else(|| "doc_id is not StringArray".to_string())?;
        let chunk_texts = batch.column_by_name("chunk_text")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
        // Lance tacks on a `_distance` column for nearest() queries.
        let distances = batch.column_by_name("_distance")
            .and_then(|c| c.as_any().downcast_ref::<Float32Array>());
        for row in 0..batch.num_rows() {
            let d = distances.map(|a| a.value(row));
            hits.push(Hit {
                doc_id: doc_ids.value(row).to_string(),
                chunk_text: chunk_texts.map(|a| a.value(row).to_string()).unwrap_or_default(),
                score: d.map(|d| 1.0 - d).unwrap_or(0.0), // cosine distance → similarity
                distance: d,
            });
        }
    }
    Ok(hits)
}
/// Fetch one row by doc_id via a Lance filter-pushdown scan.
///
/// With the `doc_id` btree from `build_scalar_index` in place this is
/// an index lookup instead of a filter-scan of every fragment; without
/// it, Lance scans (still far cheaper than reading the whole Parquet
/// file). Returns `Ok(None)` when no row matches.
pub async fn get_by_doc_id(&self, doc_id: &str) -> Result<Option<Row>, String> {
    use lance::dataset::Dataset;
    let dataset = Dataset::open(&self.path).await.map_err(e)?;
    // Single-quote escaping for the SQL-style filter string — doc_id
    // is caller-supplied, so this matters.
    let filter = format!("doc_id = '{}'", doc_id.replace('\'', "''"));
    let mut scanner = dataset.scan();
    scanner.filter(&filter).map_err(e)?;
    scanner.limit(Some(1), None).map_err(e)?;
    let mut stream = scanner.try_into_stream().await.map_err(e)?;
    // Skip any empty batches; return the first materialized row.
    while let Some(batch) = stream.next().await {
        let batch = batch.map_err(e)?;
        if batch.num_rows() == 0 { continue; }
        return Ok(Some(row_from_batch(&batch, 0)?));
    }
    Ok(None)
}
}
// ================= Internal helpers =================
/// Convert any `Display`-able error into this crate's `String` error type.
fn e<T: std::fmt::Display>(err: T) -> String {
    err.to_string()
}
/// `file:///abs/path` → `/abs/path`. Leave other URI schemes as-is for
/// helpers that only work on local paths (dir_size_bytes, remove_dir_all).
fn strip_file_uri(uri: &str) -> String {
    match uri.strip_prefix("file://") {
        Some(local) => local.to_string(),
        None => uri.to_string(),
    }
}
/// Open the Lance dataset at `path`, mapping the error to `String`.
async fn open_or_err(path: &str) -> Result<lance::dataset::Dataset, String> {
    lance::dataset::Dataset::open(path).await.map_err(e)
}
/// Total size in bytes of all files under `path` (recursively).
/// Missing or unreadable paths contribute 0 — this is a best-effort
/// metric, never an error.
fn dir_size_bytes(path: &str) -> u64 {
    let mut total: u64 = 0;
    // Iterative traversal with an explicit work stack.
    let mut pending = vec![std::path::PathBuf::from(path)];
    while let Some(p) = pending.pop() {
        let Ok(meta) = std::fs::metadata(&p) else { continue };
        if meta.is_file() {
            total += meta.len();
        } else if let Ok(entries) = std::fs::read_dir(&p) {
            pending.extend(entries.filter_map(|ent| ent.ok()).map(|ent| ent.path()));
        }
    }
    total
}
/// Decode an in-memory Parquet file into (schema, batches, total row count).
fn read_parquet(bytes: &[u8]) -> Result<(Arc<Schema>, Vec<RecordBatch>, usize), String> {
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    // Copy into `bytes::Bytes` — the reader builder needs an owned,
    // cheaply-cloneable buffer.
    let builder = ParquetRecordBatchReaderBuilder::try_new(bytes::Bytes::copy_from_slice(bytes))
        .map_err(e)?;
    let schema = builder.schema().clone();
    let reader = builder.build().map_err(e)?;
    // Collecting into Result short-circuits on the first bad batch.
    let batches: Vec<RecordBatch> = reader.collect::<Result<_, _>>().map_err(e)?;
    let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    Ok((schema, batches, rows))
}
/// Infer vector dimensionality from the `vector` column: length/4 of
/// the first non-null Binary value (raw little-endian f32 bytes), or
/// the declared width of a FixedSizeList column. Errors if no batch
/// yields an answer.
fn detect_vector_dims(batches: &[RecordBatch]) -> Result<usize, String> {
    for batch in batches {
        let col_idx = batch.schema().index_of("vector")
            .map_err(|_| "no 'vector' column".to_string())?;
        let col = batch.column(col_idx);
        if let Some(bin) = col.as_any().downcast_ref::<BinaryArray>() {
            // 4 bytes per f32 component.
            if let Some(i) = (0..bin.len()).find(|&i| !bin.is_null(i)) {
                return Ok(bin.value(i).len() / 4);
            }
            // All-null batch: keep looking in later batches.
        } else if let Some(fsl) = col.as_any().downcast_ref::<FixedSizeListArray>() {
            return Ok(fsl.value_length() as usize);
        }
    }
    Err("could not determine vector dimensions".into())
}
/// Rewrite `schema`/`batches` so the `vector` column is
/// `FixedSizeList<Float32, dims>` instead of raw Binary bytes. All
/// other columns pass through unchanged.
fn convert_to_fixed_size_list(
    schema: &Arc<Schema>,
    batches: Vec<RecordBatch>,
    dims: usize,
) -> Result<(Arc<Schema>, Vec<RecordBatch>), String> {
    // New schema: identical fields except `vector`, which becomes a
    // non-nullable FixedSizeList (nulls are zero-filled downstream by
    // binary_to_fsl, so non-nullable is safe).
    let new_fields: Vec<Arc<Field>> = schema
        .fields()
        .iter()
        .map(|f| {
            if f.name() == "vector" {
                Arc::new(Field::new(
                    "vector",
                    DataType::FixedSizeList(
                        Arc::new(Field::new("item", DataType::Float32, true)),
                        dims as i32,
                    ),
                    false,
                ))
            } else {
                f.clone()
            }
        })
        .collect();
    let new_schema = Arc::new(Schema::new(new_fields));
    let mut out = Vec::with_capacity(batches.len());
    for batch in batches {
        let vec_idx = batch.schema().index_of("vector").map_err(e)?;
        // Rebuild each batch column-by-column, converting only the
        // vector column; order must match the schema exactly.
        let mut new_cols: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns());
        for (i, col) in batch.columns().iter().enumerate() {
            if i == vec_idx {
                if let Some(bin) = col.as_any().downcast_ref::<BinaryArray>() {
                    // Decode little-endian f32 bytes into an FSL array.
                    new_cols.push(Arc::new(binary_to_fsl(bin, dims)?));
                } else if col.as_any().is::<FixedSizeListArray>() {
                    // Already in the right shape — just clone.
                    new_cols.push(col.clone());
                } else {
                    return Err("vector column is neither Binary nor FixedSizeList".into());
                }
            } else {
                new_cols.push(col.clone());
            }
        }
        out.push(RecordBatch::try_new(new_schema.clone(), new_cols).map_err(e)?);
    }
    Ok((new_schema, out))
}
/// Decode a Binary column of raw little-endian f32 bytes into a
/// `FixedSizeList<Float32, dims>` array. Null rows become all-zero
/// vectors (not FSL nulls); a row whose byte length doesn't equal
/// `dims * 4` is an error.
fn binary_to_fsl(bin: &BinaryArray, dims: usize) -> Result<FixedSizeListArray, String> {
    let rows = bin.len();
    let mut values: Vec<f32> = Vec::with_capacity(rows * dims);
    for row in 0..rows {
        if bin.is_null(row) {
            // Zero-fill nulls so the output column can be non-nullable.
            values.extend(std::iter::repeat(0.0f32).take(dims));
            continue;
        }
        let bytes = bin.value(row);
        if bytes.len() != dims * 4 {
            return Err(format!(
                "row {row}: {} bytes vs expected {} ({} × f32)",
                bytes.len(), dims * 4, dims,
            ));
        }
        values.extend(
            bytes
                .chunks_exact(4)
                .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]])),
        );
    }
    let item = Arc::new(Field::new("item", DataType::Float32, true));
    FixedSizeListArray::try_new(item, dims as i32, Arc::new(Float32Array::from(values)), None)
        .map_err(e)
}
/// Create a Lance dataset at `path` from in-memory batches using the
/// default write parameters.
async fn write_dataset(
    path: &str,
    schema: Arc<Schema>,
    batches: Vec<RecordBatch>,
) -> Result<(), String> {
    use lance::dataset::{Dataset, WriteParams};
    let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
    Dataset::write(reader, path, Some(WriteParams::default()))
        .await
        .map(|_| ())
        .map_err(e)
}
/// Hydrate one `Row` from a result batch. `doc_id` and a well-typed
/// `vector` column are required; `chunk_text` degrades to empty, and
/// `source`/`chunk_idx` are None when absent or null.
fn row_from_batch(batch: &RecordBatch, row: usize) -> Result<Row, String> {
    // Shared lookup: named column downcast to StringArray, if possible.
    let string_col = |name: &str| {
        batch
            .column_by_name(name)
            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
    };
    let doc_id = string_col("doc_id")
        .map(|a| a.value(row).to_string())
        .ok_or_else(|| "missing doc_id".to_string())?;
    let chunk_text = string_col("chunk_text")
        .map(|a| a.value(row).to_string())
        .unwrap_or_default();
    let source = string_col("source")
        .and_then(|a| (!a.is_null(row)).then(|| a.value(row).to_string()));
    let chunk_idx = batch
        .column_by_name("chunk_idx")
        .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
        .and_then(|a| (!a.is_null(row)).then(|| a.value(row)));
    let fsl = batch
        .column_by_name("vector")
        .ok_or_else(|| "no vector column in row fetch".to_string())?
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .ok_or_else(|| "vector column is not FixedSizeList".to_string())?;
    let inner = fsl.value(row);
    let floats = inner
        .as_any()
        .downcast_ref::<Float32Array>()
        .ok_or_else(|| "vector inner not Float32".to_string())?;
    // Copy element-by-element so array offsets are respected.
    let vector: Vec<f32> = (0..floats.len()).map(|i| floats.value(i)).collect();
    Ok(Row { doc_id, chunk_text, vector, source, chunk_idx })
}