//! Phase B: Lance pilot benchmark. //! //! Standalone binary that compares Lance vector storage against our //! Parquet-with-binary-blob + in-RAM HNSW approach. See //! docs/EXECUTION_PLAN.md for the decision rules this fuels. //! //! Inputs: //! data/vectors/resumes_100k_v2.parquet — existing 100K × 768d embeddings //! //! Output: //! A JSON report printed to stdout with measurements for: //! - Cold load time (parquet → arrow) vs Lance open + scan //! - Disk size //! - Vector search latency (p50 / p95 / p99) //! - Single-row random access //! - Append cost (adding 10K rows) //! //! Usage: //! cargo run --bin lance-bench -- \ //! --parquet data/vectors/resumes_100k_v2.parquet \ //! --lance-out /tmp/lance_resumes_100k_v2 \ //! --json-out /tmp/lance_bench.json use anyhow::{Context, Result}; use arrow_array::{Array, ArrayRef, BinaryArray, FixedSizeListArray, Float32Array, RecordBatch, RecordBatchIterator}; use arrow_schema::{DataType, Field, Schema}; use serde::Serialize; use std::sync::Arc; use std::time::Instant; #[derive(Debug, Serialize)] struct BenchReport { vectors: usize, dimensions: usize, parquet_path: String, lance_path: String, // Parquet baseline parquet_disk_bytes: u64, parquet_cold_load_secs: f32, // Lance numbers lance_write_secs: f32, lance_disk_bytes: u64, lance_cold_open_secs: f32, // Index + search lance_index_build_secs: Option, lance_index_disk_bytes: Option, lance_search_p50_us: Option, lance_search_p95_us: Option, lance_search_p99_us: Option, // Architectural features Parquet+sidecar can't cheaply do lance_random_row_access_us: Option, // fetch one row by row_id parquet_random_row_access_us: Option, // for comparison — full scan cost lance_append_10k_secs: Option, // add 10K new rows lance_append_disk_bytes_added: Option, // Head-to-head reference (from our own measurements) reference_hnsw_p50_us: f32, reference_hnsw_p95_us: f32, reference_brute_force_us: f32, reference_hnsw_build_secs: f32, } #[tokio::main] async fn main() -> Result<()> { // Simple positional args: parquet_in, lance_out. let args: Vec = std::env::args().collect(); let parquet_path = args .get(1) .cloned() .unwrap_or_else(|| "data/vectors/resumes_100k_v2.parquet".to_string()); let lance_path = args .get(2) .cloned() .unwrap_or_else(|| "/tmp/lance_bench_dataset".to_string()); eprintln!("=== Phase B Lance pilot ==="); eprintln!("input parquet: {}", parquet_path); eprintln!("output lance: {}", lance_path); // --- 1. Cold-load the existing Parquet vector index into memory eprintln!("\n[1/4] reading Parquet baseline..."); let t0 = Instant::now(); let (schema, batches, total_rows) = read_parquet_vectors(&parquet_path) .context("read parquet")?; let parquet_cold_load_secs = t0.elapsed().as_secs_f32(); let parquet_disk_bytes = std::fs::metadata(&parquet_path)?.len(); let dims = detect_vector_dims(&batches)?; eprintln!( " loaded {} rows, {} columns, vectors={}d, disk={:.1} MB, cold load={:.2}s", total_rows, schema.fields().len(), dims, parquet_disk_bytes as f64 / 1_000_000.0, parquet_cold_load_secs, ); // --- 2. Convert from binary-blob-of-f32 to Lance's FixedSizeList eprintln!("\n[2/4] converting binary-blob vectors to Arrow FixedSizeList..."); let t0 = Instant::now(); let (lance_schema, lance_batches) = convert_to_fixed_size_list(&schema, batches, dims)?; eprintln!(" conversion took {:.2}s", t0.elapsed().as_secs_f32()); // --- 3. Write as Lance dataset eprintln!("\n[3/4] writing Lance dataset..."); let t0 = Instant::now(); // Clean up any prior run let _ = std::fs::remove_dir_all(&lance_path); write_lance_dataset(&lance_path, lance_schema.clone(), lance_batches).await?; let lance_write_secs = t0.elapsed().as_secs_f32(); let lance_disk_bytes = dir_size_bytes(&lance_path); eprintln!( " write took {:.2}s, disk={:.1} MB", lance_write_secs, lance_disk_bytes as f64 / 1_000_000.0, ); // --- 4. Cold open + scan the Lance dataset eprintln!("\n[4/6] cold-opening Lance dataset..."); let t0 = Instant::now(); let scanned_rows = cold_open_and_scan_lance(&lance_path).await?; let lance_cold_open_secs = t0.elapsed().as_secs_f32(); eprintln!( " open + full scan: {} rows in {:.2}s", scanned_rows, lance_cold_open_secs, ); // --- 5. Build a vector index on the Lance dataset eprintln!("\n[5/6] building Lance vector index (IVF_PQ)..."); let t0 = Instant::now(); let index_built = build_lance_vector_index(&lance_path, dims).await; let (lance_index_build_secs, lance_index_disk_bytes) = match index_built { Ok(()) => { let secs = t0.elapsed().as_secs_f32(); let disk = dir_size_bytes(&lance_path) - lance_disk_bytes; eprintln!(" built in {:.2}s, index adds {:.1} MB on disk", secs, disk as f64 / 1e6); (Some(secs), Some(disk)) } Err(e) => { eprintln!(" index build failed: {e:#}"); (None, None) } }; // --- 6. Run search queries, measure latency eprintln!("\n[6/6] running vector search benchmarks..."); let search_stats = if lance_index_build_secs.is_some() { run_search_benchmarks(&lance_path, dims).await.ok() } else { None }; let (lance_search_p50, lance_search_p95, lance_search_p99) = match search_stats { Some((p50, p95, p99)) => { eprintln!(" p50={:.0}us p95={:.0}us p99={:.0}us", p50, p95, p99); (Some(p50), Some(p95), Some(p99)) } None => (None, None, None), }; // --- Random access comparison eprintln!("\n[7/8] random row access — Lance vs full-scan Parquet..."); let lance_random = measure_random_access_lance(&lance_path).await.ok(); let parquet_random = measure_random_access_parquet(&parquet_path).ok(); if let Some(us) = lance_random { eprintln!(" Lance random-fetch avg: {:.0}us", us); } if let Some(us) = parquet_random { eprintln!(" Parquet full-scan-to-row avg: {:.0}us", us); } // --- Append cost eprintln!("\n[8/8] append 10K new rows to existing dataset..."); let t0 = Instant::now(); let pre_append_bytes = dir_size_bytes(&lance_path); let append_result = append_10k_rows(&lance_path, dims).await; let (lance_append_secs, lance_append_bytes) = match append_result { Ok(()) => { let secs = t0.elapsed().as_secs_f32(); let bytes = dir_size_bytes(&lance_path).saturating_sub(pre_append_bytes); eprintln!(" append took {:.2}s, added {:.1} MB", secs, bytes as f64 / 1e6); (Some(secs), Some(bytes)) } Err(e) => { eprintln!(" append failed: {e:#}"); (None, None) } }; // --- Report let report = BenchReport { vectors: total_rows, dimensions: dims, parquet_path: parquet_path.clone(), lance_path: lance_path.clone(), parquet_disk_bytes, parquet_cold_load_secs, lance_write_secs, lance_disk_bytes, lance_cold_open_secs, lance_index_build_secs, lance_index_disk_bytes, lance_search_p50_us: lance_search_p50, lance_search_p95_us: lance_search_p95, lance_search_p99_us: lance_search_p99, lance_random_row_access_us: lance_random, parquet_random_row_access_us: parquet_random, lance_append_10k_secs: lance_append_secs, lance_append_disk_bytes_added: lance_append_bytes, // From our Phase 15 trial on the SAME index (ec=80 es=30, recall=1.00): reference_hnsw_p50_us: 873.0, reference_hnsw_p95_us: 1413.0, reference_brute_force_us: 43983.0, reference_hnsw_build_secs: 230.0, }; let json = serde_json::to_string_pretty(&report)?; println!("{}", json); eprintln!("\n=== Summary ==="); eprintln!(" Parquet cold load: {:.2}s", report.parquet_cold_load_secs); eprintln!(" Lance cold open: {:.2}s ({})", report.lance_cold_open_secs, format_ratio(report.parquet_cold_load_secs, report.lance_cold_open_secs)); eprintln!(" Parquet disk: {:.1} MB", report.parquet_disk_bytes as f64 / 1e6); eprintln!(" Lance disk: {:.1} MB ({})", report.lance_disk_bytes as f64 / 1e6, format_ratio(report.parquet_disk_bytes as f32, report.lance_disk_bytes as f32)); if let (Some(p50), Some(p95)) = (report.lance_search_p50_us, report.lance_search_p95_us) { eprintln!(" Lance search p50: {:.0}us vs our HNSW {:.0}us ({})", p50, report.reference_hnsw_p50_us, format_ratio(report.reference_hnsw_p50_us, p50)); eprintln!(" Lance search p95: {:.0}us vs our HNSW {:.0}us ({})", p95, report.reference_hnsw_p95_us, format_ratio(report.reference_hnsw_p95_us, p95)); eprintln!(" Speedup vs brute force: {:.1}× (Lance) vs {:.1}× (HNSW)", report.reference_brute_force_us / p50, report.reference_brute_force_us / report.reference_hnsw_p50_us); } if let Some(build) = report.lance_index_build_secs { eprintln!(" Index build: {:.1}s (Lance IVF_PQ) vs {:.0}s (our HNSW ec=80) ({}× faster)", build, report.reference_hnsw_build_secs, report.reference_hnsw_build_secs / build); } if let (Some(lance_us), Some(parquet_us)) = (report.lance_random_row_access_us, report.parquet_random_row_access_us) { eprintln!(" Random row access: {:.0}us (Lance) vs {:.0}us (Parquet scan) ({})", lance_us, parquet_us, format_ratio(parquet_us, lance_us)); } if let Some(append_secs) = report.lance_append_10k_secs { eprintln!(" Append 10K rows: {:.2}s (Lance native) [Parquet would require full rewrite]", append_secs); } Ok(()) } fn format_ratio(baseline: f32, candidate: f32) -> String { if candidate == 0.0 { return "inf".into(); } let ratio = baseline / candidate; if ratio >= 1.0 { format!("{:.2}× faster/smaller", ratio) } else { format!("{:.2}× slower/larger", 1.0 / ratio) } } fn dir_size_bytes(path: &str) -> u64 { fn recurse(p: &std::path::Path) -> u64 { let Ok(meta) = std::fs::metadata(p) else { return 0; }; if meta.is_file() { return meta.len(); } let Ok(entries) = std::fs::read_dir(p) else { return 0; }; entries .filter_map(|e| e.ok()) .map(|e| recurse(&e.path())) .sum() } recurse(std::path::Path::new(path)) } /// Read the existing vector Parquet (binary-blob format: source, doc_id, /// chunk_idx, chunk_text, vector as Binary bytes). fn read_parquet_vectors(path: &str) -> Result<(Arc, Vec, usize)> { use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use std::fs::File; let file = File::open(path).with_context(|| format!("open {path}"))?; let builder = ParquetRecordBatchReaderBuilder::try_new(file)?; let schema = builder.schema().clone(); let reader = builder.build()?; let batches: Vec = reader.collect::, _>>()?; let rows: usize = batches.iter().map(|b| b.num_rows()).sum(); Ok((schema, batches, rows)) } fn detect_vector_dims(batches: &[RecordBatch]) -> Result { for batch in batches { let vector_col_idx = batch .schema() .index_of("vector") .context("no 'vector' column in parquet")?; let col = batch.column(vector_col_idx); if let Some(binary) = col.as_any().downcast_ref::() { for i in 0..binary.len() { if !binary.is_null(i) { let bytes = binary.value(i); return Ok(bytes.len() / 4); // f32 = 4 bytes } } } } anyhow::bail!("could not determine vector dimensions") } /// Convert our binary-blob vector representation into Arrow's native /// FixedSizeList — that's what Lance expects for vector columns. fn convert_to_fixed_size_list( schema: &Arc, batches: Vec, dims: usize, ) -> Result<(Arc, Vec)> { // New schema keeps everything identical but replaces the vector column // with a FixedSizeList. let new_fields: Vec> = schema .fields() .iter() .map(|f| { if f.name() == "vector" { Arc::new(Field::new( "vector", DataType::FixedSizeList( Arc::new(Field::new("item", DataType::Float32, true)), dims as i32, ), false, )) } else { f.clone() } }) .collect(); let new_schema = Arc::new(Schema::new(new_fields)); let mut new_batches = Vec::with_capacity(batches.len()); for batch in batches { let vector_idx = batch.schema().index_of("vector")?; let mut new_arrays: Vec = Vec::with_capacity(batch.num_columns()); for (i, col) in batch.columns().iter().enumerate() { if i == vector_idx { let binary = col .as_any() .downcast_ref::() .context("vector column must be Binary")?; let fsl = binary_to_fixed_size_list(binary, dims)?; new_arrays.push(Arc::new(fsl)); } else { new_arrays.push(col.clone()); } } new_batches.push(RecordBatch::try_new(new_schema.clone(), new_arrays)?); } Ok((new_schema, new_batches)) } fn binary_to_fixed_size_list(binary: &BinaryArray, dims: usize) -> Result { let n = binary.len(); let mut all_floats: Vec = Vec::with_capacity(n * dims); for i in 0..n { if binary.is_null(i) { all_floats.extend(std::iter::repeat(0.0).take(dims)); continue; } let bytes = binary.value(i); if bytes.len() != dims * 4 { anyhow::bail!( "row {} has {} bytes, expected {} ({} × f32)", i, bytes.len(), dims * 4, dims, ); } for chunk in bytes.chunks_exact(4) { all_floats.push(f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]])); } } let values = Float32Array::from(all_floats); let field = Arc::new(Field::new("item", DataType::Float32, true)); FixedSizeListArray::try_new(field, dims as i32, Arc::new(values), None) .context("build FixedSizeListArray") } /// Write batches into a Lance dataset at the given path. async fn write_lance_dataset( path: &str, schema: Arc, batches: Vec, ) -> Result<()> { use lance::dataset::{Dataset, WriteParams}; let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema); Dataset::write(reader, path, Some(WriteParams::default())) .await .context("Dataset::write")?; Ok(()) } /// Open a Lance dataset cold (from disk) and scan it fully — measuring the /// equivalent of our "load embeddings from Parquet" cost. async fn cold_open_and_scan_lance(path: &str) -> Result { use futures::StreamExt; use lance::dataset::Dataset; let dataset = Dataset::open(path).await.context("Dataset::open")?; let scanner = dataset.scan(); let mut stream = scanner.try_into_stream().await?; let mut total = 0usize; while let Some(batch) = stream.next().await { let batch = batch?; total += batch.num_rows(); } Ok(total) } /// Build an IVF_PQ vector index on the `vector` column. IVF_PQ (Inverted File /// with Product Quantization) is Lance's native ANN index — comparable to /// HNSW in intent, but on-disk and compatible with Lance's random-access /// model. async fn build_lance_vector_index(path: &str, _dims: usize) -> Result<()> { use lance::dataset::Dataset; use lance::index::vector::VectorIndexParams; use lance_index::{DatasetIndexExt, IndexType}; use lance_linalg::distance::MetricType; let mut dataset = Dataset::open(path).await?; // IVF_PQ with ~sqrt(N) partitions is a reasonable default for 100K. // num_sub_vectors must divide dims evenly: 768/48 = 16 dims per subvector. // num_bits = 8 gives 256 codes per subvector (good recall/size trade). // max_iterations = 50 is plenty for this scale. let params = VectorIndexParams::ivf_pq( 316, // num_partitions (~sqrt(100000)) 8, // num_bits 48, // num_sub_vectors MetricType::Cosine, 50, // max_iterations ); dataset .create_index( &["vector"], IndexType::Vector, Some("vec_idx".into()), ¶ms, true, ) .await .context("create_index")?; Ok(()) } /// Run N vector searches against the Lance dataset and return (p50, p95, p99) latencies in us. /// Uses a handful of random rows as queries — same pattern as our harness::synthetic_from_chunks. async fn run_search_benchmarks(path: &str, _dims: usize) -> Result<(f32, f32, f32)> { use futures::StreamExt; use lance::dataset::Dataset; let dataset = Dataset::open(path).await?; // Pick 20 representative query vectors from the data itself. // (Synthetic — same pattern as our existing harness.) let query_vectors = sample_query_vectors(&dataset, 20).await?; let mut latencies_us: Vec = Vec::with_capacity(query_vectors.len()); for (i, qv) in query_vectors.iter().enumerate() { let qarr = Arc::new(Float32Array::from(qv.clone())) as ArrayRef; let t0 = Instant::now(); let mut scanner = dataset.scan(); scanner .nearest("vector", qarr.as_any().downcast_ref::().unwrap(), 10) .context("scanner.nearest")?; let mut stream = scanner.try_into_stream().await?; let mut hits = 0; while let Some(batch) = stream.next().await { let batch = batch?; hits += batch.num_rows(); } let us = t0.elapsed().as_micros() as f32; latencies_us.push(us); if i == 0 { eprintln!(" first query: {} hits in {:.0}us (includes any lazy init)", hits, us); } } latencies_us.sort_by(|a, b| a.partial_cmp(b).unwrap()); let p = |pct: f32| -> f32 { let idx = ((latencies_us.len() as f32 - 1.0) * pct).round() as usize; latencies_us[idx.min(latencies_us.len() - 1)] }; Ok((p(0.50), p(0.95), p(0.99))) } /// Random row access via Lance's `take` — fetch 20 random rows by index, measure avg latency. async fn measure_random_access_lance(path: &str) -> Result { use lance::dataset::Dataset; let dataset = Dataset::open(path).await?; let n = dataset.count_rows(None).await?; let indices: Vec = (0..20).map(|i| ((i as u64) * (n as u64 / 23)) % (n as u64)).collect(); // Full-schema projection — Lance's Schema implements Into. let schema = dataset.schema().clone(); let mut total_us: u128 = 0; for idx in &indices { let t0 = Instant::now(); let _batch = dataset.take(&[*idx], schema.clone()).await?; total_us += t0.elapsed().as_micros(); } Ok(total_us as f32 / indices.len() as f32) } /// Random row access for Parquet — full scan + filter. There's no random-access /// primitive in vanilla Parquet, so this is the cost of finding one specific row. /// This is the cost our current design pays for "get doc X's full text for RAG." fn measure_random_access_parquet(path: &str) -> Result { use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use std::fs::File; // We simulate 5 lookups — full scan each time. 20 would be painful. let iters = 5; let mut total_us: u128 = 0; for _ in 0..iters { let t0 = Instant::now(); let file = File::open(path)?; let builder = ParquetRecordBatchReaderBuilder::try_new(file)?; let reader = builder.build()?; // Iterate until we've conceptually found a row — we stop early if // we wanted row 50000, but we have to at least read its batch. let mut seen = 0usize; for b in reader { let b = b?; seen += b.num_rows(); if seen > 50000 { break; } } total_us += t0.elapsed().as_micros(); } Ok(total_us as f32 / iters as f32) } /// Append 10K new rows to the existing Lance dataset. /// Measures the "ingest delta" cost without full rewrite. async fn append_10k_rows(path: &str, dims: usize) -> Result<()> { use lance::dataset::{Dataset, WriteMode, WriteParams}; let dataset = Dataset::open(path).await?; let schema = dataset.schema(); let arrow_schema: Arc = Arc::new(schema.into()); // Build a 10K row batch with random-ish data matching the existing schema. let n = 10_000; let arrays: Vec = arrow_schema .fields() .iter() .map(|f| -> Result { match f.data_type() { DataType::Utf8 => { let vals: Vec = (0..n).map(|i| format!("appended-{}", i)).collect(); Ok(Arc::new(arrow_array::StringArray::from(vals))) } DataType::Int32 => { let vals: Vec = (0..n as i32).collect(); Ok(Arc::new(arrow_array::Int32Array::from(vals))) } DataType::FixedSizeList(_, _) => { let floats: Vec = (0..n * dims).map(|i| (i as f32).sin()).collect(); let values = Float32Array::from(floats); let field = Arc::new(Field::new("item", DataType::Float32, true)); let fsl = FixedSizeListArray::try_new(field, dims as i32, Arc::new(values), None)?; Ok(Arc::new(fsl)) } other => anyhow::bail!("unsupported append column type: {:?}", other), } }) .collect::>>()?; let batch = RecordBatch::try_new(arrow_schema.clone(), arrays)?; let reader = RecordBatchIterator::new(vec![Ok(batch)].into_iter(), arrow_schema); let params = WriteParams { mode: WriteMode::Append, ..Default::default() }; Dataset::write(reader, path, Some(params)).await?; Ok(()) } /// Grab a few existing vectors from the dataset to use as self-similar queries. async fn sample_query_vectors( dataset: &lance::dataset::Dataset, count: usize, ) -> Result>> { use futures::StreamExt; // Just take the first `count` rows; good enough for latency measurement. let scanner = dataset.scan(); let mut scanner = scanner; scanner.limit(Some(count as i64), None)?; scanner.project(&["vector"])?; let mut stream = scanner.try_into_stream().await?; let mut out = Vec::with_capacity(count); while let Some(batch) = stream.next().await { let batch = batch?; let vector_col = batch .column(0) .as_any() .downcast_ref::() .context("vector column must be FixedSizeList")?; for row in 0..vector_col.len() { if out.len() >= count { break; } let values = vector_col.value(row); let f32_arr = values .as_any() .downcast_ref::() .context("inner array must be Float32")?; let mut v = Vec::with_capacity(f32_arr.len()); for i in 0..f32_arr.len() { v.push(f32_arr.value(i)); } out.push(v); } if out.len() >= count { break; } } Ok(out) }