Phase G0 Day 5 ships queryd: in-memory DuckDB with custom Connector
that runs INSTALL httpfs / LOAD httpfs / CREATE OR REPLACE SECRET
(TYPE S3) on every new connection, sourced from SecretsProvider +
shared.S3Config. SetMaxOpenConns(1) so registrar's CREATE VIEWs and
handler's SELECTs serialize through one connection (avoids cross-
connection MVCC visibility edge cases).
Registrar.Refresh reads catalogd /catalog/list, runs CREATE OR
REPLACE VIEW "name" AS SELECT * FROM read_parquet('s3://bucket/key')
per manifest, drops views for removed manifests, skips on unchanged
updated_at (the implicit etag). Drop pass runs BEFORE create pass so
a poison manifest can't block other manifest refreshes (post-scrum
C1 fix).
POST /sql with JSON body {"sql":"…"} returns
{"columns":[{"name":"id","type":"BIGINT"},…], "rows":[[…]],
"row_count":N}. []byte → string conversion so VARCHAR rows
JSON-encode as text. 30s default refresh ticker, configurable via
[queryd].refresh_every.
Cross-lineage scrum on shipped code:
- Opus 4.7 (opencode): 1 BLOCK + 4 WARN + 4 INFO
- Kimi K2-0905 (openrouter): 2 BLOCK + 2 WARN + 1 INFO
- Qwen3-coder (openrouter): 2 BLOCK + 1 WARN + 1 INFO
Fixed (4):
C1 (Opus + Kimi convergent): Refresh aborts on first per-view error
→ drop pass first, collect errors, errors.Join. Poison manifest
no longer blocks the rest of the catalog from re-syncing.
B-CTX (Opus BLOCK): bootstrap closure captured OpenDB's ctx →
cancelled-ctx silently fails every reconnect. context.Background()
inside closure; passed ctx only for initial Ping.
B-LEAK (Kimi BLOCK): firstLine(stmt) truncated CREATE SECRET to 80
chars but those 80 chars contained KEY_ID + SECRET prefix → log
aggregator captures credentials. Stable per-statement labels +
redactCreds() filter on wrapped DuckDB errors.
JSON-ERR (Opus WARN): swallowed json.Encode error → silent
truncated 200 on unsupported column types. slog.Warn the failure.
Dismissed (4 false positives):
Qwen BLOCK "bootstrap not transactional" — DuckDB DDL is auto-commit
Qwen BLOCK "MaxBytesReader after Decode" — false, applied before
Kimi BLOCK "concurrent Refresh + user SELECT deadlock" — not a
deadlock, just serialization, by design with 10s timeout retry
Kimi WARN "dropView leaves r.known inconsistent" — current code
returns before the delete; the entry persists for retry
Critical reviewer behavior: 1 convergent BLOCK between Opus + Kimi
on the per-view error blocking, plus two independent single-reviewer
BLOCKs (B-CTX, B-LEAK) that smoke could never have caught. The
B-LEAK fix uses defense-in-depth: never pass SQL into the error
path AND redact known cred values from DuckDB's own error message.
DuckDB cgo path: github.com/duckdb/duckdb-go/v2 v2.10502.0 (per
ADR-001 §1) on Go 1.25 + arrow-go. Smoke 6/6 PASS after every
fix round.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
193 lines
5.4 KiB
Go
193 lines
5.4 KiB
Go
package queryd
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd"
|
|
)
|
|
|
|
// fakeExecer records every SQL string Exec'd against it.
|
|
type fakeExecer struct {
|
|
calls []string
|
|
}
|
|
|
|
func (f *fakeExecer) ExecContext(_ context.Context, q string, _ ...any) (sql.Result, error) {
|
|
f.calls = append(f.calls, q)
|
|
return nil, nil
|
|
}
|
|
|
|
// fakeCatalog returns a fixed manifest list and an optional error.
|
|
type fakeCatalog struct {
|
|
manifests []*catalogd.Manifest
|
|
err error
|
|
}
|
|
|
|
func (f *fakeCatalog) List(_ context.Context) ([]*catalogd.Manifest, error) {
|
|
return f.manifests, f.err
|
|
}
|
|
|
|
func mkManifest(name, fp string, updatedAt time.Time, key string) *catalogd.Manifest {
|
|
return &catalogd.Manifest{
|
|
Name: name,
|
|
DatasetID: "id-" + name,
|
|
SchemaFingerprint: fp,
|
|
Objects: []catalogd.Object{{Key: key, Size: 1024}},
|
|
CreatedAt: updatedAt,
|
|
UpdatedAt: updatedAt,
|
|
}
|
|
}
|
|
|
|
func TestRefresh_CreatesViewsForNewManifests(t *testing.T) {
|
|
exec := &fakeExecer{}
|
|
cat := &fakeCatalog{manifests: []*catalogd.Manifest{
|
|
mkManifest("workers", "fp1", time.Unix(100, 0), "datasets/workers/fp1.parquet"),
|
|
}}
|
|
r := NewRegistrar(exec, cat, "lakehouse-go-primary")
|
|
|
|
stats, err := r.Refresh(context.Background())
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if stats.Created != 1 || stats.Updated != 0 || stats.Dropped != 0 {
|
|
t.Errorf("stats: got %+v, want {Created:1}", stats)
|
|
}
|
|
if len(exec.calls) != 1 {
|
|
t.Fatalf("calls: got %d, want 1", len(exec.calls))
|
|
}
|
|
got := exec.calls[0]
|
|
wantHas := []string{
|
|
`CREATE OR REPLACE VIEW "workers"`,
|
|
`read_parquet('s3://lakehouse-go-primary/datasets/workers/fp1.parquet')`,
|
|
}
|
|
for _, w := range wantHas {
|
|
if !strings.Contains(got, w) {
|
|
t.Errorf("missing %q in %q", w, got)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestRefresh_SkipsUnchangedUpdatedAt(t *testing.T) {
|
|
exec := &fakeExecer{}
|
|
t1 := time.Unix(100, 0)
|
|
cat := &fakeCatalog{manifests: []*catalogd.Manifest{
|
|
mkManifest("workers", "fp1", t1, "k1"),
|
|
}}
|
|
r := NewRegistrar(exec, cat, "b")
|
|
|
|
if _, err := r.Refresh(context.Background()); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
// Second refresh with same updated_at — should skip.
|
|
stats, err := r.Refresh(context.Background())
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if stats.Skipped != 1 || stats.Created != 0 || stats.Updated != 0 {
|
|
t.Errorf("second refresh stats: got %+v, want {Skipped:1}", stats)
|
|
}
|
|
// Only the first refresh should have produced an Exec call.
|
|
if len(exec.calls) != 1 {
|
|
t.Errorf("calls: got %d, want 1 (skip should not Exec)", len(exec.calls))
|
|
}
|
|
}
|
|
|
|
func TestRefresh_RebuildsOnUpdatedAtBump(t *testing.T) {
|
|
exec := &fakeExecer{}
|
|
t1 := time.Unix(100, 0)
|
|
cat := &fakeCatalog{manifests: []*catalogd.Manifest{
|
|
mkManifest("workers", "fp1", t1, "k1"),
|
|
}}
|
|
r := NewRegistrar(exec, cat, "b")
|
|
_, _ = r.Refresh(context.Background())
|
|
|
|
// Bump updated_at — same name + fp, but updated_at changed (idempotent re-register on ingestd).
|
|
t2 := time.Unix(200, 0)
|
|
cat.manifests[0].UpdatedAt = t2
|
|
stats, err := r.Refresh(context.Background())
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if stats.Updated != 1 {
|
|
t.Errorf("expected Updated:1 on updated_at bump, got %+v", stats)
|
|
}
|
|
if len(exec.calls) != 2 {
|
|
t.Errorf("calls: got %d, want 2 (initial + rebuild)", len(exec.calls))
|
|
}
|
|
}
|
|
|
|
func TestRefresh_DropsViewForRemovedManifest(t *testing.T) {
|
|
exec := &fakeExecer{}
|
|
t1 := time.Unix(100, 0)
|
|
cat := &fakeCatalog{manifests: []*catalogd.Manifest{
|
|
mkManifest("workers", "fp1", t1, "k1"),
|
|
mkManifest("retired", "fp2", t1, "k2"),
|
|
}}
|
|
r := NewRegistrar(exec, cat, "b")
|
|
_, _ = r.Refresh(context.Background())
|
|
|
|
// "retired" disappears from the catalog → registrar should DROP its view.
|
|
cat.manifests = cat.manifests[:1]
|
|
stats, err := r.Refresh(context.Background())
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if stats.Dropped != 1 {
|
|
t.Errorf("expected Dropped:1, got %+v", stats)
|
|
}
|
|
// Find the DROP VIEW call.
|
|
var found bool
|
|
for _, c := range exec.calls {
|
|
if strings.HasPrefix(c, `DROP VIEW IF EXISTS "retired"`) {
|
|
found = true
|
|
}
|
|
}
|
|
if !found {
|
|
t.Errorf("DROP VIEW not emitted; calls=%v", exec.calls)
|
|
}
|
|
}
|
|
|
|
func TestRefresh_QuotesAndEscapesNames(t *testing.T) {
|
|
exec := &fakeExecer{}
|
|
weird := `my "weird" dataset` // contains internal double quotes
|
|
cat := &fakeCatalog{manifests: []*catalogd.Manifest{
|
|
mkManifest(weird, "fp", time.Unix(1, 0), "k"),
|
|
}}
|
|
r := NewRegistrar(exec, cat, "b")
|
|
|
|
if _, err := r.Refresh(context.Background()); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
got := exec.calls[0]
|
|
wantQuoted := `"my ""weird"" dataset"`
|
|
if !strings.Contains(got, wantQuoted) {
|
|
t.Errorf("identifier not properly quoted: got %q, want substring %q", got, wantQuoted)
|
|
}
|
|
}
|
|
|
|
func TestRefresh_PropagatesCatalogError(t *testing.T) {
|
|
exec := &fakeExecer{}
|
|
cat := &fakeCatalog{err: errors.New("catalog down")}
|
|
r := NewRegistrar(exec, cat, "b")
|
|
_, err := r.Refresh(context.Background())
|
|
if err == nil || !strings.Contains(err.Error(), "catalog down") {
|
|
t.Errorf("expected wrapped catalog error, got %v", err)
|
|
}
|
|
}
|
|
|
|
func TestRefresh_ManifestWithNoObjectsErrors(t *testing.T) {
|
|
exec := &fakeExecer{}
|
|
m := mkManifest("empty", "fp", time.Unix(1, 0), "k")
|
|
m.Objects = nil // pathological — registrar shouldn't synthesize an empty SQL.
|
|
cat := &fakeCatalog{manifests: []*catalogd.Manifest{m}}
|
|
r := NewRegistrar(exec, cat, "b")
|
|
_, err := r.Refresh(context.Background())
|
|
if err == nil || !strings.Contains(err.Error(), "no objects") {
|
|
t.Errorf("expected 'no objects' error, got %v", err)
|
|
}
|
|
}
|