diff --git a/cmd/gateway/main.go b/cmd/gateway/main.go index f2457bb..7e41318 100644 --- a/cmd/gateway/main.go +++ b/cmd/gateway/main.go @@ -44,6 +44,7 @@ func main() { "queryd_url": cfg.Gateway.QuerydURL, "vectord_url": cfg.Gateway.VectordURL, "embedd_url": cfg.Gateway.EmbeddURL, + "pathwayd_url": cfg.Gateway.PathwaydURL, } for k, v := range upstreams { if v == "" { @@ -63,6 +64,7 @@ func main() { querydURL := mustParseUpstream("queryd_url", cfg.Gateway.QuerydURL) vectordURL := mustParseUpstream("vectord_url", cfg.Gateway.VectordURL) embeddURL := mustParseUpstream("embedd_url", cfg.Gateway.EmbeddURL) + pathwaydURL := mustParseUpstream("pathwayd_url", cfg.Gateway.PathwaydURL) storagedProxy := gateway.NewProxyHandler(storagedURL) catalogdProxy := gateway.NewProxyHandler(catalogdURL) @@ -70,6 +72,7 @@ func main() { querydProxy := gateway.NewProxyHandler(querydURL) vectordProxy := gateway.NewProxyHandler(vectordURL) embeddProxy := gateway.NewProxyHandler(embeddURL) + pathwaydProxy := gateway.NewProxyHandler(pathwaydURL) if err := shared.Run("gateway", cfg.Gateway.Bind, func(r chi.Router) { @@ -88,6 +91,8 @@ func main() { r.Handle("/v1/vectors/*", vectordProxy) // Embedding service — /v1/embed r.Handle("/v1/embed", embeddProxy) + // Pathway memory — /v1/pathway/* + r.Handle("/v1/pathway/*", pathwaydProxy) }, cfg.Auth); err != nil { slog.Error("server", "err", err) os.Exit(1) diff --git a/cmd/pathwayd/main.go b/cmd/pathwayd/main.go new file mode 100644 index 0000000..f03c194 --- /dev/null +++ b/cmd/pathwayd/main.go @@ -0,0 +1,278 @@ +// pathwayd is the pathway memory service. Wraps internal/pathway's +// Store with HTTP routes for the Mem0-style operations defined in +// ADR-004. +// +// Routes (all under /pathway): +// POST /pathway/add — new trace with fresh UID +// POST /pathway/add_idempotent — UID-keyed add or replay-bump +// POST /pathway/update — replace content for an existing UID +// POST /pathway/revise — new revision linked to predecessor +// POST /pathway/retire — mark trace retired (excluded from search) +// GET /pathway/get/{uid} — fetch one trace (incl. retired) +// GET /pathway/history/{uid} — backward chain via predecessor links +// POST /pathway/search — filter-based listing +// GET /pathway/stats — total/active/retired counters +// +// Persistence: optional. Empty [pathwayd].persist_path = in-memory +// only (matches vectord G1's pattern). Set a path for durable +// per-trace JSONL append. +package main + +import ( + "encoding/json" + "errors" + "flag" + "log/slog" + "net/http" + "os" + "strings" + + "github.com/go-chi/chi/v5" + + "git.agentview.dev/profit/golangLAKEHOUSE/internal/pathway" + "git.agentview.dev/profit/golangLAKEHOUSE/internal/shared" +) + +const maxRequestBytes = 4 << 20 // 4 MiB cap on request bodies + +func main() { + configPath := flag.String("config", "lakehouse.toml", "path to TOML config") + flag.Parse() + + cfg, err := shared.LoadConfig(*configPath) + if err != nil { + slog.Error("config", "err", err) + os.Exit(1) + } + + // Persistence is optional — empty path = in-memory ephemeral. + var persistor *pathway.Persistor + if cfg.Pathwayd.PersistPath != "" { + persistor, err = pathway.NewPersistor(cfg.Pathwayd.PersistPath) + if err != nil { + slog.Error("pathway persistor", "err", err) + os.Exit(1) + } + } + + store := pathway.NewStore(persistor) + if persistor != nil { + n, err := store.Load() + if err != nil { + slog.Warn("pathway load", "err", err, "loaded", n) + } else { + slog.Info("pathway loaded", "events", n, "path", cfg.Pathwayd.PersistPath) + } + } + + h := &handlers{store: store} + + if err := shared.Run("pathwayd", cfg.Pathwayd.Bind, h.register, cfg.Auth); err != nil { + slog.Error("server", "err", err) + os.Exit(1) + } +} + +type handlers struct { + store *pathway.Store +} + +func (h *handlers) register(r chi.Router) { + r.Post("/pathway/add", h.handleAdd) + r.Post("/pathway/add_idempotent", h.handleAddIdempotent) + r.Post("/pathway/update", h.handleUpdate) + r.Post("/pathway/revise", h.handleRevise) + r.Post("/pathway/retire", h.handleRetire) + r.Get("/pathway/get/{uid}", h.handleGet) + r.Get("/pathway/history/{uid}", h.handleHistory) + r.Post("/pathway/search", h.handleSearch) + r.Get("/pathway/stats", h.handleStats) +} + +// ── request shapes ─────────────────────────────────────────────── + +type addRequest struct { + Content json.RawMessage `json:"content"` + Tags []string `json:"tags,omitempty"` +} + +type addIdempotentRequest struct { + UID string `json:"uid"` + Content json.RawMessage `json:"content"` + Tags []string `json:"tags,omitempty"` +} + +type updateRequest struct { + UID string `json:"uid"` + Content json.RawMessage `json:"content"` +} + +type reviseRequest struct { + PredecessorUID string `json:"predecessor_uid"` + Content json.RawMessage `json:"content"` + Tags []string `json:"tags,omitempty"` +} + +type retireRequest struct { + UID string `json:"uid"` +} + +type searchRequest struct { + Tag string `json:"tag,omitempty"` + ContentContains string `json:"content_contains,omitempty"` + CreatedAfterNs int64 `json:"created_after_ns,omitempty"` + CreatedBeforeNs int64 `json:"created_before_ns,omitempty"` + IncludeRetired bool `json:"include_retired,omitempty"` +} + +// ── handlers ──────────────────────────────────────────────────── + +func (h *handlers) handleAdd(w http.ResponseWriter, r *http.Request) { + var req addRequest + if !decodeJSON(w, r, &req) { + return + } + tr, err := h.store.Add(req.Content, req.Tags...) + if writeStoreError(w, err) { + return + } + writeJSON(w, http.StatusCreated, tr) +} + +func (h *handlers) handleAddIdempotent(w http.ResponseWriter, r *http.Request) { + var req addIdempotentRequest + if !decodeJSON(w, r, &req) { + return + } + tr, err := h.store.AddIdempotent(req.UID, req.Content, req.Tags...) + if writeStoreError(w, err) { + return + } + writeJSON(w, http.StatusOK, tr) +} + +func (h *handlers) handleUpdate(w http.ResponseWriter, r *http.Request) { + var req updateRequest + if !decodeJSON(w, r, &req) { + return + } + if err := h.store.Update(req.UID, req.Content); writeStoreError(w, err) { + return + } + writeJSON(w, http.StatusOK, map[string]any{"status": "updated"}) +} + +func (h *handlers) handleRevise(w http.ResponseWriter, r *http.Request) { + var req reviseRequest + if !decodeJSON(w, r, &req) { + return + } + tr, err := h.store.Revise(req.PredecessorUID, req.Content, req.Tags...) + if writeStoreError(w, err) { + return + } + writeJSON(w, http.StatusCreated, tr) +} + +func (h *handlers) handleRetire(w http.ResponseWriter, r *http.Request) { + var req retireRequest + if !decodeJSON(w, r, &req) { + return + } + if err := h.store.Retire(req.UID); writeStoreError(w, err) { + return + } + w.WriteHeader(http.StatusNoContent) +} + +func (h *handlers) handleGet(w http.ResponseWriter, r *http.Request) { + uid := chi.URLParam(r, "uid") + tr, err := h.store.Get(uid) + if writeStoreError(w, err) { + return + } + writeJSON(w, http.StatusOK, tr) +} + +func (h *handlers) handleHistory(w http.ResponseWriter, r *http.Request) { + uid := chi.URLParam(r, "uid") + chain, err := h.store.History(uid) + if writeStoreError(w, err) { + return + } + writeJSON(w, http.StatusOK, map[string]any{ + "chain": chain, + "length": len(chain), + }) +} + +func (h *handlers) handleSearch(w http.ResponseWriter, r *http.Request) { + var req searchRequest + if !decodeJSON(w, r, &req) { + return + } + results := h.store.Search(pathway.SearchFilter{ + Tag: req.Tag, + ContentContains: req.ContentContains, + CreatedAfterNs: req.CreatedAfterNs, + CreatedBeforeNs: req.CreatedBeforeNs, + IncludeRetired: req.IncludeRetired, + }) + writeJSON(w, http.StatusOK, map[string]any{ + "results": results, + "count": len(results), + }) +} + +func (h *handlers) handleStats(w http.ResponseWriter, _ *http.Request) { + writeJSON(w, http.StatusOK, h.store.Stats()) +} + +// ── helpers ──────────────────────────────────────────────────── + +func decodeJSON(w http.ResponseWriter, r *http.Request, v any) bool { + defer r.Body.Close() + r.Body = http.MaxBytesReader(w, r.Body, maxRequestBytes) + if err := json.NewDecoder(r.Body).Decode(v); err != nil { + var maxErr *http.MaxBytesError + if errors.As(err, &maxErr) || strings.Contains(err.Error(), "http: request body too large") { + http.Error(w, "body too large", http.StatusRequestEntityTooLarge) + return false + } + http.Error(w, "decode body: "+err.Error(), http.StatusBadRequest) + return false + } + return true +} + +func writeJSON(w http.ResponseWriter, code int, v any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(code) + if err := json.NewEncoder(w).Encode(v); err != nil { + slog.Warn("pathway write json", "err", err) + } +} + +// writeStoreError maps internal/pathway sentinel errors to HTTP +// status codes. Returns true if a response was written (caller +// should return). Returns false on success (caller continues). +func writeStoreError(w http.ResponseWriter, err error) bool { + if err == nil { + return false + } + switch { + case errors.Is(err, pathway.ErrNotFound): + http.Error(w, err.Error(), http.StatusNotFound) + case errors.Is(err, pathway.ErrPredecessorMissing): + http.Error(w, err.Error(), http.StatusNotFound) + case errors.Is(err, pathway.ErrEmptyUID), + errors.Is(err, pathway.ErrInvalidContent): + http.Error(w, err.Error(), http.StatusBadRequest) + case errors.Is(err, pathway.ErrCycle): + http.Error(w, err.Error(), http.StatusConflict) + default: + slog.Error("pathway store", "err", err) + http.Error(w, "internal", http.StatusInternalServerError) + } + return true +} diff --git a/internal/shared/config.go b/internal/shared/config.go index a67b2dc..1f1d2f6 100644 --- a/internal/shared/config.go +++ b/internal/shared/config.go @@ -26,6 +26,7 @@ type Config struct { Queryd QuerydConfig `toml:"queryd"` Vectord VectordConfig `toml:"vectord"` Embedd EmbeddConfig `toml:"embedd"` + Pathwayd PathwaydConfig `toml:"pathwayd"` S3 S3Config `toml:"s3"` Log LogConfig `toml:"log"` Auth AuthConfig `toml:"auth"` @@ -50,8 +51,8 @@ type IngestConfig struct { // GatewayConfig adds the upstream URLs the reverse proxy fronts. // Each route family (/v1/storage, /v1/catalog, /v1/ingest, /v1/sql, -// /v1/vectors, /v1/embed) has its own upstream so we can scale -// services independently or move them to different boxes without +// /v1/vectors, /v1/embed, /v1/pathway) has its own upstream so we can +// scale services independently or move them to different boxes without // touching gateway code. type GatewayConfig struct { Bind string `toml:"bind"` @@ -61,6 +62,7 @@ type GatewayConfig struct { QuerydURL string `toml:"queryd_url"` VectordURL string `toml:"vectord_url"` EmbeddURL string `toml:"embedd_url"` + PathwaydURL string `toml:"pathwayd_url"` } // EmbeddConfig drives the embed service. ProviderURL points at the @@ -85,6 +87,15 @@ type VectordConfig struct { StoragedURL string `toml:"storaged_url"` } +// PathwaydConfig drives the pathway-memory service (cmd/pathwayd). +// PersistPath: file path to the JSONL log; empty = in-memory only +// (test/dev). Production sets a stable path under /var/lib/lakehouse +// or similar so traces survive restart. +type PathwaydConfig struct { + Bind string `toml:"bind"` + PersistPath string `toml:"persist_path"` +} + // QuerydConfig adds queryd-specific knobs. queryd talks DuckDB // directly to MinIO via DuckDB's httpfs extension (so no storaged // URL needed), and reads the catalog over HTTP for view registration. @@ -161,6 +172,7 @@ func DefaultConfig() Config { QuerydURL: "http://127.0.0.1:3214", VectordURL: "http://127.0.0.1:3215", EmbeddURL: "http://127.0.0.1:3216", + PathwaydURL: "http://127.0.0.1:3217", }, Storaged: ServiceConfig{Bind: "127.0.0.1:3211"}, Catalogd: CatalogConfig{Bind: "127.0.0.1:3212", StoragedURL: "http://127.0.0.1:3211"}, @@ -180,6 +192,11 @@ func DefaultConfig() Config { DefaultModel: "nomic-embed-text", CacheSize: 10_000, // ~30 MiB at d=768; set to 0 to disable }, + Pathwayd: PathwaydConfig{ + Bind: "127.0.0.1:3217", + // PersistPath empty by default = in-memory only. Production + // sets to e.g. /var/lib/lakehouse/pathway/state.jsonl. + }, Queryd: QuerydConfig{ Bind: "127.0.0.1:3214", CatalogdURL: "http://127.0.0.1:3212", diff --git a/lakehouse.toml b/lakehouse.toml index 685cfa0..ab17993 100644 --- a/lakehouse.toml +++ b/lakehouse.toml @@ -12,6 +12,7 @@ ingestd_url = "http://127.0.0.1:3213" queryd_url = "http://127.0.0.1:3214" vectord_url = "http://127.0.0.1:3215" embedd_url = "http://127.0.0.1:3216" +pathwayd_url = "http://127.0.0.1:3217" [storaged] bind = "127.0.0.1:3211" @@ -47,6 +48,12 @@ catalogd_url = "http://127.0.0.1:3212" secrets_path = "/etc/lakehouse/secrets-go.toml" refresh_every = "30s" +[pathwayd] +bind = "127.0.0.1:3217" +# Empty = in-memory only (dev/test). Production sets a path under +# /var/lib/lakehouse/pathway/state.jsonl so traces survive restart. +persist_path = "" + [s3] endpoint = "http://localhost:9000" region = "us-east-1" diff --git a/scripts/pathway_smoke.sh b/scripts/pathway_smoke.sh new file mode 100755 index 0000000..8f30f21 --- /dev/null +++ b/scripts/pathway_smoke.sh @@ -0,0 +1,248 @@ +#!/usr/bin/env bash +# Pathway smoke — pathwayd Mem0-style versioned trace memory (ADR-004). +# All assertions go through gateway :3110. +# +# Validates: +# - All 9 HTTP routes (add, add_idempotent, update, revise, retire, +# get, history, search, stats) +# - Revise creates a predecessor link; History walks the chain +# backward (the audit-trail property pathway memory exists for) +# - Retire excludes from Search default; still accessible via Get +# - AddIdempotent on existing UID bumps replay_count, doesn't replace +# - Negative paths: 404 on unknown UIDs, 404 on missing predecessor, +# 400 on invalid content +# - Persistence: kill + restart pathwayd → all traces survive +# +# Usage: ./scripts/pathway_smoke.sh + +set -euo pipefail +cd "$(dirname "$0")/.." + +export PATH="$PATH:/usr/local/go/bin" + +echo "[pathway-smoke] building pathwayd + gateway..." +go build -o bin/ ./cmd/pathwayd ./cmd/gateway + +pkill -f "bin/(pathwayd|gateway)" 2>/dev/null || true +sleep 0.3 + +PIDS=() +TMP="$(mktemp -d)" +PERSIST="$TMP/pathway.jsonl" +CFG="$TMP/pathwayd.toml" + +cleanup() { + echo "[pathway-smoke] cleanup" + for p in "${PIDS[@]}"; do [ -n "$p" ] && kill "$p" 2>/dev/null || true; done + rm -rf "$TMP" +} +trap cleanup EXIT INT TERM + +# Custom toml — same defaults as lakehouse.toml but with persist_path +# pointing at the temp file so kill+restart actually rehydrates. +cat > "$CFG" </dev/null 2>&1; then return 0; fi + sleep 0.05 + done + return 1 +} + +launch_pathwayd() { + ./bin/pathwayd -config "$CFG" > /tmp/pathwayd.log 2>&1 & + PATHWAYD_PID=$! + PIDS+=($PATHWAYD_PID) + poll_health 3217 || { echo "pathwayd failed"; tail /tmp/pathwayd.log; return 1; } +} + +launch_gateway() { + ./bin/gateway -config "$CFG" > /tmp/gateway.log 2>&1 & + PIDS+=($!) + poll_health 3110 || { echo "gateway failed"; tail /tmp/gateway.log; return 1; } +} + +echo "[pathway-smoke] launching pathwayd → gateway..." +launch_pathwayd +launch_gateway + +FAILED=0 + +# ── 1. Add ──────────────────────────────────────────────────────── +echo "[pathway-smoke] Add → fresh UID + replay_count=1:" +RESP="$(curl -sS -X POST http://127.0.0.1:3110/v1/pathway/add \ + -H 'Content-Type: application/json' \ + -d '{"content":{"approach":"forklift-OSHA-30","outcome":"hired"},"tags":["staffing","fill"]}')" +UID_A="$(echo "$RESP" | jq -r '.uid')" +RC_A="$(echo "$RESP" | jq -r '.replay_count')" +if [ -n "$UID_A" ] && [ "$UID_A" != "null" ] && [ "$RC_A" = "1" ]; then + echo " ✓ uid=$UID_A replay_count=1" +else + echo " ✗ resp: $RESP"; FAILED=1 +fi + +# ── 2. Get ──────────────────────────────────────────────────────── +echo "[pathway-smoke] Get → returns same trace:" +RESP="$(curl -sS "http://127.0.0.1:3110/v1/pathway/get/$UID_A")" +APPROACH="$(echo "$RESP" | jq -r '.content.approach')" +if [ "$APPROACH" = "forklift-OSHA-30" ]; then + echo " ✓ content.approach round-trips" +else + echo " ✗ resp: $RESP"; FAILED=1 +fi + +# ── 3. AddIdempotent (replay) ───────────────────────────────────── +echo "[pathway-smoke] AddIdempotent same UID → replay_count++:" +RESP="$(curl -sS -X POST http://127.0.0.1:3110/v1/pathway/add_idempotent \ + -H 'Content-Type: application/json' \ + -d "{\"uid\":\"$UID_A\",\"content\":{\"approach\":\"forklift-OSHA-30\",\"outcome\":\"hired\"}}")" +RC_REPLAY="$(echo "$RESP" | jq -r '.replay_count')" +if [ "$RC_REPLAY" = "2" ]; then + echo " ✓ replay_count bumped to 2" +else + echo " ✗ replay_count=$RC_REPLAY"; FAILED=1 +fi + +# ── 4. Update ───────────────────────────────────────────────────── +echo "[pathway-smoke] Update → in-place content replace:" +HTTP="$(curl -sS -o "$TMP/upd.json" -w '%{http_code}' -X POST http://127.0.0.1:3110/v1/pathway/update \ + -H 'Content-Type: application/json' \ + -d "{\"uid\":\"$UID_A\",\"content\":{\"approach\":\"forklift-OSHA-30\",\"outcome\":\"hired\",\"note\":\"cert verified\"}}")" +if [ "$HTTP" = "200" ]; then + NOTE="$(curl -sS "http://127.0.0.1:3110/v1/pathway/get/$UID_A" | jq -r '.content.note')" + if [ "$NOTE" = "cert verified" ]; then + echo " ✓ Update applied and persisted" + else + echo " ✗ note=$NOTE after update"; FAILED=1 + fi +else + echo " ✗ Update HTTP=$HTTP"; FAILED=1 +fi + +# ── 5. Revise → predecessor link ────────────────────────────────── +echo "[pathway-smoke] Revise → new UID with predecessor link:" +RESP="$(curl -sS -X POST http://127.0.0.1:3110/v1/pathway/revise \ + -H 'Content-Type: application/json' \ + -d "{\"predecessor_uid\":\"$UID_A\",\"content\":{\"approach\":\"forklift-OSHA-30+CDL\",\"outcome\":\"upgraded\"},\"tags\":[\"staffing\",\"revision\"]}")" +UID_B="$(echo "$RESP" | jq -r '.uid')" +PRED="$(echo "$RESP" | jq -r '.predecessor_uid')" +if [ "$UID_B" != "$UID_A" ] && [ "$PRED" = "$UID_A" ]; then + echo " ✓ revision uid=$UID_B predecessor=$UID_A" +else + echo " ✗ uid=$UID_B pred=$PRED"; FAILED=1 +fi + +# ── 6. History → 2-trace chain ──────────────────────────────────── +echo "[pathway-smoke] History → walks chain backward:" +RESP="$(curl -sS "http://127.0.0.1:3110/v1/pathway/history/$UID_B")" +LEN="$(echo "$RESP" | jq -r '.length')" +HEAD="$(echo "$RESP" | jq -r '.chain[0].uid')" +TAIL="$(echo "$RESP" | jq -r '.chain[1].uid')" +if [ "$LEN" = "2" ] && [ "$HEAD" = "$UID_B" ] && [ "$TAIL" = "$UID_A" ]; then + echo " ✓ chain length=2, [0]=$UID_B [1]=$UID_A" +else + echo " ✗ len=$LEN head=$HEAD tail=$TAIL"; FAILED=1 +fi + +# ── 7. Search by tag ────────────────────────────────────────────── +echo "[pathway-smoke] Search tag=staffing → finds both traces:" +COUNT="$(curl -sS -X POST http://127.0.0.1:3110/v1/pathway/search \ + -H 'Content-Type: application/json' -d '{"tag":"staffing"}' | jq -r '.count')" +if [ "$COUNT" = "2" ]; then + echo " ✓ tag search count=2" +else + echo " ✗ count=$COUNT"; FAILED=1 +fi + +# ── 8. Retire → excluded from search default, still in Get ──────── +echo "[pathway-smoke] Retire → excluded from Search but Get-able:" +HTTP="$(curl -sS -o /dev/null -w '%{http_code}' -X POST http://127.0.0.1:3110/v1/pathway/retire \ + -H 'Content-Type: application/json' -d "{\"uid\":\"$UID_A\"}")" +if [ "$HTTP" != "204" ]; then echo " ✗ retire HTTP=$HTTP"; FAILED=1; fi + +# Default search excludes retired → only revision (UID_B) remains +COUNT_DEFAULT="$(curl -sS -X POST http://127.0.0.1:3110/v1/pathway/search \ + -H 'Content-Type: application/json' -d '{"tag":"staffing"}' | jq -r '.count')" +# IncludeRetired=true brings UID_A back +COUNT_ALL="$(curl -sS -X POST http://127.0.0.1:3110/v1/pathway/search \ + -H 'Content-Type: application/json' -d '{"tag":"staffing","include_retired":true}' | jq -r '.count')" +# Get on retired UID still returns the trace (audit trail intact) +RETIRED_FLAG="$(curl -sS "http://127.0.0.1:3110/v1/pathway/get/$UID_A" | jq -r '.retired')" +if [ "$COUNT_DEFAULT" = "1" ] && [ "$COUNT_ALL" = "2" ] && [ "$RETIRED_FLAG" = "true" ]; then + echo " ✓ retired excluded from default Search, included with flag, still Get-able" +else + echo " ✗ default=$COUNT_DEFAULT all=$COUNT_ALL retired=$RETIRED_FLAG"; FAILED=1 +fi + +# ── 9. Stats ────────────────────────────────────────────────────── +echo "[pathway-smoke] Stats → total/active/retired counters:" +STATS="$(curl -sS http://127.0.0.1:3110/v1/pathway/stats)" +T="$(echo "$STATS" | jq -r '.Total')" +A="$(echo "$STATS" | jq -r '.Active')" +R="$(echo "$STATS" | jq -r '.Retired')" +if [ "$T" = "2" ] && [ "$A" = "1" ] && [ "$R" = "1" ]; then + echo " ✓ total=2 active=1 retired=1" +else + echo " ✗ total=$T active=$A retired=$R"; FAILED=1 +fi + +# ── 10. Negative paths ──────────────────────────────────────────── +echo "[pathway-smoke] Negative paths → 4xx semantics:" +GET_404="$(curl -sS -o /dev/null -w '%{http_code}' http://127.0.0.1:3110/v1/pathway/get/no-such-uid)" +UPD_404="$(curl -sS -o /dev/null -w '%{http_code}' -X POST http://127.0.0.1:3110/v1/pathway/update \ + -H 'Content-Type: application/json' -d '{"uid":"no-such-uid","content":{}}')" +REV_404="$(curl -sS -o /dev/null -w '%{http_code}' -X POST http://127.0.0.1:3110/v1/pathway/revise \ + -H 'Content-Type: application/json' -d '{"predecessor_uid":"no-such-uid","content":{}}')" +RET_404="$(curl -sS -o /dev/null -w '%{http_code}' -X POST http://127.0.0.1:3110/v1/pathway/retire \ + -H 'Content-Type: application/json' -d '{"uid":"no-such-uid"}')" +ADD_400="$(curl -sS -o /dev/null -w '%{http_code}' -X POST http://127.0.0.1:3110/v1/pathway/add \ + -H 'Content-Type: application/json' -d '{"content":not-json}')" +if [ "$GET_404" = "404" ] && [ "$UPD_404" = "404" ] && [ "$REV_404" = "404" ] && [ "$RET_404" = "404" ] && [ "$ADD_400" = "400" ]; then + echo " ✓ get/update/revise/retire on unknown → 404; bad content → 400" +else + echo " ✗ get=$GET_404 upd=$UPD_404 rev=$REV_404 ret=$RET_404 add=$ADD_400"; FAILED=1 +fi + +# ── 11. Persistence → kill + restart preserves all traces ───────── +echo "[pathway-smoke] kill + restart pathwayd → state survives:" +kill $PATHWAYD_PID 2>/dev/null || true +wait $PATHWAYD_PID 2>/dev/null || true +sleep 0.3 +launch_pathwayd +sleep 0.2 + +# Both traces should reappear, retired flag preserved, replay_count preserved +RESP_A="$(curl -sS "http://127.0.0.1:3110/v1/pathway/get/$UID_A")" +RESP_B="$(curl -sS "http://127.0.0.1:3110/v1/pathway/get/$UID_B")" +RC_AFTER="$(echo "$RESP_A" | jq -r '.replay_count')" +RETIRED_AFTER="$(echo "$RESP_A" | jq -r '.retired')" +PRED_AFTER="$(echo "$RESP_B" | jq -r '.predecessor_uid')" +if [ "$RC_AFTER" = "2" ] && [ "$RETIRED_AFTER" = "true" ] && [ "$PRED_AFTER" = "$UID_A" ]; then + echo " ✓ replay_count, retired flag, predecessor link all preserved" +else + echo " ✗ replay_count=$RC_AFTER retired=$RETIRED_AFTER pred=$PRED_AFTER"; FAILED=1 +fi + +if [ "$FAILED" -eq 0 ]; then + echo "[pathway-smoke] Pathway acceptance gate: PASSED" + exit 0 +else + echo "[pathway-smoke] Pathway acceptance gate: FAILED" + exit 1 +fi