§3.8 second slice: real modes wired (matrix.relevance/downgrade/search,
distillation.score, drift.scorer)
Lands the workflow.Mode adapters for the §3.4 components + the
distillation scorer + drift quantifier. Workflows can now compose
real measurement capabilities; the substrate's parallel
capabilities become composable Lego bricks (per the prior commit's
closing insight).
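
A minimal sketch of why a shared mode signature makes the capabilities composable. Every name here (`Mode`, `Registry`, `RunChain`, the `demo.*` modes) is a hypothetical stand-in, not the real workflow.Runner API: the actual Mode also takes a workflow.Context, and the real runner resolves a DAG with $-ref substitution rather than a straight chain.

```go
package main

import (
	"fmt"
	"strings"
)

// Mode is a simplified stand-in for workflow.Mode: generic map in,
// generic map out. The real signature also takes a workflow.Context.
type Mode func(input map[string]any) (map[string]any, error)

// Registry is a hypothetical name-to-Mode table standing in for the
// runner's mode catalog.
type Registry map[string]Mode

// RunChain runs the named modes in order, feeding each mode's output
// map to the next mode as its input.
func (r Registry) RunChain(input map[string]any, names ...string) (map[string]any, error) {
	cur := input
	for _, name := range names {
		mode, ok := r[name]
		if !ok {
			return nil, fmt.Errorf("unknown mode %q", name)
		}
		out, err := mode(cur)
		if err != nil {
			return nil, fmt.Errorf("%s: %w", name, err)
		}
		cur = out
	}
	return cur, nil
}

// demoRegistry registers two toy modes (assumed names, for illustration only).
func demoRegistry() Registry {
	return Registry{
		"demo.upper": func(in map[string]any) (map[string]any, error) {
			s, _ := in["prompt"].(string)
			return map[string]any{"prompt": strings.ToUpper(s)}, nil
		},
		"demo.exclaim": func(in map[string]any) (map[string]any, error) {
			s, _ := in["prompt"].(string)
			return map[string]any{"prompt": s + "!"}, nil
		},
	}
}

func main() {
	out, err := demoRegistry().RunChain(
		map[string]any{"prompt": "hi"}, "demo.upper", "demo.exclaim")
	fmt.Println(out, err)
}
```

Because every brick has the same shape, adding a capability is one registration call and needs no runner changes.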
Modes registered (in observerd's registerBuiltinModes):
  Pure-function wrappers (no I/O):
    - matrix.relevance    → matrix.FilterChunks
    - matrix.downgrade    → matrix.MaybeDowngrade
    - distillation.score  → distillation.ScoreRecord
    - drift.scorer        → drift.ComputeScorerDrift
  HTTP-backed:
    - matrix.search       → POST matrixd /matrix/search
                            (registered only when matrixd_url is set)
  Fixture (kept from §3.8 first slice):
    - fixture.echo, fixture.upper
internal/workflow/modes.go:
Each mode follows the same glue pattern: marshal generic input
through a typed struct (free schema validation + clear error
messages), call the underlying capability, return a generic
output map. Roundtrip-via-JSON gives us schema validation
without writing custom field-by-field coercion.
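
A small sketch of what the JSON round-trip does and does not validate. `remarshal` mirrors the remarshalInput helper from this commit; `demoRequest` is a hypothetical input struct used only for illustration. Type mismatches surface as errors from encoding/json, while unknown keys are ignored and missing keys leave zero values, which is presumably why the modes still check required fields by hand.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// remarshal mirrors remarshalInput: generic map -> JSON -> typed struct.
func remarshal(input map[string]any, target any) error {
	bs, err := json.Marshal(input)
	if err != nil {
		return err
	}
	return json.Unmarshal(bs, target)
}

// demoRequest is a hypothetical typed input, not one of the real mode structs.
type demoRequest struct {
	Mode      string  `json:"mode"`
	Threshold float64 `json:"threshold"`
}

func main() {
	// Well-typed input populates the struct.
	var ok demoRequest
	fmt.Println(remarshal(map[string]any{"mode": "x", "threshold": 0.3}, &ok), ok)

	// A string where a number is expected yields a *json.UnmarshalTypeError.
	var bad demoRequest
	fmt.Println(remarshal(map[string]any{"threshold": "high"}, &bad))

	// Unknown keys are silently ignored by encoding/json's default decoder,
	// and the absent "threshold" stays at its zero value.
	var loose demoRequest
	fmt.Println(remarshal(map[string]any{"mode": "x", "typo_field": 1}, &loose), loose)
}
```

If stricter checking were wanted, json.Decoder's DisallowUnknownFields option would reject the third case; the commit does not do this.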
internal/workflow/modes_test.go (10 tests, all PASS):
  - matrix.relevance filters adjacency pollution (Connector kept,
    catalogd::Registry dropped; same headline as the relevance
    smoke, run through the workflow mode)
  - matrix.downgrade flips lakehouse→isolation on strong model;
    keeps lakehouse on weak (qwen3.5:latest); errors on missing
    fields
  - distillation.score rates scrum_review attempt_1 as accepted;
    rejects empty record
  - drift.scorer reports zero drift on matched inputs; errors on
    empty inputs slice
  - matrix.search HTTP flow round-trips through httptest fake
    matrixd; non-OK status surfaces a clear error
scripts/workflow_smoke.sh (5 assertions PASS, was 4):
  New assertion #5: real-mode chain
    matrix.downgrade (lakehouse + grok-4.1-fast → isolation)
    → distillation.score (scrum_review attempt_1 → accepted)
  Proves §3.4 components compose through the workflow runner with
  no fixture intermediation. Both nodes ran successfully, runner
  recorded provenance, status=succeeded.
  Mode listing assertion now expects 7 modes (5 real + 2 fixture)
  instead of just the fixtures.
17-smoke regression all green. SPEC §3.8 acceptance gate G3.8.D
("Mode catalog dispatches matrix.search invocation to the matrixd
backend without going through HTTP") still pending — current path
goes through HTTP for matrix.search, which is the cleaner service-
mesh shape but slower than direct in-process. In-process dispatch
when matrixd is co-resident is a future optimization.
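
A rough sketch of what that future in-process path could look like, purely as an assumption: nothing below exists in this commit. `pickSearchMode`, `fakeInProcess`, and `fakeHTTP` are hypothetical names, and `Mode` is simplified (the real workflow.Mode also takes a workflow.Context).

```go
package main

import (
	"errors"
	"fmt"
)

// Mode is a simplified stand-in for workflow.Mode.
type Mode func(input map[string]any) (map[string]any, error)

// pickSearchMode prefers a direct in-process mode when one is supplied
// (matrixd co-resident), falls back to the HTTP-backed mode otherwise,
// and reports an error when neither backend is configured.
func pickSearchMode(inProcess, overHTTP Mode) (Mode, string, error) {
	switch {
	case inProcess != nil:
		return inProcess, "in-process", nil
	case overHTTP != nil:
		return overHTTP, "http", nil
	default:
		return nil, "", errors.New("matrix.search: no backend configured")
	}
}

// fakeInProcess and fakeHTTP are placeholders for the two backends.
func fakeInProcess(input map[string]any) (map[string]any, error) {
	return map[string]any{"via": "in-process"}, nil
}

func fakeHTTP(input map[string]any) (map[string]any, error) {
	return map[string]any{"via": "http"}, nil
}

func main() {
	// Both backends available: the direct call wins.
	_, kind, _ := pickSearchMode(fakeInProcess, fakeHTTP)
	fmt.Println(kind)

	// Remote matrixd only: fall back to HTTP.
	_, kind, _ = pickSearchMode(nil, fakeHTTP)
	fmt.Println(kind)

	// Nothing configured: surface an error instead of registering the mode.
	_, _, err := pickSearchMode(nil, nil)
	fmt.Println(err)
}
```

Either backend satisfies the same mode signature, so workflows that reference matrix.search would not need to change when the dispatch path does.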
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
parent e30da6e5aa
commit c7e3124208
@@ -71,7 +71,11 @@ func main() {
 	}

 	runner := workflow.NewRunner()
-	registerBuiltinModes(runner, store)
+	// matrixd URL: prefer explicit observerd config field, fall back
+	// to gateway's matrixd_url so a single-toml deploy works without
+	// duplicating the address.
+	matrixdURL := cfg.Gateway.MatrixdURL
+	registerBuiltinModes(runner, store, matrixdURL)

 	h := &handlers{store: store, runner: runner}
 	if err := shared.Run("observerd", cfg.Observerd.Bind, h.register, cfg.Auth); err != nil {
@@ -187,18 +191,23 @@ func summarizeOutput(output map[string]any) string {
 	return string(bs)
 }

-// registerBuiltinModes wires the modes the runner knows about. v0
-// ships with fixture.echo + fixture.upper for testing the runner
-// mechanics; real-mode integrations (matrix.search, distillation.
-// score, drift.scorer, llm.chat) land in follow-up commits.
+// registerBuiltinModes wires the modes the runner knows about. The
+// pure-function wrappers (matrix.relevance, matrix.downgrade,
+// distillation.score, drift.scorer) are direct Go calls. matrix.search
+// is HTTP-backed, pointed at the configured matrixd_url so workflows
+// can compose retrieval into multi-pass measurement chains.
 //
-// Each mode's signature matches workflow.Mode. The store parameter
-// is reserved for modes that need to record their own ObservedOps
-// (most don't; the runner records per-node provenance generically).
-func registerBuiltinModes(r *workflow.Runner, _ *observer.Store) {
+// Fixture modes (fixture.echo, fixture.upper) stay registered for
+// the workflow_smoke that proves the runner mechanics independently
+// of the real modes' availability.
+//
+// Real-mode follow-ups still pending:
+//   - playbook.record (HTTP to matrixd)
+//   - playbook.lookup (HTTP to matrixd)
+//   - llm.chat (HTTP to gateway /v1/chat)
+func registerBuiltinModes(r *workflow.Runner, store *observer.Store, matrixdURL string) {
+	// Fixture modes for runner mechanics smokes.
 	r.RegisterMode("fixture.echo", func(_ workflow.Context, input map[string]any) (map[string]any, error) {
-		// Verbatim copy of input → output. Useful for ref-substitution
-		// chains in smokes.
 		out := make(map[string]any, len(input))
 		for k, v := range input {
 			out[k] = v
@@ -206,14 +215,28 @@ func registerBuiltinModes(r *workflow.Runner, _ *observer.Store) {
 		return out, nil
 	})
 	r.RegisterMode("fixture.upper", func(_ workflow.Context, input map[string]any) (map[string]any, error) {
-		// Returns {"upper": strings.ToUpper(input["prompt"])}. Toy
-		// mode for proving DAG ref substitution end-to-end.
 		prompt, _ := input["prompt"].(string)
 		return map[string]any{"upper": strings.ToUpper(prompt)}, nil
 	})
+
+	// Real modes: pure-function wrappers (no I/O).
+	r.RegisterMode("matrix.relevance", workflow.MatrixRelevance)
+	r.RegisterMode("matrix.downgrade", workflow.MatrixDowngrade)
+	r.RegisterMode("distillation.score", workflow.DistillationScore)
+	r.RegisterMode("drift.scorer", workflow.DriftScorer)
+
+	// HTTP-backed modes: only register when their backend URL is set.
+	// matrixd_url defaults to a known address but tests/dev may run
+	// without matrixd.
+	if matrixdURL != "" {
+		hc := &http.Client{Timeout: 30 * time.Second}
+		r.RegisterMode("matrix.search", workflow.MatrixSearch(matrixdURL, hc))
+	}
+
+	_ = store // reserved for future modes that need self-provenance
 }

-// stub to silence "imported and not used" until a real mode uses it
+// context still used in decodeJSON via http.Request.Context().
 var _ = context.Background

 func decodeJSON(w http.ResponseWriter, r *http.Request, v any) bool {
214
internal/workflow/modes.go
Normal file
@@ -0,0 +1,214 @@
+package workflow
+
+// modes.go: adapters that wrap §3.4 capabilities + §3.5 drift +
+// distillation scorer as workflow.Mode functions. Each mode follows
+// the same glue pattern: marshal the generic input map through a
+// typed struct (so workflow YAML schemas are self-documenting and
+// validation errors are clear), call the underlying capability,
+// return a generic output map.
+//
+// Pure modes (no I/O): MatrixRelevance, MatrixDowngrade,
+// DistillationScore, DriftScorer.
+//
+// HTTP modes: MatrixSearch (PlaybookRecord is still pending). observerd
+// talks to matrixd over HTTP since the search/record paths need vectord
+// access. Constructed via factory funcs that take the matrixd base
+// URL + an http.Client.
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+
+	"git.agentview.dev/profit/golangLAKEHOUSE/internal/distillation"
+	"git.agentview.dev/profit/golangLAKEHOUSE/internal/drift"
+	"git.agentview.dev/profit/golangLAKEHOUSE/internal/matrix"
+)
+
+// ─── Pure-function wrappers ─────────────────────────────────────
+
+// MatrixRelevance wraps matrix.FilterChunks. Input shape:
+//
+//	{
+//	  "focus": {"Path":"...", "Content":"...", ...},
+//	  "chunks": [{"source":"...", "doc_id":"...", "text":"...", "score":0.8}, ...],
+//	  "threshold": 0.3 # optional; default = matrix.DefaultRelevanceThreshold
+//	}
+//
+// Output: {"kept":[...], "dropped":[...], "threshold":N, "total_in":N}.
+func MatrixRelevance(_ Context, input map[string]any) (map[string]any, error) {
+	var req struct {
+		Focus     matrix.FocusFile        `json:"focus"`
+		Chunks    []matrix.CandidateChunk `json:"chunks"`
+		Threshold float64                 `json:"threshold"`
+	}
+	if err := remarshalInput(input, &req); err != nil {
+		return nil, fmt.Errorf("matrix.relevance: %w", err)
+	}
+	threshold := req.Threshold
+	if threshold == 0 {
+		threshold = matrix.DefaultRelevanceThreshold
+	}
+	res := matrix.FilterChunks(req.Focus, req.Chunks, threshold)
+	return map[string]any{
+		"kept":      res.Kept,
+		"dropped":   res.Dropped,
+		"threshold": res.Threshold,
+		"total_in":  res.TotalIn,
+	}, nil
+}
+
+// MatrixDowngrade wraps matrix.MaybeDowngrade. Input shape:
+//
+//	{
+//	  "mode": "codereview_lakehouse",
+//	  "model": "x-ai/grok-4.1-fast",
+//	  "forced_mode": false, # optional
+//	  "force_full_override": false # optional
+//	}
+//
+// Output: matrix.DowngradeDecision JSON.
+func MatrixDowngrade(_ Context, input map[string]any) (map[string]any, error) {
+	var req struct {
+		Mode              string `json:"mode"`
+		Model             string `json:"model"`
+		ForcedMode        bool   `json:"forced_mode"`
+		ForceFullOverride bool   `json:"force_full_override"`
+	}
+	if err := remarshalInput(input, &req); err != nil {
+		return nil, fmt.Errorf("matrix.downgrade: %w", err)
+	}
+	if req.Mode == "" || req.Model == "" {
+		return nil, fmt.Errorf("matrix.downgrade: mode and model are required")
+	}
+	dec := matrix.MaybeDowngrade(matrix.DowngradeInput{
+		Mode:              req.Mode,
+		Model:             req.Model,
+		ForcedMode:        req.ForcedMode,
+		ForceFullOverride: req.ForceFullOverride,
+	})
+	return map[string]any{
+		"mode":            dec.Mode,
+		"downgraded_from": dec.DowngradedFrom,
+		"reason":          dec.Reason,
+	}, nil
+}
+
+// DistillationScore wraps distillation.ScoreRecord: it re-runs the
+// scorer over a single EvidenceRecord. Useful as a workflow node
+// that grades a freshly-produced evidence row.
+//
+// Input: a JSON EvidenceRecord under the key "record":
+//
+//	{"record": {"run_id":"...", "task_id":"...", ...}}
+//
+// Output: ScoreOutput-ish map with category, reasons, sub_scores.
+func DistillationScore(_ Context, input map[string]any) (map[string]any, error) {
+	var req struct {
+		Record distillation.EvidenceRecord `json:"record"`
+	}
+	if err := remarshalInput(input, &req); err != nil {
+		return nil, fmt.Errorf("distillation.score: %w", err)
+	}
+	if req.Record.RunID == "" {
+		return nil, fmt.Errorf("distillation.score: record.run_id required")
+	}
+	out := distillation.ScoreRecord(req.Record)
+	return map[string]any{
+		"category":   string(out.Category),
+		"reasons":    out.Reasons,
+		"sub_scores": out.SubScores,
+	}, nil
+}
+
+// DriftScorer wraps drift.ComputeScorerDrift. Input shape:
+//
+//	{
+//	  "inputs": [
+//	    {"record": {...EvidenceRecord...}, "persisted_category": "accepted"},
+//	    ...
+//	  ],
+//	  "include_entries": false # optional, default false
+//	}
+//
+// Output: ScorerDriftReport JSON.
+func DriftScorer(_ Context, input map[string]any) (map[string]any, error) {
+	var req struct {
+		Inputs         []drift.ScorerDriftInput `json:"inputs"`
+		IncludeEntries bool                     `json:"include_entries"`
+	}
+	if err := remarshalInput(input, &req); err != nil {
+		return nil, fmt.Errorf("drift.scorer: %w", err)
+	}
+	if len(req.Inputs) == 0 {
+		return nil, fmt.Errorf("drift.scorer: inputs must be non-empty")
+	}
+	report := drift.ComputeScorerDrift(req.Inputs, req.IncludeEntries)
+	bs, err := json.Marshal(report)
+	if err != nil {
+		return nil, fmt.Errorf("drift.scorer: marshal report: %w", err)
+	}
+	var asMap map[string]any
+	if err := json.Unmarshal(bs, &asMap); err != nil {
+		return nil, fmt.Errorf("drift.scorer: decode report: %w", err)
+	}
+	return asMap, nil
+}
+
+// ─── HTTP-backed modes ──────────────────────────────────────────
+
+// MatrixSearch returns a workflow.Mode bound to a matrixd base URL
+// and HTTP client. The mode POSTs the input map as JSON to
+// <matrixdURL>/matrix/search (the caller passes the base URL).
+//
+// Input shape mirrors matrix.SearchRequest (see retrieve.go).
+// Output is the matrix.SearchResponse JSON.
+func MatrixSearch(matrixdURL string, hc *http.Client) Mode {
+	return func(ctx Context, input map[string]any) (map[string]any, error) {
+		bs, err := json.Marshal(input)
+		if err != nil {
+			return nil, fmt.Errorf("matrix.search: marshal: %w", err)
+		}
+		req, err := http.NewRequestWithContext(ctx.Ctx, http.MethodPost,
+			matrixdURL+"/matrix/search", bytes.NewReader(bs))
+		if err != nil {
+			return nil, fmt.Errorf("matrix.search: build request: %w", err)
+		}
+		req.Header.Set("Content-Type", "application/json")
+		resp, err := hc.Do(req)
+		if err != nil {
+			return nil, fmt.Errorf("matrix.search: %w", err)
+		}
+		defer resp.Body.Close()
+		if resp.StatusCode != http.StatusOK {
+			body, _ := io.ReadAll(resp.Body)
+			return nil, fmt.Errorf("matrix.search: status %d: %s", resp.StatusCode, body)
+		}
+		var out map[string]any
+		if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
+			return nil, fmt.Errorf("matrix.search: decode: %w", err)
+		}
+		return out, nil
+	}
+}
+
+// ─── Helpers ─────────────────────────────────────────────────────
+
+// remarshalInput round-trips a generic input map through JSON into
+// the typed target struct. Same trick as the matrixd handlers: gives
+// us schema validation for free without writing custom field-by-field
+// coercion.
+func remarshalInput(input map[string]any, target any) error {
+	bs, err := json.Marshal(input)
+	if err != nil {
+		return err
+	}
+	return json.Unmarshal(bs, target)
+}
+
+// Keeps the context import referenced: this file only reaches
+// context.Context indirectly, through Context.Ctx in MatrixSearch.
+var _ = context.Background
211
internal/workflow/modes_test.go
Normal file
@@ -0,0 +1,211 @@
+package workflow
+
+import (
+	"context"
+	"encoding/json"
+	"net/http"
+	"net/http/httptest"
+	"strings"
+	"testing"
+)
+
+func TestMatrixRelevance_FiltersAdjacencyPollution(t *testing.T) {
+	input := map[string]any{
+		"focus": map[string]any{
+			"Path":    "crates/queryd/src/db.go",
+			"Content": "pub struct Connector {}\nuse catalogd::Registry;",
+		},
+		"chunks": []any{
+			map[string]any{
+				"source": "lakehouse_symbols_v1",
+				"doc_id": "symbol:queryd::struct::Connector",
+				"text":   "Connector wraps the DuckDB handle.",
+				"score":  0.9,
+			},
+			map[string]any{
+				"source": "lakehouse_symbols_v1",
+				"doc_id": "symbol:catalogd::struct::Registry",
+				"text":   "Registry stores manifests. Used by ingestd.",
+				"score":  0.85,
+			},
+		},
+		"threshold": 0.3,
+	}
+	out, err := MatrixRelevance(Context{}, input)
+	if err != nil {
+		t.Fatalf("MatrixRelevance: %v", err)
+	}
+	if out["total_in"].(int) != 2 {
+		t.Errorf("total_in: want 2, got %v", out["total_in"])
+	}
+	// Connector should be in kept (path/symbol match), Registry in dropped (import-only).
+	keptStr, _ := json.Marshal(out["kept"])
+	if !strings.Contains(string(keptStr), "Connector") {
+		t.Errorf("expected Connector in kept; kept=%s", keptStr)
+	}
+}
+
+func TestMatrixDowngrade_StrongModelDowngrades(t *testing.T) {
+	out, err := MatrixDowngrade(Context{}, map[string]any{
+		"mode":  "codereview_lakehouse",
+		"model": "x-ai/grok-4.1-fast",
+	})
+	if err != nil {
+		t.Fatalf("MatrixDowngrade: %v", err)
+	}
+	if out["mode"] != "codereview_isolation" {
+		t.Errorf("strong model should downgrade; got mode=%v", out["mode"])
+	}
+	if out["downgraded_from"] != "codereview_lakehouse" {
+		t.Errorf("downgraded_from: %v", out["downgraded_from"])
+	}
+}
+
+func TestMatrixDowngrade_WeakModelKept(t *testing.T) {
+	out, err := MatrixDowngrade(Context{}, map[string]any{
+		"mode":  "codereview_lakehouse",
+		"model": "qwen3.5:latest",
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if out["mode"] != "codereview_lakehouse" {
+		t.Errorf("weak model should keep lakehouse; got %v", out["mode"])
+	}
+}
+
+func TestMatrixDowngrade_MissingFieldsError(t *testing.T) {
+	_, err := MatrixDowngrade(Context{}, map[string]any{"mode": "codereview_lakehouse"})
+	if err == nil {
+		t.Error("missing model should error")
+	}
+}
+
+func TestDistillationScore_ScrumReviewAccepted(t *testing.T) {
+	out, err := DistillationScore(Context{}, map[string]any{
+		"record": map[string]any{
+			"run_id":         "r-1",
+			"task_id":        "t-1",
+			"timestamp":      "2026-04-29T12:00:00Z",
+			"schema_version": 1,
+			"provenance": map[string]any{
+				"source_file": "data/_kb/scrum_reviews.jsonl",
+				"sig_hash":    "abc",
+				"recorded_at": "2026-04-29T12:00:01Z",
+			},
+			"success_markers": []any{"accepted_on_attempt_1"},
+		},
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if out["category"] != "accepted" {
+		t.Errorf("scrum_review attempt_1: want accepted, got %v", out["category"])
+	}
+	reasons, _ := out["reasons"].([]string)
+	if len(reasons) == 0 || !strings.Contains(reasons[0], "first attempt") {
+		t.Errorf("reasons missing 'first attempt': %v", reasons)
+	}
+}
+
+func TestDistillationScore_RejectsEmptyRecord(t *testing.T) {
+	_, err := DistillationScore(Context{}, map[string]any{
+		"record": map[string]any{},
+	})
+	if err == nil {
+		t.Error("empty record should error")
+	}
+}
+
+func TestDriftScorer_AllMatchedReturnsZeroDrift(t *testing.T) {
+	out, err := DriftScorer(Context{}, map[string]any{
+		"inputs": []any{
+			map[string]any{
+				"Record": map[string]any{
+					"run_id": "r-1", "task_id": "t-1",
+					"timestamp": "2026-04-29T12:00:00Z", "schema_version": 1,
+					"provenance": map[string]any{
+						"source_file": "data/_kb/scrum_reviews.jsonl",
+						"sig_hash":    "x", "recorded_at": "2026-04-29T12:00:01Z",
+					},
+					"success_markers": []any{"accepted_on_attempt_1"},
+				},
+				"PersistedCategory": "accepted",
+			},
+		},
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if out["drifted"].(float64) != 0 {
+		t.Errorf("no-drift case: drifted=%v", out["drifted"])
+	}
+	if out["matched"].(float64) != 1 {
+		t.Errorf("matched: want 1, got %v", out["matched"])
+	}
+}
+
+func TestDriftScorer_RequiresInputs(t *testing.T) {
+	_, err := DriftScorer(Context{}, map[string]any{"inputs": []any{}})
+	if err == nil {
+		t.Error("empty inputs should error")
+	}
+}
+
+func TestMatrixSearch_HTTPFlow(t *testing.T) {
+	// Fake matrixd that echoes a canned SearchResponse.
+	mux := http.NewServeMux()
+	mux.HandleFunc("/matrix/search", func(w http.ResponseWriter, r *http.Request) {
+		var body map[string]any
+		_ = json.NewDecoder(r.Body).Decode(&body)
+		w.Header().Set("Content-Type", "application/json")
+		// Echo back deterministically with a synthesized result list.
+		_ = json.NewEncoder(w).Encode(map[string]any{
+			"results": []any{
+				map[string]any{"id": "w-1", "distance": 0.1, "corpus": "workers"},
+			},
+			"per_corpus_counts": map[string]any{"workers": 1},
+			"received_corpora":  body["corpora"], // for round-trip verification
+		})
+	})
+	srv := httptest.NewServer(mux)
+	defer srv.Close()
+
+	mode := MatrixSearch(srv.URL, srv.Client())
+	out, err := mode(
+		Context{Ctx: context.Background()},
+		map[string]any{
+			"query_text": "forklift",
+			"corpora":    []any{"workers"},
+			"k":          5,
+		},
+	)
+	if err != nil {
+		t.Fatalf("MatrixSearch: %v", err)
+	}
+	results, ok := out["results"].([]any)
+	if !ok || len(results) != 1 {
+		t.Fatalf("results: %v", out["results"]) // Fatalf: results[0] below would panic on an empty slice
+	}
+	if first, ok := results[0].(map[string]any); ok {
+		if first["id"] != "w-1" {
+			t.Errorf("id: %v", first["id"])
+		}
+	}
+}
+
+func TestMatrixSearch_NonOKStatusErrors(t *testing.T) {
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		http.Error(w, "matrixd is down", http.StatusBadGateway)
+	}))
+	defer srv.Close()
+
+	mode := MatrixSearch(srv.URL, srv.Client())
+	_, err := mode(Context{Ctx: context.Background()}, map[string]any{})
+	if err == nil {
+		t.Fatal("502 should error") // Fatal: err.Error() below would panic on a nil error
+	}
+	if !strings.Contains(err.Error(), "502") {
+		t.Errorf("error should mention 502: %v", err)
+	}
+}
@@ -72,14 +72,19 @@ poll_health 3110 || { echo "gateway failed"; tail /tmp/gateway.log; exit 1; }
 FAILED=0

 # ── 1. /observer/workflow/modes lists registered modes ────────────
-echo "[workflow-smoke] /observer/workflow/modes lists fixture modes:"
+echo "[workflow-smoke] /observer/workflow/modes lists fixtures + real modes:"
 RESP="$(curl -sS http://127.0.0.1:3110/v1/observer/workflow/modes)"
-HAS_ECHO="$(echo "$RESP" | jq -r '.modes | index("fixture.echo") != null')"
-HAS_UPPER="$(echo "$RESP" | jq -r '.modes | index("fixture.upper") != null')"
-if [ "$HAS_ECHO" = "true" ] && [ "$HAS_UPPER" = "true" ]; then
-  echo " ✓ fixture.echo + fixture.upper registered"
+EXPECTED=("fixture.echo" "fixture.upper" "matrix.relevance" "matrix.downgrade" "distillation.score" "drift.scorer" "matrix.search")
+MISSING=""
+for m in "${EXPECTED[@]}"; do
+  if [ "$(echo "$RESP" | jq -r --arg m "$m" '.modes | index($m) != null')" != "true" ]; then
+    MISSING="$MISSING $m"
+  fi
+done
+if [ -z "$MISSING" ]; then
+  echo " ✓ all 7 expected modes registered (fixtures + 4 pure + matrix.search HTTP)"
 else
-  echo " ✗ resp: $RESP"; FAILED=1
+  echo " ✗ missing modes:$MISSING"; FAILED=1
 fi

 # ── 2. 3-node DAG with $-ref substitution ─────────────────────────
@@ -141,6 +146,44 @@ else
   echo " ✗ http=$HTTP err=$ERR"; FAILED=1
 fi

+# ── 5. Real-mode chain: matrix.downgrade → distillation.score ─────
+# This proves the §3.4 components compose through the workflow runner.
+# Two pure modes, no external service deps, deterministic input/output.
+echo "[workflow-smoke] real-mode chain: downgrade → distillation.score"
+REAL_WORKFLOW='{
+  "workflow": {
+    "name": "real-mode-chain",
+    "nodes": [
+      {"id":"gate", "mode":"matrix.downgrade",
+       "inputs":{"mode":"codereview_lakehouse", "model":"x-ai/grok-4.1-fast"}},
+      {"id":"score", "mode":"distillation.score",
+       "inputs":{"record":{
+         "run_id":"r-1", "task_id":"t-1",
+         "timestamp":"2026-04-29T12:00:00Z", "schema_version":1,
+         "provenance":{"source_file":"data/_kb/scrum_reviews.jsonl",
+                       "sig_hash":"x", "recorded_at":"2026-04-29T12:00:01Z"},
+         "success_markers":["accepted_on_attempt_1"]
+       }}}
+    ]
+  }
+}'
+RUN="$(curl -sS -X POST http://127.0.0.1:3110/v1/observer/workflow/run \
+  -H 'Content-Type: application/json' -d "$REAL_WORKFLOW")"
+STATUS="$(echo "$RUN" | jq -r '.status')"
+GATE_MODE="$(echo "$RUN" | jq -r '.nodes[0].output.mode')"
+GATE_FROM="$(echo "$RUN" | jq -r '.nodes[0].output.downgraded_from')"
+SCORE_CAT="$(echo "$RUN" | jq -r '.nodes[1].output.category')"
+if [ "$STATUS" = "succeeded" ] \
+  && [ "$GATE_MODE" = "codereview_isolation" ] \
+  && [ "$GATE_FROM" = "codereview_lakehouse" ] \
+  && [ "$SCORE_CAT" = "accepted" ]; then
+  echo " ✓ downgrade flipped lakehouse→isolation; scorer rated scrum_review attempt_1=accepted"
+else
+  echo " ✗ status=$STATUS gate=$GATE_MODE from=$GATE_FROM score=$SCORE_CAT"
+  echo " full: $RUN"
+  FAILED=1
+fi
+
 if [ "$FAILED" -eq 0 ]; then
   echo "[workflow-smoke] Workflow runner acceptance: PASSED"
   exit 0