use arrow::array::RecordBatch;
use arrow::datatypes::SchemaRef;
use bytes::Bytes;
use parquet::arrow::ArrowWriter;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use sha2::{Digest, Sha256};

use crate::types::SchemaFingerprint;

/// Write a RecordBatch to Parquet bytes.
///
/// # Errors
/// Returns the underlying Parquet error rendered as a `String` if the writer
/// cannot be created, the batch cannot be written, or the footer cannot be
/// finalized.
pub fn record_batch_to_parquet(batch: &RecordBatch) -> Result<Bytes, String> {
    let mut buf = Vec::new();
    // `None` = default writer properties (compression, row-group size, ...).
    let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None)
        .map_err(|e| e.to_string())?;
    writer.write(batch).map_err(|e| e.to_string())?;
    // `close` writes the Parquet footer; without it the output is truncated.
    writer.close().map_err(|e| e.to_string())?;
    Ok(Bytes::from(buf))
}

/// Read Parquet bytes into a Vec of RecordBatches.
///
/// Returns the file's Arrow schema alongside all decoded batches.
///
/// # Errors
/// Returns the underlying Parquet error rendered as a `String` if the bytes
/// are not a valid Parquet file or any batch fails to decode.
pub fn parquet_to_record_batches(data: &[u8]) -> Result<(SchemaRef, Vec<RecordBatch>), String> {
    let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::copy_from_slice(data))
        .map_err(|e| e.to_string())?;
    let schema = builder.schema().clone();
    let reader = builder.build().map_err(|e| e.to_string())?;
    // Collect fallibly: the first decode error short-circuits and is propagated.
    let batches: Vec<RecordBatch> = reader
        .collect::<Result<Vec<_>, _>>()
        .map_err(|e| e.to_string())?;
    Ok((schema, batches))
}

/// Compute a deterministic fingerprint from an Arrow schema.
pub fn fingerprint_schema(schema: &SchemaRef) -> SchemaFingerprint {
    // Feed each field's name, rendered data type, and nullability flag into
    // the digest in schema order, so identical schemas hash identically.
    let mut digest = Sha256::new();
    for field in schema.fields() {
        digest.update(field.name().as_bytes());
        digest.update(field.data_type().to_string().as_bytes());
        let nullable_tag = if field.is_nullable() { b"1" } else { b"0" };
        digest.update(nullable_tag);
    }
    // Lowercase-hex encode the finalized digest.
    SchemaFingerprint(format!("{:x}", digest.finalize()))
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Float64Array, Int32Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Build the small three-column batch shared by the tests below.
    fn sample_batch() -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("score", DataType::Float64, true),
        ]));
        let ids = Int32Array::from(vec![1, 2, 3]);
        let names = StringArray::from(vec!["alice", "bob", "carol"]);
        let scores = Float64Array::from(vec![9.5, 8.2, 7.8]);
        RecordBatch::try_new(
            schema,
            vec![Arc::new(ids), Arc::new(names), Arc::new(scores)],
        )
        .unwrap()
    }

    #[test]
    fn roundtrip_parquet() {
        let original = sample_batch();
        let encoded = record_batch_to_parquet(&original).unwrap();
        let (schema, decoded) = parquet_to_record_batches(&encoded).unwrap();
        assert_eq!(schema.fields().len(), 3);
        assert_eq!(decoded.len(), 1);
        assert_eq!(decoded[0].num_rows(), 3);
    }

    #[test]
    fn schema_fingerprint_deterministic() {
        let batch = sample_batch();
        assert_eq!(
            fingerprint_schema(&batch.schema()),
            fingerprint_schema(&batch.schema())
        );
    }
}