// Package pipeline orchestrates the per-phase execution. Each phase
// produces JSON / markdown artifacts and a per-phase Receipt entry.
// Degraded mode propagates: if Phase C (LLM review) can't run, the
// pipeline still ships the static-scan deliverables and marks the
// LLM phase degraded — never silently skipped.
package pipeline

import (
	"context"
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"os"
	"path/filepath"
	"time"

	"local-review-harness/internal/analyzers"
	"local-review-harness/internal/config"
	"local-review-harness/internal/git"
	"local-review-harness/internal/llm"
	"local-review-harness/internal/reporters"
	"local-review-harness/internal/scanner"
)

// Inputs is the bag the CLI passes to the pipeline.
type Inputs struct {
	// RepoPath is the repository root handed to the scanner and to the
	// git inspection step.
	RepoPath string
	// ReviewProfile configures the static analyzers; its Limits also
	// bound the per-file chunks sent to the LLM reviewer.
	ReviewProfile config.ReviewProfile
	// ModelProfile supplies the provider base URL, primary and fallback
	// model names, temperature and timeout used by the LLM review.
	ModelProfile config.ModelProfile
	// OutputDir is the directory every artifact and receipts.json is
	// written into.
	OutputDir string
	EmitScrum bool // true → also emit scrum-test/risk-register/claim-coverage-table/sprint-backlog/acceptance-gates markdown
	EnableLLM bool // Phase C: actually call the model. Off by default — operators opt in; when false the llm_review phase is recorded as "skipped".
}

// Result is what the CLI shows the operator.
type Result struct {
	// OutputFiles lists artifact file names (relative to
	// Inputs.OutputDir, not full paths) in the order they were produced.
	OutputFiles []string
	ExitCode    int // 0=ok, 66=degraded, 65=runtime
}

// RunRepo executes Phase 0 (intake) and Phase 1 (static scan), then
// Phase 2 (LLM review) when in.EnableLLM is set and Phase 4 (markdown
// reports) when in.EmitScrum is set. Phases 3 (validation) and 5
// (memory) are not implemented yet and always land in the receipt as
// "skipped". A receipts.json describing every phase is written at the
// end of the run, and also when the initial scan fails.
//
// The returned error is non-nil when the repository scan fails or the
// final receipt cannot be written; the *Result is returned in both
// cases so the caller can still read ExitCode.
func RunRepo(ctx context.Context, in Inputs) (*Result, error) { startedAt := time.Now().UTC() runID := newRunID(startedAt) res := &Result{ExitCode: 0} receipt := reporters.Receipt{ RunID: runID, RepoPath: in.RepoPath, StartedAt: startedAt.Format(time.RFC3339Nano), } // --- Phase 0: repo intake --- scan, err := scanner.Walk(in.RepoPath, true) scanPhase := reporters.PhaseReceipt{Name: "repo_intake", Status: "ok"} if err != nil { scanPhase.Status = "failed" scanPhase.Errors = append(scanPhase.Errors, err.Error()) receipt.Phases = append(receipt.Phases, scanPhase) res.ExitCode = 65 // Even on scan failure, write the receipt so operators can // see what blew up. _ = writeReceipt(in.OutputDir, &receipt, startedAt, nil) return res, err } gi := git.Inspect(ctx, in.RepoPath) intake := reporters.BuildIntake(scan, gi) intakePath := filepath.Join(in.OutputDir, "repo-intake.json") if sha, err := reporters.WriteJSON(intakePath, intake); err != nil { scanPhase.Status = "failed" scanPhase.Errors = append(scanPhase.Errors, err.Error()) } else { scanPhase.OutputFiles = []string{"repo-intake.json"} scanPhase.OutputHash = sha } if !gi.HasGit { scanPhase.Status = "degraded" scanPhase.Errors = append(scanPhase.Errors, "no git metadata (not a git repo or git unavailable)") if res.ExitCode == 0 { res.ExitCode = 66 } } receipt.Phases = append(receipt.Phases, scanPhase) res.OutputFiles = append(res.OutputFiles, "repo-intake.json") // --- Phase 1: static scan --- findings := analyzers.Run(scan, in.ReviewProfile) staticOut := reporters.StaticFindings{ GeneratedAt: time.Now().UTC().Format(time.RFC3339Nano), Findings: findings, Summary: reporters.SummarizeFindings(findings), } staticPath := filepath.Join(in.OutputDir, "static-findings.json") staticPhase := reporters.PhaseReceipt{Name: "static_scan", Status: "ok"} if sha, err := reporters.WriteJSON(staticPath, staticOut); err != nil { staticPhase.Status = "failed" staticPhase.Errors = append(staticPhase.Errors, err.Error()) res.ExitCode = 65 } 
else { staticPhase.OutputFiles = []string{"static-findings.json"} staticPhase.OutputHash = sha } receipt.Phases = append(receipt.Phases, staticPhase) res.OutputFiles = append(res.OutputFiles, "static-findings.json") // --- Phase 2: LLM review (Phase C) --- llmDegraded := true llmPhase := reporters.PhaseReceipt{Name: "llm_review", Status: "skipped"} if !in.EnableLLM { llmPhase.Errors = append(llmPhase.Errors, "LLM review not requested (pass --enable-llm to opt in)") } else { llmFindings, raw, llmErr := runLLMReview(ctx, scan, in) // Always save raw output, even on failure — operator forensics. rawPath := filepath.Join(in.OutputDir, "llm-findings.raw.json") if _, err := reporters.WriteJSON(rawPath, raw); err == nil { llmPhase.OutputFiles = append(llmPhase.OutputFiles, "llm-findings.raw.json") } if llmErr != nil { llmPhase.Status = "degraded" llmPhase.Errors = append(llmPhase.Errors, llmErr.Error()) } else { normalized := reporters.StaticFindings{ GeneratedAt: time.Now().UTC().Format(time.RFC3339Nano), Findings: llmFindings, Summary: reporters.SummarizeFindings(llmFindings), } if sha, err := reporters.WriteJSON(filepath.Join(in.OutputDir, "llm-findings.normalized.json"), normalized); err == nil { llmPhase.OutputFiles = append(llmPhase.OutputFiles, "llm-findings.normalized.json") llmPhase.OutputHash = sha llmPhase.Status = "ok" llmDegraded = false findings = append(findings, llmFindings...) 
res.OutputFiles = append(res.OutputFiles, "llm-findings.raw.json", "llm-findings.normalized.json") } else { llmPhase.Status = "failed" llmPhase.Errors = append(llmPhase.Errors, "write normalized: "+err.Error()) } } } if llmDegraded && res.ExitCode == 0 { res.ExitCode = 66 } receipt.Phases = append(receipt.Phases, llmPhase) // --- Phase 3: validation (Phase D — also deferred) --- receipt.Phases = append(receipt.Phases, reporters.PhaseReceipt{ Name: "validation", Status: "skipped", Errors: []string{"Phase D not implemented in MVP — depends on Phase C"}, }) // --- Phase 4: report generation (markdown) --- if in.EmitScrum { reportPhase := reporters.PhaseReceipt{Name: "report_generation", Status: "ok"} writers := []struct { name string fn func() error }{ {"scrum-test.md", func() error { return reporters.WriteScrumTest(filepath.Join(in.OutputDir, "scrum-test.md"), intake, findings, llmDegraded) }}, {"risk-register.md", func() error { return reporters.WriteRiskRegister(filepath.Join(in.OutputDir, "risk-register.md"), findings) }}, {"claim-coverage-table.md", func() error { return reporters.WriteClaimCoverage(filepath.Join(in.OutputDir, "claim-coverage-table.md"), findings) }}, {"sprint-backlog.md", func() error { return reporters.WriteSprintBacklog(filepath.Join(in.OutputDir, "sprint-backlog.md"), staticOut.Summary) }}, {"acceptance-gates.md", func() error { return reporters.WriteAcceptanceGates(filepath.Join(in.OutputDir, "acceptance-gates.md"), staticOut.Summary) }}, } for _, w := range writers { if err := w.fn(); err != nil { reportPhase.Status = "failed" reportPhase.Errors = append(reportPhase.Errors, w.name+": "+err.Error()) res.ExitCode = 65 continue } reportPhase.OutputFiles = append(reportPhase.OutputFiles, w.name) res.OutputFiles = append(res.OutputFiles, w.name) } receipt.Phases = append(receipt.Phases, reportPhase) } // --- Phase 5: memory (Phase E — deferred) --- receipt.Phases = append(receipt.Phases, reporters.PhaseReceipt{ Name: "memory_update", Status: 
"skipped", Errors: []string{"Phase E not implemented in MVP"}, }) // --- Receipt --- receipt.Summary = staticOut.Summary if err := writeReceipt(in.OutputDir, &receipt, startedAt, nil); err != nil { return res, err } res.OutputFiles = append(res.OutputFiles, "receipts.json") return res, nil } func writeReceipt(outputDir string, r *reporters.Receipt, startedAt time.Time, _ error) error { r.FinishedAt = time.Now().UTC().Format(time.RFC3339Nano) _ = startedAt // present for future timing fields _, err := reporters.WriteJSON(filepath.Join(outputDir, "receipts.json"), r) return err } // runLLMReview chunks the scan into per-file inputs, calls the // reviewer, and aggregates parsed findings + raw outputs. Returns // (findings, raw-outputs-array-for-receipts, error). The error is // non-nil only when the provider is fundamentally unreachable; // per-chunk parse failures land as ReviewOutput.Error and don't // fail the whole phase. func runLLMReview(ctx context.Context, scan *scanner.Result, in Inputs) ([]analyzers.Finding, []llm.ReviewOutput, error) { prov := llm.NewOllama(in.ModelProfile.BaseURL, time.Duration(in.ModelProfile.TimeoutSeconds)*time.Second) hctx, hcancel := context.WithTimeout(ctx, 5*time.Second) defer hcancel() hs := prov.HealthCheck(hctx, in.ModelProfile.Model, in.ModelProfile.FallbackModel) if !hs.ServerAvailable { return nil, nil, fmt.Errorf("ollama unreachable at %s — Phase 2 cannot run", in.ModelProfile.BaseURL) } if !hs.PrimaryModelAvailable && !hs.FallbackModelAvailable { return nil, nil, fmt.Errorf("neither primary %q nor fallback %q loaded in Ollama", in.ModelProfile.Model, in.ModelProfile.FallbackModel) } model := in.ModelProfile.Model if !hs.PrimaryModelAvailable { model = in.ModelProfile.FallbackModel } r := llm.NewReviewer(prov, model, llm.CompleteOptions{ Temperature: in.ModelProfile.Temperature, MaxTokens: 0, // let model decide TimeoutSeconds: in.ModelProfile.TimeoutSeconds, }) chunks := llm.ChunkInputsFromScan(scan, 
in.ReviewProfile.Limits.MaxFileBytes, in.ReviewProfile.Limits.MaxLLMChunkChars, func(abs string) string { b, err := os.ReadFile(abs) if err != nil { return "" } return string(b) }) outputs := r.ReviewBatch(ctx, chunks) findings := []analyzers.Finding{} for _, o := range outputs { findings = append(findings, o.Findings...) } return findings, outputs, nil } func newRunID(t time.Time) string { var rb [4]byte _, _ = rand.Read(rb[:]) return t.UTC().Format("20060102T150405") + "-" + hex.EncodeToString(rb[:]) }