Accumulated since a6f12e2 (Phase 21 Rust port + Phase 27 versioning): - Phase 36: embed_semaphore on VectorState (permits=1) serializes seed embed calls — prevents sidecar socket collisions under concurrent /seed stress load - Phase 31+: run_stress.ts 6-task diverse stress scaffolding; run_e2e_rated.ts + orchestrator.ts tightening - Catalog dedupe cleanup: 16 duplicate manifests removed; canonical candidates.parquet (10.5MB -> 76KB) + placements.parquet (1.2MB -> 11KB) regenerated post-dedupe; fresh manifests for active datasets - vectord: harness EvalSet refinements (+181), agent portfolio rotation + ingest triggers (+158), autotune + rag adjustments - catalogd/storaged/ingestd/mcp-server: misc tightening - docs: Phase 28-36 PRD entries + DECISIONS ADR additions; control-plane pivot banner added to top of docs/PRD.md (pointing at docs/CONTROL_PLANE_PRD.md which lands in next commit) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
606 lines
23 KiB
Rust
606 lines
23 KiB
Rust
//! Production Lance vector backend (ADR-019 — hybrid architecture).
|
||
//!
|
||
//! This is the firewall crate. It owns its own Arrow 57 / DataFusion 52
|
||
//! / Lance 4 dependency tree. The public API uses only std types
|
||
//! (`Vec<f32>`, `Vec<String>`, `String`, `bool`) so nothing Arrow-shaped
|
||
//! crosses the crate boundary. That keeps vectord (Arrow 55) from
|
||
//! picking up an incompatible dep.
|
||
//!
|
||
//! Responsibilities:
|
||
//! - Migrate an existing binary-blob Parquet vector file into a Lance
|
||
//! dataset with `FixedSizeList<Float32, dims>`. One-time cost.
|
||
//! - Append new rows natively (no full-file rewrite — Lance's structural win).
|
||
//! - Build an IVF_PQ ANN index on the vector column.
|
||
//! - Vector search (`search`) using the IVF_PQ index when present,
|
||
//! falling back to full scan otherwise.
|
||
//! - Random-access row fetch by `doc_id` (`get_by_doc_id`) — the O(1)
|
||
//! lookup that Parquet-on-object-store can't cheaply do.
|
||
//! - Cheap count + stats introspection.
|
||
|
||
use arrow_array::{
|
||
Array, ArrayRef, BinaryArray, FixedSizeListArray, Float32Array, Int32Array,
|
||
RecordBatch, RecordBatchIterator, StringArray,
|
||
};
|
||
use arrow_schema::{DataType, Field, Schema};
|
||
use futures::StreamExt;
|
||
use serde::Serialize;
|
||
use std::sync::Arc;
|
||
use std::time::Instant;
|
||
|
||
// ================= Public types =================
|
||
|
||
/// One search result. Mirrors vectord's existing `SearchResult` shape
/// structurally but carries simpler types so this crate stays firewalled.
#[derive(Debug, Clone, Serialize)]
pub struct Hit {
    pub doc_id: String,
    pub chunk_text: String,
    /// Similarity derived as `1.0 - distance` (cosine); 0.0 when Lance
    /// returned no `_distance` column.
    pub score: f32,
    /// Raw distance from Lance's `_distance` column when present.
    /// NOTE(review): `search` does set this when `_distance` comes back;
    /// it is `None` only when that column is absent from the result batch.
    pub distance: Option<f32>,
}
|
||
|
||
/// A fully-hydrated row fetched by `get_by_doc_id` — includes the vector
/// so callers can do downstream work (rerank, cite, etc.) without a
/// second round trip.
#[derive(Debug, Clone, Serialize)]
pub struct Row {
    pub doc_id: String,
    /// Empty string when the column is missing or not a string array.
    pub chunk_text: String,
    /// The full embedding, decoded from the FixedSizeList column.
    pub vector: Vec<f32>,
    /// `None` when the `source` column is absent or null for this row.
    pub source: Option<String>,
    /// `None` when the `chunk_idx` column is absent or null for this row.
    pub chunk_idx: Option<i32>,
}
|
||
|
||
/// Outcome of a Parquet → Lance migration (`migrate_from_parquet_bytes`).
#[derive(Debug, Clone, Serialize)]
pub struct MigrationStats {
    /// Total rows written — sum of row counts over all input batches.
    pub rows_written: usize,
    /// Vector dimensionality detected from the input's `vector` column.
    pub dimensions: usize,
    /// On-disk size of the freshly written dataset directory.
    pub disk_bytes: u64,
    /// Wall-clock time for the whole migration (read + convert + write).
    pub duration_secs: f32,
}
|
||
|
||
/// Outcome of a native Lance `append` call.
#[derive(Debug, Clone, Serialize)]
pub struct AppendStats {
    /// Rows written in the new fragment (0 for an empty no-op append).
    pub rows_appended: usize,
    /// Dataset directory growth, measured before/after the write.
    pub disk_bytes_added: u64,
    /// Wall-clock time for the append.
    pub duration_secs: f32,
}
|
||
|
||
/// Outcome of an IVF_PQ vector-index build (`build_index`).
#[derive(Debug, Clone, Serialize)]
pub struct IndexStats {
    /// Index name — always "vec_idx" as created by `build_index`.
    pub name: String,
    /// IVF partition count the index was built with.
    pub num_partitions: u32,
    /// Bits per PQ code.
    pub num_bits: u32,
    /// Number of PQ sub-vectors.
    pub num_sub_vectors: u32,
    /// Wall-clock index build time.
    pub build_time_secs: f32,
    /// Dataset directory growth attributable to the index files.
    pub disk_bytes_added: u64,
}
|
||
|
||
/// Outcome of a scalar (btree) index build (`build_scalar_index`).
#[derive(Debug, Clone, Serialize)]
pub struct ScalarIndexStats {
    /// Index name — `"{column}_btree"`.
    pub name: String,
    /// The column the index was built on.
    pub column: String,
    /// Wall-clock index build time.
    pub build_time_secs: f32,
    /// Dataset directory growth attributable to the index files.
    pub disk_bytes_added: u64,
}
|
||
|
||
/// Snapshot of a dataset's size and index state (`stats`).
#[derive(Debug, Clone, Serialize)]
pub struct DatasetStats {
    /// The path/URI the store was constructed with.
    pub path: String,
    /// Row count; 0 when the dataset can't be opened.
    pub rows: usize,
    /// Recursive on-disk size of the dataset directory (local paths only).
    pub disk_bytes: u64,
    /// True when any index is attached to the `vector` column.
    pub has_vector_index: bool,
    /// True when any index is attached to the `doc_id` column.
    pub has_doc_id_index: bool,
}
|
||
|
||
// ================= The backend =================
|
||
|
||
/// Thin wrapper around a Lance dataset path. Lance handles the heavy
/// lifting — we just expose a narrow API.
///
/// Holds only the path string, so `Clone` is cheap and instances are
/// trivially shareable; every operation re-opens the dataset.
#[derive(Clone)]
pub struct LanceVectorStore {
    /// Local filesystem path or object-store URI (file:///..., s3://...).
    /// Lance's internal URI parsing handles both.
    path: String,
}
|
||
|
||
impl LanceVectorStore {
|
||
pub fn new(path: impl Into<String>) -> Self {
|
||
Self { path: path.into() }
|
||
}
|
||
|
||
/// Borrow the dataset path/URI this store was constructed with.
pub fn path(&self) -> &str { &self.path }
|
||
|
||
/// Row count via Lance's fast metadata path (no scan).
|
||
pub async fn count(&self) -> Result<usize, String> {
|
||
let dataset = open_or_err(&self.path).await?;
|
||
let n = dataset.count_rows(None).await.map_err(e)?;
|
||
Ok(n)
|
||
}
|
||
|
||
/// True if the on-disk dataset exists AND has at least one `vector`
|
||
/// column index attached.
|
||
pub async fn has_vector_index(&self) -> Result<bool, String> {
|
||
use lance_index::DatasetIndexExt;
|
||
let dataset = match lance::dataset::Dataset::open(&self.path).await {
|
||
Ok(d) => d,
|
||
Err(_) => return Ok(false),
|
||
};
|
||
let indexes = dataset.load_indices().await.map_err(e)?;
|
||
Ok(indexes.iter().any(|ix| {
|
||
ix.fields.iter().any(|fid| {
|
||
dataset.schema().field_by_id(*fid)
|
||
.map(|f| f.name == "vector")
|
||
.unwrap_or(false)
|
||
})
|
||
}))
|
||
}
|
||
|
||
pub async fn stats(&self) -> Result<DatasetStats, String> {
|
||
let rows = self.count().await.unwrap_or(0);
|
||
let disk_bytes = dir_size_bytes(&strip_file_uri(&self.path));
|
||
let has_vector_index = self.has_vector_index().await.unwrap_or(false);
|
||
let has_doc_id_index = self.has_scalar_index("doc_id").await.unwrap_or(false);
|
||
Ok(DatasetStats {
|
||
path: self.path.clone(),
|
||
rows,
|
||
disk_bytes,
|
||
has_vector_index,
|
||
has_doc_id_index,
|
||
})
|
||
}
|
||
|
||
/// Migrate a vectord-format Parquet file into a Lance dataset.
|
||
///
|
||
/// Input schema (vectord's binary-blob format):
|
||
/// - source : Utf8
|
||
/// - doc_id : Utf8
|
||
/// - chunk_idx : Int32
|
||
/// - chunk_text : Utf8
|
||
/// - vector : Binary (raw f32 little-endian bytes)
|
||
///
|
||
/// Output schema (Lance-friendly):
|
||
/// - source : Utf8
|
||
/// - doc_id : Utf8
|
||
/// - chunk_idx : Int32
|
||
/// - chunk_text : Utf8
|
||
/// - vector : FixedSizeList<Float32, dims>
|
||
///
|
||
/// Idempotent at the file level — if the target exists, it's
|
||
/// overwritten. Caller must manage destination paths.
|
||
pub async fn migrate_from_parquet_bytes(
|
||
&self,
|
||
parquet_bytes: &[u8],
|
||
) -> Result<MigrationStats, String> {
|
||
let t0 = Instant::now();
|
||
let (schema, batches, rows) = read_parquet(parquet_bytes)?;
|
||
let dims = detect_vector_dims(&batches)?;
|
||
let (new_schema, new_batches) = convert_to_fixed_size_list(&schema, batches, dims)?;
|
||
|
||
// Overwrite any prior dataset at this path.
|
||
let _ = std::fs::remove_dir_all(&strip_file_uri(&self.path));
|
||
|
||
write_dataset(&self.path, new_schema, new_batches).await?;
|
||
let disk_bytes = dir_size_bytes(&strip_file_uri(&self.path));
|
||
|
||
Ok(MigrationStats {
|
||
rows_written: rows,
|
||
dimensions: dims,
|
||
disk_bytes,
|
||
duration_secs: t0.elapsed().as_secs_f32(),
|
||
})
|
||
}
|
||
|
||
/// Native Lance append — does NOT rewrite existing files. New rows
/// land as a separate fragment; readers union across fragments at
/// query time. Contrast: our Parquet path requires rewriting the
/// entire vector file to add rows.
///
/// All four row-wise vectors must have equal length; every embedding
/// must share the dimensionality of the first one. `source` (if any)
/// is replicated onto every appended row.
pub async fn append(
    &self,
    source: Option<String>,
    doc_ids: Vec<String>,
    chunk_idxs: Vec<i32>,
    chunk_texts: Vec<String>,
    vectors: Vec<Vec<f32>>,
) -> Result<AppendStats, String> {
    let n = doc_ids.len();
    // Empty append is a cheap no-op — don't touch the dataset at all.
    if n == 0 {
        return Ok(AppendStats { rows_appended: 0, disk_bytes_added: 0, duration_secs: 0.0 });
    }
    // Validate parallel-array lengths up front so we fail before any I/O.
    if chunk_idxs.len() != n || chunk_texts.len() != n || vectors.len() != n {
        return Err(format!(
            "append: length mismatch (doc_ids={n}, chunk_idxs={}, chunk_texts={}, vectors={})",
            chunk_idxs.len(), chunk_texts.len(), vectors.len(),
        ));
    }
    // All embeddings must share one dimensionality — FixedSizeList requires it.
    let dims = vectors[0].len();
    for (i, v) in vectors.iter().enumerate() {
        if v.len() != dims {
            return Err(format!("append: row {i} has {} dims, expected {}", v.len(), dims));
        }
    }

    let t0 = Instant::now();
    // Snapshot directory size to report the delta afterwards.
    let pre_bytes = dir_size_bytes(&strip_file_uri(&self.path));

    // Replicate the (optional) source onto every row; StringArray accepts
    // Vec<Option<String>> and turns None into a null entry.
    let src_arr = StringArray::from(
        (0..n).map(|_| source.clone()).collect::<Vec<_>>()
    );
    let doc_id_arr = StringArray::from(doc_ids);
    let chunk_idx_arr = Int32Array::from(chunk_idxs);
    let chunk_text_arr = StringArray::from(chunk_texts);

    // Flatten row vectors into one contiguous f32 buffer for the
    // FixedSizeList child array.
    let mut flat: Vec<f32> = Vec::with_capacity(n * dims);
    for v in vectors { flat.extend(v); }
    let values = Float32Array::from(flat);
    let item_field = Arc::new(Field::new("item", DataType::Float32, true));
    let vec_arr = FixedSizeListArray::try_new(
        item_field.clone(), dims as i32, Arc::new(values), None,
    ).map_err(e)?;

    // Schema must match the migrated dataset's layout exactly for the
    // append to be accepted.
    let schema = Arc::new(Schema::new(vec![
        Field::new("source", DataType::Utf8, true),
        Field::new("doc_id", DataType::Utf8, false),
        Field::new("chunk_idx", DataType::Int32, true),
        Field::new("chunk_text", DataType::Utf8, true),
        Field::new("vector", DataType::FixedSizeList(item_field, dims as i32), false),
    ]));

    let arrays: Vec<ArrayRef> = vec![
        Arc::new(src_arr), Arc::new(doc_id_arr), Arc::new(chunk_idx_arr),
        Arc::new(chunk_text_arr), Arc::new(vec_arr),
    ];
    let batch = RecordBatch::try_new(schema.clone(), arrays).map_err(e)?;
    let reader = RecordBatchIterator::new(vec![Ok(batch)].into_iter(), schema);

    use lance::dataset::{Dataset, WriteMode, WriteParams};
    // Append mode: new fragment, no rewrite of existing data files.
    let params = WriteParams { mode: WriteMode::Append, ..Default::default() };
    Dataset::write(reader, &self.path, Some(params)).await.map_err(e)?;

    Ok(AppendStats {
        rows_appended: n,
        disk_bytes_added: dir_size_bytes(&strip_file_uri(&self.path)).saturating_sub(pre_bytes),
        duration_secs: t0.elapsed().as_secs_f32(),
    })
}
|
||
|
||
/// Build an IVF_PQ vector index. Replaces any prior index with the
|
||
/// same name. Callers pass explicit params — sensible defaults for
|
||
/// ~100K × 768d: num_partitions=316 (≈√N), num_bits=8, num_sub_vectors=48.
|
||
pub async fn build_index(
|
||
&self,
|
||
num_partitions: u32,
|
||
num_bits: u32,
|
||
num_sub_vectors: u32,
|
||
) -> Result<IndexStats, String> {
|
||
use lance::dataset::Dataset;
|
||
use lance::index::vector::VectorIndexParams;
|
||
use lance_index::{DatasetIndexExt, IndexType};
|
||
use lance_linalg::distance::MetricType;
|
||
|
||
let pre_bytes = dir_size_bytes(&strip_file_uri(&self.path));
|
||
let t0 = Instant::now();
|
||
|
||
let mut dataset = Dataset::open(&self.path).await.map_err(e)?;
|
||
let params = VectorIndexParams::ivf_pq(
|
||
num_partitions as usize,
|
||
num_bits as u8,
|
||
num_sub_vectors as usize,
|
||
MetricType::Cosine,
|
||
50, // max_iterations — same as bench
|
||
);
|
||
dataset.create_index(
|
||
&["vector"],
|
||
IndexType::Vector,
|
||
Some("vec_idx".into()),
|
||
¶ms,
|
||
true, // replace
|
||
).await.map_err(e)?;
|
||
|
||
Ok(IndexStats {
|
||
name: "vec_idx".into(),
|
||
num_partitions,
|
||
num_bits,
|
||
num_sub_vectors,
|
||
build_time_secs: t0.elapsed().as_secs_f32(),
|
||
disk_bytes_added: dir_size_bytes(&strip_file_uri(&self.path)).saturating_sub(pre_bytes),
|
||
})
|
||
}
|
||
|
||
/// Build a scalar btree index on `doc_id`. Makes `get_by_doc_id`
|
||
/// O(log N) instead of a filter-scan of every fragment. Small index
|
||
/// footprint — a few hundred KB on 100K rows.
|
||
pub async fn build_scalar_index(&self, column: &str) -> Result<ScalarIndexStats, String> {
|
||
use lance::dataset::Dataset;
|
||
use lance_index::{DatasetIndexExt, IndexType};
|
||
use lance_index::scalar::ScalarIndexParams;
|
||
|
||
let pre_bytes = dir_size_bytes(&strip_file_uri(&self.path));
|
||
let t0 = Instant::now();
|
||
|
||
let mut dataset = Dataset::open(&self.path).await.map_err(e)?;
|
||
let idx_name = format!("{column}_btree");
|
||
dataset.create_index(
|
||
&[column],
|
||
IndexType::Scalar,
|
||
Some(idx_name.clone()),
|
||
&ScalarIndexParams::default(),
|
||
true,
|
||
).await.map_err(e)?;
|
||
|
||
Ok(ScalarIndexStats {
|
||
name: idx_name,
|
||
column: column.to_string(),
|
||
build_time_secs: t0.elapsed().as_secs_f32(),
|
||
disk_bytes_added: dir_size_bytes(&strip_file_uri(&self.path)).saturating_sub(pre_bytes),
|
||
})
|
||
}
|
||
|
||
/// True if a scalar index exists on the named column.
|
||
pub async fn has_scalar_index(&self, column: &str) -> Result<bool, String> {
|
||
use lance_index::DatasetIndexExt;
|
||
let dataset = match lance::dataset::Dataset::open(&self.path).await {
|
||
Ok(d) => d,
|
||
Err(_) => return Ok(false),
|
||
};
|
||
let indexes = dataset.load_indices().await.map_err(e)?;
|
||
Ok(indexes.iter().any(|ix| {
|
||
ix.fields.iter().any(|fid| {
|
||
dataset.schema().field_by_id(*fid)
|
||
.map(|f| f.name == column)
|
||
.unwrap_or(false)
|
||
})
|
||
}))
|
||
}
|
||
|
||
/// Search for top_k nearest neighbors of `query`. Uses the IVF_PQ
|
||
/// index if one exists; otherwise does a full scan (slow but
|
||
/// correct — useful during development before index build).
|
||
///
|
||
/// `nprobes` tells Lance how many IVF partitions to probe per query.
|
||
/// Lance's built-in default is **1**, which caps recall well below
|
||
/// what the index is actually capable of. Passing `None` keeps that
|
||
/// default (only sensible when latency trumps recall); for real
|
||
/// workloads set `nprobes` to 5–10% of `num_partitions` (e.g. 20–30
|
||
/// on a 316-partition index).
|
||
///
|
||
/// `refine_factor` re-ranks the PQ-approximate top-k by computing
|
||
/// exact distances on `top_k * refine_factor` candidates, then
|
||
/// trimming to `top_k`. Cheap way to buy back recall lost to product
|
||
/// quantization. `None` skips the re-rank pass.
|
||
pub async fn search(
|
||
&self,
|
||
query: &[f32],
|
||
top_k: usize,
|
||
nprobes: Option<usize>,
|
||
refine_factor: Option<u32>,
|
||
) -> Result<Vec<Hit>, String> {
|
||
use lance::dataset::Dataset;
|
||
|
||
let dataset = Dataset::open(&self.path).await.map_err(e)?;
|
||
let qarr = Float32Array::from(query.to_vec());
|
||
|
||
let mut scanner = dataset.scan();
|
||
scanner.nearest("vector", &qarr, top_k as usize).map_err(e)?;
|
||
if let Some(n) = nprobes { scanner.nprobes(n); }
|
||
if let Some(f) = refine_factor { scanner.refine(f); }
|
||
scanner.project(&["doc_id", "chunk_text"]).map_err(e)?;
|
||
|
||
let mut stream = scanner.try_into_stream().await.map_err(e)?;
|
||
let mut hits: Vec<Hit> = Vec::with_capacity(top_k);
|
||
|
||
while let Some(batch) = stream.next().await {
|
||
let batch = batch.map_err(e)?;
|
||
let doc_ids = batch.column_by_name("doc_id")
|
||
.ok_or_else(|| "no doc_id column in search result".to_string())?
|
||
.as_any().downcast_ref::<StringArray>()
|
||
.ok_or_else(|| "doc_id is not StringArray".to_string())?;
|
||
let chunk_texts = batch.column_by_name("chunk_text")
|
||
.and_then(|c| c.as_any().downcast_ref::<StringArray>());
|
||
// Lance tacks on a `_distance` column for nearest() queries.
|
||
let distances = batch.column_by_name("_distance")
|
||
.and_then(|c| c.as_any().downcast_ref::<Float32Array>());
|
||
for row in 0..batch.num_rows() {
|
||
let d = distances.map(|a| a.value(row));
|
||
hits.push(Hit {
|
||
doc_id: doc_ids.value(row).to_string(),
|
||
chunk_text: chunk_texts.map(|a| a.value(row).to_string()).unwrap_or_default(),
|
||
score: d.map(|d| 1.0 - d).unwrap_or(0.0), // cosine distance → similarity
|
||
distance: d,
|
||
});
|
||
}
|
||
}
|
||
Ok(hits)
|
||
}
|
||
|
||
/// Fetch one row by doc_id. Implementation: Lance filter-pushdown
/// scan — O(log N) with partition pruning once a btree index has been
/// built on the column (see `build_scalar_index("doc_id")`; `stats`
/// reports it as `has_doc_id_index`), O(N) filter-scan otherwise
/// (still far faster than reading the whole Parquet file).
pub async fn get_by_doc_id(&self, doc_id: &str) -> Result<Option<Row>, String> {
    use lance::dataset::Dataset;

    let dataset = Dataset::open(&self.path).await.map_err(e)?;
    // SQL-style single-quote escaping ('' for ') so arbitrary doc_ids
    // can't break out of the filter expression.
    let filter = format!("doc_id = '{}'", doc_id.replace('\'', "''"));
    let mut scanner = dataset.scan();
    scanner.filter(&filter).map_err(e)?;
    scanner.limit(Some(1), None).map_err(e)?;
    let mut stream = scanner.try_into_stream().await.map_err(e)?;

    // Skip empty batches; first batch with a row wins.
    while let Some(batch) = stream.next().await {
        let batch = batch.map_err(e)?;
        if batch.num_rows() == 0 { continue; }
        return Ok(Some(row_from_batch(&batch, 0)?));
    }
    Ok(None)
}
|
||
}
|
||
|
||
// ================= Internal helpers =================
|
||
|
||
/// Map any displayable error into the `String` error type this crate's
/// public API uses — keeps Arrow/Lance error types from crossing the
/// firewall boundary.
fn e<T: std::fmt::Display>(err: T) -> String { err.to_string() }
|
||
|
||
/// `file:///abs/path` → `/abs/path`. Leave other URI schemes as-is for
/// helpers that only work on local paths (dir_size_bytes, remove_dir_all).
fn strip_file_uri(uri: &str) -> String {
    match uri.strip_prefix("file://") {
        Some(local) => local.to_string(),
        None => uri.to_string(),
    }
}
|
||
|
||
/// Open the Lance dataset at `path`, mapping the failure into this
/// crate's `String` error type.
async fn open_or_err(path: &str) -> Result<lance::dataset::Dataset, String> {
    lance::dataset::Dataset::open(path).await.map_err(e)
}
|
||
|
||
/// Recursive on-disk size of `path` in bytes. A plain file reports its
/// own length; anything unreadable (missing path, permission error)
/// contributes 0 rather than failing.
fn dir_size_bytes(path: &str) -> u64 {
    fn walk(p: &std::path::Path) -> u64 {
        match std::fs::metadata(p) {
            Ok(m) if m.is_file() => m.len(),
            Ok(_) => std::fs::read_dir(p)
                .map(|entries| {
                    entries
                        .filter_map(Result::ok)
                        .map(|entry| walk(&entry.path()))
                        .sum()
                })
                .unwrap_or(0),
            Err(_) => 0,
        }
    }
    walk(std::path::Path::new(path))
}
|
||
|
||
/// Decode an in-memory Parquet file into (schema, all batches, total rows).
///
/// Copies `bytes` into an owned `bytes::Bytes` because the reader
/// builder needs an owned, cheaply-cloneable buffer.
fn read_parquet(bytes: &[u8]) -> Result<(Arc<Schema>, Vec<RecordBatch>, usize), String> {
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    let builder = ParquetRecordBatchReaderBuilder::try_new(bytes::Bytes::copy_from_slice(bytes))
        .map_err(e)?;
    let schema = builder.schema().clone();
    let reader = builder.build().map_err(e)?;
    // collect::<Result<_, _>> short-circuits on the first bad batch.
    let batches: Vec<RecordBatch> = reader.collect::<Result<_, _>>().map_err(e)?;
    let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    Ok((schema, batches, rows))
}
|
||
|
||
fn detect_vector_dims(batches: &[RecordBatch]) -> Result<usize, String> {
|
||
for batch in batches {
|
||
let idx = batch.schema().index_of("vector")
|
||
.map_err(|_| "no 'vector' column".to_string())?;
|
||
let col = batch.column(idx);
|
||
if let Some(binary) = col.as_any().downcast_ref::<BinaryArray>() {
|
||
for i in 0..binary.len() {
|
||
if !binary.is_null(i) {
|
||
return Ok(binary.value(i).len() / 4);
|
||
}
|
||
}
|
||
} else if let Some(fsl) = col.as_any().downcast_ref::<FixedSizeListArray>() {
|
||
return Ok(fsl.value_length() as usize);
|
||
}
|
||
}
|
||
Err("could not determine vector dimensions".into())
|
||
}
|
||
|
||
/// Rewrite every batch so the `vector` column is
/// `FixedSizeList<Float32, dims>` (non-nullable); all other columns
/// pass through untouched. Batches whose vector column is already a
/// FixedSizeList are kept as-is.
fn convert_to_fixed_size_list(
    schema: &Arc<Schema>,
    batches: Vec<RecordBatch>,
    dims: usize,
) -> Result<(Arc<Schema>, Vec<RecordBatch>), String> {
    // New schema: identical fields, except `vector` becomes a
    // non-nullable FixedSizeList of nullable Float32 items.
    let new_fields: Vec<Arc<Field>> = schema
        .fields()
        .iter()
        .map(|f| {
            if f.name() == "vector" {
                Arc::new(Field::new(
                    "vector",
                    DataType::FixedSizeList(
                        Arc::new(Field::new("item", DataType::Float32, true)),
                        dims as i32,
                    ),
                    false,
                ))
            } else {
                f.clone()
            }
        })
        .collect();
    let new_schema = Arc::new(Schema::new(new_fields));

    let mut out = Vec::with_capacity(batches.len());
    for batch in batches {
        // Resolve the vector column's position per batch (schemas should
        // match, but this stays correct even if column order differs).
        let vec_idx = batch.schema().index_of("vector").map_err(e)?;
        let mut new_cols: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns());
        for (i, col) in batch.columns().iter().enumerate() {
            if i == vec_idx {
                if let Some(bin) = col.as_any().downcast_ref::<BinaryArray>() {
                    new_cols.push(Arc::new(binary_to_fsl(bin, dims)?));
                } else if col.as_any().is::<FixedSizeListArray>() {
                    // Already in the right shape — just clone.
                    new_cols.push(col.clone());
                } else {
                    return Err("vector column is neither Binary nor FixedSizeList".into());
                }
            } else {
                new_cols.push(col.clone());
            }
        }
        out.push(RecordBatch::try_new(new_schema.clone(), new_cols).map_err(e)?);
    }
    Ok((new_schema, out))
}
|
||
|
||
/// Decode a `BinaryArray` of raw little-endian f32 blobs into a
/// `FixedSizeListArray<Float32, dims>`.
///
/// Null input rows are zero-filled rather than carried as nulls — the
/// target `vector` field is declared non-nullable (see
/// `convert_to_fixed_size_list`), so no validity mask is attached.
/// Errors if any blob's byte length disagrees with `dims`.
fn binary_to_fsl(bin: &BinaryArray, dims: usize) -> Result<FixedSizeListArray, String> {
    let n = bin.len();
    let mut flat: Vec<f32> = Vec::with_capacity(n * dims);
    for i in 0..n {
        if bin.is_null(i) {
            // Zero vector stands in for a null row (non-nullable target).
            flat.extend(std::iter::repeat(0.0).take(dims));
            continue;
        }
        let b = bin.value(i);
        if b.len() != dims * 4 {
            return Err(format!(
                "row {i}: {} bytes vs expected {} ({} × f32)",
                b.len(), dims * 4, dims,
            ));
        }
        // Raw bytes are little-endian f32s, 4 bytes each.
        for c in b.chunks_exact(4) {
            flat.push(f32::from_le_bytes([c[0], c[1], c[2], c[3]]));
        }
    }
    let values = Float32Array::from(flat);
    let field = Arc::new(Field::new("item", DataType::Float32, true));
    // No validity buffer — nulls were zero-filled above.
    FixedSizeListArray::try_new(field, dims as i32, Arc::new(values), None).map_err(e)
}
|
||
|
||
/// Write `batches` as a new Lance dataset at `path` using Lance's
/// default `WriteParams`. Callers that need overwrite semantics clear
/// the target first (see `migrate_from_parquet_bytes`).
async fn write_dataset(
    path: &str,
    schema: Arc<Schema>,
    batches: Vec<RecordBatch>,
) -> Result<(), String> {
    use lance::dataset::{Dataset, WriteParams};
    let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
    Dataset::write(reader, path, Some(WriteParams::default()))
        .await.map_err(e)?;
    Ok(())
}
|
||
|
||
fn row_from_batch(batch: &RecordBatch, row: usize) -> Result<Row, String> {
|
||
let doc_id = batch.column_by_name("doc_id")
|
||
.and_then(|c| c.as_any().downcast_ref::<StringArray>())
|
||
.map(|a| a.value(row).to_string())
|
||
.ok_or_else(|| "missing doc_id".to_string())?;
|
||
let chunk_text = batch.column_by_name("chunk_text")
|
||
.and_then(|c| c.as_any().downcast_ref::<StringArray>())
|
||
.map(|a| a.value(row).to_string())
|
||
.unwrap_or_default();
|
||
let source = batch.column_by_name("source")
|
||
.and_then(|c| c.as_any().downcast_ref::<StringArray>())
|
||
.and_then(|a| if a.is_null(row) { None } else { Some(a.value(row).to_string()) });
|
||
let chunk_idx = batch.column_by_name("chunk_idx")
|
||
.and_then(|c| c.as_any().downcast_ref::<Int32Array>())
|
||
.and_then(|a| if a.is_null(row) { None } else { Some(a.value(row)) });
|
||
|
||
let vec_col = batch.column_by_name("vector")
|
||
.ok_or_else(|| "no vector column in row fetch".to_string())?;
|
||
let fsl = vec_col.as_any().downcast_ref::<FixedSizeListArray>()
|
||
.ok_or_else(|| "vector column is not FixedSizeList".to_string())?;
|
||
let inner = fsl.value(row);
|
||
let floats = inner.as_any().downcast_ref::<Float32Array>()
|
||
.ok_or_else(|| "vector inner not Float32".to_string())?;
|
||
let mut v = Vec::with_capacity(floats.len());
|
||
for i in 0..floats.len() { v.push(floats.value(i)); }
|
||
|
||
Ok(Row { doc_id, chunk_text, vector: v, source, chunk_idx })
|
||
}
|