diff --git a/crates/catalogd/src/registry.rs b/crates/catalogd/src/registry.rs index fe9b09e..996c716 100644 --- a/crates/catalogd/src/registry.rs +++ b/crates/catalogd/src/registry.rs @@ -1,6 +1,6 @@ use shared::types::{ - DatasetId, DatasetManifest, ObjectRef, SchemaFingerprint, - ColumnMeta, Lineage, FreshnessContract, RefreshPolicy, Sensitivity, + AiView, ColumnMeta, DatasetId, DatasetManifest, FreshnessContract, Lineage, ObjectRef, + RefreshPolicy, SchemaFingerprint, Sensitivity, }; use std::collections::HashMap; use std::sync::Arc; @@ -9,6 +9,14 @@ use tokio::sync::RwLock; use storaged::ops; use object_store::ObjectStore; +/// Make a view name safe for use as an object storage key. +/// Keeps ASCII letters, ASCII digits, `_`, `-`, `.`; every other character (non-ASCII letters and `/` included) becomes `_`. NOTE(review): distinct names can map to the same key (`a/b` and `a_b` both yield `a_b`) — confirm callers guard against that. +fn sanitize_view_name(name: &str) -> String { + name.chars() + .map(|c| if c.is_ascii_alphanumeric() || c == '_' || c == '-' || c == '.' { c } else { '_' }) + .collect() +} + #[derive(Debug, Clone, Default, serde::Serialize)] pub struct MigrateBucketsReport { pub refs_examined: usize, @@ -34,11 +42,14 @@ pub struct MetadataUpdate { } const MANIFEST_PREFIX: &str = "_catalog/manifests"; +const VIEW_PREFIX: &str = "_catalog/views"; /// In-memory dataset registry backed by manifest persistence in object storage. +/// Also tracks AiViews (Phase D) — safe projections over base datasets — in a separate `views` map. #[derive(Clone)] pub struct Registry { datasets: Arc>>, + views: Arc>>, store: Arc, } @@ -46,11 +57,12 @@ impl Registry { pub fn new(store: Arc) -> Self { Self { datasets: Arc::new(RwLock::new(HashMap::new())), + views: Arc::new(RwLock::new(HashMap::new())), store, } } - /// Rebuild in-memory index from persisted manifests on startup. + /// Rebuild the in-memory indexes from persisted manifests and view definitions on startup. The returned count is the number of datasets only; views are not included.
pub async fn rebuild(&self) -> Result { let keys = ops::list(&self.store, Some(MANIFEST_PREFIX)).await?; let mut datasets = self.datasets.write().await; @@ -63,6 +75,36 @@ impl Registry { } let count = datasets.len(); tracing::info!("catalog rebuilt: {count} datasets loaded"); + + // Phase D: load AiView definitions alongside manifests. Best-effort: a + // failed listing leaves the registry without views but is logged, never fatal. + let view_keys = match ops::list(&self.store, Some(VIEW_PREFIX)).await { + Ok(listed) => listed, + Err(e) => { + tracing::warn!("view listing failed, no views loaded: {e}"); + Vec::new() + } + }; + let mut views = self.views.write().await; + views.clear(); + for key in &view_keys { + if !key.ends_with(".json") { continue; } + let data = match ops::get(&self.store, key).await { + Ok(d) => d, + Err(e) => { + tracing::warn!("view '{key}': read failed: {e}"); + continue; + } + }; + match serde_json::from_slice::(&data) { + Ok(view) => { views.insert(view.name.clone(), view); } + Err(e) => tracing::warn!("view '{key}': parse failed: {e}"), + } + } + if !views.is_empty() { + tracing::info!("catalog: {} views loaded", views.len()); + } + Ok(count) } @@ -354,6 +396,58 @@ impl Registry { Ok(report) } + // --- Phase D: AI-safe views --- + + /// Create or replace a named view. Validates that the name and the column + /// whitelist are non-empty and that the base dataset exists, then persists to + /// `_catalog/views/{name}.json` (name sanitized for the key) and updates the in-memory map. + pub async fn put_view(&self, mut view: AiView) -> Result { + if view.name.is_empty() { + return Err("view name is empty".into()); + } + if view.columns.is_empty() { + return Err("view must whitelist at least one column".into()); + } + // Base dataset must exist (read-side will fail anyway, fail-fast here + // so operators don't end up with dangling views).
+ if self.get_by_name(&view.base_dataset).await.is_none() { + return Err(format!( + "base dataset '{}' not found in catalog", + view.base_dataset + )); + } + if view.created_at.timestamp() == 0 { + view.created_at = chrono::Utc::now(); + } + + let key = format!("{VIEW_PREFIX}/{}.json", sanitize_view_name(&view.name)); + let json = serde_json::to_vec_pretty(&view).map_err(|e| e.to_string())?; + ops::put(&self.store, &key, json.into()).await?; + + let mut views = self.views.write().await; + views.insert(view.name.clone(), view.clone()); + tracing::info!("view registered: {} over '{}'", view.name, view.base_dataset); + Ok(view) + } + + /// Look up a view by its exact (unsanitized) name. + pub async fn get_view(&self, name: &str) -> Option { + self.views.read().await.get(name).cloned() + } + + /// Snapshot of every registered view, in arbitrary map order. + pub async fn list_views(&self) -> Vec { + self.views.read().await.values().cloned().collect() + } + + /// Delete a view's persisted JSON, then drop it from memory; a storage error leaves the in-memory entry in place. + pub async fn delete_view(&self, name: &str) -> Result<(), String> { + let key = format!("{VIEW_PREFIX}/{}.json", sanitize_view_name(name)); + ops::delete(&self.store, &key).await?; + self.views.write().await.remove(name); + Ok(()) + } + /// List datasets whose `embedding_stale_since` is set — they need a refresh.
pub async fn stale_datasets(&self) -> Vec { let datasets = self.datasets.read().await; diff --git a/crates/catalogd/src/service.rs b/crates/catalogd/src/service.rs index b1aa01b..1817b4f 100644 --- a/crates/catalogd/src/service.rs +++ b/crates/catalogd/src/service.rs @@ -22,6 +22,9 @@ pub fn router(registry: Registry) -> Router { .route("/datasets/by-name/{name}/resync", post(resync_dataset)) .route("/resync-missing", post(resync_all_missing)) .route("/migrate-buckets", post(migrate_buckets)) + // Phase D: AI-safe views — create/list on the collection, fetch/delete by name + .route("/views", post(create_view).get(list_views)) + .route("/views/{name}", get(get_view).delete(delete_view)) .with_state(registry) } @@ -205,3 +208,64 @@ async fn migrate_buckets(State(registry): State) -> impl IntoResponse Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)), } } + +// --- Phase D: AI-safe views --- Request body and handlers for create/list/get/delete. `create_view` stamps `created_at` itself. NOTE(review): every `put_view` error is returned as 400, including storage write failures — confirm that mapping. + +#[derive(Deserialize)] +struct CreateViewRequest { + name: String, + base_dataset: String, + columns: Vec, + #[serde(default)] + row_filter: Option, + #[serde(default)] + column_redactions: std::collections::HashMap, + #[serde(default)] + description: String, + #[serde(default)] + created_by: String, +} + +async fn create_view( + State(registry): State, + Json(req): Json, +) -> impl IntoResponse { + let view = shared::types::AiView { + name: req.name, + base_dataset: req.base_dataset, + columns: req.columns, + row_filter: req.row_filter, + column_redactions: req.column_redactions, + created_at: chrono::Utc::now(), + created_by: req.created_by, + description: req.description, + }; + match registry.put_view(view).await { + Ok(v) => Ok((StatusCode::CREATED, Json(v))), + Err(e) => Err((StatusCode::BAD_REQUEST, e)), + } +} + +async fn list_views(State(registry): State) -> impl IntoResponse { + Json(registry.list_views().await) +} + +async fn get_view( + State(registry): State, + Path(name): Path, +) -> impl IntoResponse { + match registry.get_view(&name).await { + Some(v) => Ok(Json(v)), + None => Err((StatusCode::NOT_FOUND, 
format!("view not found: {name}"))), + } +} + +async fn delete_view( + State(registry): State, + Path(name): Path, +) -> impl IntoResponse { + match registry.delete_view(&name).await { + Ok(()) => Ok(StatusCode::NO_CONTENT), + Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)), + } +} diff --git a/crates/queryd/src/context.rs b/crates/queryd/src/context.rs index ac54c1c..0d899e5 100644 --- a/crates/queryd/src/context.rs +++ b/crates/queryd/src/context.rs @@ -201,6 +201,78 @@ impl QueryEngine { } } + // Phase D: register every AiView as a DataFusion view that wraps + // the base table with the safe projection + filter + redactions. + for view in self.registry.list_views().await { + // Build the SELECT clause with column whitelist and any + // redaction expressions. Each column is either the raw column + // name or an expression aliased back to the column name so + // downstream queries see the same shape. + let select_cols: Vec = view.columns.iter().map(|col| { + // Double any embedded `"` so the name stays inside its quoted + // identifier instead of terminating it and injecting SQL. + let ident = col.replace('"', "\"\""); + match view.column_redactions.get(col) { + None => format!("\"{}\"", ident), + Some(shared::types::Redaction::Null) => { + format!("CAST(NULL AS VARCHAR) AS \"{}\"", ident) + } + Some(shared::types::Redaction::Hash) => { + // DataFusion's digest(value, 'sha256') — the column type + // might not be string, so cast first to be safe. + // NOTE(review): digest() looks to yield binary, not text — confirm consumers expect that. + format!("digest(CAST(\"{}\" AS VARCHAR), 'sha256') AS \"{}\"", ident, ident) + } + Some(shared::types::Redaction::Mask { keep_prefix, keep_suffix }) => { + // Keep first N + last M chars, replace middle with stars. + // NULL passes through via the CASE guard. A value of at most + // N + M chars is starred entirely: its prefix and suffix would + // otherwise overlap and expose every character.
+ format!( + "CASE WHEN \"{c}\" IS NULL THEN NULL \ + WHEN length(CAST(\"{c}\" AS VARCHAR)) <= {p} + {s} \ + THEN repeat('*', length(CAST(\"{c}\" AS VARCHAR))) ELSE \ + concat(\ + substr(CAST(\"{c}\" AS VARCHAR), 1, {p}), \ + repeat('*', greatest(0, length(CAST(\"{c}\" AS VARCHAR)) - {p} - {s})), \ + substr(CAST(\"{c}\" AS VARCHAR), greatest(1, length(CAST(\"{c}\" AS VARCHAR)) - {s} + 1))\ + ) END AS \"{c}\"", + c = ident, p = keep_prefix, s = keep_suffix, + ) + } + } + }).collect(); + + let where_clause = view.row_filter + .as_deref() + .map(|f| format!(" WHERE {}", f)) + .unwrap_or_default(); + let view_sql = format!( + "SELECT {} FROM \"{}\"{}", + select_cols.join(", "), + view.base_dataset.replace('"', "\"\""), + where_clause, + ); + + tracing::debug!("registering AiView '{}': {}", view.name, view_sql); + + let df = match ctx.sql(&view_sql).await { + Ok(df) => df, + Err(e) => { + tracing::warn!("view '{}' SQL invalid, skipping: {e}", view.name); + continue; + } + }; + if let Err(e) = ctx.register_table(view.name.as_str(), df.into_view()) { + let msg = e.to_string(); + if msg.contains("already exists") { + tracing::debug!("skip duplicate view registration: {}", view.name); + } else { + tracing::warn!("view '{}' registration failed: {e}", view.name); + } + } + } + Ok(ctx) } } diff --git a/crates/shared/src/types.rs b/crates/shared/src/types.rs index de34945..512486f 100644 --- a/crates/shared/src/types.rs +++ b/crates/shared/src/types.rs @@ -157,3 +157,59 @@ pub enum RefreshPolicy { impl Default for RefreshPolicy { fn default() -> Self { Self::Manual } } + +/// AI-safe view of a dataset (Phase D). +/// +/// A view is a named projection over a base dataset that: +/// - whitelists the columns visible (everything else is invisible), +/// - optionally applies a row filter (a SQL WHERE clause fragment), +/// - optionally redacts column values (e.g. show only the last 4 of an SSN). +/// +/// Queries that reference the view name see only the safe projection. +/// The base table remains separately accessible by its raw name — this +/// is a *capability surface*, not a data classification fence.
Use +/// access control (Phase 13) on top to enforce who can query which view. +/// +/// Stored as JSON at `_catalog/views/{name}.json` (name sanitized for the key) in the primary bucket +/// alongside dataset manifests. The view itself doesn't store data — +/// queryd materializes it as a DataFusion view at SessionContext build +/// time. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AiView { + pub name: String, + /// Source dataset name. Must exist in the catalog. + pub base_dataset: String, + /// Whitelisted columns, emitted in this order by the generated SELECT. Empty list is invalid (would expose nothing) and is rejected by `Registry::put_view`. + pub columns: Vec, + /// Optional WHERE clause fragment (without the `WHERE` keyword), e.g. + /// `status != 'blocked' AND city = 'Chicago'`. SQL injection here is + /// the operator's problem — only trust this from authenticated admins. + #[serde(default)] + pub row_filter: Option, + /// Per-column redactions: `{column_name -> Redaction}`. Only entries whose key also appears in `columns` take effect; others are ignored. + #[serde(default)] + pub column_redactions: std::collections::HashMap, + pub created_at: chrono::DateTime, + #[serde(default)] + pub created_by: String, + #[serde(default)] + pub description: String, +} + +/// How a column's values should be transformed before being returned. +/// `Mask` is the most common — keeps a few visible chars, replaces the +/// rest with `*`. `Hash` returns SHA-256 of the value (cast to VARCHAR first) for join keys you +/// want to preserve referential integrity on without exposing the value. +/// `Null` returns NULL (typed as VARCHAR) unconditionally. Serialized with a snake_case `kind` tag, e.g. `{"kind": "mask", "keep_prefix": 3, "keep_suffix": 2}`. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum Redaction { + Null, + Hash, + Mask { + /// Number of leading characters left visible. + keep_prefix: usize, + /// Number of trailing characters left visible. 
+ keep_suffix: usize, + }, +} diff --git a/docs/PHASES.md b/docs/PHASES.md index 9287274..c4fe636 100644 --- a/docs/PHASES.md +++ b/docs/PHASES.md @@ -154,6 +154,14 @@ - `crates/lance-bench` standalone pilot (Lance 4.0) avoids DataFusion/Arrow version conflict with main stack - 8-dimension benchmark on resumes_100k_v2 — see docs/ADR-019-vector-storage.md for scorecard - Decision: hybrid architecture. Parquet+HNSW stays primary (2.55× faster search at 100K in-RAM). Lance added as per-profile second backend for random access (112× faster), append (0.08s vs full rewrite), hot-swap (14× faster index builds), and scale past 5M RAM ceiling. +- [x] Phase D: AI-safe views — 2026-04-16 + - `shared::types::AiView` — name, base_dataset, columns whitelist, optional row_filter, column_redactions + - `shared::types::Redaction` — Null | Hash | Mask { keep_prefix, keep_suffix } + - `Registry::put_view / get_view / list_views / delete_view` persisted to `_catalog/views/{name}.json` + - `queryd::context` registers each view as a DataFusion view with the safe projection + filter + redactions baked into the SELECT + - Endpoints: `POST/GET /catalog/views`, `GET/DELETE /catalog/views/{name}` + - End-to-end on candidates: `candidates_safe` view exposes 8 of 15 columns, masks `candidate_id` (CAN******01), filters out `status='blocked'`. `SELECT * FROM candidates_safe` returns whitelist only; `SELECT email FROM candidates_safe` fails. View survives restart. + - Capability surface — raw `candidates` still accessible by name; Phase 13 access control is the layer that enforces who can query what - [x] Phase C: Decoupled embedding refresh — 2026-04-16 - `DatasetManifest`: `last_embedded_at`, `embedding_stale_since`, `embedding_refresh_policy` (Manual | OnAppend | Scheduled) - `Registry::mark_embeddings_stale` / `clear_embeddings_stale` / `stale_datasets`