lakehouse/crates/shared/src/secrets.rs
root dbe00d018f Federation foundation + HNSW trial system + Postgres streaming + PRD reframe
Four shipped features, each measured end-to-end, plus a PRD realignment:

HNSW trial system (Phase 15 horizon item → complete)
- vectord: EmbeddingCache, harness (eval sets + brute-force ground truth),
  TrialJournal, parameterized HnswConfig on build_index_with_config
- /vectors/hnsw/trial, /hnsw/trials/{idx}, /hnsw/trials/{idx}/best,
  /hnsw/evals/{name}/autogen, /hnsw/cache/stats
- Measured on resumes_100k_v2 (100K × 768d): brute-force 44ms → HNSW 873µs
  at 100% recall@10. ec=80 es=30 locked as HnswConfig::default()
- Lower ec values trade recall for build time: 20/30 = 0.96 recall in 8s,
  80/30 = 1.00 recall in 230s
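As a sketch of the locked default (assuming `ec`/`es` abbreviate ef_construction/ef_search — field names here are illustrative, not vectord's actual API):

```rust
// Hypothetical shape of the config the trial sweep locked in.
// ec=80 es=30 won: 1.00 recall@10 at 873µs/query on resumes_100k_v2.
#[derive(Debug, Clone, PartialEq)]
struct HnswConfig {
    ef_construction: usize, // "ec": candidate list size while building the graph
    ef_search: usize,       // "es": candidate list size at query time
}

impl Default for HnswConfig {
    fn default() -> Self {
        Self { ef_construction: 80, ef_search: 30 }
    }
}
```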

Catalog manifest repair
- catalogd: resync_from_parquet reads parquet footers to restore row_count
  and columns on drifted manifests
- POST /catalog/datasets/{name}/resync + POST /catalog/resync-missing
- All 7 staffing tables recovered to PRD-matching 2,469,278 rows
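The repair rule reduces to: trust the parquet footer over the manifest. A stdlib-only sketch with made-up `Manifest`/`Footer` types (catalogd's real types and footer reader differ):

```rust
// Illustrative-only types; the real manifest and parquet footer readers
// live in catalogd. This just shows the resync rule.
#[derive(Debug, Clone, PartialEq)]
struct Manifest {
    row_count: Option<u64>,
    columns: Vec<String>,
}

struct Footer {
    row_count: u64,
    columns: Vec<String>,
}

/// Restore drifted manifest fields from the authoritative parquet footer.
/// Returns true if anything had drifted and was repaired.
fn resync(manifest: &mut Manifest, footer: &Footer) -> bool {
    let drifted = manifest.row_count != Some(footer.row_count)
        || manifest.columns != footer.columns;
    if drifted {
        manifest.row_count = Some(footer.row_count);
        manifest.columns = footer.columns.clone();
    }
    drifted
}
```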

Federation foundation (ADR-017)
- shared::secrets: SecretsProvider trait + FileSecretsProvider (reads
  /etc/lakehouse/secrets.toml, enforces 0600 perms)
- storaged::registry::BucketRegistry — multi-bucket resolution with
  rescue_bucket read fallback and reachability probing
- storaged::error_journal — bucket op failures visible in one HTTP call
- storaged::append_log — write-once batched append pattern (fixes the RMW
  anti-pattern llms3.com calls out; errors and trial journals both use it)
- /storage/buckets, /storage/errors, /storage/bucket-health,
  /storage/errors/{flush,compact}
- Bucket-aware I/O at /storage/buckets/{bucket}/objects/{*key} with
  X-Lakehouse-Rescue-Used observability headers on fallback
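The rescue-fallback read can be pictured with a toy in-memory registry (names like `BucketRegistry` and `rescue_bucket` follow the bullets above, but this is a sketch, not storaged's API):

```rust
use std::collections::HashMap;

// Toy registry: bucket -> key -> bytes. The real BucketRegistry resolves
// actual object stores and probes reachability.
struct BucketRegistry {
    objects: HashMap<String, HashMap<String, Vec<u8>>>,
    rescue_bucket: String,
}

/// Read with fallback. The bool reports whether the rescue bucket served
/// the read — the fact the X-Lakehouse-Rescue-Used header would surface.
fn read(reg: &BucketRegistry, bucket: &str, key: &str) -> Option<(Vec<u8>, bool)> {
    if let Some(bytes) = reg.objects.get(bucket).and_then(|b| b.get(key)) {
        return Some((bytes.clone(), false));
    }
    reg.objects
        .get(&reg.rescue_bucket)
        .and_then(|b| b.get(key))
        .map(|bytes| (bytes.clone(), true))
}
```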

Postgres streaming ingest
- ingestd::pg_stream: DSN parser, batched ORDER BY + LIMIT/OFFSET pagination
  into ArrowWriter, lineage redacts password
- POST /ingest/db — verified against live knowledge_base.team_runs
  (586 rows × 13 cols, 6 batches, 196ms end-to-end)
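The pagination and password-redaction steps can be sketched as (naive stdlib-only versions; ingestd's actual DSN parser is more thorough):

```rust
/// Query text for one batch under the ORDER BY + LIMIT/OFFSET scheme.
fn batch_query(table: &str, order_by: &str, batch: usize, batch_size: usize) -> String {
    format!(
        "SELECT * FROM {table} ORDER BY {order_by} LIMIT {batch_size} OFFSET {}",
        batch * batch_size
    )
}

/// Strip the password from a `scheme://user:pass@host/db` DSN for lineage.
/// Split-based sketch only — it assumes a single '@' separating credentials
/// from the host part.
fn redact_dsn(dsn: &str) -> String {
    match (dsn.find("//"), dsn.rfind('@')) {
        (Some(scheme_end), Some(at)) if at > scheme_end => {
            let creds = &dsn[scheme_end + 2..at];
            match creds.find(':') {
                Some(colon) => format!(
                    "{}{}:***{}",
                    &dsn[..scheme_end + 2],
                    &creds[..colon],
                    &dsn[at..]
                ),
                None => dsn.to_string(),
            }
        }
        _ => dsn.to_string(),
    }
}
```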

PRD realignment (2026-04-16)
- Dual use case: staffing analytics + local LLM knowledge substrate
- Removed "multi-tenancy (single-owner system)" from non-goals
- Added invariants 8-11: indexes hot-swappable, per-reader profiles,
  trials-as-data, operational failures findable in one HTTP call
- New phases 16 (hot-swap generations), 17 (model profiles + dataset
  bindings), 18 (Lance vs Parquet+sidecar evaluation)
- Known ceilings table documents the 5M vector wall and escape hatches
- ADR-017 (federation), ADR-018 (append-log pattern) added
- EXECUTION_PLAN.md sequences phases B-E with success gates and
  decision rules

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-16 01:50:05 -05:00


/// Secrets layer for bucket credentials.
///
/// Credentials never appear in `lakehouse.toml`. Each bucket entry has an
/// opaque `secret_ref` handle, resolved at startup by a `SecretsProvider`.
///
/// MVP ships `FileSecretsProvider` — reads a TOML file at a path given by
/// `LAKEHOUSE_SECRETS` env var, defaulting to `/etc/lakehouse/secrets.toml`.
/// The file should be root-owned, mode 0600, never in git.
///
/// Future providers (Vault, SOPS, OS keyring) implement the same trait.
use async_trait::async_trait;
use serde::Deserialize;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
#[derive(Clone)]
pub struct BucketCredentials {
    pub access_key: String,
    pub secret_key: String,
}

// Never let credentials end up in logs: Debug is implemented by hand (rather
// than derived) so `{:?}` prints the redacted form, never the raw keys.
impl std::fmt::Debug for BucketCredentials {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "BucketCredentials({})", self.redacted())
    }
}

impl BucketCredentials {
    /// First four characters of the access key — enough to identify which
    /// credential is in play without exposing it.
    pub fn redacted(&self) -> String {
        let prefix: String = self.access_key.chars().take(4).collect();
        format!("access_key={prefix}... (redacted)")
    }
}
#[async_trait]
pub trait SecretsProvider: Send + Sync {
    /// Resolve a handle to credentials. Returns `Err` if the handle is unknown.
    async fn resolve(&self, handle: &str) -> Result<BucketCredentials, String>;

    /// List known handles (for diagnostics only, never values).
    async fn list_handles(&self) -> Vec<String>;
}
// No Debug derive here either: a derived impl would print both keys.
#[derive(Deserialize)]
struct RawSecretEntry {
    access_key: String,
    secret_key: String,
}

/// File-backed secrets provider.
pub struct FileSecretsProvider {
    path: PathBuf,
    cache: tokio::sync::RwLock<HashMap<String, BucketCredentials>>,
}
impl FileSecretsProvider {
    pub fn new(path: impl Into<PathBuf>) -> Self {
        Self {
            path: path.into(),
            cache: tokio::sync::RwLock::new(HashMap::new()),
        }
    }

    /// Default path: env var `LAKEHOUSE_SECRETS` or `/etc/lakehouse/secrets.toml`.
    pub fn from_env() -> Self {
        let path = std::env::var("LAKEHOUSE_SECRETS")
            .unwrap_or_else(|_| "/etc/lakehouse/secrets.toml".to_string());
        Self::new(path)
    }

    /// Arc wrapper for convenient injection.
    pub fn shared(path: impl Into<PathBuf>) -> Arc<dyn SecretsProvider> {
        Arc::new(Self::new(path))
    }

    /// Load and populate the cache. Call once at startup.
    pub async fn load(&self) -> Result<usize, String> {
        if !self.path.exists() {
            tracing::info!(
                "secrets file {} not present — running with zero credentials configured",
                self.path.display()
            );
            return Ok(0);
        }
        check_permissions(&self.path)?;
        let content = std::fs::read_to_string(&self.path)
            .map_err(|e| format!("read secrets file: {e}"))?;
        let raw: HashMap<String, RawSecretEntry> = toml::from_str(&content)
            .map_err(|e| format!("parse secrets file: {e}"))?;
        let mut cache = self.cache.write().await;
        for (handle, entry) in raw {
            cache.insert(
                handle,
                BucketCredentials {
                    access_key: entry.access_key,
                    secret_key: entry.secret_key,
                },
            );
        }
        let count = cache.len();
        tracing::info!("secrets: loaded {count} handles from {}", self.path.display());
        Ok(count)
    }
}
#[async_trait]
impl SecretsProvider for FileSecretsProvider {
    async fn resolve(&self, handle: &str) -> Result<BucketCredentials, String> {
        let cache = self.cache.read().await;
        cache
            .get(handle)
            .cloned()
            .ok_or_else(|| format!("secret handle not found: {handle}"))
    }

    async fn list_handles(&self) -> Vec<String> {
        self.cache.read().await.keys().cloned().collect()
    }
}
/// Reject group/world-readable secrets files. Fail closed.
fn check_permissions(path: &Path) -> Result<(), String> {
    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;
        let meta = std::fs::metadata(path)
            .map_err(|e| format!("stat secrets file: {e}"))?;
        let mode = meta.permissions().mode() & 0o777;
        if mode & 0o044 != 0 {
            return Err(format!(
                "secrets file {} is group/world-readable (mode {:o}); chmod 600 to proceed",
                path.display(),
                mode
            ));
        }
    }
    // No mode check on non-unix targets; silence the unused-variable warning.
    #[cfg(not(unix))]
    let _ = path;
    Ok(())
}
/// A zero-handles provider for tests / purely-local setups.
pub struct EmptySecretsProvider;

#[async_trait]
impl SecretsProvider for EmptySecretsProvider {
    async fn resolve(&self, handle: &str) -> Result<BucketCredentials, String> {
        Err(format!("no secrets provider configured (wanted handle '{handle}')"))
    }

    async fn list_handles(&self) -> Vec<String> {
        Vec::new()
    }
}