From 05273ac06b9dcdf23b13e2394b6eeeb8b4549b18 Mon Sep 17 00:00:00 2001
From: root
Date: Thu, 30 Apr 2026 00:08:29 -0500
Subject: [PATCH] =?UTF-8?q?phase=204:=20chatd=20=E2=80=94=20multi-provider?=
 =?UTF-8?q?=20LLM=20dispatcher=20(ollama=20/=20cloud=20/=20openrouter=20/?=
 =?UTF-8?q?=20opencode=20/=20kimi)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

new cmd/chatd on :3220 routes /v1/chat to the right provider based on
model-name prefix or :cloud suffix. closes the architectural gap named in
lakehouse.toml [models]: tiers map to model IDs, but until phase 4 there was
no service that could actually CALL those models from Go.

routing rules (registry.Resolve):

  ollama/<model>              → local Ollama (prefix stripped)
  ollama_cloud/<model>        → Ollama Cloud
  <model>:cloud               → Ollama Cloud (suffix variant — kimi-k2.6:cloud)
  openrouter/<vendor>/<model> → OpenRouter (prefix stripped, OpenAI-compat)
  opencode/<model>            → OpenCode unified Zen+Go
  kimi/<model>                → Kimi For Coding (api.kimi.com/coding/v1)
  bare names                  → local Ollama (default)

provider implementations:
- internal/chat/types.go         Provider interface, Request/Response, errors
- internal/chat/registry.go      prefix + :cloud suffix dispatch
- internal/chat/ollama.go        local Ollama via /api/chat (think=false default)
- internal/chat/ollama_cloud.go  Ollama Cloud via /api/generate (Bearer auth)
- internal/chat/openai_compat.go shared OpenAI Chat Completions for the OpenRouter/OpenCode/Kimi family
- internal/chat/builder.go       BuildRegistry from BuilderInput; ResolveKey reads env then .env file fallback

config:
- ChatdConfig in internal/shared/config.go with bind, ollama_url, per-provider key env names + .env fallback paths, timeout
- Gateway gains chatd_url + /v1/chat + /v1/chat/* routes
- lakehouse.toml [chatd] block with /etc/lakehouse/*.env defaults

tests (19 in internal/chat):
- registry: prefix + :cloud + errors + telemetry + provider listing
- ollama: happy path + prefix strip + format=json + 500 mapping + flatten_messages
- openai_compat: happy path + format=json + 429 mapping + zero-choices

think=false default in ollama + ollama_cloud — the local hot path skips
reasoning, so low-budget callers (the playbook_lift judge at max_tokens=10)
get direct answers instead of empty content + done_reason=length. proven via
chatd_smoke acceptance.

acceptance gate: scripts/chatd_smoke.sh — 6/6 PASS:
1. /v1/chat/providers lists exactly the registered providers (1 in dev mode)
2. bare model → ollama default with content + token counts + latency
3. explicit ollama/ prefix → stripped before the upstream call
4. :cloud without ollama_cloud registered → 404 (no silent fall-through)
5. unknown/ prefix → falls through to default → upstream 502 (no prefix rewrite)
6. missing model field → 400

just verify: PASS (vet + 30 packages × short tests + 9 smokes). chatd_smoke
is a domain smoke (not in just verify; mirrors the matrix / observer /
pathway pattern).
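dispatch sketch (illustrative only, not part of the patch; assumes the dev
setup from chatd_smoke.sh with the gateway on :3110 in front of chatd on
:3220, and a loaded local model):

  curl -sS -X POST http://127.0.0.1:3110/v1/chat \
    -H 'Content-Type: application/json' \
    -d '{"model":"qwen3.5:latest","messages":[{"role":"user","content":"reply ok"}],"max_tokens":10}'
  # → {"model":"qwen3.5:latest","content":"ok","provider":"ollama","latency_ms":…}
  # the same body with "model":"openrouter/anthropic/claude-opus-4-7" goes to
  # OpenRouter (prefix stripped upstream); "kimi-k2.6:cloud" goes to Ollama Cloud.

key fallback shape (the KEY=value form that both ResolveKey and systemd
EnvironmentFile= read; path and key name per the [chatd] defaults):

  # /etc/lakehouse/openrouter.env
  OPENROUTER_API_KEY=…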
Co-Authored-By: Claude Opus 4.7 (1M context)
---
 cmd/chatd/main.go                   | 153 ++++++++++++++++++++++++
 cmd/gateway/main.go                 |   6 +
 internal/chat/builder.go            | 105 +++++++++++++++++
 internal/chat/ollama.go             | 142 ++++++++++++++++++++++
 internal/chat/ollama_cloud.go       | 148 +++++++++++++++++++++++
 internal/chat/ollama_test.go        | 128 ++++++++++++++++++++
 internal/chat/openai_compat.go      | 141 ++++++++++++++++++++++
 internal/chat/openai_compat_test.go | 131 +++++++++++++++++++++
 internal/chat/registry.go           | 146 +++++++++++++++++++++++
 internal/chat/registry_test.go      | 168 ++++++++++++++++++++++++++
 internal/chat/types.go              | 104 +++++++++++++++++
 internal/shared/config.go           |  46 ++++++++
 lakehouse.toml                      |  31 +++++
 scripts/chatd_smoke.sh              | 175 ++++++++++++++++++++++++++++
 14 files changed, 1624 insertions(+)
 create mode 100644 cmd/chatd/main.go
 create mode 100644 internal/chat/builder.go
 create mode 100644 internal/chat/ollama.go
 create mode 100644 internal/chat/ollama_cloud.go
 create mode 100644 internal/chat/ollama_test.go
 create mode 100644 internal/chat/openai_compat.go
 create mode 100644 internal/chat/openai_compat_test.go
 create mode 100644 internal/chat/registry.go
 create mode 100644 internal/chat/registry_test.go
 create mode 100644 internal/chat/types.go
 create mode 100755 scripts/chatd_smoke.sh

diff --git a/cmd/chatd/main.go b/cmd/chatd/main.go
new file mode 100644
index 0000000..85fe620
--- /dev/null
+++ b/cmd/chatd/main.go
@@ -0,0 +1,153 @@
+// chatd is the LLM chat dispatcher service (Phase 4 — small-model
+// pipeline tier abstraction). Routes POST /chat to the right
+// provider based on the model-name prefix or :cloud suffix:
+//
+//	ollama/<model>              → local Ollama (no auth)
+//	ollama_cloud/<model>        → Ollama Cloud (Bearer auth)
+//	<model>:cloud               → Ollama Cloud (suffix variant)
+//	openrouter/<vendor>/<model> → OpenRouter (Bearer auth)
+//	opencode/<model>            → OpenCode unified Zen+Go (Bearer auth)
+//	kimi/<model>                → Kimi For Coding (Bearer auth)
+//	bare names                  → local Ollama (default)
+//
+// Provider keys come from env vars (or /etc/lakehouse/*.env
+// fallback files). Providers with empty keys stay unregistered, so
+// requests for them 404 cleanly instead of 503-ing at call time.
+//
+// Routes:
+//
+//	POST /chat           — dispatch a chat request to the resolved provider
+//	GET  /chat/providers — list registered providers (telemetry / health)
+//	GET  /health         — readiness (always 200 — sub-providers are
+//	                       independently checked via /chat/providers)
+package main
+
+import (
+	"encoding/json"
+	"errors"
+	"flag"
+	"log/slog"
+	"net/http"
+	"os"
+	"time"
+
+	"github.com/go-chi/chi/v5"
+
+	"git.agentview.dev/profit/golangLAKEHOUSE/internal/chat"
+	"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
+)
+
+const maxRequestBytes = 4 << 20 // 4 MiB cap on /chat bodies
+
+func main() {
+	configPath := flag.String("config", "lakehouse.toml", "path to TOML config")
+	flag.Parse()
+
+	cfg, err := shared.LoadConfig(*configPath)
+	if err != nil {
+		slog.Error("config", "err", err)
+		os.Exit(1)
+	}
+
+	timeout := time.Duration(cfg.Chatd.TimeoutSecs) * time.Second
+	// ResolveKey(envVar, envFileName, envFilePath): the env var name
+	// doubles as the key looked up inside the .env fallback file.
+	registry := chat.BuildRegistry(chat.BuilderInput{
+		OllamaURL:      cfg.Chatd.OllamaURL,
+		OllamaCloudKey: chat.ResolveKey(cfg.Chatd.OllamaCloudKeyEnv, cfg.Chatd.OllamaCloudKeyEnv, cfg.Chatd.OllamaCloudKeyFile),
+		OpenRouterKey:  chat.ResolveKey(cfg.Chatd.OpenRouterKeyEnv, cfg.Chatd.OpenRouterKeyEnv, cfg.Chatd.OpenRouterKeyFile),
+		OpenCodeKey:    chat.ResolveKey(cfg.Chatd.OpenCodeKeyEnv, cfg.Chatd.OpenCodeKeyEnv, cfg.Chatd.OpenCodeKeyFile),
+		KimiKey:        chat.ResolveKey(cfg.Chatd.KimiKeyEnv, cfg.Chatd.KimiKeyEnv, cfg.Chatd.KimiKeyFile),
+		Timeout:        timeout,
+	})
+
+	h := &handlers{registry: registry}
+	if err := shared.Run("chatd", cfg.Chatd.Bind, h.register, cfg.Auth); err != nil {
+		slog.Error("server", "err", err)
+		os.Exit(1)
+	}
+}
+
+type handlers struct {
+	registry *chat.Registry
+}
+
+func (h *handlers) register(r chi.Router) {
+	// Routes mirror what the gateway proxies: /v1/chat → /chat (POST)
+	// and /v1/chat/providers → /chat/providers (GET). Keeping providers
+	// under /chat/ avoids a separate /providers root route that would
+	// need its own gateway proxy entry.
+	r.Post("/chat", h.handleChat)
+	r.Get("/chat/providers", h.handleProviders)
+}
+
+func (h *handlers) handleChat(w http.ResponseWriter, r *http.Request) {
+	r.Body = http.MaxBytesReader(w, r.Body, maxRequestBytes)
+	defer r.Body.Close()
+
+	var req chat.Request
+	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
+		http.Error(w, "invalid JSON: "+err.Error(), http.StatusBadRequest)
+		return
+	}
+	if req.Model == "" || len(req.Messages) == 0 {
+		http.Error(w, "model and messages are required", http.StatusBadRequest)
+		return
+	}
+
+	resp, err := h.registry.Chat(r.Context(), req)
+	if err != nil {
+		writeChatError(w, err)
+		return
+	}
+	writeJSON(w, http.StatusOK, resp)
+}
+
+func (h *handlers) handleProviders(w http.ResponseWriter, _ *http.Request) {
+	names := h.registry.Names()
+	statuses := make(map[string]bool, len(names))
+	for _, n := range names {
+		// Resolve takes a model name, not a provider name, so
+		// synthesize one per provider: a bare model name exercises the
+		// default (ollama) route; every other provider gets a dummy
+		// model name carrying its own prefix.
+		var probe string
+		if n == "ollama" {
+			probe = "qwen3.5:latest" // bare-name default route
+		} else {
+			probe = n + "/probe"
+		}
+		p, err := h.registry.Resolve(probe)
+		if err != nil {
+			statuses[n] = false
+			continue
+		}
+		statuses[n] = p.Available()
+	}
+	writeJSON(w, http.StatusOK, map[string]any{
+		"providers": statuses,
+	})
+}
+
+// writeChatError maps chat sentinel errors to HTTP status codes.
+// Unknown errors map to 500.
+func writeChatError(w http.ResponseWriter, err error) { + switch { + case errors.Is(err, chat.ErrProviderNotFound): + http.Error(w, err.Error(), http.StatusNotFound) + case errors.Is(err, chat.ErrProviderDisabled): + http.Error(w, err.Error(), http.StatusServiceUnavailable) + case errors.Is(err, chat.ErrUpstream): + http.Error(w, err.Error(), http.StatusBadGateway) + case errors.Is(err, chat.ErrTimeout): + http.Error(w, err.Error(), http.StatusGatewayTimeout) + default: + slog.Error("chat", "err", err) + http.Error(w, "internal", http.StatusInternalServerError) + } +} + +func writeJSON(w http.ResponseWriter, status int, body any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + if err := json.NewEncoder(w).Encode(body); err != nil { + slog.Error("encode", "err", err) + } +} diff --git a/cmd/gateway/main.go b/cmd/gateway/main.go index a2d7638..ba10f9f 100644 --- a/cmd/gateway/main.go +++ b/cmd/gateway/main.go @@ -47,6 +47,7 @@ func main() { "pathwayd_url": cfg.Gateway.PathwaydURL, "matrixd_url": cfg.Gateway.MatrixdURL, "observerd_url": cfg.Gateway.ObserverdURL, + "chatd_url": cfg.Gateway.ChatdURL, } for k, v := range upstreams { if v == "" { @@ -69,6 +70,7 @@ func main() { pathwaydURL := mustParseUpstream("pathwayd_url", cfg.Gateway.PathwaydURL) matrixdURL := mustParseUpstream("matrixd_url", cfg.Gateway.MatrixdURL) observerdURL := mustParseUpstream("observerd_url", cfg.Gateway.ObserverdURL) + chatdURL := mustParseUpstream("chatd_url", cfg.Gateway.ChatdURL) storagedProxy := gateway.NewProxyHandler(storagedURL) catalogdProxy := gateway.NewProxyHandler(catalogdURL) @@ -79,6 +81,7 @@ func main() { pathwaydProxy := gateway.NewProxyHandler(pathwaydURL) matrixdProxy := gateway.NewProxyHandler(matrixdURL) observerdProxy := gateway.NewProxyHandler(observerdURL) + chatdProxy := gateway.NewProxyHandler(chatdURL) if err := shared.Run("gateway", cfg.Gateway.Bind, func(r chi.Router) { @@ -103,6 +106,9 @@ func main() { r.Handle("/v1/matrix/*", matrixdProxy) // Observer — /v1/observer/* (autonomous-iteration witness loop) r.Handle("/v1/observer/*", observerdProxy) + // Chat — /v1/chat (LLM dispatcher) + /v1/chat/providers + r.Handle("/v1/chat", chatdProxy) + r.Handle("/v1/chat/*", chatdProxy) }, cfg.Auth); err != nil { slog.Error("server", "err", err) os.Exit(1) diff --git a/internal/chat/builder.go b/internal/chat/builder.go new file mode 100644 index 0000000..52e1259 --- /dev/null +++ b/internal/chat/builder.go @@ -0,0 +1,105 @@ +package chat + +import ( + "bufio" + "log/slog" + "os" + "strings" + "time" +) + +// BuilderInput drives provider construction. Each field maps to one +// provider; empty fields mean "skip" (the registry won't have that +// provider — :cloud suffix or openrouter/* prefixes will 404 cleanly). +type BuilderInput struct { + OllamaURL string // local Ollama, no auth (typically http://localhost:11434) + OllamaCloudKey string // OLLAMA_CLOUD_KEY + OpenRouterKey string // OPENROUTER_API_KEY + OpenCodeKey string // OPENCODE_API_KEY + KimiKey string // KIMI_API_KEY + Timeout time.Duration // default 180s +} + +// BuildRegistry constructs a Registry from the input. Logs which +// providers were registered (for operator confidence at boot). +func BuildRegistry(in BuilderInput) *Registry { + if in.Timeout == 0 { + in.Timeout = 180 * time.Second + } + + var providers []Provider + registered := []string{} + + // Local Ollama always registered if URL given (no auth needed). 
+ if in.OllamaURL != "" { + providers = append(providers, NewOllama(in.OllamaURL, in.Timeout)) + registered = append(registered, "ollama") + } + if in.OllamaCloudKey != "" { + providers = append(providers, NewOllamaCloud(in.OllamaCloudKey, in.Timeout)) + registered = append(registered, "ollama_cloud") + } + if in.OpenRouterKey != "" { + providers = append(providers, NewOpenRouter(in.OpenRouterKey, in.Timeout)) + registered = append(registered, "openrouter") + } + if in.OpenCodeKey != "" { + providers = append(providers, NewOpenCode(in.OpenCodeKey, in.Timeout)) + registered = append(registered, "opencode") + } + if in.KimiKey != "" { + providers = append(providers, NewKimi(in.KimiKey, in.Timeout)) + registered = append(registered, "kimi") + } + + r := NewRegistry(providers...) + slog.Info("chat registry built", "providers", registered) + return r +} + +// ResolveKey reads an API key with the priority chain: +// 1. Explicit env var (named by envVar) +// 2. .env file at filePath (e.g. /etc/lakehouse/openrouter.env) +// with KEY=value lines; the first matching line wins. +// 3. "" if neither set +// +// Mirrors the Rust adapter's resolve_*_key() pattern. Empty key +// means the provider stays unregistered — operators see one fewer +// entry in the boot log instead of a 503 at first request. +func ResolveKey(envVar, envFileName, envFilePath string) string { + if envVar != "" { + if v := strings.TrimSpace(os.Getenv(envVar)); v != "" { + return v + } + } + if envFilePath != "" { + if v := readEnvFileVar(envFilePath, envFileName); v != "" { + return v + } + } + return "" +} + +// readEnvFileVar reads a KEY=value style env file and returns the +// value of `name`. Returns "" on any error or missing key. Stops at +// first match. No quoting/escaping — same simple shape that systemd +// EnvironmentFile= reads. +func readEnvFileVar(path, name string) string { + f, err := os.Open(path) + if err != nil { + return "" + } + defer f.Close() + scanner := bufio.NewScanner(f) + prefix := name + "=" + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if line == "" || strings.HasPrefix(line, "#") { + continue + } + if strings.HasPrefix(line, prefix) { + return strings.Trim(strings.TrimPrefix(line, prefix), `"'`) + } + } + return "" +} diff --git a/internal/chat/ollama.go b/internal/chat/ollama.go new file mode 100644 index 0000000..d3899bc --- /dev/null +++ b/internal/chat/ollama.go @@ -0,0 +1,142 @@ +package chat + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "strings" + "time" +) + +// Ollama (local) provider — calls /api/chat on the local Ollama +// server. No auth needed; default URL http://localhost:11434. +// +// Bare model names route here by default (registry.defaultName=ollama), +// so "qwen3.5:latest" → ollama. Explicit "ollama/qwen3.5:latest" also +// works (prefix stripped). +type Ollama struct { + baseURL string + httpClient *http.Client +} + +// NewOllama returns a local Ollama provider. baseURL defaults to +// http://localhost:11434 when empty. timeout 0 → 180s. +func NewOllama(baseURL string, timeout time.Duration) *Ollama { + if baseURL == "" { + baseURL = "http://localhost:11434" + } + if timeout == 0 { + timeout = 180 * time.Second + } + return &Ollama{ + baseURL: strings.TrimRight(baseURL, "/"), + httpClient: &http.Client{Timeout: timeout}, + } +} + +func (o *Ollama) Name() string { return "ollama" } + +// Available pings /api/tags. Cached negative result would be a +// premature optimization for G0 — Ollama is typically up. 
If down, +// next call gets ErrUpstream which is the right signal anyway. +func (o *Ollama) Available() bool { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + req, _ := http.NewRequestWithContext(ctx, "GET", o.baseURL+"/api/tags", nil) + resp, err := o.httpClient.Do(req) + if err != nil { + return false + } + defer resp.Body.Close() + return resp.StatusCode/100 == 2 +} + +// Chat translates Request to Ollama's /api/chat shape and back. +// Strips the optional "ollama/" prefix from req.Model. +func (o *Ollama) Chat(ctx context.Context, req Request) (*Response, error) { + model := StripPrefix(req.Model, "ollama") + + body := map[string]any{ + "model": model, + "messages": req.Messages, + "stream": false, + // Local hot path: skip reasoning by default. qwen3 / qwen3.5 are + // thinking-capable but the inner-loop use case wants direct + // answers, not reasoning traces. Without this, low max_tokens + // budgets get consumed by thinking before any content is + // produced. Cloud tier (Ollama Cloud) inherits the same default + // — see ollama_cloud.go. + "think": false, + "options": map[string]any{ + "temperature": req.Temperature, + }, + } + if req.MaxTokens > 0 { + body["options"].(map[string]any)["num_predict"] = req.MaxTokens + } + if req.Format == "json" { + body["format"] = "json" + } + + bs, _ := json.Marshal(body) + httpReq, err := http.NewRequestWithContext(ctx, "POST", o.baseURL+"/api/chat", bytes.NewReader(bs)) + if err != nil { + return nil, err + } + httpReq.Header.Set("Content-Type", "application/json") + + resp, err := o.httpClient.Do(httpReq) + if err != nil { + if errors.Is(ctx.Err(), context.DeadlineExceeded) { + return nil, fmt.Errorf("%w: %s", ErrTimeout, "ollama") + } + return nil, fmt.Errorf("ollama: %w", err) + } + defer resp.Body.Close() + + rb, _ := io.ReadAll(resp.Body) + if resp.StatusCode/100 != 2 { + return nil, fmt.Errorf("%w: ollama %d: %s", ErrUpstream, resp.StatusCode, abbrev(string(rb), 200)) + } + + var ollamaResp struct { + Model string `json:"model"` + Message struct { + Content string `json:"content"` + } `json:"message"` + Done bool `json:"done"` + PromptEvalCount int `json:"prompt_eval_count"` + EvalCount int `json:"eval_count"` + } + if err := json.Unmarshal(rb, &ollamaResp); err != nil { + return nil, fmt.Errorf("ollama decode: %w (body=%s)", err, abbrev(string(rb), 200)) + } + + return &Response{ + Model: model, + Content: ollamaResp.Message.Content, + InputTokens: ollamaResp.PromptEvalCount, + OutputTokens: ollamaResp.EvalCount, + FinishReason: finishReasonFromDone(ollamaResp.Done), + }, nil +} + +func finishReasonFromDone(done bool) string { + if done { + return "stop" + } + return "length" +} + +// abbrev shortens long error bodies for log/error messages without +// pulling fmt's truncation flags everywhere. +func abbrev(s string, n int) string { + if len(s) <= n { + return s + } + return s[:n] + "…" +} diff --git a/internal/chat/ollama_cloud.go b/internal/chat/ollama_cloud.go new file mode 100644 index 0000000..082a75b --- /dev/null +++ b/internal/chat/ollama_cloud.go @@ -0,0 +1,148 @@ +package chat + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "strings" + "time" +) + +// OllamaCloud — Ollama Cloud (Pro plan as of 2026-04-28). Bearer +// auth via OLLAMA_CLOUD_KEY. Wire format mirrors the Rust adapter +// (crates/gateway/src/v1/ollama_cloud.rs) — system/prompt split +// against /api/generate with cloud-friendly num_predict floor. 
+//
+// Model routing: both "ollama_cloud/<model>" (prefix) and
+// "<model>:cloud" (suffix) resolve here. The registry dispatches both
+// forms to this provider; StripPrefix below then removes the prefix
+// form, while suffix models pass through it unchanged (no leading
+// "ollama_cloud/" to strip), which is what the upstream wants.
+type OllamaCloud struct {
+	apiKey     string
+	baseURL    string
+	httpClient *http.Client
+}
+
+func NewOllamaCloud(apiKey string, timeout time.Duration) *OllamaCloud {
+	if timeout == 0 {
+		timeout = 180 * time.Second
+	}
+	return &OllamaCloud{
+		apiKey:     apiKey,
+		baseURL:    "https://ollama.com",
+		httpClient: &http.Client{Timeout: timeout},
+	}
+}
+
+func (o *OllamaCloud) Name() string    { return "ollama_cloud" }
+func (o *OllamaCloud) Available() bool { return o.apiKey != "" }
+
+func (o *OllamaCloud) Chat(ctx context.Context, req Request) (*Response, error) {
+	// Strip the registry's "ollama_cloud/" prefix — it must match the
+	// provider name the registry dispatches on. A ":cloud" suffix
+	// passes through unchanged; that's the canonical name on
+	// ollama.com.
+	model := StripPrefix(req.Model, "ollama_cloud")
+
+	system, prompt := flattenMessages(req.Messages)
+
+	body := map[string]any{
+		"model":  model,
+		"prompt": prompt,
+		"stream": false,
+		// Mirror local Ollama — skip reasoning by default. Callers that
+		// need thinking explicitly (e.g. for Kimi K2 long reasoning)
+		// will get a future Request.Think field; the v0 default keeps
+		// outputs short and predictable.
+		"think": false,
+		"options": map[string]any{
+			// Cloud reasoning models need headroom — 400 floor matches
+			// the Rust adapter's policy for kimi-k2.6 / gpt-oss:120b.
+			"num_predict": maxInt(req.MaxTokens, 400),
+			"temperature": defaultFloat(req.Temperature, 0.3),
+		},
+	}
+	if system != "" {
+		body["system"] = system
+	}
+	if req.Format == "json" {
+		body["format"] = "json"
+	}
+
+	bs, _ := json.Marshal(body)
+	httpReq, err := http.NewRequestWithContext(ctx, "POST", o.baseURL+"/api/generate", bytes.NewReader(bs))
+	if err != nil {
+		return nil, err
+	}
+	httpReq.Header.Set("Content-Type", "application/json")
+	httpReq.Header.Set("Authorization", "Bearer "+o.apiKey)
+
+	resp, err := o.httpClient.Do(httpReq)
+	if err != nil {
+		if errors.Is(ctx.Err(), context.DeadlineExceeded) {
+			return nil, fmt.Errorf("%w: ollama_cloud", ErrTimeout)
+		}
+		return nil, fmt.Errorf("ollama_cloud: %w", err)
+	}
+	defer resp.Body.Close()
+
+	rb, _ := io.ReadAll(resp.Body)
+	if resp.StatusCode/100 != 2 {
+		return nil, fmt.Errorf("%w: ollama_cloud %d: %s", ErrUpstream, resp.StatusCode, abbrev(string(rb), 300))
+	}
+
+	var cloudResp struct {
+		Model           string `json:"model"`
+		Response        string `json:"response"`
+		Done            bool   `json:"done"`
+		PromptEvalCount int    `json:"prompt_eval_count"`
+		EvalCount       int    `json:"eval_count"`
+	}
+	if err := json.Unmarshal(rb, &cloudResp); err != nil {
+		return nil, fmt.Errorf("ollama_cloud decode: %w (body=%s)", err, abbrev(string(rb), 200))
+	}
+
+	return &Response{
+		Model:        model,
+		Content:      cloudResp.Response,
+		InputTokens:  cloudResp.PromptEvalCount,
+		OutputTokens: cloudResp.EvalCount,
+		FinishReason: finishReasonFromDone(cloudResp.Done),
+	}, nil
+}
+
+// flattenMessages splits a Message list into Ollama Cloud's
+// /api/generate shape (single system + concatenated prompt).
+// Mirrors the Rust adapter's flatten_messages_public.
+func flattenMessages(messages []Message) (system, prompt string) { + var sysParts []string + var promptParts []string + for _, m := range messages { + switch m.Role { + case "system": + sysParts = append(sysParts, m.Content) + case "assistant": + promptParts = append(promptParts, "Assistant: "+m.Content) + default: // "user" or anything else + promptParts = append(promptParts, "User: "+m.Content) + } + } + return strings.Join(sysParts, "\n\n"), strings.Join(promptParts, "\n\n") +} + +func maxInt(a, b int) int { + if a > b { + return a + } + return b +} + +func defaultFloat(v, fallback float64) float64 { + if v == 0 { + return fallback + } + return v +} diff --git a/internal/chat/ollama_test.go b/internal/chat/ollama_test.go new file mode 100644 index 0000000..56d4ed5 --- /dev/null +++ b/internal/chat/ollama_test.go @@ -0,0 +1,128 @@ +package chat + +import ( + "context" + "encoding/json" + "errors" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" +) + +// ollamaServer mocks /api/chat + /api/tags. Captures last request body. +func ollamaServer(t *testing.T, status int, respBody string) (*httptest.Server, *string) { + t.Helper() + captured := "" + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/api/tags": + w.WriteHeader(200) + _, _ = w.Write([]byte(`{"models":[{"name":"qwen3.5:latest"}]}`)) + return + case "/api/chat": + bs, _ := io.ReadAll(r.Body) + captured = string(bs) + w.WriteHeader(status) + _, _ = w.Write([]byte(respBody)) + return + default: + w.WriteHeader(404) + } + })) + t.Cleanup(srv.Close) + return srv, &captured +} + +func TestOllama_ChatHappyPath(t *testing.T) { + resp := `{ + "model": "qwen3.5:latest", + "message": {"content": "ok"}, + "done": true, + "prompt_eval_count": 7, + "eval_count": 3 + }` + srv, captured := ollamaServer(t, 200, resp) + o := NewOllama(srv.URL, 5*time.Second) + + out, err := o.Chat(context.Background(), Request{ + Model: "qwen3.5:latest", + Messages: []Message{{Role: "user", Content: "hi"}}, + }) + if err != nil { + t.Fatalf("Chat: %v", err) + } + if out.Content != "ok" { + t.Errorf("Content = %q, want ok", out.Content) + } + if out.InputTokens != 7 || out.OutputTokens != 3 { + t.Errorf("tokens = (%d, %d), want (7, 3)", out.InputTokens, out.OutputTokens) + } + + // Verify captured request shape: stream=false, options.temperature + // surfaced. 
+ var sent map[string]any + if err := json.Unmarshal([]byte(*captured), &sent); err != nil { + t.Fatalf("parse captured: %v", err) + } + if sent["stream"] != false { + t.Errorf("stream should be false, got %v", sent["stream"]) + } +} + +func TestOllama_StripsExplicitPrefix(t *testing.T) { + srv, captured := ollamaServer(t, 200, `{"message":{"content":""},"done":true}`) + o := NewOllama(srv.URL, 5*time.Second) + _, err := o.Chat(context.Background(), Request{Model: "ollama/qwen3.5:latest"}) + if err != nil { + t.Fatalf("Chat: %v", err) + } + var sent map[string]any + _ = json.Unmarshal([]byte(*captured), &sent) + if sent["model"] != "qwen3.5:latest" { + t.Errorf("upstream model = %v, want qwen3.5:latest (prefix stripped)", sent["model"]) + } +} + +func TestOllama_FormatJSON(t *testing.T) { + srv, captured := ollamaServer(t, 200, `{"message":{"content":"{}"},"done":true}`) + o := NewOllama(srv.URL, 5*time.Second) + _, _ = o.Chat(context.Background(), Request{Model: "qwen3.5:latest", Format: "json"}) + if !strings.Contains(*captured, `"format":"json"`) { + t.Errorf("Format=json should set top-level format=json; captured=%s", *captured) + } +} + +func TestOllama_Available(t *testing.T) { + srv, _ := ollamaServer(t, 200, "{}") + o := NewOllama(srv.URL, 5*time.Second) + if !o.Available() { + t.Errorf("server is up; Available should be true") + } +} + +func TestOllama_UpstreamError(t *testing.T) { + srv, _ := ollamaServer(t, 500, `{"error":"out of memory"}`) + o := NewOllama(srv.URL, 5*time.Second) + _, err := o.Chat(context.Background(), Request{Model: "qwen3.5:latest"}) + if !errors.Is(err, ErrUpstream) { + t.Errorf("500 should ErrUpstream; got %v", err) + } +} + +func TestOllamaCloud_FlattenMessages(t *testing.T) { + system, prompt := flattenMessages([]Message{ + {Role: "system", Content: "You are helpful."}, + {Role: "user", Content: "hi"}, + {Role: "assistant", Content: "hello!"}, + {Role: "user", Content: "how are you?"}, + }) + if system != "You are helpful." { + t.Errorf("system = %q, want 'You are helpful.'", system) + } + if !strings.Contains(prompt, "User: hi") || !strings.Contains(prompt, "Assistant: hello!") || !strings.Contains(prompt, "User: how are you?") { + t.Errorf("prompt missing role tags: %q", prompt) + } +} diff --git a/internal/chat/openai_compat.go b/internal/chat/openai_compat.go new file mode 100644 index 0000000..2bfbfe7 --- /dev/null +++ b/internal/chat/openai_compat.go @@ -0,0 +1,141 @@ +package chat + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "strings" + "time" +) + +// openaiCompat implements the OpenAI Chat Completions wire format, +// which OpenRouter / OpenCode / Kimi all speak. Differences between +// those three providers are limited to: +// - base URL (e.g. /v1/chat/completions vs /zen/v1/chat/completions) +// - prefix stripped from req.Model +// - bearer token source (different env vars) +// +// Each concrete provider wraps this with its own constructor pinning +// those values. 
+type openaiCompat struct { + name string + baseURL string + apiKey string + prefix string + httpClient *http.Client +} + +func newOpenAICompat(name, baseURL, apiKey, prefix string, timeout time.Duration) *openaiCompat { + if timeout == 0 { + timeout = 180 * time.Second + } + return &openaiCompat{ + name: name, + baseURL: strings.TrimRight(baseURL, "/"), + apiKey: apiKey, + prefix: prefix, + httpClient: &http.Client{Timeout: timeout}, + } +} + +func (c *openaiCompat) Name() string { return c.name } +func (c *openaiCompat) Available() bool { return c.apiKey != "" } + +func (c *openaiCompat) Chat(ctx context.Context, req Request) (*Response, error) { + model := StripPrefix(req.Model, c.prefix) + + body := map[string]any{ + "model": model, + "messages": req.Messages, + "stream": false, + "temperature": req.Temperature, + } + if req.MaxTokens > 0 { + body["max_tokens"] = req.MaxTokens + } + if req.Format == "json" { + // OpenAI-compat response_format. OpenRouter passes through; + // OpenCode + Kimi accept it. Worst case the upstream ignores it. + body["response_format"] = map[string]any{"type": "json_object"} + } + + bs, _ := json.Marshal(body) + url := c.baseURL + "/chat/completions" + httpReq, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(bs)) + if err != nil { + return nil, err + } + httpReq.Header.Set("Content-Type", "application/json") + httpReq.Header.Set("Authorization", "Bearer "+c.apiKey) + // HTTP-Referer + X-Title are OpenRouter-specific but harmless on + // other providers — they pass them through to upstream attribution. + httpReq.Header.Set("HTTP-Referer", "https://golanglakehouse.local") + httpReq.Header.Set("X-Title", "Lakehouse-Go") + + resp, err := c.httpClient.Do(httpReq) + if err != nil { + if errors.Is(ctx.Err(), context.DeadlineExceeded) { + return nil, fmt.Errorf("%w: %s", ErrTimeout, c.name) + } + return nil, fmt.Errorf("%s: %w", c.name, err) + } + defer resp.Body.Close() + + rb, _ := io.ReadAll(resp.Body) + if resp.StatusCode/100 != 2 { + return nil, fmt.Errorf("%w: %s %d: %s", ErrUpstream, c.name, resp.StatusCode, abbrev(string(rb), 300)) + } + + var oaResp struct { + Model string `json:"model"` + Choices []struct { + Message struct { + Content string `json:"content"` + } `json:"message"` + FinishReason string `json:"finish_reason"` + } `json:"choices"` + Usage struct { + PromptTokens int `json:"prompt_tokens"` + CompletionTokens int `json:"completion_tokens"` + } `json:"usage"` + } + if err := json.Unmarshal(rb, &oaResp); err != nil { + return nil, fmt.Errorf("%s decode: %w (body=%s)", c.name, err, abbrev(string(rb), 200)) + } + if len(oaResp.Choices) == 0 { + return nil, fmt.Errorf("%w: %s returned 0 choices: %s", ErrUpstream, c.name, abbrev(string(rb), 200)) + } + + return &Response{ + Model: model, + Content: oaResp.Choices[0].Message.Content, + InputTokens: oaResp.Usage.PromptTokens, + OutputTokens: oaResp.Usage.CompletionTokens, + FinishReason: oaResp.Choices[0].FinishReason, + }, nil +} + +// NewOpenRouter returns a provider for openrouter.ai. apiKey +// resolved at construction (typically from OPENROUTER_API_KEY env or +// /etc/lakehouse/openrouter.env). +func NewOpenRouter(apiKey string, timeout time.Duration) *openaiCompat { + return newOpenAICompat("openrouter", "https://openrouter.ai/api/v1", apiKey, "openrouter", timeout) +} + +// NewOpenCode returns a provider for OpenCode's unified Zen+Go endpoint. +// One key reaches Anthropic Opus, GPT-5, Gemini 3.1, Kimi K2.6, +// DeepSeek, GLM, Qwen, plus 4 free-tier models. 
+func NewOpenCode(apiKey string, timeout time.Duration) *openaiCompat { + return newOpenAICompat("opencode", "https://opencode.ai/zen/v1", apiKey, "opencode", timeout) +} + +// NewKimi returns a provider for the direct Kimi For Coding endpoint. +// `api.kimi.com` is a separate account system from api.moonshot.ai — +// keys are NOT interchangeable. +func NewKimi(apiKey string, timeout time.Duration) *openaiCompat { + return newOpenAICompat("kimi", "https://api.kimi.com/coding/v1", apiKey, "kimi", timeout) +} diff --git a/internal/chat/openai_compat_test.go b/internal/chat/openai_compat_test.go new file mode 100644 index 0000000..a0e4ac8 --- /dev/null +++ b/internal/chat/openai_compat_test.go @@ -0,0 +1,131 @@ +package chat + +import ( + "context" + "encoding/json" + "errors" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" +) + +// openaiServer returns an httptest server that mocks an OpenAI Chat +// Completions endpoint. Handler captures the last request body for +// assertion. +func openaiServer(t *testing.T, status int, respBody string) (*httptest.Server, *string) { + t.Helper() + captured := "" + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + bs, _ := io.ReadAll(r.Body) + captured = string(bs) + // Ensure Authorization header was set (Bearer prefix). + if auth := r.Header.Get("Authorization"); !strings.HasPrefix(auth, "Bearer ") { + t.Errorf("missing Bearer auth header; got %q", auth) + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + _, _ = w.Write([]byte(respBody)) + })) + t.Cleanup(srv.Close) + return srv, &captured +} + +func TestOpenAICompat_HappyPath(t *testing.T) { + resp := `{ + "model": "anthropic/claude-opus-4-7", + "choices": [{"message":{"content":"hello world"}, "finish_reason":"stop"}], + "usage": {"prompt_tokens": 10, "completion_tokens": 5} + }` + srv, captured := openaiServer(t, 200, resp) + + p := newOpenAICompat("openrouter", srv.URL+"/v1", "test-key", "openrouter", 5*time.Second) + out, err := p.Chat(context.Background(), Request{ + Model: "openrouter/anthropic/claude-opus-4-7", + Messages: []Message{{Role: "user", Content: "hi"}}, + }) + if err != nil { + t.Fatalf("Chat: %v", err) + } + if out.Content != "hello world" { + t.Errorf("Content = %q, want hello world", out.Content) + } + // Provider strips its prefix before sending to upstream. + var sent map[string]any + if err := json.Unmarshal([]byte(*captured), &sent); err != nil { + t.Fatalf("parse captured: %v", err) + } + if sent["model"] != "anthropic/claude-opus-4-7" { + t.Errorf("upstream got model = %v, want anthropic/claude-opus-4-7 (prefix stripped)", sent["model"]) + } + // Token accounting carried through. 
+ if out.InputTokens != 10 || out.OutputTokens != 5 { + t.Errorf("tokens = (%d, %d), want (10, 5)", out.InputTokens, out.OutputTokens) + } + if out.FinishReason != "stop" { + t.Errorf("FinishReason = %q, want stop", out.FinishReason) + } +} + +func TestOpenAICompat_FormatJSON(t *testing.T) { + resp := `{"choices":[{"message":{"content":"{\"a\":1}"},"finish_reason":"stop"}],"usage":{}}` + srv, captured := openaiServer(t, 200, resp) + p := newOpenAICompat("opencode", srv.URL+"/zen/v1", "test-key", "opencode", 5*time.Second) + + _, err := p.Chat(context.Background(), Request{ + Model: "opencode/claude-opus-4-7", + Messages: []Message{{Role: "user", Content: "give me JSON"}}, + Format: "json", + }) + if err != nil { + t.Fatalf("Chat: %v", err) + } + // Format=json should set response_format on the upstream call. + if !strings.Contains(*captured, `"response_format"`) || !strings.Contains(*captured, `"json_object"`) { + t.Errorf("Format=json should set response_format json_object; captured=%s", *captured) + } +} + +func TestOpenAICompat_UpstreamError(t *testing.T) { + srv, _ := openaiServer(t, 429, `{"error":"rate limited"}`) + p := newOpenAICompat("openrouter", srv.URL+"/v1", "test-key", "openrouter", 5*time.Second) + _, err := p.Chat(context.Background(), Request{Model: "openrouter/x"}) + if !errors.Is(err, ErrUpstream) { + t.Errorf("429 should be ErrUpstream; got %v", err) + } +} + +func TestOpenAICompat_ZeroChoices(t *testing.T) { + srv, _ := openaiServer(t, 200, `{"choices":[],"usage":{}}`) + p := newOpenAICompat("openrouter", srv.URL+"/v1", "test-key", "openrouter", 5*time.Second) + _, err := p.Chat(context.Background(), Request{Model: "openrouter/x"}) + if !errors.Is(err, ErrUpstream) { + t.Errorf("zero-choices should ErrUpstream; got %v", err) + } +} + +func TestOpenAICompat_EmptyKeyUnavailable(t *testing.T) { + p := newOpenAICompat("openrouter", "https://example.com", "", "openrouter", 5*time.Second) + if p.Available() { + t.Errorf("empty key should make provider unavailable") + } +} + +func TestNewProviderConstructors(t *testing.T) { + // Smoke: each public constructor produces a working provider with + // the right name/prefix and respects empty-key=unavailable. + or := NewOpenRouter("test-key", 0) + if or.Name() != "openrouter" || !or.Available() { + t.Errorf("openrouter constructor wrong: %s avail=%v", or.Name(), or.Available()) + } + oc := NewOpenCode("test-key", 0) + if oc.Name() != "opencode" || !oc.Available() { + t.Errorf("opencode constructor wrong: %s avail=%v", oc.Name(), oc.Available()) + } + km := NewKimi("", 0) // empty key → unavailable + if km.Available() { + t.Errorf("kimi with empty key should be unavailable") + } +} diff --git a/internal/chat/registry.go b/internal/chat/registry.go new file mode 100644 index 0000000..481581f --- /dev/null +++ b/internal/chat/registry.go @@ -0,0 +1,146 @@ +package chat + +import ( + "context" + "fmt" + "strings" + "time" +) + +// Registry resolves a model name to its Provider. Lookup is by the +// first slash-delimited prefix; bare names (no slash) fall through to +// the configured default provider — typically `ollama` so local +// model names like `qwen3.5:latest` work without a prefix. +// +// Mirrors the Rust gateway's resolve_provider() pattern from +// crates/gateway/src/v1/mod.rs. +type Registry struct { + providers map[string]Provider // name → provider + defaultName string // resolved when no prefix matches +} + +// NewRegistry builds a registry from a list of providers. 
The first +// "ollama" provider becomes the bare-name default; callers can +// override via SetDefault. +func NewRegistry(providers ...Provider) *Registry { + r := &Registry{providers: make(map[string]Provider, len(providers))} + for _, p := range providers { + r.providers[p.Name()] = p + if r.defaultName == "" && p.Name() == "ollama" { + r.defaultName = "ollama" + } + } + return r +} + +// Register adds or replaces a provider. Used after construction (e.g. +// for tests injecting fakes). +func (r *Registry) Register(p Provider) { + r.providers[p.Name()] = p +} + +// SetDefault sets the provider used when no prefix matches. Empty +// model names always 404 — the default only kicks in for unprefixed +// non-empty names. +func (r *Registry) SetDefault(name string) { + r.defaultName = name +} + +// Names returns the registered provider names, sorted (deterministic +// output for /v1/chat/providers listing). +func (r *Registry) Names() []string { + out := make([]string, 0, len(r.providers)) + for n := range r.providers { + out = append(out, n) + } + // Sorted manually to avoid pulling sort import for one call site; + // O(n²) is fine for n≤10. + for i := 1; i < len(out); i++ { + for j := i; j > 0 && out[j] < out[j-1]; j-- { + out[j], out[j-1] = out[j-1], out[j] + } + } + return out +} + +// Resolve returns the Provider for a model name. Resolution rules: +// +// 1. Empty model → ErrProviderNotFound +// 2. Suffix ":cloud" → ollama_cloud (e.g. "kimi-k2.6:cloud") +// 3. Prefix match (e.g. "openrouter/...") → that provider +// 4. No prefix or unknown prefix → default provider (typically ollama) +// 5. No default registered → ErrProviderNotFound +// +// The suffix rule mirrors the Rust gateway and the Ollama Cloud +// upstream's own naming convention — kimi-k2.6:cloud, qwen3-coder:480b +// (when on cloud) etc. Without it, every cloud model would need a +// "cloud/" prefix in lakehouse.toml, which clashes with the Ollama +// upstream that wants the bare suffix-named model. +func (r *Registry) Resolve(model string) (Provider, error) { + if model == "" { + return nil, fmt.Errorf("%w: empty model name", ErrProviderNotFound) + } + // Suffix detection — `:cloud` always means Ollama Cloud. + if strings.HasSuffix(model, ":cloud") { + if p, ok := r.providers["ollama_cloud"]; ok { + return p, nil + } + // :cloud suffix with no ollama_cloud provider → 404. Don't + // silently fall through to local Ollama; that would burn the + // model name on a provider that doesn't have it. + return nil, fmt.Errorf("%w: %q has :cloud suffix but ollama_cloud provider is not registered", ErrProviderNotFound, model) + } + // Prefix match: "openrouter/anthropic/claude-opus-4-7" splits on + // first "/". Multi-segment provider names not supported (none + // shipped use them). + if idx := strings.Index(model, "/"); idx > 0 { + prefix := model[:idx] + if p, ok := r.providers[prefix]; ok { + return p, nil + } + // Unknown prefix — falls through to default. Lets bare model + // names with slashes (e.g. "anthropic/claude-3.5") still hit + // ollama if that's how the operator named local models. + } + if r.defaultName == "" { + return nil, fmt.Errorf("%w: %q", ErrProviderNotFound, model) + } + p, ok := r.providers[r.defaultName] + if !ok { + return nil, fmt.Errorf("%w: default provider %q not registered", ErrProviderNotFound, r.defaultName) + } + return p, nil +} + +// Chat is the dispatcher entry point: resolve provider, dispatch, +// stamp telemetry on the response. 
Returns ErrProviderDisabled when
+// the resolved provider isn't Available() (caller should map to 503).
+func (r *Registry) Chat(ctx context.Context, req Request) (*Response, error) {
+	p, err := r.Resolve(req.Model)
+	if err != nil {
+		return nil, err
+	}
+	if !p.Available() {
+		return nil, fmt.Errorf("%w: %s", ErrProviderDisabled, p.Name())
+	}
+	t0 := time.Now()
+	resp, err := p.Chat(ctx, req)
+	if err != nil {
+		return nil, err
+	}
+	resp.LatencyMs = time.Since(t0).Milliseconds()
+	resp.Provider = p.Name()
+	return resp, nil
+}
+
+// StripPrefix removes the leading "<prefix>/" from model when
+// present. Helper for upstream calls — providers that need the bare
+// model name (e.g. OpenRouter sees "anthropic/claude-opus-4-7", not
+// "openrouter/anthropic/claude-opus-4-7") use this.
+func StripPrefix(model, prefix string) string {
+	want := prefix + "/"
+	if strings.HasPrefix(model, want) {
+		return model[len(want):]
+	}
+	return model
+}
diff --git a/internal/chat/registry_test.go b/internal/chat/registry_test.go
new file mode 100644
index 0000000..bf6407e
--- /dev/null
+++ b/internal/chat/registry_test.go
@@ -0,0 +1,168 @@
+package chat
+
+import (
+	"context"
+	"errors"
+	"testing"
+	"time"
+)
+
+// fakeProvider is a minimal Provider for registry testing — no HTTP,
+// just records what model name reached it.
+type fakeProvider struct {
+	name      string
+	available bool
+	got       Request // last request
+}
+
+func (f *fakeProvider) Name() string    { return f.name }
+func (f *fakeProvider) Available() bool { return f.available }
+func (f *fakeProvider) Chat(_ context.Context, req Request) (*Response, error) {
+	f.got = req
+	return &Response{Model: req.Model, Content: "ok from " + f.name}, nil
+}
+
+func newFake(name string, available bool) *fakeProvider {
+	return &fakeProvider{name: name, available: available}
+}
+
+func TestRegistry_ResolveByPrefix(t *testing.T) {
+	ollama := newFake("ollama", true)
+	openrouter := newFake("openrouter", true)
+	opencode := newFake("opencode", true)
+	kimi := newFake("kimi", true)
+
+	r := NewRegistry(ollama, openrouter, opencode, kimi)
+
+	cases := []struct {
+		model string
+		want  string
+	}{
+		{"openrouter/anthropic/claude-opus-4-7", "openrouter"},
+		{"opencode/claude-opus-4-7", "opencode"},
+		{"kimi/kimi-for-coding", "kimi"},
+		{"ollama/qwen3.5:latest", "ollama"}, // explicit prefix
+		{"qwen3.5:latest", "ollama"},        // bare → default
+		{"unknown/foo/bar", "ollama"},       // unknown prefix → default
+	}
+	for _, c := range cases {
+		p, err := r.Resolve(c.model)
+		if err != nil {
+			t.Errorf("Resolve(%q): unexpected error: %v", c.model, err)
+			continue
+		}
+		if p.Name() != c.want {
+			t.Errorf("Resolve(%q) = %s, want %s", c.model, p.Name(), c.want)
+		}
+	}
+}
+
+func TestRegistry_ResolveCloudSuffix(t *testing.T) {
+	ollama := newFake("ollama", true)
+	cloud := newFake("ollama_cloud", true)
+	r := NewRegistry(ollama, cloud)
+
+	// :cloud suffix routes to ollama_cloud regardless of any prefix.
+	p, err := r.Resolve("kimi-k2.6:cloud")
+	if err != nil {
+		t.Fatalf("Resolve kimi-k2.6:cloud: %v", err)
+	}
+	if p.Name() != "ollama_cloud" {
+		t.Errorf("kimi-k2.6:cloud should route to ollama_cloud, got %s", p.Name())
+	}
+
+	// Without ollama_cloud registered, :cloud → ErrProviderNotFound
+	// (don't silently fall through to local).
+ rNoCloud := NewRegistry(ollama) + if _, err := rNoCloud.Resolve("kimi-k2.6:cloud"); !errors.Is(err, ErrProviderNotFound) { + t.Errorf("missing ollama_cloud should ErrProviderNotFound; got %v", err) + } +} + +func TestRegistry_ResolveErrors(t *testing.T) { + r := NewRegistry() + + // Empty model + if _, err := r.Resolve(""); !errors.Is(err, ErrProviderNotFound) { + t.Errorf("empty model should ErrProviderNotFound; got %v", err) + } + + // No providers registered, any model → 404 + if _, err := r.Resolve("openrouter/foo"); !errors.Is(err, ErrProviderNotFound) { + t.Errorf("unregistered openrouter should 404; got %v", err) + } + if _, err := r.Resolve("bare-model"); !errors.Is(err, ErrProviderNotFound) { + t.Errorf("bare with no default should 404; got %v", err) + } +} + +func TestRegistry_ChatStampsTelemetry(t *testing.T) { + ollama := newFake("ollama", true) + r := NewRegistry(ollama) + + resp, err := r.Chat(context.Background(), Request{Model: "qwen3.5:latest", Messages: []Message{{Role: "user", Content: "hi"}}}) + if err != nil { + t.Fatalf("Chat: %v", err) + } + if resp.Provider != "ollama" { + t.Errorf("Provider should be stamped to %q, got %q", "ollama", resp.Provider) + } + if resp.LatencyMs < 0 { + t.Errorf("LatencyMs negative: %d", resp.LatencyMs) + } +} + +func TestRegistry_ChatProviderUnavailable(t *testing.T) { + openrouter := newFake("openrouter", false) // no key + r := NewRegistry(openrouter) + + _, err := r.Chat(context.Background(), Request{Model: "openrouter/foo"}) + if !errors.Is(err, ErrProviderDisabled) { + t.Errorf("unavailable provider should ErrProviderDisabled; got %v", err) + } +} + +func TestStripPrefix(t *testing.T) { + cases := []struct { + model, prefix, want string + }{ + {"openrouter/anthropic/claude", "openrouter", "anthropic/claude"}, + {"opencode/claude-opus-4-7", "opencode", "claude-opus-4-7"}, + {"qwen3.5:latest", "ollama", "qwen3.5:latest"}, // no prefix to strip + {"ollama/qwen3.5:latest", "ollama", "qwen3.5:latest"}, // explicit ollama prefix + {"kimi-k2.6:cloud", "cloud", "kimi-k2.6:cloud"}, // suffix doesn't trigger strip + } + for _, c := range cases { + if got := StripPrefix(c.model, c.prefix); got != c.want { + t.Errorf("StripPrefix(%q, %q) = %q, want %q", c.model, c.prefix, got, c.want) + } + } +} + +func TestRegistry_Names(t *testing.T) { + r := NewRegistry( + newFake("zz", true), + newFake("aa", true), + newFake("mm", true), + ) + names := r.Names() + if len(names) != 3 || names[0] != "aa" || names[1] != "mm" || names[2] != "zz" { + t.Errorf("Names() = %v, want sorted [aa mm zz]", names) + } +} + +// Time-stamp sanity — the dispatcher should never produce LatencyMs +// in the past. +func TestRegistry_LatencyMonotonic(t *testing.T) { + ollama := newFake("ollama", true) + r := NewRegistry(ollama) + t0 := time.Now() + resp, err := r.Chat(context.Background(), Request{Model: "qwen3.5:latest"}) + if err != nil { + t.Fatalf("Chat: %v", err) + } + elapsed := time.Since(t0).Milliseconds() + if resp.LatencyMs > elapsed+1 { + t.Errorf("LatencyMs %d > elapsed %d (impossible)", resp.LatencyMs, elapsed) + } +} diff --git a/internal/chat/types.go b/internal/chat/types.go new file mode 100644 index 0000000..bbe73af --- /dev/null +++ b/internal/chat/types.go @@ -0,0 +1,104 @@ +// Package chat provides the LLM chat abstraction over multiple +// providers (local Ollama, Ollama Cloud, OpenRouter, OpenCode, Kimi). 
+//
+// Architecture per docs/SPEC.md §1 (chatd) + the small-model pipeline
+// vision (project_small_model_pipeline_vision.md):
+//
+//   - Hot path runs on local Ollama via cheap `qwen3.5:latest` repeats.
+//   - Oversight tier reaches Ollama Cloud (Pro plan) for tasks needing
+//     more capacity (kimi-k2.6:cloud, qwen3-coder:480b, deepseek-v3.2).
+//   - Frontier escalation goes through OpenRouter (claude-opus-4-7,
+//     gpt-5, kimi-k2-0905) or OpenCode (free-tier opus-4-7) for
+//     blockers that need full-scope reasoning.
+//
+// Provider resolution is by model-name prefix (mirroring the Rust
+// gateway's pattern from crates/gateway/src/v1/mod.rs):
+//
+//	ollama/<model>              → ollama (or bare names like "qwen3.5:latest")
+//	ollama_cloud/<model>        → ollama_cloud
+//	openrouter/<vendor>/<model> → openrouter
+//	opencode/<model>            → opencode
+//	kimi/<model>                → kimi
+//
+// The prefix is stripped before the upstream call.
+package chat
+
+import (
+	"context"
+	"errors"
+)
+
+// Sentinel errors for the upper layers (gateway response codes,
+// observability, retry decisions).
+var (
+	// ErrProviderNotFound — model name didn't resolve to any registered
+	// provider. Maps to 404 at the HTTP layer.
+	ErrProviderNotFound = errors.New("provider not found for model")
+
+	// ErrProviderDisabled — the provider was registered but its key
+	// resolved empty. Maps to 503 at the HTTP layer (operator can fix
+	// by setting the env var; not a client bug).
+	ErrProviderDisabled = errors.New("provider disabled (no auth key)")
+
+	// ErrUpstream — the provider returned a non-2xx. Body included in
+	// the wrapped error message. Maps to 502.
+	ErrUpstream = errors.New("provider upstream error")
+
+	// ErrTimeout — provider call exceeded the configured timeout.
+	// Maps to 504.
+	ErrTimeout = errors.New("provider timeout")
+)
+
+// Message is one entry in a Chat request's conversation. Role is
+// "system", "user", or "assistant" — matches OpenAI/Anthropic shapes.
+type Message struct {
+	Role    string `json:"role"`
+	Content string `json:"content"`
+}
+
+// Request is the provider-neutral chat request. Providers translate
+// to their wire format (OpenAI Chat Completions, Ollama /api/chat,
+// Ollama Cloud /api/generate, etc.). Format="json" asks the provider
+// to return JSON-only output (constrained decoding when supported).
+type Request struct {
+	Model       string    `json:"model"`
+	Messages    []Message `json:"messages"`
+	Temperature float64   `json:"temperature"`
+	MaxTokens   int       `json:"max_tokens,omitempty"`
+	Format      string    `json:"format,omitempty"` // "" or "json"
+	Stream      bool      `json:"stream,omitempty"` // ignored in G0 — chatd is synchronous
+}
+
+// Response is the provider-neutral chat response. LatencyMs is filled
+// by the dispatcher (Provider implementations don't track it).
+// Provider names the resolved provider for telemetry / debug.
+type Response struct {
+	Model        string `json:"model"`
+	Content      string `json:"content"`
+	InputTokens  int    `json:"input_tokens,omitempty"`
+	OutputTokens int    `json:"output_tokens,omitempty"`
+	FinishReason string `json:"finish_reason,omitempty"`
+	Provider     string `json:"provider"`
+	LatencyMs    int64  `json:"latency_ms"`
+}
+
+// Provider is the contract every backend must implement. Implementations
+// must be safe for concurrent calls — the dispatcher shares one
+// instance across all incoming requests.
+//
+// Chat receives the full Request including the prefixed model name
+// (e.g. "openrouter/anthropic/claude-opus-4-7"). Implementations
+// strip their prefix before the upstream call.
+type Provider interface { + // Name returns the provider's short identifier (matches the prefix + // in model names). Used for telemetry and Response.Provider. + Name() string + + // Chat performs one round-trip. Should respect ctx cancellation. + Chat(ctx context.Context, req Request) (*Response, error) + + // Available reports whether the provider is ready to serve. False + // means missing auth key or unreachable upstream — dispatcher + // returns ErrProviderDisabled for callers using this provider. + Available() bool +} diff --git a/internal/shared/config.go b/internal/shared/config.go index e0d2664..f92a2b6 100644 --- a/internal/shared/config.go +++ b/internal/shared/config.go @@ -29,6 +29,7 @@ type Config struct { Pathwayd PathwaydConfig `toml:"pathwayd"` Matrixd MatrixdConfig `toml:"matrixd"` Observerd ObserverdConfig `toml:"observerd"` + Chatd ChatdConfig `toml:"chatd"` S3 S3Config `toml:"s3"` Models ModelsConfig `toml:"models"` Log LogConfig `toml:"log"` @@ -68,6 +69,7 @@ type GatewayConfig struct { PathwaydURL string `toml:"pathwayd_url"` MatrixdURL string `toml:"matrixd_url"` ObserverdURL string `toml:"observerd_url"` + ChatdURL string `toml:"chatd_url"` } // EmbeddConfig drives the embed service. ProviderURL points at the @@ -111,6 +113,36 @@ type MatrixdConfig struct { VectordURL string `toml:"vectord_url"` } +// ChatdConfig drives the chat dispatcher service (cmd/chatd) — Phase 4. +// Routes /v1/chat to the right provider based on model-name prefix +// or :cloud suffix. Per-provider API keys come from env vars (or +// /etc/lakehouse/.env files); empty keys leave the provider +// unregistered so requests for that provider 404 cleanly. +// +// OllamaURL is the local Ollama upstream (no auth). Empty disables +// the local Ollama provider — useful in deployments that don't run +// Ollama on the box (cloud-only operation). +type ChatdConfig struct { + Bind string `toml:"bind"` + OllamaURL string `toml:"ollama_url"` + // API-key env var names. Default to the conventional names. + // Operators can rename these for environments using different env + // var conventions, but the defaults match /etc/lakehouse/*.env + // files that systemd already loads. + OllamaCloudKeyEnv string `toml:"ollama_cloud_key_env"` + OpenRouterKeyEnv string `toml:"openrouter_key_env"` + OpenCodeKeyEnv string `toml:"opencode_key_env"` + KimiKeyEnv string `toml:"kimi_key_env"` + // Optional .env file paths — a fallback when the env var isn't set + // in the process environment. Same keys, "KEY=value" lines. + OllamaCloudKeyFile string `toml:"ollama_cloud_key_file"` + OpenRouterKeyFile string `toml:"openrouter_key_file"` + OpenCodeKeyFile string `toml:"opencode_key_file"` + KimiKeyFile string `toml:"kimi_key_file"` + // Per-call timeout in seconds. 0 = 180s default. + TimeoutSecs int `toml:"timeout_secs"` +} + // ObserverdConfig drives the observer service (cmd/observerd). // PersistPath: file path to the JSONL ops log; empty = in-memory // only (test/dev). Production sets a stable path under @@ -284,6 +316,7 @@ func DefaultConfig() Config { PathwaydURL: "http://127.0.0.1:3217", MatrixdURL: "http://127.0.0.1:3218", ObserverdURL: "http://127.0.0.1:3219", + ChatdURL: "http://127.0.0.1:3220", }, Storaged: ServiceConfig{Bind: "127.0.0.1:3211"}, Catalogd: CatalogConfig{Bind: "127.0.0.1:3212", StoragedURL: "http://127.0.0.1:3211"}, @@ -317,6 +350,19 @@ func DefaultConfig() Config { Bind: "127.0.0.1:3219", // PersistPath empty by default = in-memory only. 
 },
+		Chatd: ChatdConfig{
+			Bind:               "127.0.0.1:3220",
+			OllamaURL:          "http://localhost:11434",
+			OllamaCloudKeyEnv:  "OLLAMA_CLOUD_KEY",
+			OpenRouterKeyEnv:   "OPENROUTER_API_KEY",
+			OpenCodeKeyEnv:     "OPENCODE_API_KEY",
+			KimiKeyEnv:         "KIMI_API_KEY",
+			OllamaCloudKeyFile: "/etc/lakehouse/ollama_cloud.env",
+			OpenRouterKeyFile:  "/etc/lakehouse/openrouter.env",
+			OpenCodeKeyFile:    "/etc/lakehouse/opencode.env",
+			KimiKeyFile:        "/etc/lakehouse/kimi.env",
+			TimeoutSecs:        180,
+		},
 		Queryd: QuerydConfig{
 			Bind:        "127.0.0.1:3214",
 			CatalogdURL: "http://127.0.0.1:3212",
diff --git a/lakehouse.toml b/lakehouse.toml
index 3ea71fc..99e1a87 100644
--- a/lakehouse.toml
+++ b/lakehouse.toml
@@ -15,6 +15,7 @@ embedd_url = "http://127.0.0.1:3216"
 pathwayd_url = "http://127.0.0.1:3217"
 matrixd_url = "http://127.0.0.1:3218"
 observerd_url = "http://127.0.0.1:3219"
+chatd_url = "http://127.0.0.1:3220"

 [storaged]
 bind = "127.0.0.1:3211"
@@ -70,6 +71,36 @@ bind = "127.0.0.1:3219"
 # /var/lib/lakehouse/observer/ops.jsonl so ops survive restart.
 persist_path = ""

+[chatd]
+# LLM chat dispatcher (Phase 4). Routes /v1/chat to the right provider
+# based on model name prefix or :cloud suffix:
+#   ollama/<model>              → local Ollama (no auth)
+#   ollama_cloud/<model>        → Ollama Cloud
+#   <model>:cloud               → Ollama Cloud (suffix variant)
+#   openrouter/<vendor>/<model> → OpenRouter
+#   opencode/<model>            → OpenCode (Zen+Go unified)
+#   kimi/<model>                → Kimi For Coding
+#   bare names (e.g. qwen3.5:latest) → local Ollama (default)
+bind = "127.0.0.1:3220"
+ollama_url = "http://localhost:11434"
+
+# Per-provider key resolution: env var first, then .env file fallback.
+# Empty file path skips the file lookup. systemd EnvironmentFile=
+# loads these natively, so the service runtime sees the env vars.
+ollama_cloud_key_env = "OLLAMA_CLOUD_KEY"
+openrouter_key_env = "OPENROUTER_API_KEY"
+opencode_key_env = "OPENCODE_API_KEY"
+kimi_key_env = "KIMI_API_KEY"
+
+ollama_cloud_key_file = "/etc/lakehouse/ollama_cloud.env"
+openrouter_key_file = "/etc/lakehouse/openrouter.env"
+opencode_key_file = "/etc/lakehouse/opencode.env"
+kimi_key_file = "/etc/lakehouse/kimi.env"
+
+# Per-call timeout (seconds). Cloud reasoning models can take >60s
+# for long prompts, so 180 is the default.
+timeout_secs = 180
+
 [s3]
 endpoint = "http://localhost:9000"
 region = "us-east-1"
diff --git a/scripts/chatd_smoke.sh b/scripts/chatd_smoke.sh
new file mode 100755
index 0000000..6b0293a
--- /dev/null
+++ b/scripts/chatd_smoke.sh
@@ -0,0 +1,175 @@
+#!/usr/bin/env bash
+# chatd smoke — Phase 4 acceptance gate.
+#
+# Validates:
+#   - chatd boots and reports providers via GET /v1/chat/providers
+#   - bare model name routes to local Ollama (default provider)
+#   - explicit "ollama/" prefix also routes to local Ollama
+#   - :cloud suffix without ollama_cloud key registered → 404
+#   - unknown/foo prefix falls through to Ollama default
+#   - POST /v1/chat returns provider-stamped response with token counts
+#
+# Requires: Ollama on :11434 with qwen3.5:latest (or any chat-capable
+# model — override via SMOKE_MODEL env). Skips (exit 0) if Ollama is
+# absent so this can run on CI boxes without local Ollama.
+#
+# Usage: ./scripts/chatd_smoke.sh
+
+set -euo pipefail
+cd "$(dirname "$0")/.."
+
+export PATH="$PATH:/usr/local/go/bin"
+
+SMOKE_MODEL="${SMOKE_MODEL:-qwen3.5:latest}"
+
+if ! curl -sS --max-time 3 http://localhost:11434/api/tags >/dev/null 2>&1; then
+  echo "[chatd-smoke] Ollama not reachable on :11434 — skipping"
+  exit 0
+fi
+
+if !
curl -sS http://localhost:11434/api/tags | jq -e --arg m "$SMOKE_MODEL" \
+   '.models[] | select(.name == $m)' >/dev/null 2>&1; then
+  echo "[chatd-smoke] $SMOKE_MODEL not loaded in Ollama — skipping"
+  exit 0
+fi
+
+echo "[chatd-smoke] building chatd + gateway..."
+go build -o bin/ ./cmd/chatd ./cmd/gateway
+
+pkill -f "bin/(chatd|gateway)" 2>/dev/null || true
+sleep 0.3
+
+PIDS=()
+TMP="$(mktemp -d)"
+CFG="$TMP/chatd.toml"
+
+cleanup() {
+  echo "[chatd-smoke] cleanup"
+  for p in "${PIDS[@]:-}"; do [ -n "${p:-}" ] && kill "$p" 2>/dev/null || true; done
+  rm -rf "$TMP"
+}
+trap cleanup EXIT INT TERM
+
+# Test config: chatd talks only to local Ollama (no cloud keys; key env
+# names and file paths are blanked). This proves the bare-name +
+# prefix-routing + :cloud-without-cloud behaviors without needing API
+# keys in CI.
+cat > "$CFG" <<EOF
+[gateway]
+bind = "127.0.0.1:3110"
+chatd_url = "http://127.0.0.1:3220"
+
+[chatd]
+bind = "127.0.0.1:3220"
+ollama_url = "http://localhost:11434"
+ollama_cloud_key_env = ""
+openrouter_key_env = ""
+opencode_key_env = ""
+kimi_key_env = ""
+ollama_cloud_key_file = ""
+openrouter_key_file = ""
+opencode_key_file = ""
+kimi_key_file = ""
+EOF
+
+poll_health() {
+  local port="$1"
+  for _ in $(seq 1 100); do
+    if curl -sS "http://127.0.0.1:$port/health" >/dev/null 2>&1; then return 0; fi
+    sleep 0.05
+  done
+  return 1
+}
+
+echo "[chatd-smoke] launching chatd → gateway..."
+./bin/chatd -config "$CFG" > /tmp/chatd.log 2>&1 & PIDS+=($!)
+poll_health 3220 || { echo "chatd failed"; tail /tmp/chatd.log; exit 1; }
+./bin/gateway -config "$CFG" > /tmp/gateway.log 2>&1 & PIDS+=($!)
+poll_health 3110 || { echo "gateway failed"; tail /tmp/gateway.log; exit 1; }
+
+# 1. providers listing — only ollama registered when other keys absent
+echo "[chatd-smoke] /v1/chat/providers — only ollama registered:"
+RESP="$(curl -sS http://127.0.0.1:3110/v1/chat/providers)"
+PROVIDERS_COUNT="$(echo "$RESP" | jq -r '.providers | length')"
+OLLAMA_AVAIL="$(echo "$RESP" | jq -r '.providers.ollama')"
+if [ "$PROVIDERS_COUNT" != "1" ] || [ "$OLLAMA_AVAIL" != "true" ]; then
+  echo "  ✗ wanted only ollama=true; got $RESP"
+  exit 1
+fi
+echo "  ✓ exactly 1 provider (ollama, available=true)"
+
+# 2. bare model name → ollama default
+echo "[chatd-smoke] POST /v1/chat with bare model name:"
+RESP="$(curl -sS -X POST http://127.0.0.1:3110/v1/chat \
+  -H 'Content-Type: application/json' \
+  -d "{\"model\":\"$SMOKE_MODEL\",\"messages\":[{\"role\":\"user\",\"content\":\"reply with the word ok and nothing else\"}],\"max_tokens\":10}")"
+PROV="$(echo "$RESP" | jq -r '.provider')"
+CONTENT="$(echo "$RESP" | jq -r '.content')"
+LATENCY="$(echo "$RESP" | jq -r '.latency_ms')"
+if [ "$PROV" != "ollama" ] || [ -z "$CONTENT" ] || [ "$LATENCY" -lt 0 ] 2>/dev/null; then
+  echo "  ✗ expected provider=ollama, non-empty content, non-negative latency; got $RESP"
+  exit 1
+fi
+echo "  ✓ provider=ollama, latency=${LATENCY}ms, content=$(echo "$CONTENT" | head -c 60 | tr -d '\n')…"
+
+# 3. explicit ollama/ prefix (prefix stripped before upstream call)
+echo "[chatd-smoke] POST /v1/chat with explicit ollama/ prefix:"
+RESP="$(curl -sS -X POST http://127.0.0.1:3110/v1/chat \
+  -H 'Content-Type: application/json' \
+  -d "{\"model\":\"ollama/$SMOKE_MODEL\",\"messages\":[{\"role\":\"user\",\"content\":\"reply ok\"}],\"max_tokens\":5}")"
+PROV="$(echo "$RESP" | jq -r '.provider')"
+MODEL="$(echo "$RESP" | jq -r '.model')"
+if [ "$PROV" != "ollama" ] || [ "$MODEL" != "$SMOKE_MODEL" ]; then
+  echo "  ✗ expected provider=ollama, model=$SMOKE_MODEL (prefix stripped); got prov=$PROV model=$MODEL"
+  exit 1
+fi
+echo "  ✓ ollama/$SMOKE_MODEL → provider=ollama, model=$SMOKE_MODEL (prefix stripped)"
+
+# 4.
:cloud suffix without ollama_cloud registered → 404 +echo "[chatd-smoke] POST /v1/chat with :cloud suffix (no cloud provider):" +STATUS="$(curl -sS -o /tmp/cloud404.json -w '%{http_code}' -X POST http://127.0.0.1:3110/v1/chat \ + -H 'Content-Type: application/json' \ + -d '{"model":"kimi-k2.6:cloud","messages":[{"role":"user","content":"hi"}]}')" +if [ "$STATUS" != "404" ]; then + echo " ✗ expected 404; got $STATUS body=$(cat /tmp/cloud404.json)" + exit 1 +fi +echo " ✓ kimi-k2.6:cloud → 404 (ollama_cloud not registered, no silent fall-through to local)" + +# 5. unknown prefix falls through to ollama default; upstream 502s +# because the ollama provider doesn't strip unknown prefixes +# (it would silently rewrite operator-supplied names). Expected +# behavior: route to default provider, let upstream reject the +# literal model name. +echo "[chatd-smoke] POST /v1/chat with unknown/ prefix (falls through, upstream 502s):" +STATUS="$(curl -sS -o /dev/null -w '%{http_code}' -X POST http://127.0.0.1:3110/v1/chat \ + -H 'Content-Type: application/json' \ + -d "{\"model\":\"unknown/$SMOKE_MODEL\",\"messages\":[{\"role\":\"user\",\"content\":\"hi\"}],\"max_tokens\":5}")" +if [ "$STATUS" != "502" ]; then + echo " ✗ expected 502 (upstream rejects literal 'unknown/...'); got $STATUS" + exit 1 +fi +echo " ✓ unknown/ → ollama default → upstream 502 (no silent prefix-strip)" + +# 6. missing model field → 400 +echo "[chatd-smoke] POST /v1/chat with missing model field:" +STATUS="$(curl -sS -o /dev/null -w '%{http_code}' -X POST http://127.0.0.1:3110/v1/chat \ + -H 'Content-Type: application/json' \ + -d '{"messages":[{"role":"user","content":"hi"}]}')" +if [ "$STATUS" != "400" ]; then + echo " ✗ expected 400 for missing model; got $STATUS" + exit 1 +fi +echo " ✓ missing model → 400" + +echo "[chatd-smoke] chatd acceptance gate: PASSED (6/6)"
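+
+# Usage sketch (hypothetical override; any chat-capable local model
+# works, llama3.2:3b below is only an example):
+#   SMOKE_MODEL=llama3.2:3b ./scripts/chatd_smoke.sh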