From 4254e9a73ead54b1262f6a01cd8f036c780d97e9 Mon Sep 17 00:00:00 2001 From: Kent Rancourt Date: Tue, 10 Dec 2024 23:36:39 -0500 Subject: [PATCH 1/9] git pkg: add option to pull --rebase before push Signed-off-by: Kent Rancourt --- internal/controller/git/errors.go | 14 ++++++ internal/controller/git/errors_test.go | 44 +++++++++++++++++++ internal/controller/git/work_tree.go | 59 +++++++++++++++++++++++--- 3 files changed, 112 insertions(+), 5 deletions(-) create mode 100644 internal/controller/git/errors.go create mode 100644 internal/controller/git/errors_test.go diff --git a/internal/controller/git/errors.go b/internal/controller/git/errors.go new file mode 100644 index 000000000..440280cf2 --- /dev/null +++ b/internal/controller/git/errors.go @@ -0,0 +1,14 @@ +package git + +import ( + "errors" +) + +// ErrMergeConflict is returned when a merge conflict occurs. +var ErrMergeConflict = errors.New("merge conflict") + +// IsMergeConflict returns true if the error is a merge conflict or wraps one +// and false otherwise. +func IsMergeConflict(err error) bool { + return errors.Is(err, ErrMergeConflict) +} diff --git a/internal/controller/git/errors_test.go b/internal/controller/git/errors_test.go new file mode 100644 index 000000000..6facdba44 --- /dev/null +++ b/internal/controller/git/errors_test.go @@ -0,0 +1,44 @@ +package git + +import ( + "errors" + "fmt" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestIsMergeConflict(t *testing.T) { + testCases := []struct { + name string + err error + expected bool + }{ + { + name: "nil error", + err: nil, + expected: false, + }, + { + name: "not a a merge conflict", + err: errors.New("something went wrong"), + expected: false, + }, + { + name: "a merge conflict", + err: ErrMergeConflict, + expected: true, + }, + { + name: "a wrapped merge conflict", + err: fmt.Errorf("an error occurred: %w", ErrMergeConflict), + expected: true, + }, + } + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + actual := IsMergeConflict(testCase.err) + require.Equal(t, testCase.expected, actual) + }) + } +} diff --git a/internal/controller/git/work_tree.go b/internal/controller/git/work_tree.go index 9ff77ec43..673c9d71a 100644 --- a/internal/controller/git/work_tree.go +++ b/internal/controller/git/work_tree.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "os" + "path/filepath" "strings" "time" @@ -57,6 +58,9 @@ type WorkTree interface { GetDiffPathsForCommitID(commitID string) ([]string, error) // IsAncestor returns true if parent branch is an ancestor of child IsAncestor(parent string, child string) (bool, error) + // IsRebasing returns a bool indicating whether the working tree is currently + // in the middle of a rebase operation. + IsRebasing() (bool, error) // LastCommitID returns the ID (sha) of the most recent commit to the current // branch. LastCommitID() (string, error) @@ -316,6 +320,25 @@ func (w *workTree) IsAncestor(parent string, child string) (bool, error) { return false, fmt.Errorf("error testing ancestry of branches %q, %q: %w", parent, child, err) } +func (w *workTree) IsRebasing() (bool, error) { + res, err := libExec.Exec(w.buildGitCommand("rev-parse", "--git-path", "rebase-merge")) + if err != nil { + return false, fmt.Errorf("error determining rebase status: %w", err) + } + rebaseMerge := filepath.Join(w.dir, strings.TrimSpace(string(res))) + if _, err = os.Stat(rebaseMerge); err == nil { + return true, nil + } + if res, err = libExec.Exec(w.buildGitCommand("rev-parse", "--git-path", "rebase-apply")); err != nil { + return false, fmt.Errorf("error determining rebase status: %w", err) + } + rebaseApply := filepath.Join(w.dir, strings.TrimSpace(string(res))) + if _, err = os.Stat(rebaseApply); err == nil { + return true, nil + } + return false, nil +} + func (w *workTree) LastCommitID() (string, error) { shaBytes, err := libExec.Exec(w.buildGitCommand("rev-parse", "HEAD")) if err != nil { @@ -481,18 +504,44 @@ type PushOptions struct { // TargetBranch specifies the branch to push to. If empty, the current branch // will be pushed to a remote branch by the same name. TargetBranch string + // PullRebase indicates whether to pull and rebase before pushing. This can + // be useful when pushing changes to a remote branch that has been updated + // in the time since the local branch was last pulled. + PullRebase bool } func (w *workTree) Push(opts *PushOptions) error { if opts == nil { opts = &PushOptions{} } - args := []string{"push", "origin"} - if opts.TargetBranch != "" { - args = append(args, fmt.Sprintf("HEAD:%s", opts.TargetBranch)) - } else { - args = append(args, "HEAD") + targetBranch := opts.TargetBranch + if targetBranch == "" { + var err error + if targetBranch, err = w.CurrentBranch(); err != nil { + return err + } + } + if opts.PullRebase { + exists, err := w.RemoteBranchExists(targetBranch) + if err != nil { + return err + } + // We only want to pull and rebase if the remote branch exists. + if exists { + if _, err = libExec.Exec(w.buildGitCommand("pull", "--rebase", "origin", targetBranch)); err != nil { + // The error we're most concerned with is a merge conflict requiring + // manual resolution, because it's an error that no amount of retries + // will fix. If we find that a rebase is in progress, this is what + // has happened. + if isRebasing, isRebasingErr := w.IsRebasing(); isRebasingErr == nil && isRebasing { + return ErrMergeConflict + } + // If we get to here, the error isn't a merge conflict. + return fmt.Errorf("error pulling and rebasing branch: %w", err) + } + } } + args := []string{"push", "origin", fmt.Sprintf("HEAD:%s", targetBranch)} if opts.Force { args = append(args, "--force") } From 0af45af8fbc078cd3b1c2198a937f2b33fd74469 Mon Sep 17 00:00:00 2001 From: Kent Rancourt Date: Tue, 10 Dec 2024 23:37:55 -0500 Subject: [PATCH 2/9] directives: add concept of a terminal (non-retryable) error Signed-off-by: Kent Rancourt --- internal/directives/errors.go | 24 +++++++++++++++ internal/directives/errors_test.go | 47 ++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+) create mode 100644 internal/directives/errors.go create mode 100644 internal/directives/errors_test.go diff --git a/internal/directives/errors.go b/internal/directives/errors.go new file mode 100644 index 000000000..92ed0133d --- /dev/null +++ b/internal/directives/errors.go @@ -0,0 +1,24 @@ +package directives + +import "errors" + +// terminalError wraps another error to indicate to the step execution engine +// that the step that produced the error should not be retried. +type terminalError struct { + err error +} + +// Error implements the error interface. +func (e *terminalError) Error() string { + if e.err == nil { + return "" + } + return e.err.Error() +} + +// isTerminal returns true if the error is a terminal error or wraps one and +// false otherwise. +func isTerminal(err error) bool { + te := &terminalError{} + return errors.As(err, &te) +} diff --git a/internal/directives/errors_test.go b/internal/directives/errors_test.go new file mode 100644 index 000000000..3159f7e0b --- /dev/null +++ b/internal/directives/errors_test.go @@ -0,0 +1,47 @@ +package directives + +import ( + "errors" + "fmt" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestIsTerminal(t *testing.T) { + testCases := []struct { + name string + err error + expected bool + }{ + { + name: "nil error", + err: nil, + expected: false, + }, + { + name: "not a terminal error", + err: errors.New("something went wrong"), + expected: false, + }, + { + name: "a terminal error", + err: &terminalError{err: errors.New("something went wrong")}, + expected: true, + }, + { + name: "a wrapped terminal error", + err: fmt.Errorf( + "an error occurred: %w", + &terminalError{err: errors.New("something went wrong")}, + ), + expected: true, + }, + } + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + actual := isTerminal(testCase.err) + require.Equal(t, testCase.expected, actual) + }) + } +} From ed48355652b8bb51ba3e16d4ee8ed5ef50cbe9aa Mon Sep 17 00:00:00 2001 From: Kent Rancourt Date: Tue, 10 Dec 2024 23:38:52 -0500 Subject: [PATCH 3/9] make steps return terminal errors under specific circumstances Signed-off-by: Kent Rancourt --- internal/directives/git_pr_waiter.go | 6 ++---- internal/directives/git_pr_waiter_test.go | 4 ++-- internal/directives/git_pusher.go | 13 +++++++++++-- internal/directives/http_requester.go | 3 ++- internal/directives/http_requester_test.go | 2 ++ 5 files changed, 19 insertions(+), 9 deletions(-) diff --git a/internal/directives/git_pr_waiter.go b/internal/directives/git_pr_waiter.go index 748bdc581..565d15459 100644 --- a/internal/directives/git_pr_waiter.go +++ b/internal/directives/git_pr_waiter.go @@ -114,10 +114,8 @@ func (g *gitPRWaiter) runPromotionStep( return PromotionStepResult{Status: kargoapi.PromotionPhaseRunning}, nil } if !pr.Merged { - return PromotionStepResult{ - Status: kargoapi.PromotionPhaseFailed, - Message: fmt.Sprintf("pull request %d was closed without being merged", prNumber), - }, err + return PromotionStepResult{Status: kargoapi.PromotionPhaseFailed}, + &terminalError{err: fmt.Errorf("pull request %d was closed without being merged", prNumber)} } return PromotionStepResult{ Status: kargoapi.PromotionPhaseSucceeded, diff --git a/internal/directives/git_pr_waiter_test.go b/internal/directives/git_pr_waiter_test.go index e4610f374..f56507532 100644 --- a/internal/directives/git_pr_waiter_test.go +++ b/internal/directives/git_pr_waiter_test.go @@ -150,8 +150,8 @@ func Test_gitPRWaiter_runPromotionStep(t *testing.T) { }, }, assertions: func(t *testing.T, res PromotionStepResult, err error) { - require.NoError(t, err) - require.Contains(t, res.Message, "closed without being merged") + require.ErrorContains(t, err, "closed without being merged") + require.True(t, isTerminal(err)) require.Equal(t, kargoapi.PromotionPhaseFailed, res.Status) }, }, diff --git a/internal/directives/git_pusher.go b/internal/directives/git_pusher.go index 9f835e72a..262968182 100644 --- a/internal/directives/git_pusher.go +++ b/internal/directives/git_pusher.go @@ -106,8 +106,11 @@ func (g *gitPushPusher) runPromotionStep( fmt.Errorf("error loading working tree from %s: %w", cfg.Path, err) } pushOpts := &git.PushOptions{ - // Start with whatever was specified in the config, which may be empty + // Start with whatever was specified in the config, which may be empty. TargetBranch: cfg.TargetBranch, + // Attempt to rebase on top of the state of the remote branch to help + // avoid conflicts. + PullRebase: true, } // If we're supposed to generate a target branch name, do so if cfg.GenerateTargetBranch { @@ -116,7 +119,7 @@ func (g *gitPushPusher) runPromotionStep( } targetBranch := pushOpts.TargetBranch if targetBranch == "" { - // If retBranch is still empty, we want to set it to the current branch + // If targetBranch is still empty, we want to set it to the current branch // because we will want to return the branch that was pushed to, but we // don't want to mess with the options any further. if targetBranch, err = workTree.CurrentBranch(); err != nil { @@ -125,6 +128,12 @@ func (g *gitPushPusher) runPromotionStep( } } if err = workTree.Push(pushOpts); err != nil { + if git.IsMergeConflict(err) { + // Special case: A merge conflict requires manual resolution and no amount + // of retries will fix that. + return PromotionStepResult{Status: kargoapi.PromotionPhaseFailed}, + &terminalError{err: err} + } return PromotionStepResult{Status: kargoapi.PromotionPhaseErrored}, fmt.Errorf("error pushing commits to remote: %w", err) } diff --git a/internal/directives/http_requester.go b/internal/directives/http_requester.go index 6423e26ba..ac9bd0dfc 100644 --- a/internal/directives/http_requester.go +++ b/internal/directives/http_requester.go @@ -5,6 +5,7 @@ import ( "context" "crypto/tls" "encoding/json" + "errors" "fmt" "io" "net/http" @@ -115,7 +116,7 @@ func (h *httpRequester) runPromotionStep( }, nil case failure: return PromotionStepResult{Status: kargoapi.PromotionPhaseFailed}, - fmt.Errorf("HTTP response met failure criteria") + &terminalError{err: errors.New("HTTP response met failure criteria")} default: return PromotionStepResult{Status: kargoapi.PromotionPhaseRunning}, nil } diff --git a/internal/directives/http_requester_test.go b/internal/directives/http_requester_test.go index f5bc8b320..6a1df26a0 100644 --- a/internal/directives/http_requester_test.go +++ b/internal/directives/http_requester_test.go @@ -334,6 +334,7 @@ func Test_httpRequester_runPromotionStep(t *testing.T) { }, assertions: func(t *testing.T, res PromotionStepResult, err error) { require.ErrorContains(t, err, "HTTP response met failure criteria") + require.True(t, isTerminal(err)) require.Equal(t, kargoapi.PromotionPhaseFailed, res.Status) }, }, @@ -346,6 +347,7 @@ func Test_httpRequester_runPromotionStep(t *testing.T) { }, assertions: func(t *testing.T, res PromotionStepResult, err error) { require.ErrorContains(t, err, "HTTP response met failure criteria") + require.True(t, isTerminal(err)) require.Equal(t, kargoapi.PromotionPhaseFailed, res.Status) }, }, From 6b588b2b0cacee6b144bc0fa369d8ebf9749fa6e Mon Sep 17 00:00:00 2001 From: Kent Rancourt Date: Tue, 10 Dec 2024 23:39:24 -0500 Subject: [PATCH 4/9] refactor step execution to distinguish between terminal and retryable errors/failures Signed-off-by: Kent Rancourt --- internal/directives/simple_engine_promote.go | 79 +++++++++++++----- .../directives/simple_engine_promote_test.go | 82 +++++++++++++++++-- 2 files changed, 136 insertions(+), 25 deletions(-) diff --git a/internal/directives/simple_engine_promote.go b/internal/directives/simple_engine_promote.go index d262e83b8..0cf8fe2a4 100644 --- a/internal/directives/simple_engine_promote.go +++ b/internal/directives/simple_engine_promote.go @@ -112,29 +112,70 @@ func (e *SimpleEngine) executeSteps( // Execute the step result, err := e.executeStep(ctx, promoCtx, step, reg, workDir, state) + stepExecMeta.Status = result.Status + stepExecMeta.Message = result.Message + state[step.Alias] = result.Output + + switch result.Status { + case kargoapi.PromotionPhaseErrored, kargoapi.PromotionPhaseFailed, + kargoapi.PromotionPhaseRunning, kargoapi.PromotionPhaseSucceeded: + default: + // Deal with statuses that no step should have returned. + stepExecMeta.FinishedAt = ptr.To(metav1.Now()) + return PromotionResult{ + Status: kargoapi.PromotionPhaseErrored, + CurrentStep: i, + StepExecutionMetadata: stepExecMetas, + State: state, + HealthCheckSteps: healthChecks, + }, fmt.Errorf("step %d returned an invalid status", i) + } + + // Reconcile status and err... if err != nil { - // Let a hard error take precedence over the result status and message. - stepExecMeta.Status = kargoapi.PromotionPhaseErrored + if stepExecMeta.Status != kargoapi.PromotionPhaseFailed { + // All states other than Errored and Failed should be mutually exclusive + // with a hard error. If we got to here, a step has violated this + // assumption. We will prioritize the error over the status and change + // the status to Errored. + stepExecMeta.Status = kargoapi.PromotionPhaseErrored + } + // Let the hard error take precedence over the message. stepExecMeta.Message = err.Error() - } else { - stepExecMeta.Status = result.Status - stepExecMeta.Message = result.Message + } else if result.Status == kargoapi.PromotionPhaseErrored { + // A nil err should be mutually exclusive with an Errored status. If we + // got to here, a step has violated this assumption. We will prioritize + // the Errored status over the nil error and create an error. + message := stepExecMeta.Message + if message == "" { + message = "no details provided" + } + err = fmt.Errorf("step %d errored: %s", i, message) } - state[step.Alias] = result.Output - if stepExecMeta.Status == kargoapi.PromotionPhaseSucceeded { + // At this point, we've sorted out any discrepancies between the status and + // err. + + switch { + case stepExecMeta.Status == kargoapi.PromotionPhaseSucceeded: + // Best case scenario: The step succeeded. stepExecMeta.FinishedAt = ptr.To(metav1.Now()) if healthCheck := result.HealthCheckStep; healthCheck != nil { healthChecks = append(healthChecks, *healthCheck) } continue // Move on to the next step - } - - // Treat errors and logical failures the same for now. - // TODO(krancour): In the future, we should fail without retry for logical - // failures and unrecoverable errors and retry only those errors with a - // chance of recovery. - if stepExecMeta.Status != kargoapi.PromotionPhaseRunning { + case isTerminal(err): + // This is an unrecoverable error. + stepExecMeta.FinishedAt = ptr.To(metav1.Now()) + return PromotionResult{ + Status: stepExecMeta.Status, + CurrentStep: i, + StepExecutionMetadata: stepExecMetas, + State: state, + HealthCheckSteps: healthChecks, + }, fmt.Errorf("an unrecoverable error occurred: %w", err) + case err != nil: + // If we get to here, the error is POTENTIALLY recoverable. stepExecMeta.ErrorCount++ // Check if the error threshold has been met. errorThreshold := step.GetErrorThreshold(reg.Runner) @@ -154,8 +195,8 @@ func (e *SimpleEngine) executeSteps( } } - // If we get to here, the step is either running (waiting for some external - // condition to be met) or it errored/failed but did not meet the error + // If we get to here, the step is either Running (waiting for some external + // condition to be met) or it Errored/Failed but did not meet the error // threshold. Now we need to check if the timeout has elapsed. A nil timeout // or any non-positive timeout interval are treated as NO timeout, although // a nil timeout really shouldn't happen. @@ -172,8 +213,8 @@ func (e *SimpleEngine) executeSteps( }, fmt.Errorf("step %d timeout of %s has elapsed", i, timeout.String()) } - if stepExecMeta.Status != kargoapi.PromotionPhaseRunning { - // Treat the error/failure as if the step is still running so that the + if err != nil { + // Treat Errored/Failed as if the step is still running so that the // Promotion will be requeued. The step will be retried on the next // reconciliation. stepExecMeta.Message += "; step will be retried" @@ -186,7 +227,7 @@ func (e *SimpleEngine) executeSteps( }, nil } - // If we get to here, the step is still running (waiting for some external + // If we get to here, the step is still Running (waiting for some external // condition to be met). stepExecMeta.ErrorCount = 0 // Reset the error count return PromotionResult{ diff --git a/internal/directives/simple_engine_promote_test.go b/internal/directives/simple_engine_promote_test.go index 28625a6ac..72815e4b2 100644 --- a/internal/directives/simple_engine_promote_test.go +++ b/internal/directives/simple_engine_promote_test.go @@ -235,7 +235,35 @@ func TestSimpleEngine_executeSteps(t *testing.T) { }, }, { - name: "error on step execution; error threshold met", + name: "terminal error on step execution", + steps: []PromotionStep{ + {Kind: "success-step", Alias: "step1"}, + {Kind: "terminal-error-step", Alias: "step2"}, + }, + assertions: func(t *testing.T, result PromotionResult, err error) { + assert.ErrorContains(t, err, "an unrecoverable error occurred") + assert.Equal(t, kargoapi.PromotionPhaseErrored, result.Status) + assert.Equal(t, int64(1), result.CurrentStep) + assert.Len(t, result.StepExecutionMetadata, 2) + assert.Equal(t, kargoapi.PromotionPhaseSucceeded, result.StepExecutionMetadata[0].Status) + assert.NotNil(t, result.StepExecutionMetadata[0].StartedAt) + assert.NotNil(t, result.StepExecutionMetadata[0].FinishedAt) + assert.Equal(t, kargoapi.PromotionPhaseErrored, result.StepExecutionMetadata[1].Status) + assert.NotNil(t, result.StepExecutionMetadata[1].StartedAt) + assert.NotNil(t, result.StepExecutionMetadata[1].FinishedAt) + assert.Contains(t, result.StepExecutionMetadata[1].Message, "something went wrong") + + // Verify first step output is preserved in state + assert.Equal(t, State{ + "step1": map[string]any{ + "key": "value", + }, + "step2": map[string]any(nil), + }, result.State) + }, + }, + { + name: "non-terminal error on step execution; error threshold met", steps: []PromotionStep{ {Kind: "success-step", Alias: "step1"}, {Kind: "error-step", Alias: "step2"}, @@ -263,7 +291,7 @@ func TestSimpleEngine_executeSteps(t *testing.T) { }, }, { - name: "error on step execution; error threshold not met", + name: "non-terminal error on step execution; error threshold not met", steps: []PromotionStep{ { Kind: "error-step", @@ -283,6 +311,37 @@ func TestSimpleEngine_executeSteps(t *testing.T) { assert.Contains(t, result.StepExecutionMetadata[0].Message, "will be retried") }, }, + { + name: "non-terminal error on step execution; timeout elapsed", + promoCtx: PromotionContext{ + StepExecutionMetadata: kargoapi.StepExecutionMetadataList{{ + // Start time is set to an hour ago + StartedAt: ptr.To(metav1.NewTime(time.Now().Add(-time.Hour))), + }}, + }, + steps: []PromotionStep{ + { + Kind: "error-step", + Retry: &kargoapi.PromotionStepRetry{ + ErrorThreshold: 3, + Timeout: &metav1.Duration{ + Duration: time.Hour, + }, + }, + }, + }, + assertions: func(t *testing.T, result PromotionResult, err error) { + assert.ErrorContains(t, err, "timeout") + assert.ErrorContains(t, err, "has elapsed") + assert.Equal(t, kargoapi.PromotionPhaseErrored, result.Status) + assert.Equal(t, int64(0), result.CurrentStep) + assert.Len(t, result.StepExecutionMetadata, 1) + assert.Equal(t, kargoapi.PromotionPhaseErrored, result.StepExecutionMetadata[0].Status) + assert.NotNil(t, result.StepExecutionMetadata[0].StartedAt) + assert.NotNil(t, result.StepExecutionMetadata[0].FinishedAt) + assert.Equal(t, uint32(1), result.StepExecutionMetadata[0].ErrorCount) + }, + }, { name: "step is still running; timeout elapsed", promoCtx: PromotionContext{ @@ -306,6 +365,10 @@ func TestSimpleEngine_executeSteps(t *testing.T) { assert.ErrorContains(t, err, "has elapsed") assert.Equal(t, kargoapi.PromotionPhaseErrored, result.Status) assert.Equal(t, int64(0), result.CurrentStep) + assert.Len(t, result.StepExecutionMetadata, 1) + assert.Equal(t, kargoapi.PromotionPhaseRunning, result.StepExecutionMetadata[0].Status) + assert.NotNil(t, result.StepExecutionMetadata[0].StartedAt) + assert.NotNil(t, result.StepExecutionMetadata[0].FinishedAt) }, }, { @@ -318,13 +381,12 @@ func TestSimpleEngine_executeSteps(t *testing.T) { assert.Len(t, result.StepExecutionMetadata, 1) assert.Equal(t, kargoapi.PromotionPhaseRunning, result.StepExecutionMetadata[0].Status) assert.NotNil(t, result.StepExecutionMetadata[0].StartedAt) + assert.Nil(t, result.StepExecutionMetadata[0].FinishedAt) }, }, { - name: "context cancellation", - steps: []PromotionStep{ - {Kind: "context-waiter", Alias: "step1"}, - }, + name: "context cancellation", + steps: []PromotionStep{{Kind: "context-waiter"}}, assertions: func(t *testing.T, result PromotionResult, err error) { assert.ErrorContains(t, err, "met error threshold") assert.Equal(t, kargoapi.PromotionPhaseErrored, result.Status) @@ -369,6 +431,14 @@ func TestSimpleEngine_executeSteps(t *testing.T) { }, &StepRunnerPermissions{}, ) + testRegistry.RegisterPromotionStepRunner( + &mockPromotionStepRunner{ + name: "terminal-error-step", + runResult: PromotionStepResult{Status: kargoapi.PromotionPhaseErrored}, + runErr: &terminalError{err: errors.New("something went wrong")}, + }, + &StepRunnerPermissions{}, + ) testRegistry.RegisterPromotionStepRunner( &mockPromotionStepRunner{ name: "context-waiter", From 4233dd583dfac5ee5a115f24f93b22c1456dfcc1 Mon Sep 17 00:00:00 2001 From: Kent Rancourt Date: Wed, 11 Dec 2024 09:36:43 -0500 Subject: [PATCH 5/9] Update internal/controller/git/work_tree.go Signed-off-by: Kent Rancourt Co-authored-by: Hidde Beydals --- internal/controller/git/work_tree.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/internal/controller/git/work_tree.go b/internal/controller/git/work_tree.go index 673c9d71a..79f37e595 100644 --- a/internal/controller/git/work_tree.go +++ b/internal/controller/git/work_tree.go @@ -326,14 +326,20 @@ func (w *workTree) IsRebasing() (bool, error) { return false, fmt.Errorf("error determining rebase status: %w", err) } rebaseMerge := filepath.Join(w.dir, strings.TrimSpace(string(res))) - if _, err = os.Stat(rebaseMerge); err == nil { + if _, err = os.Stat(rebaseMerge); !os.IsNotExist(err) { + if err != nil { + return false, err + } return true, nil } if res, err = libExec.Exec(w.buildGitCommand("rev-parse", "--git-path", "rebase-apply")); err != nil { return false, fmt.Errorf("error determining rebase status: %w", err) } rebaseApply := filepath.Join(w.dir, strings.TrimSpace(string(res))) - if _, err = os.Stat(rebaseApply); err == nil { + if _, err = os.Stat(rebaseApply); !os.IsNotExist(err) { + if err != nil { + return false, err + } return true, nil } return false, nil From 15d0580c50a6c3d5bb803703c514a9d1544bcf62 Mon Sep 17 00:00:00 2001 From: Kent Rancourt Date: Wed, 11 Dec 2024 13:39:37 -0500 Subject: [PATCH 6/9] make git push detect non-fast-forwards Signed-off-by: Kent Rancourt --- internal/controller/git/errors.go | 10 +++++++ internal/controller/git/errors_test.go | 37 +++++++++++++++++++++++++- internal/controller/git/work_tree.go | 11 +++++++- 3 files changed, 56 insertions(+), 2 deletions(-) diff --git a/internal/controller/git/errors.go b/internal/controller/git/errors.go index 440280cf2..fea260bc7 100644 --- a/internal/controller/git/errors.go +++ b/internal/controller/git/errors.go @@ -12,3 +12,13 @@ var ErrMergeConflict = errors.New("merge conflict") func IsMergeConflict(err error) bool { return errors.Is(err, ErrMergeConflict) } + +// ErrNonFastForward is returned when a push is rejected because it is not a +// fast-forward or needs to be fetched first. +var ErrNonFastForward = errors.New("non-fast-forward") + +// IsNonFastForward returns true if the error is a non-fast-forward or wraps one +// and false otherwise. +func IsNonFastForward(err error) bool { + return errors.Is(err, ErrNonFastForward) +} diff --git a/internal/controller/git/errors_test.go b/internal/controller/git/errors_test.go index 6facdba44..f92a72092 100644 --- a/internal/controller/git/errors_test.go +++ b/internal/controller/git/errors_test.go @@ -20,7 +20,7 @@ func TestIsMergeConflict(t *testing.T) { expected: false, }, { - name: "not a a merge conflict", + name: "not a merge conflict", err: errors.New("something went wrong"), expected: false, }, @@ -42,3 +42,38 @@ func TestIsMergeConflict(t *testing.T) { }) } } + +func TestIsNonFastForward(t *testing.T) { + testCases := []struct { + name string + err error + expected bool + }{ + { + name: "nil error", + err: nil, + expected: false, + }, + { + name: "not a non-fast-forward error", + err: errors.New("something went wrong"), + expected: false, + }, + { + name: "a non-fast-forward error", + err: ErrNonFastForward, + expected: true, + }, + { + name: "a wrapped fast forward error", + err: fmt.Errorf("an error occurred: %w", ErrNonFastForward), + expected: true, + }, + } + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + actual := IsNonFastForward(testCase.err) + require.Equal(t, testCase.expected, actual) + }) + } +} diff --git a/internal/controller/git/work_tree.go b/internal/controller/git/work_tree.go index 79f37e595..214691b43 100644 --- a/internal/controller/git/work_tree.go +++ b/internal/controller/git/work_tree.go @@ -7,6 +7,7 @@ import ( "fmt" "os" "path/filepath" + "regexp" "strings" "time" @@ -516,6 +517,11 @@ type PushOptions struct { PullRebase bool } +// https://regex101.com/r/aNYjHP/1 +// +// nolint: lll +var nonFastForwardRegex = regexp.MustCompile(`(?m)^\s*!\s+\[(?:remote )?rejected].+\((?:non-fast-forward|fetch first|cannot lock ref.*)\)\s*$`) + func (w *workTree) Push(opts *PushOptions) error { if opts == nil { opts = &PushOptions{} @@ -551,7 +557,10 @@ func (w *workTree) Push(opts *PushOptions) error { if opts.Force { args = append(args, "--force") } - if _, err := libExec.Exec(w.buildGitCommand(args...)); err != nil { + if res, err := libExec.Exec(w.buildGitCommand(args...)); err != nil { + if nonFastForwardRegex.MatchString(string(res)) { + return fmt.Errorf("error pushing branch: %w", ErrNonFastForward) + } return fmt.Errorf("error pushing branch: %w", err) } return nil From 4e0092844be242dbc5e9c0bbbf3f2a47bb8a5506 Mon Sep 17 00:00:00 2001 From: Kent Rancourt Date: Wed, 11 Dec 2024 14:02:51 -0500 Subject: [PATCH 7/9] make git-push retry internally on non-fast-forward errors Signed-off-by: Kent Rancourt --- internal/directives/git_pusher.go | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/internal/directives/git_pusher.go b/internal/directives/git_pusher.go index 262968182..8e2b1aee8 100644 --- a/internal/directives/git_pusher.go +++ b/internal/directives/git_pusher.go @@ -3,9 +3,12 @@ package directives import ( "context" "fmt" + "time" securejoin "github.com/cyphar/filepath-securejoin" "github.com/xeipuuv/gojsonschema" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/util/retry" kargoapi "github.com/akuity/kargo/api/v1alpha1" "github.com/akuity/kargo/internal/controller/git" @@ -127,7 +130,20 @@ func (g *gitPushPusher) runPromotionStep( fmt.Errorf("error getting current branch: %w", err) } } - if err = workTree.Push(pushOpts); err != nil { + + if err = retry.OnError( + wait.Backoff{ // TODO(krancour): Make this at least partially configurable + Duration: 1 * time.Second, + Factor: 2, + Steps: 10, + Cap: 2 * time.Minute, + Jitter: 0.1, + }, + git.IsNonFastForward, + func() error { + return workTree.Push(pushOpts) + }, + ); err != nil { if git.IsMergeConflict(err) { // Special case: A merge conflict requires manual resolution and no amount // of retries will fix that. @@ -137,6 +153,7 @@ func (g *gitPushPusher) runPromotionStep( return PromotionStepResult{Status: kargoapi.PromotionPhaseErrored}, fmt.Errorf("error pushing commits to remote: %w", err) } + commitID, err := workTree.LastCommitID() if err != nil { return PromotionStepResult{Status: kargoapi.PromotionPhaseErrored}, From efa806f7fdd0c32d1f6105685702c94e6375d9f4 Mon Sep 17 00:00:00 2001 From: Kent Rancourt Date: Mon, 16 Dec 2024 13:14:19 -0500 Subject: [PATCH 8/9] make max attempts cofigurable Signed-off-by: Kent Rancourt --- docs/docs/35-references/10-promotion-steps.md | 20 +++++++++++++- internal/directives/git_pusher.go | 27 ++++++++++++------- internal/directives/git_pusher_test.go | 19 +++++++++++++ .../directives/schemas/git-push-config.json | 6 +++++ internal/directives/zz_config_types.go | 6 +++++ ui/src/gen/directives/git-push-config.json | 6 +++++ 6 files changed, 74 insertions(+), 10 deletions(-) diff --git a/docs/docs/35-references/10-promotion-steps.md b/docs/docs/35-references/10-promotion-steps.md index f168c38cc..df9a15b94 100644 --- a/docs/docs/35-references/10-promotion-steps.md +++ b/docs/docs/35-references/10-promotion-steps.md @@ -924,7 +924,24 @@ steps: ### `git-push` `git-push` pushes the committed changes in a specified working tree to a -specified branch in the remote repository. This step typically follows a `git-commit` step and is often followed by a `git-open-pr` step. +specified branch in the remote repository. This step typically follows a +`git-commit` step and is often followed by a `git-open-pr` step. + +This step also implements its own, internal retry logic. If a push fails, with +the cause determined to be the presence of new commits in the remote branch that +are not present in the local branch, the step will attempt to rebase before +retrying the push. Any merge conflict requiring manual resolution will +immediately halt further attempts. + +:::info +This step's internal retry logic is helpful in scenarios when concurrent +Promotions to multiple Stages may all write to the same branch of the same +repository. + +Because conflicts requiring manual resolution will halt further attempts, it is +recommended to design your Promotion processes such that Promotions to multiple +Stages that write to the same branch do not write to the same files. +::: #### `git-push` Configuration @@ -932,6 +949,7 @@ specified branch in the remote repository. This step typically follows a `git-co |------|------|----------|-------------| | `path` | `string` | Y | Path to a Git working tree containing committed changes. | | `targetBranch` | `string` | N | The branch to push to in the remote repository. Mutually exclusive with `generateTargetBranch=true`. If neither of these is provided, the target branch will be the same as the branch currently checked out in the working tree. | +| `maxAttempts` | `int32` | N | The maximum number of attempts to make when pushing to the remote repository. Default is 50. | | `generateTargetBranch` | `boolean` | N | Whether to push to a remote branch named like `kargo///promotion`. If such a branch does not already exist, it will be created. A value of 'true' is mutually exclusive with `targetBranch`. If neither of these is provided, the target branch will be the currently checked out branch. This option is useful when a subsequent promotion step will open a pull request against a Stage-specific branch. In such a case, the generated target branch pushed to by the `git-push` step can later be utilized as the source branch of the pull request. | #### `git-push` Examples diff --git a/internal/directives/git_pusher.go b/internal/directives/git_pusher.go index 8e2b1aee8..04d47f0bf 100644 --- a/internal/directives/git_pusher.go +++ b/internal/directives/git_pusher.go @@ -3,11 +3,9 @@ package directives import ( "context" "fmt" - "time" securejoin "github.com/cyphar/filepath-securejoin" "github.com/xeipuuv/gojsonschema" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/retry" kargoapi "github.com/akuity/kargo/api/v1alpha1" @@ -131,14 +129,25 @@ func (g *gitPushPusher) runPromotionStep( } } + backoff := retry.DefaultBackoff + if cfg.MaxAttempts != nil { + // Note, the docs for this field say: + // + // The remaining number of iterations in which the duration + // parameter may change... + // + // This is misleading, as it implies that the total number of attempts may + // exceed the value of Steps and that Steps only dictates the maximum number + // of adjustments to the interval between retries. + // + // Reading the implementation of retry.DefaultBackoff reveals that Steps + // is indeed the maximum number of attempts. + backoff.Steps = int(*cfg.MaxAttempts) + } else { + backoff.Steps = 50 + } if err = retry.OnError( - wait.Backoff{ // TODO(krancour): Make this at least partially configurable - Duration: 1 * time.Second, - Factor: 2, - Steps: 10, - Cap: 2 * time.Minute, - Jitter: 0.1, - }, + backoff, git.IsNonFastForward, func() error { return workTree.Push(pushOpts) diff --git a/internal/directives/git_pusher_test.go b/internal/directives/git_pusher_test.go index 7b5618c64..f128de166 100644 --- a/internal/directives/git_pusher_test.go +++ b/internal/directives/git_pusher_test.go @@ -3,6 +3,7 @@ package directives import ( "context" "fmt" + "math" "net/http/httptest" "os" "path/filepath" @@ -37,6 +38,24 @@ func Test_gitPusher_validate(t *testing.T) { "path: String length must be greater than or equal to 1", }, }, + { + name: "maxAttempts < 1", + config: Config{ + "maxAttempts": 0, + }, + expectedProblems: []string{ + "maxAttempts: Must be greater than or equal to 1", + }, + }, + { + name: fmt.Sprintf("maxAttempts > %d", math.MaxInt32), + config: Config{ + "maxAttempts": math.MaxInt32 + 1, + }, + expectedProblems: []string{ + fmt.Sprintf("maxAttempts: Must be less than or equal to %.9e", float64(math.MaxInt32)), + }, + }, { name: "just generateTargetBranch is true", config: Config{ // Should be completely valid diff --git a/internal/directives/schemas/git-push-config.json b/internal/directives/schemas/git-push-config.json index 7d1266109..fd18ff5a1 100644 --- a/internal/directives/schemas/git-push-config.json +++ b/internal/directives/schemas/git-push-config.json @@ -9,6 +9,12 @@ "type": "boolean", "description": "Indicates whether to push to a new remote branch. A value of 'true' is mutually exclusive with 'targetBranch'. If neither of these is provided, the target branch will be the currently checked out branch." }, + "maxAttempts": { + "type": "integer", + "description": "This step implements its own internal retry logic for cases where a push is determined to have failed due to the remote branch having commits that that are not present locally. Each attempt, including the first, rebases prior to pushing. This field configures the maximum number of attempts to push to the remote repository. If not specified, the default is 50.", + "minimum": 1, + "maximum": 2147483647 + }, "path": { "type": "string", "description": "The path to a working directory of a local repository.", diff --git a/internal/directives/zz_config_types.go b/internal/directives/zz_config_types.go index e09b38c60..5ffe2e456 100644 --- a/internal/directives/zz_config_types.go +++ b/internal/directives/zz_config_types.go @@ -211,6 +211,12 @@ type GitPushConfig struct { // with 'targetBranch'. If neither of these is provided, the target branch will be the // currently checked out branch. GenerateTargetBranch bool `json:"generateTargetBranch,omitempty"` + // This step implements its own internal retry logic for cases where a push is determined to + // have failed due to the remote branch having commits that that are not present locally. + // Each attempt, including the first, rebases prior to pushing. This field configures the + // maximum number of attempts to push to the remote repository. If not specified, the + // default is 50. + MaxAttempts *int64 `json:"maxAttempts,omitempty"` // The path to a working directory of a local repository. Path string `json:"path"` // The target branch to push to. Mutually exclusive with 'generateTargetBranch=true'. If diff --git a/ui/src/gen/directives/git-push-config.json b/ui/src/gen/directives/git-push-config.json index 1532cfe35..903e62dd4 100644 --- a/ui/src/gen/directives/git-push-config.json +++ b/ui/src/gen/directives/git-push-config.json @@ -8,6 +8,12 @@ "type": "boolean", "description": "Indicates whether to push to a new remote branch. A value of 'true' is mutually exclusive with 'targetBranch'. If neither of these is provided, the target branch will be the currently checked out branch." }, + "maxAttempts": { + "type": "integer", + "description": "This step implements its own internal retry logic for cases where a push is determined to have failed due to the remote branch having commits that that are not present locally. Each attempt, including the first, rebases prior to pushing. This field configures the maximum number of attempts to push to the remote repository. If not specified, the default is 50.", + "minimum": 1, + "maximum": 2147483647 + }, "path": { "type": "string", "description": "The path to a working directory of a local repository.", From 54b2a27b94fc52c54d56a51cb9102b1a61df5fb0 Mon Sep 17 00:00:00 2001 From: Kent Rancourt Date: Tue, 17 Dec 2024 11:48:01 -0500 Subject: [PATCH 9/9] git-push branch locks + retry tweaks Signed-off-by: Kent Rancourt --- internal/directives/git_pusher.go | 65 ++++++++++++++++++++------ internal/directives/git_pusher_test.go | 1 + 2 files changed, 53 insertions(+), 13 deletions(-) diff --git a/internal/directives/git_pusher.go b/internal/directives/git_pusher.go index 04d47f0bf..505a251f3 100644 --- a/internal/directives/git_pusher.go +++ b/internal/directives/git_pusher.go @@ -3,9 +3,12 @@ package directives import ( "context" "fmt" + "sync" + "time" securejoin "github.com/cyphar/filepath-securejoin" "github.com/xeipuuv/gojsonschema" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/retry" kargoapi "github.com/akuity/kargo/api/v1alpha1" @@ -28,12 +31,16 @@ func init() { // pushes commits from a local Git repository to a remote Git repository. type gitPushPusher struct { schemaLoader gojsonschema.JSONLoader + branchMus map[string]*sync.Mutex + masterMu sync.Mutex } // newGitPusher returns an implementation of the PromotionStepRunner interface // that pushes commits from a local Git repository to a remote Git repository. func newGitPusher() PromotionStepRunner { - r := &gitPushPusher{} + r := &gitPushPusher{ + branchMus: map[string]*sync.Mutex{}, + } r.schemaLoader = getConfigSchemaLoader(r.Name()) return r } @@ -113,24 +120,24 @@ func (g *gitPushPusher) runPromotionStep( // avoid conflicts. PullRebase: true, } - // If we're supposed to generate a target branch name, do so + // If we're supposed to generate a target branch name, do so. if cfg.GenerateTargetBranch { + // TargetBranch and GenerateTargetBranch are mutually exclusive, so we're + // never overwriting a user-specified target branch here. pushOpts.TargetBranch = fmt.Sprintf("kargo/promotion/%s", stepCtx.Promotion) pushOpts.Force = true } - targetBranch := pushOpts.TargetBranch - if targetBranch == "" { + if pushOpts.TargetBranch == "" { // If targetBranch is still empty, we want to set it to the current branch // because we will want to return the branch that was pushed to, but we // don't want to mess with the options any further. - if targetBranch, err = workTree.CurrentBranch(); err != nil { + if pushOpts.TargetBranch, err = workTree.CurrentBranch(); err != nil { return PromotionStepResult{Status: kargoapi.PromotionPhaseErrored}, fmt.Errorf("error getting current branch: %w", err) } } - backoff := retry.DefaultBackoff - if cfg.MaxAttempts != nil { + backoff := wait.Backoff{ // Note, the docs for this field say: // // The remaining number of iterations in which the duration @@ -140,17 +147,26 @@ func (g *gitPushPusher) runPromotionStep( // exceed the value of Steps and that Steps only dictates the maximum number // of adjustments to the interval between retries. // - // Reading the implementation of retry.DefaultBackoff reveals that Steps - // is indeed the maximum number of attempts. + // Reading the implementation of retry.DefaultBackoff reveals that Steps is + // indeed the maximum number of attempts. + Steps: 10, + Duration: time.Second, + Factor: 1.5, + Jitter: 0.5, + Cap: 30 * time.Second, + } + if cfg.MaxAttempts != nil { backoff.Steps = int(*cfg.MaxAttempts) - } else { - backoff.Steps = 50 } if err = retry.OnError( backoff, git.IsNonFastForward, func() error { - return workTree.Push(pushOpts) + // This will obtain a lock on the repo + branch before performing a + // pull/rebase + push. This means retries should only ever be necessary + // when there are multiple sharded controllers concurrently executing + // Promotions that push to the same branch. + return g.push(workTree, pushOpts) }, ); err != nil { if git.IsMergeConflict(err) { @@ -171,8 +187,31 @@ func (g *gitPushPusher) runPromotionStep( return PromotionStepResult{ Status: kargoapi.PromotionPhaseSucceeded, Output: map[string]any{ - stateKeyBranch: targetBranch, + stateKeyBranch: pushOpts.TargetBranch, stateKeyCommit: commitID, }, }, nil } + +// push obtains a repo + branch lock before pushing to the remote. This helps +// reduce the likelihood of conflicts when multiple Promotions that push to +// the same branch are running concurrently. +func (g *gitPushPusher) push(workTree git.WorkTree, pushOpts *git.PushOptions) error { + branchKey := g.getBranchKey(workTree.URL(), pushOpts.TargetBranch) + if _, exists := g.branchMus[branchKey]; !exists { + g.masterMu.Lock() + // Double-check to make sure it wasn't created while we were waiting for the + // lock. + if _, exists = g.branchMus[branchKey]; !exists { + g.branchMus[branchKey] = &sync.Mutex{} + } + g.masterMu.Unlock() + } + g.branchMus[branchKey].Lock() + defer g.branchMus[branchKey].Unlock() + return workTree.Push(pushOpts) +} + +func (g *gitPushPusher) getBranchKey(repoURL, branch string) string { + return fmt.Sprintf("%s:%s", repoURL, branch) +} diff --git a/internal/directives/git_pusher_test.go b/internal/directives/git_pusher_test.go index f128de166..868e7571f 100644 --- a/internal/directives/git_pusher_test.go +++ b/internal/directives/git_pusher_test.go @@ -203,6 +203,7 @@ func Test_gitPusher_runPromotionStep(t *testing.T) { r := newGitPusher() runner, ok := r.(*gitPushPusher) require.True(t, ok) + require.NotNil(t, runner.branchMus) res, err := runner.runPromotionStep( context.Background(),