diff --git a/docs/exec-plans/completed/step-3-of-10-implement-the-wtl-engine-and-phaseddeliverypolicy-for-the-git-impac.md b/docs/exec-plans/completed/step-3-of-10-implement-the-wtl-engine-and-phaseddeliverypolicy-for-the-git-impac.md new file mode 100644 index 0000000..622175a --- /dev/null +++ b/docs/exec-plans/completed/step-3-of-10-implement-the-wtl-engine-and-phaseddeliverypolicy-for-the-git-impac.md @@ -0,0 +1,45 @@ +# Step 3 of 10 - Implement the WTL engine and PhasedDeliveryPolicy for the git-impact tool + +## Goal +Implement the git-impact WTL execution engine and phased-delivery control flow so analysis progresses across Source Check, Collect, Link, Score, and Report with deterministic directive handling, observer lifecycle hooks, wait/resume behavior, and test coverage. + +## Background +- `SPEC.md` section 3 defines the git-impact architecture as a single WTL run split into ordered phases. +- `SPEC.md` section 3.1 defines phased directives and explicit `wait` behavior where terminal user input resumes the run. +- `internal/wtl` already provides the repository's baseline engine/policy loop pattern and testing style for directives and loop exhaustion. +- This step introduces `internal/gitimpact` as the git-impact-specific phased engine surface for later CLI and TUI integration. + +## Milestones +| ID | Milestone | Status | Exit criteria | +| --- | --- | --- | --- | +| M1 | Define gitimpact engine contract | completed | `internal/gitimpact/engine.go` defines phases, directives, turn result, handler interface, run context, and analysis data structs with compile-safe types. | +| M2 | Implement phased-delivery loop mechanics | completed | `Engine.Run` executes ordered phase progression with retry limits, continue semantics, wait/resume callback flow, and completion/exhaustion paths. | +| M3 | Add observer integration surface | completed | Observer callbacks are wired for turn start, phase advance, wait entered/resolved, run complete, and run exhausted lifecycle points. | +| M4 | Add tests for core control flow | completed | `internal/gitimpact/engine_test.go` validates phase progression, retry logic, and wait handling behavior with deterministic assertions. | +| M5 | Verify repository health | completed | `go build ./...` and `go test ./...` both succeed with the new git-impact engine package included. | + +## Current progress +- Implemented `internal/gitimpact/engine.go` with phase/directive enums, run context/data structs, phased engine loop, retry cap (default 3), wait/resume callback flow, and analysis result completion. +- Added `internal/gitimpact/observer.go` with observer lifecycle hooks and `WaitHandler`. +- Added `internal/gitimpact/engine_test.go` coverage for ordered phase progression, retry exhaustion, and wait handling with observer assertions. +- Verification completed: + - `GOCACHE=/tmp/go-build-cache go build ./...` + - `GOCACHE=/tmp/go-build-cache go test ./...` + +## Key decisions +- Mirror the existing `internal/wtl` pattern for loop mechanics while specializing directives/phases for git-impact. +- Keep wait handling callback-driven (`WaitHandler`) so later TUI or terminal prompt adapters can plug in without changing engine logic. +- Use explicit ordered phase list in engine control flow instead of implicit policy state to keep progression auditable. +- Keep retry handling phase-local with a fixed maximum of 3 retries per phase directive path. +- Emit observer lifecycle callbacks directly from the engine (`OnTurnStarted`, `OnPhaseAdvanced`, `OnWaitEntered`, `OnWaitResolved`, `OnRunCompleted`, `OnRunExhausted`) to support Bubble Tea bridge wiring in later steps. + +## Remaining issues +- Exact runtime behavior for non-terminal `DirectiveContinue` in a phase without external state mutation may need refinement in later steps if handlers do not naturally converge. +- Domain types are introduced minimally in this step and may be expanded when collectors/linkers/scorers are implemented. + +## Links +- Product spec: `SPEC.md` +- WTL package reference: `internal/wtl/engine.go` +- WTL policy reference: `internal/wtl/policy.go` +- WTL tests reference: `internal/wtl/engine_test.go` +- Plans policy: `docs/PLANS.md` diff --git a/internal/gitimpact/engine.go b/internal/gitimpact/engine.go new file mode 100644 index 0000000..3331f39 --- /dev/null +++ b/internal/gitimpact/engine.go @@ -0,0 +1,305 @@ +package gitimpact + +import ( + "context" + "errors" + "fmt" +) + +const defaultMaxRetries = 3 + +// Phase identifies the current stage of the git-impact analysis run. +type Phase string + +const ( + PhaseSourceCheck Phase = "source_check" + PhaseCollect Phase = "collect" + PhaseLink Phase = "link" + PhaseScore Phase = "score" + PhaseReport Phase = "report" +) + +// Directive instructs the engine what to do after a turn. +type Directive string + +const ( + DirectiveAdvancePhase Directive = "advance_phase" + DirectiveContinue Directive = "continue" + DirectiveRetry Directive = "retry" + DirectiveWait Directive = "wait" + DirectiveComplete Directive = "complete" +) + +// TurnResult is the phase handler output consumed by the engine loop. +type TurnResult struct { + Directive Directive + WaitMessage string + Output string + Error error +} + +// PhaseHandler executes one phase turn. +type PhaseHandler interface { + Handle(ctx context.Context, runCtx *RunContext) (*TurnResult, error) +} + +// Config holds analysis configuration. +type Config struct{} + +// AnalysisContext holds request context for one run. +type AnalysisContext struct { + LastWaitResponse string +} + +// VelenClient encapsulates Velen interactions. +type VelenClient struct{} + +// PR is a pull request record. +type PR struct{} + +// Release is a release record. +type Release struct{} + +// Deployment is an inferred deployment record. +type Deployment struct{} + +// FeatureGroup is a grouped feature result. +type FeatureGroup struct{} + +// AmbiguousDeployment captures unresolved deployment mappings. +type AmbiguousDeployment struct{} + +// PRImpact stores scored impact per PR. +type PRImpact struct{} + +// ContributorStats stores contributor rollups. +type ContributorStats struct{} + +// RunContext is mutable state shared across phase turns. +type RunContext struct { + Config *Config + AnalysisCtx *AnalysisContext + VelenClient *VelenClient + Phase Phase + Iteration int + CollectedData *CollectedData + LinkedData *LinkedData + ScoredData *ScoredData +} + +// CollectedData stores source collection outputs. +type CollectedData struct { + PRs []PR + Tags []string + Releases []Release + RawOutput string +} + +// LinkedData stores deployment-linking outputs. +type LinkedData struct { + Deployments []Deployment + FeatureGroups []FeatureGroup + AmbiguousItems []AmbiguousDeployment +} + +// ScoredData stores scoring outputs. +type ScoredData struct { + PRImpacts []PRImpact + ContributorStats []ContributorStats +} + +// AnalysisResult is the terminal run payload. +type AnalysisResult struct { + Output string + Phase Phase + Iteration int + CollectedData *CollectedData + LinkedData *LinkedData + ScoredData *ScoredData +} + +// Engine executes ordered git-impact phases using phased-delivery directives. +type Engine struct { + Handlers map[Phase]PhaseHandler + Observer Observer + WaitHandler WaitHandler + MaxRetries int +} + +var phaseOrder = []Phase{ + PhaseSourceCheck, + PhaseCollect, + PhaseLink, + PhaseScore, + PhaseReport, +} + +// Run executes the phased-delivery policy for one analysis run. +func (e *Engine) Run(ctx context.Context, runCtx *RunContext) (*AnalysisResult, error) { + if runCtx == nil { + err := errors.New("run context is required") + e.notifyRunExhausted(err) + return nil, err + } + + phaseIndex, err := resolveStartPhase(runCtx.Phase) + if err != nil { + e.notifyRunExhausted(err) + return nil, err + } + + maxRetries := e.MaxRetries + if maxRetries <= 0 { + maxRetries = defaultMaxRetries + } + + retries := 0 + for { + if err := ctx.Err(); err != nil { + e.notifyRunExhausted(err) + return nil, err + } + + phase := phaseOrder[phaseIndex] + runCtx.Phase = phase + runCtx.Iteration++ + e.notifyTurnStarted(phase, runCtx.Iteration) + + handler, ok := e.Handlers[phase] + if !ok || handler == nil { + err := fmt.Errorf("no handler registered for phase %q", phase) + e.notifyRunExhausted(err) + return nil, err + } + + turnResult, err := handler.Handle(ctx, runCtx) + if err != nil { + e.notifyRunExhausted(err) + return nil, err + } + if turnResult == nil { + err := fmt.Errorf("phase %q returned nil result", phase) + e.notifyRunExhausted(err) + return nil, err + } + + switch turnResult.Directive { + case DirectiveAdvancePhase: + retries = 0 + nextIndex := phaseIndex + 1 + if nextIndex >= len(phaseOrder) { + result := newAnalysisResult(runCtx, turnResult.Output) + e.notifyRunCompleted(result) + return result, nil + } + from := phaseOrder[phaseIndex] + to := phaseOrder[nextIndex] + phaseIndex = nextIndex + runCtx.Phase = to + e.notifyPhaseAdvanced(from, to) + case DirectiveComplete: + result := newAnalysisResult(runCtx, turnResult.Output) + e.notifyRunCompleted(result) + return result, nil + case DirectiveRetry: + if retries >= maxRetries { + err := fmt.Errorf("phase %q exceeded max retries (%d)", phase, maxRetries) + if turnResult.Error != nil { + err = fmt.Errorf("%w: %v", err, turnResult.Error) + } + e.notifyRunExhausted(err) + return nil, err + } + retries++ + case DirectiveWait: + if e.WaitHandler == nil { + err := errors.New("wait directive received but wait handler is not configured") + e.notifyRunExhausted(err) + return nil, err + } + e.notifyWaitEntered(turnResult.WaitMessage) + response, err := e.WaitHandler(turnResult.WaitMessage) + if err != nil { + e.notifyRunExhausted(err) + return nil, err + } + if runCtx.AnalysisCtx == nil { + runCtx.AnalysisCtx = &AnalysisContext{} + } + runCtx.AnalysisCtx.LastWaitResponse = response + e.notifyWaitResolved(response) + retries = 0 + case DirectiveContinue: + retries = 0 + default: + err := fmt.Errorf("unsupported directive %q", turnResult.Directive) + e.notifyRunExhausted(err) + return nil, err + } + } +} + +func resolveStartPhase(start Phase) (int, error) { + if start == "" { + return 0, nil + } + for i, phase := range phaseOrder { + if phase == start { + return i, nil + } + } + return 0, fmt.Errorf("unsupported start phase %q", start) +} + +func newAnalysisResult(runCtx *RunContext, output string) *AnalysisResult { + return &AnalysisResult{ + Output: output, + Phase: runCtx.Phase, + Iteration: runCtx.Iteration, + CollectedData: runCtx.CollectedData, + LinkedData: runCtx.LinkedData, + ScoredData: runCtx.ScoredData, + } +} + +func (e *Engine) notifyTurnStarted(phase Phase, iteration int) { + if e.Observer == nil { + return + } + e.Observer.OnTurnStarted(phase, iteration) +} + +func (e *Engine) notifyPhaseAdvanced(from, to Phase) { + if e.Observer == nil { + return + } + e.Observer.OnPhaseAdvanced(from, to) +} + +func (e *Engine) notifyWaitEntered(message string) { + if e.Observer == nil { + return + } + e.Observer.OnWaitEntered(message) +} + +func (e *Engine) notifyWaitResolved(response string) { + if e.Observer == nil { + return + } + e.Observer.OnWaitResolved(response) +} + +func (e *Engine) notifyRunCompleted(result *AnalysisResult) { + if e.Observer == nil { + return + } + e.Observer.OnRunCompleted(result) +} + +func (e *Engine) notifyRunExhausted(err error) { + if e.Observer == nil { + return + } + e.Observer.OnRunExhausted(err) +} diff --git a/internal/gitimpact/engine_test.go b/internal/gitimpact/engine_test.go new file mode 100644 index 0000000..bbc9d36 --- /dev/null +++ b/internal/gitimpact/engine_test.go @@ -0,0 +1,196 @@ +package gitimpact + +import ( + "context" + "errors" + "strings" + "testing" +) + +type phaseHandlerFunc func(ctx context.Context, runCtx *RunContext) (*TurnResult, error) + +func (fn phaseHandlerFunc) Handle(ctx context.Context, runCtx *RunContext) (*TurnResult, error) { + return fn(ctx, runCtx) +} + +type recordingObserver struct { + turns []Phase + iterations []int + advances [][2]Phase + waitMessages []string + waitResponses []string + completed *AnalysisResult + exhaustedErr error +} + +func (o *recordingObserver) OnTurnStarted(phase Phase, iteration int) { + o.turns = append(o.turns, phase) + o.iterations = append(o.iterations, iteration) +} + +func (o *recordingObserver) OnPhaseAdvanced(from, to Phase) { + o.advances = append(o.advances, [2]Phase{from, to}) +} + +func (o *recordingObserver) OnWaitEntered(message string) { + o.waitMessages = append(o.waitMessages, message) +} + +func (o *recordingObserver) OnWaitResolved(response string) { + o.waitResponses = append(o.waitResponses, response) +} + +func (o *recordingObserver) OnRunCompleted(result *AnalysisResult) { + o.completed = result +} + +func (o *recordingObserver) OnRunExhausted(err error) { + o.exhaustedErr = err +} + +func TestEngineRun_PhaseProgression(t *testing.T) { + t.Parallel() + + observer := &recordingObserver{} + engine := &Engine{ + Observer: observer, + Handlers: map[Phase]PhaseHandler{ + PhaseSourceCheck: phaseHandlerFunc(func(context.Context, *RunContext) (*TurnResult, error) { + return &TurnResult{Directive: DirectiveAdvancePhase}, nil + }), + PhaseCollect: phaseHandlerFunc(func(context.Context, *RunContext) (*TurnResult, error) { + return &TurnResult{Directive: DirectiveAdvancePhase}, nil + }), + PhaseLink: phaseHandlerFunc(func(context.Context, *RunContext) (*TurnResult, error) { + return &TurnResult{Directive: DirectiveAdvancePhase}, nil + }), + PhaseScore: phaseHandlerFunc(func(context.Context, *RunContext) (*TurnResult, error) { + return &TurnResult{Directive: DirectiveAdvancePhase}, nil + }), + PhaseReport: phaseHandlerFunc(func(context.Context, *RunContext) (*TurnResult, error) { + return &TurnResult{Directive: DirectiveComplete, Output: "analysis complete"}, nil + }), + }, + } + + runCtx := &RunContext{} + result, err := engine.Run(context.Background(), runCtx) + if err != nil { + t.Fatalf("run returned error: %v", err) + } + if result == nil { + t.Fatal("expected non-nil result") + } + if result.Output != "analysis complete" { + t.Fatalf("expected complete output, got %q", result.Output) + } + if result.Phase != PhaseReport { + t.Fatalf("expected final phase %q, got %q", PhaseReport, result.Phase) + } + if result.Iteration != 5 { + t.Fatalf("expected 5 iterations, got %d", result.Iteration) + } + if len(observer.turns) != 5 { + t.Fatalf("expected 5 turn-start events, got %d", len(observer.turns)) + } + if len(observer.advances) != 4 { + t.Fatalf("expected 4 phase-advanced events, got %d", len(observer.advances)) + } + if observer.exhaustedErr != nil { + t.Fatalf("did not expect exhausted event, got %v", observer.exhaustedErr) + } + if observer.completed == nil { + t.Fatal("expected run-completed event") + } +} + +func TestEngineRun_RetryExhaustion(t *testing.T) { + t.Parallel() + + observer := &recordingObserver{} + collectCalls := 0 + engine := &Engine{ + Observer: observer, + MaxRetries: 3, + Handlers: map[Phase]PhaseHandler{ + PhaseSourceCheck: phaseHandlerFunc(func(context.Context, *RunContext) (*TurnResult, error) { + return &TurnResult{Directive: DirectiveAdvancePhase}, nil + }), + PhaseCollect: phaseHandlerFunc(func(context.Context, *RunContext) (*TurnResult, error) { + collectCalls++ + return &TurnResult{Directive: DirectiveRetry, Error: errors.New("transient failure")}, nil + }), + }, + } + + _, err := engine.Run(context.Background(), &RunContext{}) + if err == nil { + t.Fatal("expected retry exhaustion error") + } + if !strings.Contains(err.Error(), "exceeded max retries") { + t.Fatalf("expected max retries error, got %v", err) + } + if collectCalls != 4 { + t.Fatalf("expected 4 collect attempts (3 retries + 1 exhausted attempt), got %d", collectCalls) + } + if observer.exhaustedErr == nil { + t.Fatal("expected exhausted observer event") + } +} + +func TestEngineRun_WaitHandling(t *testing.T) { + t.Parallel() + + observer := &recordingObserver{} + sourceCheckCalls := 0 + waitMessages := make([]string, 0, 1) + engine := &Engine{ + Observer: observer, + WaitHandler: func(message string) (string, error) { + waitMessages = append(waitMessages, message) + return "proceed", nil + }, + Handlers: map[Phase]PhaseHandler{ + PhaseSourceCheck: phaseHandlerFunc(func(context.Context, *RunContext) (*TurnResult, error) { + sourceCheckCalls++ + if sourceCheckCalls == 1 { + return &TurnResult{Directive: DirectiveWait, WaitMessage: "confirm source mapping"}, nil + } + return &TurnResult{Directive: DirectiveAdvancePhase}, nil + }), + PhaseCollect: phaseHandlerFunc(func(context.Context, *RunContext) (*TurnResult, error) { + return &TurnResult{Directive: DirectiveAdvancePhase}, nil + }), + PhaseLink: phaseHandlerFunc(func(context.Context, *RunContext) (*TurnResult, error) { + return &TurnResult{Directive: DirectiveAdvancePhase}, nil + }), + PhaseScore: phaseHandlerFunc(func(context.Context, *RunContext) (*TurnResult, error) { + return &TurnResult{Directive: DirectiveAdvancePhase}, nil + }), + PhaseReport: phaseHandlerFunc(func(context.Context, *RunContext) (*TurnResult, error) { + return &TurnResult{Directive: DirectiveComplete, Output: "done"}, nil + }), + }, + } + + runCtx := &RunContext{} + result, err := engine.Run(context.Background(), runCtx) + if err != nil { + t.Fatalf("run returned error: %v", err) + } + if result.Output != "done" { + t.Fatalf("expected done output, got %q", result.Output) + } + if len(waitMessages) != 1 || waitMessages[0] != "confirm source mapping" { + t.Fatalf("unexpected wait handler messages: %#v", waitMessages) + } + if len(observer.waitMessages) != 1 || observer.waitMessages[0] != "confirm source mapping" { + t.Fatalf("unexpected wait-entered events: %#v", observer.waitMessages) + } + if len(observer.waitResponses) != 1 || observer.waitResponses[0] != "proceed" { + t.Fatalf("unexpected wait-resolved events: %#v", observer.waitResponses) + } + if runCtx.AnalysisCtx == nil || runCtx.AnalysisCtx.LastWaitResponse != "proceed" { + t.Fatalf("expected wait response stored in analysis context, got %#v", runCtx.AnalysisCtx) + } +} diff --git a/internal/gitimpact/observer.go b/internal/gitimpact/observer.go new file mode 100644 index 0000000..d27beb5 --- /dev/null +++ b/internal/gitimpact/observer.go @@ -0,0 +1,14 @@ +package gitimpact + +// Observer receives run lifecycle notifications from the phased engine. +type Observer interface { + OnTurnStarted(phase Phase, iteration int) + OnPhaseAdvanced(from, to Phase) + OnWaitEntered(message string) + OnWaitResolved(response string) + OnRunCompleted(result *AnalysisResult) + OnRunExhausted(err error) +} + +// WaitHandler requests external input when the engine enters wait state. +type WaitHandler func(message string) (string, error)