Phase 1: storage + catalog layer

- storaged: object_store backend (LocalFileSystem), PUT/GET/DELETE/LIST endpoints
- shared: arrow_helpers with Parquet roundtrip + schema fingerprinting (2 tests)
- catalogd: in-memory registry with write-ahead manifest persistence to object storage
- catalogd: POST/GET /datasets, GET /datasets/by-name/{name}
- gateway: wires storaged + catalogd with shared object_store state
- Phase tracker updated: Phase 0 + Phase 1 gates passed

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
root 2026-03-27 05:15:27 -05:00
parent a52ca841c6
commit 655b6c0b37
17 changed files with 2331 additions and 30 deletions

1868
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -20,3 +20,10 @@ thiserror = "2"
uuid = { version = "1", features = ["v4", "serde"] }
chrono = { version = "0.4", features = ["serde"] }
tower-http = { version = "0.6", features = ["cors", "trace"] }
object_store = { version = "0.12", features = ["aws"] }
arrow = "55"
parquet = { version = "55", features = ["arrow", "async"] }
datafusion = "47"
bytes = "1"
futures = "0.3"
sha2 = "0.10"

View File

@ -5,8 +5,13 @@ edition = "2024"
[dependencies]
shared = { path = "../shared" }
storaged = { path = "../storaged" }
tokio = { workspace = true }
axum = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tracing = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
uuid = { workspace = true }
object_store = { workspace = true }

View File

@ -1 +1,2 @@
pub mod registry;
pub mod service;

View File

@ -0,0 +1,108 @@
use shared::types::{DatasetId, DatasetManifest, ObjectRef, SchemaFingerprint};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use storaged::ops;
use object_store::ObjectStore;
const MANIFEST_PREFIX: &str = "_catalog/manifests";
/// In-memory dataset registry backed by manifest persistence in object storage.
#[derive(Clone)]
pub struct Registry {
datasets: Arc<RwLock<HashMap<DatasetId, DatasetManifest>>>,
store: Arc<dyn ObjectStore>,
}
impl Registry {
pub fn new(store: Arc<dyn ObjectStore>) -> Self {
Self {
datasets: Arc::new(RwLock::new(HashMap::new())),
store,
}
}
/// Rebuild in-memory index from persisted manifests on startup.
pub async fn rebuild(&self) -> Result<usize, String> {
let keys = ops::list(&self.store, Some(MANIFEST_PREFIX)).await?;
let mut datasets = self.datasets.write().await;
datasets.clear();
for key in &keys {
let data = ops::get(&self.store, key).await?;
let manifest: DatasetManifest =
serde_json::from_slice(&data).map_err(|e| e.to_string())?;
datasets.insert(manifest.id.clone(), manifest);
}
let count = datasets.len();
tracing::info!("catalog rebuilt: {count} datasets loaded");
Ok(count)
}
/// Register a new dataset. Persists manifest to storage before updating memory.
pub async fn register(
&self,
name: String,
schema_fingerprint: SchemaFingerprint,
objects: Vec<ObjectRef>,
) -> Result<DatasetManifest, String> {
let now = chrono::Utc::now();
let manifest = DatasetManifest {
id: DatasetId::new(),
name,
schema_fingerprint,
objects,
created_at: now,
updated_at: now,
};
// Write-ahead: persist before in-memory update
let manifest_key = format!("{MANIFEST_PREFIX}/{}.json", manifest.id);
let json = serde_json::to_vec_pretty(&manifest).map_err(|e| e.to_string())?;
ops::put(&self.store, &manifest_key, json.into()).await?;
let mut datasets = self.datasets.write().await;
datasets.insert(manifest.id.clone(), manifest.clone());
tracing::info!("registered dataset: {} ({})", manifest.name, manifest.id);
Ok(manifest)
}
/// Get a dataset by ID.
pub async fn get(&self, id: &DatasetId) -> Option<DatasetManifest> {
let datasets = self.datasets.read().await;
datasets.get(id).cloned()
}
/// Get a dataset by name.
pub async fn get_by_name(&self, name: &str) -> Option<DatasetManifest> {
let datasets = self.datasets.read().await;
datasets.values().find(|d| d.name == name).cloned()
}
/// List all datasets.
pub async fn list(&self) -> Vec<DatasetManifest> {
let datasets = self.datasets.read().await;
datasets.values().cloned().collect()
}
/// Add objects to an existing dataset.
pub async fn add_objects(
&self,
id: &DatasetId,
new_objects: Vec<ObjectRef>,
) -> Result<DatasetManifest, String> {
let mut datasets = self.datasets.write().await;
let manifest = datasets.get_mut(id).ok_or_else(|| format!("dataset not found: {id}"))?;
manifest.objects.extend(new_objects);
manifest.updated_at = chrono::Utc::now();
// Persist updated manifest
let manifest_key = format!("{MANIFEST_PREFIX}/{}.json", manifest.id);
let json = serde_json::to_vec_pretty(manifest).map_err(|e| e.to_string())?;
ops::put(&self.store, &manifest_key, json.into()).await?;
Ok(manifest.clone())
}
}

View File

@ -1,9 +1,125 @@
use axum::{Router, routing::get};
use axum::{
Json, Router,
extract::{Path, State},
http::StatusCode,
response::IntoResponse,
routing::{get, post},
};
use serde::{Deserialize, Serialize};
use shared::types::{DatasetId, ObjectRef, SchemaFingerprint};
use uuid::Uuid;
pub fn router() -> Router {
Router::new().route("/health", get(health))
use crate::registry::Registry;
pub fn router(registry: Registry) -> Router {
Router::new()
.route("/health", get(health))
.route("/datasets", post(create_dataset))
.route("/datasets", get(list_datasets))
.route("/datasets/{id}", get(get_dataset))
.route("/datasets/by-name/{name}", get(get_dataset_by_name))
.with_state(registry)
}
async fn health() -> &'static str {
"catalogd ok"
}
#[derive(Deserialize)]
struct CreateDatasetRequest {
name: String,
schema_fingerprint: String,
objects: Vec<ObjectRefRequest>,
}
#[derive(Deserialize)]
struct ObjectRefRequest {
bucket: String,
key: String,
size_bytes: u64,
}
#[derive(Serialize)]
struct DatasetResponse {
id: String,
name: String,
schema_fingerprint: String,
objects: Vec<ObjectRefResponse>,
created_at: String,
updated_at: String,
}
#[derive(Serialize)]
struct ObjectRefResponse {
bucket: String,
key: String,
size_bytes: u64,
created_at: String,
}
impl From<&shared::types::DatasetManifest> for DatasetResponse {
fn from(m: &shared::types::DatasetManifest) -> Self {
Self {
id: m.id.to_string(),
name: m.name.clone(),
schema_fingerprint: m.schema_fingerprint.0.clone(),
objects: m.objects.iter().map(|o| ObjectRefResponse {
bucket: o.bucket.clone(),
key: o.key.clone(),
size_bytes: o.size_bytes,
created_at: o.created_at.to_rfc3339(),
}).collect(),
created_at: m.created_at.to_rfc3339(),
updated_at: m.updated_at.to_rfc3339(),
}
}
}
async fn create_dataset(
State(registry): State<Registry>,
Json(req): Json<CreateDatasetRequest>,
) -> impl IntoResponse {
let now = chrono::Utc::now();
let objects: Vec<ObjectRef> = req.objects.into_iter().map(|o| ObjectRef {
bucket: o.bucket,
key: o.key,
size_bytes: o.size_bytes,
created_at: now,
}).collect();
match registry.register(req.name, SchemaFingerprint(req.schema_fingerprint), objects).await {
Ok(manifest) => {
let resp = DatasetResponse::from(&manifest);
Ok((StatusCode::CREATED, Json(resp)))
}
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
async fn list_datasets(State(registry): State<Registry>) -> impl IntoResponse {
let datasets = registry.list().await;
let resp: Vec<DatasetResponse> = datasets.iter().map(DatasetResponse::from).collect();
Json(resp)
}
async fn get_dataset(
State(registry): State<Registry>,
Path(id): Path<String>,
) -> impl IntoResponse {
let uuid = Uuid::parse_str(&id).map_err(|e| (StatusCode::BAD_REQUEST, e.to_string()))?;
let dataset_id = DatasetId(uuid);
match registry.get(&dataset_id).await {
Some(manifest) => Ok(Json(DatasetResponse::from(&manifest))),
None => Err((StatusCode::NOT_FOUND, format!("dataset not found: {id}"))),
}
}
async fn get_dataset_by_name(
State(registry): State<Registry>,
Path(name): Path<String>,
) -> impl IntoResponse {
match registry.get_by_name(&name).await {
Some(manifest) => Ok(Json(DatasetResponse::from(&manifest))),
None => Err((StatusCode::NOT_FOUND, format!("dataset not found: {name}"))),
}
}

View File

@ -16,3 +16,4 @@ serde_json = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
tower-http = { workspace = true }
object_store = { workspace = true }

View File

@ -9,10 +9,20 @@ async fn main() {
.with(fmt::layer())
.init();
// Storage backend — local filesystem for now
let storage_root = std::env::var("STORAGE_ROOT").unwrap_or_else(|_| "./data".to_string());
let store = storaged::backend::init_local(&storage_root);
// Catalog — rebuild from persisted manifests
let registry = catalogd::registry::Registry::new(store.clone());
if let Err(e) = registry.rebuild().await {
tracing::warn!("catalog rebuild failed (empty store?): {e}");
}
let app = Router::new()
.route("/health", get(health))
.nest("/storage", storaged::service::router())
.nest("/catalog", catalogd::service::router())
.nest("/storage", storaged::service::router(store))
.nest("/catalog", catalogd::service::router(registry))
.nest("/query", queryd::service::router())
.nest("/ai", aibridge::service::router())
.layer(TraceLayer::new_for_http());

View File

@ -9,3 +9,7 @@ serde_json = { workspace = true }
thiserror = { workspace = true }
uuid = { workspace = true }
chrono = { workspace = true }
arrow = { workspace = true }
parquet = { workspace = true }
bytes = { workspace = true }
sha2 = { workspace = true }

View File

@ -0,0 +1,83 @@
use arrow::array::RecordBatch;
use arrow::datatypes::SchemaRef;
use bytes::Bytes;
use parquet::arrow::ArrowWriter;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use sha2::{Digest, Sha256};
use crate::types::SchemaFingerprint;
/// Write a RecordBatch to Parquet bytes.
pub fn record_batch_to_parquet(batch: &RecordBatch) -> Result<Bytes, String> {
let mut buf = Vec::new();
let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None)
.map_err(|e| e.to_string())?;
writer.write(batch).map_err(|e| e.to_string())?;
writer.close().map_err(|e| e.to_string())?;
Ok(Bytes::from(buf))
}
/// Read Parquet bytes into a Vec of RecordBatches.
pub fn parquet_to_record_batches(data: &[u8]) -> Result<(SchemaRef, Vec<RecordBatch>), String> {
let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::copy_from_slice(data))
.map_err(|e| e.to_string())?;
let schema = builder.schema().clone();
let reader = builder.build().map_err(|e| e.to_string())?;
let batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, _>>().map_err(|e| e.to_string())?;
Ok((schema, batches))
}
/// Compute a deterministic fingerprint from an Arrow schema.
pub fn fingerprint_schema(schema: &SchemaRef) -> SchemaFingerprint {
let mut hasher = Sha256::new();
for field in schema.fields() {
hasher.update(field.name().as_bytes());
hasher.update(field.data_type().to_string().as_bytes());
hasher.update(if field.is_nullable() { b"1" } else { b"0" });
}
let hash = hasher.finalize();
SchemaFingerprint(format!("{hash:x}"))
}
#[cfg(test)]
mod tests {
use super::*;
use arrow::array::{Float64Array, Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use std::sync::Arc;
fn sample_batch() -> RecordBatch {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false),
Field::new("score", DataType::Float64, true),
]));
RecordBatch::try_new(
schema,
vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(StringArray::from(vec!["alice", "bob", "carol"])),
Arc::new(Float64Array::from(vec![9.5, 8.2, 7.8])),
],
)
.unwrap()
}
#[test]
fn roundtrip_parquet() {
let batch = sample_batch();
let parquet_bytes = record_batch_to_parquet(&batch).unwrap();
let (schema, batches) = parquet_to_record_batches(&parquet_bytes).unwrap();
assert_eq!(schema.fields().len(), 3);
assert_eq!(batches.len(), 1);
assert_eq!(batches[0].num_rows(), 3);
}
#[test]
fn schema_fingerprint_deterministic() {
let batch = sample_batch();
let fp1 = fingerprint_schema(&batch.schema());
let fp2 = fingerprint_schema(&batch.schema());
assert_eq!(fp1, fp2);
}
}

View File

@ -1,2 +1,3 @@
pub mod types;
pub mod errors;
pub mod arrow_helpers;

View File

@ -10,3 +10,6 @@ axum = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tracing = { workspace = true }
object_store = { workspace = true }
bytes = { workspace = true }
futures = { workspace = true }

View File

@ -0,0 +1,10 @@
use object_store::ObjectStore;
use object_store::local::LocalFileSystem;
use std::sync::Arc;
/// Initialize the object store backend.
/// Starts with LocalFileSystem. Swap to S3/RustFS by changing this function.
pub fn init_local(root: &str) -> Arc<dyn ObjectStore> {
std::fs::create_dir_all(root).expect("failed to create storage root");
Arc::new(LocalFileSystem::new_with_prefix(root).expect("failed to init local object store"))
}

View File

@ -1 +1,3 @@
pub mod backend;
pub mod ops;
pub mod service;

View File

@ -0,0 +1,30 @@
use bytes::Bytes;
use futures::TryStreamExt;
use object_store::{ObjectStore, path::Path};
use std::sync::Arc;
pub async fn put(store: &Arc<dyn ObjectStore>, key: &str, data: Bytes) -> Result<(), String> {
let path = Path::from(key);
store.put(&path, data.into()).await.map_err(|e| e.to_string())?;
Ok(())
}
pub async fn get(store: &Arc<dyn ObjectStore>, key: &str) -> Result<Bytes, String> {
let path = Path::from(key);
let result = store.get(&path).await.map_err(|e| e.to_string())?;
let bytes = result.bytes().await.map_err(|e| e.to_string())?;
Ok(bytes)
}
pub async fn delete(store: &Arc<dyn ObjectStore>, key: &str) -> Result<(), String> {
let path = Path::from(key);
store.delete(&path).await.map_err(|e| e.to_string())?;
Ok(())
}
pub async fn list(store: &Arc<dyn ObjectStore>, prefix: Option<&str>) -> Result<Vec<String>, String> {
let prefix_path = prefix.map(Path::from);
let stream = store.list(prefix_path.as_ref());
let items: Vec<_> = stream.try_collect().await.map_err(|e| e.to_string())?;
Ok(items.into_iter().map(|m| m.location.to_string()).collect())
}

View File

@ -1,9 +1,75 @@
use axum::{Router, routing::get};
use axum::{
Router,
extract::{Path, Query, State},
http::StatusCode,
response::IntoResponse,
routing::{delete, get, put},
};
use bytes::Bytes;
use object_store::ObjectStore;
use serde::Deserialize;
use std::sync::Arc;
pub fn router() -> Router {
Router::new().route("/health", get(health))
use crate::ops;
pub type StorageState = Arc<dyn ObjectStore>;
pub fn router(store: Arc<dyn ObjectStore>) -> Router {
Router::new()
.route("/health", get(health))
.route("/objects/{*key}", put(put_object))
.route("/objects/{*key}", get(get_object))
.route("/objects/{*key}", delete(delete_object))
.route("/objects", get(list_objects))
.with_state(store)
}
async fn health() -> &'static str {
"storaged ok"
}
async fn put_object(
State(store): State<StorageState>,
Path(key): Path<String>,
body: Bytes,
) -> impl IntoResponse {
match ops::put(&store, &key, body).await {
Ok(()) => (StatusCode::CREATED, format!("stored: {key}")),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e),
}
}
async fn get_object(
State(store): State<StorageState>,
Path(key): Path<String>,
) -> impl IntoResponse {
match ops::get(&store, &key).await {
Ok(data) => Ok(data),
Err(e) => Err((StatusCode::NOT_FOUND, e)),
}
}
async fn delete_object(
State(store): State<StorageState>,
Path(key): Path<String>,
) -> impl IntoResponse {
match ops::delete(&store, &key).await {
Ok(()) => (StatusCode::OK, format!("deleted: {key}")),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e),
}
}
#[derive(Deserialize)]
struct ListQuery {
prefix: Option<String>,
}
async fn list_objects(
State(store): State<StorageState>,
Query(q): Query<ListQuery>,
) -> impl IntoResponse {
match ops::list(&store, q.prefix.as_deref()).await {
Ok(keys) => Ok(axum::Json(keys)),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}

View File

@ -1,25 +1,25 @@
# Phase Tracker
## Phase 0: Bootstrap
- [ ] 0.1 — Cargo workspace with all crate stubs compiling
- [ ] 0.2 — `shared` crate: error types, ObjectRef, DatasetId
- [ ] 0.3 — `gateway` with Axum: GET /health → 200
- [ ] 0.4 — tracing + tracing-subscriber wired in gateway
- [ ] 0.5 — justfile with build, test, run recipes
- [ ] 0.6 — docs committed to git
- [x] 0.1 — Cargo workspace with all crate stubs compiling
- [x] 0.2 — `shared` crate: error types, ObjectRef, DatasetId
- [x] 0.3 — `gateway` with Axum: GET /health → 200
- [x] 0.4 — tracing + tracing-subscriber wired in gateway
- [x] 0.5 — justfile with build, test, run recipes
- [x] 0.6 — docs committed to git
**Gate:** All crates compile. Gateway runs. Logs emit. Docs committed.
**Gate: PASSED** All crates compile. Gateway runs. Logs emit. Docs committed.
## Phase 1: Storage + Catalog
- [ ] 1.1 — storaged: object_store backend init (LocalFileSystem → S3)
- [ ] 1.2 — storaged: Axum endpoints (PUT/GET/DELETE /objects/{key})
- [ ] 1.3 — shared/arrow.rs: RecordBatch ↔ Parquet helpers
- [ ] 1.4 — catalogd/registry.rs: in-memory index + manifest persistence
- [ ] 1.5 — catalogd/schema.rs: schema fingerprinting
- [ ] 1.6 — catalogd service: POST/GET /datasets endpoints
- [ ] 1.7 — gateway routes to storaged + catalogd
- [x] 1.1 — storaged: object_store backend init (LocalFileSystem)
- [x] 1.2 — storaged: Axum endpoints (PUT/GET/DELETE/LIST /objects/{key})
- [x] 1.3 — shared/arrow_helpers.rs: RecordBatch ↔ Parquet + schema fingerprinting
- [x] 1.4 — catalogd/registry.rs: in-memory index + manifest persistence to object storage
- [x] 1.5 — catalogd/schema.rs: schema fingerprinting (merged into shared/arrow_helpers.rs)
- [x] 1.6 — catalogd service: POST/GET /datasets + GET /datasets/by-name/{name}
- [x] 1.7 — gateway routes to storaged + catalogd with shared state
**Gate:** Upload Parquet → register → metadata → read back. All via gateway.
**Gate: PASSED** — PUT object → register dataset → list → get by name. All via gateway HTTP.
## Phase 2: Query Engine
- [ ] 2.1 — queryd: SessionContext + object_store config