- storaged: object_store backend (LocalFileSystem), PUT/GET/DELETE/LIST endpoints
- shared: arrow_helpers with Parquet roundtrip + schema fingerprinting (2 tests)
- catalogd: in-memory registry with write-ahead manifest persistence to object storage
- catalogd: POST/GET /datasets, GET /datasets/by-name/{name}
- gateway: wires storaged + catalogd with shared object_store state
- Phase tracker updated: Phase 0 + Phase 1 gates passed
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
84 lines
2.9 KiB
Rust
use arrow::array::RecordBatch;
|
|
use arrow::datatypes::SchemaRef;
|
|
use bytes::Bytes;
|
|
use parquet::arrow::ArrowWriter;
|
|
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
|
|
use sha2::{Digest, Sha256};
|
|
|
|
use crate::types::SchemaFingerprint;
|
|
|
|
/// Write a RecordBatch to Parquet bytes.
|
|
pub fn record_batch_to_parquet(batch: &RecordBatch) -> Result<Bytes, String> {
|
|
let mut buf = Vec::new();
|
|
let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None)
|
|
.map_err(|e| e.to_string())?;
|
|
writer.write(batch).map_err(|e| e.to_string())?;
|
|
writer.close().map_err(|e| e.to_string())?;
|
|
Ok(Bytes::from(buf))
|
|
}
|
|
|
|
/// Read Parquet bytes into a Vec of RecordBatches.
|
|
pub fn parquet_to_record_batches(data: &[u8]) -> Result<(SchemaRef, Vec<RecordBatch>), String> {
|
|
let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::copy_from_slice(data))
|
|
.map_err(|e| e.to_string())?;
|
|
let schema = builder.schema().clone();
|
|
let reader = builder.build().map_err(|e| e.to_string())?;
|
|
let batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, _>>().map_err(|e| e.to_string())?;
|
|
Ok((schema, batches))
|
|
}
|
|
|
|
/// Compute a deterministic fingerprint from an Arrow schema.
|
|
pub fn fingerprint_schema(schema: &SchemaRef) -> SchemaFingerprint {
|
|
let mut hasher = Sha256::new();
|
|
for field in schema.fields() {
|
|
hasher.update(field.name().as_bytes());
|
|
hasher.update(field.data_type().to_string().as_bytes());
|
|
hasher.update(if field.is_nullable() { b"1" } else { b"0" });
|
|
}
|
|
let hash = hasher.finalize();
|
|
SchemaFingerprint(format!("{hash:x}"))
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Float64Array, Int32Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Build a tiny three-column batch (`id`, `name`, nullable `score`)
    /// with three rows.
    fn sample_batch() -> RecordBatch {
        let fields = vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("score", DataType::Float64, true),
        ];
        let ids = Int32Array::from(vec![1, 2, 3]);
        let names = StringArray::from(vec!["alice", "bob", "carol"]);
        let scores = Float64Array::from(vec![9.5, 8.2, 7.8]);
        RecordBatch::try_new(
            Arc::new(Schema::new(fields)),
            vec![Arc::new(ids), Arc::new(names), Arc::new(scores)],
        )
        .unwrap()
    }

    #[test]
    fn roundtrip_parquet() {
        let original = sample_batch();
        let encoded = record_batch_to_parquet(&original).unwrap();
        let (schema, decoded) = parquet_to_record_batches(&encoded).unwrap();
        // Schema and row/batch counts must survive the roundtrip.
        assert_eq!(3, schema.fields().len());
        assert_eq!(1, decoded.len());
        assert_eq!(3, decoded[0].num_rows());
    }

    #[test]
    fn schema_fingerprint_deterministic() {
        let batch = sample_batch();
        // Same schema in, same fingerprint out.
        assert_eq!(
            fingerprint_schema(&batch.schema()),
            fingerprint_schema(&batch.schema())
        );
    }
}
|