Phase D: AI-safe views — capability-surface projections over base data

Implements the llms3.com "AI-safe views" pattern: a named projection
that exposes only whitelisted columns, with optional row filter and
per-column redactions. AI agents (or Phase 13 roles) bind to the view;
they can never accidentally see PII even if they write raw SQL.

Schema (shared::types):
- AiView { name, base_dataset, columns: Vec<String>, row_filter,
           column_redactions: HashMap<String, Redaction>, ... }
- Redaction enum: Null | Hash | Mask { keep_prefix, keep_suffix }

Catalog (catalogd::registry):
- put_view validates base dataset exists + columns non-empty
- Persists JSON at _catalog/views/{name}.json (sanitized name)
- rebuild() loads views alongside dataset manifests on startup

Query layer (queryd::context):
- build_context registers every AiView as a DataFusion view object
- Constructed SELECT applies whitelist projection, WHERE filter, and
  redaction expressions per column
  - Mask: substr(prefix) + repeat('*', mid_len) + substr(suffix)
  - Hash: digest(value, 'sha256')
  - Null: CAST(NULL AS VARCHAR) AS col
- DataFusion handles JOINs/aggregates over the view natively — it's a
  real view, not a query rewrite

HTTP (catalogd::service):
- POST /catalog/views (create)
- GET  /catalog/views (list)
- GET  /catalog/views/{name} (full def)
- DELETE /catalog/views/{name}

End-to-end test on candidates (100K rows, 15 columns):

  candidates_safe view:
    columns: candidate_id, first_name, city, state, vertical,
             skills, years_experience, status
    row_filter: status != 'blocked'
    redaction: candidate_id mask(prefix=3, suffix=2)

  SELECT * FROM candidates_safe LIMIT 5
    -> 8 columns only, candidate_id shown as "CAN******01"
       (PII fields email/phone/last_name absent from result)

  SELECT email FROM candidates_safe
    -> fails (column not in projection)

  SELECT email FROM candidates
    -> succeeds (raw table still accessible by name —
       Phase 13 access control is the gate, not the view itself)

Survives restart — view definitions reload from object storage.

Limits / not in MVP:
- View CANNOT shadow base table by name (DataFusion treats them as
  separate identifiers; access control must restrict raw-table access)
- row_filter is treated as trusted SQL — operators must validate
  before persisting; only authenticated admin path should call put_view
- Redaction expressions assume column is castable to VARCHAR; numeric
  redactions could be misleading (a Hash on Int64 returns a hex string
  that won't equi-join with another hash on the same value type)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
root 2026-04-16 09:16:44 -05:00
parent 24f1249a62
commit 09fd446c8d
5 changed files with 279 additions and 3 deletions

View File

@ -1,6 +1,6 @@
use shared::types::{ use shared::types::{
DatasetId, DatasetManifest, ObjectRef, SchemaFingerprint, AiView, ColumnMeta, DatasetId, DatasetManifest, FreshnessContract, Lineage, ObjectRef,
ColumnMeta, Lineage, FreshnessContract, RefreshPolicy, Sensitivity, RefreshPolicy, SchemaFingerprint, Sensitivity,
}; };
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
@ -9,6 +9,14 @@ use tokio::sync::RwLock;
use storaged::ops; use storaged::ops;
use object_store::ObjectStore; use object_store::ObjectStore;
/// Make a view name safe for use as an object storage key.
/// Allows letters, digits, `_`, `-`, `.`. Anything else becomes `_`.
fn sanitize_view_name(name: &str) -> String {
name.chars()
.map(|c| if c.is_ascii_alphanumeric() || c == '_' || c == '-' || c == '.' { c } else { '_' })
.collect()
}
#[derive(Debug, Clone, Default, serde::Serialize)] #[derive(Debug, Clone, Default, serde::Serialize)]
pub struct MigrateBucketsReport { pub struct MigrateBucketsReport {
pub refs_examined: usize, pub refs_examined: usize,
@ -34,11 +42,14 @@ pub struct MetadataUpdate {
} }
const MANIFEST_PREFIX: &str = "_catalog/manifests"; const MANIFEST_PREFIX: &str = "_catalog/manifests";
const VIEW_PREFIX: &str = "_catalog/views";
/// In-memory dataset registry backed by manifest persistence in object storage. /// In-memory dataset registry backed by manifest persistence in object storage.
/// Also tracks AiViews (Phase D) — safe projections over base datasets.
#[derive(Clone)] #[derive(Clone)]
pub struct Registry { pub struct Registry {
datasets: Arc<RwLock<HashMap<DatasetId, DatasetManifest>>>, datasets: Arc<RwLock<HashMap<DatasetId, DatasetManifest>>>,
views: Arc<RwLock<HashMap<String, AiView>>>,
store: Arc<dyn ObjectStore>, store: Arc<dyn ObjectStore>,
} }
@ -46,11 +57,12 @@ impl Registry {
pub fn new(store: Arc<dyn ObjectStore>) -> Self { pub fn new(store: Arc<dyn ObjectStore>) -> Self {
Self { Self {
datasets: Arc::new(RwLock::new(HashMap::new())), datasets: Arc::new(RwLock::new(HashMap::new())),
views: Arc::new(RwLock::new(HashMap::new())),
store, store,
} }
} }
/// Rebuild in-memory index from persisted manifests on startup. /// Rebuild in-memory index from persisted manifests + views on startup.
pub async fn rebuild(&self) -> Result<usize, String> { pub async fn rebuild(&self) -> Result<usize, String> {
let keys = ops::list(&self.store, Some(MANIFEST_PREFIX)).await?; let keys = ops::list(&self.store, Some(MANIFEST_PREFIX)).await?;
let mut datasets = self.datasets.write().await; let mut datasets = self.datasets.write().await;
@ -63,6 +75,29 @@ impl Registry {
} }
let count = datasets.len(); let count = datasets.len();
tracing::info!("catalog rebuilt: {count} datasets loaded"); tracing::info!("catalog rebuilt: {count} datasets loaded");
// Phase D: load AiView definitions alongside manifests.
let view_keys = ops::list(&self.store, Some(VIEW_PREFIX)).await.unwrap_or_default();
let mut views = self.views.write().await;
views.clear();
for key in &view_keys {
if !key.ends_with(".json") { continue; }
let data = match ops::get(&self.store, key).await {
Ok(d) => d,
Err(e) => {
tracing::warn!("view '{key}': read failed: {e}");
continue;
}
};
match serde_json::from_slice::<AiView>(&data) {
Ok(view) => { views.insert(view.name.clone(), view); }
Err(e) => tracing::warn!("view '{key}': parse failed: {e}"),
}
}
if !views.is_empty() {
tracing::info!("catalog: {} views loaded", views.len());
}
Ok(count) Ok(count)
} }
@ -354,6 +389,55 @@ impl Registry {
Ok(report) Ok(report)
} }
// --- Phase D: AI-safe views ---
/// Create or replace a named view. Validates that the base dataset
/// exists and that the column whitelist is non-empty. Persists to
/// `_catalog/views/{name}.json`.
pub async fn put_view(&self, mut view: AiView) -> Result<AiView, String> {
if view.name.is_empty() {
return Err("view name is empty".into());
}
if view.columns.is_empty() {
return Err("view must whitelist at least one column".into());
}
// Base dataset must exist (read-side will fail anyway, fail-fast here
// so operators don't end up with dangling views).
if self.get_by_name(&view.base_dataset).await.is_none() {
return Err(format!(
"base dataset '{}' not found in catalog",
view.base_dataset
));
}
if view.created_at.timestamp() == 0 {
view.created_at = chrono::Utc::now();
}
let key = format!("{VIEW_PREFIX}/{}.json", sanitize_view_name(&view.name));
let json = serde_json::to_vec_pretty(&view).map_err(|e| e.to_string())?;
ops::put(&self.store, &key, json.into()).await?;
let mut views = self.views.write().await;
views.insert(view.name.clone(), view.clone());
tracing::info!("view registered: {} over '{}'", view.name, view.base_dataset);
Ok(view)
}
pub async fn get_view(&self, name: &str) -> Option<AiView> {
self.views.read().await.get(name).cloned()
}
pub async fn list_views(&self) -> Vec<AiView> {
self.views.read().await.values().cloned().collect()
}
pub async fn delete_view(&self, name: &str) -> Result<(), String> {
let key = format!("{VIEW_PREFIX}/{}.json", sanitize_view_name(name));
ops::delete(&self.store, &key).await?;
self.views.write().await.remove(name);
Ok(())
}
/// List datasets whose `embedding_stale_since` is set — they need a refresh. /// List datasets whose `embedding_stale_since` is set — they need a refresh.
pub async fn stale_datasets(&self) -> Vec<DatasetManifest> { pub async fn stale_datasets(&self) -> Vec<DatasetManifest> {
let datasets = self.datasets.read().await; let datasets = self.datasets.read().await;

View File

@ -22,6 +22,9 @@ pub fn router(registry: Registry) -> Router {
.route("/datasets/by-name/{name}/resync", post(resync_dataset)) .route("/datasets/by-name/{name}/resync", post(resync_dataset))
.route("/resync-missing", post(resync_all_missing)) .route("/resync-missing", post(resync_all_missing))
.route("/migrate-buckets", post(migrate_buckets)) .route("/migrate-buckets", post(migrate_buckets))
// Phase D: AI-safe views
.route("/views", post(create_view).get(list_views))
.route("/views/{name}", get(get_view).delete(delete_view))
.with_state(registry) .with_state(registry)
} }
@ -205,3 +208,64 @@ async fn migrate_buckets(State(registry): State<Registry>) -> impl IntoResponse
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)), Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
} }
} }
// --- Phase D: AI-safe views ---
#[derive(Deserialize)]
struct CreateViewRequest {
name: String,
base_dataset: String,
columns: Vec<String>,
#[serde(default)]
row_filter: Option<String>,
#[serde(default)]
column_redactions: std::collections::HashMap<String, shared::types::Redaction>,
#[serde(default)]
description: String,
#[serde(default)]
created_by: String,
}
async fn create_view(
State(registry): State<Registry>,
Json(req): Json<CreateViewRequest>,
) -> impl IntoResponse {
let view = shared::types::AiView {
name: req.name,
base_dataset: req.base_dataset,
columns: req.columns,
row_filter: req.row_filter,
column_redactions: req.column_redactions,
created_at: chrono::Utc::now(),
created_by: req.created_by,
description: req.description,
};
match registry.put_view(view).await {
Ok(v) => Ok((StatusCode::CREATED, Json(v))),
Err(e) => Err((StatusCode::BAD_REQUEST, e)),
}
}
async fn list_views(State(registry): State<Registry>) -> impl IntoResponse {
Json(registry.list_views().await)
}
async fn get_view(
State(registry): State<Registry>,
Path(name): Path<String>,
) -> impl IntoResponse {
match registry.get_view(&name).await {
Some(v) => Ok(Json(v)),
None => Err((StatusCode::NOT_FOUND, format!("view not found: {name}"))),
}
}
async fn delete_view(
State(registry): State<Registry>,
Path(name): Path<String>,
) -> impl IntoResponse {
match registry.delete_view(&name).await {
Ok(()) => Ok(StatusCode::NO_CONTENT),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}

View File

@ -201,6 +201,70 @@ impl QueryEngine {
} }
} }
// Phase D: register every AiView as a DataFusion view that wraps
// the base table with the safe projection + filter + redactions.
for view in self.registry.list_views().await {
// Build the SELECT clause with column whitelist and any
// redaction expressions. Each column is either the raw column
// name or an expression aliased back to the column name so
// downstream queries see the same shape.
let select_cols: Vec<String> = view.columns.iter().map(|col| {
match view.column_redactions.get(col) {
None => format!("\"{}\"", col),
Some(shared::types::Redaction::Null) => {
format!("CAST(NULL AS VARCHAR) AS \"{}\"", col)
}
Some(shared::types::Redaction::Hash) => {
// DataFusion has digest('sha256', value) — but column
// type might not be string. Cast first to be safe.
format!("digest(CAST(\"{}\" AS VARCHAR), 'sha256') AS \"{}\"", col, col)
}
Some(shared::types::Redaction::Mask { keep_prefix, keep_suffix }) => {
// Keep first N + last M chars, replace middle with stars.
// CASE NULL through with COALESCE for null-safe behavior.
format!(
"CASE WHEN \"{c}\" IS NULL THEN NULL ELSE \
concat(\
substr(CAST(\"{c}\" AS VARCHAR), 1, {p}), \
repeat('*', greatest(0, length(CAST(\"{c}\" AS VARCHAR)) - {p} - {s})), \
substr(CAST(\"{c}\" AS VARCHAR), greatest(1, length(CAST(\"{c}\" AS VARCHAR)) - {s} + 1))\
) END AS \"{c}\"",
c = col, p = keep_prefix, s = keep_suffix,
)
}
}
}).collect();
let where_clause = view.row_filter
.as_deref()
.map(|f| format!(" WHERE {}", f))
.unwrap_or_default();
let view_sql = format!(
"SELECT {} FROM \"{}\"{}",
select_cols.join(", "),
view.base_dataset,
where_clause,
);
tracing::debug!("registering AiView '{}': {}", view.name, view_sql);
let df = match ctx.sql(&view_sql).await {
Ok(df) => df,
Err(e) => {
tracing::warn!("view '{}' SQL invalid, skipping: {e}", view.name);
continue;
}
};
if let Err(e) = ctx.register_table(view.name.as_str(), df.into_view()) {
let msg = e.to_string();
if msg.contains("already exists") {
tracing::debug!("skip duplicate view registration: {}", view.name);
} else {
tracing::warn!("view '{}' registration failed: {e}", view.name);
}
}
}
Ok(ctx) Ok(ctx)
} }
} }

View File

@ -157,3 +157,59 @@ pub enum RefreshPolicy {
impl Default for RefreshPolicy { impl Default for RefreshPolicy {
fn default() -> Self { Self::Manual } fn default() -> Self { Self::Manual }
} }
/// AI-safe view of a dataset (Phase D).
///
/// A view is a named projection over a base dataset that:
/// - whitelists the columns visible (everything else is invisible),
/// - optionally applies a row filter (a SQL WHERE clause fragment),
/// - optionally redacts column values (e.g. mask the last 4 of an SSN).
///
/// Queries that reference the view name see only the safe projection.
/// The base table remains separately accessible by its raw name — this
/// is a *capability surface*, not a data classification fence. Use
/// access control (Phase 13) on top to enforce who can query which view.
///
/// Stored as JSON at `_catalog/views/{name}.json` in the primary bucket
/// alongside dataset manifests. The view itself doesn't store data —
/// queryd materializes it as a DataFusion view at SessionContext build
/// time.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AiView {
pub name: String,
/// Source dataset name. Must exist in the catalog.
pub base_dataset: String,
/// Whitelisted columns. Empty list is invalid (would expose nothing).
pub columns: Vec<String>,
/// Optional WHERE clause fragment (without the `WHERE` keyword), e.g.
/// `status != 'blocked' AND city = 'Chicago'`. SQL injection here is
/// the operator's problem — only trust this from authenticated admins.
#[serde(default)]
pub row_filter: Option<String>,
/// Per-column redactions: `{column_name -> Redaction}`.
#[serde(default)]
pub column_redactions: std::collections::HashMap<String, Redaction>,
pub created_at: chrono::DateTime<chrono::Utc>,
#[serde(default)]
pub created_by: String,
#[serde(default)]
pub description: String,
}
/// How a column's values should be transformed before being returned.
/// `Mask` is the most common — keeps a few visible chars, replaces the
/// rest with `*`. `Hash` returns SHA-256 of the value for join keys you
/// want to preserve referential integrity on without exposing the value.
/// `Null` returns NULL unconditionally.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum Redaction {
Null,
Hash,
Mask {
/// Number of leading characters left visible.
keep_prefix: usize,
/// Number of trailing characters left visible.
keep_suffix: usize,
},
}

View File

@ -154,6 +154,14 @@
- `crates/lance-bench` standalone pilot (Lance 4.0) avoids DataFusion/Arrow version conflict with main stack - `crates/lance-bench` standalone pilot (Lance 4.0) avoids DataFusion/Arrow version conflict with main stack
- 8-dimension benchmark on resumes_100k_v2 — see docs/ADR-019-vector-storage.md for scorecard - 8-dimension benchmark on resumes_100k_v2 — see docs/ADR-019-vector-storage.md for scorecard
- Decision: hybrid architecture. Parquet+HNSW stays primary (2.55× faster search at 100K in-RAM). Lance added as per-profile second backend for random access (112× faster), append (0.08s vs full rewrite), hot-swap (14× faster index builds), and scale past 5M RAM ceiling. - Decision: hybrid architecture. Parquet+HNSW stays primary (2.55× faster search at 100K in-RAM). Lance added as per-profile second backend for random access (112× faster), append (0.08s vs full rewrite), hot-swap (14× faster index builds), and scale past 5M RAM ceiling.
- [x] Phase D: AI-safe views — 2026-04-16
- `shared::types::AiView` — name, base_dataset, columns whitelist, optional row_filter, column_redactions
- `shared::types::Redaction` — Null | Hash | Mask { keep_prefix, keep_suffix }
- `Registry::put_view / get_view / list_views / delete_view` persisted to `_catalog/views/{name}.json`
- `queryd::context` registers each view as a DataFusion view with the safe projection + filter + redactions baked into the SELECT
- Endpoints: `POST/GET /catalog/views`, `GET/DELETE /catalog/views/{name}`
- End-to-end on candidates: `candidates_safe` view exposes 8 of 15 columns, masks `candidate_id` (CAN******01), filters out `status='blocked'`. `SELECT * FROM candidates_safe` returns whitelist only; `SELECT email FROM candidates_safe` fails. View survives restart.
- Capability surface — raw `candidates` still accessible by name; Phase 13 access control is the layer that enforces who can query what
- [x] Phase C: Decoupled embedding refresh — 2026-04-16 - [x] Phase C: Decoupled embedding refresh — 2026-04-16
- `DatasetManifest`: `last_embedded_at`, `embedding_stale_since`, `embedding_refresh_policy` (Manual | OnAppend | Scheduled) - `DatasetManifest`: `last_embedded_at`, `embedding_stale_since`, `embedding_refresh_policy` (Manual | OnAppend | Scheduled)
- `Registry::mark_embeddings_stale` / `clear_embeddings_stale` / `stale_datasets` - `Registry::mark_embeddings_stale` / `clear_embeddings_stale` / `stale_datasets`