golangLAKEHOUSE/scripts/observer_smoke.sh
root bc9ab93afe H: observerd — autonomous-iteration witness loop (SPEC §2 port)
Port of the load-bearing pieces of mcp-server/observer.ts (Rust
system, 852 lines TS) per SPEC §2's named target. Implements PRD
loop 3 ("Observer loop — watches each run, refines configs").

Routes (all under /v1/observer/* via gateway):
  GET  /observer/health   — liveness
  GET  /observer/stats    — total / successes / failures /
                             by_source / recent_scenario_ops
                             (matches Rust JSON shape exactly)
  POST /observer/event    — record one ObservedOp; auto-defaults
                             timestamp + source, validates required
                             fields (endpoint), persists to JSONL,
                             appends to ring buffer

Architecture:
  - internal/observer/types.go — ObservedOp model + Source taxonomy
    (mcp / scenario / langfuse / overseer_correction). Mirrors the
    Rust shape so JSON round-trips during cutover.
  - internal/observer/store.go — Store + Persistor. Ring buffer cap
    matches Rust's 2000; recent_scenarios cap matches Rust's 10.
    Same persist-then-apply order as pathwayd; same corruption-
    tolerant replay (skip malformed lines + warn).
  - cmd/observerd — :3219 HTTP service, fronted by gateway as
    /v1/observer/*.
  - lakehouse.toml + DefaultConfig — [observerd] block matches the
    pathwayd pattern (Bind + PersistPath; empty path = ephemeral).

Tests + smoke (all PASS):
  - 7 unit tests in store_test.go: validation, default fields,
    stats aggregation, recent-scenarios cap + ordering, ring-buffer
    rollover at cap, JSONL round-trip persistence, corruption-
    tolerant replay (1 valid + 1 corrupt + 1 valid → 2 applied)
  - scripts/observer_smoke.sh: 4 assertions through gateway —
    record 5 events (3 ok / 2 fail across 2 sources), stats
    aggregate correctly, empty-endpoint→400, kill+restart preserves
    ops via JSONL replay (5 ops, 3 ok, 2 err survive)

Deferred (named in package + cmd doc, not in this commit):
  - POST /observer/review (cloud-LLM hand-review fall-back). The
    heuristic-only path could land cheaply, but the productized
    cloud path (qwen3-coder fall-back) is a multi-day port.
  - Background loops: analyzeErrors, consolidatePlaybooks,
    tailOverseerCorrections (read overseer_corrections.jsonl into
    the ring buffer once per cycle).
  - escalateFailureClusterToLLMTeam (failure clustering trigger
    that posts to LLM Team's /api/run with code_review mode).

/relevance is NOT duplicated — already ported in 9588bd8 to
internal/matrix/relevance.go (component 3 of SPEC §3.4).

16-smoke regression all green (D1-D6, G1, G1P, G2, storaged_cap,
pathway, matrix, relevance, downgrade, playbook, observer).
13 binaries now: gateway, storaged, catalogd, ingestd, queryd,
vectord, embedd, pathwayd, matrixd, observerd, mcpd, fake_ollama
(plus catalogd-only test build).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 20:18:02 -05:00

143 lines
5.1 KiB
Bash
Executable File

#!/usr/bin/env bash
# Observer smoke — autonomous-iteration witness service end-to-end.
# All assertions go through gateway :3110.
#
# Validates:
# - POST /observer/event records an op (success path + scenario source)
# - GET /observer/stats aggregates by source + counts successes/failures
# - Stats.recent_scenario_ops surfaces scenario digests
# - Validation: empty endpoint → 400
# - Persistence: kill+restart observerd preserves ops via JSONL replay
#
# Requires: go, curl and jq on PATH (/usr/local/go/bin is appended below).
set -euo pipefail

# The script lives in scripts/; run from its parent so ./cmd and ./bin resolve.
cd "$(dirname "$0")/.."
export PATH="$PATH:/usr/local/go/bin"

# Fail fast on missing tools. Without this a missing curl shows up later as a
# health-poll timeout ("observerd failed"), and a missing jq aborts the run
# only after events have already been posted.
for tool in go curl jq; do
  if ! command -v "$tool" >/dev/null 2>&1; then
    echo "[observer-smoke] missing required tool: $tool" >&2
    exit 1
  fi
done

echo "[observer-smoke] building observerd + gateway..."
go build -o bin/ ./cmd/observerd ./cmd/gateway

# Stop leftover observerd/gateway processes from earlier runs. pkill exits
# non-zero when nothing matches, which is fine here.
pkill -f "bin/(observerd|gateway)" 2>/dev/null || true
sleep 0.3 # brief pause, presumably to let the killed processes exit

# PIDs of background services started by this run.
PIDS=()
# Scratch directory holding the persistence file and the generated config.
TMP="$(mktemp -d)"
PERSIST="$TMP/ops.jsonl"
CFG="$TMP/observer.toml"
# Stop every background process recorded in PIDS and remove the scratch dir.
# Globals: PIDS (read, then emptied), TMP (read)
# Outputs: one progress line on stdout
# Returns: always 0; safe to call repeatedly and with an empty PIDS.
cleanup() {
  echo "[observer-smoke] cleanup"
  local pid
  # ${PIDS[@]+...} expands to nothing for an empty or unset array; a bare
  # "${PIDS[@]}" is an unbound-variable error under `set -u` on bash < 4.4.
  for pid in ${PIDS[@]+"${PIDS[@]}"}; do
    [ -n "$pid" ] || continue
    # Already-dead PIDs (e.g. the observerd instance replaced by a restart)
    # are expected, so a failing kill is ignored.
    kill "$pid" 2>/dev/null || true
    # Reap the child so it has really exited before the script returns.
    wait "$pid" 2>/dev/null || true
  done
  # Forget the PIDs so a second invocation does not signal them again.
  PIDS=()
  if [ -n "${TMP:-}" ]; then
    rm -rf -- "$TMP"
  fi
}
# Run cleanup exactly once, on exit. INT/TERM are converted into a normal exit
# with the conventional 128+signal status; trapping them with `cleanup`
# directly would run the handler and then let the script carry on against
# services that were just killed, with cleanup firing again at EXIT.
trap cleanup EXIT
trap 'exit 130' INT
trap 'exit 143' TERM

# Generate the throwaway config: gateway on :3110 with an upstream URL per
# service, observerd on :3219 persisting to $PERSIST. The heredoc delimiter is
# unquoted on purpose so $PERSIST is expanded into the file.
cat > "$CFG" <<EOF
[gateway]
bind = "127.0.0.1:3110"
storaged_url = "http://127.0.0.1:3211"
catalogd_url = "http://127.0.0.1:3212"
ingestd_url = "http://127.0.0.1:3213"
queryd_url = "http://127.0.0.1:3214"
vectord_url = "http://127.0.0.1:3215"
embedd_url = "http://127.0.0.1:3216"
pathwayd_url = "http://127.0.0.1:3217"
matrixd_url = "http://127.0.0.1:3218"
observerd_url = "http://127.0.0.1:3219"
[observerd]
bind = "127.0.0.1:3219"
persist_path = "$PERSIST"
EOF
# Poll http://127.0.0.1:<port>/health until curl gets a response.
# Arguments: $1 - TCP port to probe
#            $2 - optional timeout in whole seconds (default: 5)
# Returns:   0 as soon as one probe succeeds, 1 if the deadline passes first.
poll_health() {
  local port="${1:?poll_health: port required}"
  local timeout="${2:-5}"
  # Declared separately from the assignment so a failing `date` is not masked
  # by the exit status of `local`.
  local deadline
  deadline=$(($(date +%s) + timeout))
  while [ "$(date +%s)" -lt "$deadline" ]; do
    # curl runs without -f, so any HTTP response counts as "up"; only
    # connection errors and the 1 s per-probe timeout count as "not yet".
    if curl -sS --max-time 1 "http://127.0.0.1:${port}/health" >/dev/null 2>&1; then
      return 0
    fi
    sleep 0.05
  done
  return 1
}
# Start observerd in the background with the generated config, remember its
# PID, and wait until port 3219 answers the health probe.
# Globals: CFG (read), OBSERVERD_PID (written), PIDS (appended)
# Outputs: on failure, "observerd failed" plus the tail of /tmp/observerd.log
# Returns: 0 once healthy, 1 if the health probe gives up.
launch_observerd() {
  ./bin/observerd -config "$CFG" >/tmp/observerd.log 2>&1 &
  OBSERVERD_PID=$!
  PIDS+=("$OBSERVERD_PID")

  if poll_health 3219; then
    return 0
  fi

  echo "observerd failed"
  tail /tmp/observerd.log
  return 1
}
# Bring up observerd first, then the gateway that all assertions go through.
echo "[observer-smoke] launching observerd → gateway..."
launch_observerd

./bin/gateway -config "$CFG" >/tmp/gateway.log 2>&1 &
PIDS+=("$!")

if ! poll_health 3110; then
  echo "gateway failed"
  tail /tmp/gateway.log
  exit 1
fi

# Sticky failure flag: each assertion below sets it to 1 on mismatch.
FAILED=0
# ── 1. Record 5 ops: 3 success + 2 fail across 2 sources ─────────
echo "[observer-smoke] record 5 ops:"
EVENT_URL="http://127.0.0.1:3110/v1/observer/event"
# Count accepted POSTs. curl -f turns HTTP >= 400 into a non-zero exit, so a
# rejected event is reported here instead of being announced as posted.
POSTED=0
# Three successful ops from source "mcp".
for i in 1 2 3; do
  if curl -sS -f -o /dev/null -X POST "$EVENT_URL" \
    -H 'Content-Type: application/json' \
    -d "{\"endpoint\":\"/v1/test\",\"input_summary\":\"ok-$i\",\"success\":true,\"duration_ms\":10,\"output_summary\":\"ok\",\"source\":\"mcp\"}"; then
    POSTED=$((POSTED + 1))
  fi
done
# Two failed ops from source "scenario", carrying the scenario-only fields.
for i in 1 2; do
  if curl -sS -f -o /dev/null -X POST "$EVENT_URL" \
    -H 'Content-Type: application/json' \
    -d "{\"endpoint\":\"/v1/test\",\"input_summary\":\"fail-$i\",\"success\":false,\"duration_ms\":10,\"output_summary\":\"err\",\"error\":\"boom\",\"source\":\"scenario\",\"staffer_id\":\"st-$i\",\"event_kind\":\"fill\",\"role\":\"Forklift\"}"; then
    POSTED=$((POSTED + 1))
  fi
done
if [ "$POSTED" -eq 5 ]; then
  echo " ✓ 5 events posted"
else
  echo " ✗ only $POSTED of 5 events accepted"
  FAILED=1
fi
# ── 2. Stats aggregation ─────────────────────────────────────────
echo "[observer-smoke] /observer/stats aggregates correctly:"
# A failed request or a non-JSON body must end up in the ✗ branch below (which
# prints the raw body) rather than aborting the script through `set -e`, so
# both the fetch and the parse are allowed to fail softly.
STATS="$(curl -sS http://127.0.0.1:3110/v1/observer/stats)" || true
# One jq pass extracts all six values as a space-separated line; missing
# fields come out as "null", and a missing recent_scenario_ops has length 0.
STATS_FIELDS="$(jq -r '[.total, .successes, .failures, .by_source.mcp, .by_source.scenario, (.recent_scenario_ops | length)] | map(tostring) | join(" ")' <<<"$STATS" 2>/dev/null)" || STATS_FIELDS=""
# On a parse failure every variable is left empty, which fails the comparison.
read -r TOT OK ERR MCP SCEN RECENT_LEN <<<"$STATS_FIELDS" || true
if [ "$TOT" = "5" ] && [ "$OK" = "3" ] && [ "$ERR" = "2" ] \
  && [ "$MCP" = "3" ] && [ "$SCEN" = "2" ] && [ "$RECENT_LEN" = "2" ]; then
  echo " ✓ total=5 (3 ok + 2 fail) · by_source: mcp=3 scenario=2 · 2 scenario digests"
else
  echo " ✗ total=$TOT ok=$OK err=$ERR mcp=$MCP scen=$SCEN recent=$RECENT_LEN"
  echo " full: $STATS"
  FAILED=1
fi
# ── 3. Validation: empty endpoint → 400 ──────────────────────────
echo "[observer-smoke] empty endpoint → 400:"
# Only the status code matters: the body is discarded and -w prints the code.
HTTP="$(
  curl -sS -o /dev/null -w '%{http_code}' \
    -X POST http://127.0.0.1:3110/v1/observer/event \
    -H 'Content-Type: application/json' \
    -d '{"endpoint":"","input_summary":"x","success":true,"duration_ms":1,"output_summary":"x"}'
)"
case "$HTTP" in
  400)
    echo " ✓ empty endpoint rejected"
    ;;
  *)
    echo " ✗ got $HTTP"
    FAILED=1
    ;;
esac
# ── 4. Persistence: kill + restart preserves ops ─────────────────
echo "[observer-smoke] kill + restart observerd → ops survive:"
# Stop the running observerd and reap it before starting a fresh instance
# against the same persist file.
kill "$OBSERVERD_PID" 2>/dev/null || true
wait "$OBSERVERD_PID" 2>/dev/null || true
sleep 0.3
launch_observerd
sleep 0.2
# As in the first stats check, a failed request or non-JSON body must be
# reported through the ✗ branch instead of aborting the script via `set -e`.
STATS2="$(curl -sS http://127.0.0.1:3110/v1/observer/stats)" || true
STATS2_FIELDS="$(jq -r '[.total, .successes, .failures] | map(tostring) | join(" ")' <<<"$STATS2" 2>/dev/null)" || STATS2_FIELDS=""
# On a parse failure the three variables are left empty and the check fails.
read -r TOT2 OK2 ERR2 <<<"$STATS2_FIELDS" || true
if [ "$TOT2" = "5" ] && [ "$OK2" = "3" ] && [ "$ERR2" = "2" ]; then
  echo " ✓ total=5 ok=3 err=2 preserved through restart"
else
  echo " ✗ post-restart total=$TOT2 ok=$OK2 err=$ERR2"
  echo " full: $STATS2"
  FAILED=1
fi
# Turn the sticky FAILED flag into the gate banner and the exit status.
if ! [ "$FAILED" -eq 0 ]; then
  echo "[observer-smoke] Observer acceptance gate: FAILED"
  exit 1
fi

echo "[observer-smoke] Observer acceptance gate: PASSED"
exit 0