observerd: test that locks ADR-005 Decision 5.3
TestWorkflowRun_AllProvenanceRecordedPostRun proves that
handleWorkflowRun records ObservedOps only AFTER runner.Run returns,
not interleaved with node execution.
The test pauses inside a node via a controlled channel, samples
observer.Store mid-run (must be 0), unblocks, then samples again
(must be N). If a future commit adds per-node streaming (e.g.
runner.NodeHook firing as each node finishes), n1's record would
appear before the unblock and the first assertion fires.
This is intentional test-as-spec lock. Closing the streaming gap is
deferred per the ADR ("acceptable for short workflows; streaming
callback is the right shape when workflows get longer") — but if
someone later adds the streaming callback without updating the ADR,
this test catches it in `go test` instead of leaving the doc and
code drifted.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
2c71d1c637
commit
9ce067bd9d
@ -5,6 +5,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/go-chi/chi/v5"
|
"github.com/go-chi/chi/v5"
|
||||||
|
|
||||||
@ -90,6 +91,80 @@ func TestEvent_InvalidOp(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestWorkflowRun_AllProvenanceRecordedPostRun proves the gap ratified
|
||||||
|
// in ADR-005 Decision 5.3: handleWorkflowRun calls runner.Run
|
||||||
|
// synchronously and only records ObservedOps from the returned
|
||||||
|
// RunResult AFTER Run completes. A crash mid-Run would lose ALL
|
||||||
|
// provenance for that workflow.
|
||||||
|
//
|
||||||
|
// The test pauses inside a node, samples observer state (must be 0),
|
||||||
|
// unblocks, then samples again (must be N). If a future commit adds
|
||||||
|
// per-node streaming (e.g. runner.NodeHook firing before Run returns),
|
||||||
|
// the first assertion fires — that's the intentional test-as-spec
|
||||||
|
// lock so the behavior change is visible in `go test` instead of
|
||||||
|
// surfacing under load.
|
||||||
|
func TestWorkflowRun_AllProvenanceRecordedPostRun(t *testing.T) {
|
||||||
|
pauseCh := make(chan struct{})
|
||||||
|
|
||||||
|
runner := workflow.NewRunner()
|
||||||
|
runner.RegisterMode("test.pause", func(_ workflow.Context, _ map[string]any) (map[string]any, error) {
|
||||||
|
<-pauseCh
|
||||||
|
return map[string]any{"unpaused": true}, nil
|
||||||
|
})
|
||||||
|
|
||||||
|
h := &handlers{
|
||||||
|
store: observer.NewStore(nil),
|
||||||
|
runner: runner,
|
||||||
|
}
|
||||||
|
r := chi.NewRouter()
|
||||||
|
h.register(r)
|
||||||
|
|
||||||
|
// Two-node serial workflow so we have something to record post-run.
|
||||||
|
body := []byte(`{"workflow":{"name":"adr_005_5_3","nodes":[
|
||||||
|
{"id":"n1","mode":"test.pause"},
|
||||||
|
{"id":"n2","mode":"test.pause","depends_on":["n1"]}
|
||||||
|
]}}`)
|
||||||
|
|
||||||
|
// Send the request in a goroutine — it'll block until pauseCh closes.
|
||||||
|
done := make(chan int)
|
||||||
|
go func() {
|
||||||
|
req := httptest.NewRequest("POST", "/observer/workflow/run", bytes.NewReader(body))
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
r.ServeHTTP(w, req)
|
||||||
|
done <- w.Code
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Wait briefly for the runner to enter n1 and block on pauseCh.
|
||||||
|
// 50ms is conservative; the goroutine + chi routing + topo sort
|
||||||
|
// take well under that on this hardware.
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
|
||||||
|
// LOCK: store MUST be empty while runner.Run is paused.
|
||||||
|
// If a future change adds streaming-record-as-each-node-finishes,
|
||||||
|
// n1's record would land here as soon as n1 returns — but n1
|
||||||
|
// hasn't returned yet (we're paused before it does), so the
|
||||||
|
// only way this assertion passes is if recording is post-run-only.
|
||||||
|
if got := h.store.Stats().Total; got != 0 {
|
||||||
|
t.Errorf("expected 0 observer ops during paused run, got %d "+
|
||||||
|
"(if non-zero, ADR-005 Decision 5.3 must be updated — recording "+
|
||||||
|
"is no longer post-run-only)", got)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unblock all paused nodes (channel close broadcasts to all receivers).
|
||||||
|
close(pauseCh)
|
||||||
|
|
||||||
|
// Wait for the handler to return + record post-run.
|
||||||
|
if code := <-done; code != http.StatusOK {
|
||||||
|
t.Errorf("workflow run failed: HTTP %d", code)
|
||||||
|
}
|
||||||
|
|
||||||
|
// LOCK: store MUST have 2 ops after run completes.
|
||||||
|
if got := h.store.Stats().Total; got != 2 {
|
||||||
|
t.Errorf("expected 2 observer ops after run, got %d", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TestWorkflowRun_UnknownMode locks the 400 path on workflow definitions
|
// TestWorkflowRun_UnknownMode locks the 400 path on workflow definitions
|
||||||
// that reference modes not registered with the runner. The harness's
|
// that reference modes not registered with the runner. The harness's
|
||||||
// reality test runs depend on this so an unknown-mode misconfiguration
|
// reality test runs depend on this so an unknown-mode misconfiguration
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user