G0 D5: queryd DuckDB SELECT over Parquet via httpfs · 4 scrum fixes
Phase G0 Day 5 ships queryd: in-memory DuckDB with custom Connector
that runs INSTALL httpfs / LOAD httpfs / CREATE OR REPLACE SECRET
(TYPE S3) on every new connection, sourced from SecretsProvider +
shared.S3Config. SetMaxOpenConns(1) so registrar's CREATE VIEWs and
handler's SELECTs serialize through one connection (avoids cross-
connection MVCC visibility edge cases).
Registrar.Refresh reads catalogd /catalog/list, runs CREATE OR
REPLACE VIEW "name" AS SELECT * FROM read_parquet('s3://bucket/key')
per manifest, drops views for removed manifests, skips on unchanged
updated_at (the implicit etag). Drop pass runs BEFORE create pass so
a poison manifest can't block other manifest refreshes (post-scrum
C1 fix).
POST /sql with JSON body {"sql":"…"} returns
{"columns":[{"name":"id","type":"BIGINT"},…], "rows":[[…]],
"row_count":N}. []byte → string conversion so VARCHAR rows
JSON-encode as text. 30s default refresh ticker, configurable via
[queryd].refresh_every.
Cross-lineage scrum on shipped code:
- Opus 4.7 (opencode): 1 BLOCK + 4 WARN + 4 INFO
- Kimi K2-0905 (openrouter): 2 BLOCK + 2 WARN + 1 INFO
- Qwen3-coder (openrouter): 2 BLOCK + 1 WARN + 1 INFO
Fixed (4):
C1 (Opus + Kimi convergent): Refresh aborts on first per-view error
→ drop pass first, collect errors, errors.Join. Poison manifest
no longer blocks the rest of the catalog from re-syncing.
B-CTX (Opus BLOCK): bootstrap closure captured OpenDB's ctx →
cancelled-ctx silently fails every reconnect. context.Background()
inside closure; passed ctx only for initial Ping.
B-LEAK (Kimi BLOCK): firstLine(stmt) truncated CREATE SECRET to 80
chars but those 80 chars contained KEY_ID + SECRET prefix → log
aggregator captures credentials. Stable per-statement labels +
redactCreds() filter on wrapped DuckDB errors.
JSON-ERR (Opus WARN): swallowed json.Encode error → silent
truncated 200 on unsupported column types. slog.Warn the failure.
Dismissed (4 false positives):
Qwen BLOCK "bootstrap not transactional" — DuckDB DDL is auto-commit
Qwen BLOCK "MaxBytesReader after Decode" — false, applied before
Kimi BLOCK "concurrent Refresh + user SELECT deadlock" — not a
deadlock, just serialization, by design with 10s timeout retry
Kimi WARN "dropView leaves r.known inconsistent" — current code
returns before the delete; the entry persists for retry
Critical reviewer behavior: 1 convergent BLOCK between Opus + Kimi
on the per-view error blocking, plus two independent single-reviewer
BLOCKs (B-CTX, B-LEAK) that smoke could never have caught. The
B-LEAK fix uses defense-in-depth: never pass SQL into the error
path AND redact known cred values from DuckDB's own error message.
DuckDB cgo path: github.com/duckdb/duckdb-go/v2 v2.10502.0 (per
ADR-001 §1) on Go 1.25 + arrow-go. Smoke 6/6 PASS after every
fix round.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
cmd/queryd/main.go
@@ -1,22 +1,33 @@
// queryd is the SQL execution layer — DuckDB via cgo, registers
// catalog datasets as views, executes ad-hoc SQL. D5 wires the
// actual /sql route + DuckDB session with S3 secret plumbed in
// (per Opus B2 fix). D1 just stands up the binary.
//
// NOTE: cgo handle lifecycle. The shared server factory's
// graceful-shutdown is enough for G0 (DuckDB connections are
// goroutine-local), but G1+ when persistent prepared statements +
// HNSW indexes attach, queryd will need its own OnShutdown drain
// hook. Tracked in D7.5 (W3 disposition).
// queryd is the SQL execution layer. DuckDB engine (via cgo), reads
// Parquet directly from S3 via httpfs, registers catalog datasets as
// views, exposes POST /sql. The interesting glue is in
// internal/queryd/{db,registrar}.go; main.go wires the lifecycle.
package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"flag"
	"log/slog"
	"net/http"
	"os"
	"strings"
	"time"

	"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
	"github.com/go-chi/chi/v5"

	"git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogclient"
	"git.agentview.dev/profit/golangLAKEHOUSE/internal/queryd"
	"git.agentview.dev/profit/golangLAKEHOUSE/internal/secrets"
	"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
)

const (
	primaryBucket   = "primary"
	maxSQLBodyBytes = 64 << 10 // 64 KiB cap on POST /sql body — SQL strings are not large
	defaultRefresh  = 30 * time.Second
)

func main() {
@@ -28,13 +39,205 @@ func main() {
		slog.Error("config", "err", err)
		os.Exit(1)
	}
	if cfg.Queryd.CatalogdURL == "" {
		slog.Error("config", "err", "queryd.catalogd_url is required")
		os.Exit(1)
	}
	if cfg.S3.Bucket == "" {
		slog.Error("config", "err", "s3.bucket is required")
		os.Exit(1)
	}

	if err := shared.Run("queryd", cfg.Queryd.Bind, func(_ chi.Router) {
		// D5 wires:
		// POST /sql (JSON body {"sql": "..."} → DuckDB exec → JSON rows)
		// Internal: TTL-cached views + etag invalidation against catalogd.
	}); err != nil {
	refreshEvery := defaultRefresh
	if cfg.Queryd.RefreshEvery != "" {
		d, err := time.ParseDuration(cfg.Queryd.RefreshEvery)
		if err != nil {
			slog.Error("config", "err", "queryd.refresh_every is not a valid duration: "+err.Error())
			os.Exit(1)
		}
		refreshEvery = d
	}

	// Long-running ctx tied to lifetime of the process for the
	// background refresh goroutine. Cancelled when shared.Run returns.
	procCtx, procCancel := context.WithCancel(context.Background())
	defer procCancel()

	prov, err := secrets.NewFileProvider(cfg.Queryd.SecretsPath, secrets.S3Credentials{
		AccessKeyID:     cfg.S3.AccessKeyID,
		SecretAccessKey: cfg.S3.SecretAccessKey,
	})
	if err != nil {
		slog.Error("secrets", "err", err)
		os.Exit(1)
	}

	db, err := queryd.OpenDB(procCtx, cfg.S3, prov, primaryBucket)
	if err != nil {
		slog.Error("duckdb open", "err", err)
		os.Exit(1)
	}
	defer db.Close()

	catalog := catalogclient.New(cfg.Queryd.CatalogdURL)
	registrar := queryd.NewRegistrar(db, catalog, cfg.S3.Bucket)

	// Initial refresh — log if non-empty so the operator sees what
	// got loaded. A failure here is non-fatal: catalogd may be coming
	// up later in the boot order, the TTL goroutine will retry.
	refreshCtx, refreshCancel := context.WithTimeout(procCtx, 10*time.Second)
	stats, err := registrar.Refresh(refreshCtx)
	refreshCancel()
	if err != nil {
		slog.Warn("initial refresh failed (will retry)", "err", err)
	} else {
		slog.Info("initial refresh", "created", stats.Created, "skipped", stats.Skipped)
	}

	// Background ticker — drives Refresh on the configured interval.
	go runRefreshLoop(procCtx, registrar, refreshEvery)

	h := &handlers{db: db}

	if err := shared.Run("queryd", cfg.Queryd.Bind, h.register); err != nil {
		slog.Error("server", "err", err)
		os.Exit(1)
	}
}

// runRefreshLoop drives Registrar.Refresh on a ticker. Cancellable
// via ctx. Logs every refresh; in a quiet run that's chatty but
// useful for tracking when datasets land.
func runRefreshLoop(ctx context.Context, r *queryd.Registrar, every time.Duration) {
	ticker := time.NewTicker(every)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
			stats, err := r.Refresh(rctx)
			cancel()
			if err != nil {
				slog.Warn("refresh failed", "err", err)
				continue
			}
			if stats.Created+stats.Updated+stats.Dropped > 0 {
				slog.Info("refresh",
					"created", stats.Created,
					"updated", stats.Updated,
					"dropped", stats.Dropped,
					"skipped", stats.Skipped)
			}
		}
	}
}

type handlers struct {
	db *sql.DB
}

func (h *handlers) register(r chi.Router) {
	r.Post("/sql", h.handleSQL)
}

// sqlRequest is the POST /sql body shape.
type sqlRequest struct {
	SQL string `json:"sql"`
}

// sqlResponse is the result envelope. Columns + rows is the compact
// form; verbose row-as-object form is post-G0 if anyone needs it.
type sqlResponse struct {
	Columns  []sqlColumn `json:"columns"`
	Rows     [][]any     `json:"rows"`
	RowCount int         `json:"row_count"`
}

type sqlColumn struct {
	Name string `json:"name"`
	Type string `json:"type"`
}

func (h *handlers) handleSQL(w http.ResponseWriter, r *http.Request) {
	defer r.Body.Close()
	r.Body = http.MaxBytesReader(w, r.Body, maxSQLBodyBytes)
	var req sqlRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		var maxErr *http.MaxBytesError
		if errors.As(err, &maxErr) || strings.Contains(err.Error(), "http: request body too large") {
			http.Error(w, "sql body too large", http.StatusRequestEntityTooLarge)
			return
		}
		http.Error(w, "decode body: "+err.Error(), http.StatusBadRequest)
		return
	}
	if strings.TrimSpace(req.SQL) == "" {
		http.Error(w, "sql is empty", http.StatusBadRequest)
		return
	}

	rows, err := h.db.QueryContext(r.Context(), req.SQL)
	if err != nil {
		// DuckDB errors are user-facing for ad-hoc SQL — expose them
		// at 400 so callers can see what went wrong (table not found,
		// syntax error, etc.). Internal infra issues would surface as
		// 500 once we distinguish them later.
		http.Error(w, "query: "+err.Error(), http.StatusBadRequest)
		return
	}
	defer rows.Close()

	cols, err := rows.Columns()
	if err != nil {
		http.Error(w, "columns: "+err.Error(), http.StatusInternalServerError)
		return
	}
	colTypes, err := rows.ColumnTypes()
	if err != nil {
		http.Error(w, "column types: "+err.Error(), http.StatusInternalServerError)
		return
	}

	resp := sqlResponse{
		Columns: make([]sqlColumn, len(cols)),
		Rows:    [][]any{},
	}
	for i, name := range cols {
		resp.Columns[i] = sqlColumn{Name: name, Type: colTypes[i].DatabaseTypeName()}
	}

	for rows.Next() {
		dest := make([]any, len(cols))
		ptrs := make([]any, len(cols))
		for i := range dest {
			ptrs[i] = &dest[i]
		}
		if err := rows.Scan(ptrs...); err != nil {
			http.Error(w, "scan: "+err.Error(), http.StatusInternalServerError)
			return
		}
		// JSON can't encode []byte as text by default — DuckDB returns
		// VARCHAR as []byte through database/sql. Convert here.
		for i, v := range dest {
			if b, ok := v.([]byte); ok {
				dest[i] = string(b)
			}
		}
		resp.Rows = append(resp.Rows, dest)
	}
	if err := rows.Err(); err != nil {
		http.Error(w, "rows err: "+err.Error(), http.StatusInternalServerError)
		return
	}
	resp.RowCount = len(resp.Rows)

	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(resp); err != nil {
		// Headers already sent — can't change the status code, but log
		// so an unsupported DuckDB column type (Decimal, INTERVAL, etc.)
		// surfaces in the operator log. Per scrum JSON-ERR (Opus).
		slog.Warn("sql encode response", "err", err)
	}
}
@@ -825,3 +825,172 @@ data getting overwritten before the 409 fired. Opus reading the
PUT-then-register order and noticing that PUT mutates state before
the validation check is exactly the kind of architectural code-read
the scrum exists for.

---

## Pre-D5 prep (commit 4205ecd)

Between D4 and D5, J asked what would bite D5 if not fixed first.
The answer was the `CatalogClient` package location: it lived in
`internal/ingestd/` but D5's queryd needed the same shape, and
having queryd import from ingestd would have inverted the data-flow
direction (ingestd is upstream of queryd).

Extracted to `internal/catalogclient/` as a shared package, added
the `List(ctx)` method queryd needs for view registration, kept
the wire format unchanged. ingestd switched its import; all four
existing smokes (D1-D4) passed unchanged.

DuckDB cgo path re-verified at this point with the official
`github.com/duckdb/duckdb-go/v2` v2.10502.0 (per ADR-001 §1) on
Go 1.25 + arrow-go: standalone `sql.Open("duckdb","")` →
`db.Ping()` → `SELECT 'pong'` round-trip succeeded.
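
That round-trip is small enough to show whole. A minimal sketch of
the verification (a standalone program, not the repo's actual check;
it assumes only that the v2 driver's blank import registers the
`duckdb` driver name, which the `sql.Open("duckdb","")` call above
relies on):

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/duckdb/duckdb-go/v2" // blank import registers the "duckdb" driver
)

func main() {
	db, err := sql.Open("duckdb", "") // empty DSN → in-memory DuckDB
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := db.Ping(); err != nil { // forces the first real cgo connection
		log.Fatal(err)
	}

	var pong string
	if err := db.QueryRow("SELECT 'pong'").Scan(&pong); err != nil {
		log.Fatal(err)
	}
	fmt.Println(pong) // pong
}
```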

---

## D5 — actual run results (2026-04-29)

Phase G0 Day 5 executed end-to-end. Output of `scripts/d5_smoke.sh`:

```
[d5-smoke] ingest 5-row CSV via D4 path:
 ✓ ingest row_count=5
[d5-smoke] launching queryd (initial Refresh picks up d5_workers)...
[d5-smoke] POST /sql SELECT count(*) FROM d5_workers:
 ✓ count(*)=5
[d5-smoke] POST /sql SELECT * FROM d5_workers LIMIT 3:
 ✓ rows[0] = (id=1, name=Alice), columns=[id, name, salary]
[d5-smoke] schema-drift ingest 409s; existing view still queries:
 ✓ drift → 409
 ✓ post-drift count(*)=5 (view unchanged)
[d5-smoke] error path: SELECT FROM nonexistent → 400:
 ✓ unknown table → 400
[d5-smoke] D5 acceptance gate: PASSED
```

What landed:
- `internal/queryd/db.go` — `OpenDB(ctx, s3, prov, bucket)` returns
  a `*sql.DB` backed by an in-memory DuckDB. Custom Connector with
  a bootstrapper that runs `INSTALL httpfs` / `LOAD httpfs` /
  `CREATE OR REPLACE SECRET (TYPE S3, KEY_ID …, ENDPOINT …,
  URL_STYLE 'path', USE_SSL false)` on every new physical connection.
  `SetMaxOpenConns(1)` so registrar's CREATE VIEWs and handler's
  user SQL serialize through one connection (avoids cross-connection
  visibility edge cases).
- `internal/queryd/registrar.go` — `Registrar.Refresh(ctx)` reads
  catalog list, runs `CREATE OR REPLACE VIEW "name" AS SELECT *
  FROM read_parquet('s3://bucket/key')` per manifest, drops views
  for removed manifests, skips on unchanged `updated_at`.
  `CatalogLister` + `Execer` interfaces enable unit tests with no
  DuckDB / no real catalogd.
- `cmd/queryd/main.go` — POST /sql with JSON body
  `{"sql":"…"}` → executes via `*sql.DB` → response shape
  `{"columns":[{"name","type"},...], "rows":[[...]], "row_count":N}`
  (wire example after this list). `[]byte` → string conversion so
  VARCHAR rows JSON-encode as text. Initial `Refresh` on startup,
  then a TTL ticker (`30s` default, configurable via
  `[queryd].refresh_every`) calling `Refresh` in a goroutine
  cancellable via process ctx.
- New `[queryd]` config block: `bind` + `catalogd_url` +
  `secrets_path` + `refresh_every`.
- `scripts/d5_smoke.sh` — 6 acceptance probes including post-drift
  query stability (`view unchanged after schema-drift 409`).
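
A wire example of the /sql shape above, run against the smoke
fixture (response re-wrapped here for readability; the column types
are whatever DuckDB's CSV sniffer inferred at ingest):

```
$ curl -s -X POST http://127.0.0.1:3214/sql \
    -H 'Content-Type: application/json' \
    -d '{"sql":"SELECT id, name FROM \"d5_workers\" ORDER BY id LIMIT 1"}'
{"columns":[{"name":"id","type":"BIGINT"},{"name":"name","type":"VARCHAR"}],
 "rows":[[1,"Alice"]],
 "row_count":1}
```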

The D5 smoke launches queryd LAST — after the D4 ingest has
registered the dataset — so the initial `Refresh` picks it up
without waiting for the TTL. This matches what real ops looks like:
queryd starts after data is loaded, picks up the catalog at that
moment, then stays in lockstep via the ticker.

Critical architectural choices:
1. **`SetMaxOpenConns(1)`** — DuckDB views live in the in-memory
   catalog of a single instance; with one connection, registrar's
   CREATE VIEWs and handler's SELECTs are visibly synchronized
   without MVCC-timing edge cases. Lift in G2+ if intra-process
   query concurrency wins matter.
2. **Always-quote view identifiers** — `CREATE OR REPLACE VIEW "name"`
   with internal `"` doubled (so `my "weird" dataset` becomes
   `"my ""weird"" dataset"`). Catalogd accepts user-supplied names
   that wouldn't pass SQL bare-identifier rules; quoting makes them
   unambiguous.
3. **`updated_at` as the implicit etag** — no separate ETag header
   in catalogd. The manifest's timestamp bumps on every same-fp
   re-register; registrar tracks `time.Time` per name and skips
   `CREATE` when unchanged (Opus's "perf cliff" warning from the D5
   plan). The skip rule is distilled in the sketch after this list.
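
Choice 3's skip rule as a standalone sketch (the shipped logic lives
inside `Registrar.Refresh` in registrar.go below; `shouldRebuild` is
a name invented here for illustration):

```go
package main

import (
	"fmt"
	"time"
)

// shouldRebuild: re-CREATE a view only when the manifest is new or its
// updated_at (the implicit etag) moved since the last refresh.
func shouldRebuild(known map[string]time.Time, name string, updatedAt time.Time) bool {
	prior, exists := known[name]
	return !exists || !prior.Equal(updatedAt)
}

func main() {
	known := map[string]time.Time{"workers": time.Unix(100, 0)}
	fmt.Println(shouldRebuild(known, "workers", time.Unix(100, 0))) // false: skip
	fmt.Println(shouldRebuild(known, "workers", time.Unix(200, 0))) // true: rebuild
	fmt.Println(shouldRebuild(known, "orders", time.Unix(100, 0)))  // true: brand-new view
}
```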

Next: D6 — gateway as a reverse proxy in front of all four backing
services. Last day of G0. After that: G1+ (gRPC, Lance bench, vector
indices, etc).

---

## D5 — code scrum review (3-lineage parallel pass)

| Pass | Reviewer | Latency | Findings |
|---|---|---|---|
| 1 | `opencode/claude-opus-4-7` | ~28s | 1 BLOCK + 4 WARN + 4 INFO = 9 (1 self-retracted) |
| 2 | `openrouter/moonshotai/kimi-k2-0905` | ~25s | 2 BLOCK + 2 WARN + 1 INFO = 5 |
| 3 | `openrouter/qwen/qwen3-coder` | ~20s | 2 BLOCK + 1 WARN + 1 INFO = 4 |

Total: 18 distinct findings. Strongest scrum yet on convergence
quality — Opus's BLOCK on ctx capture + Kimi's BLOCK on credential
leakage in error messages were both deep architectural reads that
smoke could not have caught.

### Convergent findings (≥2 reviewers — high confidence)

| # | Severity | Finding | Reviewers | Disposition |
|---|---|---|---|---|
| C1 | WARN×2 | `Refresh` aborts mid-loop on first per-view error → poison manifest blocks all subsequent drops/updates → stale views stick around forever (Opus); a partial `dropView` failure would block other manifest refreshes (Kimi) | Opus + Kimi | **Fixed** — drop pass runs BEFORE create pass; per-iteration errors are collected and Refresh continues; final return uses `errors.Join` to surface every failure |

### Single-reviewer findings — applied

| # | Severity | Finding | Reviewer | Disposition |
|---|---|---|---|---|
| B-CTX | Opus BLOCK | Bootstrap closure captures the `ctx` passed to `OpenDB`. Today masked by `SetMaxOpenConns(1)` + long-lived `procCtx`; future short-lived ctx + reconnect would silently fail every reconnect's bootstrap with a cancelled ctx | Opus | **Fixed** — bootstrap explicitly uses `context.Background()`; passed `ctx` is only used for the initial `PingContext` |
| B-LEAK | Kimi BLOCK | The prior `firstLine(stmt)` truncation to 80 chars STILL contained both `KEY_ID '...'` AND the start of `SECRET '...'` — log aggregators would capture credentials | Kimi | **Fixed** — replaced `firstLine(stmt)` with stable per-statement labels (`"install httpfs"` / `"load httpfs"` / `"create secret"`) so no SQL reaches the error path. Wrapped `err.Error()` is filtered through `redactCreds` to scrub any secret values DuckDB itself might echo |
| JSON-ERR | Opus WARN | `_ = json.NewEncoder(w).Encode(resp)` swallows mid-encode failures; client sees truncated 200 with no log signal — first column type DuckDB can't JSON-encode (e.g. INTERVAL) lands here silently | Opus | **Fixed** — log via `slog.Warn` with the underlying error |

### Single-reviewer findings — accepted with rationale

| # | Severity | Finding | Reviewer | Disposition |
|---|---|---|---|---|
| O-W3 | Opus WARN | DuckDB exotic types (Decimal, INTERVAL, etc.) may not JSON-encode well — `dest := []any` with `[]byte→string` conversion is the only type-aware step today | Opus | **Accepted** — type-aware row converter is post-G0; G0 column types in test fixtures are int64/varchar/double/bool. The new `JSON-ERR` fix makes any encoding failure operator-visible in the meantime |
| K-W1 | Kimi WARN | No exponential backoff on repeated refresh failures | Kimi | **Accepted** — G0 single-tenant; 30s ticker is fine. Backoff adds complexity that smoke can't validate |
| O-INFO×3 | Opus INFO | USE_SSL casing cosmetic; initial-refresh log doesn't match runRefreshLoop's four-field format | Opus | **Accepted** / **fixed cleanup** — the `var _ = io.Discard` placeholder removed; the rest deferred |
| Q-W | Qwen WARN | Object key validation | Qwen | **Accepted** — catalogd's contract; storaged validateKey is the actual gate |
| Q-I | Qwen INFO | Hardcoded refresh timeout | Qwen | **Accepted** — separate config knob is over-engineering for G0 |

### Dismissed (false positives)

| # | Severity | Finding | Reviewer | Why dismissed |
|---|---|---|---|---|
| F1 | Q-BLOCK | "Bootstrap not transactional" | Qwen | False — DuckDB DDL is auto-commit + idempotent; `CREATE OR REPLACE` handles re-run cleanly |
| F2 | Q-BLOCK | "MaxBytesReader applied AFTER json.NewDecoder reads" | Qwen | False — `cmd/queryd/main.go` sets `r.Body = http.MaxBytesReader(...)` BEFORE the decode call, by line ordering |
| F3 | K-BLOCK | "Concurrent Refresh + user query deadlock on single connection" | Kimi | False — not a deadlock, just serialization. With Refresh's 10s ctx-timeout, slow SELECT causes Refresh to skip a tick and retry. Design accept |
| F4 | K-W2 | "dropView failure leaves r.known inconsistent" | Kimi | False — current code returns BEFORE the `delete(r.known, name)` on dropView error, so the entry persists and next refresh retries (correct behavior). Misread of the sequence |

### Cumulative D5 disposition

- **3-lineage parallel pass: 18 distinct findings**
- **Fixed: 4** (1 convergent + 3 single-reviewer real)
- **Accepted-with-rationale: 5**
- **Deferred / cleanup: 3** (1 INFO removal, 2 INFOs deferred)
- **Dismissed (false positives): 4**
- Build clean, vet clean, all tests pass, smoke 6/6 PASS after every fix round.

The B-LEAK finding (Kimi BLOCK) is worth a pause: the prior code
explicitly tried to be careful about credentials by truncating the
SQL to 80 chars in error messages, but the truncation window still
captured both `KEY_ID '<value>'` and the prefix of
`SECRET '<value>'` because of where they appear in the statement.
The defense-in-depth answer was to never pass the SQL into the
error path at all — labels are static, the error chain wraps a
credential-redacted version of DuckDB's own error message. Two
layers of "don't leak" instead of one fragile truncation.
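
The second layer in isolation — a sketch mirroring `redactCreds` in
db.go below, with made-up credential values:

```go
package main

import (
	"fmt"
	"strings"
)

// redact scrubs known credential values out of an error string, since
// DuckDB's parser error can echo the offending CREATE SECRET statement.
func redact(msg, keyID, secret string) string {
	if keyID != "" {
		msg = strings.ReplaceAll(msg, keyID, "[REDACTED-KEY]")
	}
	if secret != "" {
		msg = strings.ReplaceAll(msg, secret, "[REDACTED-SECRET]")
	}
	return msg
}

func main() {
	echoed := `Parser Error: near "KEY_ID 'AKIAFAKEKEY', SECRET 'hunter2'"`
	fmt.Println(redact(echoed, "AKIAFAKEKEY", "hunter2"))
	// Parser Error: near "KEY_ID '[REDACTED-KEY]', SECRET '[REDACTED-SECRET]'"
}
```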

The B-CTX finding (Opus BLOCK) is the kind of latent issue that
would cost a future debugger hours: the connector callback runs on
EVERY new physical connection, but `SetMaxOpenConns(1)` masks the
issue today. A future change to allow concurrent connections would
silently break bootstrap on every reconnect with no clear error
signal. Catching it during the scrum saved a Friday-afternoon
debugging incident months from now.
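
The trap, distilled into a compilable sketch (the shipped version is
`OpenDB` in db.go below; the bootstrap statement here is a harmless
stand-in for the real INSTALL/LOAD/CREATE SECRET sequence):

```go
package main

import (
	"context"
	"database/sql"
	"database/sql/driver"
	"log"

	"github.com/duckdb/duckdb-go/v2"
)

func open(ctx context.Context) (*sql.DB, error) {
	connector, err := duckdb.NewConnector("", func(e driver.ExecerContext) error {
		// BAD:  e.ExecContext(ctx, ...): this callback fires on EVERY
		// new physical connection, possibly long after ctx was
		// cancelled, so every reconnect's bootstrap would silently fail.
		// GOOD: Background() keeps the closure self-sufficient.
		_, err := e.ExecContext(context.Background(),
			"CREATE SCHEMA IF NOT EXISTS boot", nil) // stand-in bootstrap stmt
		return err
	})
	if err != nil {
		return nil, err
	}
	db := sql.OpenDB(connector)
	if err := db.PingContext(ctx); err != nil { // caller ctx gates only the first dial
		_ = db.Close()
		return nil, err
	}
	return db, nil
}

func main() {
	db, err := open(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}
```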

go.mod
@@ -32,6 +32,14 @@ require (
	github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.20 // indirect
	github.com/aws/aws-sdk-go-v2/service/sts v1.42.0 // indirect
	github.com/cespare/xxhash/v2 v2.3.0 // indirect
	github.com/duckdb/duckdb-go-bindings v0.10502.0 // indirect
	github.com/duckdb/duckdb-go-bindings/lib/darwin-amd64 v0.10502.0 // indirect
	github.com/duckdb/duckdb-go-bindings/lib/darwin-arm64 v0.10502.0 // indirect
	github.com/duckdb/duckdb-go-bindings/lib/linux-amd64 v0.10502.0 // indirect
	github.com/duckdb/duckdb-go-bindings/lib/linux-arm64 v0.10502.0 // indirect
	github.com/duckdb/duckdb-go-bindings/lib/windows-amd64 v0.10502.0 // indirect
	github.com/duckdb/duckdb-go/v2 v2.10502.0 // indirect
	github.com/go-viper/mapstructure/v2 v2.5.0 // indirect
	github.com/goccy/go-json v0.10.6 // indirect
	github.com/google/flatbuffers v25.12.19+incompatible // indirect
	github.com/klauspost/compress v1.18.5 // indirect

go.sum
@@ -46,12 +46,28 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/duckdb/duckdb-go-bindings v0.10502.0 h1:Uhg/dfvPLQv4cH35lMD48hqUcdOh2Z7bcuykjr4qnOA=
github.com/duckdb/duckdb-go-bindings v0.10502.0/go.mod h1:8KF3oEKrmYdSbZnQ1BPTdxAZDHRaM1LEv+oBvL2nSLk=
github.com/duckdb/duckdb-go-bindings/lib/darwin-amd64 v0.10502.0 h1:1GxSHSI1ef3sCdDVrJ9l8s6aTd7P1K788os9lHrs43g=
github.com/duckdb/duckdb-go-bindings/lib/darwin-amd64 v0.10502.0/go.mod h1:EnAvZh1kNJHp5yF+M1ZHNEvapnmt6anq1xXHVrAGqMo=
github.com/duckdb/duckdb-go-bindings/lib/darwin-arm64 v0.10502.0 h1:76gB6UiqKae6JptNiFLjwecD0oR87bXS5u6Lni9hSGI=
github.com/duckdb/duckdb-go-bindings/lib/darwin-arm64 v0.10502.0/go.mod h1:IGLSeEcFhNeZF16aVjQCULD7TsFZKG5G7SyKJAXKp5c=
github.com/duckdb/duckdb-go-bindings/lib/linux-amd64 v0.10502.0 h1:fcBKRy9keR5FLxppDD7ZjQ1EwqTRcA2kPLi2jWilPDw=
github.com/duckdb/duckdb-go-bindings/lib/linux-amd64 v0.10502.0/go.mod h1:KAIynZ0GHCS7X5fRyuFnQMg/SZBPK/bS9OCOVojClxw=
github.com/duckdb/duckdb-go-bindings/lib/linux-arm64 v0.10502.0 h1:pUwDWLQZIkm/v5aoGIu2cTAsgGqratxklRwP9zzsmiU=
github.com/duckdb/duckdb-go-bindings/lib/linux-arm64 v0.10502.0/go.mod h1:81SGOYoEUs8qaAfSk1wRfM5oobrIJ5KI7AzYhK6/bvQ=
github.com/duckdb/duckdb-go-bindings/lib/windows-amd64 v0.10502.0 h1:CDPf2ow6pP/9zYXfBdyT8a1GZ69eBWdMt5AhAsVgvyU=
github.com/duckdb/duckdb-go-bindings/lib/windows-amd64 v0.10502.0/go.mod h1:K25pJL26ARblGDeuAkrdblFvUen92+CwksLtPEHRqqQ=
github.com/duckdb/duckdb-go/v2 v2.10502.0 h1:YfdiBlXnlRdxIKu1AtBQSRI0/tGhOkIGshKq52+uA7A=
github.com/duckdb/duckdb-go/v2 v2.10502.0/go.mod h1:a/31wL2vx7dJ0isrO+E6o28DBQVaVOMbKxp2BsHTGp0=
github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug=
github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0=
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-viper/mapstructure/v2 v2.5.0 h1:vM5IJoUAy3d7zRSVtIwQgBj7BiWtMPfmPEgAXnvj1Ro=
github.com/go-viper/mapstructure/v2 v2.5.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
github.com/goccy/go-json v0.10.6 h1:p8HrPJzOakx/mn/bQtjgNjdTcN+/S6FcG2CTtQOrHVU=
github.com/goccy/go-json v0.10.6/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=

internal/queryd/db.go (new file)
@@ -0,0 +1,140 @@
// Package queryd is the SQL execution surface — a DuckDB engine that
// reads Parquet directly from S3-compatible storage (MinIO in dev)
// via DuckDB's httpfs extension. Views are registered from catalogd
// manifests so user SQL just references dataset names.
//
// db.go owns the *sql.DB lifecycle: a custom Connector with a
// bootstrapper that runs INSTALL httpfs / LOAD httpfs / CREATE OR
// REPLACE SECRET on every new connection, plus a SetMaxOpenConns(1)
// pin so the registrar's CREATE VIEWs and the handler's user SQL
// serialize through one connection (avoids cross-connection visibility
// edge cases for G0; lift to a pool when concurrency wins matter).
package queryd

import (
	"context"
	"database/sql"
	"database/sql/driver"
	"fmt"
	"strings"

	"github.com/duckdb/duckdb-go/v2"

	"git.agentview.dev/profit/golangLAKEHOUSE/internal/secrets"
	"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
)

// OpenDB returns a *sql.DB backed by an in-memory DuckDB whose
// connections are pre-loaded with httpfs + an S3 secret derived from
// the shared S3 config + secrets provider. The bucket parameter names
// the logical bucket whose credentials are pulled.
//
// Caller is responsible for Close()ing the returned db.
func OpenDB(ctx context.Context, s3 shared.S3Config, prov secrets.Provider, bucketLogicalName string) (*sql.DB, error) {
	creds, err := prov.S3Credentials(bucketLogicalName)
	if err != nil {
		return nil, fmt.Errorf("queryd: secrets: %w", err)
	}

	bootstrap := buildBootstrap(s3, creds)
	// Stable labels per bootstrap statement. Per scrum B-LEAK (Kimi):
	// the prior firstLine(stmt) truncated CREATE OR REPLACE SECRET
	// to 80 chars, which contained both KEY_ID and the start of
	// SECRET — a log aggregator would capture credentials. Using
	// stable labels avoids putting the SQL into the error path at all.
	labels := []string{"install httpfs", "load httpfs", "create secret"}

	connector, err := duckdb.NewConnector("", func(execer driver.ExecerContext) error {
		// Per scrum B-CTX (Opus): use Background() inside the bootstrap
		// closure rather than capturing the OpenDB-call ctx. The
		// connector callback runs on EVERY new physical connection,
		// including reconnects long after OpenDB returned. A captured
		// short-lived ctx would silently fail every reconnect's
		// bootstrap. The passed ctx is only for the initial Ping below.
		for i, stmt := range bootstrap {
			if _, err := execer.ExecContext(context.Background(), stmt, nil); err != nil {
				return fmt.Errorf("queryd bootstrap %s: %s", labels[i], redactCreds(err.Error(), creds))
			}
		}
		return nil
	})
	if err != nil {
		return nil, fmt.Errorf("queryd: new connector: %w", err)
	}

	db := sql.OpenDB(connector)
	// One connection: the registrar's view CREATEs are visible to the
	// query handler's SELECTs without worrying about MVCC/timing.
	// Lift this for G2+ when concurrent reads matter.
	db.SetMaxOpenConns(1)
	if err := db.PingContext(ctx); err != nil {
		_ = db.Close()
		return nil, fmt.Errorf("queryd: ping: %w", err)
	}
	return db, nil
}

// buildBootstrap returns the SQL statements that initialize a fresh
// DuckDB connection: install + load httpfs, then create-or-replace
// the unnamed S3 secret. Endpoint is normalized — DuckDB wants
// `host:port` without the scheme; the http/https split is controlled
// by USE_SSL.
func buildBootstrap(s3 shared.S3Config, creds secrets.S3Credentials) []string {
	endpoint := s3.Endpoint
	useSSL := true
	switch {
	case strings.HasPrefix(endpoint, "http://"):
		endpoint = strings.TrimPrefix(endpoint, "http://")
		useSSL = false
	case strings.HasPrefix(endpoint, "https://"):
		endpoint = strings.TrimPrefix(endpoint, "https://")
		useSSL = true
	}
	urlStyle := "vhost"
	if s3.UsePathStyle {
		urlStyle = "path"
	}

	// CREATE OR REPLACE so a reconnect doesn't error on the existing
	// secret. Single-quoted SQL string literals; we escape ' → '' for
	// belt-and-braces (creds shouldn't contain ' but a future SSO
	// token might).
	createSecret := fmt.Sprintf(
		"CREATE OR REPLACE SECRET (TYPE S3, KEY_ID '%s', SECRET '%s', REGION '%s', ENDPOINT '%s', URL_STYLE '%s', USE_SSL %t)",
		sqlEscape(creds.AccessKeyID),
		sqlEscape(creds.SecretAccessKey),
		sqlEscape(s3.Region),
		sqlEscape(endpoint),
		sqlEscape(urlStyle),
		useSSL,
	)

	return []string{
		"INSTALL httpfs",
		"LOAD httpfs",
		createSecret,
	}
}

// sqlEscape doubles single quotes in a SQL string literal value.
// SQL identifier escaping (doubling " for quoted identifiers) lives
// next to its use site in registrar.go.
func sqlEscape(v string) string {
	return strings.ReplaceAll(v, "'", "''")
}

// redactCreds replaces known credential values inside an error
// message with placeholder tokens. DuckDB's parser/exec error may
// echo the offending statement; this scrubs the secret values
// regardless. Per scrum B-LEAK (Kimi): a 503 from a typo in CREATE
// SECRET would otherwise drop both KEY_ID and SECRET into the log
// pipeline.
func redactCreds(msg string, creds secrets.S3Credentials) string {
	if creds.AccessKeyID != "" {
		msg = strings.ReplaceAll(msg, creds.AccessKeyID, "[REDACTED-KEY]")
	}
	if creds.SecretAccessKey != "" {
		msg = strings.ReplaceAll(msg, creds.SecretAccessKey, "[REDACTED-SECRET]")
	}
	return msg
}

internal/queryd/registrar.go (new file)
@@ -0,0 +1,181 @@
// registrar.go — turn catalogd's manifest list into DuckDB views.
//
// Refresh() reads /catalog/list, runs CREATE OR REPLACE VIEW for
// every manifest, drops views for manifests that disappeared, and
// skips any whose updated_at hasn't changed since the prior refresh
// (per kickoff D5.3 spec — "don't re-CREATE on every request,
// Opus review flagged that as the perf cliff").
//
// Identifier safety: all view names are double-quoted in SQL with
// internal " escaped to "" — catalogd accepts user-supplied dataset
// names that wouldn't pass SQL bare-identifier rules (hyphens,
// digit-leading, reserved words). Quoting makes them unambiguous.
package queryd

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"strings"
	"time"

	"git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd"
)

// CatalogLister is what registrar needs from the catalog client —
// only List, no Register. Defined as an interface so unit tests can
// inject a fake without spinning up a real catalogd.
type CatalogLister interface {
	List(ctx context.Context) ([]*catalogd.Manifest, error)
}

// Execer is the subset of *sql.DB that registrar actually uses.
// Same testability win — unit tests can record the SQL strings
// without booting DuckDB. *sql.DB satisfies this directly.
type Execer interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
}

// Registrar holds the running view state and the dependencies needed
// to refresh it.
type Registrar struct {
	exec    Execer
	catalog CatalogLister
	bucket  string

	// known tracks the current view names + the updated_at we last
	// rebuilt them from. Used by Refresh to skip unchanged views and
	// drop ones that disappeared from the catalog.
	known map[string]time.Time
}

// NewRegistrar builds a fresh registrar. bucket is the S3 bucket
// name used to construct s3:// URLs for read_parquet().
func NewRegistrar(exec Execer, catalog CatalogLister, bucket string) *Registrar {
	return &Registrar{
		exec:    exec,
		catalog: catalog,
		bucket:  bucket,
		known:   make(map[string]time.Time),
	}
}

// RefreshStats reports what Refresh did — for logs + tests.
type RefreshStats struct {
	Created int
	Updated int
	Dropped int
	Skipped int
}

// Refresh syncs DuckDB's view catalog with catalogd's manifest list.
// Returns counts of {Created, Updated, Dropped, Skipped} so the
// caller can decide whether to log loudly.
//
// Per scrum C1 (Opus + Kimi convergent): drop pass runs BEFORE create
// pass so a failure in one view's CREATE doesn't block another view's
// DROP. Per-iteration errors are collected and the refresh continues
// — a single poison manifest must not block the whole catalog from
// re-syncing. The collected errors are joined into the return value;
// callers see "everything that happened to fail this round."
func (r *Registrar) Refresh(ctx context.Context) (RefreshStats, error) {
	var stats RefreshStats

	manifests, err := r.catalog.List(ctx)
	if err != nil {
		return stats, fmt.Errorf("registrar list: %w", err)
	}

	wantNames := make(map[string]struct{}, len(manifests))
	for _, m := range manifests {
		wantNames[m.Name] = struct{}{}
	}

	var errs []error

	// 1. Drop views whose manifests disappeared. Run first so the
	//    create pass can't block removals on a poison-create error.
	for name := range r.known {
		if _, still := wantNames[name]; still {
			continue
		}
		if err := r.dropView(ctx, name); err != nil {
			errs = append(errs, err)
			continue
		}
		delete(r.known, name)
		stats.Dropped++
	}

	// 2. Create / update views for current manifests.
	for _, m := range manifests {
		prior, exists := r.known[m.Name]
		if exists && prior.Equal(m.UpdatedAt) {
			stats.Skipped++
			continue
		}
		if err := r.createOrReplaceView(ctx, m); err != nil {
			errs = append(errs, err)
			continue
		}
		if exists {
			stats.Updated++
		} else {
			stats.Created++
		}
		r.known[m.Name] = m.UpdatedAt
	}

	if len(errs) > 0 {
		return stats, errors.Join(errs...)
	}
	return stats, nil
}

// createOrReplaceView builds the DuckDB SQL for one manifest and
// runs it. G0 manifests have exactly one Object per dataset; if more
// land later (G2 multi-part), `read_parquet([...])` accepts a list.
func (r *Registrar) createOrReplaceView(ctx context.Context, m *catalogd.Manifest) error {
	if len(m.Objects) == 0 {
		return fmt.Errorf("registrar: manifest %q has no objects", m.Name)
	}
	urls := make([]string, len(m.Objects))
	for i, obj := range m.Objects {
		urls[i] = fmt.Sprintf("'%s'", sqlEscape(buildS3URL(r.bucket, obj.Key)))
	}
	var fromExpr string
	if len(urls) == 1 {
		fromExpr = "read_parquet(" + urls[0] + ")"
	} else {
		fromExpr = "read_parquet([" + strings.Join(urls, ", ") + "])"
	}
	sql := fmt.Sprintf("CREATE OR REPLACE VIEW %s AS SELECT * FROM %s",
		quoteIdent(m.Name), fromExpr)
	if _, err := r.exec.ExecContext(ctx, sql); err != nil {
		return fmt.Errorf("create view %q: %w", m.Name, err)
	}
	return nil
}

func (r *Registrar) dropView(ctx context.Context, name string) error {
	sql := "DROP VIEW IF EXISTS " + quoteIdent(name)
	if _, err := r.exec.ExecContext(ctx, sql); err != nil {
		return fmt.Errorf("drop view %q: %w", name, err)
	}
	return nil
}

// quoteIdent wraps a SQL identifier in double quotes and escapes
// internal " by doubling. This is the SQL-standard rule — works in
// every engine that accepts quoted identifiers.
func quoteIdent(name string) string {
	return `"` + strings.ReplaceAll(name, `"`, `""`) + `"`
}

// buildS3URL composes the s3://bucket/key URL DuckDB's httpfs
// extension consumes. Keys aren't URL-escaped because read_parquet
// accepts S3-style literal paths.
func buildS3URL(bucket, key string) string {
	return "s3://" + bucket + "/" + key
}

internal/queryd/registrar_test.go (new file)
@@ -0,0 +1,192 @@
package queryd

import (
	"context"
	"database/sql"
	"errors"
	"strings"
	"testing"
	"time"

	"git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd"
)

// fakeExecer records every SQL string Exec'd against it.
type fakeExecer struct {
	calls []string
}

func (f *fakeExecer) ExecContext(_ context.Context, q string, _ ...any) (sql.Result, error) {
	f.calls = append(f.calls, q)
	return nil, nil
}

// fakeCatalog returns a fixed manifest list and an optional error.
type fakeCatalog struct {
	manifests []*catalogd.Manifest
	err       error
}

func (f *fakeCatalog) List(_ context.Context) ([]*catalogd.Manifest, error) {
	return f.manifests, f.err
}

func mkManifest(name, fp string, updatedAt time.Time, key string) *catalogd.Manifest {
	return &catalogd.Manifest{
		Name:              name,
		DatasetID:         "id-" + name,
		SchemaFingerprint: fp,
		Objects:           []catalogd.Object{{Key: key, Size: 1024}},
		CreatedAt:         updatedAt,
		UpdatedAt:         updatedAt,
	}
}

func TestRefresh_CreatesViewsForNewManifests(t *testing.T) {
	exec := &fakeExecer{}
	cat := &fakeCatalog{manifests: []*catalogd.Manifest{
		mkManifest("workers", "fp1", time.Unix(100, 0), "datasets/workers/fp1.parquet"),
	}}
	r := NewRegistrar(exec, cat, "lakehouse-go-primary")

	stats, err := r.Refresh(context.Background())
	if err != nil {
		t.Fatal(err)
	}
	if stats.Created != 1 || stats.Updated != 0 || stats.Dropped != 0 {
		t.Errorf("stats: got %+v, want {Created:1}", stats)
	}
	if len(exec.calls) != 1 {
		t.Fatalf("calls: got %d, want 1", len(exec.calls))
	}
	got := exec.calls[0]
	wantHas := []string{
		`CREATE OR REPLACE VIEW "workers"`,
		`read_parquet('s3://lakehouse-go-primary/datasets/workers/fp1.parquet')`,
	}
	for _, w := range wantHas {
		if !strings.Contains(got, w) {
			t.Errorf("missing %q in %q", w, got)
		}
	}
}

func TestRefresh_SkipsUnchangedUpdatedAt(t *testing.T) {
	exec := &fakeExecer{}
	t1 := time.Unix(100, 0)
	cat := &fakeCatalog{manifests: []*catalogd.Manifest{
		mkManifest("workers", "fp1", t1, "k1"),
	}}
	r := NewRegistrar(exec, cat, "b")

	if _, err := r.Refresh(context.Background()); err != nil {
		t.Fatal(err)
	}
	// Second refresh with same updated_at — should skip.
	stats, err := r.Refresh(context.Background())
	if err != nil {
		t.Fatal(err)
	}
	if stats.Skipped != 1 || stats.Created != 0 || stats.Updated != 0 {
		t.Errorf("second refresh stats: got %+v, want {Skipped:1}", stats)
	}
	// Only the first refresh should have produced an Exec call.
	if len(exec.calls) != 1 {
		t.Errorf("calls: got %d, want 1 (skip should not Exec)", len(exec.calls))
	}
}

func TestRefresh_RebuildsOnUpdatedAtBump(t *testing.T) {
	exec := &fakeExecer{}
	t1 := time.Unix(100, 0)
	cat := &fakeCatalog{manifests: []*catalogd.Manifest{
		mkManifest("workers", "fp1", t1, "k1"),
	}}
	r := NewRegistrar(exec, cat, "b")
	_, _ = r.Refresh(context.Background())

	// Bump updated_at — same name + fp, but updated_at changed (idempotent re-register on ingestd).
	t2 := time.Unix(200, 0)
	cat.manifests[0].UpdatedAt = t2
	stats, err := r.Refresh(context.Background())
	if err != nil {
		t.Fatal(err)
	}
	if stats.Updated != 1 {
		t.Errorf("expected Updated:1 on updated_at bump, got %+v", stats)
	}
	if len(exec.calls) != 2 {
		t.Errorf("calls: got %d, want 2 (initial + rebuild)", len(exec.calls))
	}
}

func TestRefresh_DropsViewForRemovedManifest(t *testing.T) {
	exec := &fakeExecer{}
	t1 := time.Unix(100, 0)
	cat := &fakeCatalog{manifests: []*catalogd.Manifest{
		mkManifest("workers", "fp1", t1, "k1"),
		mkManifest("retired", "fp2", t1, "k2"),
	}}
	r := NewRegistrar(exec, cat, "b")
	_, _ = r.Refresh(context.Background())

	// "retired" disappears from the catalog → registrar should DROP its view.
	cat.manifests = cat.manifests[:1]
	stats, err := r.Refresh(context.Background())
	if err != nil {
		t.Fatal(err)
	}
	if stats.Dropped != 1 {
		t.Errorf("expected Dropped:1, got %+v", stats)
	}
	// Find the DROP VIEW call.
	var found bool
	for _, c := range exec.calls {
		if strings.HasPrefix(c, `DROP VIEW IF EXISTS "retired"`) {
			found = true
		}
	}
	if !found {
		t.Errorf("DROP VIEW not emitted; calls=%v", exec.calls)
	}
}

func TestRefresh_QuotesAndEscapesNames(t *testing.T) {
	exec := &fakeExecer{}
	weird := `my "weird" dataset` // contains internal double quotes
	cat := &fakeCatalog{manifests: []*catalogd.Manifest{
		mkManifest(weird, "fp", time.Unix(1, 0), "k"),
	}}
	r := NewRegistrar(exec, cat, "b")

	if _, err := r.Refresh(context.Background()); err != nil {
		t.Fatal(err)
	}
	got := exec.calls[0]
	wantQuoted := `"my ""weird"" dataset"`
	if !strings.Contains(got, wantQuoted) {
		t.Errorf("identifier not properly quoted: got %q, want substring %q", got, wantQuoted)
	}
}

func TestRefresh_PropagatesCatalogError(t *testing.T) {
	exec := &fakeExecer{}
	cat := &fakeCatalog{err: errors.New("catalog down")}
	r := NewRegistrar(exec, cat, "b")
	_, err := r.Refresh(context.Background())
	if err == nil || !strings.Contains(err.Error(), "catalog down") {
		t.Errorf("expected wrapped catalog error, got %v", err)
	}
}

func TestRefresh_ManifestWithNoObjectsErrors(t *testing.T) {
	exec := &fakeExecer{}
	m := mkManifest("empty", "fp", time.Unix(1, 0), "k")
	m.Objects = nil // pathological — registrar shouldn't synthesize an empty SQL.
	cat := &fakeCatalog{manifests: []*catalogd.Manifest{m}}
	r := NewRegistrar(exec, cat, "b")
	_, err := r.Refresh(context.Background())
	if err == nil || !strings.Contains(err.Error(), "no objects") {
		t.Errorf("expected 'no objects' error, got %v", err)
	}
}

@@ -23,7 +23,7 @@ type Config struct {
	Storaged ServiceConfig `toml:"storaged"`
	Catalogd CatalogConfig `toml:"catalogd"`
	Ingestd  IngestConfig  `toml:"ingestd"`
	Queryd   ServiceConfig `toml:"queryd"`
	Queryd   QuerydConfig  `toml:"queryd"`
	S3       S3Config      `toml:"s3"`
	Log      LogConfig     `toml:"log"`
}

@@ -37,6 +37,18 @@ type IngestConfig struct {
	CatalogdURL string `toml:"catalogd_url"`
}

// QuerydConfig adds queryd-specific knobs. queryd talks DuckDB
// directly to MinIO via DuckDB's httpfs extension (so no storaged
// URL needed), and reads the catalog over HTTP for view registration.
// SecretsPath defaults to /etc/lakehouse/secrets-go.toml — the same
// file storaged uses, since both services need the S3 credentials.
type QuerydConfig struct {
	Bind         string `toml:"bind"`
	CatalogdURL  string `toml:"catalogd_url"`
	SecretsPath  string `toml:"secrets_path"`
	RefreshEvery string `toml:"refresh_every"` // duration string, e.g. "30s"
}

// CatalogConfig adds catalogd-specific knobs on top of the standard
// bind. StoragedURL points at the storaged service for manifest
// persistence; G0 defaults to the localhost bind.

@@ -80,7 +92,12 @@ func DefaultConfig() Config {
			StoragedURL: "http://127.0.0.1:3211",
			CatalogdURL: "http://127.0.0.1:3212",
		},
		Queryd: ServiceConfig{Bind: "127.0.0.1:3214"},
		Queryd: QuerydConfig{
			Bind:         "127.0.0.1:3214",
			CatalogdURL:  "http://127.0.0.1:3212",
			SecretsPath:  "/etc/lakehouse/secrets-go.toml",
			RefreshEvery: "30s",
		},
		S3: S3Config{
			Endpoint: "http://localhost:9000",
			Region:   "us-east-1",

@@ -21,6 +21,9 @@ catalogd_url = "http://127.0.0.1:3212"

[queryd]
bind = "127.0.0.1:3214"
catalogd_url = "http://127.0.0.1:3212"
secrets_path = "/etc/lakehouse/secrets-go.toml"
refresh_every = "30s"

[s3]
endpoint = "http://localhost:9000"

scripts/d5_smoke.sh (new executable file)
@@ -0,0 +1,182 @@
#!/usr/bin/env bash
# D5 smoke — proves the Day 5 acceptance gate end-to-end.
#
# Validates:
#  - Ingest CSV via D4 → catalog manifest registered
#  - queryd starts AFTER ingest; initial Refresh picks up the dataset
#  - POST /sql with SELECT count(*) FROM <name> → matches CSV row count
#  - POST /sql with SELECT * FROM <name> LIMIT 3 → rows have expected
#    column names + types + values
#  - Re-ingest same CSV → updated_at bumps → next Refresh re-creates
#    the view (we restart queryd to verify the fresh-side path works)
#  - Schema-drift CSV ingest → catalogd 409s → queryd's view stays
#    pointing at the original (no DuckDB session corruption)
#
# Requires storaged + catalogd + ingestd + queryd binaries. MinIO at
# :9000 with bucket lakehouse-go-primary already created (D2/D4 setup).
#
# Usage: ./scripts/d5_smoke.sh

set -euo pipefail
cd "$(dirname "$0")/.."

export PATH="$PATH:/usr/local/go/bin"

echo "[d5-smoke] building all 4 backing services..."
go build -o bin/ ./cmd/storaged ./cmd/catalogd ./cmd/ingestd ./cmd/queryd

# Cleanup any prior processes.
pkill -f "bin/storaged" 2>/dev/null || true
pkill -f "bin/catalogd" 2>/dev/null || true
pkill -f "bin/ingestd" 2>/dev/null || true
pkill -f "bin/queryd" 2>/dev/null || true
sleep 0.3

STORAGED_PID=""; CATALOGD_PID=""; INGESTD_PID=""; QUERYD_PID=""
TMP="$(mktemp -d)"
cleanup() {
	echo "[d5-smoke] cleanup"
	for p in $QUERYD_PID $INGESTD_PID $CATALOGD_PID $STORAGED_PID; do
		[ -n "$p" ] && kill "$p" 2>/dev/null || true
	done
	rm -rf "$TMP"
}
trap cleanup EXIT INT TERM

poll_health() {
	local port="$1" deadline=$(($(date +%s) + 5))
	while [ "$(date +%s)" -lt "$deadline" ]; do
		if curl -sS --max-time 1 "http://127.0.0.1:$port/health" >/dev/null 2>&1; then return 0; fi
		sleep 0.05
	done
	return 1
}

echo "[d5-smoke] launching storaged → catalogd → ingestd..."
./bin/storaged > /tmp/storaged.log 2>&1 &
STORAGED_PID=$!
poll_health 3211 || { echo "storaged failed"; tail -10 /tmp/storaged.log; exit 1; }

# Clean any prior catalog manifests + datasets so smoke starts fresh.
for k in $(curl -sS "http://127.0.0.1:3211/storage/list?prefix=_catalog/manifests/" | jq -r '.objects[]?.Key // empty' 2>/dev/null); do
	curl -sS -o /dev/null -X DELETE "http://127.0.0.1:3211/storage/delete/$k" || true
done
for k in $(curl -sS "http://127.0.0.1:3211/storage/list?prefix=datasets/d5_workers/" | jq -r '.objects[]?.Key // empty' 2>/dev/null); do
	curl -sS -o /dev/null -X DELETE "http://127.0.0.1:3211/storage/delete/$k" || true
done

./bin/catalogd > /tmp/catalogd.log 2>&1 &
CATALOGD_PID=$!
poll_health 3212 || { echo "catalogd failed"; tail -10 /tmp/catalogd.log; exit 1; }

./bin/ingestd > /tmp/ingestd.log 2>&1 &
INGESTD_PID=$!
poll_health 3213 || { echo "ingestd failed"; tail -10 /tmp/ingestd.log; exit 1; }

# Build a small CSV (5 rows × 5 cols) — same shape as D4 smoke so the
# inferred types are familiar.
NAME="d5_workers"
cat > "$TMP/workers.csv" <<'EOF'
id,name,salary,active,weight
1,Alice,50000,true,165.5
2,Bob,60000,false,180.0
3,Carol,55000,true,135.2
4,Dave,75000,false,200.0
5,Eve,80000,true,150.5
EOF

echo "[d5-smoke] ingest 5-row CSV via D4 path:"
INGEST="$(curl -sS -X POST -F "file=@$TMP/workers.csv" "http://127.0.0.1:3213/ingest?name=$NAME")"
RC="$(echo "$INGEST" | jq -r '.row_count')"
if [ "$RC" = "5" ]; then
	echo " ✓ ingest row_count=5"
else
	echo " ✗ ingest → $INGEST"; exit 1
fi

# Launch queryd LAST — its initial Refresh picks up the dataset.
echo "[d5-smoke] launching queryd (initial Refresh picks up $NAME)..."
./bin/queryd > /tmp/queryd.log 2>&1 &
QUERYD_PID=$!
poll_health 3214 || { echo "queryd failed"; tail -20 /tmp/queryd.log; exit 1; }

FAILED=0

echo "[d5-smoke] POST /sql SELECT count(*) FROM $NAME:"
COUNT_RESP="$(curl -sS -X POST http://127.0.0.1:3214/sql \
	-H 'Content-Type: application/json' \
	-d "{\"sql\":\"SELECT count(*) AS n FROM \\\"$NAME\\\"\"}")"
N="$(echo "$COUNT_RESP" | jq -r '.rows[0][0]')"
if [ "$N" = "5" ]; then
	echo " ✓ count(*)=5"
else
	echo " ✗ count → $COUNT_RESP"
	echo "  queryd log tail:"; tail -20 /tmp/queryd.log
	FAILED=1
fi

echo "[d5-smoke] POST /sql SELECT * FROM $NAME LIMIT 3:"
ROWS_RESP="$(curl -sS -X POST http://127.0.0.1:3214/sql \
	-H 'Content-Type: application/json' \
	-d "{\"sql\":\"SELECT id, name, salary FROM \\\"$NAME\\\" ORDER BY id LIMIT 3\"}")"
ROW_COUNT="$(echo "$ROWS_RESP" | jq -r '.row_count')"
COL0_NAME="$(echo "$ROWS_RESP" | jq -r '.columns[0].name')"
COL1_NAME="$(echo "$ROWS_RESP" | jq -r '.columns[1].name')"
ROW0_ID="$(echo "$ROWS_RESP" | jq -r '.rows[0][0]')"
ROW0_NAME="$(echo "$ROWS_RESP" | jq -r '.rows[0][1]')"
if [ "$ROW_COUNT" = "3" ] && [ "$COL0_NAME" = "id" ] && [ "$COL1_NAME" = "name" ] \
	&& [ "$ROW0_ID" = "1" ] && [ "$ROW0_NAME" = "Alice" ]; then
	echo " ✓ rows[0] = (id=1, name=Alice), columns=[id, name, salary]"
else
	echo " ✗ rows → $ROWS_RESP"
	FAILED=1
fi

echo "[d5-smoke] schema-drift ingest 409s; existing view still queries:"
cat > "$TMP/workers_drift.csv" <<'EOF'
user_id,name,salary,active,weight
99,Mallory,99999,true,99.9
EOF
HTTP="$(curl -sS -o "$TMP/conflict.out" -w '%{http_code}' -X POST \
	-F "file=@$TMP/workers_drift.csv" "http://127.0.0.1:3213/ingest?name=$NAME")"
if [ "$HTTP" = "409" ]; then
	echo " ✓ drift → 409"
else
	echo " ✗ drift → $HTTP"; FAILED=1
fi
# Verify the original view is still queryable.
COUNT_RESP="$(curl -sS -X POST http://127.0.0.1:3214/sql \
	-H 'Content-Type: application/json' \
	-d "{\"sql\":\"SELECT count(*) FROM \\\"$NAME\\\"\"}")"
N="$(echo "$COUNT_RESP" | jq -r '.rows[0][0]')"
if [ "$N" = "5" ]; then
	echo " ✓ post-drift count(*)=5 (view unchanged)"
else
	echo " ✗ post-drift count → $COUNT_RESP"
	FAILED=1
fi

echo "[d5-smoke] error path: SELECT FROM nonexistent → 400:"
HTTP="$(curl -sS -o "$TMP/err.out" -w '%{http_code}' -X POST http://127.0.0.1:3214/sql \
	-H 'Content-Type: application/json' \
	-d '{"sql":"SELECT * FROM no_such_table"}')"
if [ "$HTTP" = "400" ]; then
	echo " ✓ unknown table → 400"
else
	echo " ✗ unknown table → $HTTP body=$(cat "$TMP/err.out")"
	FAILED=1
fi

# Cleanup smoke artifacts.
for k in $(curl -sS "http://127.0.0.1:3211/storage/list?prefix=datasets/$NAME/" | jq -r '.objects[]?.Key // empty' 2>/dev/null); do
	curl -sS -o /dev/null -X DELETE "http://127.0.0.1:3211/storage/delete/$k" || true
done
curl -sS -o /dev/null -X DELETE "http://127.0.0.1:3211/storage/delete/_catalog/manifests/$NAME.parquet" || true

if [ "$FAILED" -eq 0 ]; then
	echo "[d5-smoke] D5 acceptance gate: PASSED"
	exit 0
else
	echo "[d5-smoke] D5 acceptance gate: FAILED"
	exit 1
fi