root 044650a1da lance-bench: also build doc_id btree post-IVF — match gateway's migrate behavior
The bench's own measure_random_access_lance uses take(row_position), so
it doesn't need the btree. But datasets written by this bench are
commonly queried downstream via /vectors/lance/doc/<name>/<doc_id>, and
without the btree that path falls back to a full table scan. Building
the index inline keeps bench-produced datasets production-shape from
the start and removes a footgun (the same one that kept
scale_test_10m's doc-fetch at ~100ms until commit 5d30b3d fixed it via
the migrate handler path).
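
A minimal sketch of the downstream lookup this protects (handler shape
assumed, filter string illustrative; with the btree present Lance can
resolve the predicate via the index, without it every fragment is
scanned):

    // hypothetical doc_id fetch, not part of this bench
    use futures::TryStreamExt;
    use lance::dataset::Dataset;
    let dataset = Dataset::open(path).await?;
    let mut scanner = dataset.scan();
    scanner.filter("doc_id = 'abc-123'")?; // index probe vs O(N) scan
    let batches: Vec<_> = scanner.try_into_stream().await?.try_collect().await?;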

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 22:19:16 -05:00

//! Phase B: Lance pilot benchmark.
//!
//! Standalone binary that compares Lance vector storage against our
//! Parquet-with-binary-blob + in-RAM HNSW approach. See
//! docs/EXECUTION_PLAN.md for the decision rules this fuels.
//!
//! Inputs:
//! data/vectors/resumes_100k_v2.parquet — existing 100K × 768d embeddings
//!
//! Output:
//! A JSON report printed to stdout with measurements for:
//! - Cold load time (parquet → arrow) vs Lance open + scan
//! - Disk size
//! - Vector search latency (p50 / p95 / p99)
//! - Single-row random access
//! - Append cost (adding 10K rows)
//!
//! Usage (two positional args, both optional; defaults in main):
//!   cargo run --bin lance-bench -- \
//!     data/vectors/resumes_100k_v2.parquet \
//!     /tmp/lance_resumes_100k_v2
use anyhow::{Context, Result};
use arrow_array::{Array, ArrayRef, BinaryArray, FixedSizeListArray, Float32Array, RecordBatch, RecordBatchIterator};
use arrow_schema::{DataType, Field, Schema};
use serde::Serialize;
use std::sync::Arc;
use std::time::Instant;
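/// Everything we measure, serialized as pretty JSON to stdout at the
/// end of the run. `Option` fields stay `None` when the corresponding
/// step failed or was skipped.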
#[derive(Debug, Serialize)]
struct BenchReport {
vectors: usize,
dimensions: usize,
parquet_path: String,
lance_path: String,
// Parquet baseline
parquet_disk_bytes: u64,
parquet_cold_load_secs: f32,
// Lance numbers
lance_write_secs: f32,
lance_disk_bytes: u64,
lance_cold_open_secs: f32,
// Index + search
lance_index_build_secs: Option<f32>,
lance_index_disk_bytes: Option<u64>,
lance_search_p50_us: Option<f32>,
lance_search_p95_us: Option<f32>,
lance_search_p99_us: Option<f32>,
// Architectural features Parquet+sidecar can't cheaply do
lance_random_row_access_us: Option<f32>, // fetch one row by row_id
parquet_random_row_access_us: Option<f32>, // for comparison — full scan cost
lance_append_10k_secs: Option<f32>, // add 10K new rows
lance_append_disk_bytes_added: Option<u64>,
// Head-to-head reference (from our own measurements)
reference_hnsw_p50_us: f32,
reference_hnsw_p95_us: f32,
reference_brute_force_us: f32,
reference_hnsw_build_secs: f32,
}
#[tokio::main]
async fn main() -> Result<()> {
// Simple positional args: parquet_in, lance_out.
let args: Vec<String> = std::env::args().collect();
let parquet_path = args
.get(1)
.cloned()
.unwrap_or_else(|| "data/vectors/resumes_100k_v2.parquet".to_string());
let lance_path = args
.get(2)
.cloned()
.unwrap_or_else(|| "/tmp/lance_bench_dataset".to_string());
eprintln!("=== Phase B Lance pilot ===");
eprintln!("input parquet: {}", parquet_path);
eprintln!("output lance: {}", lance_path);
// --- 1. Cold-load the existing Parquet vector index into memory
eprintln!("\n[1/4] reading Parquet baseline...");
let t0 = Instant::now();
let (schema, batches, total_rows) = read_parquet_vectors(&parquet_path)
.context("read parquet")?;
let parquet_cold_load_secs = t0.elapsed().as_secs_f32();
let parquet_disk_bytes = std::fs::metadata(&parquet_path)?.len();
let dims = detect_vector_dims(&batches)?;
eprintln!(
" loaded {} rows, {} columns, vectors={}d, disk={:.1} MB, cold load={:.2}s",
total_rows,
schema.fields().len(),
dims,
parquet_disk_bytes as f64 / 1_000_000.0,
parquet_cold_load_secs,
);
// --- 2. Convert from binary-blob-of-f32 to Lance's FixedSizeList<Float32>
eprintln!("\n[2/4] converting binary-blob vectors to Arrow FixedSizeList...");
let t0 = Instant::now();
let (lance_schema, lance_batches) = convert_to_fixed_size_list(&schema, batches, dims)?;
eprintln!(" conversion took {:.2}s", t0.elapsed().as_secs_f32());
// --- 3. Write as Lance dataset
eprintln!("\n[3/4] writing Lance dataset...");
let t0 = Instant::now();
// Clean up any prior run
let _ = std::fs::remove_dir_all(&lance_path);
write_lance_dataset(&lance_path, lance_schema.clone(), lance_batches).await?;
let lance_write_secs = t0.elapsed().as_secs_f32();
let lance_disk_bytes = dir_size_bytes(&lance_path);
eprintln!(
" write took {:.2}s, disk={:.1} MB",
lance_write_secs,
lance_disk_bytes as f64 / 1_000_000.0,
);
// --- 4. Cold open + scan the Lance dataset
eprintln!("\n[4/6] cold-opening Lance dataset...");
let t0 = Instant::now();
let scanned_rows = cold_open_and_scan_lance(&lance_path).await?;
let lance_cold_open_secs = t0.elapsed().as_secs_f32();
eprintln!(
" open + full scan: {} rows in {:.2}s",
scanned_rows, lance_cold_open_secs,
);
// --- 5. Build a vector index on the Lance dataset
eprintln!("\n[5/6] building Lance vector index (IVF_PQ)...");
let t0 = Instant::now();
let index_built = build_lance_vector_index(&lance_path, dims).await;
let (lance_index_build_secs, lance_index_disk_bytes) = match index_built {
Ok(()) => {
let secs = t0.elapsed().as_secs_f32();
            let disk = dir_size_bytes(&lance_path).saturating_sub(lance_disk_bytes);
eprintln!(" built in {:.2}s, index adds {:.1} MB on disk", secs, disk as f64 / 1e6);
(Some(secs), Some(disk))
}
Err(e) => {
eprintln!(" index build failed: {e:#}");
(None, None)
}
};
// --- 6. Run search queries, measure latency
eprintln!("\n[6/6] running vector search benchmarks...");
let search_stats = if lance_index_build_secs.is_some() {
run_search_benchmarks(&lance_path, dims).await.ok()
} else {
None
};
let (lance_search_p50, lance_search_p95, lance_search_p99) = match search_stats {
Some((p50, p95, p99)) => {
eprintln!(" p50={:.0}us p95={:.0}us p99={:.0}us", p50, p95, p99);
(Some(p50), Some(p95), Some(p99))
}
None => (None, None, None),
};
// --- Random access comparison
eprintln!("\n[7/8] random row access — Lance vs full-scan Parquet...");
let lance_random = measure_random_access_lance(&lance_path).await.ok();
let parquet_random = measure_random_access_parquet(&parquet_path).ok();
if let Some(us) = lance_random {
eprintln!(" Lance random-fetch avg: {:.0}us", us);
}
if let Some(us) = parquet_random {
eprintln!(" Parquet full-scan-to-row avg: {:.0}us", us);
}
// --- Append cost
eprintln!("\n[8/8] append 10K new rows to existing dataset...");
let t0 = Instant::now();
let pre_append_bytes = dir_size_bytes(&lance_path);
let append_result = append_10k_rows(&lance_path, dims).await;
let (lance_append_secs, lance_append_bytes) = match append_result {
Ok(()) => {
let secs = t0.elapsed().as_secs_f32();
let bytes = dir_size_bytes(&lance_path).saturating_sub(pre_append_bytes);
eprintln!(" append took {:.2}s, added {:.1} MB", secs, bytes as f64 / 1e6);
(Some(secs), Some(bytes))
}
Err(e) => {
eprintln!(" append failed: {e:#}");
(None, None)
}
};
// --- Report
let report = BenchReport {
vectors: total_rows,
dimensions: dims,
parquet_path: parquet_path.clone(),
lance_path: lance_path.clone(),
parquet_disk_bytes,
parquet_cold_load_secs,
lance_write_secs,
lance_disk_bytes,
lance_cold_open_secs,
lance_index_build_secs,
lance_index_disk_bytes,
lance_search_p50_us: lance_search_p50,
lance_search_p95_us: lance_search_p95,
lance_search_p99_us: lance_search_p99,
lance_random_row_access_us: lance_random,
parquet_random_row_access_us: parquet_random,
lance_append_10k_secs: lance_append_secs,
lance_append_disk_bytes_added: lance_append_bytes,
// From our Phase 15 trial on the SAME index (ec=80 es=30, recall=1.00):
reference_hnsw_p50_us: 873.0,
reference_hnsw_p95_us: 1413.0,
reference_brute_force_us: 43983.0,
reference_hnsw_build_secs: 230.0,
};
let json = serde_json::to_string_pretty(&report)?;
println!("{}", json);
eprintln!("\n=== Summary ===");
eprintln!(" Parquet cold load: {:.2}s", report.parquet_cold_load_secs);
eprintln!(" Lance cold open: {:.2}s ({})",
report.lance_cold_open_secs,
format_ratio(report.parquet_cold_load_secs, report.lance_cold_open_secs));
eprintln!(" Parquet disk: {:.1} MB", report.parquet_disk_bytes as f64 / 1e6);
eprintln!(" Lance disk: {:.1} MB ({})",
report.lance_disk_bytes as f64 / 1e6,
format_ratio(report.parquet_disk_bytes as f32, report.lance_disk_bytes as f32));
if let (Some(p50), Some(p95)) = (report.lance_search_p50_us, report.lance_search_p95_us) {
eprintln!(" Lance search p50: {:.0}us vs our HNSW {:.0}us ({})",
p50, report.reference_hnsw_p50_us,
format_ratio(report.reference_hnsw_p50_us, p50));
eprintln!(" Lance search p95: {:.0}us vs our HNSW {:.0}us ({})",
p95, report.reference_hnsw_p95_us,
format_ratio(report.reference_hnsw_p95_us, p95));
eprintln!(" Speedup vs brute force: {:.1}× (Lance) vs {:.1}× (HNSW)",
report.reference_brute_force_us / p50,
report.reference_brute_force_us / report.reference_hnsw_p50_us);
}
if let Some(build) = report.lance_index_build_secs {
eprintln!(" Index build: {:.1}s (Lance IVF_PQ) vs {:.0}s (our HNSW ec=80) ({}× faster)",
build, report.reference_hnsw_build_secs, report.reference_hnsw_build_secs / build);
}
if let (Some(lance_us), Some(parquet_us)) = (report.lance_random_row_access_us, report.parquet_random_row_access_us) {
eprintln!(" Random row access: {:.0}us (Lance) vs {:.0}us (Parquet scan) ({})",
lance_us, parquet_us, format_ratio(parquet_us, lance_us));
}
if let Some(append_secs) = report.lance_append_10k_secs {
eprintln!(" Append 10K rows: {:.2}s (Lance native) [Parquet would require full rewrite]",
append_secs);
}
Ok(())
}
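/// Render `baseline / candidate` as a human-readable ratio, e.g.
/// "2.50× faster/smaller" when the candidate wins and
/// "1.33× slower/larger" when it loses.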
fn format_ratio(baseline: f32, candidate: f32) -> String {
if candidate == 0.0 { return "inf".into(); }
let ratio = baseline / candidate;
if ratio >= 1.0 {
format!("{:.2}× faster/smaller", ratio)
} else {
format!("{:.2}× slower/larger", 1.0 / ratio)
}
}
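/// Recursively sum file sizes under `path`. Lance datasets are
/// directories (data fragments, manifests, index files), so a single
/// fs::metadata call can't report their footprint.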
fn dir_size_bytes(path: &str) -> u64 {
fn recurse(p: &std::path::Path) -> u64 {
let Ok(meta) = std::fs::metadata(p) else { return 0; };
if meta.is_file() { return meta.len(); }
let Ok(entries) = std::fs::read_dir(p) else { return 0; };
entries
.filter_map(|e| e.ok())
.map(|e| recurse(&e.path()))
.sum()
}
recurse(std::path::Path::new(path))
}
/// Read the existing vector Parquet (binary-blob format: source, doc_id,
/// chunk_idx, chunk_text, vector as Binary bytes).
fn read_parquet_vectors(path: &str) -> Result<(Arc<Schema>, Vec<RecordBatch>, usize)> {
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use std::fs::File;
let file = File::open(path).with_context(|| format!("open {path}"))?;
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
let schema = builder.schema().clone();
let reader = builder.build()?;
let batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, _>>()?;
let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
Ok((schema, batches, rows))
}
fn detect_vector_dims(batches: &[RecordBatch]) -> Result<usize> {
for batch in batches {
let vector_col_idx = batch
.schema()
.index_of("vector")
.context("no 'vector' column in parquet")?;
let col = batch.column(vector_col_idx);
if let Some(binary) = col.as_any().downcast_ref::<BinaryArray>() {
for i in 0..binary.len() {
if !binary.is_null(i) {
let bytes = binary.value(i);
return Ok(bytes.len() / 4); // f32 = 4 bytes
}
}
}
}
anyhow::bail!("could not determine vector dimensions")
}
/// Convert our binary-blob vector representation into Arrow's native
/// FixedSizeList<Float32> — that's what Lance expects for vector columns.
fn convert_to_fixed_size_list(
schema: &Arc<Schema>,
batches: Vec<RecordBatch>,
dims: usize,
) -> Result<(Arc<Schema>, Vec<RecordBatch>)> {
// New schema keeps everything identical but replaces the vector column
// with a FixedSizeList<Float32, dims>.
let new_fields: Vec<Arc<Field>> = schema
.fields()
.iter()
.map(|f| {
if f.name() == "vector" {
Arc::new(Field::new(
"vector",
DataType::FixedSizeList(
Arc::new(Field::new("item", DataType::Float32, true)),
dims as i32,
),
false,
))
} else {
f.clone()
}
})
.collect();
let new_schema = Arc::new(Schema::new(new_fields));
let mut new_batches = Vec::with_capacity(batches.len());
for batch in batches {
let vector_idx = batch.schema().index_of("vector")?;
let mut new_arrays: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns());
for (i, col) in batch.columns().iter().enumerate() {
if i == vector_idx {
let binary = col
.as_any()
.downcast_ref::<BinaryArray>()
.context("vector column must be Binary")?;
let fsl = binary_to_fixed_size_list(binary, dims)?;
new_arrays.push(Arc::new(fsl));
} else {
new_arrays.push(col.clone());
}
}
new_batches.push(RecordBatch::try_new(new_schema.clone(), new_arrays)?);
}
Ok((new_schema, new_batches))
}
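/// Decode each row's little-endian f32 blob into one slot of a
/// FixedSizeList<Float32, dims>. Null rows become zero vectors (the
/// list length is fixed); a byte-length mismatch is a hard error.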
fn binary_to_fixed_size_list(binary: &BinaryArray, dims: usize) -> Result<FixedSizeListArray> {
let n = binary.len();
let mut all_floats: Vec<f32> = Vec::with_capacity(n * dims);
for i in 0..n {
if binary.is_null(i) {
all_floats.extend(std::iter::repeat(0.0).take(dims));
continue;
}
let bytes = binary.value(i);
if bytes.len() != dims * 4 {
anyhow::bail!(
"row {} has {} bytes, expected {} ({} × f32)",
i, bytes.len(), dims * 4, dims,
);
}
for chunk in bytes.chunks_exact(4) {
all_floats.push(f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]));
}
}
let values = Float32Array::from(all_floats);
let field = Arc::new(Field::new("item", DataType::Float32, true));
FixedSizeListArray::try_new(field, dims as i32, Arc::new(values), None)
.context("build FixedSizeListArray")
}
/// Write batches into a Lance dataset at the given path.
async fn write_lance_dataset(
path: &str,
schema: Arc<Schema>,
batches: Vec<RecordBatch>,
) -> Result<()> {
use lance::dataset::{Dataset, WriteParams};
let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
Dataset::write(reader, path, Some(WriteParams::default()))
.await
.context("Dataset::write")?;
Ok(())
}
/// Open a Lance dataset cold (from disk) and scan it fully — measuring the
/// equivalent of our "load embeddings from Parquet" cost.
async fn cold_open_and_scan_lance(path: &str) -> Result<usize> {
use futures::StreamExt;
use lance::dataset::Dataset;
let dataset = Dataset::open(path).await.context("Dataset::open")?;
let scanner = dataset.scan();
let mut stream = scanner.try_into_stream().await?;
let mut total = 0usize;
while let Some(batch) = stream.next().await {
let batch = batch?;
total += batch.num_rows();
}
Ok(total)
}
/// Build an IVF_PQ vector index on the `vector` column. IVF_PQ (Inverted File
/// with Product Quantization) is Lance's native ANN index — comparable to
/// HNSW in intent, but on-disk and compatible with Lance's random-access
/// model.
async fn build_lance_vector_index(path: &str, _dims: usize) -> Result<()> {
use lance::dataset::Dataset;
use lance::index::vector::VectorIndexParams;
use lance_index::{DatasetIndexExt, IndexType};
use lance_linalg::distance::MetricType;
let mut dataset = Dataset::open(path).await?;
// IVF_PQ with ~sqrt(N) partitions is a reasonable default for 100K.
// num_sub_vectors must divide dims evenly: 768/48 = 16 dims per subvector.
// num_bits = 8 gives 256 codes per subvector (good recall/size trade).
// max_iterations = 50 is plenty for this scale.
let params = VectorIndexParams::ivf_pq(
316, // num_partitions (~sqrt(100000))
8, // num_bits
48, // num_sub_vectors
MetricType::Cosine,
50, // max_iterations
);
dataset
.create_index(
&["vector"],
IndexType::Vector,
Some("vec_idx".into()),
&params,
true,
)
.await
.context("create_index")?;
// Also build the scalar btree on doc_id. This bench's
// measure_random_access_lance uses take(row_position) which doesn't
// need the btree, but the dataset this bench writes is also queried
// downstream by /vectors/lance/doc/<name>/<doc_id> (the production
// lookup path) — without this index that path falls back to a full
// table scan. Cheap to build (~1.2s on 10M rows) and matches the
// gateway's lance_migrate handler behavior so bench-produced datasets
// are immediately production-shape.
use lance_index::scalar::ScalarIndexParams;
dataset
.create_index(
&["doc_id"],
IndexType::Scalar,
Some("doc_id_btree".into()),
&ScalarIndexParams::default(),
true,
)
.await
.context("create_index doc_id btree")?;
Ok(())
}
/// Run N vector searches against the Lance dataset and return (p50, p95, p99) latencies in us.
/// Uses a handful of random rows as queries — same pattern as our harness::synthetic_from_chunks.
async fn run_search_benchmarks(path: &str, _dims: usize) -> Result<(f32, f32, f32)> {
use futures::StreamExt;
use lance::dataset::Dataset;
let dataset = Dataset::open(path).await?;
// Pick 20 representative query vectors from the data itself.
// (Synthetic — same pattern as our existing harness.)
let query_vectors = sample_query_vectors(&dataset, 20).await?;
let mut latencies_us: Vec<f32> = Vec::with_capacity(query_vectors.len());
for (i, qv) in query_vectors.iter().enumerate() {
        let qarr = Float32Array::from(qv.clone());
        let t0 = Instant::now();
        let mut scanner = dataset.scan();
        scanner
            .nearest("vector", &qarr, 10)
            .context("scanner.nearest")?;
let mut stream = scanner.try_into_stream().await?;
let mut hits = 0;
while let Some(batch) = stream.next().await {
let batch = batch?;
hits += batch.num_rows();
}
let us = t0.elapsed().as_micros() as f32;
latencies_us.push(us);
if i == 0 {
eprintln!(" first query: {} hits in {:.0}us (includes any lazy init)", hits, us);
}
}
latencies_us.sort_by(|a, b| a.partial_cmp(b).unwrap());
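    // Nearest-rank percentile over the sorted sample; with only 20
    // queries, p95 and p99 land on the slowest one or two observations.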
let p = |pct: f32| -> f32 {
let idx = ((latencies_us.len() as f32 - 1.0) * pct).round() as usize;
latencies_us[idx.min(latencies_us.len() - 1)]
};
Ok((p(0.50), p(0.95), p(0.99)))
}
/// Random row access via Lance's `take`: fetch 20 rows by position
/// (a deterministic stride rather than a true random sample) and
/// return the average latency in us.
async fn measure_random_access_lance(path: &str) -> Result<f32> {
use lance::dataset::Dataset;
let dataset = Dataset::open(path).await?;
let n = dataset.count_rows(None).await?;
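    // Deterministic stride across the table instead of a true RNG:
    // reproducible run-to-run, and the offsets spread across the row
    // range rather than clustering at the front.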
let indices: Vec<u64> = (0..20).map(|i| ((i as u64) * (n as u64 / 23)) % (n as u64)).collect();
// Full-schema projection — Lance's Schema implements Into<ProjectionRequest>.
let schema = dataset.schema().clone();
let mut total_us: u128 = 0;
for idx in &indices {
let t0 = Instant::now();
let _batch = dataset.take(&[*idx], schema.clone()).await?;
total_us += t0.elapsed().as_micros();
}
Ok(total_us as f32 / indices.len() as f32)
}
/// Random row access for Parquet — full scan + filter. There's no random-access
/// primitive in vanilla Parquet, so this is the cost of finding one specific row.
/// This is the cost our current design pays for "get doc X's full text for RAG."
fn measure_random_access_parquet(path: &str) -> Result<f32> {
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use std::fs::File;
// We simulate 5 lookups — full scan each time. 20 would be painful.
let iters = 5;
let mut total_us: u128 = 0;
for _ in 0..iters {
let t0 = Instant::now();
let file = File::open(path)?;
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
let reader = builder.build()?;
        // Simulate looking up row 50_000: read and decode batches until
        // we pass it. We stop early, but every batch before it is paid for.
let mut seen = 0usize;
for b in reader {
let b = b?;
seen += b.num_rows();
if seen > 50000 { break; }
}
total_us += t0.elapsed().as_micros();
}
Ok(total_us as f32 / iters as f32)
}
/// Append 10K new rows to the existing Lance dataset.
/// Measures the "ingest delta" cost without full rewrite.
async fn append_10k_rows(path: &str, dims: usize) -> Result<()> {
use lance::dataset::{Dataset, WriteMode, WriteParams};
let dataset = Dataset::open(path).await?;
let schema = dataset.schema();
let arrow_schema: Arc<Schema> = Arc::new(schema.into());
// Build a 10K row batch with random-ish data matching the existing schema.
let n = 10_000;
let arrays: Vec<ArrayRef> = arrow_schema
.fields()
.iter()
.map(|f| -> Result<ArrayRef> {
match f.data_type() {
DataType::Utf8 => {
let vals: Vec<String> = (0..n).map(|i| format!("appended-{}", i)).collect();
Ok(Arc::new(arrow_array::StringArray::from(vals)))
}
DataType::Int32 => {
let vals: Vec<i32> = (0..n as i32).collect();
Ok(Arc::new(arrow_array::Int32Array::from(vals)))
}
DataType::FixedSizeList(_, _) => {
let floats: Vec<f32> = (0..n * dims).map(|i| (i as f32).sin()).collect();
let values = Float32Array::from(floats);
let field = Arc::new(Field::new("item", DataType::Float32, true));
let fsl = FixedSizeListArray::try_new(field, dims as i32, Arc::new(values), None)?;
Ok(Arc::new(fsl))
}
other => anyhow::bail!("unsupported append column type: {:?}", other),
}
})
.collect::<Result<Vec<_>>>()?;
let batch = RecordBatch::try_new(arrow_schema.clone(), arrays)?;
let reader = RecordBatchIterator::new(vec![Ok(batch)].into_iter(), arrow_schema);
let params = WriteParams { mode: WriteMode::Append, ..Default::default() };
Dataset::write(reader, path, Some(params)).await?;
Ok(())
}
/// Grab a few existing vectors from the dataset to use as self-similar queries.
async fn sample_query_vectors(
dataset: &lance::dataset::Dataset,
count: usize,
) -> Result<Vec<Vec<f32>>> {
use futures::StreamExt;
// Just take the first `count` rows; good enough for latency measurement.
    let mut scanner = dataset.scan();
scanner.limit(Some(count as i64), None)?;
scanner.project(&["vector"])?;
let mut stream = scanner.try_into_stream().await?;
let mut out = Vec::with_capacity(count);
while let Some(batch) = stream.next().await {
let batch = batch?;
let vector_col = batch
.column(0)
.as_any()
.downcast_ref::<FixedSizeListArray>()
.context("vector column must be FixedSizeList")?;
for row in 0..vector_col.len() {
if out.len() >= count { break; }
let values = vector_col.value(row);
let f32_arr = values
.as_any()
.downcast_ref::<Float32Array>()
.context("inner array must be Float32")?;
let mut v = Vec::with_capacity(f32_arr.len());
for i in 0..f32_arr.len() {
v.push(f32_arr.value(i));
}
out.push(v);
}
if out.len() >= count { break; }
}
Ok(out)
}