Last day of Phase G0. Gateway promotes the D1 stub endpoints into
real reverse-proxies on :3110 fronting storaged + catalogd + ingestd
+ queryd. /v1 prefix lives at the edge — internal services route on
/storage, /catalog, /ingest, /sql, with the prefix stripped by a
custom Director per Kimi K2's D1-plan finding.
Routes:
/v1/storage/* → storaged
/v1/catalog/* → catalogd
/v1/ingest → ingestd
/v1/sql → queryd
Acceptance smoke 6/6 PASS — every assertion goes through :3110, none
direct to backing services. Full ingest → storage → catalog → query
round-trip verified end-to-end. The smoke's "rows[0].name=Alice"
assertion is the architectural payoff: five binaries, six HTTP
routes, one round-trip through one edge.
Cross-lineage scrum on shipped code:
- Opus 4.7 (opencode): 1 BLOCK + 2 WARN + 2 INFO
- Kimi K2-0905 (openrouter): 1 BLOCK + 3 WARN + 1 INFO (3 false positives, all from one wrong TrimPrefix theory)
- Qwen3-coder (openrouter): 5 completion tokens — "No BLOCKs."
Fixed (2, both Opus single-reviewer):
O-BLOCK: Director path stripping fails if upstream URL has a
non-empty path. The default Director's singleJoiningSlash runs
BEFORE the custom code, so an upstream like http://host/api
produces /api/v1/storage/... after the join — then TrimPrefix("/v1")
is a no-op because the string starts with /api. Fix: strip /v1
BEFORE calling origDirector. New TestProxy_SubPathUpstream regression
locks this in. Today: bare-host URLs only, dormant — but moving
gateway behind a sub-path in prod would have silently 404'd.
O-WARN2: url.Parse is permissive — typo "127.0.0.1:3211" (no scheme)
parses fine, produces empty Host, every request 502s. mustParseUpstream
fail-fast at startup with a clear message naming the offending
config field.
Dismissed (3, all Kimi, same false TrimPrefix theory):
K-BLOCK "TrimPrefix loops forever on //v1storage" — false, single
check-and-trim, no loop
K-WARN "no upper bound on repeated // removal" — same false theory
K-WARN "goroutines leak if upstream parse fails while binaries
running" — confused scope; binaries are separate OS processes
launched by the smoke script
D1 smoke updated (post-D6): the 501 stub probes are gone (gateway no
longer stubs /v1/ingest and /v1/sql). Replaced with proxy probes that
verify gateway forwards malformed requests to ingestd and queryd. Launch
order changed from parallel to dep-ordered (storaged → catalogd →
ingestd → queryd → gateway) since catalogd's rehydrate now needs
storaged, queryd's initial Refresh needs catalogd.
All six G0 smokes (D1 through D6) PASS end-to-end after every fix
round. Phase G0 substrate is complete: 5 binaries, 6 routes, 25 fixes
applied across 6 days from cross-lineage review.
G1+ next: gRPC adapters, Lance/HNSW vector indices, Go MCP SDK port,
distillation rebuild, observer + Langfuse integration.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
170 lines
5.3 KiB
Go
170 lines
5.3 KiB
Go
package gateway
|
|
|
|
import (
|
|
"io"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"net/url"
|
|
"strings"
|
|
"testing"
|
|
)
|
|
|
|
// TestProxy_StripsV1Prefix is the load-bearing assertion: the path
|
|
// the upstream sees is the inbound path minus "/v1".
|
|
func TestProxy_StripsV1Prefix(t *testing.T) {
|
|
cases := []struct {
|
|
name string
|
|
inbound string
|
|
upstream string
|
|
}{
|
|
{"storage get", "/v1/storage/get/datasets/x.parquet", "/storage/get/datasets/x.parquet"},
|
|
{"catalog list", "/v1/catalog/list", "/catalog/list"},
|
|
{"ingest", "/v1/ingest", "/ingest"},
|
|
{"sql", "/v1/sql", "/sql"},
|
|
{"deeply nested key", "/v1/storage/get/a/b/c/d.parquet", "/storage/get/a/b/c/d.parquet"},
|
|
}
|
|
for _, tc := range cases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
var got string
|
|
upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
got = r.URL.Path
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = w.Write([]byte("ok"))
|
|
}))
|
|
defer upstream.Close()
|
|
|
|
u, _ := url.Parse(upstream.URL)
|
|
h := NewProxyHandler(u)
|
|
|
|
req := httptest.NewRequest(http.MethodGet, tc.inbound, nil)
|
|
rr := httptest.NewRecorder()
|
|
h.ServeHTTP(rr, req)
|
|
|
|
if rr.Code != http.StatusOK {
|
|
t.Errorf("status: got %d, want 200", rr.Code)
|
|
}
|
|
if got != tc.upstream {
|
|
t.Errorf("upstream path: got %q, want %q", got, tc.upstream)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestProxy_PreservesQueryString verifies ingestd's `?name=X` flows
|
|
// through unchanged. The proxy's Director only touches Path; RawQuery
|
|
// is set by httputil.NewSingleHostReverseProxy's standard director.
|
|
func TestProxy_PreservesQueryString(t *testing.T) {
|
|
var gotQuery string
|
|
upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
gotQuery = r.URL.RawQuery
|
|
w.WriteHeader(http.StatusOK)
|
|
}))
|
|
defer upstream.Close()
|
|
u, _ := url.Parse(upstream.URL)
|
|
h := NewProxyHandler(u)
|
|
|
|
req := httptest.NewRequest(http.MethodPost, "/v1/ingest?name=workers&debug=1", nil)
|
|
rr := httptest.NewRecorder()
|
|
h.ServeHTTP(rr, req)
|
|
|
|
if gotQuery != "name=workers&debug=1" {
|
|
t.Errorf("query: got %q, want %q", gotQuery, "name=workers&debug=1")
|
|
}
|
|
}
|
|
|
|
// TestProxy_PreservesBody verifies POST bodies flow through.
|
|
func TestProxy_PreservesBody(t *testing.T) {
|
|
var gotBody string
|
|
upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
b, _ := io.ReadAll(r.Body)
|
|
gotBody = string(b)
|
|
w.WriteHeader(http.StatusOK)
|
|
}))
|
|
defer upstream.Close()
|
|
u, _ := url.Parse(upstream.URL)
|
|
h := NewProxyHandler(u)
|
|
|
|
req := httptest.NewRequest(http.MethodPost, "/v1/sql", strings.NewReader(`{"sql":"SELECT 1"}`))
|
|
rr := httptest.NewRecorder()
|
|
h.ServeHTTP(rr, req)
|
|
|
|
if gotBody != `{"sql":"SELECT 1"}` {
|
|
t.Errorf("body: got %q, want %q", gotBody, `{"sql":"SELECT 1"}`)
|
|
}
|
|
}
|
|
|
|
// TestProxy_UpstreamDown_502 verifies the default ErrorHandler maps
|
|
// connection failures to 502 Bad Gateway. We point at a freshly
|
|
// closed listener address to provoke a connect failure.
|
|
func TestProxy_UpstreamDown_502(t *testing.T) {
|
|
// Bind + close to grab an unused port.
|
|
srv := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {}))
|
|
addr := srv.URL
|
|
srv.Close() // now nothing listens on this port
|
|
|
|
u, _ := url.Parse(addr)
|
|
h := NewProxyHandler(u)
|
|
req := httptest.NewRequest(http.MethodGet, "/v1/health", nil)
|
|
rr := httptest.NewRecorder()
|
|
h.ServeHTTP(rr, req)
|
|
|
|
if rr.Code != http.StatusBadGateway {
|
|
t.Errorf("status: got %d, want 502", rr.Code)
|
|
}
|
|
}
|
|
|
|
// TestProxy_SubPathUpstream is the regression for the D6 scrum
|
|
// O-BLOCK finding: when the upstream URL has a non-empty path
|
|
// (e.g. running gateway behind a sub-path on a shared host),
|
|
// the prior order-of-operations let the default Director's
|
|
// singleJoiningSlash run BEFORE the /v1 strip, producing
|
|
// "/api/v1/storage/..." which then failed TrimPrefix("/v1") and
|
|
// forwarded the un-stripped path. Stripping FIRST means the join
|
|
// sees the clean path.
|
|
func TestProxy_SubPathUpstream(t *testing.T) {
|
|
var got string
|
|
upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
got = r.URL.Path
|
|
w.WriteHeader(http.StatusOK)
|
|
}))
|
|
defer upstream.Close()
|
|
|
|
// Append a sub-path to the upstream URL. The proxy MUST forward
|
|
// to /api/storage/get/x.parquet, NOT /api/v1/storage/get/x.parquet.
|
|
u, _ := url.Parse(upstream.URL + "/api")
|
|
h := NewProxyHandler(u)
|
|
|
|
req := httptest.NewRequest(http.MethodGet, "/v1/storage/get/x.parquet", nil)
|
|
rr := httptest.NewRecorder()
|
|
h.ServeHTTP(rr, req)
|
|
|
|
want := "/api/storage/get/x.parquet"
|
|
if got != want {
|
|
t.Errorf("upstream path under sub-path: got %q, want %q", got, want)
|
|
}
|
|
}
|
|
|
|
// TestProxy_RewritesHost ensures the upstream Host header is the
|
|
// upstream's, not the inbound. Backing services route on chi which
|
|
// is host-agnostic, but if any future middleware checks Host (e.g.
|
|
// CORS allow-list) it should see the right value.
|
|
func TestProxy_RewritesHost(t *testing.T) {
|
|
var gotHost string
|
|
upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
gotHost = r.Host
|
|
w.WriteHeader(http.StatusOK)
|
|
}))
|
|
defer upstream.Close()
|
|
u, _ := url.Parse(upstream.URL)
|
|
h := NewProxyHandler(u)
|
|
|
|
req := httptest.NewRequest(http.MethodGet, "/v1/health", nil)
|
|
req.Host = "gateway.example.com"
|
|
rr := httptest.NewRecorder()
|
|
h.ServeHTTP(rr, req)
|
|
|
|
if gotHost != u.Host {
|
|
t.Errorf("Host: got %q, want %q", gotHost, u.Host)
|
|
}
|
|
}
|