golangLAKEHOUSE/internal/queryd/registrar_test.go
root 9e9e4c26a4 G0 D5: queryd DuckDB SELECT over Parquet via httpfs · 4 scrum fixes
Phase G0 Day 5 ships queryd: in-memory DuckDB with custom Connector
that runs INSTALL httpfs / LOAD httpfs / CREATE OR REPLACE SECRET
(TYPE S3) on every new connection, sourced from SecretsProvider +
shared.S3Config. SetMaxOpenConns(1) so registrar's CREATE VIEWs and
handler's SELECTs serialize through one connection (avoids cross-
connection MVCC visibility edge cases).

Registrar.Refresh reads catalogd /catalog/list, runs CREATE OR
REPLACE VIEW "name" AS SELECT * FROM read_parquet('s3://bucket/key')
per manifest, drops views for removed manifests, skips on unchanged
updated_at (the implicit etag). Drop pass runs BEFORE create pass so
a poison manifest can't block other manifest refreshes (post-scrum
C1 fix).

POST /sql with JSON body {"sql":"…"} returns
{"columns":[{"name":"id","type":"BIGINT"},…], "rows":[[…]],
"row_count":N}. []byte → string conversion so VARCHAR rows
JSON-encode as text. 30s default refresh ticker, configurable via
[queryd].refresh_every.

Cross-lineage scrum on shipped code:
  - Opus 4.7 (opencode):                      1 BLOCK + 4 WARN + 4 INFO
  - Kimi K2-0905 (openrouter):                2 BLOCK + 2 WARN + 1 INFO
  - Qwen3-coder (openrouter):                 2 BLOCK + 1 WARN + 1 INFO

Fixed (4):
  C1 (Opus + Kimi convergent): Refresh aborts on first per-view error
    → drop pass first, collect errors, errors.Join. Poison manifest
    no longer blocks the rest of the catalog from re-syncing.
  B-CTX (Opus BLOCK): bootstrap closure captured OpenDB's ctx →
    cancelled-ctx silently fails every reconnect. context.Background()
    inside closure; passed ctx only for initial Ping.
  B-LEAK (Kimi BLOCK): firstLine(stmt) truncated CREATE SECRET to 80
    chars but those 80 chars contained KEY_ID + SECRET prefix → log
    aggregator captures credentials. Stable per-statement labels +
    redactCreds() filter on wrapped DuckDB errors.
  JSON-ERR (Opus WARN): swallowed json.Encode error → silent
    truncated 200 on unsupported column types. slog.Warn the failure.

Dismissed (4 false positives):
  Qwen BLOCK "bootstrap not transactional" — DuckDB DDL is auto-commit
  Qwen BLOCK "MaxBytesReader after Decode" — false, applied before
  Kimi BLOCK "concurrent Refresh + user SELECT deadlock" — not a
    deadlock, just serialization, by design with 10s timeout retry
  Kimi WARN "dropView leaves r.known inconsistent" — current code
    returns before the delete; the entry persists for retry

Critical reviewer behavior: 1 convergent BLOCK between Opus + Kimi
on the per-view error blocking, plus two independent single-reviewer
BLOCKs (B-CTX, B-LEAK) that smoke could never have caught. The
B-LEAK fix uses defense-in-depth: never pass SQL into the error
path AND redact known cred values from DuckDB's own error message.

DuckDB cgo path: github.com/duckdb/duckdb-go/v2 v2.10502.0 (per
ADR-001 §1) on Go 1.25 + arrow-go. Smoke 6/6 PASS after every
fix round.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 00:10:55 -05:00

193 lines
5.4 KiB
Go

package queryd
import (
"context"
"database/sql"
"errors"
"strings"
"testing"
"time"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd"
)
// fakeExecer records every SQL string Exec'd against it.
type fakeExecer struct {
calls []string
}
func (f *fakeExecer) ExecContext(_ context.Context, q string, _ ...any) (sql.Result, error) {
f.calls = append(f.calls, q)
return nil, nil
}
// fakeCatalog returns a fixed manifest list and an optional error.
type fakeCatalog struct {
manifests []*catalogd.Manifest
err error
}
func (f *fakeCatalog) List(_ context.Context) ([]*catalogd.Manifest, error) {
return f.manifests, f.err
}
func mkManifest(name, fp string, updatedAt time.Time, key string) *catalogd.Manifest {
return &catalogd.Manifest{
Name: name,
DatasetID: "id-" + name,
SchemaFingerprint: fp,
Objects: []catalogd.Object{{Key: key, Size: 1024}},
CreatedAt: updatedAt,
UpdatedAt: updatedAt,
}
}
func TestRefresh_CreatesViewsForNewManifests(t *testing.T) {
exec := &fakeExecer{}
cat := &fakeCatalog{manifests: []*catalogd.Manifest{
mkManifest("workers", "fp1", time.Unix(100, 0), "datasets/workers/fp1.parquet"),
}}
r := NewRegistrar(exec, cat, "lakehouse-go-primary")
stats, err := r.Refresh(context.Background())
if err != nil {
t.Fatal(err)
}
if stats.Created != 1 || stats.Updated != 0 || stats.Dropped != 0 {
t.Errorf("stats: got %+v, want {Created:1}", stats)
}
if len(exec.calls) != 1 {
t.Fatalf("calls: got %d, want 1", len(exec.calls))
}
got := exec.calls[0]
wantHas := []string{
`CREATE OR REPLACE VIEW "workers"`,
`read_parquet('s3://lakehouse-go-primary/datasets/workers/fp1.parquet')`,
}
for _, w := range wantHas {
if !strings.Contains(got, w) {
t.Errorf("missing %q in %q", w, got)
}
}
}
func TestRefresh_SkipsUnchangedUpdatedAt(t *testing.T) {
exec := &fakeExecer{}
t1 := time.Unix(100, 0)
cat := &fakeCatalog{manifests: []*catalogd.Manifest{
mkManifest("workers", "fp1", t1, "k1"),
}}
r := NewRegistrar(exec, cat, "b")
if _, err := r.Refresh(context.Background()); err != nil {
t.Fatal(err)
}
// Second refresh with same updated_at — should skip.
stats, err := r.Refresh(context.Background())
if err != nil {
t.Fatal(err)
}
if stats.Skipped != 1 || stats.Created != 0 || stats.Updated != 0 {
t.Errorf("second refresh stats: got %+v, want {Skipped:1}", stats)
}
// Only the first refresh should have produced an Exec call.
if len(exec.calls) != 1 {
t.Errorf("calls: got %d, want 1 (skip should not Exec)", len(exec.calls))
}
}
func TestRefresh_RebuildsOnUpdatedAtBump(t *testing.T) {
exec := &fakeExecer{}
t1 := time.Unix(100, 0)
cat := &fakeCatalog{manifests: []*catalogd.Manifest{
mkManifest("workers", "fp1", t1, "k1"),
}}
r := NewRegistrar(exec, cat, "b")
_, _ = r.Refresh(context.Background())
// Bump updated_at — same name + fp, but updated_at changed (idempotent re-register on ingestd).
t2 := time.Unix(200, 0)
cat.manifests[0].UpdatedAt = t2
stats, err := r.Refresh(context.Background())
if err != nil {
t.Fatal(err)
}
if stats.Updated != 1 {
t.Errorf("expected Updated:1 on updated_at bump, got %+v", stats)
}
if len(exec.calls) != 2 {
t.Errorf("calls: got %d, want 2 (initial + rebuild)", len(exec.calls))
}
}
func TestRefresh_DropsViewForRemovedManifest(t *testing.T) {
exec := &fakeExecer{}
t1 := time.Unix(100, 0)
cat := &fakeCatalog{manifests: []*catalogd.Manifest{
mkManifest("workers", "fp1", t1, "k1"),
mkManifest("retired", "fp2", t1, "k2"),
}}
r := NewRegistrar(exec, cat, "b")
_, _ = r.Refresh(context.Background())
// "retired" disappears from the catalog → registrar should DROP its view.
cat.manifests = cat.manifests[:1]
stats, err := r.Refresh(context.Background())
if err != nil {
t.Fatal(err)
}
if stats.Dropped != 1 {
t.Errorf("expected Dropped:1, got %+v", stats)
}
// Find the DROP VIEW call.
var found bool
for _, c := range exec.calls {
if strings.HasPrefix(c, `DROP VIEW IF EXISTS "retired"`) {
found = true
}
}
if !found {
t.Errorf("DROP VIEW not emitted; calls=%v", exec.calls)
}
}
func TestRefresh_QuotesAndEscapesNames(t *testing.T) {
exec := &fakeExecer{}
weird := `my "weird" dataset` // contains internal double quotes
cat := &fakeCatalog{manifests: []*catalogd.Manifest{
mkManifest(weird, "fp", time.Unix(1, 0), "k"),
}}
r := NewRegistrar(exec, cat, "b")
if _, err := r.Refresh(context.Background()); err != nil {
t.Fatal(err)
}
got := exec.calls[0]
wantQuoted := `"my ""weird"" dataset"`
if !strings.Contains(got, wantQuoted) {
t.Errorf("identifier not properly quoted: got %q, want substring %q", got, wantQuoted)
}
}
func TestRefresh_PropagatesCatalogError(t *testing.T) {
exec := &fakeExecer{}
cat := &fakeCatalog{err: errors.New("catalog down")}
r := NewRegistrar(exec, cat, "b")
_, err := r.Refresh(context.Background())
if err == nil || !strings.Contains(err.Error(), "catalog down") {
t.Errorf("expected wrapped catalog error, got %v", err)
}
}
func TestRefresh_ManifestWithNoObjectsErrors(t *testing.T) {
exec := &fakeExecer{}
m := mkManifest("empty", "fp", time.Unix(1, 0), "k")
m.Objects = nil // pathological — registrar shouldn't synthesize an empty SQL.
cat := &fakeCatalog{manifests: []*catalogd.Manifest{m}}
r := NewRegistrar(exec, cat, "b")
_, err := r.Refresh(context.Background())
if err == nil || !strings.Contains(err.Error(), "no objects") {
t.Errorf("expected 'no objects' error, got %v", err)
}
}