root f59ddbebd4 Phase 41: Profile System Expansion
- ProfileType enum: Execution, Retrieval, Memory, Observer
- Per-type endpoints: /profiles/retrieval, /profiles/memory, /profiles/observer
- profile_type field on ModelProfile
- All tests pass
2026-04-23 03:07:22 -05:00

476 lines
15 KiB
Rust

use axum::{
Json, Router,
extract::{Path, State},
http::StatusCode,
response::IntoResponse,
routing::{get, post},
};
use serde::{Deserialize, Serialize};
use shared::types::{DatasetId, ObjectRef, SchemaFingerprint};
use uuid::Uuid;
use crate::registry::Registry;
/// Assemble the catalogd HTTP API with `registry` as shared state.
///
/// Routes are grouped by feature area; each later phase appended its own
/// endpoints without disturbing earlier ones.
pub fn router(registry: Registry) -> Router {
    Router::new()
        .route("/health", get(health))
        // Core dataset CRUD + maintenance
        .route("/datasets", post(create_dataset).get(list_datasets))
        .route("/datasets/{id}", get(get_dataset))
        .route(
            "/datasets/by-name/{name}",
            get(get_dataset_by_name).delete(delete_dataset_by_name),
        )
        .route("/datasets/by-name/{name}/metadata", post(update_metadata))
        .route("/datasets/by-name/{name}/resync", post(resync_dataset))
        .route("/resync-missing", post(resync_all_missing))
        .route("/migrate-buckets", post(migrate_buckets))
        .route("/dedupe", post(dedupe_by_name))
        // Phase D: AI-safe views
        .route("/views", post(create_view).get(list_views))
        .route("/views/{name}", get(get_view).delete(delete_view))
        // Phase E: soft-delete tombstones
        .route(
            "/datasets/by-name/{name}/tombstone",
            post(tombstone_rows).get(list_tombstones),
        )
        // Phase 17: model profiles
        .route("/profiles", post(create_profile).get(list_profiles))
        .route("/profiles/{id}", get(get_profile).delete(delete_profile))
        // Phase 41: profile-type routes. These static segments take priority
        // over the `/profiles/{id}` capture for matching paths.
        .route("/profiles/retrieval", get(list_retrieval_profiles))
        .route("/profiles/memory", get(list_memory_profiles))
        .route("/profiles/observer", get(list_observer_profiles))
        .with_state(registry)
}
/// Liveness probe: always answers 200 with a static marker string.
async fn health() -> &'static str {
    const BODY: &str = "catalogd ok";
    BODY
}
/// Request body for `POST /datasets`.
#[derive(Deserialize)]
struct CreateDatasetRequest {
    // Logical dataset name. Re-registering an existing name with a
    // different schema is rejected (409 in `create_dataset`).
    name: String,
    // Caller-computed schema fingerprint, stored verbatim.
    schema_fingerprint: String,
    // Object-store references that make up the dataset's data files.
    objects: Vec<ObjectRefRequest>,
}
/// One object-store reference inside a `CreateDatasetRequest`.
/// No `created_at` field: the server stamps the registration time itself
/// (see `create_dataset`), so callers cannot forge it.
#[derive(Deserialize)]
struct ObjectRefRequest {
    bucket: String,
    key: String,
    size_bytes: u64,
}
/// Wire representation of a dataset manifest, built via
/// `From<&shared::types::DatasetManifest>`. UUIDs are stringified and
/// timestamps rendered as RFC 3339 strings for JSON clients.
#[derive(Serialize)]
struct DatasetResponse {
    id: String,
    name: String,
    schema_fingerprint: String,
    objects: Vec<ObjectRefResponse>,
    created_at: String,
    updated_at: String,
    // Rich metadata
    description: String,
    owner: String,
    sensitivity: Option<shared::types::Sensitivity>,
    columns: Vec<shared::types::ColumnMeta>,
    lineage: Option<shared::types::Lineage>,
    freshness: Option<shared::types::FreshnessContract>,
    tags: Vec<String>,
    // None when the manifest has never been resynced/backfilled
    // (see `resync_dataset` / `resync_all_missing`).
    row_count: Option<u64>,
}
/// Serialized form of one object reference within a `DatasetResponse`;
/// `created_at` is the server-stamped registration time as RFC 3339.
#[derive(Serialize)]
struct ObjectRefResponse {
    bucket: String,
    key: String,
    size_bytes: u64,
    created_at: String,
}
impl From<&shared::types::DatasetManifest> for DatasetResponse {
fn from(m: &shared::types::DatasetManifest) -> Self {
Self {
id: m.id.to_string(),
name: m.name.clone(),
schema_fingerprint: m.schema_fingerprint.0.clone(),
objects: m.objects.iter().map(|o| ObjectRefResponse {
bucket: o.bucket.clone(),
key: o.key.clone(),
size_bytes: o.size_bytes,
created_at: o.created_at.to_rfc3339(),
}).collect(),
created_at: m.created_at.to_rfc3339(),
updated_at: m.updated_at.to_rfc3339(),
description: m.description.clone(),
owner: m.owner.clone(),
sensitivity: m.sensitivity.clone(),
columns: m.columns.clone(),
lineage: m.lineage.clone(),
freshness: m.freshness.clone(),
tags: m.tags.clone(),
row_count: m.row_count,
}
}
}
/// POST /datasets — register a new dataset (or append to an existing one,
/// per `Registry::register` semantics).
///
/// 201 with the manifest on success; 409 when the registry reports a
/// schema conflict; 500 for any other registry error.
async fn create_dataset(
    State(registry): State<Registry>,
    Json(req): Json<CreateDatasetRequest>,
) -> impl IntoResponse {
    // Stamp all objects with a single server-side registration time.
    let stamped_at = chrono::Utc::now();
    let mut objects = Vec::with_capacity(req.objects.len());
    for obj in req.objects {
        objects.push(ObjectRef {
            bucket: obj.bucket,
            key: obj.key,
            size_bytes: obj.size_bytes,
            created_at: stamped_at,
        });
    }
    let fingerprint = SchemaFingerprint(req.schema_fingerprint);
    match registry.register(req.name, fingerprint, objects).await {
        Ok(manifest) => Ok((StatusCode::CREATED, Json(DatasetResponse::from(&manifest)))),
        Err(e) => {
            // A schema mismatch on an existing name is the caller's fault.
            let status = if e.contains("different schema") {
                StatusCode::CONFLICT
            } else {
                StatusCode::INTERNAL_SERVER_ERROR
            };
            Err((status, e))
        }
    }
}
/// GET /datasets — every manifest in the registry, as a JSON array.
async fn list_datasets(State(registry): State<Registry>) -> impl IntoResponse {
    let manifests = registry.list().await;
    Json(
        manifests
            .iter()
            .map(DatasetResponse::from)
            .collect::<Vec<_>>(),
    )
}
/// GET /datasets/{id} — look up a single manifest by UUID.
///
/// 400 for a malformed UUID, 404 when no manifest has that id.
async fn get_dataset(
    State(registry): State<Registry>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    let uuid = match Uuid::parse_str(&id) {
        Ok(parsed) => parsed,
        Err(e) => return Err((StatusCode::BAD_REQUEST, e.to_string())),
    };
    registry
        .get(&DatasetId(uuid))
        .await
        .map(|manifest| Json(DatasetResponse::from(&manifest)))
        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("dataset not found: {id}")))
}
/// GET /datasets/by-name/{name} — look up a single manifest by name;
/// 404 when the name is unknown.
async fn get_dataset_by_name(
    State(registry): State<Registry>,
    Path(name): Path<String>,
) -> impl IntoResponse {
    registry
        .get_by_name(&name)
        .await
        .map(|manifest| Json(DatasetResponse::from(&manifest)))
        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("dataset not found: {name}")))
}
/// DELETE /datasets/by-name/{name} — remove a dataset manifest by name.
///
/// Metadata only: parquet files, vector indexes, and tombstones are NOT
/// cascade-deleted. See `Registry::delete_dataset` for the full scope.
/// 404 when the registry reports the name as unknown.
async fn delete_dataset_by_name(
    State(registry): State<Registry>,
    Path(name): Path<String>,
) -> impl IntoResponse {
    let removed = match registry.delete_dataset(&name).await {
        Ok(count) => count,
        Err(e) if e.starts_with("dataset not found") => {
            return Err((StatusCode::NOT_FOUND, e));
        }
        Err(e) => return Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    };
    Ok(Json(serde_json::json!({
        "name": name,
        "manifests_removed": removed,
    })))
}
/// POST /datasets/by-name/{name}/metadata — apply a partial metadata patch
/// to an existing manifest and return the updated wire representation.
///
/// Error mapping is aligned with `delete_dataset_by_name`: a registry
/// "dataset not found" error surfaces as 404 rather than a blanket 500,
/// so clients can distinguish a bad name from a server fault.
async fn update_metadata(
    State(registry): State<Registry>,
    Path(name): Path<String>,
    Json(updates): Json<crate::registry::MetadataUpdate>,
) -> impl IntoResponse {
    match registry.update_metadata(&name, updates).await {
        Ok(manifest) => Ok(Json(DatasetResponse::from(&manifest))),
        // Same not-found prefix the registry uses for deletes.
        Err(e) if e.starts_with("dataset not found") => Err((StatusCode::NOT_FOUND, e)),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}
/// POST /datasets/by-name/{name}/resync — re-read parquet footers for one
/// dataset and repopulate `row_count` and `columns` from reality. Useful
/// for repairing manifests whose metadata was lost or never backfilled.
async fn resync_dataset(
    State(registry): State<Registry>,
    Path(name): Path<String>,
) -> impl IntoResponse {
    registry
        .resync_from_parquet(&name)
        .await
        .map(|manifest| Json(DatasetResponse::from(&manifest)))
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))
}
/// Aggregate result of `POST /resync-missing`: per-dataset successes and
/// failures in one report.
#[derive(Serialize)]
struct ResyncAllResponse {
    succeeded: Vec<ResyncOk>,
    failed: Vec<ResyncErr>,
}
/// A dataset whose resync succeeded, with its refreshed row count.
#[derive(Serialize)]
struct ResyncOk {
    name: String,
    row_count: u64,
}
/// A dataset whose resync failed, carrying the registry's error string.
#[derive(Serialize)]
struct ResyncErr {
    name: String,
    error: String,
}
/// Resync every dataset that currently has null row_count or empty columns.
async fn resync_all_missing(State(registry): State<Registry>) -> impl IntoResponse {
let (ok, err) = registry.resync_missing().await;
Json(ResyncAllResponse {
succeeded: ok.into_iter().map(|(name, row_count)| ResyncOk { name, row_count }).collect(),
failed: err.into_iter().map(|(name, error)| ResyncErr { name, error }).collect(),
})
}
/// Federation layer 2 one-shot: normalize every ObjectRef.bucket field
/// to the canonical "primary" value. Idempotent — re-running once
/// everything is canonical is a safe no-op.
async fn migrate_buckets(State(registry): State<Registry>) -> impl IntoResponse {
match registry.migrate_buckets_to_primary().await {
Ok(report) => Ok(Json(report)),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
/// POST /dedupe — collapse duplicate manifests by `name`; see
/// `Registry::dedupe_by_name`. Safe to run repeatedly; single-manifest
/// datasets are untouched.
async fn dedupe_by_name(State(registry): State<Registry>) -> impl IntoResponse {
    let report = registry.dedupe_by_name().await;
    Json(report)
}
// --- Phase D: AI-safe views ---
/// Request body for `POST /views` — an AI-safe projection over a dataset.
/// Only `name`, `base_dataset`, and `columns` are required; everything
/// else defaults when omitted.
#[derive(Deserialize)]
struct CreateViewRequest {
    name: String,
    // Name of the dataset this view projects from.
    base_dataset: String,
    // Columns exposed by the view.
    columns: Vec<String>,
    // Optional row-level filter expression; None = all rows.
    #[serde(default)]
    row_filter: Option<String>,
    // Per-column redaction rules, keyed by column name.
    #[serde(default)]
    column_redactions: std::collections::HashMap<String, shared::types::Redaction>,
    #[serde(default)]
    description: String,
    #[serde(default)]
    created_by: String,
}
async fn create_view(
State(registry): State<Registry>,
Json(req): Json<CreateViewRequest>,
) -> impl IntoResponse {
let view = shared::types::AiView {
name: req.name,
base_dataset: req.base_dataset,
columns: req.columns,
row_filter: req.row_filter,
column_redactions: req.column_redactions,
created_at: chrono::Utc::now(),
created_by: req.created_by,
description: req.description,
};
match registry.put_view(view).await {
Ok(v) => Ok((StatusCode::CREATED, Json(v))),
Err(e) => Err((StatusCode::BAD_REQUEST, e)),
}
}
/// GET /views — all registered AI-safe views.
async fn list_views(State(registry): State<Registry>) -> impl IntoResponse {
    let views = registry.list_views().await;
    Json(views)
}
async fn get_view(
State(registry): State<Registry>,
Path(name): Path<String>,
) -> impl IntoResponse {
match registry.get_view(&name).await {
Some(v) => Ok(Json(v)),
None => Err((StatusCode::NOT_FOUND, format!("view not found: {name}"))),
}
}
/// DELETE /views/{name} — remove a view; 204 on success.
async fn delete_view(
    State(registry): State<Registry>,
    Path(name): Path<String>,
) -> impl IntoResponse {
    registry
        .delete_view(&name)
        .await
        .map(|()| StatusCode::NO_CONTENT)
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))
}
// --- Phase E: soft-delete tombstones ---
/// Request body for `POST /datasets/by-name/{name}/tombstone`:
/// soft-delete the rows whose `row_key_column` matches any of
/// `row_key_values`.
#[derive(Deserialize)]
struct TombstoneRequest {
    row_key_column: String,
    row_key_values: Vec<String>,
    // Audit fields; optional, default to empty strings.
    #[serde(default)]
    actor: String,
    #[serde(default)]
    reason: String,
}
/// Outcome of a tombstone request: how many rows were tombstoned and a
/// "value: error" string per row that failed.
#[derive(Serialize)]
struct TombstoneResponse {
    dataset: String,
    row_key_column: String,
    rows_tombstoned: usize,
    failures: Vec<String>,
}
/// POST /datasets/by-name/{name}/tombstone — soft-delete rows by key value.
///
/// Status: 201 when every key succeeded, 207 (Multi-Status) on partial
/// success, 400 when nothing succeeded or the request listed no keys.
async fn tombstone_rows(
    State(registry): State<Registry>,
    Path(name): Path<String>,
    Json(req): Json<TombstoneRequest>,
) -> impl IntoResponse {
    if req.row_key_values.is_empty() {
        return Err((StatusCode::BAD_REQUEST, "row_key_values is empty".to_string()));
    }
    let mut succeeded = 0usize;
    let mut failures = Vec::new();
    for key in &req.row_key_values {
        let outcome = registry
            .add_tombstone(&name, &req.row_key_column, key, &req.actor, &req.reason)
            .await;
        match outcome {
            Ok(_) => succeeded += 1,
            Err(e) => failures.push(format!("{key}: {e}")),
        }
    }
    let status = match (succeeded, failures.is_empty()) {
        (0, _) => StatusCode::BAD_REQUEST,
        (_, true) => StatusCode::CREATED,
        (_, false) => StatusCode::MULTI_STATUS,
    };
    let body = TombstoneResponse {
        dataset: name,
        row_key_column: req.row_key_column,
        rows_tombstoned: succeeded,
        failures,
    };
    Ok((status, Json(body)))
}
async fn list_tombstones(
State(registry): State<Registry>,
Path(name): Path<String>,
) -> impl IntoResponse {
match registry.list_tombstones(&name).await {
Ok(ts) => Ok(Json(ts)),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
// --- Phase 17: Model profiles ---
/// Request body for `POST /profiles` — everything needed to construct a
/// `shared::types::ModelProfile` except the server-stamped `created_at`.
#[derive(Deserialize)]
struct CreateProfileRequest {
    // Caller-chosen profile id (also the lookup key for /profiles/{id}).
    id: String,
    // Name of the backing model as known to Ollama.
    ollama_name: String,
    #[serde(default)]
    description: String,
    // Dataset names this profile is bound to.
    bound_datasets: Vec<String>,
    #[serde(default)]
    hnsw_config: shared::types::ProfileHnswConfig,
    // Embedding model; defaults to "nomic-embed-text" when omitted.
    #[serde(default = "default_embed_model_req")]
    embed_model: String,
    #[serde(default)]
    created_by: String,
    /// Federation: optional per-profile bucket (`profile:{id}` by convention).
    /// Omitting keeps artifacts in primary.
    #[serde(default)]
    bucket: Option<String>,
    /// ADR-019 hybrid: which vector backend to route this profile's
    /// indexes to. Defaults to Parquet+HNSW.
    #[serde(default)]
    vector_backend: shared::types::VectorBackend,
    /// Phase 41: Profile type for routing + activation behavior.
    #[serde(default)]
    profile_type: shared::types::ProfileType,
}
fn default_embed_model_req() -> String { "nomic-embed-text".to_string() }
async fn create_profile(
State(registry): State<Registry>,
Json(req): Json<CreateProfileRequest>,
) -> impl IntoResponse {
let profile = shared::types::ModelProfile {
id: req.id,
ollama_name: req.ollama_name,
description: req.description,
bound_datasets: req.bound_datasets,
hnsw_config: req.hnsw_config,
embed_model: req.embed_model,
created_at: chrono::Utc::now(),
created_by: req.created_by,
bucket: req.bucket,
vector_backend: req.vector_backend,
profile_type: req.profile_type,
};
match registry.put_profile(profile).await {
Ok(p) => Ok((StatusCode::CREATED, Json(p))),
Err(e) => Err((StatusCode::BAD_REQUEST, e)),
}
}
/// GET /profiles — every registered profile, regardless of type.
async fn list_profiles(State(registry): State<Registry>) -> impl IntoResponse {
    let profiles = registry.list_profiles().await;
    Json(profiles)
}
/// Shared implementation for the Phase 41 per-type listing routes:
/// fetch all profiles and keep only those whose `profile_type` matches.
/// (Previously this filter was copy-pasted into all three handlers.)
async fn profiles_of_type(
    registry: Registry,
    wanted: shared::types::ProfileType,
) -> impl IntoResponse {
    let matching: Vec<_> = registry
        .list_profiles()
        .await
        .into_iter()
        .filter(|p| p.profile_type == wanted)
        .collect();
    Json(matching)
}
/// GET /profiles/retrieval — profiles with `ProfileType::Retrieval`.
async fn list_retrieval_profiles(State(registry): State<Registry>) -> impl IntoResponse {
    profiles_of_type(registry, shared::types::ProfileType::Retrieval).await
}
/// GET /profiles/memory — profiles with `ProfileType::Memory`.
async fn list_memory_profiles(State(registry): State<Registry>) -> impl IntoResponse {
    profiles_of_type(registry, shared::types::ProfileType::Memory).await
}
/// GET /profiles/observer — profiles with `ProfileType::Observer`.
async fn list_observer_profiles(State(registry): State<Registry>) -> impl IntoResponse {
    profiles_of_type(registry, shared::types::ProfileType::Observer).await
}
async fn get_profile(
State(registry): State<Registry>,
Path(id): Path<String>,
) -> impl IntoResponse {
match registry.get_profile(&id).await {
Some(p) => Ok(Json(p)),
None => Err((StatusCode::NOT_FOUND, format!("profile not found: {id}"))),
}
}
/// DELETE /profiles/{id} — remove a profile; 204 on success.
async fn delete_profile(
    State(registry): State<Registry>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    registry
        .delete_profile(&id)
        .await
        .map(|()| StatusCode::NO_CONTENT)
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))
}