// queryd is the SQL execution layer. DuckDB engine (via cgo), reads // Parquet directly from S3 via httpfs, registers catalog datasets as // views, exposes POST /sql. The interesting glue is in // internal/queryd/{db,registrar}.go; main.go wires the lifecycle. package main import ( "context" "database/sql" "encoding/json" "errors" "flag" "log/slog" "net/http" "os" "strings" "time" "github.com/go-chi/chi/v5" "git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogclient" "git.agentview.dev/profit/golangLAKEHOUSE/internal/queryd" "git.agentview.dev/profit/golangLAKEHOUSE/internal/secrets" "git.agentview.dev/profit/golangLAKEHOUSE/internal/shared" ) const ( primaryBucket = "primary" maxSQLBodyBytes = 64 << 10 // 64 KiB cap on POST /sql body — SQL strings are not large defaultRefresh = 30 * time.Second ) func main() { configPath := flag.String("config", "lakehouse.toml", "path to TOML config") flag.Parse() cfg, err := shared.LoadConfig(*configPath) if err != nil { slog.Error("config", "err", err) os.Exit(1) } if cfg.Queryd.CatalogdURL == "" { slog.Error("config", "err", "queryd.catalogd_url is required") os.Exit(1) } if cfg.S3.Bucket == "" { slog.Error("config", "err", "s3.bucket is required") os.Exit(1) } refreshEvery := defaultRefresh if cfg.Queryd.RefreshEvery != "" { d, err := time.ParseDuration(cfg.Queryd.RefreshEvery) if err != nil { slog.Error("config", "err", "queryd.refresh_every is not a valid duration: "+err.Error()) os.Exit(1) } refreshEvery = d } // Long-running ctx tied to lifetime of the process for the // background refresh goroutine. Cancelled when shared.Run returns. procCtx, procCancel := context.WithCancel(context.Background()) defer procCancel() prov, err := secrets.NewFileProvider(cfg.Queryd.SecretsPath, secrets.S3Credentials{ AccessKeyID: cfg.S3.AccessKeyID, SecretAccessKey: cfg.S3.SecretAccessKey, }) if err != nil { slog.Error("secrets", "err", err) os.Exit(1) } db, err := queryd.OpenDB(procCtx, cfg.S3, prov, primaryBucket) if err != nil { slog.Error("duckdb open", "err", err) os.Exit(1) } defer db.Close() catalog := catalogclient.New(cfg.Queryd.CatalogdURL) registrar := queryd.NewRegistrar(db, catalog, cfg.S3.Bucket) // Initial refresh — log if non-empty so the operator sees what // got loaded. A failure here is non-fatal: catalogd may be coming // up later in the boot order, the TTL goroutine will retry. refreshCtx, refreshCancel := context.WithTimeout(procCtx, 10*time.Second) stats, err := registrar.Refresh(refreshCtx) refreshCancel() if err != nil { slog.Warn("initial refresh failed (will retry)", "err", err) } else { slog.Info("initial refresh", "created", stats.Created, "skipped", stats.Skipped) } // Background ticker — drives Refresh on the configured interval. go runRefreshLoop(procCtx, registrar, refreshEvery) h := &handlers{db: db} if err := shared.Run("queryd", cfg.Queryd.Bind, h.register); err != nil { slog.Error("server", "err", err) os.Exit(1) } } // runRefreshLoop drives Registrar.Refresh on a ticker. Cancellable // via ctx. Logs every refresh; in a quiet run that's chatty but // useful for tracking when datasets land. func runRefreshLoop(ctx context.Context, r *queryd.Registrar, every time.Duration) { ticker := time.NewTicker(every) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: rctx, cancel := context.WithTimeout(ctx, 10*time.Second) stats, err := r.Refresh(rctx) cancel() if err != nil { slog.Warn("refresh failed", "err", err) continue } if stats.Created+stats.Updated+stats.Dropped > 0 { slog.Info("refresh", "created", stats.Created, "updated", stats.Updated, "dropped", stats.Dropped, "skipped", stats.Skipped) } } } } type handlers struct { db *sql.DB } func (h *handlers) register(r chi.Router) { r.Post("/sql", h.handleSQL) } // sqlRequest is the POST /sql body shape. type sqlRequest struct { SQL string `json:"sql"` } // sqlResponse is the result envelope. Columns + rows is the compact // form; verbose row-as-object form is post-G0 if anyone needs it. type sqlResponse struct { Columns []sqlColumn `json:"columns"` Rows [][]any `json:"rows"` RowCount int `json:"row_count"` } type sqlColumn struct { Name string `json:"name"` Type string `json:"type"` } func (h *handlers) handleSQL(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() r.Body = http.MaxBytesReader(w, r.Body, maxSQLBodyBytes) var req sqlRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { var maxErr *http.MaxBytesError if errors.As(err, &maxErr) || strings.Contains(err.Error(), "http: request body too large") { http.Error(w, "sql body too large", http.StatusRequestEntityTooLarge) return } http.Error(w, "decode body: "+err.Error(), http.StatusBadRequest) return } if strings.TrimSpace(req.SQL) == "" { http.Error(w, "sql is empty", http.StatusBadRequest) return } rows, err := h.db.QueryContext(r.Context(), req.SQL) if err != nil { // DuckDB errors are user-facing for ad-hoc SQL — expose them // at 400 so callers can see what went wrong (table not found, // syntax error, etc.). Internal infra issues would surface as // 500 once we distinguish them later. http.Error(w, "query: "+err.Error(), http.StatusBadRequest) return } defer rows.Close() cols, err := rows.Columns() if err != nil { http.Error(w, "columns: "+err.Error(), http.StatusInternalServerError) return } colTypes, err := rows.ColumnTypes() if err != nil { http.Error(w, "column types: "+err.Error(), http.StatusInternalServerError) return } resp := sqlResponse{ Columns: make([]sqlColumn, len(cols)), Rows: [][]any{}, } for i, name := range cols { resp.Columns[i] = sqlColumn{Name: name, Type: colTypes[i].DatabaseTypeName()} } for rows.Next() { dest := make([]any, len(cols)) ptrs := make([]any, len(cols)) for i := range dest { ptrs[i] = &dest[i] } if err := rows.Scan(ptrs...); err != nil { http.Error(w, "scan: "+err.Error(), http.StatusInternalServerError) return } // JSON can't encode []byte as text by default — DuckDB returns // VARCHAR as []byte through database/sql. Convert here. for i, v := range dest { if b, ok := v.([]byte); ok { dest[i] = string(b) } } resp.Rows = append(resp.Rows, dest) } if err := rows.Err(); err != nil { http.Error(w, "rows err: "+err.Error(), http.StatusInternalServerError) return } resp.RowCount = len(resp.Rows) w.Header().Set("Content-Type", "application/json") if err := json.NewEncoder(w).Encode(resp); err != nil { // Headers already sent — can't change the status code, but log // so an unsupported DuckDB column type (Decimal, INTERVAL, etc.) // surfaces in the operator log. Per scrum JSON-ERR (Opus). slog.Warn("sql encode response", "err", err) } }