From 9e9e4c26a4b6b5c973683870d97c7a51e2a48f80 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 29 Apr 2026 00:10:55 -0500 Subject: [PATCH] =?UTF-8?q?G0=20D5:=20queryd=20DuckDB=20SELECT=20over=20Pa?= =?UTF-8?q?rquet=20via=20httpfs=20=C2=B7=204=20scrum=20fixes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase G0 Day 5 ships queryd: in-memory DuckDB with custom Connector that runs INSTALL httpfs / LOAD httpfs / CREATE OR REPLACE SECRET (TYPE S3) on every new connection, sourced from SecretsProvider + shared.S3Config. SetMaxOpenConns(1) so registrar's CREATE VIEWs and handler's SELECTs serialize through one connection (avoids cross- connection MVCC visibility edge cases). Registrar.Refresh reads catalogd /catalog/list, runs CREATE OR REPLACE VIEW "name" AS SELECT * FROM read_parquet('s3://bucket/key') per manifest, drops views for removed manifests, skips on unchanged updated_at (the implicit etag). Drop pass runs BEFORE create pass so a poison manifest can't block other manifest refreshes (post-scrum C1 fix). POST /sql with JSON body {"sql":"…"} returns {"columns":[{"name":"id","type":"BIGINT"},…], "rows":[[…]], "row_count":N}. []byte → string conversion so VARCHAR rows JSON-encode as text. 30s default refresh ticker, configurable via [queryd].refresh_every. Cross-lineage scrum on shipped code: - Opus 4.7 (opencode): 1 BLOCK + 4 WARN + 4 INFO - Kimi K2-0905 (openrouter): 2 BLOCK + 2 WARN + 1 INFO - Qwen3-coder (openrouter): 2 BLOCK + 1 WARN + 1 INFO Fixed (4): C1 (Opus + Kimi convergent): Refresh aborts on first per-view error → drop pass first, collect errors, errors.Join. Poison manifest no longer blocks the rest of the catalog from re-syncing. B-CTX (Opus BLOCK): bootstrap closure captured OpenDB's ctx → cancelled-ctx silently fails every reconnect. context.Background() inside closure; passed ctx only for initial Ping. B-LEAK (Kimi BLOCK): firstLine(stmt) truncated CREATE SECRET to 80 chars but those 80 chars contained KEY_ID + SECRET prefix → log aggregator captures credentials. Stable per-statement labels + redactCreds() filter on wrapped DuckDB errors. JSON-ERR (Opus WARN): swallowed json.Encode error → silent truncated 200 on unsupported column types. slog.Warn the failure. Dismissed (4 false positives): Qwen BLOCK "bootstrap not transactional" — DuckDB DDL is auto-commit Qwen BLOCK "MaxBytesReader after Decode" — false, applied before Kimi BLOCK "concurrent Refresh + user SELECT deadlock" — not a deadlock, just serialization, by design with 10s timeout retry Kimi WARN "dropView leaves r.known inconsistent" — current code returns before the delete; the entry persists for retry Critical reviewer behavior: 1 convergent BLOCK between Opus + Kimi on the per-view error blocking, plus two independent single-reviewer BLOCKs (B-CTX, B-LEAK) that smoke could never have caught. The B-LEAK fix uses defense-in-depth: never pass SQL into the error path AND redact known cred values from DuckDB's own error message. DuckDB cgo path: github.com/duckdb/duckdb-go/v2 v2.10502.0 (per ADR-001 §1) on Go 1.25 + arrow-go. Smoke 6/6 PASS after every fix round. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- cmd/queryd/main.go | 235 ++++++++++++++++++++++++++++-- docs/PHASE_G0_KICKOFF.md | 169 +++++++++++++++++++++ go.mod | 8 + go.sum | 16 ++ internal/queryd/db.go | 140 ++++++++++++++++++ internal/queryd/registrar.go | 181 +++++++++++++++++++++++ internal/queryd/registrar_test.go | 192 ++++++++++++++++++++++++ internal/shared/config.go | 21 ++- lakehouse.toml | 3 + scripts/d5_smoke.sh | 182 +++++++++++++++++++++++ 10 files changed, 1129 insertions(+), 18 deletions(-) create mode 100644 internal/queryd/db.go create mode 100644 internal/queryd/registrar.go create mode 100644 internal/queryd/registrar_test.go create mode 100755 scripts/d5_smoke.sh diff --git a/cmd/queryd/main.go b/cmd/queryd/main.go index b1a6bc6..2a9debc 100644 --- a/cmd/queryd/main.go +++ b/cmd/queryd/main.go @@ -1,22 +1,33 @@ -// queryd is the SQL execution layer — DuckDB via cgo, registers -// catalog datasets as views, executes ad-hoc SQL. D5 wires the -// actual /sql route + DuckDB session with S3 secret plumbed in -// (per Opus B2 fix). D1 just stands up the binary. -// -// NOTE: cgo handle lifecycle. The shared server factory's -// graceful-shutdown is enough for G0 (DuckDB connections are -// goroutine-local), but G1+ when persistent prepared statements + -// HNSW indexes attach, queryd will need its own OnShutdown drain -// hook. Tracked in D7.5 (W3 disposition). +// queryd is the SQL execution layer. DuckDB engine (via cgo), reads +// Parquet directly from S3 via httpfs, registers catalog datasets as +// views, exposes POST /sql. The interesting glue is in +// internal/queryd/{db,registrar}.go; main.go wires the lifecycle. package main import ( + "context" + "database/sql" + "encoding/json" + "errors" "flag" "log/slog" + "net/http" "os" + "strings" + "time" - "git.agentview.dev/profit/golangLAKEHOUSE/internal/shared" "github.com/go-chi/chi/v5" + + "git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogclient" + "git.agentview.dev/profit/golangLAKEHOUSE/internal/queryd" + "git.agentview.dev/profit/golangLAKEHOUSE/internal/secrets" + "git.agentview.dev/profit/golangLAKEHOUSE/internal/shared" +) + +const ( + primaryBucket = "primary" + maxSQLBodyBytes = 64 << 10 // 64 KiB cap on POST /sql body — SQL strings are not large + defaultRefresh = 30 * time.Second ) func main() { @@ -28,13 +39,205 @@ func main() { slog.Error("config", "err", err) os.Exit(1) } + if cfg.Queryd.CatalogdURL == "" { + slog.Error("config", "err", "queryd.catalogd_url is required") + os.Exit(1) + } + if cfg.S3.Bucket == "" { + slog.Error("config", "err", "s3.bucket is required") + os.Exit(1) + } - if err := shared.Run("queryd", cfg.Queryd.Bind, func(_ chi.Router) { - // D5 wires: - // POST /sql (JSON body {"sql": "..."} → DuckDB exec → JSON rows) - // Internal: TTL-cached views + etag invalidation against catalogd. - }); err != nil { + refreshEvery := defaultRefresh + if cfg.Queryd.RefreshEvery != "" { + d, err := time.ParseDuration(cfg.Queryd.RefreshEvery) + if err != nil { + slog.Error("config", "err", "queryd.refresh_every is not a valid duration: "+err.Error()) + os.Exit(1) + } + refreshEvery = d + } + + // Long-running ctx tied to lifetime of the process for the + // background refresh goroutine. Cancelled when shared.Run returns. 
+ procCtx, procCancel := context.WithCancel(context.Background()) + defer procCancel() + + prov, err := secrets.NewFileProvider(cfg.Queryd.SecretsPath, secrets.S3Credentials{ + AccessKeyID: cfg.S3.AccessKeyID, + SecretAccessKey: cfg.S3.SecretAccessKey, + }) + if err != nil { + slog.Error("secrets", "err", err) + os.Exit(1) + } + + db, err := queryd.OpenDB(procCtx, cfg.S3, prov, primaryBucket) + if err != nil { + slog.Error("duckdb open", "err", err) + os.Exit(1) + } + defer db.Close() + + catalog := catalogclient.New(cfg.Queryd.CatalogdURL) + registrar := queryd.NewRegistrar(db, catalog, cfg.S3.Bucket) + + // Initial refresh — log if non-empty so the operator sees what + // got loaded. A failure here is non-fatal: catalogd may be coming + // up later in the boot order, the TTL goroutine will retry. + refreshCtx, refreshCancel := context.WithTimeout(procCtx, 10*time.Second) + stats, err := registrar.Refresh(refreshCtx) + refreshCancel() + if err != nil { + slog.Warn("initial refresh failed (will retry)", "err", err) + } else { + slog.Info("initial refresh", "created", stats.Created, "skipped", stats.Skipped) + } + + // Background ticker — drives Refresh on the configured interval. + go runRefreshLoop(procCtx, registrar, refreshEvery) + + h := &handlers{db: db} + + if err := shared.Run("queryd", cfg.Queryd.Bind, h.register); err != nil { slog.Error("server", "err", err) os.Exit(1) } } + +// runRefreshLoop drives Registrar.Refresh on a ticker. Cancellable +// via ctx. Logs every refresh; in a quiet run that's chatty but +// useful for tracking when datasets land. +func runRefreshLoop(ctx context.Context, r *queryd.Registrar, every time.Duration) { + ticker := time.NewTicker(every) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + rctx, cancel := context.WithTimeout(ctx, 10*time.Second) + stats, err := r.Refresh(rctx) + cancel() + if err != nil { + slog.Warn("refresh failed", "err", err) + continue + } + if stats.Created+stats.Updated+stats.Dropped > 0 { + slog.Info("refresh", + "created", stats.Created, + "updated", stats.Updated, + "dropped", stats.Dropped, + "skipped", stats.Skipped) + } + } + } +} + +type handlers struct { + db *sql.DB +} + +func (h *handlers) register(r chi.Router) { + r.Post("/sql", h.handleSQL) +} + +// sqlRequest is the POST /sql body shape. +type sqlRequest struct { + SQL string `json:"sql"` +} + +// sqlResponse is the result envelope. Columns + rows is the compact +// form; verbose row-as-object form is post-G0 if anyone needs it. 
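+// An illustrative serialized envelope (values taken from the D5 smoke
+// fixture, abbreviated to one row):
+//
+//	{"columns":[{"name":"id","type":"BIGINT"},{"name":"name","type":"VARCHAR"}],
+//	 "rows":[[1,"Alice"]],"row_count":1}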
+type sqlResponse struct { + Columns []sqlColumn `json:"columns"` + Rows [][]any `json:"rows"` + RowCount int `json:"row_count"` +} + +type sqlColumn struct { + Name string `json:"name"` + Type string `json:"type"` +} + +func (h *handlers) handleSQL(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + r.Body = http.MaxBytesReader(w, r.Body, maxSQLBodyBytes) + var req sqlRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + var maxErr *http.MaxBytesError + if errors.As(err, &maxErr) || strings.Contains(err.Error(), "http: request body too large") { + http.Error(w, "sql body too large", http.StatusRequestEntityTooLarge) + return + } + http.Error(w, "decode body: "+err.Error(), http.StatusBadRequest) + return + } + if strings.TrimSpace(req.SQL) == "" { + http.Error(w, "sql is empty", http.StatusBadRequest) + return + } + + rows, err := h.db.QueryContext(r.Context(), req.SQL) + if err != nil { + // DuckDB errors are user-facing for ad-hoc SQL — expose them + // at 400 so callers can see what went wrong (table not found, + // syntax error, etc.). Internal infra issues would surface as + // 500 once we distinguish them later. + http.Error(w, "query: "+err.Error(), http.StatusBadRequest) + return + } + defer rows.Close() + + cols, err := rows.Columns() + if err != nil { + http.Error(w, "columns: "+err.Error(), http.StatusInternalServerError) + return + } + colTypes, err := rows.ColumnTypes() + if err != nil { + http.Error(w, "column types: "+err.Error(), http.StatusInternalServerError) + return + } + + resp := sqlResponse{ + Columns: make([]sqlColumn, len(cols)), + Rows: [][]any{}, + } + for i, name := range cols { + resp.Columns[i] = sqlColumn{Name: name, Type: colTypes[i].DatabaseTypeName()} + } + + for rows.Next() { + dest := make([]any, len(cols)) + ptrs := make([]any, len(cols)) + for i := range dest { + ptrs[i] = &dest[i] + } + if err := rows.Scan(ptrs...); err != nil { + http.Error(w, "scan: "+err.Error(), http.StatusInternalServerError) + return + } + // JSON can't encode []byte as text by default — DuckDB returns + // VARCHAR as []byte through database/sql. Convert here. + for i, v := range dest { + if b, ok := v.([]byte); ok { + dest[i] = string(b) + } + } + resp.Rows = append(resp.Rows, dest) + } + if err := rows.Err(); err != nil { + http.Error(w, "rows err: "+err.Error(), http.StatusInternalServerError) + return + } + resp.RowCount = len(resp.Rows) + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(resp); err != nil { + // Headers already sent — can't change the status code, but log + // so an unsupported DuckDB column type (Decimal, INTERVAL, etc.) + // surfaces in the operator log. Per scrum JSON-ERR (Opus). + slog.Warn("sql encode response", "err", err) + } +} diff --git a/docs/PHASE_G0_KICKOFF.md b/docs/PHASE_G0_KICKOFF.md index 1dc31d1..00e6d86 100644 --- a/docs/PHASE_G0_KICKOFF.md +++ b/docs/PHASE_G0_KICKOFF.md @@ -825,3 +825,172 @@ data getting overwritten before the 409 fired. Opus reading the PUT-then-register order and noticing that PUT mutates state before the validation check is exactly the kind of architectural code-read the scrum exists for. + +--- + +## Pre-D5 prep (commit 4205ecd) + +Between D4 and D5, J asked what would bite D5 if not fixed first. +The answer was the `CatalogClient` package location: it lived in +`internal/ingestd/` but D5's queryd needed the same shape, and +having queryd import from ingestd would have inverted the data-flow +direction (ingestd is upstream of queryd). 
+ +Extracted to `internal/catalogclient/` as a shared package, added +the `List(ctx)` method queryd needs for view registration, kept +the wire format unchanged. ingestd switched its import; all four +existing smokes (D1-D4) passed unchanged. + +DuckDB cgo path re-verified at this point with the official +`github.com/duckdb/duckdb-go/v2` v2.10502.0 (per ADR-001 §1) on +Go 1.25 + arrow-go: standalone `sql.Open("duckdb","")` → +`db.Ping()` → `SELECT 'pong'` round-trip succeeded. + +--- + +## D5 — actual run results (2026-04-29) + +Phase G0 Day 5 executed end-to-end. Output of `scripts/d5_smoke.sh`: + +``` +[d5-smoke] ingest 5-row CSV via D4 path: + ✓ ingest row_count=5 +[d5-smoke] launching queryd (initial Refresh picks up d5_workers)... +[d5-smoke] POST /sql SELECT count(*) FROM d5_workers: + ✓ count(*)=5 +[d5-smoke] POST /sql SELECT * FROM d5_workers LIMIT 3: + ✓ rows[0] = (id=1, name=Alice), columns=[id, name, salary] +[d5-smoke] schema-drift ingest 409s; existing view still queries: + ✓ drift → 409 + ✓ post-drift count(*)=5 (view unchanged) +[d5-smoke] error path: SELECT FROM nonexistent → 400: + ✓ unknown table → 400 +[d5-smoke] D5 acceptance gate: PASSED +``` + +What landed: +- `internal/queryd/db.go` — `OpenDB(ctx, s3, prov, bucket)` returns + a `*sql.DB` backed by an in-memory DuckDB. Custom Connector with + a bootstrapper that runs `INSTALL httpfs` / `LOAD httpfs` / + `CREATE OR REPLACE SECRET (TYPE S3, KEY_ID …, ENDPOINT …, + URL_STYLE 'path', USE_SSL false)` on every new physical connection. + `SetMaxOpenConns(1)` so registrar's CREATE VIEWs and handler's + user SQL serialize through one connection (avoids cross-connection + visibility edge cases). +- `internal/queryd/registrar.go` — `Registrar.Refresh(ctx)` reads + catalog list, runs `CREATE OR REPLACE VIEW "name" AS SELECT * + FROM read_parquet('s3://bucket/key')` per manifest, drops views + for removed manifests, skips on unchanged `updated_at`. + `CatalogLister` + `Execer` interfaces enable unit tests with no + DuckDB / no real catalogd. +- `cmd/queryd/main.go` — POST /sql with JSON body + `{"sql":"…"}` → executes via `*sql.DB` → response shape + `{"columns":[{"name","type"},...], "rows":[[...]], "row_count":N}`. + `[]byte` → string conversion so VARCHAR rows JSON-encode as text. + Initial `Refresh` on startup, then a TTL ticker (`30s` default, + configurable via `[queryd].refresh_every`) calling `Refresh` in a + goroutine cancellable via process ctx. +- New `[queryd]` config block: `bind` + `catalogd_url` + + `secrets_path` + `refresh_every`. +- `scripts/d5_smoke.sh` — 6 acceptance probes including post-drift + query stability (`view unchanged after schema-drift 409`). + +The D5 smoke launches queryd LAST — after the D4 ingest has +registered the dataset — so the initial `Refresh` picks it up +without waiting for the TTL. This matches what real ops looks like: +queryd starts after data is loaded, picks up the catalog at that +moment, then drifts in lockstep via the ticker. + +Critical architectural choices: +1. **`SetMaxOpenConns(1)`** — DuckDB views live in the in-memory + catalog of a single instance; with one connection, registrar's + CREATE VIEWs and handler's SELECTs are visibly synchronized + without MVCC-timing edge cases. Lift in G2+ if intra-process + query concurrency wins matter. +2. **Always-quote view identifiers** — `CREATE OR REPLACE VIEW "name"` + with internal `"` doubled. Catalogd accepts user-supplied names + that wouldn't pass SQL bare-identifier rules; quoting makes them + unambiguous. +3. 
**`updated_at` as the implicit etag** — no separate ETag header + in catalogd. The manifest's timestamp bumps on every same-fp + re-register; registrar tracks `time.Time` per name and skips + `CREATE` when unchanged (Opus's "perf cliff" warning from D5 plan). + +Next: D6 — gateway as a reverse proxy in front of all four backing +services. Last day of G0. After that: G1+ (gRPC, Lance bench, vector +indices, etc). + +--- + +## D5 — code scrum review (3-lineage parallel pass) + +| Pass | Reviewer | Latency | Findings | +|---|---|---|---| +| 1 | `opencode/claude-opus-4-7` | ~28s | 1 BLOCK + 4 WARN + 4 INFO = 9 (1 self-retracted) | +| 2 | `openrouter/moonshotai/kimi-k2-0905` | ~25s | 2 BLOCK + 2 WARN + 1 INFO = 5 | +| 3 | `openrouter/qwen/qwen3-coder` | ~20s | 2 BLOCK + 1 WARN + 1 INFO = 4 | + +Total: 18 distinct findings. Strongest scrum yet on convergence +quality — Opus's BLOCK on ctx capture + Kimi's BLOCK on credential +leakage in error messages were both deep architectural reads that +smoke could not have caught. + +### Convergent findings (≥2 reviewers — high confidence) + +| # | Severity | Finding | Reviewers | Disposition | +|---|---|---|---|---| +| C1 | WARN×2 | `Refresh` aborts mid-loop on first per-view error → poison manifest blocks all subsequent drops/updates → stale views stick around forever (Opus); a partial `dropView` failure would block other manifest refreshes (Kimi) | Opus + Kimi | **Fixed** — drop pass runs BEFORE create pass; per-iteration errors are collected and Refresh continues; final return uses `errors.Join` to surface every failure | + +### Single-reviewer findings — applied + +| # | Severity | Finding | Reviewer | Disposition | +|---|---|---|---|---| +| B-CTX | Opus BLOCK | Bootstrap closure captures the `ctx` passed to `OpenDB`. Today masked by `SetMaxOpenConns(1)` + long-lived `procCtx`; future short-lived ctx + reconnect would silently fail every reconnect's bootstrap with a cancelled ctx | Opus | **Fixed** — bootstrap explicitly uses `context.Background()`; passed `ctx` is only used for the initial `PingContext` | +| B-LEAK | Kimi BLOCK | The prior `firstLine(stmt)` truncation to 80 chars STILL contained both `KEY_ID '...'` AND the start of `SECRET '...'` — log aggregators would capture credentials | Kimi | **Fixed** — replaced `firstLine(stmt)` with stable per-statement labels (`"install httpfs"` / `"load httpfs"` / `"create secret"`) so no SQL reaches the error path. Wrapped `err.Error()` is filtered through `redactCreds` to scrub any secret values DuckDB itself might echo | +| JSON-ERR | Opus WARN | `_ = json.NewEncoder(w).Encode(resp)` swallows mid-encode failures; client sees truncated 200 with no log signal — first column type DuckDB can't JSON-encode (e.g. INTERVAL) lands here silently | Opus | **Fixed** — log via `slog.Warn` with the underlying error | + +### Single-reviewer findings — accepted with rationale + +| # | Severity | Finding | Reviewer | Disposition | +|---|---|---|---|---| +| O-W3 | Opus WARN | DuckDB exotic types (Decimal, INTERVAL, etc.) may not JSON-encode well — `dest := []any` with `[]byte→string` conversion is the only type-aware step today | Opus | **Accepted** — type-aware row converter is post-G0; G0 column types in test fixtures are int64/varchar/double/bool. The new `JSON-ERR` fix makes any encoding failure operator-visible in the meantime | +| K-W1 | Kimi WARN | No exponential backoff on repeated refresh failures | Kimi | **Accepted** — G0 single-tenant; 30s ticker is fine. 
Backoff adds complexity that smoke can't validate |
+| O-INFO×3 | Opus INFO | USE_SSL casing cosmetic; initial-refresh log doesn't match runRefreshLoop's four-field format | Opus | **Accepted** / **fixed cleanup** — the `var _ = io.Discard` placeholder removed; the rest deferred |
+| Q-W | Qwen WARN | Object key validation | Qwen | **Accepted** — catalogd's contract; storaged validateKey is the actual gate |
+| Q-I | Qwen INFO | Hardcoded refresh timeout | Qwen | **Accepted** — separate config knob is over-engineering for G0 |
+
+### Dismissed (false positives)
+
+| # | Severity | Finding | Reviewer | Why dismissed |
+|---|---|---|---|---|
+| F1 | Q-BLOCK | "Bootstrap not transactional" | Qwen | False — DuckDB DDL is auto-commit + idempotent; `CREATE OR REPLACE` handles re-run cleanly |
+| F2 | Q-BLOCK | "MaxBytesReader applied AFTER json.NewDecoder reads" | Qwen | False — `cmd/queryd/main.go` sets `r.Body = http.MaxBytesReader(...)` BEFORE the decode call, by line ordering |
+| F3 | K-BLOCK | "Concurrent Refresh + user query deadlock on single connection" | Kimi | False — not a deadlock, just serialization. With Refresh's 10s ctx-timeout, a slow SELECT causes Refresh to skip a tick and retry; accepted by design |
+| F4 | K-W2 | "dropView failure leaves r.known inconsistent" | Kimi | False — current code returns BEFORE the `delete(r.known, name)` on dropView error, so the entry persists and the next refresh retries (correct behavior). Misread of the sequence |
+
+### Cumulative D5 disposition
+
+- **3-lineage parallel pass: 18 distinct findings**
+- **Fixed: 4** (1 convergent + 3 single-reviewer real)
+- **Accepted-with-rationale: 5**
+- **Deferred / cleanup: 3** (1 INFO removal, 2 INFOs deferred)
+- **Dismissed (false positives): 4**
+- Build clean, vet clean, all tests pass, smoke 6/6 PASS after every fix round.
+
+The B-LEAK finding (Kimi BLOCK) is worth a pause: the prior code
+explicitly tried to be careful about credentials by truncating the
+SQL to 80 chars in error messages, but the truncation window still
+captured both `KEY_ID '…'` and the prefix of `SECRET '…'` because of
+where they appear in the statement. The defense-in-depth answer was
+to never pass the SQL into the error path at all — labels are static,
+and the error chain wraps a credential-redacted version of DuckDB's
+own error message. Two layers of "don't leak" instead of one fragile
+truncation.
+
+The B-CTX finding (Opus BLOCK) is the kind of latent issue that
+would cost a future debugger hours: the connector callback runs on
+EVERY new physical connection, but `SetMaxOpenConns(1)` masks the
+issue today. A future change to allow concurrent connections would
+silently break bootstrap on every reconnect with no clear error
+signal. Catching it during the scrum saved a Friday-afternoon
+debugging incident months from now.
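+
+For reference, the B-CTX shape in miniature: a condensed sketch of the
+bootstrap closure as it now stands in `internal/queryd/db.go` (variable
+names abbreviated, surrounding declarations elided; the full function is
+in this patch):
+
+```go
+connector, err := duckdb.NewConnector("", func(ex driver.ExecerContext) error {
+	for i, stmt := range bootstrap {
+		// context.Background(), NOT the ctx passed to OpenDB: this closure
+		// re-runs on every new physical connection, long after OpenDB returned.
+		if _, err := ex.ExecContext(context.Background(), stmt, nil); err != nil {
+			// Static labels only, never the SQL (it contains KEY_ID / SECRET).
+			return fmt.Errorf("queryd bootstrap %s: %s", labels[i], redactCreds(err.Error(), creds))
+		}
+	}
+	return nil
+})
+```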
diff --git a/go.mod b/go.mod index ace06eb..0dd7415 100644 --- a/go.mod +++ b/go.mod @@ -32,6 +32,14 @@ require ( github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.20 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.42.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/duckdb/duckdb-go-bindings v0.10502.0 // indirect + github.com/duckdb/duckdb-go-bindings/lib/darwin-amd64 v0.10502.0 // indirect + github.com/duckdb/duckdb-go-bindings/lib/darwin-arm64 v0.10502.0 // indirect + github.com/duckdb/duckdb-go-bindings/lib/linux-amd64 v0.10502.0 // indirect + github.com/duckdb/duckdb-go-bindings/lib/linux-arm64 v0.10502.0 // indirect + github.com/duckdb/duckdb-go-bindings/lib/windows-amd64 v0.10502.0 // indirect + github.com/duckdb/duckdb-go/v2 v2.10502.0 // indirect + github.com/go-viper/mapstructure/v2 v2.5.0 // indirect github.com/goccy/go-json v0.10.6 // indirect github.com/google/flatbuffers v25.12.19+incompatible // indirect github.com/klauspost/compress v1.18.5 // indirect diff --git a/go.sum b/go.sum index 883cf49..8ce088b 100644 --- a/go.sum +++ b/go.sum @@ -46,12 +46,28 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/duckdb/duckdb-go-bindings v0.10502.0 h1:Uhg/dfvPLQv4cH35lMD48hqUcdOh2Z7bcuykjr4qnOA= +github.com/duckdb/duckdb-go-bindings v0.10502.0/go.mod h1:8KF3oEKrmYdSbZnQ1BPTdxAZDHRaM1LEv+oBvL2nSLk= +github.com/duckdb/duckdb-go-bindings/lib/darwin-amd64 v0.10502.0 h1:1GxSHSI1ef3sCdDVrJ9l8s6aTd7P1K788os9lHrs43g= +github.com/duckdb/duckdb-go-bindings/lib/darwin-amd64 v0.10502.0/go.mod h1:EnAvZh1kNJHp5yF+M1ZHNEvapnmt6anq1xXHVrAGqMo= +github.com/duckdb/duckdb-go-bindings/lib/darwin-arm64 v0.10502.0 h1:76gB6UiqKae6JptNiFLjwecD0oR87bXS5u6Lni9hSGI= +github.com/duckdb/duckdb-go-bindings/lib/darwin-arm64 v0.10502.0/go.mod h1:IGLSeEcFhNeZF16aVjQCULD7TsFZKG5G7SyKJAXKp5c= +github.com/duckdb/duckdb-go-bindings/lib/linux-amd64 v0.10502.0 h1:fcBKRy9keR5FLxppDD7ZjQ1EwqTRcA2kPLi2jWilPDw= +github.com/duckdb/duckdb-go-bindings/lib/linux-amd64 v0.10502.0/go.mod h1:KAIynZ0GHCS7X5fRyuFnQMg/SZBPK/bS9OCOVojClxw= +github.com/duckdb/duckdb-go-bindings/lib/linux-arm64 v0.10502.0 h1:pUwDWLQZIkm/v5aoGIu2cTAsgGqratxklRwP9zzsmiU= +github.com/duckdb/duckdb-go-bindings/lib/linux-arm64 v0.10502.0/go.mod h1:81SGOYoEUs8qaAfSk1wRfM5oobrIJ5KI7AzYhK6/bvQ= +github.com/duckdb/duckdb-go-bindings/lib/windows-amd64 v0.10502.0 h1:CDPf2ow6pP/9zYXfBdyT8a1GZ69eBWdMt5AhAsVgvyU= +github.com/duckdb/duckdb-go-bindings/lib/windows-amd64 v0.10502.0/go.mod h1:K25pJL26ARblGDeuAkrdblFvUen92+CwksLtPEHRqqQ= +github.com/duckdb/duckdb-go/v2 v2.10502.0 h1:YfdiBlXnlRdxIKu1AtBQSRI0/tGhOkIGshKq52+uA7A= +github.com/duckdb/duckdb-go/v2 v2.10502.0/go.mod h1:a/31wL2vx7dJ0isrO+E6o28DBQVaVOMbKxp2BsHTGp0= github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug= github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod 
h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-viper/mapstructure/v2 v2.5.0 h1:vM5IJoUAy3d7zRSVtIwQgBj7BiWtMPfmPEgAXnvj1Ro= +github.com/go-viper/mapstructure/v2 v2.5.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/goccy/go-json v0.10.6 h1:p8HrPJzOakx/mn/bQtjgNjdTcN+/S6FcG2CTtQOrHVU= github.com/goccy/go-json v0.10.6/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= diff --git a/internal/queryd/db.go b/internal/queryd/db.go new file mode 100644 index 0000000..e4787f9 --- /dev/null +++ b/internal/queryd/db.go @@ -0,0 +1,140 @@ +// Package queryd is the SQL execution surface — a DuckDB engine that +// reads Parquet directly from S3-compatible storage (MinIO in dev) +// via DuckDB's httpfs extension. Views are registered from catalogd +// manifests so user SQL just references dataset names. +// +// db.go owns the *sql.DB lifecycle: a custom Connector with a +// bootstrapper that runs INSTALL httpfs / LOAD httpfs / CREATE OR +// REPLACE SECRET on every new connection, plus a SetMaxOpenConns(1) +// pin so the registrar's CREATE VIEWs and the handler's user SQL +// serialize through one connection (avoids cross-connection visibility +// edge cases for G0; lift to a pool when concurrency wins matter). +package queryd + +import ( + "context" + "database/sql" + "database/sql/driver" + "fmt" + "strings" + + "github.com/duckdb/duckdb-go/v2" + + "git.agentview.dev/profit/golangLAKEHOUSE/internal/secrets" + "git.agentview.dev/profit/golangLAKEHOUSE/internal/shared" +) + +// OpenDB returns a *sql.DB backed by an in-memory DuckDB whose +// connections are pre-loaded with httpfs + an S3 secret derived from +// the shared S3 config + secrets provider. The bucket parameter names +// the logical bucket whose credentials are pulled. +// +// Caller is responsible for Close()ing the returned db. +func OpenDB(ctx context.Context, s3 shared.S3Config, prov secrets.Provider, bucketLogicalName string) (*sql.DB, error) { + creds, err := prov.S3Credentials(bucketLogicalName) + if err != nil { + return nil, fmt.Errorf("queryd: secrets: %w", err) + } + + bootstrap := buildBootstrap(s3, creds) + // Stable labels per bootstrap statement. Per scrum B-LEAK (Kimi): + // the prior firstLine(stmt) truncated CREATE OR REPLACE SECRET + // to 80 chars, which contained both KEY_ID and the start of + // SECRET — a log aggregator would capture credentials. Using + // stable labels avoids putting the SQL into the error path at all. + labels := []string{"install httpfs", "load httpfs", "create secret"} + + connector, err := duckdb.NewConnector("", func(execer driver.ExecerContext) error { + // Per scrum B-CTX (Opus): use Background() inside the bootstrap + // closure rather than capturing the OpenDB-call ctx. The + // connector callback runs on EVERY new physical connection, + // including reconnects long after OpenDB returned. A captured + // short-lived ctx would silently fail every reconnect's + // bootstrap. The passed ctx is only for the initial Ping below. 
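+		// labels[i] pairs positionally with bootstrap[i]; keep both slices in
+		// sync with buildBootstrap's return order.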
+ for i, stmt := range bootstrap { + if _, err := execer.ExecContext(context.Background(), stmt, nil); err != nil { + return fmt.Errorf("queryd bootstrap %s: %s", labels[i], redactCreds(err.Error(), creds)) + } + } + return nil + }) + if err != nil { + return nil, fmt.Errorf("queryd: new connector: %w", err) + } + + db := sql.OpenDB(connector) + // One connection: the registrar's view CREATEs are visible to the + // query handler's SELECTs without worrying about MVCC/timing. + // Lift this for G2+ when concurrent reads matter. + db.SetMaxOpenConns(1) + if err := db.PingContext(ctx); err != nil { + _ = db.Close() + return nil, fmt.Errorf("queryd: ping: %w", err) + } + return db, nil +} + +// buildBootstrap returns the SQL statements that initialize a fresh +// DuckDB connection: install + load httpfs, then create-or-replace +// the unnamed S3 secret. Endpoint is normalized — DuckDB wants +// `host:port` without the scheme; the http/https split is controlled +// by USE_SSL. +func buildBootstrap(s3 shared.S3Config, creds secrets.S3Credentials) []string { + endpoint := s3.Endpoint + useSSL := true + switch { + case strings.HasPrefix(endpoint, "http://"): + endpoint = strings.TrimPrefix(endpoint, "http://") + useSSL = false + case strings.HasPrefix(endpoint, "https://"): + endpoint = strings.TrimPrefix(endpoint, "https://") + useSSL = true + } + urlStyle := "vhost" + if s3.UsePathStyle { + urlStyle = "path" + } + + // CREATE OR REPLACE so a reconnect doesn't error on the existing + // secret. Single-quoted SQL string literals; we escape ' → '' for + // belt-and-braces (creds shouldn't contain ' but a future SSO + // token might). + createSecret := fmt.Sprintf( + "CREATE OR REPLACE SECRET (TYPE S3, KEY_ID '%s', SECRET '%s', REGION '%s', ENDPOINT '%s', URL_STYLE '%s', USE_SSL %t)", + sqlEscape(creds.AccessKeyID), + sqlEscape(creds.SecretAccessKey), + sqlEscape(s3.Region), + sqlEscape(endpoint), + sqlEscape(urlStyle), + useSSL, + ) + + return []string{ + "INSTALL httpfs", + "LOAD httpfs", + createSecret, + } +} + +// sqlEscape doubles single quotes in a SQL string literal value. +// SQL identifier escaping (doubling " for quoted identifiers) lives +// next to its use site in registrar.go. +func sqlEscape(v string) string { + return strings.ReplaceAll(v, "'", "''") +} + +// redactCreds replaces known credential values inside an error +// message with placeholder tokens. DuckDB's parser/exec error may +// echo the offending statement; this scrubs the secret values +// regardless. Per scrum B-LEAK (Kimi): a 503 from a typo in CREATE +// SECRET would otherwise drop both KEY_ID and SECRET into the log +// pipeline. +func redactCreds(msg string, creds secrets.S3Credentials) string { + if creds.AccessKeyID != "" { + msg = strings.ReplaceAll(msg, creds.AccessKeyID, "[REDACTED-KEY]") + } + if creds.SecretAccessKey != "" { + msg = strings.ReplaceAll(msg, creds.SecretAccessKey, "[REDACTED-SECRET]") + } + return msg +} diff --git a/internal/queryd/registrar.go b/internal/queryd/registrar.go new file mode 100644 index 0000000..9067286 --- /dev/null +++ b/internal/queryd/registrar.go @@ -0,0 +1,181 @@ +// registrar.go — turn catalogd's manifest list into DuckDB views. +// +// Refresh() reads /catalog/list, runs CREATE OR REPLACE VIEW for +// every manifest, drops views for manifests that disappeared, and +// skips any whose updated_at hasn't changed since the prior refresh +// (per kickoff D5.3 spec — "don't re-CREATE on every request, +// Opus review flagged that as the perf cliff"). 
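+//
+// The statement emitted per manifest looks like this (illustrative; values
+// are the unit-test fixture's):
+//
+//	CREATE OR REPLACE VIEW "workers" AS
+//	    SELECT * FROM read_parquet('s3://lakehouse-go-primary/datasets/workers/fp1.parquet')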
+// +// Identifier safety: all view names are double-quoted in SQL with +// internal " escaped to "" — catalogd accepts user-supplied dataset +// names that wouldn't pass SQL bare-identifier rules (hyphens, +// digit-leading, reserved words). Quoting makes them unambiguous. +package queryd + +import ( + "context" + "database/sql" + "errors" + "fmt" + "strings" + "time" + + "git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd" +) + +// CatalogLister is what registrar needs from the catalog client — +// only List, no Register. Defined as an interface so unit tests can +// inject a fake without spinning up a real catalogd. +type CatalogLister interface { + List(ctx context.Context) ([]*catalogd.Manifest, error) +} + +// Execer is the subset of *sql.DB that registrar actually uses. +// Same testability win — unit tests can record the SQL strings +// without booting DuckDB. *sql.DB satisfies this directly. +type Execer interface { + ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) +} + +// Registrar holds the running view state and the dependencies needed +// to refresh it. +type Registrar struct { + exec Execer + catalog CatalogLister + bucket string + + // known tracks the current view names + the updated_at we last + // rebuilt them from. Used by Refresh to skip unchanged views and + // drop ones that disappeared from the catalog. + known map[string]time.Time +} + +// NewRegistrar builds a fresh registrar. bucket is the S3 bucket +// name used to construct s3:// URLs for read_parquet(). +func NewRegistrar(exec Execer, catalog CatalogLister, bucket string) *Registrar { + return &Registrar{ + exec: exec, + catalog: catalog, + bucket: bucket, + known: make(map[string]time.Time), + } +} + +// RefreshStats reports what Refresh did — for logs + tests. +type RefreshStats struct { + Created int + Updated int + Dropped int + Skipped int +} + +// Refresh syncs DuckDB's view catalog with catalogd's manifest list. +// Returns counts of {Created, Updated, Dropped, Skipped} so the +// caller can decide whether to log loudly. +// +// Per scrum C1 (Opus + Kimi convergent): drop pass runs BEFORE create +// pass so a failure in one view's CREATE doesn't block another view's +// DROP. Per-iteration errors are collected and the refresh continues +// — a single poison manifest must not block the whole catalog from +// re-syncing. The collected errors are joined into the return value; +// callers see "everything that happened to fail this round." +func (r *Registrar) Refresh(ctx context.Context) (RefreshStats, error) { + var stats RefreshStats + + manifests, err := r.catalog.List(ctx) + if err != nil { + return stats, fmt.Errorf("registrar list: %w", err) + } + + wantNames := make(map[string]struct{}, len(manifests)) + for _, m := range manifests { + wantNames[m.Name] = struct{}{} + } + + var errs []error + + // 1. Drop views whose manifests disappeared. Run first so the + // create pass can't block removals on a poison-create error. + for name := range r.known { + if _, still := wantNames[name]; still { + continue + } + if err := r.dropView(ctx, name); err != nil { + errs = append(errs, err) + continue + } + delete(r.known, name) + stats.Dropped++ + } + + // 2. Create / update views for current manifests. 
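+	// An unchanged updated_at means the manifest (and therefore the view SQL)
+	// has not changed since the last build, so the CREATE is skipped: the
+	// timestamp is the implicit etag.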
+ for _, m := range manifests { + prior, exists := r.known[m.Name] + if exists && prior.Equal(m.UpdatedAt) { + stats.Skipped++ + continue + } + if err := r.createOrReplaceView(ctx, m); err != nil { + errs = append(errs, err) + continue + } + if exists { + stats.Updated++ + } else { + stats.Created++ + } + r.known[m.Name] = m.UpdatedAt + } + + if len(errs) > 0 { + return stats, errors.Join(errs...) + } + return stats, nil +} + +// createOrReplaceView builds the DuckDB SQL for one manifest and +// runs it. G0 manifests have exactly one Object per dataset; if more +// land later (G2 multi-part), `read_parquet([...])` accepts a list. +func (r *Registrar) createOrReplaceView(ctx context.Context, m *catalogd.Manifest) error { + if len(m.Objects) == 0 { + return fmt.Errorf("registrar: manifest %q has no objects", m.Name) + } + urls := make([]string, len(m.Objects)) + for i, obj := range m.Objects { + urls[i] = fmt.Sprintf("'%s'", sqlEscape(buildS3URL(r.bucket, obj.Key))) + } + var fromExpr string + if len(urls) == 1 { + fromExpr = "read_parquet(" + urls[0] + ")" + } else { + fromExpr = "read_parquet([" + strings.Join(urls, ", ") + "])" + } + sql := fmt.Sprintf("CREATE OR REPLACE VIEW %s AS SELECT * FROM %s", + quoteIdent(m.Name), fromExpr) + if _, err := r.exec.ExecContext(ctx, sql); err != nil { + return fmt.Errorf("create view %q: %w", m.Name, err) + } + return nil +} + +func (r *Registrar) dropView(ctx context.Context, name string) error { + sql := "DROP VIEW IF EXISTS " + quoteIdent(name) + if _, err := r.exec.ExecContext(ctx, sql); err != nil { + return fmt.Errorf("drop view %q: %w", name, err) + } + return nil +} + +// quoteIdent wraps a SQL identifier in double quotes and escapes +// internal " by doubling. This is the SQL-standard rule — works in +// every engine that accepts quoted identifiers. +func quoteIdent(name string) string { + return `"` + strings.ReplaceAll(name, `"`, `""`) + `"` +} + +// buildS3URL composes the s3://bucket/key URL DuckDB's httpfs +// extension consumes. Keys aren't URL-escaped because read_parquet +// accepts S3-style literal paths. +func buildS3URL(bucket, key string) string { + return "s3://" + bucket + "/" + key +} diff --git a/internal/queryd/registrar_test.go b/internal/queryd/registrar_test.go new file mode 100644 index 0000000..5cf5509 --- /dev/null +++ b/internal/queryd/registrar_test.go @@ -0,0 +1,192 @@ +package queryd + +import ( + "context" + "database/sql" + "errors" + "strings" + "testing" + "time" + + "git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd" +) + +// fakeExecer records every SQL string Exec'd against it. +type fakeExecer struct { + calls []string +} + +func (f *fakeExecer) ExecContext(_ context.Context, q string, _ ...any) (sql.Result, error) { + f.calls = append(f.calls, q) + return nil, nil +} + +// fakeCatalog returns a fixed manifest list and an optional error. 
+type fakeCatalog struct { + manifests []*catalogd.Manifest + err error +} + +func (f *fakeCatalog) List(_ context.Context) ([]*catalogd.Manifest, error) { + return f.manifests, f.err +} + +func mkManifest(name, fp string, updatedAt time.Time, key string) *catalogd.Manifest { + return &catalogd.Manifest{ + Name: name, + DatasetID: "id-" + name, + SchemaFingerprint: fp, + Objects: []catalogd.Object{{Key: key, Size: 1024}}, + CreatedAt: updatedAt, + UpdatedAt: updatedAt, + } +} + +func TestRefresh_CreatesViewsForNewManifests(t *testing.T) { + exec := &fakeExecer{} + cat := &fakeCatalog{manifests: []*catalogd.Manifest{ + mkManifest("workers", "fp1", time.Unix(100, 0), "datasets/workers/fp1.parquet"), + }} + r := NewRegistrar(exec, cat, "lakehouse-go-primary") + + stats, err := r.Refresh(context.Background()) + if err != nil { + t.Fatal(err) + } + if stats.Created != 1 || stats.Updated != 0 || stats.Dropped != 0 { + t.Errorf("stats: got %+v, want {Created:1}", stats) + } + if len(exec.calls) != 1 { + t.Fatalf("calls: got %d, want 1", len(exec.calls)) + } + got := exec.calls[0] + wantHas := []string{ + `CREATE OR REPLACE VIEW "workers"`, + `read_parquet('s3://lakehouse-go-primary/datasets/workers/fp1.parquet')`, + } + for _, w := range wantHas { + if !strings.Contains(got, w) { + t.Errorf("missing %q in %q", w, got) + } + } +} + +func TestRefresh_SkipsUnchangedUpdatedAt(t *testing.T) { + exec := &fakeExecer{} + t1 := time.Unix(100, 0) + cat := &fakeCatalog{manifests: []*catalogd.Manifest{ + mkManifest("workers", "fp1", t1, "k1"), + }} + r := NewRegistrar(exec, cat, "b") + + if _, err := r.Refresh(context.Background()); err != nil { + t.Fatal(err) + } + // Second refresh with same updated_at — should skip. + stats, err := r.Refresh(context.Background()) + if err != nil { + t.Fatal(err) + } + if stats.Skipped != 1 || stats.Created != 0 || stats.Updated != 0 { + t.Errorf("second refresh stats: got %+v, want {Skipped:1}", stats) + } + // Only the first refresh should have produced an Exec call. + if len(exec.calls) != 1 { + t.Errorf("calls: got %d, want 1 (skip should not Exec)", len(exec.calls)) + } +} + +func TestRefresh_RebuildsOnUpdatedAtBump(t *testing.T) { + exec := &fakeExecer{} + t1 := time.Unix(100, 0) + cat := &fakeCatalog{manifests: []*catalogd.Manifest{ + mkManifest("workers", "fp1", t1, "k1"), + }} + r := NewRegistrar(exec, cat, "b") + _, _ = r.Refresh(context.Background()) + + // Bump updated_at — same name + fp, but updated_at changed (idempotent re-register on ingestd). + t2 := time.Unix(200, 0) + cat.manifests[0].UpdatedAt = t2 + stats, err := r.Refresh(context.Background()) + if err != nil { + t.Fatal(err) + } + if stats.Updated != 1 { + t.Errorf("expected Updated:1 on updated_at bump, got %+v", stats) + } + if len(exec.calls) != 2 { + t.Errorf("calls: got %d, want 2 (initial + rebuild)", len(exec.calls)) + } +} + +func TestRefresh_DropsViewForRemovedManifest(t *testing.T) { + exec := &fakeExecer{} + t1 := time.Unix(100, 0) + cat := &fakeCatalog{manifests: []*catalogd.Manifest{ + mkManifest("workers", "fp1", t1, "k1"), + mkManifest("retired", "fp2", t1, "k2"), + }} + r := NewRegistrar(exec, cat, "b") + _, _ = r.Refresh(context.Background()) + + // "retired" disappears from the catalog → registrar should DROP its view. + cat.manifests = cat.manifests[:1] + stats, err := r.Refresh(context.Background()) + if err != nil { + t.Fatal(err) + } + if stats.Dropped != 1 { + t.Errorf("expected Dropped:1, got %+v", stats) + } + // Find the DROP VIEW call. 
+ var found bool + for _, c := range exec.calls { + if strings.HasPrefix(c, `DROP VIEW IF EXISTS "retired"`) { + found = true + } + } + if !found { + t.Errorf("DROP VIEW not emitted; calls=%v", exec.calls) + } +} + +func TestRefresh_QuotesAndEscapesNames(t *testing.T) { + exec := &fakeExecer{} + weird := `my "weird" dataset` // contains internal double quotes + cat := &fakeCatalog{manifests: []*catalogd.Manifest{ + mkManifest(weird, "fp", time.Unix(1, 0), "k"), + }} + r := NewRegistrar(exec, cat, "b") + + if _, err := r.Refresh(context.Background()); err != nil { + t.Fatal(err) + } + got := exec.calls[0] + wantQuoted := `"my ""weird"" dataset"` + if !strings.Contains(got, wantQuoted) { + t.Errorf("identifier not properly quoted: got %q, want substring %q", got, wantQuoted) + } +} + +func TestRefresh_PropagatesCatalogError(t *testing.T) { + exec := &fakeExecer{} + cat := &fakeCatalog{err: errors.New("catalog down")} + r := NewRegistrar(exec, cat, "b") + _, err := r.Refresh(context.Background()) + if err == nil || !strings.Contains(err.Error(), "catalog down") { + t.Errorf("expected wrapped catalog error, got %v", err) + } +} + +func TestRefresh_ManifestWithNoObjectsErrors(t *testing.T) { + exec := &fakeExecer{} + m := mkManifest("empty", "fp", time.Unix(1, 0), "k") + m.Objects = nil // pathological — registrar shouldn't synthesize an empty SQL. + cat := &fakeCatalog{manifests: []*catalogd.Manifest{m}} + r := NewRegistrar(exec, cat, "b") + _, err := r.Refresh(context.Background()) + if err == nil || !strings.Contains(err.Error(), "no objects") { + t.Errorf("expected 'no objects' error, got %v", err) + } +} diff --git a/internal/shared/config.go b/internal/shared/config.go index c5bd718..a9455b2 100644 --- a/internal/shared/config.go +++ b/internal/shared/config.go @@ -23,7 +23,7 @@ type Config struct { Storaged ServiceConfig `toml:"storaged"` Catalogd CatalogConfig `toml:"catalogd"` Ingestd IngestConfig `toml:"ingestd"` - Queryd ServiceConfig `toml:"queryd"` + Queryd QuerydConfig `toml:"queryd"` S3 S3Config `toml:"s3"` Log LogConfig `toml:"log"` } @@ -37,6 +37,18 @@ type IngestConfig struct { CatalogdURL string `toml:"catalogd_url"` } +// QuerydConfig adds queryd-specific knobs. queryd talks DuckDB +// directly to MinIO via DuckDB's httpfs extension (so no storaged +// URL needed), and reads the catalog over HTTP for view registration. +// SecretsPath defaults to /etc/lakehouse/secrets-go.toml — the same +// file storaged uses, since both services need the S3 credentials. +type QuerydConfig struct { + Bind string `toml:"bind"` + CatalogdURL string `toml:"catalogd_url"` + SecretsPath string `toml:"secrets_path"` + RefreshEvery string `toml:"refresh_every"` // duration string, e.g. "30s" +} + // CatalogConfig adds catalogd-specific knobs on top of the standard // bind. StoragedURL points at the storaged service for manifest // persistence; G0 defaults to the localhost bind. 
@@ -80,7 +92,12 @@ func DefaultConfig() Config { StoragedURL: "http://127.0.0.1:3211", CatalogdURL: "http://127.0.0.1:3212", }, - Queryd: ServiceConfig{Bind: "127.0.0.1:3214"}, + Queryd: QuerydConfig{ + Bind: "127.0.0.1:3214", + CatalogdURL: "http://127.0.0.1:3212", + SecretsPath: "/etc/lakehouse/secrets-go.toml", + RefreshEvery: "30s", + }, S3: S3Config{ Endpoint: "http://localhost:9000", Region: "us-east-1", diff --git a/lakehouse.toml b/lakehouse.toml index acab741..a8ae542 100644 --- a/lakehouse.toml +++ b/lakehouse.toml @@ -21,6 +21,9 @@ catalogd_url = "http://127.0.0.1:3212" [queryd] bind = "127.0.0.1:3214" +catalogd_url = "http://127.0.0.1:3212" +secrets_path = "/etc/lakehouse/secrets-go.toml" +refresh_every = "30s" [s3] endpoint = "http://localhost:9000" diff --git a/scripts/d5_smoke.sh b/scripts/d5_smoke.sh new file mode 100755 index 0000000..7dddff5 --- /dev/null +++ b/scripts/d5_smoke.sh @@ -0,0 +1,182 @@ +#!/usr/bin/env bash +# D5 smoke — proves the Day 5 acceptance gate end-to-end. +# +# Validates: +# - Ingest CSV via D4 → catalog manifest registered +# - queryd starts AFTER ingest; initial Refresh picks up the dataset +# - POST /sql with SELECT count(*) FROM → matches CSV row count +# - POST /sql with SELECT * FROM LIMIT 3 → rows have expected +# column names + types + values +# - Re-ingest same CSV → updated_at bumps → next Refresh re-creates +# the view (we restart queryd to verify the fresh-side path works) +# - Schema-drift CSV ingest → catalogd 409s → queryd's view stays +# pointing at the original (no DuckDB session corruption) +# +# Requires storaged + catalogd + ingestd + queryd binaries. MinIO at +# :9000 with bucket lakehouse-go-primary already created (D2/D4 setup). +# +# Usage: ./scripts/d5_smoke.sh + +set -euo pipefail +cd "$(dirname "$0")/.." + +export PATH="$PATH:/usr/local/go/bin" + +echo "[d5-smoke] building all 4 backing services..." +go build -o bin/ ./cmd/storaged ./cmd/catalogd ./cmd/ingestd ./cmd/queryd + +# Cleanup any prior processes. +pkill -f "bin/storaged" 2>/dev/null || true +pkill -f "bin/catalogd" 2>/dev/null || true +pkill -f "bin/ingestd" 2>/dev/null || true +pkill -f "bin/queryd" 2>/dev/null || true +sleep 0.3 + +STORAGED_PID=""; CATALOGD_PID=""; INGESTD_PID=""; QUERYD_PID="" +TMP="$(mktemp -d)" +cleanup() { + echo "[d5-smoke] cleanup" + for p in $QUERYD_PID $INGESTD_PID $CATALOGD_PID $STORAGED_PID; do + [ -n "$p" ] && kill "$p" 2>/dev/null || true + done + rm -rf "$TMP" +} +trap cleanup EXIT INT TERM + +poll_health() { + local port="$1" deadline=$(($(date +%s) + 5)) + while [ "$(date +%s)" -lt "$deadline" ]; do + if curl -sS --max-time 1 "http://127.0.0.1:$port/health" >/dev/null 2>&1; then return 0; fi + sleep 0.05 + done + return 1 +} + +echo "[d5-smoke] launching storaged → catalogd → ingestd..." +./bin/storaged > /tmp/storaged.log 2>&1 & +STORAGED_PID=$! +poll_health 3211 || { echo "storaged failed"; tail -10 /tmp/storaged.log; exit 1; } + +# Clean any prior catalog manifests + datasets so smoke starts fresh. +for k in $(curl -sS "http://127.0.0.1:3211/storage/list?prefix=_catalog/manifests/" | jq -r '.objects[]?.Key // empty' 2>/dev/null); do + curl -sS -o /dev/null -X DELETE "http://127.0.0.1:3211/storage/delete/$k" || true +done +for k in $(curl -sS "http://127.0.0.1:3211/storage/list?prefix=datasets/d5_workers/" | jq -r '.objects[]?.Key // empty' 2>/dev/null); do + curl -sS -o /dev/null -X DELETE "http://127.0.0.1:3211/storage/delete/$k" || true +done + +./bin/catalogd > /tmp/catalogd.log 2>&1 & +CATALOGD_PID=$! 
+poll_health 3212 || { echo "catalogd failed"; tail -10 /tmp/catalogd.log; exit 1; } + +./bin/ingestd > /tmp/ingestd.log 2>&1 & +INGESTD_PID=$! +poll_health 3213 || { echo "ingestd failed"; tail -10 /tmp/ingestd.log; exit 1; } + +# Build a small CSV (5 rows × 5 cols) — same shape as D4 smoke so the +# inferred types are familiar. +NAME="d5_workers" +cat > "$TMP/workers.csv" <<'EOF' +id,name,salary,active,weight +1,Alice,50000,true,165.5 +2,Bob,60000,false,180.0 +3,Carol,55000,true,135.2 +4,Dave,75000,false,200.0 +5,Eve,80000,true,150.5 +EOF + +echo "[d5-smoke] ingest 5-row CSV via D4 path:" +INGEST="$(curl -sS -X POST -F "file=@$TMP/workers.csv" "http://127.0.0.1:3213/ingest?name=$NAME")" +RC="$(echo "$INGEST" | jq -r '.row_count')" +if [ "$RC" = "5" ]; then + echo " ✓ ingest row_count=5" +else + echo " ✗ ingest → $INGEST"; exit 1 +fi + +# Launch queryd LAST — its initial Refresh picks up the dataset. +echo "[d5-smoke] launching queryd (initial Refresh picks up $NAME)..." +./bin/queryd > /tmp/queryd.log 2>&1 & +QUERYD_PID=$! +poll_health 3214 || { echo "queryd failed"; tail -20 /tmp/queryd.log; exit 1; } + +FAILED=0 + +echo "[d5-smoke] POST /sql SELECT count(*) FROM $NAME:" +COUNT_RESP="$(curl -sS -X POST http://127.0.0.1:3214/sql \ + -H 'Content-Type: application/json' \ + -d "{\"sql\":\"SELECT count(*) AS n FROM \\\"$NAME\\\"\"}")" +N="$(echo "$COUNT_RESP" | jq -r '.rows[0][0]')" +if [ "$N" = "5" ]; then + echo " ✓ count(*)=5" +else + echo " ✗ count → $COUNT_RESP" + echo " queryd log tail:"; tail -20 /tmp/queryd.log + FAILED=1 +fi + +echo "[d5-smoke] POST /sql SELECT * FROM $NAME LIMIT 3:" +ROWS_RESP="$(curl -sS -X POST http://127.0.0.1:3214/sql \ + -H 'Content-Type: application/json' \ + -d "{\"sql\":\"SELECT id, name, salary FROM \\\"$NAME\\\" ORDER BY id LIMIT 3\"}")" +ROW_COUNT="$(echo "$ROWS_RESP" | jq -r '.row_count')" +COL0_NAME="$(echo "$ROWS_RESP" | jq -r '.columns[0].name')" +COL1_NAME="$(echo "$ROWS_RESP" | jq -r '.columns[1].name')" +ROW0_ID="$(echo "$ROWS_RESP" | jq -r '.rows[0][0]')" +ROW0_NAME="$(echo "$ROWS_RESP" | jq -r '.rows[0][1]')" +if [ "$ROW_COUNT" = "3" ] && [ "$COL0_NAME" = "id" ] && [ "$COL1_NAME" = "name" ] \ + && [ "$ROW0_ID" = "1" ] && [ "$ROW0_NAME" = "Alice" ]; then + echo " ✓ rows[0] = (id=1, name=Alice), columns=[id, name, salary]" +else + echo " ✗ rows → $ROWS_RESP" + FAILED=1 +fi + +echo "[d5-smoke] schema-drift ingest 409s; existing view still queries:" +cat > "$TMP/workers_drift.csv" <<'EOF' +user_id,name,salary,active,weight +99,Mallory,99999,true,99.9 +EOF +HTTP="$(curl -sS -o "$TMP/conflict.out" -w '%{http_code}' -X POST \ + -F "file=@$TMP/workers_drift.csv" "http://127.0.0.1:3213/ingest?name=$NAME")" +if [ "$HTTP" = "409" ]; then + echo " ✓ drift → 409" +else + echo " ✗ drift → $HTTP"; FAILED=1 +fi +# Verify the original view is still queryable. 
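+# The 409 means nothing was registered, so the manifest (and its updated_at)
+# is untouched; queryd's existing view keeps serving the original parquet.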
+COUNT_RESP="$(curl -sS -X POST http://127.0.0.1:3214/sql \ + -H 'Content-Type: application/json' \ + -d "{\"sql\":\"SELECT count(*) FROM \\\"$NAME\\\"\"}")" +N="$(echo "$COUNT_RESP" | jq -r '.rows[0][0]')" +if [ "$N" = "5" ]; then + echo " ✓ post-drift count(*)=5 (view unchanged)" +else + echo " ✗ post-drift count → $COUNT_RESP" + FAILED=1 +fi + +echo "[d5-smoke] error path: SELECT FROM nonexistent → 400:" +HTTP="$(curl -sS -o "$TMP/err.out" -w '%{http_code}' -X POST http://127.0.0.1:3214/sql \ + -H 'Content-Type: application/json' \ + -d '{"sql":"SELECT * FROM no_such_table"}')" +if [ "$HTTP" = "400" ]; then + echo " ✓ unknown table → 400" +else + echo " ✗ unknown table → $HTTP body=$(cat "$TMP/err.out")" + FAILED=1 +fi + +# Cleanup smoke artifacts. +for k in $(curl -sS "http://127.0.0.1:3211/storage/list?prefix=datasets/$NAME/" | jq -r '.objects[]?.Key // empty' 2>/dev/null); do + curl -sS -o /dev/null -X DELETE "http://127.0.0.1:3211/storage/delete/$k" || true +done +curl -sS -o /dev/null -X DELETE "http://127.0.0.1:3211/storage/delete/_catalog/manifests/$NAME.parquet" || true + +if [ "$FAILED" -eq 0 ]; then + echo "[d5-smoke] D5 acceptance gate: PASSED" + exit 0 +else + echo "[d5-smoke] D5 acceptance gate: FAILED" + exit 1 +fi