root 9e9e4c26a4 G0 D5: queryd DuckDB SELECT over Parquet via httpfs · 4 scrum fixes
Phase G0 Day 5 ships queryd: in-memory DuckDB with custom Connector
that runs INSTALL httpfs / LOAD httpfs / CREATE OR REPLACE SECRET
(TYPE S3) on every new connection, sourced from SecretsProvider +
shared.S3Config. SetMaxOpenConns(1) so registrar's CREATE VIEWs and
handler's SELECTs serialize through one connection (avoids cross-
connection MVCC visibility edge cases).
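
The per-connection bootstrap sequence can be sketched as a statement
builder. This is illustrative only — s3Config and bootstrapStatements
are hypothetical names, not the actual shared.S3Config or Connector
code — but it shows the shape of the three statements each new
connection runs:

```go
package main

import "fmt"

// s3Config stands in for shared.S3Config; field names are assumed.
type s3Config struct {
	Endpoint string
	Region   string
	KeyID    string
	Secret   string
	UseSSL   bool
}

// bootstrapStatements returns the SQL run once per new connection:
// install/load httpfs, then materialize the S3 credentials as a
// DuckDB secret so read_parquet('s3://…') can authenticate.
func bootstrapStatements(c s3Config) []string {
	return []string{
		"INSTALL httpfs",
		"LOAD httpfs",
		fmt.Sprintf(
			"CREATE OR REPLACE SECRET s3_secret (TYPE S3, KEY_ID '%s', SECRET '%s', ENDPOINT '%s', REGION '%s', USE_SSL %t)",
			c.KeyID, c.Secret, c.Endpoint, c.Region, c.UseSSL),
	}
}

func main() {
	stmts := bootstrapStatements(s3Config{Endpoint: "minio:9000", Region: "us-east-1", KeyID: "AK", Secret: "SK"})
	// Per the B-LEAK lesson below: never log the CREATE SECRET statement itself.
	fmt.Println(stmts[0], "/", stmts[1], "/ (secret statement redacted)")
}
```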

Registrar.Refresh reads catalogd /catalog/list, runs CREATE OR
REPLACE VIEW "name" AS SELECT * FROM read_parquet('s3://bucket/key')
per manifest, drops views for removed manifests, skips on unchanged
updated_at (the implicit etag). Drop pass runs BEFORE create pass so
a poison manifest can't block other manifest refreshes (post-scrum
C1 fix).

POST /sql with JSON body {"sql":"…"} returns
{"columns":[{"name":"id","type":"BIGINT"},…], "rows":[[…]],
"row_count":N}. []byte → string conversion so VARCHAR rows
JSON-encode as text. 30s default refresh ticker, configurable via
[queryd].refresh_every.

Cross-lineage scrum on shipped code:
  - Opus 4.7 (opencode):                      1 BLOCK + 4 WARN + 4 INFO
  - Kimi K2-0905 (openrouter):                2 BLOCK + 2 WARN + 1 INFO
  - Qwen3-coder (openrouter):                 2 BLOCK + 1 WARN + 1 INFO

Fixed (4):
  C1 (Opus + Kimi convergent): Refresh aborts on first per-view error
    → drop pass first, collect errors, errors.Join. Poison manifest
    no longer blocks the rest of the catalog from re-syncing.
  B-CTX (Opus BLOCK): bootstrap closure captured OpenDB's ctx →
    cancelled-ctx silently fails every reconnect. context.Background()
    inside closure; passed ctx only for initial Ping.
  B-LEAK (Kimi BLOCK): firstLine(stmt) truncated CREATE SECRET to 80
    chars but those 80 chars contained KEY_ID + SECRET prefix → log
    aggregator captures credentials. Stable per-statement labels +
    redactCreds() filter on wrapped DuckDB errors.
  JSON-ERR (Opus WARN): swallowed json.Encode error → silent
    truncated 200 on unsupported column types. slog.Warn the failure.
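
The redactCreds() filter from the B-LEAK fix can be sketched as a
plain string scrub — a minimal sketch assuming it takes the known
credential values; the shipped signature may differ:

```go
package main

import (
	"fmt"
	"strings"
)

// redactCreds scrubs known credential values from an error message
// before it reaches logs. Empty secrets are skipped so a blank
// config value can't blank out the whole message.
func redactCreds(msg string, secrets ...string) string {
	for _, s := range secrets {
		if s == "" {
			continue
		}
		msg = strings.ReplaceAll(msg, s, "[REDACTED]")
	}
	return msg
}

func main() {
	err := fmt.Errorf("duckdb: CREATE SECRET failed near KEY_ID 'AKIAEXAMPLE'")
	fmt.Println(redactCreds(err.Error(), "AKIAEXAMPLE", "wJalrExampleSecret"))
}
```

This is only the second layer; the first is never passing the SQL
text into the error path at all.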

Dismissed (4 false positives):
  Qwen BLOCK "bootstrap not transactional" — DuckDB DDL is auto-commit
  Qwen BLOCK "MaxBytesReader after Decode" — false, applied before
  Kimi BLOCK "concurrent Refresh + user SELECT deadlock" — not a
    deadlock, just serialization, by design with 10s timeout retry
  Kimi WARN "dropView leaves r.known inconsistent" — current code
    returns before the delete; the entry persists for retry

Critical reviewer behavior: 1 convergent BLOCK between Opus + Kimi
on the per-view error blocking, plus two independent single-reviewer
BLOCKs (B-CTX, B-LEAK) that smoke could never have caught. The
B-LEAK fix uses defense-in-depth: never pass SQL into the error
path AND redact known cred values from DuckDB's own error message.

DuckDB cgo path: github.com/duckdb/duckdb-go/v2 v2.10502.0 (per
ADR-001 §1) on Go 1.25 + arrow-go. Smoke 6/6 PASS after every
fix round.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 00:10:55 -05:00


// registrar.go — turn catalogd's manifest list into DuckDB views.
//
// Refresh() reads /catalog/list, runs CREATE OR REPLACE VIEW for
// every manifest, drops views for manifests that disappeared, and
// skips any whose updated_at hasn't changed since the prior refresh
// (per kickoff D5.3 spec — "don't re-CREATE on every request,
// Opus review flagged that as the perf cliff").
//
// Identifier safety: all view names are double-quoted in SQL with
// internal " escaped to "" — catalogd accepts user-supplied dataset
// names that wouldn't pass SQL bare-identifier rules (hyphens,
// digit-leading, reserved words). Quoting makes them unambiguous.
package queryd

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"strings"
	"time"

	"git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd"
)

// CatalogLister is what registrar needs from the catalog client —
// only List, no Register. Defined as an interface so unit tests can
// inject a fake without spinning up a real catalogd.
type CatalogLister interface {
	List(ctx context.Context) ([]*catalogd.Manifest, error)
}

// Execer is the subset of *sql.DB that registrar actually uses.
// Same testability win — unit tests can record the SQL strings
// without booting DuckDB. *sql.DB satisfies this directly.
type Execer interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
}

// Registrar holds the running view state and the dependencies needed
// to refresh it.
type Registrar struct {
	exec    Execer
	catalog CatalogLister
	bucket  string

	// known tracks the current view names + the updated_at we last
	// rebuilt them from. Used by Refresh to skip unchanged views and
	// drop ones that disappeared from the catalog.
	known map[string]time.Time
}

// NewRegistrar builds a fresh registrar. bucket is the S3 bucket
// name used to construct s3:// URLs for read_parquet().
func NewRegistrar(exec Execer, catalog CatalogLister, bucket string) *Registrar {
	return &Registrar{
		exec:    exec,
		catalog: catalog,
		bucket:  bucket,
		known:   make(map[string]time.Time),
	}
}

// RefreshStats reports what Refresh did — for logs + tests.
type RefreshStats struct {
	Created int
	Updated int
	Dropped int
	Skipped int
}
// Refresh syncs DuckDB's view catalog with catalogd's manifest list.
// Returns counts of {Created, Updated, Dropped, Skipped} so the
// caller can decide whether to log loudly.
//
// Per scrum C1 (Opus + Kimi convergent): drop pass runs BEFORE create
// pass so a failure in one view's CREATE doesn't block another view's
// DROP. Per-iteration errors are collected and the refresh continues
// — a single poison manifest must not block the whole catalog from
// re-syncing. The collected errors are joined into the return value;
// callers see "everything that happened to fail this round."
func (r *Registrar) Refresh(ctx context.Context) (RefreshStats, error) {
	var stats RefreshStats
	manifests, err := r.catalog.List(ctx)
	if err != nil {
		return stats, fmt.Errorf("registrar list: %w", err)
	}

	wantNames := make(map[string]struct{}, len(manifests))
	for _, m := range manifests {
		wantNames[m.Name] = struct{}{}
	}

	var errs []error

	// 1. Drop views whose manifests disappeared. Run first so the
	// create pass can't block removals on a poison-create error.
	for name := range r.known {
		if _, still := wantNames[name]; still {
			continue
		}
		if err := r.dropView(ctx, name); err != nil {
			errs = append(errs, err)
			continue
		}
		delete(r.known, name)
		stats.Dropped++
	}

	// 2. Create / update views for current manifests.
	for _, m := range manifests {
		prior, exists := r.known[m.Name]
		if exists && prior.Equal(m.UpdatedAt) {
			stats.Skipped++
			continue
		}
		if err := r.createOrReplaceView(ctx, m); err != nil {
			errs = append(errs, err)
			continue
		}
		if exists {
			stats.Updated++
		} else {
			stats.Created++
		}
		r.known[m.Name] = m.UpdatedAt
	}

	if len(errs) > 0 {
		return stats, errors.Join(errs...)
	}
	return stats, nil
}
// createOrReplaceView builds the DuckDB SQL for one manifest and
// runs it. G0 manifests have exactly one Object per dataset; if more
// land later (G2 multi-part), `read_parquet([...])` accepts a list.
func (r *Registrar) createOrReplaceView(ctx context.Context, m *catalogd.Manifest) error {
	if len(m.Objects) == 0 {
		return fmt.Errorf("registrar: manifest %q has no objects", m.Name)
	}
	urls := make([]string, len(m.Objects))
	for i, obj := range m.Objects {
		urls[i] = fmt.Sprintf("'%s'", sqlEscape(buildS3URL(r.bucket, obj.Key)))
	}
	var fromExpr string
	if len(urls) == 1 {
		fromExpr = "read_parquet(" + urls[0] + ")"
	} else {
		fromExpr = "read_parquet([" + strings.Join(urls, ", ") + "])"
	}
	stmt := fmt.Sprintf("CREATE OR REPLACE VIEW %s AS SELECT * FROM %s",
		quoteIdent(m.Name), fromExpr)
	if _, err := r.exec.ExecContext(ctx, stmt); err != nil {
		return fmt.Errorf("create view %q: %w", m.Name, err)
	}
	return nil
}

func (r *Registrar) dropView(ctx context.Context, name string) error {
	stmt := "DROP VIEW IF EXISTS " + quoteIdent(name)
	if _, err := r.exec.ExecContext(ctx, stmt); err != nil {
		return fmt.Errorf("drop view %q: %w", name, err)
	}
	return nil
}
// quoteIdent wraps a SQL identifier in double quotes and escapes
// internal " by doubling. This is the SQL-standard rule — works in
// every engine that accepts quoted identifiers.
func quoteIdent(name string) string {
	return `"` + strings.ReplaceAll(name, `"`, `""`) + `"`
}

// sqlEscape doubles internal single quotes so a value can sit inside
// a SQL string literal — applied to the s3:// URL above before it is
// wrapped in quotes for read_parquet().
func sqlEscape(s string) string {
	return strings.ReplaceAll(s, `'`, `''`)
}

// buildS3URL composes the s3://bucket/key URL DuckDB's httpfs
// extension consumes. Keys aren't URL-escaped because read_parquet
// accepts S3-style literal paths.
func buildS3URL(bucket, key string) string {
	return "s3://" + bucket + "/" + key
}