Skip to content

Commit

Permalink
Reduce the number of git calls in parallel mode when merging
Browse files Browse the repository at this point in the history
All Clone() calls that have signaled an interest in merging
before another Clone() checks whether a merge is necessary
can skip their own checks.

This should reduce the thundering herd problem at the
beginning of large paralell runs.
  • Loading branch information
finnag committed Apr 25, 2024
1 parent e46932a commit 7969c67
Showing 1 changed file with 28 additions and 5 deletions.
33 changes: 28 additions & 5 deletions server/events/working_dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
const workingDirPrefix = "repos"

var cloneLocks sync.Map
var recheckRequiredMap sync.Map

//go:generate pegomock generate github.com/runatlantis/atlantis/server/events --package mocks -o mocks/mock_working_dir.go WorkingDir
//go:generate pegomock generate github.com/runatlantis/atlantis/server/events --package events WorkingDir
Expand Down Expand Up @@ -97,13 +98,29 @@ type FileWorkspace struct {
func (w *FileWorkspace) Clone(logger logging.SimpleLogging, headRepo models.Repo, p models.PullRequest, workspace string) (string, bool, error) {
cloneDir := w.cloneDir(p.BaseRepo, p, workspace)

// Disable the mustRecheckUpstream flag if we are not using the merge checkout strategy
mustRecheckUpstream := w.CheckForUpstreamChanges && w.CheckoutMerge

if mustRecheckUpstream {
// We atomically set the recheckRequiredMap flag here before grabbing the clone lock.
// If the flag is cleared after we grab the lock, it means some other thread
// did the necessary work late enough that we do not have to do it again.
recheckRequiredMap.Store(cloneDir, struct{}{})
}

// Unconditionally wait for the clone lock here, if anyone else is doing any clone
// operation in this directory, we wait for it to finish before we check anything.
value, _ := cloneLocks.LoadOrStore(cloneDir, new(sync.Mutex))
mutex := value.(*sync.Mutex)
mutex.Lock()
defer mutex.Unlock()
defer func() { w.CheckForUpstreamChanges = false }()

if mustRecheckUpstream {
if _, exists := recheckRequiredMap.Load(cloneDir); !exists {
mustRecheckUpstream = false
w.Logger.Debug("Skipping upstream check. Some other thread has done this for us")
}
}

c := wrappedGitContext{cloneDir, headRepo, p}
// If the directory already exists, check if it's at the right commit.
Expand Down Expand Up @@ -132,11 +149,17 @@ func (w *FileWorkspace) Clone(logger logging.SimpleLogging, headRepo models.Repo
// We're prefix matching here because BitBucket doesn't give us the full
// commit, only a 12 character prefix.
if strings.HasPrefix(currCommit, p.HeadCommit) {
if w.CheckForUpstreamChanges && w.CheckoutMerge && w.recheckDiverged(logger, p, headRepo, cloneDir) {
logger.Info("base branch has been updated, using merge strategy and will clone again")
return cloneDir, true, w.mergeAgain(logger, c)
if mustRecheckUpstream {
w.CheckForUpstreamChanges = false
recheckRequiredMap.Delete(cloneDir)
if w.recheckDiverged(p, headRepo, cloneDir) {
w.Logger.Info("base branch has been updated, using merge strategy and will merge again")
return cloneDir, true, w.mergeAgain(logger, c)
}
w.Logger.Debug("repo is at correct commit %q and there are no upstream changes", p.HeadCommit)
} else {
w.Logger.Debug("repo is at correct commit %q so will not re-clone", p.HeadCommit)
}
logger.Debug("repo is at correct commit '%s' so will not re-clone", p.HeadCommit)
return cloneDir, false, nil
} else {
logger.Debug("repo was already cloned but is not at correct commit, wanted '%s' got '%s'", p.HeadCommit, currCommit)
Expand Down

0 comments on commit 7969c67

Please sign in to comment.