// Package pipeline orchestrates the per-phase execution. Each phase
// produces JSON / markdown artifacts and a per-phase Receipt entry.
// Degraded mode propagates: if Phase C (LLM review) can't run, the
// pipeline still ships the static-scan deliverables and marks the
// LLM phase degraded — never silently skipped.
package pipeline

import (
	"context"
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"os"
	"path/filepath"
	"time"

	"local-review-harness/internal/analyzers"
	"local-review-harness/internal/config"
	"local-review-harness/internal/git"
	"local-review-harness/internal/llm"
	"local-review-harness/internal/memory"
	"local-review-harness/internal/reporters"
	"local-review-harness/internal/scanner"
	"local-review-harness/internal/validators"
)

// Inputs is the bag the CLI passes to the pipeline.
type Inputs struct {
	RepoPath      string
	ReviewProfile config.ReviewProfile
	ModelProfile  config.ModelProfile
	OutputDir     string
	EmitScrum     bool // true → also emit scrum-test/risk-register/sprint-backlog/acceptance-gates markdown
	EnableLLM     bool // Phase C: actually call the model. Off by default — operators opt in.

	// DiffOnlyFiles, when non-nil, restricts the scan to JUST these
	// repo-relative paths — the diff subcommand uses this to focus
	// review on changed files rather than the full repo. Nil = scan
	// everything.
	DiffOnlyFiles []string
}

// Result is what the CLI shows the operator.
type Result struct {
	OutputFiles []string
	ExitCode    int // 0=ok, 66=degraded, 65=runtime
}

// RunRepo executes the full pipeline: Phase 0 (intake), Phase 1
// (static scan), Phase 2 (LLM review, opt-in), Phase 3 (validation),
// Phase 4 (markdown reports, opt-in) and Phase 5 (memory). Every
// phase that does not run lands in receipts as "skipped" or
// "degraded" — never silently omitted.
func RunRepo(ctx context.Context, in Inputs) (*Result, error) { startedAt := time.Now().UTC() runID := newRunID(startedAt) res := &Result{ExitCode: 0} receipt := reporters.Receipt{ RunID: runID, RepoPath: in.RepoPath, StartedAt: startedAt.Format(time.RFC3339Nano), } // Clean output dir before each run so stale files from a prior // run can't leak into the current report set. 2026-04-30 fix: // before this, a previous run's rejected-findings.json could // stick around when the current run had no rejections, confusing // operators about which data was current. cleanOutputDir(in.OutputDir) // --- Phase 0: repo intake --- scan, err := scanner.Walk(in.RepoPath, true) // Diff-mode: filter the scan to just the changed files. Phase 0 // intake stats stay accurate (operator wants to see what the // repo looks like overall) but analyzers + LLM only see the // changed slice. Pre-filter saves O(N) regex passes when N is // 5000 files but only 3 changed. if len(in.DiffOnlyFiles) > 0 && err == nil { want := map[string]bool{} for _, f := range in.DiffOnlyFiles { want[f] = true } filtered := scan.Files[:0] for _, f := range scan.Files { if want[f.Path] { filtered = append(filtered, f) } } scan.Files = filtered } scanPhase := reporters.PhaseReceipt{Name: "repo_intake", Status: "ok"} if err != nil { scanPhase.Status = "failed" scanPhase.Errors = append(scanPhase.Errors, err.Error()) receipt.Phases = append(receipt.Phases, scanPhase) res.ExitCode = 65 // Even on scan failure, write the receipt so operators can // see what blew up. 
_ = writeReceipt(in.OutputDir, &receipt, startedAt, nil) return res, err } gi := git.Inspect(ctx, in.RepoPath) intake := reporters.BuildIntake(scan, gi) intakePath := filepath.Join(in.OutputDir, "repo-intake.json") if sha, err := reporters.WriteJSON(intakePath, intake); err != nil { scanPhase.Status = "failed" scanPhase.Errors = append(scanPhase.Errors, err.Error()) } else { scanPhase.OutputFiles = []string{"repo-intake.json"} scanPhase.OutputHash = sha } if !gi.HasGit { scanPhase.Status = "degraded" scanPhase.Errors = append(scanPhase.Errors, "no git metadata (not a git repo or git unavailable)") if res.ExitCode == 0 { res.ExitCode = 66 } } receipt.Phases = append(receipt.Phases, scanPhase) res.OutputFiles = append(res.OutputFiles, "repo-intake.json") // --- Phase 1: static scan --- findings := analyzers.Run(scan, in.ReviewProfile) staticOut := reporters.StaticFindings{ GeneratedAt: time.Now().UTC().Format(time.RFC3339Nano), Findings: findings, Summary: reporters.SummarizeFindings(findings), } staticPath := filepath.Join(in.OutputDir, "static-findings.json") staticPhase := reporters.PhaseReceipt{Name: "static_scan", Status: "ok"} if sha, err := reporters.WriteJSON(staticPath, staticOut); err != nil { staticPhase.Status = "failed" staticPhase.Errors = append(staticPhase.Errors, err.Error()) res.ExitCode = 65 } else { staticPhase.OutputFiles = []string{"static-findings.json"} staticPhase.OutputHash = sha } receipt.Phases = append(receipt.Phases, staticPhase) res.OutputFiles = append(res.OutputFiles, "static-findings.json") // --- Phase 2: LLM review (Phase C) --- llmDegraded := true llmPhase := reporters.PhaseReceipt{Name: "llm_review", Status: "skipped"} if !in.EnableLLM { llmPhase.Errors = append(llmPhase.Errors, "LLM review not requested (pass --enable-llm to opt in)") } else { llmFindings, raw, llmErr := runLLMReview(ctx, scan, in) // Always save raw output, even on failure — operator forensics. 
rawPath := filepath.Join(in.OutputDir, "llm-findings.raw.json") if _, err := reporters.WriteJSON(rawPath, raw); err == nil { llmPhase.OutputFiles = append(llmPhase.OutputFiles, "llm-findings.raw.json") } if llmErr != nil { llmPhase.Status = "degraded" llmPhase.Errors = append(llmPhase.Errors, llmErr.Error()) } else { normalized := reporters.StaticFindings{ GeneratedAt: time.Now().UTC().Format(time.RFC3339Nano), Findings: llmFindings, Summary: reporters.SummarizeFindings(llmFindings), } if sha, err := reporters.WriteJSON(filepath.Join(in.OutputDir, "llm-findings.normalized.json"), normalized); err == nil { llmPhase.OutputFiles = append(llmPhase.OutputFiles, "llm-findings.normalized.json") llmPhase.OutputHash = sha llmPhase.Status = "ok" llmDegraded = false findings = append(findings, llmFindings...) res.OutputFiles = append(res.OutputFiles, "llm-findings.raw.json", "llm-findings.normalized.json") } else { llmPhase.Status = "failed" llmPhase.Errors = append(llmPhase.Errors, "write normalized: "+err.Error()) } } } if llmDegraded && res.ExitCode == 0 { res.ExitCode = 66 } receipt.Phases = append(receipt.Phases, llmPhase) // --- Phase 3: validation (Phase D) --- // Cross-checks every LLM-sourced finding against actual file // content + path-traversal protection. Static findings pass // through promoted-to-confirmed (their evidence is already // grep-truthful by construction). Rejected findings land in // rejected-findings.json with per-rejection reason for the // audit trail. 
validatePhase := reporters.PhaseReceipt{Name: "validation", Status: "ok"} valOut := validators.Validate(in.RepoPath, findings) findings = valOut.Validated // pipeline downstream only sees validated set if sha, err := reporters.WriteJSON(filepath.Join(in.OutputDir, "validated-findings.json"), reporters.StaticFindings{ GeneratedAt: time.Now().UTC().Format(time.RFC3339Nano), Findings: valOut.Validated, Summary: reporters.SummarizeFindings(valOut.Validated), }); err != nil { validatePhase.Status = "failed" validatePhase.Errors = append(validatePhase.Errors, "validated: "+err.Error()) } else { validatePhase.OutputFiles = append(validatePhase.OutputFiles, "validated-findings.json") validatePhase.OutputHash = sha } if len(valOut.Rejected) > 0 { // Scrum fix B2 (Kimi WARN + Opus WARN, 2026-04-30): surface // write errors into validatePhase.Errors. Pre-fix the error // was swallowed (`if _, err := ...; err == nil`), which broke // the receipts-honesty contract — phase status said "ok" while // the rejected-findings audit trail silently vanished. 
if _, err := reporters.WriteJSON(filepath.Join(in.OutputDir, "rejected-findings.json"), valOut); err != nil { validatePhase.Status = "degraded" validatePhase.Errors = append(validatePhase.Errors, "rejected-findings.json: "+err.Error()) if res.ExitCode == 0 { res.ExitCode = 66 } } else { validatePhase.OutputFiles = append(validatePhase.OutputFiles, "rejected-findings.json") res.OutputFiles = append(res.OutputFiles, "rejected-findings.json") } } res.OutputFiles = append(res.OutputFiles, "validated-findings.json") receipt.Phases = append(receipt.Phases, validatePhase) // --- Phase 4: report generation (markdown) --- if in.EmitScrum { reportPhase := reporters.PhaseReceipt{Name: "report_generation", Status: "ok"} writers := []struct { name string fn func() error }{ {"scrum-test.md", func() error { return reporters.WriteScrumTest(filepath.Join(in.OutputDir, "scrum-test.md"), intake, findings, llmDegraded) }}, {"risk-register.md", func() error { return reporters.WriteRiskRegister(filepath.Join(in.OutputDir, "risk-register.md"), findings) }}, {"claim-coverage-table.md", func() error { return reporters.WriteClaimCoverage(filepath.Join(in.OutputDir, "claim-coverage-table.md"), findings) }}, {"sprint-backlog.md", func() error { return reporters.WriteSprintBacklog(filepath.Join(in.OutputDir, "sprint-backlog.md"), staticOut.Summary) }}, {"acceptance-gates.md", func() error { return reporters.WriteAcceptanceGates(filepath.Join(in.OutputDir, "acceptance-gates.md"), staticOut.Summary) }}, } for _, w := range writers { if err := w.fn(); err != nil { reportPhase.Status = "failed" reportPhase.Errors = append(reportPhase.Errors, w.name+": "+err.Error()) res.ExitCode = 65 continue } reportPhase.OutputFiles = append(reportPhase.OutputFiles, w.name) res.OutputFiles = append(res.OutputFiles, w.name) } receipt.Phases = append(receipt.Phases, reportPhase) } // --- Phase 5: memory (Phase E) --- // Append-only writes under /.memory/. 
Persists confirmed // findings + run summary so cross-run drift becomes observable. // Per PROMPT.md hard rule: never silently overwrite. memPhase := reporters.PhaseReceipt{Name: "memory_update", Status: "ok"} if in.ReviewProfile.Memory.Enabled { mw, merr := memory.NewWriter(in.RepoPath) if merr != nil { memPhase.Status = "degraded" memPhase.Errors = append(memPhase.Errors, "memory writer: "+merr.Error()) } else { // Confirmed findings only — suspected ones are still being // validated and may not be real. Keeps the .memory/ log // authoritative. confirmed := []analyzers.Finding{} for _, f := range findings { if f.Status == analyzers.StatusConfirmed { confirmed = append(confirmed, f) } } if err := mw.AppendKnownRisks(runID, confirmed); err != nil { memPhase.Status = "degraded" memPhase.Errors = append(memPhase.Errors, "known-risks: "+err.Error()) } runEntry := memory.RunHistoryEntry{ RunID: runID, RepoPath: in.RepoPath, StartedAt: receipt.StartedAt, FinishedAt: time.Now().UTC().Format(time.RFC3339Nano), TotalFindings: staticOut.Summary.Total, Confirmed: staticOut.Summary.Confirmed, Critical: staticOut.Summary.Critical, High: staticOut.Summary.High, Medium: staticOut.Summary.Medium, Low: staticOut.Summary.Low, LLMEnabled: in.EnableLLM, ExitCode: res.ExitCode, } if err := mw.AppendRunHistory(runEntry); err != nil { memPhase.Status = "degraded" memPhase.Errors = append(memPhase.Errors, "run-history: "+err.Error()) } profile := memory.ProjectProfile{ RepoPath: in.RepoPath, LastSeenAt: time.Now().UTC().Format(time.RFC3339Nano), LastSeenCommit: intake.LatestCommit, LanguageBreakdown: intake.LanguageBreakdown, FileCount: intake.FileCount, } if err := mw.WriteProjectProfile(profile); err != nil { memPhase.Status = "degraded" memPhase.Errors = append(memPhase.Errors, "project-profile: "+err.Error()) } memPhase.OutputFiles = []string{ ".memory/known-risks.jsonl", ".memory/run-history.jsonl", ".memory/project-profile.json", } } } else { memPhase.Status = "skipped" 
memPhase.Errors = append(memPhase.Errors, "memory disabled in review-profile") } receipt.Phases = append(receipt.Phases, memPhase) // --- Receipt --- receipt.Summary = staticOut.Summary if err := writeReceipt(in.OutputDir, &receipt, startedAt, nil); err != nil { return res, err } res.OutputFiles = append(res.OutputFiles, "receipts.json") return res, nil } func writeReceipt(outputDir string, r *reporters.Receipt, startedAt time.Time, _ error) error { r.FinishedAt = time.Now().UTC().Format(time.RFC3339Nano) _ = startedAt // present for future timing fields _, err := reporters.WriteJSON(filepath.Join(outputDir, "receipts.json"), r) return err } // runLLMReview chunks the scan into per-file inputs, calls the // reviewer, and aggregates parsed findings + raw outputs. Returns // (findings, raw-outputs-array-for-receipts, error). The error is // non-nil only when the provider is fundamentally unreachable; // per-chunk parse failures land as ReviewOutput.Error and don't // fail the whole phase. 
func runLLMReview(ctx context.Context, scan *scanner.Result, in Inputs) ([]analyzers.Finding, []llm.ReviewOutput, error) { prov := llm.NewOllama(in.ModelProfile.BaseURL, time.Duration(in.ModelProfile.TimeoutSeconds)*time.Second) hctx, hcancel := context.WithTimeout(ctx, 5*time.Second) defer hcancel() hs := prov.HealthCheck(hctx, in.ModelProfile.Model, in.ModelProfile.FallbackModel) if !hs.ServerAvailable { return nil, nil, fmt.Errorf("ollama unreachable at %s — Phase 2 cannot run", in.ModelProfile.BaseURL) } if !hs.PrimaryModelAvailable && !hs.FallbackModelAvailable { return nil, nil, fmt.Errorf("neither primary %q nor fallback %q loaded in Ollama", in.ModelProfile.Model, in.ModelProfile.FallbackModel) } model := in.ModelProfile.Model if !hs.PrimaryModelAvailable { model = in.ModelProfile.FallbackModel } r := llm.NewReviewer(prov, model, llm.CompleteOptions{ Temperature: in.ModelProfile.Temperature, MaxTokens: 0, // let model decide TimeoutSeconds: in.ModelProfile.TimeoutSeconds, }) chunks := llm.ChunkInputsFromScan(scan, in.ReviewProfile.Limits.MaxFileBytes, in.ReviewProfile.Limits.MaxLLMChunkChars, func(abs string) string { b, err := os.ReadFile(abs) if err != nil { return "" } return string(b) }) outputs := r.ReviewBatch(ctx, chunks) findings := []analyzers.Finding{} for _, o := range outputs { findings = append(findings, o.Findings...) } return findings, outputs, nil } // cleanOutputDir removes only the files this pipeline emits. We don't // nuke the dir because operators might keep adjacent files there // (e.g. `.gitkeep`); we delete a bounded list so prior-run artifacts // can't masquerade as current data, but operator-owned files stay. 
func cleanOutputDir(dir string) { if dir == "" { return } known := []string{ "repo-intake.json", "static-findings.json", "llm-findings.raw.json", "llm-findings.normalized.json", "validated-findings.json", "rejected-findings.json", "scrum-test.md", "risk-register.md", "claim-coverage-table.md", "sprint-backlog.md", "acceptance-gates.md", "receipts.json", "model-doctor.json", } for _, name := range known { _ = os.Remove(filepath.Join(dir, name)) } } func newRunID(t time.Time) string { var rb [4]byte _, _ = rand.Read(rb[:]) return t.UTC().Format("20060102T150405") + "-" + hex.EncodeToString(rb[:]) }