Phase G0 Day 5 ships queryd: in-memory DuckDB with custom Connector
that runs INSTALL httpfs / LOAD httpfs / CREATE OR REPLACE SECRET
(TYPE S3) on every new connection, sourced from SecretsProvider +
shared.S3Config. SetMaxOpenConns(1) so registrar's CREATE VIEWs and
handler's SELECTs serialize through one connection (avoids cross-
connection MVCC visibility edge cases).
Registrar.Refresh reads catalogd /catalog/list, runs CREATE OR
REPLACE VIEW "name" AS SELECT * FROM read_parquet('s3://bucket/key')
per manifest, drops views for removed manifests, skips on unchanged
updated_at (the implicit etag). Drop pass runs BEFORE create pass so
a poison manifest can't block other manifest refreshes (post-scrum
C1 fix).
POST /sql with JSON body {"sql":"…"} returns
{"columns":[{"name":"id","type":"BIGINT"},…], "rows":[[…]],
"row_count":N}. []byte → string conversion so VARCHAR rows
JSON-encode as text. 30s default refresh ticker, configurable via
[queryd].refresh_every.
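The reason for the []byte → string conversion is that `encoding/json` emits a `[]byte` as a base64 string. The handler's own conversion code is not shown here, so the following is only a sketch under that assumption; `normalizeRow` and `encodeRow` are hypothetical helpers.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// normalizeRow returns a copy of vals with every []byte replaced by a
// string, so encoding/json emits text instead of base64.
func normalizeRow(vals []any) []any {
	out := make([]any, len(vals))
	for i, v := range vals {
		if b, ok := v.([]byte); ok {
			out[i] = string(b)
			continue
		}
		out[i] = v
	}
	return out
}

// encodeRow marshals one row to JSON and returns it as a string.
func encodeRow(vals []any) (string, error) {
	b, err := json.Marshal(vals)
	return string(b), err
}

func main() {
	row := []any{int64(1), []byte("abc")}

	raw, _ := encodeRow(row)
	fixed, _ := encodeRow(normalizeRow(row))

	fmt.Println(raw)   // [1,"YWJj"]  (base64 of "abc")
	fmt.Println(fixed) // [1,"abc"]
}
```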
Cross-lineage scrum on shipped code:
- Opus 4.7 (opencode): 1 BLOCK + 4 WARN + 4 INFO
- Kimi K2-0905 (openrouter): 2 BLOCK + 2 WARN + 1 INFO
- Qwen3-coder (openrouter): 2 BLOCK + 1 WARN + 1 INFO
Fixed (4):
  C1 (Opus + Kimi convergent): Refresh aborts on first per-view error
    → drop pass first, collect errors, errors.Join. Poison manifest
    no longer blocks the rest of the catalog from re-syncing.
  B-CTX (Opus BLOCK): bootstrap closure captured OpenDB's ctx →
    cancelled-ctx silently fails every reconnect. context.Background()
    inside closure; passed ctx only for initial Ping.
  B-LEAK (Kimi BLOCK): firstLine(stmt) truncated CREATE SECRET to 80
    chars but those 80 chars contained KEY_ID + SECRET prefix → log
    aggregator captures credentials. Stable per-statement labels +
    redactCreds() filter on wrapped DuckDB errors.
  JSON-ERR (Opus WARN): swallowed json.Encode error → silent
    truncated 200 on unsupported column types. slog.Warn the failure.
Dismissed (4 false positives):
  Qwen BLOCK "bootstrap not transactional" — DuckDB DDL is auto-commit
  Qwen BLOCK "MaxBytesReader after Decode" — false, applied before
  Kimi BLOCK "concurrent Refresh + user SELECT deadlock" — not a
    deadlock, just serialization, by design with 10s timeout retry
  Kimi WARN "dropView leaves r.known inconsistent" — current code
    returns before the delete; the entry persists for retry
Critical reviewer behavior: 1 convergent BLOCK between Opus + Kimi
on the per-view error blocking, plus two independent single-reviewer
BLOCKs (B-CTX, B-LEAK) that smoke could never have caught. The
B-LEAK fix uses defense-in-depth: never pass SQL into the error
path AND redact known cred values from DuckDB's own error message.
DuckDB cgo path: github.com/duckdb/duckdb-go/v2 v2.10502.0 (per
ADR-001 §1) on Go 1.25 + arrow-go. Smoke 6/6 PASS after every
fix round.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
182 lines
5.6 KiB
Go
// registrar.go — turn catalogd's manifest list into DuckDB views.
//
// Refresh() reads /catalog/list, runs CREATE OR REPLACE VIEW for
// every manifest, drops views for manifests that disappeared, and
// skips any whose updated_at hasn't changed since the prior refresh
// (per kickoff D5.3 spec — "don't re-CREATE on every request,
// Opus review flagged that as the perf cliff").
//
// Identifier safety: all view names are double-quoted in SQL with
// internal " escaped to "" — catalogd accepts user-supplied dataset
// names that wouldn't pass SQL bare-identifier rules (hyphens,
// digit-leading, reserved words). Quoting makes them unambiguous.
package queryd

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"strings"
	"time"

	"git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd"
)

// CatalogLister is what registrar needs from the catalog client —
// only List, no Register. Defined as an interface so unit tests can
// inject a fake without spinning up a real catalogd.
type CatalogLister interface {
	List(ctx context.Context) ([]*catalogd.Manifest, error)
}

// Execer is the subset of *sql.DB that registrar actually uses.
// Same testability win — unit tests can record the SQL strings
// without booting DuckDB. *sql.DB satisfies this directly.
type Execer interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
}

// Registrar holds the running view state and the dependencies needed
// to refresh it.
type Registrar struct {
	exec    Execer
	catalog CatalogLister
	bucket  string

	// known tracks the current view names + the updated_at we last
	// rebuilt them from. Used by Refresh to skip unchanged views and
	// drop ones that disappeared from the catalog.
	known map[string]time.Time
}

// NewRegistrar builds a fresh registrar. bucket is the S3 bucket
// name used to construct s3:// URLs for read_parquet().
func NewRegistrar(exec Execer, catalog CatalogLister, bucket string) *Registrar {
	return &Registrar{
		exec:    exec,
		catalog: catalog,
		bucket:  bucket,
		known:   make(map[string]time.Time),
	}
}

// RefreshStats reports what Refresh did — for logs + tests.
type RefreshStats struct {
	Created int
	Updated int
	Dropped int
	Skipped int
}

// Refresh syncs DuckDB's view catalog with catalogd's manifest list.
// Returns counts of {Created, Updated, Dropped, Skipped} so the
// caller can decide whether to log loudly.
//
// Per scrum C1 (Opus + Kimi convergent): drop pass runs BEFORE create
// pass so a failure in one view's CREATE doesn't block another view's
// DROP. Per-iteration errors are collected and the refresh continues
// — a single poison manifest must not block the whole catalog from
// re-syncing. The collected errors are joined into the return value;
// callers see "everything that happened to fail this round."
func (r *Registrar) Refresh(ctx context.Context) (RefreshStats, error) {
	var stats RefreshStats

	manifests, err := r.catalog.List(ctx)
	if err != nil {
		return stats, fmt.Errorf("registrar list: %w", err)
	}

	wantNames := make(map[string]struct{}, len(manifests))
	for _, m := range manifests {
		wantNames[m.Name] = struct{}{}
	}

	var errs []error

	// 1. Drop views whose manifests disappeared. Run first so the
	// create pass can't block removals on a poison-create error.
	for name := range r.known {
		if _, still := wantNames[name]; still {
			continue
		}
		if err := r.dropView(ctx, name); err != nil {
			errs = append(errs, err)
			continue
		}
		delete(r.known, name)
		stats.Dropped++
	}

	// 2. Create / update views for current manifests.
	for _, m := range manifests {
		prior, exists := r.known[m.Name]
		if exists && prior.Equal(m.UpdatedAt) {
			stats.Skipped++
			continue
		}
		if err := r.createOrReplaceView(ctx, m); err != nil {
			errs = append(errs, err)
			continue
		}
		if exists {
			stats.Updated++
		} else {
			stats.Created++
		}
		r.known[m.Name] = m.UpdatedAt
	}

	if len(errs) > 0 {
		return stats, errors.Join(errs...)
	}
	return stats, nil
}

// createOrReplaceView builds the DuckDB SQL for one manifest and
// runs it. G0 manifests have exactly one Object per dataset; if more
// land later (G2 multi-part), `read_parquet([...])` accepts a list.
func (r *Registrar) createOrReplaceView(ctx context.Context, m *catalogd.Manifest) error {
	if len(m.Objects) == 0 {
		return fmt.Errorf("registrar: manifest %q has no objects", m.Name)
	}
	urls := make([]string, len(m.Objects))
	for i, obj := range m.Objects {
		urls[i] = fmt.Sprintf("'%s'", sqlEscape(buildS3URL(r.bucket, obj.Key)))
	}
	var fromExpr string
	if len(urls) == 1 {
		fromExpr = "read_parquet(" + urls[0] + ")"
	} else {
		fromExpr = "read_parquet([" + strings.Join(urls, ", ") + "])"
	}
	sql := fmt.Sprintf("CREATE OR REPLACE VIEW %s AS SELECT * FROM %s",
		quoteIdent(m.Name), fromExpr)
	if _, err := r.exec.ExecContext(ctx, sql); err != nil {
		return fmt.Errorf("create view %q: %w", m.Name, err)
	}
	return nil
}

func (r *Registrar) dropView(ctx context.Context, name string) error {
	sql := "DROP VIEW IF EXISTS " + quoteIdent(name)
	if _, err := r.exec.ExecContext(ctx, sql); err != nil {
		return fmt.Errorf("drop view %q: %w", name, err)
	}
	return nil
}

// quoteIdent wraps a SQL identifier in double quotes and escapes
// internal " by doubling. This is the SQL-standard rule — works in
// every engine that accepts quoted identifiers.
func quoteIdent(name string) string {
	return `"` + strings.ReplaceAll(name, `"`, `""`) + `"`
}

// buildS3URL composes the s3://bucket/key URL DuckDB's httpfs
// extension consumes. Keys aren't URL-escaped because read_parquet
// accepts S3-style literal paths.
func buildS3URL(bucket, key string) string {
	return "s3://" + bucket + "/" + key
}