From 7969c6724b0747a5e07bf2f7cee782a000676a3d Mon Sep 17 00:00:00 2001 From: Finn Arne Gangstad Date: Tue, 15 Aug 2023 15:11:38 +0200 Subject: [PATCH] Reduce the number of git calls in parallel mode when merging All Clone() calls that have signaled an interest in merging before another Clone() checks whether a merge is necessary can skip their own checks. This should reduce the thundering herd problem at the beginning of large paralell runs. --- server/events/working_dir.go | 33 ++++++++++++++++++++++++++++----- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/server/events/working_dir.go b/server/events/working_dir.go index 33bb7a8826..c1aa64a072 100644 --- a/server/events/working_dir.go +++ b/server/events/working_dir.go @@ -31,6 +31,7 @@ import ( const workingDirPrefix = "repos" var cloneLocks sync.Map +var recheckRequiredMap sync.Map //go:generate pegomock generate github.com/runatlantis/atlantis/server/events --package mocks -o mocks/mock_working_dir.go WorkingDir //go:generate pegomock generate github.com/runatlantis/atlantis/server/events --package events WorkingDir @@ -97,13 +98,29 @@ type FileWorkspace struct { func (w *FileWorkspace) Clone(logger logging.SimpleLogging, headRepo models.Repo, p models.PullRequest, workspace string) (string, bool, error) { cloneDir := w.cloneDir(p.BaseRepo, p, workspace) + // Disable the mustRecheckUpstream flag if we are not using the merge checkout strategy + mustRecheckUpstream := w.CheckForUpstreamChanges && w.CheckoutMerge + + if mustRecheckUpstream { + // We atomically set the recheckRequiredMap flag here before grabbing the clone lock. + // If the flag is cleared after we grab the lock, it means some other thread + // did the necessary work late enough that we do not have to do it again. + recheckRequiredMap.Store(cloneDir, struct{}{}) + } + // Unconditionally wait for the clone lock here, if anyone else is doing any clone // operation in this directory, we wait for it to finish before we check anything. value, _ := cloneLocks.LoadOrStore(cloneDir, new(sync.Mutex)) mutex := value.(*sync.Mutex) mutex.Lock() defer mutex.Unlock() - defer func() { w.CheckForUpstreamChanges = false }() + + if mustRecheckUpstream { + if _, exists := recheckRequiredMap.Load(cloneDir); !exists { + mustRecheckUpstream = false + w.Logger.Debug("Skipping upstream check. Some other thread has done this for us") + } + } c := wrappedGitContext{cloneDir, headRepo, p} // If the directory already exists, check if it's at the right commit. @@ -132,11 +149,17 @@ func (w *FileWorkspace) Clone(logger logging.SimpleLogging, headRepo models.Repo // We're prefix matching here because BitBucket doesn't give us the full // commit, only a 12 character prefix. if strings.HasPrefix(currCommit, p.HeadCommit) { - if w.CheckForUpstreamChanges && w.CheckoutMerge && w.recheckDiverged(logger, p, headRepo, cloneDir) { - logger.Info("base branch has been updated, using merge strategy and will clone again") - return cloneDir, true, w.mergeAgain(logger, c) + if mustRecheckUpstream { + w.CheckForUpstreamChanges = false + recheckRequiredMap.Delete(cloneDir) + if w.recheckDiverged(p, headRepo, cloneDir) { + w.Logger.Info("base branch has been updated, using merge strategy and will merge again") + return cloneDir, true, w.mergeAgain(logger, c) + } + w.Logger.Debug("repo is at correct commit %q and there are no upstream changes", p.HeadCommit) + } else { + w.Logger.Debug("repo is at correct commit %q so will not re-clone", p.HeadCommit) } - logger.Debug("repo is at correct commit '%s' so will not re-clone", p.HeadCommit) return cloneDir, false, nil } else { logger.Debug("repo was already cloned but is not at correct commit, wanted '%s' got '%s'", p.HeadCommit, currCommit)