profit 5b1fcf6d27 Phase 28-36 body of work
Accumulated since a6f12e2 (Phase 21 Rust port + Phase 27 versioning):

- Phase 36: embed_semaphore on VectorState (permits=1) serializes
  seed embed calls — prevents sidecar socket collisions under
  concurrent /seed stress load
- Phase 31+: run_stress.ts 6-task diverse stress scaffolding;
  run_e2e_rated.ts + orchestrator.ts tightening
- Catalog dedupe cleanup: 16 duplicate manifests removed; canonical
  candidates.parquet (10.5MB -> 76KB) + placements.parquet (1.2MB ->
  11KB) regenerated post-dedupe; fresh manifests for active datasets
- vectord: harness EvalSet refinements (+181), agent portfolio
  rotation + ingest triggers (+158), autotune + rag adjustments
- catalogd/storaged/ingestd/mcp-server: misc tightening
- docs: Phase 28-36 PRD entries + DECISIONS ADR additions;
  control-plane pivot banner added to top of docs/PRD.md (pointing
  at docs/CONTROL_PLANE_PRD.md which lands in next commit)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 02:41:15 -05:00

105 lines
3.5 KiB
Rust

use proto::lakehouse::{
catalog_service_server::CatalogService,
CreateDatasetRequest, DatasetResponse, GetDatasetByNameRequest,
GetDatasetRequest, ListDatasetsRequest, ListDatasetsResponse,
ObjectRef as ProtoObjectRef,
};
use shared::types::{DatasetId, ObjectRef, SchemaFingerprint};
use tonic::{Request, Response, Status};
use uuid::Uuid;
use crate::registry::Registry;
/// gRPC adapter that serves the `CatalogService` API from a [`Registry`].
pub struct CatalogGrpc {
    /// Registry handle every RPC handler in this file delegates to.
    registry: Registry,
}
impl CatalogGrpc {
pub fn new(registry: Registry) -> Self {
Self { registry }
}
}
fn manifest_to_proto(m: &shared::types::DatasetManifest) -> DatasetResponse {
DatasetResponse {
id: m.id.to_string(),
name: m.name.clone(),
schema_fingerprint: m.schema_fingerprint.0.clone(),
objects: m.objects.iter().map(|o| ProtoObjectRef {
bucket: o.bucket.clone(),
key: o.key.clone(),
size_bytes: o.size_bytes,
created_at: o.created_at.to_rfc3339(),
}).collect(),
created_at: m.created_at.to_rfc3339(),
updated_at: m.updated_at.to_rfc3339(),
}
}
#[tonic::async_trait]
impl CatalogService for CatalogGrpc {
async fn create_dataset(
&self,
request: Request<CreateDatasetRequest>,
) -> Result<Response<DatasetResponse>, Status> {
let req = request.into_inner();
let now = chrono::Utc::now();
let objects: Vec<ObjectRef> = req.objects.into_iter().map(|o| ObjectRef {
bucket: o.bucket,
key: o.key,
size_bytes: o.size_bytes,
created_at: now,
}).collect();
let manifest = self.registry
.register(req.name, SchemaFingerprint(req.schema_fingerprint), objects)
.await
.map_err(|e| {
// Mirror the HTTP side's 409 mapping: schema drift is a
// precondition failure, not a server error. Keeps gRPC
// and HTTP callers seeing the same diagnostic signal.
if e.contains("different schema") {
Status::failed_precondition(e)
} else {
Status::internal(e)
}
})?;
Ok(Response::new(manifest_to_proto(&manifest)))
}
async fn get_dataset(
&self,
request: Request<GetDatasetRequest>,
) -> Result<Response<DatasetResponse>, Status> {
let req = request.into_inner();
let uuid = Uuid::parse_str(&req.id).map_err(|e| Status::invalid_argument(e.to_string()))?;
let id = DatasetId(uuid);
match self.registry.get(&id).await {
Some(m) => Ok(Response::new(manifest_to_proto(&m))),
None => Err(Status::not_found(format!("dataset not found: {}", req.id))),
}
}
async fn get_dataset_by_name(
&self,
request: Request<GetDatasetByNameRequest>,
) -> Result<Response<DatasetResponse>, Status> {
let req = request.into_inner();
match self.registry.get_by_name(&req.name).await {
Some(m) => Ok(Response::new(manifest_to_proto(&m))),
None => Err(Status::not_found(format!("dataset not found: {}", req.name))),
}
}
async fn list_datasets(
&self,
_request: Request<ListDatasetsRequest>,
) -> Result<Response<ListDatasetsResponse>, Status> {
let datasets = self.registry.list().await;
let responses = datasets.iter().map(manifest_to_proto).collect();
Ok(Response::new(ListDatasetsResponse { datasets: responses }))
}
}