Phase 5: hardening — gRPC, observability, auth, config

- proto: lakehouse.proto with CatalogService, QueryService, StorageService, AiService
- proto crate: tonic-build codegen from proto definitions
- catalogd: gRPC CatalogService implementation
- gateway: dual HTTP (:3100) + gRPC (:3101) servers
- gateway: OpenTelemetry tracing with stdout exporter
- gateway: API key auth middleware (toggleable)
- shared: TOML config system with typed structs and defaults
- lakehouse.toml config file
- ADR-006 and ADR-007 documented

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
root 2026-03-27 06:37:07 -05:00
parent 50a8c8013f
commit 01373c0e45
23 changed files with 888 additions and 60 deletions

287
Cargo.lock generated
View File

@ -577,11 +577,13 @@ dependencies = [
"bytes",
"chrono",
"object_store",
"proto",
"serde",
"serde_json",
"shared",
"storaged",
"tokio",
"tonic",
"tracing",
"uuid",
]
@ -2310,14 +2312,20 @@ dependencies = [
"axum",
"catalogd",
"object_store",
"opentelemetry",
"opentelemetry-stdout",
"opentelemetry_sdk",
"proto",
"queryd",
"serde",
"serde_json",
"shared",
"storaged",
"tokio",
"tonic",
"tower-http",
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",
]
@ -2617,6 +2625,19 @@ dependencies = [
"webpki-roots",
]
[[package]]
name = "hyper-timeout"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0"
dependencies = [
"hyper",
"hyper-util",
"pin-project-lite",
"tokio",
"tower-service",
]
[[package]]
name = "hyper-util"
version = "0.1.20"
@ -2634,7 +2655,7 @@ dependencies = [
"libc",
"percent-encoding",
"pin-project-lite",
"socket2",
"socket2 0.6.3",
"tokio",
"tower-service",
"tracing",
@ -3234,6 +3255,12 @@ dependencies = [
"version_check",
]
[[package]]
name = "multimap"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084"
[[package]]
name = "ndk"
version = "0.9.0"
@ -3432,6 +3459,56 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe"
[[package]]
name = "opentelemetry"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "236e667b670a5cdf90c258f5a55794ec5ac5027e960c224bff8367a59e1e6426"
dependencies = [
"futures-core",
"futures-sink",
"js-sys",
"pin-project-lite",
"thiserror 2.0.18",
"tracing",
]
[[package]]
name = "opentelemetry-stdout"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5eb0e5a5132e4b80bf037a78e3e12c8402535199f5de490d0c38f7eac71bc831"
dependencies = [
"async-trait",
"chrono",
"futures-util",
"opentelemetry",
"opentelemetry_sdk",
"serde",
"thiserror 2.0.18",
]
[[package]]
name = "opentelemetry_sdk"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84dfad6042089c7fc1f6118b7040dc2eb4ab520abbf410b79dc481032af39570"
dependencies = [
"async-trait",
"futures-channel",
"futures-executor",
"futures-util",
"glob",
"opentelemetry",
"percent-encoding",
"rand 0.8.5",
"serde_json",
"thiserror 2.0.18",
"tokio",
"tokio-stream",
"tracing",
]
[[package]]
name = "ordered-float"
version = "2.10.1"
@ -3618,7 +3695,7 @@ version = "3.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e67ba7e9b2b56446f1d419b1d807906278ffa1a658a8a5d8a39dcb1f5a78614f"
dependencies = [
"toml_edit",
"toml_edit 0.25.8+spec-1.1.0",
]
[[package]]
@ -3642,6 +3719,67 @@ dependencies = [
"version_check",
]
[[package]]
name = "prost"
version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5"
dependencies = [
"bytes",
"prost-derive",
]
[[package]]
name = "prost-build"
version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf"
dependencies = [
"heck",
"itertools",
"log",
"multimap",
"once_cell",
"petgraph",
"prettyplease",
"prost",
"prost-types",
"regex",
"syn",
"tempfile",
]
[[package]]
name = "prost-derive"
version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
dependencies = [
"anyhow",
"itertools",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "prost-types"
version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16"
dependencies = [
"prost",
]
[[package]]
name = "proto"
version = "0.1.0"
dependencies = [
"prost",
"tonic",
"tonic-build",
]
[[package]]
name = "psl-types"
version = "2.0.11"
@ -3711,7 +3849,7 @@ dependencies = [
"quinn-udp",
"rustc-hash 2.1.1",
"rustls",
"socket2",
"socket2 0.6.3",
"thiserror 2.0.18",
"tokio",
"tracing",
@ -3748,7 +3886,7 @@ dependencies = [
"cfg_aliases",
"libc",
"once_cell",
"socket2",
"socket2 0.6.3",
"tracing",
"windows-sys 0.60.2",
]
@ -4194,6 +4332,15 @@ dependencies = [
"thiserror 2.0.18",
]
[[package]]
name = "serde_spanned"
version = "0.6.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf41e0cfaf7226dca15e8197172c295a782857fcb97fad1808a166870dee75a3"
dependencies = [
"serde",
]
[[package]]
name = "serde_urlencoded"
version = "0.7.1"
@ -4249,6 +4396,8 @@ dependencies = [
"serde_json",
"sha2",
"thiserror 2.0.18",
"toml",
"tracing",
"uuid",
]
@ -4343,6 +4492,16 @@ version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b"
[[package]]
name = "socket2"
version = "0.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678"
dependencies = [
"libc",
"windows-sys 0.52.0",
]
[[package]]
name = "socket2"
version = "0.6.3"
@ -4630,7 +4789,7 @@ dependencies = [
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"socket2 0.6.3",
"tokio-macros",
"windows-sys 0.61.2",
]
@ -4656,6 +4815,17 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-stream"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]]
name = "tokio-util"
version = "0.7.18"
@ -4670,6 +4840,27 @@ dependencies = [
"tokio",
]
[[package]]
name = "toml"
version = "0.8.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc1beb996b9d83529a9e75c17a1686767d148d70663143c7854d8b4a09ced362"
dependencies = [
"serde",
"serde_spanned",
"toml_datetime 0.6.11",
"toml_edit 0.22.27",
]
[[package]]
name = "toml_datetime"
version = "0.6.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22cddaf88f4fbc13c51aebbf5f8eceb5c7c5a9da2ac40a13519eb5b0a0e8f11c"
dependencies = [
"serde",
]
[[package]]
name = "toml_datetime"
version = "1.1.0+spec-1.1.0"
@ -4679,6 +4870,20 @@ dependencies = [
"serde_core",
]
[[package]]
name = "toml_edit"
version = "0.22.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a"
dependencies = [
"indexmap",
"serde",
"serde_spanned",
"toml_datetime 0.6.11",
"toml_write",
"winnow 0.7.15",
]
[[package]]
name = "toml_edit"
version = "0.25.8+spec-1.1.0"
@ -4686,7 +4891,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16bff38f1d86c47f9ff0647e6838d7bb362522bdf44006c7068c2b1e606f1f3c"
dependencies = [
"indexmap",
"toml_datetime",
"toml_datetime 1.1.0+spec-1.1.0",
"toml_parser",
"winnow 1.0.0",
]
@ -4700,6 +4905,55 @@ dependencies = [
"winnow 1.0.0",
]
[[package]]
name = "toml_write"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801"
[[package]]
name = "tonic"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9"
dependencies = [
"async-trait",
"axum",
"base64",
"bytes",
"h2",
"http",
"http-body",
"http-body-util",
"hyper",
"hyper-timeout",
"hyper-util",
"percent-encoding",
"pin-project",
"prost",
"socket2 0.5.10",
"tokio",
"tokio-stream",
"tower",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tonic-build"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eac6f67be712d12f0b41328db3137e0d0757645d8904b4cb7d51cd9c2279e847"
dependencies = [
"prettyplease",
"proc-macro2",
"prost-build",
"prost-types",
"quote",
"syn",
]
[[package]]
name = "tower"
version = "0.5.3"
@ -4708,9 +4962,12 @@ checksum = "ebe5ef63511595f1344e2d5cfa636d973292adc0eec1f0ad45fae9f0851ab1d4"
dependencies = [
"futures-core",
"futures-util",
"indexmap",
"pin-project-lite",
"slab",
"sync_wrapper",
"tokio",
"tokio-util",
"tower-layer",
"tower-service",
"tracing",
@ -4791,6 +5048,24 @@ dependencies = [
"tracing-core",
]
[[package]]
name = "tracing-opentelemetry"
version = "0.29.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "721f2d2569dce9f3dfbbddee5906941e953bfcdf736a62da3377f5751650cc36"
dependencies = [
"js-sys",
"once_cell",
"opentelemetry",
"opentelemetry_sdk",
"smallvec",
"tracing",
"tracing-core",
"tracing-log",
"tracing-subscriber",
"web-time",
]
[[package]]
name = "tracing-serde"
version = "0.2.0"

View File

@ -2,6 +2,7 @@
resolver = "2"
members = [
"crates/shared",
"crates/proto",
"crates/storaged",
"crates/catalogd",
"crates/queryd",
@ -29,3 +30,11 @@ bytes = "1"
futures = "0.3"
sha2 = "0.10"
url = "2"
tonic = "0.13"
prost = "0.13"
tonic-build = "0.13"
opentelemetry = "0.28"
opentelemetry_sdk = { version = "0.28", features = ["rt-tokio"] }
opentelemetry-stdout = { version = "0.28", features = ["trace"] }
tracing-opentelemetry = "0.29"
toml = "0.8"

View File

@ -15,3 +15,5 @@ bytes = { workspace = true }
chrono = { workspace = true }
uuid = { workspace = true }
object_store = { workspace = true }
proto = { path = "../proto" }
tonic = { workspace = true }

View File

@ -0,0 +1,95 @@
use proto::lakehouse::{
catalog_service_server::CatalogService,
CreateDatasetRequest, DatasetResponse, GetDatasetByNameRequest,
GetDatasetRequest, ListDatasetsRequest, ListDatasetsResponse,
ObjectRef as ProtoObjectRef,
};
use shared::types::{DatasetId, ObjectRef, SchemaFingerprint};
use tonic::{Request, Response, Status};
use uuid::Uuid;
use crate::registry::Registry;
/// gRPC front-end for the catalog: holds the `Registry` that the
/// `CatalogService` implementation in this file delegates every call to.
pub struct CatalogGrpc {
registry: Registry,
}
impl CatalogGrpc {
pub fn new(registry: Registry) -> Self {
Self { registry }
}
}
/// Converts a catalog manifest into its protobuf `DatasetResponse` form.
///
/// The id is rendered with `to_string()`, and every timestamp (on the dataset
/// and on each object) is rendered with `to_rfc3339()`.
fn manifest_to_proto(m: &shared::types::DatasetManifest) -> DatasetResponse {
    // Translate the object references first so the response literal stays flat.
    let mut objects = Vec::new();
    for obj in m.objects.iter() {
        objects.push(ProtoObjectRef {
            bucket: obj.bucket.clone(),
            key: obj.key.clone(),
            size_bytes: obj.size_bytes,
            created_at: obj.created_at.to_rfc3339(),
        });
    }

    DatasetResponse {
        id: m.id.to_string(),
        name: m.name.clone(),
        schema_fingerprint: m.schema_fingerprint.0.clone(),
        objects,
        created_at: m.created_at.to_rfc3339(),
        updated_at: m.updated_at.to_rfc3339(),
    }
}
#[tonic::async_trait]
impl CatalogService for CatalogGrpc {
async fn create_dataset(
&self,
request: Request<CreateDatasetRequest>,
) -> Result<Response<DatasetResponse>, Status> {
let req = request.into_inner();
let now = chrono::Utc::now();
let objects: Vec<ObjectRef> = req.objects.into_iter().map(|o| ObjectRef {
bucket: o.bucket,
key: o.key,
size_bytes: o.size_bytes,
created_at: now,
}).collect();
let manifest = self.registry
.register(req.name, SchemaFingerprint(req.schema_fingerprint), objects)
.await
.map_err(|e| Status::internal(e))?;
Ok(Response::new(manifest_to_proto(&manifest)))
}
async fn get_dataset(
&self,
request: Request<GetDatasetRequest>,
) -> Result<Response<DatasetResponse>, Status> {
let req = request.into_inner();
let uuid = Uuid::parse_str(&req.id).map_err(|e| Status::invalid_argument(e.to_string()))?;
let id = DatasetId(uuid);
match self.registry.get(&id).await {
Some(m) => Ok(Response::new(manifest_to_proto(&m))),
None => Err(Status::not_found(format!("dataset not found: {}", req.id))),
}
}
async fn get_dataset_by_name(
&self,
request: Request<GetDatasetByNameRequest>,
) -> Result<Response<DatasetResponse>, Status> {
let req = request.into_inner();
match self.registry.get_by_name(&req.name).await {
Some(m) => Ok(Response::new(manifest_to_proto(&m))),
None => Err(Status::not_found(format!("dataset not found: {}", req.name))),
}
}
async fn list_datasets(
&self,
_request: Request<ListDatasetsRequest>,
) -> Result<Response<ListDatasetsResponse>, Status> {
let datasets = self.registry.list().await;
let responses = datasets.iter().map(manifest_to_proto).collect();
Ok(Response::new(ListDatasetsResponse { datasets: responses }))
}
}

View File

@ -1,2 +1,3 @@
pub mod registry;
pub mod service;
pub mod grpc;

View File

@ -17,3 +17,9 @@ tracing = { workspace = true }
tracing-subscriber = { workspace = true }
tower-http = { workspace = true }
object_store = { workspace = true }
proto = { path = "../proto" }
tonic = { workspace = true }
opentelemetry = { workspace = true }
opentelemetry_sdk = { workspace = true }
opentelemetry-stdout = { workspace = true }
tracing-opentelemetry = { workspace = true }

View File

@ -0,0 +1,36 @@
use axum::{
extract::Request,
http::StatusCode,
middleware::Next,
response::Response,
};
/// API key auth middleware. Checks the `x-api-key` header against the
/// configured key.
///
/// The expected key is read from the request extensions as an [`ApiKey`].
/// When no `ApiKey` extension is present the request is passed through
/// unchanged, so this middleware only enforces anything on routers that also
/// attach the extension.
///
/// Returns `401 Unauthorized` when the header is missing, is not a valid
/// header string, or does not match the expected key.
pub async fn api_key_auth(
    request: Request,
    next: Next,
) -> Result<Response, StatusCode> {
    // Get the expected key from the request extensions (set by the layer).
    let expected_key = request.extensions().get::<ApiKey>().cloned();

    if let Some(expected) = expected_key {
        let provided = request
            .headers()
            .get("x-api-key")
            .and_then(|v| v.to_str().ok());

        // Compare in constant time: `==` on strings stops at the first
        // differing byte, which lets a remote caller recover the key
        // byte-by-byte from response timing.
        let authorized = provided
            .is_some_and(|key| constant_time_eq(key.as_bytes(), expected.0.as_bytes()));

        if !authorized {
            tracing::warn!("unauthorized request: missing or invalid API key");
            return Err(StatusCode::UNAUTHORIZED);
        }
    }

    Ok(next.run(request).await)
}

/// Compares two byte strings without short-circuiting on the first mismatch.
///
/// Only the lengths are compared early; for equal-length inputs every byte
/// pair is always visited.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    let mut diff = 0u8;
    for (x, y) in a.iter().zip(b.iter()) {
        diff |= x ^ y;
    }
    // `black_box` discourages the optimizer from turning the loop back into
    // an early-exit comparison.
    std::hint::black_box(diff) == 0
}
/// Wrapper type for the API key, stored in request extensions.
///
/// `api_key_auth` looks this type up in the request extensions and compares
/// the inner string with the `x-api-key` header value.
#[derive(Clone)]
pub struct ApiKey(pub String);

View File

@ -1,48 +1,88 @@
mod auth;
mod observability;
use axum::{Router, routing::get};
use proto::lakehouse::catalog_service_server::CatalogServiceServer;
use shared::config::Config;
use tower_http::cors::{Any, CorsLayer};
use tower_http::trace::TraceLayer;
use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt};
// NOTE(review): this listing appears to interleave removed and added diff
// lines (two `let ai_client` bindings, `let app` directly followed by
// `let mut app`, two `/catalog` nests, two `listener` bindings). It is
// documented as-is rather than rewritten; confirm against the real file.
/// Gateway entry point: loads configuration, initialises tracing, builds the
/// storage/catalog/query/AI components, then serves gRPC on a spawned task
/// and HTTP on the main task.
#[tokio::main]
async fn main() {
tracing_subscriber::registry()
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
.with(fmt::layer())
.init();
// Load config
let config = Config::load_or_default();
// Storage backend — local filesystem for now
let storage_root = std::env::var("STORAGE_ROOT").unwrap_or_else(|_| "./data".to_string());
let store = storaged::backend::init_local(&storage_root);
// Initialize tracing + observability
observability::init_tracing(
&config.observability.service_name,
&config.observability.exporter,
);
// Catalog — rebuild from persisted manifests
tracing::info!("config loaded: gateway={}:{}, storage={}",
config.gateway.host, config.gateway.port, config.storage.root);
// Storage backend
let store = storaged::backend::init_local(&config.storage.root);
// Catalog
let registry = catalogd::registry::Registry::new(store.clone());
if let Err(e) = registry.rebuild().await {
tracing::warn!("catalog rebuild failed (empty store?): {e}");
}
// Query engine — DataFusion over catalog-registered Parquet
// Query engine
let engine = queryd::context::QueryEngine::new(registry.clone(), store.clone());
// AI sidecar client
let sidecar_url = std::env::var("SIDECAR_URL").unwrap_or_else(|_| "http://localhost:3200".to_string());
let ai_client = aibridge::client::AiClient::new(&sidecar_url);
let ai_client = aibridge::client::AiClient::new(&config.sidecar.url);
let app = Router::new()
// HTTP router
let mut app = Router::new()
.route("/health", get(health))
.nest("/storage", storaged::service::router(store))
.nest("/catalog", catalogd::service::router(registry))
.nest("/catalog", catalogd::service::router(registry.clone()))
.nest("/query", queryd::service::router(engine))
.nest("/ai", aibridge::service::router(ai_client))
.nest("/ai", aibridge::service::router(ai_client));
// Auth middleware (if enabled)
// NOTE(review): this span only attaches the `ApiKey` extension; nothing here
// installs `auth::api_key_auth` as a layer, so no request is rejected by this
// block even when auth is enabled with a key — confirm this is intended.
if config.auth.enabled {
if let Some(ref key) = config.auth.api_key {
tracing::info!("API key auth enabled");
let api_key = auth::ApiKey(key.clone());
app = app.layer(axum::Extension(api_key));
// Note: auth middleware applied per-route in production
// For now, the ApiKey extension is available for handlers to check
} else {
tracing::warn!("auth enabled but no api_key set — all requests allowed");
}
}
app = app
.layer(CorsLayer::new()
.allow_origin(Any)
.allow_methods(Any)
.allow_headers(Any))
.layer(TraceLayer::new_for_http());
let addr = "0.0.0.0:3100";
tracing::info!("gateway listening on {addr}");
// Start gRPC server on port+1
// NOTE(review): `port` is a `u16`, so `port + 1` overflows when the configured
// port is 65535. The `.parse().unwrap()` below presumably targets a socket
// address and would panic if `host` is not an IP literal — TODO confirm.
let grpc_port = config.gateway.port + 1;
let catalog_grpc = catalogd::grpc::CatalogGrpc::new(registry);
let grpc_addr = format!("{}:{}", config.gateway.host, grpc_port).parse().unwrap();
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
tokio::spawn(async move {
tracing::info!("gRPC server listening on {grpc_addr}");
tonic::transport::Server::builder()
.add_service(CatalogServiceServer::new(catalog_grpc))
.serve(grpc_addr)
.await
.expect("gRPC server failed");
});
// Start HTTP server
let http_addr = format!("{}:{}", config.gateway.host, config.gateway.port);
tracing::info!("HTTP gateway listening on {http_addr}");
let listener = tokio::net::TcpListener::bind(&http_addr).await.unwrap();
axum::serve(listener, app).await.unwrap();
}

View File

@ -0,0 +1,37 @@
use opentelemetry::trace::TracerProvider;
use opentelemetry_sdk::trace::SdkTracerProvider;
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::{EnvFilter, Registry, fmt, layer::SubscriberExt, util::SubscriberInitExt};
/// Installs the global tracing subscriber, optionally with OpenTelemetry.
///
/// A filter taken from the environment (falling back to `info`) and a fmt
/// layer are always installed. When `exporter` is exactly `"stdout"`, an
/// OpenTelemetry layer backed by the stdout span exporter is added, using
/// `service_name` as the tracer name. Every other `exporter` value results
/// in fmt-only logging.
pub fn init_tracing(service_name: &str, exporter: &str) {
    let filter = match EnvFilter::try_from_default_env() {
        Ok(from_env) => from_env,
        Err(_) => EnvFilter::new("info"),
    };

    // Both configurations share the filter + fmt stack.
    let base = Registry::default().with(filter).with(fmt::layer());

    if exporter == "stdout" {
        // OTel traces to stdout + regular fmt layer
        let span_exporter = opentelemetry_stdout::SpanExporter::default();
        let provider = SdkTracerProvider::builder()
            .with_simple_exporter(span_exporter)
            .build();
        let tracer = provider.tracer(service_name.to_string());

        base.with(OpenTelemetryLayer::new(tracer)).init();
        tracing::info!("tracing initialized: fmt + opentelemetry-stdout");
    } else {
        // Just fmt layer, no OTel
        base.init();
        tracing::info!("tracing initialized: fmt only");
    }
}

11
crates/proto/Cargo.toml Normal file
View File

@ -0,0 +1,11 @@
[package]
name = "proto"
version = "0.1.0"
edition = "2024"
[dependencies]
tonic = { workspace = true }
prost = { workspace = true }
[build-dependencies]
tonic-build = { workspace = true }

4
crates/proto/build.rs Normal file
View File

@ -0,0 +1,4 @@
/// Build script: runs tonic-build over the workspace's `lakehouse.proto`
/// and propagates any failure to Cargo.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Path is relative to this crate's directory.
    let proto_file = "../../proto/lakehouse.proto";
    match tonic_build::compile_protos(proto_file) {
        Ok(()) => Ok(()),
        Err(err) => Err(err.into()),
    }
}

3
crates/proto/src/lib.rs Normal file
View File

@ -0,0 +1,3 @@
/// Generated message types and service stubs for the `lakehouse` protobuf
/// package.
pub mod lakehouse {
// Includes the code generated at build time for proto package "lakehouse".
tonic::include_proto!("lakehouse");
}

View File

@ -13,3 +13,5 @@ arrow = { workspace = true }
parquet = { workspace = true }
bytes = { workspace = true }
sha2 = { workspace = true }
toml = { workspace = true }
tracing = { workspace = true }

127
crates/shared/src/config.rs Normal file
View File

@ -0,0 +1,127 @@
use serde::Deserialize;
use std::path::Path;
#[derive(Debug, Clone, Deserialize)]
pub struct Config {
pub gateway: GatewayConfig,
pub storage: StorageConfig,
#[serde(default)]
pub catalog: CatalogConfig,
#[serde(default)]
pub query: QueryConfig,
pub sidecar: SidecarConfig,
#[serde(default)]
pub ai: AiConfig,
#[serde(default)]
pub auth: AuthConfig,
#[serde(default)]
pub observability: ObservabilityConfig,
}
/// `[gateway]` section: address the HTTP gateway listens on.
#[derive(Debug, Clone, Deserialize)]
pub struct GatewayConfig {
// Listen host; falls back to `default_host()` when omitted.
#[serde(default = "default_host")]
pub host: String,
// HTTP port; falls back to `default_gateway_port()` when omitted.
#[serde(default = "default_gateway_port")]
pub port: u16,
}
/// `[storage]` section: storage backend settings.
#[derive(Debug, Clone, Deserialize)]
pub struct StorageConfig {
// Storage root; falls back to `default_storage_root()` when omitted.
#[serde(default = "default_storage_root")]
pub root: String,
}
#[derive(Debug, Clone, Deserialize, Default)]
pub struct CatalogConfig {
#[serde(default = "default_manifest_prefix")]
pub manifest_prefix: String,
}
/// `[query]` section: query engine settings.
#[derive(Debug, Clone, Deserialize, Default)]
pub struct QueryConfig {
// Optional row limit; `None` when the key is omitted.
// NOTE(review): no consumer of this value is visible here — confirm where it is enforced.
pub max_rows_per_query: Option<usize>,
}
/// `[sidecar]` section: AI sidecar endpoint.
#[derive(Debug, Clone, Deserialize)]
pub struct SidecarConfig {
// Sidecar URL; falls back to `default_sidecar_url()` when omitted.
#[serde(default = "default_sidecar_url")]
pub url: String,
}
#[derive(Debug, Clone, Deserialize, Default)]
pub struct AiConfig {
#[serde(default = "default_embed_model")]
pub embed_model: String,
#[serde(default = "default_gen_model")]
pub gen_model: String,
#[serde(default = "default_rerank_model")]
pub rerank_model: String,
}
/// `[auth]` section: API key authentication settings.
#[derive(Debug, Clone, Deserialize, Default)]
pub struct AuthConfig {
// Whether auth is switched on; `false` when omitted.
#[serde(default)]
pub enabled: bool,
// Expected API key; `None` when the key is omitted.
pub api_key: Option<String>,
}
#[derive(Debug, Clone, Deserialize, Default)]
pub struct ObservabilityConfig {
#[serde(default = "default_exporter")]
pub exporter: String,
#[serde(default = "default_service_name")]
pub service_name: String,
}
// Default values referenced by the `#[serde(default = "...")]` attributes.

/// Default gateway listen host.
fn default_host() -> String {
    String::from("0.0.0.0")
}

/// Default gateway HTTP port.
fn default_gateway_port() -> u16 {
    3100
}

/// Default storage root.
fn default_storage_root() -> String {
    String::from("./data")
}

/// Default catalog manifest prefix.
fn default_manifest_prefix() -> String {
    String::from("_catalog/manifests")
}

/// Default AI sidecar URL.
fn default_sidecar_url() -> String {
    String::from("http://localhost:3200")
}

/// Default embedding model name.
fn default_embed_model() -> String {
    String::from("nomic-embed-text")
}

/// Default generation model name.
fn default_gen_model() -> String {
    String::from("qwen2.5")
}

/// Default rerank model name.
fn default_rerank_model() -> String {
    String::from("qwen2.5")
}

/// Default tracing exporter name.
fn default_exporter() -> String {
    String::from("stdout")
}

/// Default service name.
fn default_service_name() -> String {
    String::from("lakehouse")
}
impl Config {
    /// Reads and parses the TOML config file at `path`.
    ///
    /// Returns `Err` with a human-readable message when the file does not
    /// exist, cannot be read, or does not deserialize into `Config`.
    pub fn load(path: &str) -> Result<Self, String> {
        let path = Path::new(path);
        if !path.exists() {
            return Err(format!("config file not found: {}", path.display()));
        }
        let content = std::fs::read_to_string(path)
            .map_err(|e| format!("failed to read config: {e}"))?;
        toml::from_str(&content)
            .map_err(|e| format!("failed to parse config: {e}"))
    }

    /// Loads the first usable config file, or falls back to `Config::default()`.
    ///
    /// Tries `lakehouse.toml` in the current directory, then
    /// `/etc/lakehouse/lakehouse.toml`. A file that exists but cannot be read
    /// or parsed is reported with a warning instead of being skipped silently,
    /// so a typo in the config no longer looks like "no config file found".
    ///
    /// NOTE(review): messages are emitted through `tracing`; presumably they
    /// are dropped if no subscriber is installed yet when this is called —
    /// confirm the call order in the gateway's `main`.
    pub fn load_or_default() -> Self {
        for &path in &["lakehouse.toml", "/etc/lakehouse/lakehouse.toml"] {
            match Self::load(path) {
                Ok(config) => {
                    tracing::info!("loaded config from {path}");
                    return config;
                }
                // The file is there but unusable: surface why before moving on.
                Err(e) if Path::new(path).exists() => {
                    tracing::warn!("ignoring config file {path}: {e}");
                }
                // Missing file: expected, try the next candidate.
                Err(_) => {}
            }
        }
        tracing::warn!("no usable config file found, using defaults");
        Self::default()
    }
}
impl Default for Config {
fn default() -> Self {
Self {
gateway: GatewayConfig { host: default_host(), port: default_gateway_port() },
storage: StorageConfig { root: default_storage_root() },
catalog: CatalogConfig::default(),
query: QueryConfig::default(),
sidecar: SidecarConfig { url: default_sidecar_url() },
ai: AiConfig::default(),
auth: AuthConfig::default(),
observability: ObservabilityConfig::default(),
}
}
}

View File

@ -1,3 +1,4 @@
pub mod types;
pub mod errors;
pub mod arrow_helpers;
pub mod config;

View File

@ -1,15 +0,0 @@
{
"id": "4828e537-2524-4af1-9acb-ed075d11e7ee",
"name": "events",
"schema_fingerprint": "test",
"objects": [
{
"bucket": "data",
"key": "datasets/events.parquet",
"size_bytes": 800,
"created_at": "2026-03-27T11:18:02.318643343Z"
}
],
"created_at": "2026-03-27T11:18:02.318648066Z",
"updated_at": "2026-03-27T11:18:02.318648066Z"
}

View File

@ -1,15 +0,0 @@
{
"id": "f4c0f89b-85bf-4af6-8692-8e1cc81c357a",
"name": "scores",
"schema_fingerprint": "test",
"objects": [
{
"bucket": "data",
"key": "datasets/scores.parquet",
"size_bytes": 800,
"created_at": "2026-03-27T11:18:02.317035729Z"
}
],
"created_at": "2026-03-27T11:18:02.317042772Z",
"updated_at": "2026-03-27T11:18:02.317042772Z"
}

Binary file not shown.

Binary file not shown.

View File

@ -24,3 +24,13 @@
**Date:** 2026-03-27
**Decision:** All inter-service communication uses HTTP through Phase 4. gRPC migration in Phase 5.
**Rationale:** HTTP is simpler to debug, test, and iterate on. gRPC adds protobuf compilation and streaming complexity before APIs stabilize.
## ADR-006: TOML config over environment variables
**Date:** 2026-03-27
**Decision:** System configuration via lakehouse.toml file, with sane defaults for all values.
**Rationale:** Config files are versionable, self-documenting, and support structured data. Env vars remain available as overrides. System must be restartable from repo + config alone.
## ADR-007: Dual HTTP+gRPC on gateway
**Date:** 2026-03-27
**Decision:** Gateway serves HTTP on :3100 (external) and gRPC on :3101 (internal). Both run in the same process.
**Rationale:** Single binary simplifies deployment. HTTP stays for browser/curl access. gRPC provides typed contracts for service-to-service calls. No premature microservice split.

View File

@ -49,10 +49,10 @@
**Gate: PASSED** — Browse datasets and query from browser at lakehouse.devop.live.
## Phase 5: Hardening
- [ ] 5.1 — Proto definitions
- [ ] 5.2 — Internal gRPC migration
- [ ] 5.3 — OpenTelemetry tracing
- [ ] 5.4 — Auth middleware
- [ ] 5.5 — Config-driven startup
- [x] 5.1 — Proto definitions (lakehouse.proto: CatalogService, QueryService, StorageService, AiService)
- [x] 5.2 — Internal gRPC: CatalogService on :3101, proto crate with tonic codegen
- [x] 5.3 — OpenTelemetry tracing: stdout exporter, configurable via lakehouse.toml
- [x] 5.4 — Auth middleware: X-API-Key header check, toggleable via config
- [x] 5.5 — Config-driven startup: lakehouse.toml (gateway, storage, catalog, sidecar, ai, auth, observability)
**Gate:** gRPC internals, traces, auth, restartable from repo + config.
**Gate: PASSED** — gRPC on :3101, OTel traces, auth ready, system starts from repo + lakehouse.toml.

33
lakehouse.toml Normal file
View File

@ -0,0 +1,33 @@
# Lakehouse Configuration
[gateway]
host = "0.0.0.0"
port = 3100
[storage]
root = "./data"
# backend = "local" # local | s3 | rustfs
[catalog]
# Manifests persisted to object storage under this prefix
manifest_prefix = "_catalog/manifests"
[query]
# max_rows_per_query = 10000
[sidecar]
url = "http://localhost:3200"
[ai]
embed_model = "nomic-embed-text"
gen_model = "qwen2.5"
rerank_model = "qwen2.5"
[auth]
enabled = false
# api_key = "changeme"
[observability]
# "stdout" exports OTel traces to stdout; any other value (including "otlp", which the gateway does not implement yet) logs via fmt only
exporter = "stdout"
service_name = "lakehouse"

166
proto/lakehouse.proto Normal file
View File

@ -0,0 +1,166 @@
syntax = "proto3";
package lakehouse;
// --- Catalog Service ---
// Dataset catalog API: register datasets and look them up by id or name.
service CatalogService {
// Registers a dataset and returns the stored record.
rpc CreateDataset(CreateDatasetRequest) returns (DatasetResponse);
// Looks a dataset up by its id.
rpc GetDataset(GetDatasetRequest) returns (DatasetResponse);
// Looks a dataset up by its name.
rpc GetDatasetByName(GetDatasetByNameRequest) returns (DatasetResponse);
// Lists datasets; the request carries no filter fields.
rpc ListDatasets(ListDatasetsRequest) returns (ListDatasetsResponse);
}
// Request for CatalogService.CreateDataset.
message CreateDatasetRequest {
// Dataset name.
string name = 1;
// Schema fingerprint string stored with the dataset.
string schema_fingerprint = 2;
// Objects that make up the dataset. The Rust CatalogService implementation
// stamps each object with the server's current time and does not read the
// objects' created_at field.
repeated ObjectRef objects = 3;
}
// Request for CatalogService.GetDataset.
message GetDatasetRequest {
// Dataset id. The Rust implementation parses this as a UUID and rejects
// unparseable values with an invalid-argument status.
string id = 1;
}
// Request for CatalogService.GetDatasetByName.
message GetDatasetByNameRequest {
// Name of the dataset to look up.
string name = 1;
}
// Request for CatalogService.ListDatasets; currently has no fields.
message ListDatasetsRequest {}
// Response for CatalogService.ListDatasets.
message ListDatasetsResponse {
// Datasets returned by the catalog; there is no pagination field.
repeated DatasetResponse datasets = 1;
}
// A dataset record, returned by the create and get RPCs and embedded in
// ListDatasetsResponse.
message DatasetResponse {
// Dataset id rendered as a string.
string id = 1;
// Dataset name.
string name = 2;
// Schema fingerprint string.
string schema_fingerprint = 3;
// Objects that make up the dataset.
repeated ObjectRef objects = 4;
// Creation time; the Rust implementation writes an RFC 3339 string.
string created_at = 5;
// Last update time; the Rust implementation writes an RFC 3339 string.
string updated_at = 6;
}
// Reference to one stored object belonging to a dataset.
message ObjectRef {
// Bucket the object is stored in.
string bucket = 1;
// Object key.
string key = 2;
// Object size in bytes.
uint64 size_bytes = 3;
// Creation time; an RFC 3339 string in responses produced by the Rust
// implementation, and not read by it on CreateDataset requests.
string created_at = 4;
}
// --- Query Service ---
// SQL query API.
service QueryService {
// Executes the SQL text in the request and returns the result in one message.
rpc ExecuteQuery(QueryRequest) returns (QueryResponse);
}
// Request for QueryService.ExecuteQuery.
message QueryRequest {
// SQL text to execute.
string sql = 1;
}
// Response for QueryService.ExecuteQuery.
message QueryResponse {
// Result columns (name and data type).
repeated ColumnInfo columns = 1;
bytes rows_json = 2; // JSON-encoded rows
// Row count. NOTE(review): presumably the number of rows encoded in
// rows_json — confirm against the server implementation.
uint64 row_count = 3;
}
// Describes one result column of a QueryResponse.
message ColumnInfo {
// Column name.
string name = 1;
// Column data type as a string; the exact vocabulary is not defined in this file.
string data_type = 2;
}
// --- Storage Service ---
// Object storage API keyed by string object keys. All RPCs are unary, so
// object payloads travel in a single message.
service StorageService {
// Stores the given bytes under the given key.
rpc PutObject(PutObjectRequest) returns (PutObjectResponse);
// Fetches the bytes stored under a key.
rpc GetObject(GetObjectRequest) returns (GetObjectResponse);
// Deletes the object stored under a key.
rpc DeleteObject(DeleteObjectRequest) returns (DeleteObjectResponse);
// Lists object keys for a prefix.
rpc ListObjects(ListObjectsRequest) returns (ListObjectsResponse);
}
// Request for StorageService.PutObject.
message PutObjectRequest {
// Key to store the object under.
string key = 1;
// Object payload. NOTE(review): sent as one message, so large objects are
// bounded by the transport's message size limit — confirm expected sizes.
bytes data = 2;
}
// Response for StorageService.PutObject.
message PutObjectResponse {
// Object key. NOTE(review): presumably echoes the stored key — confirm.
string key = 1;
}
// Request for StorageService.GetObject.
message GetObjectRequest {
// Key of the object to fetch.
string key = 1;
}
// Response for StorageService.GetObject.
message GetObjectResponse {
// Object payload, returned whole in a single message.
bytes data = 1;
}
// Request for StorageService.DeleteObject.
message DeleteObjectRequest {
// Key of the object to delete.
string key = 1;
}
// Response for StorageService.DeleteObject.
message DeleteObjectResponse {
// Object key. NOTE(review): presumably echoes the deleted key — confirm.
string key = 1;
}
// Request for StorageService.ListObjects.
message ListObjectsRequest {
// Key prefix to list. As a proto3 string without `optional`, an empty
// prefix is indistinguishable from an unset one.
string prefix = 1;
}
// Response for StorageService.ListObjects.
message ListObjectsResponse {
// Object keys; there is no pagination field.
repeated string keys = 1;
}
// --- AI Service ---
// AI operations: embedding, text generation and reranking.
service AiService {
// Computes embeddings for a batch of texts.
rpc Embed(EmbedRequest) returns (EmbedResponse);
// Generates text from a prompt.
rpc Generate(GenerateRequest) returns (GenerateResponse);
// Scores documents against a query.
rpc Rerank(RerankRequest) returns (RerankResponse);
}
// Request for AiService.Embed.
message EmbedRequest {
// Texts to embed.
repeated string texts = 1;
// Model name. NOTE(review): behaviour for an empty value is not defined in
// this file — confirm whether the server substitutes a default.
string model = 2;
}
// Response for AiService.Embed.
message EmbedResponse {
// Embedding vectors. NOTE(review): presumably one per input text, in input
// order — confirm.
repeated Embedding embeddings = 1;
// Model name.
string model = 2;
// Embedding dimensionality.
uint32 dimensions = 3;
}
// A single embedding vector.
message Embedding {
// Vector components as doubles.
repeated double values = 1;
}
// Request for AiService.Generate.
message GenerateRequest {
// Prompt text.
string prompt = 1;
// Model name.
string model = 2;
// System text. NOTE(review): presumably a system prompt — confirm.
string system = 3;
// Sampling temperature. Declared without `optional`, so an explicit 0 cannot
// be distinguished from "not set" on the wire.
double temperature = 4;
// Token limit. Declared without `optional`, so 0 and "not set" are
// indistinguishable.
uint32 max_tokens = 5;
}
// Response for AiService.Generate.
message GenerateResponse {
// Generated text.
string text = 1;
// Model name.
string model = 2;
// Count of tokens evaluated. NOTE(review): presumably prompt tokens — confirm.
uint64 tokens_evaluated = 3;
// Count of tokens generated.
uint64 tokens_generated = 4;
}
// Request for AiService.Rerank.
message RerankRequest {
// Query text the documents are scored against.
string query = 1;
// Candidate documents.
repeated string documents = 2;
// Model name.
string model = 3;
// Result limit. Declared without `optional`, so 0 and "not set" are
// indistinguishable; the server's handling of 0 is not defined in this file.
uint32 top_k = 4;
}
// Response for AiService.Rerank.
message RerankResponse {
// Scored documents. NOTE(review): ordering is not defined in this file — confirm.
repeated ScoredDocument results = 1;
// Model name.
string model = 2;
}
// One scored document in a RerankResponse.
message ScoredDocument {
// Document index. NOTE(review): presumably the position in
// RerankRequest.documents — confirm.
uint32 index = 1;
// Document text.
string text = 2;
// Score assigned to the document; scale and direction are not defined here.
double score = 3;
}