- storaged: object_store backend (LocalFileSystem), PUT/GET/DELETE/LIST endpoints
- shared: arrow_helpers with Parquet roundtrip + schema fingerprinting (2 tests)
- catalogd: in-memory registry with write-ahead manifest persistence to object storage
- catalogd: POST/GET /datasets, GET /datasets/by-name/{name}
- gateway: wires storaged + catalogd with shared object_store state
- Phase tracker updated: Phase 0 + Phase 1 gates passed
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
31 lines
1.1 KiB
Rust
31 lines
1.1 KiB
Rust
use bytes::Bytes;
|
|
use futures::TryStreamExt;
|
|
use object_store::{ObjectStore, path::Path};
|
|
use std::sync::Arc;
|
|
|
|
pub async fn put(store: &Arc<dyn ObjectStore>, key: &str, data: Bytes) -> Result<(), String> {
|
|
let path = Path::from(key);
|
|
store.put(&path, data.into()).await.map_err(|e| e.to_string())?;
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn get(store: &Arc<dyn ObjectStore>, key: &str) -> Result<Bytes, String> {
|
|
let path = Path::from(key);
|
|
let result = store.get(&path).await.map_err(|e| e.to_string())?;
|
|
let bytes = result.bytes().await.map_err(|e| e.to_string())?;
|
|
Ok(bytes)
|
|
}
|
|
|
|
pub async fn delete(store: &Arc<dyn ObjectStore>, key: &str) -> Result<(), String> {
|
|
let path = Path::from(key);
|
|
store.delete(&path).await.map_err(|e| e.to_string())?;
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn list(store: &Arc<dyn ObjectStore>, prefix: Option<&str>) -> Result<Vec<String>, String> {
|
|
let prefix_path = prefix.map(Path::from);
|
|
let stream = store.list(prefix_path.as_ref());
|
|
let items: Vec<_> = stream.try_collect().await.map_err(|e| e.to_string())?;
|
|
Ok(items.into_iter().map(|m| m.location.to_string()).collect())
|
|
}
|