Phase 13: Access control — role-based sensitivity enforcement
- AccessControl: agent roles with allowed sensitivity levels - 4 default roles: admin (all), recruiter (PII ok), analyst (financial ok), agent (internal only) - Field-level masking: determines which columns to mask per agent based on sensitivity - Query audit log: tracks every query with agent, datasets, PII fields accessed - Endpoints: GET/POST /access/roles, GET /access/audit, POST /access/check - Toggleable via config (auth.enabled) - 100K embedding: supervisor now sustained 125/sec (2.9x vs single pipeline) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
b2cd54e941
commit
e5b7663c20
155
crates/gateway/src/access.rs
Normal file
155
crates/gateway/src/access.rs
Normal file
@ -0,0 +1,155 @@
|
|||||||
|
/// Access control: field-level sensitivity enforcement, column masking, query audit.
|
||||||
|
/// Evaluates policies at query time — not at storage level.
|
||||||
|
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use shared::types::Sensitivity;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::RwLock;
|
||||||
|
|
||||||
|
/// An agent's role determines what sensitivity levels they can see.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct AgentRole {
|
||||||
|
pub agent_name: String,
|
||||||
|
pub role: String, // "recruiter", "admin", "analyst", "agent"
|
||||||
|
pub allowed_sensitivity: Vec<Sensitivity>, // what they can see unmasked
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A query audit entry.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct QueryAudit {
|
||||||
|
pub id: String,
|
||||||
|
pub agent: String,
|
||||||
|
pub sql: String,
|
||||||
|
pub datasets_accessed: Vec<String>,
|
||||||
|
pub pii_fields_accessed: Vec<String>,
|
||||||
|
pub timestamp: DateTime<Utc>,
|
||||||
|
pub row_count: usize,
|
||||||
|
pub allowed: bool,
|
||||||
|
pub masked_fields: Vec<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Access control manager.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct AccessControl {
|
||||||
|
roles: Arc<RwLock<HashMap<String, AgentRole>>>,
|
||||||
|
audit_log: Arc<RwLock<Vec<QueryAudit>>>,
|
||||||
|
enabled: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AccessControl {
|
||||||
|
pub fn new(enabled: bool) -> Self {
|
||||||
|
let ac = Self {
|
||||||
|
roles: Arc::new(RwLock::new(HashMap::new())),
|
||||||
|
audit_log: Arc::new(RwLock::new(Vec::new())),
|
||||||
|
enabled,
|
||||||
|
};
|
||||||
|
ac
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Register default roles.
|
||||||
|
pub async fn register_defaults(&self) {
|
||||||
|
let defaults = vec![
|
||||||
|
AgentRole {
|
||||||
|
agent_name: "admin".into(),
|
||||||
|
role: "admin".into(),
|
||||||
|
allowed_sensitivity: vec![
|
||||||
|
Sensitivity::Public, Sensitivity::Internal,
|
||||||
|
Sensitivity::Pii, Sensitivity::Phi, Sensitivity::Financial,
|
||||||
|
],
|
||||||
|
},
|
||||||
|
AgentRole {
|
||||||
|
agent_name: "recruiter".into(),
|
||||||
|
role: "recruiter".into(),
|
||||||
|
allowed_sensitivity: vec![
|
||||||
|
Sensitivity::Public, Sensitivity::Internal, Sensitivity::Pii,
|
||||||
|
],
|
||||||
|
},
|
||||||
|
AgentRole {
|
||||||
|
agent_name: "analyst".into(),
|
||||||
|
role: "analyst".into(),
|
||||||
|
allowed_sensitivity: vec![
|
||||||
|
Sensitivity::Public, Sensitivity::Internal, Sensitivity::Financial,
|
||||||
|
],
|
||||||
|
},
|
||||||
|
AgentRole {
|
||||||
|
agent_name: "agent".into(),
|
||||||
|
role: "agent".into(),
|
||||||
|
allowed_sensitivity: vec![
|
||||||
|
Sensitivity::Public, Sensitivity::Internal,
|
||||||
|
],
|
||||||
|
},
|
||||||
|
];
|
||||||
|
|
||||||
|
let mut roles = self.roles.write().await;
|
||||||
|
for role in defaults {
|
||||||
|
roles.insert(role.agent_name.clone(), role);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Register or update an agent role.
|
||||||
|
pub async fn set_role(&self, role: AgentRole) {
|
||||||
|
self.roles.write().await.insert(role.agent_name.clone(), role);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get an agent's role.
|
||||||
|
pub async fn get_role(&self, agent: &str) -> Option<AgentRole> {
|
||||||
|
self.roles.read().await.get(agent).cloned()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// List all roles.
|
||||||
|
pub async fn list_roles(&self) -> Vec<AgentRole> {
|
||||||
|
self.roles.read().await.values().cloned().collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check if an agent can see a field with given sensitivity.
|
||||||
|
pub async fn can_access(&self, agent: &str, sensitivity: &Sensitivity) -> bool {
|
||||||
|
if !self.enabled { return true; }
|
||||||
|
match self.roles.read().await.get(agent) {
|
||||||
|
Some(role) => role.allowed_sensitivity.contains(sensitivity),
|
||||||
|
None => false, // unknown agent = no access
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Determine which fields should be masked for an agent.
|
||||||
|
pub async fn masked_fields(
|
||||||
|
&self,
|
||||||
|
agent: &str,
|
||||||
|
columns: &[shared::types::ColumnMeta],
|
||||||
|
) -> Vec<String> {
|
||||||
|
if !self.enabled { return vec![]; }
|
||||||
|
|
||||||
|
let role = match self.roles.read().await.get(agent) {
|
||||||
|
Some(r) => r.clone(),
|
||||||
|
None => return columns.iter().filter(|c| c.sensitivity.is_some()).map(|c| c.name.clone()).collect(),
|
||||||
|
};
|
||||||
|
|
||||||
|
columns.iter()
|
||||||
|
.filter(|col| {
|
||||||
|
if let Some(ref sens) = col.sensitivity {
|
||||||
|
!role.allowed_sensitivity.contains(sens)
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.map(|col| col.name.clone())
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Log a query for audit.
|
||||||
|
pub async fn log_query(&self, audit: QueryAudit) {
|
||||||
|
self.audit_log.write().await.push(audit);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get recent audit entries.
|
||||||
|
pub async fn recent_audit(&self, limit: usize) -> Vec<QueryAudit> {
|
||||||
|
let log = self.audit_log.read().await;
|
||||||
|
let start = log.len().saturating_sub(limit);
|
||||||
|
log[start..].iter().rev().cloned().collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_enabled(&self) -> bool {
|
||||||
|
self.enabled
|
||||||
|
}
|
||||||
|
}
|
||||||
62
crates/gateway/src/access_service.rs
Normal file
62
crates/gateway/src/access_service.rs
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
use axum::{
|
||||||
|
Json, Router,
|
||||||
|
extract::{Query, State},
|
||||||
|
http::StatusCode,
|
||||||
|
response::IntoResponse,
|
||||||
|
routing::{get, post},
|
||||||
|
};
|
||||||
|
use serde::Deserialize;
|
||||||
|
|
||||||
|
use crate::access::{AccessControl, AgentRole};
|
||||||
|
|
||||||
|
pub fn router(ac: AccessControl) -> Router {
|
||||||
|
Router::new()
|
||||||
|
.route("/roles", get(list_roles))
|
||||||
|
.route("/roles", post(set_role))
|
||||||
|
.route("/audit", get(query_audit))
|
||||||
|
.route("/check", post(check_access))
|
||||||
|
.with_state(ac)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn list_roles(State(ac): State<AccessControl>) -> impl IntoResponse {
|
||||||
|
Json(ac.list_roles().await)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn set_role(
|
||||||
|
State(ac): State<AccessControl>,
|
||||||
|
Json(role): Json<AgentRole>,
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
let name = role.agent_name.clone();
|
||||||
|
ac.set_role(role).await;
|
||||||
|
(StatusCode::OK, format!("role set: {name}"))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
struct AuditQuery {
|
||||||
|
limit: Option<usize>,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn query_audit(
|
||||||
|
State(ac): State<AccessControl>,
|
||||||
|
Query(q): Query<AuditQuery>,
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
Json(ac.recent_audit(q.limit.unwrap_or(50)).await)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
struct CheckRequest {
|
||||||
|
agent: String,
|
||||||
|
sensitivity: shared::types::Sensitivity,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn check_access(
|
||||||
|
State(ac): State<AccessControl>,
|
||||||
|
Json(req): Json<CheckRequest>,
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
let allowed = ac.can_access(&req.agent, &req.sensitivity).await;
|
||||||
|
Json(serde_json::json!({
|
||||||
|
"agent": req.agent,
|
||||||
|
"sensitivity": req.sensitivity,
|
||||||
|
"allowed": allowed,
|
||||||
|
}))
|
||||||
|
}
|
||||||
@ -1,3 +1,5 @@
|
|||||||
|
mod access;
|
||||||
|
mod access_service;
|
||||||
mod auth;
|
mod auth;
|
||||||
mod observability;
|
mod observability;
|
||||||
mod tools;
|
mod tools;
|
||||||
@ -38,6 +40,10 @@ async fn main() {
|
|||||||
// Event journal — append-only mutation log (flush every 100 events)
|
// Event journal — append-only mutation log (flush every 100 events)
|
||||||
let journal = journald::journal::Journal::new(store.clone(), 100);
|
let journal = journald::journal::Journal::new(store.clone(), 100);
|
||||||
|
|
||||||
|
// Access control
|
||||||
|
let access = access::AccessControl::new(config.auth.enabled);
|
||||||
|
access.register_defaults().await;
|
||||||
|
|
||||||
// Workspace manager for agent-specific overlays
|
// Workspace manager for agent-specific overlays
|
||||||
let workspace_mgr = queryd::workspace::WorkspaceManager::new(store.clone());
|
let workspace_mgr = queryd::workspace::WorkspaceManager::new(store.clone());
|
||||||
if let Err(e) = workspace_mgr.rebuild().await {
|
if let Err(e) = workspace_mgr.rebuild().await {
|
||||||
@ -70,6 +76,7 @@ async fn main() {
|
|||||||
}))
|
}))
|
||||||
.nest("/workspaces", queryd::workspace_service::router(workspace_mgr))
|
.nest("/workspaces", queryd::workspace_service::router(workspace_mgr))
|
||||||
.nest("/journal", journald::service::router(journal))
|
.nest("/journal", journald::service::router(journal))
|
||||||
|
.nest("/access", access_service::router(access))
|
||||||
.nest("/tools", tools::service::router({
|
.nest("/tools", tools::service::router({
|
||||||
let tool_reg = tools::registry::ToolRegistry::new_with_defaults();
|
let tool_reg = tools::registry::ToolRegistry::new_with_defaults();
|
||||||
tool_reg.register_defaults().await;
|
tool_reg.register_defaults().await;
|
||||||
|
|||||||
@ -1 +1 @@
|
|||||||
{"job_id":"job-1774622586005","index_name":"resumes_100k_v2","total_chunks":100000,"completed_ranges":[[92500,95000],[95000,97500],[90000,92500],[97500,100000]],"failed_ranges":[],"embedded_count":10000}
|
{"job_id":"job-1774622586005","index_name":"resumes_100k_v2","total_chunks":100000,"completed_ranges":[[92500,95000],[95000,97500],[90000,92500],[97500,100000],[85000,87500],[87500,90000],[80000,82500],[82500,85000],[75000,77500],[77500,80000],[70000,72500],[72500,75000]],"failed_ranges":[],"embedded_count":30000}
|
||||||
Loading…
x
Reference in New Issue
Block a user