root 655b6c0b37 Phase 1: storage + catalog layer
- storaged: object_store backend (LocalFileSystem), PUT/GET/DELETE/LIST endpoints
- shared: arrow_helpers with Parquet roundtrip + schema fingerprinting (2 tests)
- catalogd: in-memory registry with write-ahead manifest persistence to object storage
- catalogd: POST/GET /datasets, GET /datasets/by-name/{name}
- gateway: wires storaged + catalogd with shared object_store state
- Phase tracker updated: Phase 0 + Phase 1 gates passed

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-27 05:15:27 -05:00

76 lines
1.9 KiB
Rust

use axum::{
Router,
extract::{Path, Query, State},
http::StatusCode,
response::IntoResponse,
routing::{delete, get, put},
};
use bytes::Bytes;
use object_store::ObjectStore;
use serde::Deserialize;
use std::sync::Arc;
use crate::ops;
/// Shared, reference-counted handle to the object store.
///
/// This is the axum state type: `router` installs it with `with_state`, and the
/// object handlers in this file extract it via `State<StorageState>`.
pub type StorageState = Arc<dyn ObjectStore>;
/// Builds the HTTP router for the storage service.
///
/// Registered routes:
/// - `GET /health` — fixed liveness string.
/// - `PUT | GET | DELETE /objects/{*key}` — write, read, or remove a single
///   object; the wildcard segment captures the whole remaining path as the key.
/// - `GET /objects` — list keys, optionally narrowed by a `prefix` query
///   parameter.
///
/// `store` becomes the router state handed to every handler.
pub fn router(store: Arc<dyn ObjectStore>) -> Router {
    Router::new()
        .route("/health", get(health))
        // All verbs for a single object live on one method router.
        .route(
            "/objects/{*key}",
            put(put_object).get(get_object).delete(delete_object),
        )
        .route("/objects", get(list_objects))
        .with_state(store)
}
/// Liveness probe handler: always answers with a fixed status string.
async fn health() -> &'static str {
    const STATUS: &str = "storaged ok";
    STATUS
}
/// Handles `PUT /objects/{*key}`: hands the raw request body to `ops::put`
/// under the key captured from the path.
///
/// Responds `201 Created` with `stored: <key>` on success. Any error reported
/// by `ops::put` is returned verbatim as the body of a
/// `500 Internal Server Error`.
async fn put_object(
    State(store): State<StorageState>,
    Path(key): Path<String>,
    body: Bytes,
) -> impl IntoResponse {
    // Bail out on failure first so the success path reads straight through.
    if let Err(reason) = ops::put(&store, &key, body).await {
        return (StatusCode::INTERNAL_SERVER_ERROR, reason);
    }

    (StatusCode::CREATED, format!("stored: {key}"))
}
/// Handles `GET /objects/{*key}`: returns whatever `ops::get` yields for the
/// key captured from the path.
///
/// Every failure from `ops::get` is reported as `404 Not Found`, with the
/// error value as the response body.
// NOTE(review): backend failures other than a missing key are also surfaced as
// 404 here; distinguishing them would need a richer error from `ops::get`.
async fn get_object(
    State(store): State<StorageState>,
    Path(key): Path<String>,
) -> impl IntoResponse {
    ops::get(&store, &key)
        .await
        .map_err(|reason| (StatusCode::NOT_FOUND, reason))
}
/// Handles `DELETE /objects/{*key}`: asks `ops::delete` to remove the key
/// captured from the path.
///
/// Responds `200 OK` with `deleted: <key>` on success. Any error reported by
/// `ops::delete` is returned verbatim as the body of a
/// `500 Internal Server Error`.
async fn delete_object(
    State(store): State<StorageState>,
    Path(key): Path<String>,
) -> impl IntoResponse {
    // Bail out on failure first so the success path reads straight through.
    if let Err(reason) = ops::delete(&store, &key).await {
        return (StatusCode::INTERNAL_SERVER_ERROR, reason);
    }

    (StatusCode::OK, format!("deleted: {key}"))
}
/// Query-string parameters accepted by the `GET /objects` listing endpoint.
#[derive(Deserialize)]
struct ListQuery {
/// Optional prefix forwarded to `ops::list`; when absent, `None` is passed.
prefix: Option<String>,
}
/// Handles `GET /objects`: returns the result of `ops::list` as a JSON body.
///
/// The optional `prefix` query parameter is passed through to `ops::list`
/// unchanged. Any error it reports is returned verbatim as the body of a
/// `500 Internal Server Error`.
async fn list_objects(
    State(store): State<StorageState>,
    Query(q): Query<ListQuery>,
) -> impl IntoResponse {
    ops::list(&store, q.prefix.as_deref())
        .await
        .map(axum::Json)
        .map_err(|reason| (StatusCode::INTERNAL_SERVER_ERROR, reason))
}