lakehouse/crates/shared/src/arrow_helpers.rs
root 655b6c0b37 Phase 1: storage + catalog layer
- storaged: object_store backend (LocalFileSystem), PUT/GET/DELETE/LIST endpoints
- shared: arrow_helpers with Parquet roundtrip + schema fingerprinting (2 tests)
- catalogd: in-memory registry with write-ahead manifest persistence to object storage
- catalogd: POST/GET /datasets, GET /datasets/by-name/{name}
- gateway: wires storaged + catalogd with shared object_store state
- Phase tracker updated: Phase 0 + Phase 1 gates passed

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-27 05:15:27 -05:00

//! Arrow helpers shared across services: Parquet round-tripping for
//! `RecordBatch`es and deterministic schema fingerprinting.

use arrow::array::RecordBatch;
use arrow::datatypes::SchemaRef;
use bytes::Bytes;
use parquet::arrow::ArrowWriter;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use sha2::{Digest, Sha256};

use crate::types::SchemaFingerprint;

/// Write a RecordBatch to Parquet bytes.
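///
/// # Example
///
/// A minimal usage sketch (fenced `ignore`: the crate name `shared` is
/// assumed from the directory layout, and `batch` is any `RecordBatch`
/// already in scope):
///
/// ```ignore
/// use shared::arrow_helpers::record_batch_to_parquet;
///
/// let parquet_bytes = record_batch_to_parquet(&batch)?;
/// assert!(!parquet_bytes.is_empty());
/// ```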
pub fn record_batch_to_parquet(batch: &RecordBatch) -> Result<Bytes, String> {
    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None)
        .map_err(|e| e.to_string())?;
    writer.write(batch).map_err(|e| e.to_string())?;
    writer.close().map_err(|e| e.to_string())?;
    Ok(Bytes::from(buf))
}

/// Read Parquet bytes into a Vec of RecordBatches.
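///
/// # Example
///
/// A sketch of reading back what `record_batch_to_parquet` produced
/// (fenced `ignore` because the crate name `shared` is assumed):
///
/// ```ignore
/// use shared::arrow_helpers::{parquet_to_record_batches, record_batch_to_parquet};
///
/// let parquet_bytes = record_batch_to_parquet(&batch)?;
/// let (schema, batches) = parquet_to_record_batches(&parquet_bytes)?;
/// assert_eq!(schema, batch.schema());
/// ```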
pub fn parquet_to_record_batches(data: &[u8]) -> Result<(SchemaRef, Vec<RecordBatch>), String> {
    let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::copy_from_slice(data))
        .map_err(|e| e.to_string())?;
    let schema = builder.schema().clone();
    let reader = builder.build().map_err(|e| e.to_string())?;
    let batches: Vec<RecordBatch> = reader
        .collect::<Result<Vec<_>, _>>()
        .map_err(|e| e.to_string())?;
    Ok((schema, batches))
}

/// Compute a deterministic fingerprint from an Arrow schema.
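///
/// # Example
///
/// A sketch showing that equal schemas hash to equal fingerprints
/// (fenced `ignore`; crate name `shared` assumed):
///
/// ```ignore
/// use shared::arrow_helpers::fingerprint_schema;
///
/// let fp = fingerprint_schema(&batch.schema());
/// assert_eq!(fp, fingerprint_schema(&batch.schema()));
/// ```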
pub fn fingerprint_schema(schema: &SchemaRef) -> SchemaFingerprint {
    let mut hasher = Sha256::new();
    // Hash fields in declaration order so the digest is stable for a given
    // schema: name, data type, and nullability all contribute to the input.
    for field in schema.fields() {
        hasher.update(field.name().as_bytes());
        hasher.update(field.data_type().to_string().as_bytes());
        hasher.update(if field.is_nullable() { b"1" } else { b"0" });
    }
    let hash = hasher.finalize();
    SchemaFingerprint(format!("{hash:x}"))
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Float64Array, Int32Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};
    use std::sync::Arc;

    fn sample_batch() -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("score", DataType::Float64, true),
        ]));
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int32Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["alice", "bob", "carol"])),
                Arc::new(Float64Array::from(vec![9.5, 8.2, 7.8])),
            ],
        )
        .unwrap()
    }

    #[test]
    fn roundtrip_parquet() {
        let batch = sample_batch();
        let parquet_bytes = record_batch_to_parquet(&batch).unwrap();
        let (schema, batches) = parquet_to_record_batches(&parquet_bytes).unwrap();
        assert_eq!(schema.fields().len(), 3);
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 3);
    }

    #[test]
    fn schema_fingerprint_deterministic() {
        let batch = sample_batch();
        let fp1 = fingerprint_schema(&batch.schema());
        let fp2 = fingerprint_schema(&batch.schema());
        assert_eq!(fp1, fp2);
    }

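    // Added as an illustrative companion to the determinism test above:
    // flipping any hashed attribute (here, the nullability of `id`) changes
    // the SHA-256 input, so the fingerprint should differ.
    #[test]
    fn schema_fingerprint_differs_when_schema_changes() {
        let base = sample_batch().schema();
        let altered: SchemaRef = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, true), // nullability flipped
            Field::new("name", DataType::Utf8, false),
            Field::new("score", DataType::Float64, true),
        ]));
        assert_ne!(fingerprint_schema(&base), fingerprint_schema(&altered));
    }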
}