Cross-lineage scrum (Opus 4.7 / Kimi K2.6 / Qwen3-coder via chatd's
/v1/chat) on the harness's first 4 commits surfaced 5 real bugs;
this commit lands the 4 inside the LLM/validator stack. B5 (scanner
skip-list semantics) ships separately as it changes scan behavior
on every target repo.
B1 (Kimi BLOCK + Opus WARN convergent) — internal/validators:
evidencePresent had two flaws: (1) cursor advanced on match in the
trim-line fallback, breaking same-line repeated matches AND skipping
not-yet-considered lines so out-of-order evidence spuriously failed;
(2) strings.Contains on a single `}` trim-matched any closing brace
in the file, defeating the "evidence quotes real text" contract.
Fix: trivial-evidence guard FIRST (reject anything <4 non-whitespace
chars) + per-line search no longer advances a cursor. New regression
test TestEvidencePresent_RejectsTrivialMatches covers `}`, `{`, `)`,
empty, and out-of-order multi-line evidence (which now passes —
order isn't part of the contract).
B2 (Kimi WARN + Opus WARN convergent) — internal/pipeline:
WriteJSON error for rejected-findings.json was swallowed with
`if err == nil`, so a write failure left the validation phase
reporting status="ok" while the audit trail vanished. Mirror the
validated-findings branch: surface the error in
validatePhase.Errors + bump status to degraded + ExitCode=66.
B3 (Kimi BLOCK + Opus BLOCK convergent) — internal/llm/ollama.go:
HealthCheck.basic_prompt_ok was set to true on ANY non-empty
response, so a model emitting `<think>...` traces or apologies
passed silently. Now requires the response to contain "OK"
(uppercase, substring). Substring rather than equality lets minor
whitespace/punctuation variations through (some models add a
trailing period). Errors now record what the model actually said
when it fails the check.
B4 (Opus BLOCK only — same class as today's chatd Anthropic-temp
fix) — internal/llm/ollama.go: chatBody had `if opts.Temperature != 0`
which silently dropped Temperature=0 from the request, so HealthCheck
+ Reviewer (both pass Temperature=0 expecting determinism) actually
ran at Ollama's ~0.8 default. Always forward Temperature now. The
two callers always set explicit values, so "0 means 0" is correct;
if a future caller wants Ollama's default they'll switch
CompleteOptions.Temperature to *float64 like chatd did this morning.
Verified end-to-end: insecure-repo + --enable-llm still produces 25
confirmed findings (16 static + 9 LLM), 0 rejected. Validator unit
tests: 11 pass (added TestEvidencePresent_RejectsTrivialMatches).
Same-day-as-shipping scrum, same-day-as-shipping fixes. The
convergent-≥2 gate caught 3 of these; the 4th was Opus-only but
verified by reading the code (same idiom as today's chatd bug).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
332 lines
12 KiB
Go
// Package pipeline orchestrates the per-phase execution. Each phase
|
|
// produces JSON / markdown artifacts and a per-phase Receipt entry.
|
|
// Degraded mode propagates: if Phase C (LLM review) can't run, the
|
|
// pipeline still ships the static-scan deliverables and marks the
|
|
// LLM phase degraded — never silently skipped.
|
|
package pipeline
|
|
|
|
import (
|
|
"context"
|
|
"crypto/rand"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"time"
|
|
|
|
"local-review-harness/internal/analyzers"
|
|
"local-review-harness/internal/config"
|
|
"local-review-harness/internal/git"
|
|
"local-review-harness/internal/llm"
|
|
"local-review-harness/internal/reporters"
|
|
"local-review-harness/internal/scanner"
|
|
"local-review-harness/internal/validators"
|
|
)
|
|
|
|
// Inputs is the bag the CLI passes to the pipeline.
type Inputs struct {
	RepoPath      string               // path to the repository under review; walked by scanner.Walk and inspected by git.Inspect
	ReviewProfile config.ReviewProfile // analyzer rules + Limits (MaxFileBytes, MaxLLMChunkChars) used for static scan and LLM chunking
	ModelProfile  config.ModelProfile  // Ollama BaseURL, Model/FallbackModel, Temperature, TimeoutSeconds for Phase 2
	OutputDir     string               // where every JSON/markdown artifact lands; cleaned of known artifacts at run start
	EmitScrum     bool                 // true → also emit scrum-test/risk-register/sprint-backlog/acceptance-gates markdown
	EnableLLM     bool                 // Phase C: actually call the model. Off by default — operators opt in.
}
|
|
|
|
// Result is what the CLI shows the operator.
type Result struct {
	OutputFiles []string // artifact file names (relative to Inputs.OutputDir) written during this run
	ExitCode    int      // 0=ok, 66=degraded, 65=runtime
}
|
|
|
|
// RunRepo executes Phase 0 (intake), Phase 1 (static), Phase 4 (report).
|
|
// Phases 2 (LLM) + 3 (validate) + 5 (memory) ship later — every phase
|
|
// not run lands in receipts as "skipped" or "degraded".
|
|
func RunRepo(ctx context.Context, in Inputs) (*Result, error) {
|
|
startedAt := time.Now().UTC()
|
|
runID := newRunID(startedAt)
|
|
res := &Result{ExitCode: 0}
|
|
receipt := reporters.Receipt{
|
|
RunID: runID,
|
|
RepoPath: in.RepoPath,
|
|
StartedAt: startedAt.Format(time.RFC3339Nano),
|
|
}
|
|
|
|
// Clean output dir before each run so stale files from a prior
|
|
// run can't leak into the current report set. 2026-04-30 fix:
|
|
// before this, a previous run's rejected-findings.json could
|
|
// stick around when the current run had no rejections, confusing
|
|
// operators about which data was current.
|
|
cleanOutputDir(in.OutputDir)
|
|
|
|
// --- Phase 0: repo intake ---
|
|
scan, err := scanner.Walk(in.RepoPath, true)
|
|
scanPhase := reporters.PhaseReceipt{Name: "repo_intake", Status: "ok"}
|
|
if err != nil {
|
|
scanPhase.Status = "failed"
|
|
scanPhase.Errors = append(scanPhase.Errors, err.Error())
|
|
receipt.Phases = append(receipt.Phases, scanPhase)
|
|
res.ExitCode = 65
|
|
// Even on scan failure, write the receipt so operators can
|
|
// see what blew up.
|
|
_ = writeReceipt(in.OutputDir, &receipt, startedAt, nil)
|
|
return res, err
|
|
}
|
|
gi := git.Inspect(ctx, in.RepoPath)
|
|
intake := reporters.BuildIntake(scan, gi)
|
|
intakePath := filepath.Join(in.OutputDir, "repo-intake.json")
|
|
if sha, err := reporters.WriteJSON(intakePath, intake); err != nil {
|
|
scanPhase.Status = "failed"
|
|
scanPhase.Errors = append(scanPhase.Errors, err.Error())
|
|
} else {
|
|
scanPhase.OutputFiles = []string{"repo-intake.json"}
|
|
scanPhase.OutputHash = sha
|
|
}
|
|
if !gi.HasGit {
|
|
scanPhase.Status = "degraded"
|
|
scanPhase.Errors = append(scanPhase.Errors, "no git metadata (not a git repo or git unavailable)")
|
|
if res.ExitCode == 0 {
|
|
res.ExitCode = 66
|
|
}
|
|
}
|
|
receipt.Phases = append(receipt.Phases, scanPhase)
|
|
res.OutputFiles = append(res.OutputFiles, "repo-intake.json")
|
|
|
|
// --- Phase 1: static scan ---
|
|
findings := analyzers.Run(scan, in.ReviewProfile)
|
|
staticOut := reporters.StaticFindings{
|
|
GeneratedAt: time.Now().UTC().Format(time.RFC3339Nano),
|
|
Findings: findings,
|
|
Summary: reporters.SummarizeFindings(findings),
|
|
}
|
|
staticPath := filepath.Join(in.OutputDir, "static-findings.json")
|
|
staticPhase := reporters.PhaseReceipt{Name: "static_scan", Status: "ok"}
|
|
if sha, err := reporters.WriteJSON(staticPath, staticOut); err != nil {
|
|
staticPhase.Status = "failed"
|
|
staticPhase.Errors = append(staticPhase.Errors, err.Error())
|
|
res.ExitCode = 65
|
|
} else {
|
|
staticPhase.OutputFiles = []string{"static-findings.json"}
|
|
staticPhase.OutputHash = sha
|
|
}
|
|
receipt.Phases = append(receipt.Phases, staticPhase)
|
|
res.OutputFiles = append(res.OutputFiles, "static-findings.json")
|
|
|
|
// --- Phase 2: LLM review (Phase C) ---
|
|
llmDegraded := true
|
|
llmPhase := reporters.PhaseReceipt{Name: "llm_review", Status: "skipped"}
|
|
if !in.EnableLLM {
|
|
llmPhase.Errors = append(llmPhase.Errors, "LLM review not requested (pass --enable-llm to opt in)")
|
|
} else {
|
|
llmFindings, raw, llmErr := runLLMReview(ctx, scan, in)
|
|
// Always save raw output, even on failure — operator forensics.
|
|
rawPath := filepath.Join(in.OutputDir, "llm-findings.raw.json")
|
|
if _, err := reporters.WriteJSON(rawPath, raw); err == nil {
|
|
llmPhase.OutputFiles = append(llmPhase.OutputFiles, "llm-findings.raw.json")
|
|
}
|
|
if llmErr != nil {
|
|
llmPhase.Status = "degraded"
|
|
llmPhase.Errors = append(llmPhase.Errors, llmErr.Error())
|
|
} else {
|
|
normalized := reporters.StaticFindings{
|
|
GeneratedAt: time.Now().UTC().Format(time.RFC3339Nano),
|
|
Findings: llmFindings,
|
|
Summary: reporters.SummarizeFindings(llmFindings),
|
|
}
|
|
if sha, err := reporters.WriteJSON(filepath.Join(in.OutputDir, "llm-findings.normalized.json"), normalized); err == nil {
|
|
llmPhase.OutputFiles = append(llmPhase.OutputFiles, "llm-findings.normalized.json")
|
|
llmPhase.OutputHash = sha
|
|
llmPhase.Status = "ok"
|
|
llmDegraded = false
|
|
findings = append(findings, llmFindings...)
|
|
res.OutputFiles = append(res.OutputFiles, "llm-findings.raw.json", "llm-findings.normalized.json")
|
|
} else {
|
|
llmPhase.Status = "failed"
|
|
llmPhase.Errors = append(llmPhase.Errors, "write normalized: "+err.Error())
|
|
}
|
|
}
|
|
}
|
|
if llmDegraded && res.ExitCode == 0 {
|
|
res.ExitCode = 66
|
|
}
|
|
receipt.Phases = append(receipt.Phases, llmPhase)
|
|
|
|
// --- Phase 3: validation (Phase D) ---
|
|
// Cross-checks every LLM-sourced finding against actual file
|
|
// content + path-traversal protection. Static findings pass
|
|
// through promoted-to-confirmed (their evidence is already
|
|
// grep-truthful by construction). Rejected findings land in
|
|
// rejected-findings.json with per-rejection reason for the
|
|
// audit trail.
|
|
validatePhase := reporters.PhaseReceipt{Name: "validation", Status: "ok"}
|
|
valOut := validators.Validate(in.RepoPath, findings)
|
|
findings = valOut.Validated // pipeline downstream only sees validated set
|
|
|
|
if sha, err := reporters.WriteJSON(filepath.Join(in.OutputDir, "validated-findings.json"), reporters.StaticFindings{
|
|
GeneratedAt: time.Now().UTC().Format(time.RFC3339Nano),
|
|
Findings: valOut.Validated,
|
|
Summary: reporters.SummarizeFindings(valOut.Validated),
|
|
}); err != nil {
|
|
validatePhase.Status = "failed"
|
|
validatePhase.Errors = append(validatePhase.Errors, "validated: "+err.Error())
|
|
} else {
|
|
validatePhase.OutputFiles = append(validatePhase.OutputFiles, "validated-findings.json")
|
|
validatePhase.OutputHash = sha
|
|
}
|
|
if len(valOut.Rejected) > 0 {
|
|
// Scrum fix B2 (Kimi WARN + Opus WARN, 2026-04-30): surface
|
|
// write errors into validatePhase.Errors. Pre-fix the error
|
|
// was swallowed (`if _, err := ...; err == nil`), which broke
|
|
// the receipts-honesty contract — phase status said "ok" while
|
|
// the rejected-findings audit trail silently vanished.
|
|
if _, err := reporters.WriteJSON(filepath.Join(in.OutputDir, "rejected-findings.json"), valOut); err != nil {
|
|
validatePhase.Status = "degraded"
|
|
validatePhase.Errors = append(validatePhase.Errors, "rejected-findings.json: "+err.Error())
|
|
if res.ExitCode == 0 {
|
|
res.ExitCode = 66
|
|
}
|
|
} else {
|
|
validatePhase.OutputFiles = append(validatePhase.OutputFiles, "rejected-findings.json")
|
|
res.OutputFiles = append(res.OutputFiles, "rejected-findings.json")
|
|
}
|
|
}
|
|
res.OutputFiles = append(res.OutputFiles, "validated-findings.json")
|
|
receipt.Phases = append(receipt.Phases, validatePhase)
|
|
|
|
// --- Phase 4: report generation (markdown) ---
|
|
if in.EmitScrum {
|
|
reportPhase := reporters.PhaseReceipt{Name: "report_generation", Status: "ok"}
|
|
writers := []struct {
|
|
name string
|
|
fn func() error
|
|
}{
|
|
{"scrum-test.md", func() error {
|
|
return reporters.WriteScrumTest(filepath.Join(in.OutputDir, "scrum-test.md"), intake, findings, llmDegraded)
|
|
}},
|
|
{"risk-register.md", func() error {
|
|
return reporters.WriteRiskRegister(filepath.Join(in.OutputDir, "risk-register.md"), findings)
|
|
}},
|
|
{"claim-coverage-table.md", func() error {
|
|
return reporters.WriteClaimCoverage(filepath.Join(in.OutputDir, "claim-coverage-table.md"), findings)
|
|
}},
|
|
{"sprint-backlog.md", func() error {
|
|
return reporters.WriteSprintBacklog(filepath.Join(in.OutputDir, "sprint-backlog.md"), staticOut.Summary)
|
|
}},
|
|
{"acceptance-gates.md", func() error {
|
|
return reporters.WriteAcceptanceGates(filepath.Join(in.OutputDir, "acceptance-gates.md"), staticOut.Summary)
|
|
}},
|
|
}
|
|
for _, w := range writers {
|
|
if err := w.fn(); err != nil {
|
|
reportPhase.Status = "failed"
|
|
reportPhase.Errors = append(reportPhase.Errors, w.name+": "+err.Error())
|
|
res.ExitCode = 65
|
|
continue
|
|
}
|
|
reportPhase.OutputFiles = append(reportPhase.OutputFiles, w.name)
|
|
res.OutputFiles = append(res.OutputFiles, w.name)
|
|
}
|
|
receipt.Phases = append(receipt.Phases, reportPhase)
|
|
}
|
|
|
|
// --- Phase 5: memory (Phase E — deferred) ---
|
|
receipt.Phases = append(receipt.Phases, reporters.PhaseReceipt{
|
|
Name: "memory_update", Status: "skipped",
|
|
Errors: []string{"Phase E not implemented in MVP"},
|
|
})
|
|
|
|
// --- Receipt ---
|
|
receipt.Summary = staticOut.Summary
|
|
if err := writeReceipt(in.OutputDir, &receipt, startedAt, nil); err != nil {
|
|
return res, err
|
|
}
|
|
res.OutputFiles = append(res.OutputFiles, "receipts.json")
|
|
|
|
return res, nil
|
|
}
|
|
|
|
func writeReceipt(outputDir string, r *reporters.Receipt, startedAt time.Time, _ error) error {
|
|
r.FinishedAt = time.Now().UTC().Format(time.RFC3339Nano)
|
|
_ = startedAt // present for future timing fields
|
|
_, err := reporters.WriteJSON(filepath.Join(outputDir, "receipts.json"), r)
|
|
return err
|
|
}
|
|
|
|
// runLLMReview chunks the scan into per-file inputs, calls the
|
|
// reviewer, and aggregates parsed findings + raw outputs. Returns
|
|
// (findings, raw-outputs-array-for-receipts, error). The error is
|
|
// non-nil only when the provider is fundamentally unreachable;
|
|
// per-chunk parse failures land as ReviewOutput.Error and don't
|
|
// fail the whole phase.
|
|
func runLLMReview(ctx context.Context, scan *scanner.Result, in Inputs) ([]analyzers.Finding, []llm.ReviewOutput, error) {
|
|
prov := llm.NewOllama(in.ModelProfile.BaseURL, time.Duration(in.ModelProfile.TimeoutSeconds)*time.Second)
|
|
hctx, hcancel := context.WithTimeout(ctx, 5*time.Second)
|
|
defer hcancel()
|
|
hs := prov.HealthCheck(hctx, in.ModelProfile.Model, in.ModelProfile.FallbackModel)
|
|
if !hs.ServerAvailable {
|
|
return nil, nil, fmt.Errorf("ollama unreachable at %s — Phase 2 cannot run", in.ModelProfile.BaseURL)
|
|
}
|
|
if !hs.PrimaryModelAvailable && !hs.FallbackModelAvailable {
|
|
return nil, nil, fmt.Errorf("neither primary %q nor fallback %q loaded in Ollama", in.ModelProfile.Model, in.ModelProfile.FallbackModel)
|
|
}
|
|
model := in.ModelProfile.Model
|
|
if !hs.PrimaryModelAvailable {
|
|
model = in.ModelProfile.FallbackModel
|
|
}
|
|
|
|
r := llm.NewReviewer(prov, model, llm.CompleteOptions{
|
|
Temperature: in.ModelProfile.Temperature,
|
|
MaxTokens: 0, // let model decide
|
|
TimeoutSeconds: in.ModelProfile.TimeoutSeconds,
|
|
})
|
|
|
|
chunks := llm.ChunkInputsFromScan(scan, in.ReviewProfile.Limits.MaxFileBytes, in.ReviewProfile.Limits.MaxLLMChunkChars, func(abs string) string {
|
|
b, err := os.ReadFile(abs)
|
|
if err != nil {
|
|
return ""
|
|
}
|
|
return string(b)
|
|
})
|
|
|
|
outputs := r.ReviewBatch(ctx, chunks)
|
|
findings := []analyzers.Finding{}
|
|
for _, o := range outputs {
|
|
findings = append(findings, o.Findings...)
|
|
}
|
|
return findings, outputs, nil
|
|
}
|
|
|
|
// cleanOutputDir removes only the files this pipeline emits. We don't
|
|
// nuke the dir because operators might keep adjacent files there
|
|
// (e.g. `.gitkeep`); we delete a bounded list so prior-run artifacts
|
|
// can't masquerade as current data, but operator-owned files stay.
|
|
func cleanOutputDir(dir string) {
|
|
if dir == "" {
|
|
return
|
|
}
|
|
known := []string{
|
|
"repo-intake.json",
|
|
"static-findings.json",
|
|
"llm-findings.raw.json",
|
|
"llm-findings.normalized.json",
|
|
"validated-findings.json",
|
|
"rejected-findings.json",
|
|
"scrum-test.md",
|
|
"risk-register.md",
|
|
"claim-coverage-table.md",
|
|
"sprint-backlog.md",
|
|
"acceptance-gates.md",
|
|
"receipts.json",
|
|
"model-doctor.json",
|
|
}
|
|
for _, name := range known {
|
|
_ = os.Remove(filepath.Join(dir, name))
|
|
}
|
|
}
|
|
|
|
func newRunID(t time.Time) string {
|
|
var rb [4]byte
|
|
_, _ = rand.Read(rb[:])
|
|
return t.UTC().Format("20060102T150405") + "-" + hex.EncodeToString(rb[:])
|
|
}
|