Port of the load-bearing pieces of mcp-server/observer.ts (Rust
system, 852 lines TS) per SPEC §2's named target. Implements PRD
loop 3 ("Observer loop — watches each run, refines configs").
Routes (all under /v1/observer/* via gateway):
  GET  /observer/health — liveness
  GET  /observer/stats  — total / successes / failures /
                          by_source / recent_scenario_ops
                          (matches Rust JSON shape exactly)
  POST /observer/event  — record one ObservedOp; auto-defaults
                          timestamp + source, validates required
                          fields (endpoint), persists to JSONL,
                          appends to ring buffer
Architecture:
- internal/observer/types.go — ObservedOp model + Source taxonomy
(mcp / scenario / langfuse / overseer_correction). Mirrors the
Rust shape so JSON round-trips during cutover.
- internal/observer/store.go — Store + Persistor. Ring buffer cap
matches Rust's 2000; recent_scenarios cap matches Rust's 10.
Same persist-then-apply order as pathwayd; same corruption-
tolerant replay (skip malformed lines + warn).
- cmd/observerd — :3219 HTTP service, fronted by gateway as
/v1/observer/*.
- lakehouse.toml + DefaultConfig — [observerd] block matches the
pathwayd pattern (Bind + PersistPath; empty path = ephemeral).
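
For readers unfamiliar with the capped ring buffer mentioned for store.go, here is a minimal generic sketch. It is not the Store implementation; the type and method names are hypothetical, and the real store would additionally guard access with a lock and persist before applying.

```go
package main

// Ring keeps the most recent len(buf) items, overwriting the oldest once full.
type Ring[T any] struct {
	buf   []T
	start int // index of the oldest item
	n     int // number of items currently held
}

// NewRing allocates a ring with a fixed capacity (e.g. 2000 for ops).
func NewRing[T any](capacity int) *Ring[T] {
	return &Ring[T]{buf: make([]T, capacity)}
}

// Push appends v, evicting the oldest item when the ring is at capacity.
func (r *Ring[T]) Push(v T) {
	if len(r.buf) == 0 {
		return // zero-capacity ring holds nothing
	}
	if r.n < len(r.buf) {
		r.buf[(r.start+r.n)%len(r.buf)] = v
		r.n++
		return
	}
	r.buf[r.start] = v
	r.start = (r.start + 1) % len(r.buf)
}

// Items returns a copy of the contents, oldest first.
func (r *Ring[T]) Items() []T {
	out := make([]T, 0, r.n)
	for i := 0; i < r.n; i++ {
		out = append(out, r.buf[(r.start+i)%len(r.buf)])
	}
	return out
}
```

With a capacity of 3, pushing 1 through 5 leaves 3, 4, 5 in the ring, which is the rollover behavior the unit tests exercise at cap.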
Tests + smoke (all PASS):
- 7 unit tests in store_test.go: validation, default fields,
stats aggregation, recent-scenarios cap + ordering, ring-buffer
rollover at cap, JSONL round-trip persistence, corruption-
tolerant replay (1 valid + 1 corrupt + 1 valid → 2 applied)
- scripts/observer_smoke.sh: 4 assertions through gateway —
record 5 events (3 ok / 2 fail across 2 sources), stats
aggregates correctly, empty-endpoint→400, kill+restart preserves
via JSONL replay (5 ops, 3 ok, 2 err survive)
Deferred (named in package + cmd doc, not in this commit):
- POST /observer/review (cloud-LLM hand-review fall-back). The
heuristic-only path could land cheaply but the productized
cloud path (qwen3-coder fall-back) is a multi-day port.
- Background loops: analyzeErrors, consolidatePlaybooks,
tailOverseerCorrections (read overseer_corrections.jsonl into
the ring buffer once per cycle).
- escalateFailureClusterToLLMTeam (failure clustering trigger
that posts to LLM Team's /api/run with code_review mode).
/relevance is NOT duplicated — already ported in 9588bd8 to
internal/matrix/relevance.go (component 3 of SPEC §3.4).
16-smoke regression all green (D1-D6, G1, G1P, G2, storaged_cap,
pathway, matrix, relevance, downgrade, playbook, observer).
13 binaries now: gateway, storaged, catalogd, ingestd, queryd,
vectord, embedd, pathwayd, matrixd, observerd, mcpd, fake_ollama
(plus catalogd-only test build).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
// Package shared also provides the TOML config loader. Per ADR
// equivalent of Rust ADR-006 (TOML config over env vars), every
// service reads `lakehouse.toml` with sane defaults and env
// overrides. Config is hot-reload-unaware in G0; reload-on-SIGHUP
// is a G1+ concern.
package shared

import (
	"errors"
	"fmt"
	"io/fs"
	"log/slog"
	"os"

	"github.com/pelletier/go-toml/v2"
)

// Config is the unified Lakehouse config. Each service reads only
// the section it cares about, but they all share the same file so
// operators have one place to look.
type Config struct {
	Gateway   GatewayConfig   `toml:"gateway"`
	Storaged  ServiceConfig   `toml:"storaged"`
	Catalogd  CatalogConfig   `toml:"catalogd"`
	Ingestd   IngestConfig    `toml:"ingestd"`
	Queryd    QuerydConfig    `toml:"queryd"`
	Vectord   VectordConfig   `toml:"vectord"`
	Embedd    EmbeddConfig    `toml:"embedd"`
	Pathwayd  PathwaydConfig  `toml:"pathwayd"`
	Matrixd   MatrixdConfig   `toml:"matrixd"`
	Observerd ObserverdConfig `toml:"observerd"`
	S3        S3Config        `toml:"s3"`
	Log       LogConfig       `toml:"log"`
	Auth      AuthConfig      `toml:"auth"`
}

// IngestConfig adds ingestd-specific knobs. ingestd needs to PUT
// parquet to storaged AND register manifests with catalogd, so it
// holds two upstream URLs in addition to its own bind.
//
// MaxIngestBytes caps the multipart body size. CSVs are typically
// 4-6× larger than the resulting Snappy-compressed Parquet, so 256
// MiB CSV → ~50 MiB Parquet — well under storaged's 256 MiB PUT
// cap. Real-scale validation (2026-04-29) showed 500K workers ×
// 18 cols = 344 MiB CSV → 71 MiB Parquet; bumping this knob to
// 512 MiB is the documented path for that workload.
type IngestConfig struct {
	Bind           string `toml:"bind"`
	StoragedURL    string `toml:"storaged_url"`
	CatalogdURL    string `toml:"catalogd_url"`
	MaxIngestBytes int64  `toml:"max_ingest_bytes"`
}

// GatewayConfig adds the upstream URLs the reverse proxy fronts.
// Each route family (/v1/storage, /v1/catalog, /v1/ingest, /v1/sql,
// /v1/vectors, /v1/embed, /v1/pathway, /v1/matrix, /v1/observer)
// has its own upstream so we can scale services independently or
// move them to different boxes without touching gateway code.
type GatewayConfig struct {
	Bind         string `toml:"bind"`
	StoragedURL  string `toml:"storaged_url"`
	CatalogdURL  string `toml:"catalogd_url"`
	IngestdURL   string `toml:"ingestd_url"`
	QuerydURL    string `toml:"queryd_url"`
	VectordURL   string `toml:"vectord_url"`
	EmbeddURL    string `toml:"embedd_url"`
	PathwaydURL  string `toml:"pathwayd_url"`
	MatrixdURL   string `toml:"matrixd_url"`
	ObserverdURL string `toml:"observerd_url"`
}

// EmbeddConfig drives the embed service. ProviderURL points at the
// embedding backend (Ollama in G2, possibly OpenAI/Voyage in G3+).
// DefaultModel is what gets used when callers don't specify a
// model in their request body. CacheSize is the LRU cache cap on
// (model, sha256(text)) → vector lookups; 0 disables caching.
// Default 10000 entries ≈ 30 MiB at d=768.
type EmbeddConfig struct {
	Bind         string `toml:"bind"`
	ProviderURL  string `toml:"provider_url"`
	DefaultModel string `toml:"default_model"`
	CacheSize    int    `toml:"cache_size"`
}

// VectordConfig adds vectord-specific knobs. StoragedURL is
// optional — empty string disables persistence, useful for ephemeral
// dev or test runs. When set, indexes Save after every state change
// and Load on startup.
type VectordConfig struct {
	Bind        string `toml:"bind"`
	StoragedURL string `toml:"storaged_url"`
}

// PathwaydConfig drives the pathway-memory service (cmd/pathwayd).
// PersistPath: file path to the JSONL log; empty = in-memory only
// (test/dev). Production sets a stable path under /var/lib/lakehouse
// or similar so traces survive restart.
type PathwaydConfig struct {
	Bind        string `toml:"bind"`
	PersistPath string `toml:"persist_path"`
}

// MatrixdConfig drives the matrix-indexer service (cmd/matrixd).
// Per docs/SPEC.md §3.4: multi-corpus retrieve+merge over vectord
// with embed-via-embedd for query text. Both upstream URLs are
// required — matrixd has no in-process fallback.
type MatrixdConfig struct {
	Bind       string `toml:"bind"`
	EmbeddURL  string `toml:"embedd_url"`
	VectordURL string `toml:"vectord_url"`
}

// ObserverdConfig drives the observer service (cmd/observerd).
// PersistPath: file path to the JSONL ops log; empty = in-memory
// only (test/dev). Production sets a stable path under
// /var/lib/lakehouse/observer/ops.jsonl so ops survive restart.
// Mirrors the PathwaydConfig pattern.
type ObserverdConfig struct {
	Bind        string `toml:"bind"`
	PersistPath string `toml:"persist_path"`
}

// QuerydConfig adds queryd-specific knobs. queryd talks DuckDB
// directly to MinIO via DuckDB's httpfs extension (so no storaged
// URL needed), and reads the catalog over HTTP for view registration.
// SecretsPath defaults to /etc/lakehouse/secrets-go.toml — the same
// file storaged uses, since both services need the S3 credentials.
type QuerydConfig struct {
	Bind         string `toml:"bind"`
	CatalogdURL  string `toml:"catalogd_url"`
	SecretsPath  string `toml:"secrets_path"`
	RefreshEvery string `toml:"refresh_every"` // duration string, e.g. "30s"
}

// CatalogConfig adds catalogd-specific knobs on top of the standard
// bind. StoragedURL points at the storaged service for manifest
// persistence; G0 defaults to the localhost bind.
type CatalogConfig struct {
	Bind        string `toml:"bind"`
	StoragedURL string `toml:"storaged_url"`
}

// ServiceConfig is the per-binary bind config. Default Bind ""
// means "use the service's hardcoded G0 default" — see DefaultConfig.
type ServiceConfig struct {
	Bind string `toml:"bind"`
}

// S3Config holds S3-compatible storage settings. Endpoint blank →
// AWS default. Bucket "" → "lakehouse-primary".
type S3Config struct {
	Endpoint        string `toml:"endpoint"`
	Region          string `toml:"region"`
	Bucket          string `toml:"bucket"`
	AccessKeyID     string `toml:"access_key_id"`
	SecretAccessKey string `toml:"secret_access_key"`
	UsePathStyle    bool   `toml:"use_path_style"`
}

// LogConfig — slog level for now; structured fields land G1+.
type LogConfig struct {
	Level string `toml:"level"`
}

// AuthConfig is the inter-service auth posture from ADR-003.
// Token is a Bearer token; empty means "no auth" (G0 dev mode).
// AllowedIPs is a list of CIDRs (or bare IPs treated as /32);
// empty means "any source IP."
//
// Both layers operate independently when set:
//   - Token + AllowedIPs both empty → middleware is a no-op
//   - Token only → 401 unless Bearer matches
//   - AllowedIPs only → 403 unless r.RemoteAddr in CIDR
//   - Both → both gates apply
//
// The startup gate in shared.Run refuses to start with non-loopback
// bind AND empty Token — that's the audit's R-001 + R-007 worst
// case (no auth, world-reachable). LH_<SVC>_ALLOW_NONLOOPBACK=1 still
// bypasses the bind gate for explicit dev cases; the auth gate is
// independent of that bypass and is the real production guard.
type AuthConfig struct {
	Token      string   `toml:"token"`
	AllowedIPs []string `toml:"allowed_ips"`
}

// DefaultConfig returns the G0 dev defaults. Ports are shifted to
// 3110+ to coexist with the live Rust lakehouse on 3100/3201-3204
// during the migration. G5 cutover flips gateway back to 3100.
func DefaultConfig() Config {
	return Config{
		Gateway: GatewayConfig{
			Bind:         "127.0.0.1:3110",
			StoragedURL:  "http://127.0.0.1:3211",
			CatalogdURL:  "http://127.0.0.1:3212",
			IngestdURL:   "http://127.0.0.1:3213",
			QuerydURL:    "http://127.0.0.1:3214",
			VectordURL:   "http://127.0.0.1:3215",
			EmbeddURL:    "http://127.0.0.1:3216",
			PathwaydURL:  "http://127.0.0.1:3217",
			MatrixdURL:   "http://127.0.0.1:3218",
			ObserverdURL: "http://127.0.0.1:3219",
		},
		Storaged: ServiceConfig{Bind: "127.0.0.1:3211"},
		Catalogd: CatalogConfig{Bind: "127.0.0.1:3212", StoragedURL: "http://127.0.0.1:3211"},
		Ingestd: IngestConfig{
			Bind:           "127.0.0.1:3213",
			StoragedURL:    "http://127.0.0.1:3211",
			CatalogdURL:    "http://127.0.0.1:3212",
			MaxIngestBytes: 256 << 20, // 256 MiB; bump per deployment via lakehouse.toml
		},
		Vectord: VectordConfig{
			Bind:        "127.0.0.1:3215",
			StoragedURL: "http://127.0.0.1:3211",
		},
		Embedd: EmbeddConfig{
			Bind:         "127.0.0.1:3216",
			ProviderURL:  "http://localhost:11434", // local Ollama
			DefaultModel: "nomic-embed-text",
			CacheSize:    10_000, // ~30 MiB at d=768; set to 0 to disable
		},
		Pathwayd: PathwaydConfig{
			Bind: "127.0.0.1:3217",
			// PersistPath empty by default = in-memory only. Production
			// sets to e.g. /var/lib/lakehouse/pathway/state.jsonl.
		},
		Matrixd: MatrixdConfig{
			Bind:       "127.0.0.1:3218",
			EmbeddURL:  "http://127.0.0.1:3216",
			VectordURL: "http://127.0.0.1:3215",
		},
		Observerd: ObserverdConfig{
			Bind: "127.0.0.1:3219",
			// PersistPath empty by default = in-memory only.
		},
		Queryd: QuerydConfig{
			Bind:         "127.0.0.1:3214",
			CatalogdURL:  "http://127.0.0.1:3212",
			SecretsPath:  "/etc/lakehouse/secrets-go.toml",
			RefreshEvery: "30s",
		},
		S3: S3Config{
			Endpoint:     "http://localhost:9000",
			Region:       "us-east-1",
			Bucket:       "lakehouse-primary",
			UsePathStyle: true,
		},
		Log: LogConfig{Level: "info"},
	}
}

// LoadConfig reads `lakehouse.toml` from path; if path is empty or
// the file doesn't exist, returns DefaultConfig. Any decode error is
// fatal (we don't want a misconfigured service silently falling back
// to defaults — that's the kind of bug you find at 2am).
//
// Per Opus + Qwen WARN #3: when path WAS given but the file is
// missing, log a warning so silent default-fallback doesn't hide
// misconfiguration. Empty path is fine (caller didn't ask for a
// file); non-empty + missing is suspicious.
func LoadConfig(path string) (Config, error) {
	cfg := DefaultConfig()
	if path == "" {
		return cfg, nil
	}
	b, err := os.ReadFile(path)
	if errors.Is(err, fs.ErrNotExist) {
		slog.Warn("config file not found, using defaults",
			"path", path,
			"hint", "create the file or pass -config /path/to/lakehouse.toml")
		return cfg, nil
	}
	if err != nil {
		return cfg, fmt.Errorf("read config: %w", err)
	}
	if err := toml.Unmarshal(b, &cfg); err != nil {
		return cfg, fmt.Errorf("parse config: %w", err)
	}
	return cfg, nil
}
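
The AuthConfig comment describes AllowedIPs as a list of CIDRs, with bare IPs treated as /32, checked against r.RemoteAddr. The auth middleware itself is not part of this file, so the following is only a sketch of how that rule might be implemented with the standard net/netip package. The function names are hypothetical, and widening a bare IPv6 address to /128 is an assumption beyond what the comment states.

```go
package main

import (
	"fmt"
	"net/netip"
	"strings"
)

// ParseAllowed converts entries such as "10.0.0.0/8" or "192.168.1.5"
// into prefixes. A bare IP becomes a single-host prefix
// (/32 for IPv4; /128 for IPv6 is an assumption).
func ParseAllowed(entries []string) ([]netip.Prefix, error) {
	out := make([]netip.Prefix, 0, len(entries))
	for _, e := range entries {
		e = strings.TrimSpace(e)
		if strings.Contains(e, "/") {
			p, err := netip.ParsePrefix(e)
			if err != nil {
				return nil, fmt.Errorf("bad CIDR %q: %w", e, err)
			}
			out = append(out, p.Masked())
			continue
		}
		a, err := netip.ParseAddr(e)
		if err != nil {
			return nil, fmt.Errorf("bad IP %q: %w", e, err)
		}
		out = append(out, netip.PrefixFrom(a, a.BitLen()))
	}
	return out, nil
}

// Allowed reports whether remoteAddr ("host:port", the form used by
// http.Request.RemoteAddr) falls inside any prefix. An empty prefix
// list means any source IP is accepted.
func Allowed(prefixes []netip.Prefix, remoteAddr string) bool {
	if len(prefixes) == 0 {
		return true
	}
	ap, err := netip.ParseAddrPort(remoteAddr)
	if err != nil {
		return false // unparseable peers are rejected
	}
	addr := ap.Addr().Unmap() // treat IPv4-mapped IPv6 as plain IPv4
	for _, p := range prefixes {
		if p.Contains(addr) {
			return true
		}
	}
	return false
}
```

Parsing the list once at startup, rather than per request, means a typo in allowed_ips would surface as a startup error instead of a silent 403 later.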