package queryd

import (
	"context"
	"database/sql"
	"errors"
	"strings"
	"testing"
	"time"

	"git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd"
)

// fakeExecer records every SQL string Exec'd against it.
//
// ExecContext always succeeds and returns a nil sql.Result, so it is only
// suitable for code paths that do not inspect the returned result.
type fakeExecer struct {
	calls []string
}

func (f *fakeExecer) ExecContext(_ context.Context, q string, _ ...any) (sql.Result, error) {
	f.calls = append(f.calls, q)
	return nil, nil
}

// fakeCatalog returns a fixed manifest list and an optional error.
type fakeCatalog struct {
	manifests []*catalogd.Manifest
	err       error
}

func (f *fakeCatalog) List(_ context.Context) ([]*catalogd.Manifest, error) {
	return f.manifests, f.err
}

// mkManifest builds a manifest named name with schema fingerprint fp and a
// single 1024-byte object stored at key. DatasetID is derived from name, and
// both CreatedAt and UpdatedAt are set to updatedAt.
func mkManifest(name, fp string, updatedAt time.Time, key string) *catalogd.Manifest {
	return &catalogd.Manifest{
		Name:              name,
		DatasetID:         "id-" + name,
		SchemaFingerprint: fp,
		Objects:           []catalogd.Object{{Key: key, Size: 1024}},
		CreatedAt:         updatedAt,
		UpdatedAt:         updatedAt,
	}
}

func TestRefresh_CreatesViewsForNewManifests(t *testing.T) {
	exec := &fakeExecer{}
	cat := &fakeCatalog{manifests: []*catalogd.Manifest{
		mkManifest("workers", "fp1", time.Unix(100, 0), "datasets/workers/fp1.parquet"),
	}}
	r := NewRegistrar(exec, cat, "lakehouse-go-primary")

	stats, err := r.Refresh(context.Background())
	if err != nil {
		t.Fatal(err)
	}
	if stats.Created != 1 || stats.Updated != 0 || stats.Dropped != 0 {
		t.Errorf("stats: got %+v, want {Created:1}", stats)
	}
	if len(exec.calls) != 1 {
		t.Fatalf("calls: got %d, want 1", len(exec.calls))
	}

	got := exec.calls[0]
	wantHas := []string{
		`CREATE OR REPLACE VIEW "workers"`,
		`read_parquet('s3://lakehouse-go-primary/datasets/workers/fp1.parquet')`,
	}
	for _, w := range wantHas {
		if !strings.Contains(got, w) {
			t.Errorf("missing %q in %q", w, got)
		}
	}
}

func TestRefresh_SkipsUnchangedUpdatedAt(t *testing.T) {
	exec := &fakeExecer{}
	t1 := time.Unix(100, 0)
	cat := &fakeCatalog{manifests: []*catalogd.Manifest{
		mkManifest("workers", "fp1", t1, "k1"),
	}}
	r := NewRegistrar(exec, cat, "b")
	if _, err := r.Refresh(context.Background()); err != nil {
		t.Fatal(err)
	}

	// Second refresh with same updated_at — should skip.
	stats, err := r.Refresh(context.Background())
	if err != nil {
		t.Fatal(err)
	}
	if stats.Skipped != 1 || stats.Created != 0 || stats.Updated != 0 {
		t.Errorf("second refresh stats: got %+v, want {Skipped:1}", stats)
	}
	// Only the first refresh should have produced an Exec call.
	if len(exec.calls) != 1 {
		t.Errorf("calls: got %d, want 1 (skip should not Exec)", len(exec.calls))
	}
}

func TestRefresh_RebuildsOnUpdatedAtBump(t *testing.T) {
	exec := &fakeExecer{}
	t1 := time.Unix(100, 0)
	cat := &fakeCatalog{manifests: []*catalogd.Manifest{
		mkManifest("workers", "fp1", t1, "k1"),
	}}
	r := NewRegistrar(exec, cat, "b")
	// Fail fast: a broken initial refresh would otherwise masquerade as a
	// wrong Updated count below.
	if _, err := r.Refresh(context.Background()); err != nil {
		t.Fatalf("initial refresh: %v", err)
	}

	// Bump updated_at — same name + fp, but updated_at changed (idempotent
	// re-register on ingestd). Bump a copy rather than mutating in place so
	// the test does not depend on whether the registrar retained the
	// original *Manifest pointer from the first refresh.
	t2 := time.Unix(200, 0)
	bumped := *cat.manifests[0]
	bumped.UpdatedAt = t2
	cat.manifests[0] = &bumped

	stats, err := r.Refresh(context.Background())
	if err != nil {
		t.Fatal(err)
	}
	if stats.Updated != 1 {
		t.Errorf("expected Updated:1 on updated_at bump, got %+v", stats)
	}
	if len(exec.calls) != 2 {
		t.Errorf("calls: got %d, want 2 (initial + rebuild)", len(exec.calls))
	}
}

func TestRefresh_DropsViewForRemovedManifest(t *testing.T) {
	exec := &fakeExecer{}
	t1 := time.Unix(100, 0)
	cat := &fakeCatalog{manifests: []*catalogd.Manifest{
		mkManifest("workers", "fp1", t1, "k1"),
		mkManifest("retired", "fp2", t1, "k2"),
	}}
	r := NewRegistrar(exec, cat, "b")
	// Fail fast: if the views were never created, the drop assertions
	// below would be meaningless.
	if _, err := r.Refresh(context.Background()); err != nil {
		t.Fatalf("initial refresh: %v", err)
	}

	// "retired" disappears from the catalog → registrar should DROP its view.
	cat.manifests = cat.manifests[:1]
	stats, err := r.Refresh(context.Background())
	if err != nil {
		t.Fatal(err)
	}
	if stats.Dropped != 1 {
		t.Errorf("expected Dropped:1, got %+v", stats)
	}

	// Find the DROP VIEW call.
	found := false
	for _, c := range exec.calls {
		if strings.HasPrefix(c, `DROP VIEW IF EXISTS "retired"`) {
			found = true
			break
		}
	}
	if !found {
		t.Errorf("DROP VIEW not emitted; calls=%v", exec.calls)
	}
}

func TestRefresh_QuotesAndEscapesNames(t *testing.T) {
	exec := &fakeExecer{}
	weird := `my "weird" dataset` // contains internal double quotes
	cat := &fakeCatalog{manifests: []*catalogd.Manifest{
		mkManifest(weird, "fp", time.Unix(1, 0), "k"),
	}}
	r := NewRegistrar(exec, cat, "b")
	if _, err := r.Refresh(context.Background()); err != nil {
		t.Fatal(err)
	}
	// Guard the index so a registrar that executes nothing fails cleanly
	// instead of panicking.
	if len(exec.calls) == 0 {
		t.Fatal("no SQL executed")
	}

	got := exec.calls[0]
	wantQuoted := `"my ""weird"" dataset"`
	if !strings.Contains(got, wantQuoted) {
		t.Errorf("identifier not properly quoted: got %q, want substring %q", got, wantQuoted)
	}
}

func TestRefresh_PropagatesCatalogError(t *testing.T) {
	exec := &fakeExecer{}
	cat := &fakeCatalog{err: errors.New("catalog down")}
	r := NewRegistrar(exec, cat, "b")

	_, err := r.Refresh(context.Background())
	if err == nil || !strings.Contains(err.Error(), "catalog down") {
		t.Errorf("expected wrapped catalog error, got %v", err)
	}
}

func TestRefresh_ManifestWithNoObjectsErrors(t *testing.T) {
	exec := &fakeExecer{}
	m := mkManifest("empty", "fp", time.Unix(1, 0), "k")
	m.Objects = nil // pathological — registrar shouldn't synthesize an empty SQL.
	cat := &fakeCatalog{manifests: []*catalogd.Manifest{m}}
	r := NewRegistrar(exec, cat, "b")

	_, err := r.Refresh(context.Background())
	if err == nil || !strings.Contains(err.Error(), "no objects") {
		t.Errorf("expected 'no objects' error, got %v", err)
	}
}