Four findings deferred from the 2026-05-02 scrum, all 1-5 line fixes: W1 (kimi WARN @ scrum_master_pipeline.ts:1143) — `gemini-3-flash-preview` hardcoded twice in MAP and REDUCE phases. Extracted TREE_SPLIT_MODEL + TREE_SPLIT_PROVIDER constants near the existing config block. Diverging the two would break tree-split coherence (per-shard digests must come from the same model the reducer collapses). W2 (qwen WARN @ providers.toml:30) — stale `kimi-k2:1t` reference in operator-facing comments after PR #13 noted it's upstream-broken. Reframed as historical context ("was X here pre-2026-05-03 — that model is broken") so future operators don't paste-route from the comment. W3 (opus WARN @ vectord-lance/src/lib.rs:622) — temp_path() entropy was only pid+nanos, which collide under tokio scheduling when multiple tests in the same cargo process create temp dirs back-to-back. Added per-process AtomicU64 sequence counter — guarantees uniqueness regardless of clock. W4 (opus INFO @ scripts/lance_smoke.sh:38) — `|| echo '{}'` swallowed curl transport failures (gateway down, network broken, timeout), surfacing as misleading "no method field" jq errors at the next probe. Now captures $? separately, gates a "curl reachable" probe, and only falls back to empty body for the dependent jq parse. Smoke went 9 → 10 probes. Verified: vectord-lance 7/7 tests PASS, gateway cargo check clean, lance_smoke.sh 10/10 PASS against live gateway. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
//! Production Lance vector backend (ADR-019 — hybrid architecture).
//!
//! This is the firewall crate. It owns its own Arrow 57 / DataFusion 52
//! / Lance 4 dependency tree. The public API uses only std types
//! (`Vec<f32>`, `Vec<String>`, `String`, `bool`) so nothing Arrow-shaped
//! crosses the crate boundary. That keeps vectord (Arrow 55) from
//! picking up an incompatible dep.
//!
//! Responsibilities:
//! - Migrate an existing binary-blob Parquet vector file into a Lance
//!   dataset with `FixedSizeList<Float32, dims>`. One-time cost.
//! - Append new rows natively (no full-file rewrite — Lance's structural win).
//! - Build an IVF_PQ ANN index on the vector column.
//! - Vector search (`search`) using the IVF_PQ index when present,
//!   falling back to full scan otherwise.
//! - Random-access row fetch by `doc_id` (`get_by_doc_id`) — the O(1)
//!   lookup that Parquet-on-object-store can't cheaply do.
//! - Cheap count + stats introspection.
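//!
//! A minimal end-to-end sketch of the intended call order. This is an
//! illustrative example, not a canned default: the paths, the 100K-row /
//! 768-dim index parameters, and the surrounding async `Result<(), String>`
//! context are assumptions. Marked `ignore` because it needs a real
//! Parquet payload on disk.
//!
//! ```ignore
//! let store = LanceVectorStore::new("/tmp/demo.lance");
//! let parquet_bytes = std::fs::read("vectors.parquet").map_err(|err| err.to_string())?;
//! let migrated = store.migrate_from_parquet_bytes(&parquet_bytes).await?;
//! store.build_index(316, 8, 48).await?;        // IVF_PQ on the `vector` column
//! store.build_scalar_index("doc_id").await?;   // btree behind get_by_doc_id
//! let query = vec![0.0_f32; migrated.dimensions];
//! let hits = store.search(&query, 10, Some(25), Some(10)).await?;
//! ```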

use arrow_array::{
    Array, ArrayRef, BinaryArray, FixedSizeListArray, Float32Array, Int32Array,
    RecordBatch, RecordBatchIterator, StringArray,
};
use arrow_schema::{DataType, Field, Schema};
use futures::StreamExt;
use serde::Serialize;
use std::sync::Arc;
use std::time::Instant;

// ================= Public types =================

/// One search result. Mirrors vectord's existing `SearchResult` shape
/// structurally but carries simpler types so this crate stays firewalled.
#[derive(Debug, Clone, Serialize)]
pub struct Hit {
    pub doc_id: String,
    pub chunk_text: String,
    pub score: f32,
    /// Optional — set by search_with_vector, not by index-only search.
    pub distance: Option<f32>,
}
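
// For reference: with the plain `Serialize` derive above (no serde field
// attributes in play), serde_json renders a `Hit` roughly as
//   { "doc_id": "DOC-0001", "chunk_text": "...", "score": 0.83, "distance": 0.17 }
// and as `"distance": null` when no distance was surfaced. Values are
// illustrative, not taken from a real response.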

/// A fully-hydrated row fetched by `get_by_doc_id` — includes the vector
/// so callers can do downstream work (rerank, cite, etc.) without a
/// second round trip.
#[derive(Debug, Clone, Serialize)]
pub struct Row {
    pub doc_id: String,
    pub chunk_text: String,
    pub vector: Vec<f32>,
    pub source: Option<String>,
    pub chunk_idx: Option<i32>,
}

#[derive(Debug, Clone, Serialize)]
pub struct MigrationStats {
    pub rows_written: usize,
    pub dimensions: usize,
    pub disk_bytes: u64,
    pub duration_secs: f32,
}

#[derive(Debug, Clone, Serialize)]
pub struct AppendStats {
    pub rows_appended: usize,
    pub disk_bytes_added: u64,
    pub duration_secs: f32,
}

#[derive(Debug, Clone, Serialize)]
pub struct IndexStats {
    pub name: String,
    pub num_partitions: u32,
    pub num_bits: u32,
    pub num_sub_vectors: u32,
    pub build_time_secs: f32,
    pub disk_bytes_added: u64,
}

#[derive(Debug, Clone, Serialize)]
pub struct ScalarIndexStats {
    pub name: String,
    pub column: String,
    pub build_time_secs: f32,
    pub disk_bytes_added: u64,
}

#[derive(Debug, Clone, Serialize)]
pub struct DatasetStats {
    pub path: String,
    pub rows: usize,
    pub disk_bytes: u64,
    pub has_vector_index: bool,
    pub has_doc_id_index: bool,
}

// ================= The backend =================

/// Thin wrapper around a Lance dataset path. Lance handles the heavy
/// lifting — we just expose a narrow API.
#[derive(Clone)]
pub struct LanceVectorStore {
    /// Local filesystem path or object-store URI (file:///..., s3://...).
    /// Lance's internal URI parsing handles both.
    path: String,
}

impl LanceVectorStore {
    pub fn new(path: impl Into<String>) -> Self {
        Self { path: path.into() }
    }

    pub fn path(&self) -> &str { &self.path }

    /// Row count via Lance's fast metadata path (no scan).
    pub async fn count(&self) -> Result<usize, String> {
        let dataset = open_or_err(&self.path).await?;
        let n = dataset.count_rows(None).await.map_err(e)?;
        Ok(n)
    }

    /// True if the on-disk dataset exists AND has at least one `vector`
    /// column index attached.
    pub async fn has_vector_index(&self) -> Result<bool, String> {
        use lance_index::DatasetIndexExt;
        let dataset = match lance::dataset::Dataset::open(&self.path).await {
            Ok(d) => d,
            Err(_) => return Ok(false),
        };
        let indexes = dataset.load_indices().await.map_err(e)?;
        Ok(indexes.iter().any(|ix| {
            ix.fields.iter().any(|fid| {
                dataset.schema().field_by_id(*fid)
                    .map(|f| f.name == "vector")
                    .unwrap_or(false)
            })
        }))
    }

    pub async fn stats(&self) -> Result<DatasetStats, String> {
        let rows = self.count().await.unwrap_or(0);
        let disk_bytes = dir_size_bytes(&strip_file_uri(&self.path));
        let has_vector_index = self.has_vector_index().await.unwrap_or(false);
        let has_doc_id_index = self.has_scalar_index("doc_id").await.unwrap_or(false);
        Ok(DatasetStats {
            path: self.path.clone(),
            rows,
            disk_bytes,
            has_vector_index,
            has_doc_id_index,
        })
    }

    /// Migrate a vectord-format Parquet file into a Lance dataset.
    ///
    /// Input schema (vectord's binary-blob format):
    ///   - source     : Utf8
    ///   - doc_id     : Utf8
    ///   - chunk_idx  : Int32
    ///   - chunk_text : Utf8
    ///   - vector     : Binary (raw f32 little-endian bytes)
    ///
    /// Output schema (Lance-friendly):
    ///   - source     : Utf8
    ///   - doc_id     : Utf8
    ///   - chunk_idx  : Int32
    ///   - chunk_text : Utf8
    ///   - vector     : FixedSizeList<Float32, dims>
    ///
    /// Idempotent at the file level — if the target exists, it's
    /// overwritten. Caller must manage destination paths.
    pub async fn migrate_from_parquet_bytes(
        &self,
        parquet_bytes: &[u8],
    ) -> Result<MigrationStats, String> {
        let t0 = Instant::now();
        let (schema, batches, rows) = read_parquet(parquet_bytes)?;
        let dims = detect_vector_dims(&batches)?;
        let (new_schema, new_batches) = convert_to_fixed_size_list(&schema, batches, dims)?;

        // Overwrite any prior dataset at this path.
        let _ = std::fs::remove_dir_all(&strip_file_uri(&self.path));

        write_dataset(&self.path, new_schema, new_batches).await?;
        let disk_bytes = dir_size_bytes(&strip_file_uri(&self.path));

        Ok(MigrationStats {
            rows_written: rows,
            dimensions: dims,
            disk_bytes,
            duration_secs: t0.elapsed().as_secs_f32(),
        })
    }

    /// Native Lance append — does NOT rewrite existing files. New rows
    /// land as a separate fragment; readers union across fragments at
    /// query time. Contrast: our Parquet path requires rewriting the
    /// entire vector file to add rows.
    pub async fn append(
        &self,
        source: Option<String>,
        doc_ids: Vec<String>,
        chunk_idxs: Vec<i32>,
        chunk_texts: Vec<String>,
        vectors: Vec<Vec<f32>>,
    ) -> Result<AppendStats, String> {
        let n = doc_ids.len();
        if n == 0 {
            return Ok(AppendStats { rows_appended: 0, disk_bytes_added: 0, duration_secs: 0.0 });
        }
        if chunk_idxs.len() != n || chunk_texts.len() != n || vectors.len() != n {
            return Err(format!(
                "append: length mismatch (doc_ids={n}, chunk_idxs={}, chunk_texts={}, vectors={})",
                chunk_idxs.len(), chunk_texts.len(), vectors.len(),
            ));
        }
        let dims = vectors[0].len();
        for (i, v) in vectors.iter().enumerate() {
            if v.len() != dims {
                return Err(format!("append: row {i} has {} dims, expected {}", v.len(), dims));
            }
        }

        let t0 = Instant::now();
        let pre_bytes = dir_size_bytes(&strip_file_uri(&self.path));

        let src_arr = StringArray::from(
            (0..n).map(|_| source.clone()).collect::<Vec<_>>()
        );
        let doc_id_arr = StringArray::from(doc_ids);
        let chunk_idx_arr = Int32Array::from(chunk_idxs);
        let chunk_text_arr = StringArray::from(chunk_texts);

        let mut flat: Vec<f32> = Vec::with_capacity(n * dims);
        for v in vectors { flat.extend(v); }
        let values = Float32Array::from(flat);
        let item_field = Arc::new(Field::new("item", DataType::Float32, true));
        let vec_arr = FixedSizeListArray::try_new(
            item_field.clone(), dims as i32, Arc::new(values), None,
        ).map_err(e)?;

        let schema = Arc::new(Schema::new(vec![
            Field::new("source", DataType::Utf8, true),
            Field::new("doc_id", DataType::Utf8, false),
            Field::new("chunk_idx", DataType::Int32, true),
            Field::new("chunk_text", DataType::Utf8, true),
            Field::new("vector", DataType::FixedSizeList(item_field, dims as i32), false),
        ]));

        let arrays: Vec<ArrayRef> = vec![
            Arc::new(src_arr), Arc::new(doc_id_arr), Arc::new(chunk_idx_arr),
            Arc::new(chunk_text_arr), Arc::new(vec_arr),
        ];
        let batch = RecordBatch::try_new(schema.clone(), arrays).map_err(e)?;
        let reader = RecordBatchIterator::new(vec![Ok(batch)].into_iter(), schema);

        use lance::dataset::{Dataset, WriteMode, WriteParams};
        let params = WriteParams { mode: WriteMode::Append, ..Default::default() };
        Dataset::write(reader, &self.path, Some(params)).await.map_err(e)?;

        Ok(AppendStats {
            rows_appended: n,
            disk_bytes_added: dir_size_bytes(&strip_file_uri(&self.path)).saturating_sub(pre_bytes),
            duration_secs: t0.elapsed().as_secs_f32(),
        })
    }
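
    // A hedged sketch of the calling convention: five parallel arguments, one
    // entry per row (doc ids and values below are illustrative; the shape
    // mirrors the append test at the bottom of this file):
    //
    //     store.append(
    //         Some("ingest-batch".into()),               // source, applied to every row
    //         vec!["DOC-A".into(), "DOC-B".into()],      // doc_ids
    //         vec![0, 1],                                // chunk_idxs
    //         vec!["chunk a".into(), "chunk b".into()],  // chunk_texts
    //         vec![vec![0.1, 0.2], vec![0.3, 0.4]],      // vectors, equal dims per row
    //     ).await?;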

    /// Build an IVF_PQ vector index. Replaces any prior index with the
    /// same name. Callers pass explicit params — sensible defaults for
    /// ~100K × 768d: num_partitions=316 (≈√N), num_bits=8, num_sub_vectors=48.
    pub async fn build_index(
        &self,
        num_partitions: u32,
        num_bits: u32,
        num_sub_vectors: u32,
    ) -> Result<IndexStats, String> {
        use lance::dataset::Dataset;
        use lance::index::vector::VectorIndexParams;
        use lance_index::{DatasetIndexExt, IndexType};
        use lance_linalg::distance::MetricType;

        let pre_bytes = dir_size_bytes(&strip_file_uri(&self.path));
        let t0 = Instant::now();

        let mut dataset = Dataset::open(&self.path).await.map_err(e)?;
        let params = VectorIndexParams::ivf_pq(
            num_partitions as usize,
            num_bits as u8,
            num_sub_vectors as usize,
            MetricType::Cosine,
            50, // max_iterations — same as bench
        );
        dataset.create_index(
            &["vector"],
            IndexType::Vector,
            Some("vec_idx".into()),
            &params,
            true, // replace
        ).await.map_err(e)?;

        Ok(IndexStats {
            name: "vec_idx".into(),
            num_partitions,
            num_bits,
            num_sub_vectors,
            build_time_secs: t0.elapsed().as_secs_f32(),
            disk_bytes_added: dir_size_bytes(&strip_file_uri(&self.path)).saturating_sub(pre_bytes),
        })
    }
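
    // Worked example of the defaults quoted in the doc comment above, as a
    // sanity check on the arithmetic (the corpus size is the doc comment's
    // assumption, not something measured here): for ~100K rows of 768-dim
    // vectors, num_partitions = 316 ≈ sqrt(100_000); num_sub_vectors = 48
    // splits each vector into 48 sub-vectors of 768 / 48 = 16 dims; and
    // num_bits = 8 gives 256 PQ centroids per sub-vector.
    //
    //     store.build_index(316, 8, 48).await?;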

    /// Build a scalar btree index on `doc_id`. Makes `get_by_doc_id`
    /// O(log N) instead of a filter-scan of every fragment. Small index
    /// footprint — a few hundred KB on 100K rows.
    pub async fn build_scalar_index(&self, column: &str) -> Result<ScalarIndexStats, String> {
        use lance::dataset::Dataset;
        use lance_index::{DatasetIndexExt, IndexType};
        use lance_index::scalar::ScalarIndexParams;

        let pre_bytes = dir_size_bytes(&strip_file_uri(&self.path));
        let t0 = Instant::now();

        let mut dataset = Dataset::open(&self.path).await.map_err(e)?;
        let idx_name = format!("{column}_btree");
        dataset.create_index(
            &[column],
            IndexType::Scalar,
            Some(idx_name.clone()),
            &ScalarIndexParams::default(),
            true,
        ).await.map_err(e)?;

        Ok(ScalarIndexStats {
            name: idx_name,
            column: column.to_string(),
            build_time_secs: t0.elapsed().as_secs_f32(),
            disk_bytes_added: dir_size_bytes(&strip_file_uri(&self.path)).saturating_sub(pre_bytes),
        })
    }

    /// True if a scalar index exists on the named column.
    pub async fn has_scalar_index(&self, column: &str) -> Result<bool, String> {
        use lance_index::DatasetIndexExt;
        let dataset = match lance::dataset::Dataset::open(&self.path).await {
            Ok(d) => d,
            Err(_) => return Ok(false),
        };
        let indexes = dataset.load_indices().await.map_err(e)?;
        Ok(indexes.iter().any(|ix| {
            ix.fields.iter().any(|fid| {
                dataset.schema().field_by_id(*fid)
                    .map(|f| f.name == column)
                    .unwrap_or(false)
            })
        }))
    }

    /// Search for top_k nearest neighbors of `query`. Uses the IVF_PQ
    /// index if one exists; otherwise does a full scan (slow but
    /// correct — useful during development before index build).
    ///
    /// `nprobes` tells Lance how many IVF partitions to probe per query.
    /// Lance's built-in default is **1**, which caps recall well below
    /// what the index is actually capable of. Passing `None` keeps that
    /// default (only sensible when latency trumps recall); for real
    /// workloads set `nprobes` to 5–10% of `num_partitions` (e.g. 20–30
    /// on a 316-partition index).
    ///
    /// `refine_factor` re-ranks the PQ-approximate top-k by computing
    /// exact distances on `top_k * refine_factor` candidates, then
    /// trimming to `top_k`. Cheap way to buy back recall lost to product
    /// quantization. `None` skips the re-rank pass.
    pub async fn search(
        &self,
        query: &[f32],
        top_k: usize,
        nprobes: Option<usize>,
        refine_factor: Option<u32>,
    ) -> Result<Vec<Hit>, String> {
        use lance::dataset::Dataset;

        let dataset = Dataset::open(&self.path).await.map_err(e)?;
        let qarr = Float32Array::from(query.to_vec());

        let mut scanner = dataset.scan();
        scanner.nearest("vector", &qarr, top_k).map_err(e)?;
        if let Some(n) = nprobes { scanner.nprobes(n); }
        if let Some(f) = refine_factor { scanner.refine(f); }
        scanner.project(&["doc_id", "chunk_text"]).map_err(e)?;

        let mut stream = scanner.try_into_stream().await.map_err(e)?;
        let mut hits: Vec<Hit> = Vec::with_capacity(top_k);

        while let Some(batch) = stream.next().await {
            let batch = batch.map_err(e)?;
            let doc_ids = batch.column_by_name("doc_id")
                .ok_or_else(|| "no doc_id column in search result".to_string())?
                .as_any().downcast_ref::<StringArray>()
                .ok_or_else(|| "doc_id is not StringArray".to_string())?;
            let chunk_texts = batch.column_by_name("chunk_text")
                .and_then(|c| c.as_any().downcast_ref::<StringArray>());
            // Lance tacks on a `_distance` column for nearest() queries.
            let distances = batch.column_by_name("_distance")
                .and_then(|c| c.as_any().downcast_ref::<Float32Array>());
            for row in 0..batch.num_rows() {
                let d = distances.map(|a| a.value(row));
                hits.push(Hit {
                    doc_id: doc_ids.value(row).to_string(),
                    chunk_text: chunk_texts.map(|a| a.value(row).to_string()).unwrap_or_default(),
                    score: d.map(|d| 1.0 - d).unwrap_or(0.0), // cosine distance → similarity
                    distance: d,
                });
            }
        }
        Ok(hits)
    }
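
    // A tuning sketch restating the guidance above (illustrative numbers, not
    // a benchmark): on a 316-partition index, probing roughly 8% of the
    // partitions with a modest exact-rerank pass looks like
    //
    //     let hits = store.search(&query, 10, Some(25), Some(10)).await?;
    //
    // whereas `store.search(&query, 10, None, None)` keeps Lance's nprobes=1
    // default and trades recall for latency.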

    /// Fetch one row by doc_id. Implementation: Lance filter-pushdown
    /// scan — O(log N) with partition pruning once a btree index exists
    /// on `doc_id` (see `build_scalar_index`), otherwise a filter-scan
    /// of every fragment (still far faster than reading the whole
    /// Parquet file).
    pub async fn get_by_doc_id(&self, doc_id: &str) -> Result<Option<Row>, String> {
        use lance::dataset::Dataset;

        let dataset = Dataset::open(&self.path).await.map_err(e)?;
        let filter = format!("doc_id = '{}'", doc_id.replace('\'', "''"));
        let mut scanner = dataset.scan();
        scanner.filter(&filter).map_err(e)?;
        scanner.limit(Some(1), None).map_err(e)?;
        let mut stream = scanner.try_into_stream().await.map_err(e)?;

        while let Some(batch) = stream.next().await {
            let batch = batch.map_err(e)?;
            if batch.num_rows() == 0 { continue; }
            return Ok(Some(row_from_batch(&batch, 0)?));
        }
        Ok(None)
    }
}

// ================= Internal helpers =================

fn e<T: std::fmt::Display>(err: T) -> String { err.to_string() }

/// `file:///abs/path` → `/abs/path`. Leave other URI schemes as-is for
/// helpers that only work on local paths (dir_size_bytes, remove_dir_all).
fn strip_file_uri(uri: &str) -> String {
    uri.strip_prefix("file://").unwrap_or(uri).to_string()
}

async fn open_or_err(path: &str) -> Result<lance::dataset::Dataset, String> {
    lance::dataset::Dataset::open(path).await.map_err(e)
}

fn dir_size_bytes(path: &str) -> u64 {
    fn recurse(p: &std::path::Path) -> u64 {
        let Ok(meta) = std::fs::metadata(p) else { return 0; };
        if meta.is_file() { return meta.len(); }
        let Ok(entries) = std::fs::read_dir(p) else { return 0; };
        entries.filter_map(|e| e.ok()).map(|e| recurse(&e.path())).sum()
    }
    recurse(std::path::Path::new(path))
}

fn read_parquet(bytes: &[u8]) -> Result<(Arc<Schema>, Vec<RecordBatch>, usize), String> {
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    let builder = ParquetRecordBatchReaderBuilder::try_new(bytes::Bytes::copy_from_slice(bytes))
        .map_err(e)?;
    let schema = builder.schema().clone();
    let reader = builder.build().map_err(e)?;
    let batches: Vec<RecordBatch> = reader.collect::<Result<_, _>>().map_err(e)?;
    let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    Ok((schema, batches, rows))
}

fn detect_vector_dims(batches: &[RecordBatch]) -> Result<usize, String> {
    for batch in batches {
        let idx = batch.schema().index_of("vector")
            .map_err(|_| "no 'vector' column".to_string())?;
        let col = batch.column(idx);
        if let Some(binary) = col.as_any().downcast_ref::<BinaryArray>() {
            for i in 0..binary.len() {
                if !binary.is_null(i) {
                    return Ok(binary.value(i).len() / 4);
                }
            }
        } else if let Some(fsl) = col.as_any().downcast_ref::<FixedSizeListArray>() {
            return Ok(fsl.value_length() as usize);
        }
    }
    Err("could not determine vector dimensions".into())
}

fn convert_to_fixed_size_list(
    schema: &Arc<Schema>,
    batches: Vec<RecordBatch>,
    dims: usize,
) -> Result<(Arc<Schema>, Vec<RecordBatch>), String> {
    let new_fields: Vec<Arc<Field>> = schema
        .fields()
        .iter()
        .map(|f| {
            if f.name() == "vector" {
                Arc::new(Field::new(
                    "vector",
                    DataType::FixedSizeList(
                        Arc::new(Field::new("item", DataType::Float32, true)),
                        dims as i32,
                    ),
                    false,
                ))
            } else {
                f.clone()
            }
        })
        .collect();
    let new_schema = Arc::new(Schema::new(new_fields));

    let mut out = Vec::with_capacity(batches.len());
    for batch in batches {
        let vec_idx = batch.schema().index_of("vector").map_err(e)?;
        let mut new_cols: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns());
        for (i, col) in batch.columns().iter().enumerate() {
            if i == vec_idx {
                if let Some(bin) = col.as_any().downcast_ref::<BinaryArray>() {
                    new_cols.push(Arc::new(binary_to_fsl(bin, dims)?));
                } else if col.as_any().is::<FixedSizeListArray>() {
                    // Already in the right shape — just clone.
                    new_cols.push(col.clone());
                } else {
                    return Err("vector column is neither Binary nor FixedSizeList".into());
                }
            } else {
                new_cols.push(col.clone());
            }
        }
        out.push(RecordBatch::try_new(new_schema.clone(), new_cols).map_err(e)?);
    }
    Ok((new_schema, out))
}

fn binary_to_fsl(bin: &BinaryArray, dims: usize) -> Result<FixedSizeListArray, String> {
    let n = bin.len();
    let mut flat: Vec<f32> = Vec::with_capacity(n * dims);
    for i in 0..n {
        if bin.is_null(i) {
            flat.extend(std::iter::repeat(0.0).take(dims));
            continue;
        }
        let b = bin.value(i);
        if b.len() != dims * 4 {
            return Err(format!(
                "row {i}: {} bytes vs expected {} ({} × f32)",
                b.len(), dims * 4, dims,
            ));
        }
        for c in b.chunks_exact(4) {
            flat.push(f32::from_le_bytes([c[0], c[1], c[2], c[3]]));
        }
    }
    let values = Float32Array::from(flat);
    let field = Arc::new(Field::new("item", DataType::Float32, true));
    FixedSizeListArray::try_new(field, dims as i32, Arc::new(values), None).map_err(e)
}

async fn write_dataset(
    path: &str,
    schema: Arc<Schema>,
    batches: Vec<RecordBatch>,
) -> Result<(), String> {
    use lance::dataset::{Dataset, WriteParams};
    let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
    Dataset::write(reader, path, Some(WriteParams::default()))
        .await.map_err(e)?;
    Ok(())
}

fn row_from_batch(batch: &RecordBatch, row: usize) -> Result<Row, String> {
    let doc_id = batch.column_by_name("doc_id")
        .and_then(|c| c.as_any().downcast_ref::<StringArray>())
        .map(|a| a.value(row).to_string())
        .ok_or_else(|| "missing doc_id".to_string())?;
    let chunk_text = batch.column_by_name("chunk_text")
        .and_then(|c| c.as_any().downcast_ref::<StringArray>())
        .map(|a| a.value(row).to_string())
        .unwrap_or_default();
    let source = batch.column_by_name("source")
        .and_then(|c| c.as_any().downcast_ref::<StringArray>())
        .and_then(|a| if a.is_null(row) { None } else { Some(a.value(row).to_string()) });
    let chunk_idx = batch.column_by_name("chunk_idx")
        .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
        .and_then(|a| if a.is_null(row) { None } else { Some(a.value(row)) });

    let vec_col = batch.column_by_name("vector")
        .ok_or_else(|| "no vector column in row fetch".to_string())?;
    let fsl = vec_col.as_any().downcast_ref::<FixedSizeListArray>()
        .ok_or_else(|| "vector column is not FixedSizeList".to_string())?;
    let inner = fsl.value(row);
    let floats = inner.as_any().downcast_ref::<Float32Array>()
        .ok_or_else(|| "vector inner not Float32".to_string())?;
    let mut v = Vec::with_capacity(floats.len());
    for i in 0..floats.len() { v.push(floats.value(i)); }

    Ok(Row { doc_id, chunk_text, vector: v, source, chunk_idx })
}

// =================== Tests ===================
//
// All tests run against a temp directory — never the production
// data/lance/ tree. Lance reads/writes are async + filesystem-bound,
// so we use #[tokio::test]. Each test gets a unique temp dir built from
// pid + a timestamp + a per-process sequence counter (see temp_path), so
// concurrent runs don't collide and a re-run of a single test doesn't
// see prior state.
//
// Surfaced 2026-05-02 audit: vectord-lance had ZERO tests despite
// being on the live HTTP path. These are the load-bearing locks for
// the public API contract.
#[cfg(test)]
mod tests {
    use super::*;

    fn temp_path(label: &str) -> String {
        // Per-process atomic counter — guarantees uniqueness regardless
        // of clock resolution or test scheduling. Combined with pid, the
        // result is unique within and across processes for any practical
        // test workload. Nanosecond timestamps were not enough on their
        // own: opus WARN at lib.rs:622 from the 2026-05-02 scrum noted
        // that under tokio scheduling, multiple tests in the same cargo
        // process can hit the same nanos bucket.
        use std::sync::atomic::{AtomicU64, Ordering};
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
        let pid = std::process::id();
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.subsec_nanos())
            .unwrap_or(0);
        std::env::temp_dir()
            .join(format!("vlance_test_{label}_{pid}_{nanos}_{seq}"))
            .to_string_lossy()
            .to_string()
    }

    /// Build a minimal in-memory Parquet file matching vectord's
    /// binary-blob schema. Used as input to migrate_from_parquet_bytes.
    fn synth_parquet_bytes(n_rows: usize, dims: usize) -> Vec<u8> {
        use parquet::arrow::ArrowWriter;
        use std::io::Cursor;

        let schema = Arc::new(Schema::new(vec![
            Field::new("source", DataType::Utf8, true),
            Field::new("doc_id", DataType::Utf8, false),
            Field::new("chunk_idx", DataType::Int32, true),
            Field::new("chunk_text", DataType::Utf8, true),
            Field::new("vector", DataType::Binary, false),
        ]));

        let sources: Vec<Option<&str>> = (0..n_rows).map(|_| Some("test")).collect();
        let doc_ids: Vec<String> = (0..n_rows).map(|i| format!("DOC-{i:04}")).collect();
        let chunk_idxs: Vec<Option<i32>> = (0..n_rows).map(|i| Some(i as i32)).collect();
        let chunk_texts: Vec<String> = (0..n_rows).map(|i| format!("synth chunk {i}")).collect();
        let vectors: Vec<Vec<u8>> = (0..n_rows).map(|i| {
            let v: Vec<f32> = (0..dims).map(|j| (i * dims + j) as f32 * 0.01).collect();
            let mut bytes = Vec::with_capacity(dims * 4);
            for f in v { bytes.extend_from_slice(&f.to_le_bytes()); }
            bytes
        }).collect();

        let batch = RecordBatch::try_new(schema.clone(), vec![
            Arc::new(StringArray::from(sources)),
            Arc::new(StringArray::from(doc_ids)),
            Arc::new(Int32Array::from(chunk_idxs)),
            Arc::new(StringArray::from(chunk_texts)),
            Arc::new(BinaryArray::from(vectors.iter().map(|v| v.as_slice()).collect::<Vec<_>>())),
        ]).expect("synth parquet batch");

        let mut buf = Cursor::new(Vec::new());
        let mut writer = ArrowWriter::try_new(&mut buf, schema, None).expect("arrow writer");
        writer.write(&batch).expect("write batch");
        writer.close().expect("close writer");
        buf.into_inner()
    }

    #[tokio::test]
    async fn fresh_store_reports_no_state() {
        let path = temp_path("fresh");
        let store = LanceVectorStore::new(path.clone());
        assert_eq!(store.path(), path);
        assert_eq!(store.count().await.unwrap_or(0), 0);
        assert!(!store.has_vector_index().await.unwrap_or(true));
    }

    #[tokio::test]
    async fn migrate_then_count_and_fetch() {
        let path = temp_path("migrate_fetch");
        let store = LanceVectorStore::new(path.clone());
        let bytes = synth_parquet_bytes(8, 4);

        let stats = store.migrate_from_parquet_bytes(&bytes).await.expect("migrate");
        assert_eq!(stats.rows_written, 8);
        assert_eq!(stats.dimensions, 4);
        assert!(stats.disk_bytes > 0, "lance dataset should occupy disk");

        assert_eq!(store.count().await.unwrap(), 8);

        let row = store.get_by_doc_id("DOC-0003").await
            .expect("get_by_doc_id Ok").expect("DOC-0003 exists");
        assert_eq!(row.doc_id, "DOC-0003");
        assert_eq!(row.chunk_text, "synth chunk 3");
        assert_eq!(row.vector.len(), 4);

        let _ = std::fs::remove_dir_all(&path);
    }

    /// Load-bearing contract: get_by_doc_id distinguishes "dataset
    /// missing" (Err) from "id missing" (Ok(None)) so the HTTP
    /// handler can return 404 without inspecting error strings.
    #[tokio::test]
    async fn get_by_doc_id_missing_returns_none() {
        let path = temp_path("missing_id");
        let store = LanceVectorStore::new(path.clone());
        store.migrate_from_parquet_bytes(&synth_parquet_bytes(4, 4)).await.expect("migrate");

        let row = store.get_by_doc_id("DOC-NEVER-EXISTS").await.expect("Ok");
        assert!(row.is_none(), "missing id must return Ok(None), not Err");

        let _ = std::fs::remove_dir_all(&path);
    }

    /// Verifies the load-bearing structural-difference claim of
    /// ADR-019: Lance appends without rewriting the whole file. Row
    /// count grows; new rows are fetchable by their doc_ids.
    #[tokio::test]
    async fn append_grows_count_and_new_rows_fetchable() {
        let path = temp_path("append");
        let store = LanceVectorStore::new(path.clone());
        store.migrate_from_parquet_bytes(&synth_parquet_bytes(4, 4)).await.expect("migrate");
        assert_eq!(store.count().await.unwrap(), 4);

        let stats = store.append(
            Some("appended".into()),
            vec!["NEW-A".into(), "NEW-B".into()],
            vec![0, 0],
            vec!["new chunk a".into(), "new chunk b".into()],
            vec![vec![0.1, 0.2, 0.3, 0.4], vec![0.5, 0.6, 0.7, 0.8]],
        ).await.expect("append");

        assert_eq!(stats.rows_appended, 2);
        assert_eq!(store.count().await.unwrap(), 6);

        let new_a = store.get_by_doc_id("NEW-A").await.unwrap().expect("NEW-A");
        assert_eq!(new_a.chunk_text, "new chunk a");
        assert_eq!(new_a.source.as_deref(), Some("appended"));

        let _ = std::fs::remove_dir_all(&path);
    }

    /// Without this guard a dim-mismatch row would land on disk and
    /// silently break search at query time.
    #[tokio::test]
    async fn append_dim_mismatch_errors() {
        let path = temp_path("dim_mismatch");
        let store = LanceVectorStore::new(path.clone());
        store.migrate_from_parquet_bytes(&synth_parquet_bytes(4, 4)).await.expect("migrate");

        let err = store.append(
            None, vec!["X".into(), "Y".into()], vec![0, 0],
            vec!["a".into(), "b".into()],
            vec![vec![1.0, 2.0, 3.0, 4.0], vec![1.0, 2.0]],
        ).await;
        assert!(err.is_err(), "dim mismatch must error");
        let msg = err.unwrap_err();
        assert!(msg.contains("dim") || msg.contains("expected"),
            "error must mention the dimension problem; got: {msg}");

        let _ = std::fs::remove_dir_all(&path);
    }

    /// Search round-trip: query the exact vector for one row, top-1
    /// must be that row. Verifies the search path works on small
    /// datasets where IVF training would normally be skipped.
    #[tokio::test]
    async fn search_returns_nearest() {
        let path = temp_path("search");
        let store = LanceVectorStore::new(path.clone());
        store.migrate_from_parquet_bytes(&synth_parquet_bytes(8, 4)).await.expect("migrate");

        let target: Vec<f32> = (0..4).map(|j| (5 * 4 + j) as f32 * 0.01).collect();
        let hits = store.search(&target, 3, None, None).await.expect("search");
        assert!(!hits.is_empty(), "search must return at least 1 hit");
        assert_eq!(hits[0].doc_id, "DOC-0005",
            "exact-vector match should be top-1; got {hits:?}");

        let _ = std::fs::remove_dir_all(&path);
    }

    /// stats() summarizes the dataset state in one call. Locks the
    /// field shape so downstream consumers don't break on a rename.
    #[tokio::test]
    async fn stats_reports_post_migrate_state() {
        let path = temp_path("stats");
        let store = LanceVectorStore::new(path.clone());
        store.migrate_from_parquet_bytes(&synth_parquet_bytes(5, 4)).await.expect("migrate");

        let s = store.stats().await.expect("stats");
        assert_eq!(s.rows, 5);
        assert!(s.disk_bytes > 0);
        assert!(!s.has_vector_index, "no vector index built yet");

        let _ = std::fs::remove_dir_all(&path);
    }
}