From 401618e6e0278b26dfc5abac9fefcce79af24ab8 Mon Sep 17 00:00:00 2001 From: Nikita Vanyasin Date: Thu, 28 Jan 2021 11:43:44 +0300 Subject: [PATCH 1/2] spool: remove unused code (condition is always false) --- pkg/jobmon/spool.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/pkg/jobmon/spool.go b/pkg/jobmon/spool.go index 11e4f55..50d4782 100644 --- a/pkg/jobmon/spool.go +++ b/pkg/jobmon/spool.go @@ -219,13 +219,6 @@ func (s *SpoolManager) getLock(jobID string) error { } } - if err != nil { - if jobID != "" { - err = errors.Wrapf(err, "job %s failed, could not get lock", jobID) - } else { - err = errors.Wrap(err, "could not get lock") - } - } return err } From 4b47fb6d7427c4c673f0849789884a6382e343a5 Mon Sep 17 00:00:00 2001 From: Nikita Vanyasin Date: Thu, 28 Jan 2021 19:14:33 +0300 Subject: [PATCH 2/2] Optimize usage of lockfile for jobmon spool Logging improved. --- pkg/jobmon/spool.go | 55 ++++++++++++++++++++++----------------------- 1 file changed, 27 insertions(+), 28 deletions(-) diff --git a/pkg/jobmon/spool.go b/pkg/jobmon/spool.go index 50d4782..b967c4b 100644 --- a/pkg/jobmon/spool.go +++ b/pkg/jobmon/spool.go @@ -29,23 +29,21 @@ var ErrJobAlreadyRunning = errors.New("A job with same ID is already running") type SpoolManager struct { dirPath string - lock *lockfile.Lockfile logger *logrus.Logger } // NewSpoolManager creates a new object to manage jobmon spool // dirPath must be absolute path func NewSpoolManager(dirPath string, logger *logrus.Logger) *SpoolManager { - lockFile, _ := lockfile.New(fmt.Sprintf("%s/spool.lock", dirPath)) - return &SpoolManager{dirPath, &lockFile, logger} + return &SpoolManager{dirPath, logger} } func (s *SpoolManager) NewJob(r *JobRun, forcedRun bool) (string, error) { - err := s.getLock(r.ID) + l, err := s.getLock(r.ID) if err != nil { return "", err } - defer s.releaseLock() + defer s.releaseLock(l) duplicateRunEntries, err := s.findDuplicateRuns(r.ID) if err != nil { @@ -78,11 +76,11 @@ func (s *SpoolManager) NewJob(r *JobRun, forcedRun bool) (string, error) { } func (s *SpoolManager) FinishJob(uniqID string, r *JobRun) error { - err := s.getLock(r.ID) + l, err := s.getLock(r.ID) if err != nil { return err } - defer s.releaseLock() + defer s.releaseLock(l) filePath := s.getFilePath(uniqID) newFilePath := s.getFilePath(getUniqJobRunID(r.ID, true, r.StartedAt)) @@ -95,14 +93,8 @@ func (s *SpoolManager) FinishJob(uniqID string, r *JobRun) error { } func (s *SpoolManager) GetFinishedJobs() ([]string, []*JobRun, error) { - err := s.getLock("") - if err != nil { - return nil, nil, err - } - pattern := fmt.Sprintf("%s/%s_*_*.%s", s.dirPath, markerFinished, jsonExtension) fileNames, err := filepath.Glob(pattern) - s.releaseLock() if err != nil { return nil, nil, err } @@ -136,17 +128,10 @@ func (s *SpoolManager) readEntryFile(path string) (*JobRun, error) { } func (s *SpoolManager) RemoveJobs(ids []string) error { - err := s.getLock("") - if err != nil { - return err - } - defer s.releaseLock() - var filePaths []string for _, uniqID := range ids { filePaths = append(filePaths, s.getFilePath(uniqID)) } - return removeFiles(filePaths) } @@ -196,34 +181,46 @@ func (s *SpoolManager) findDuplicateRuns(jobID string) ([]string, error) { return matches, nil } -func (s *SpoolManager) getLock(jobID string) error { +func (s *SpoolManager) getLock(jobID string) (*lockfile.Lockfile, error) { err := s.ensureSpoolDirExists() if err != nil { - return err + return nil, err + } + + l, err := lockfile.New(fmt.Sprintf("%s/job_%s.lock", s.dirPath, encodeJobID(jobID))) + if err != nil { + return nil, err } retryLimit := 20 retry := 0 retryIn := 500 * time.Millisecond for { - err = s.lock.TryLock() + err = l.TryLock() if err != nil { retry++ if retry >= retryLimit { break } - s.logger.Errorf("job %s: could not get lock try %d/%d, retrying in %v", jobID, retry, retryLimit, retryIn) + ownerProc, getOwnerErr := l.GetOwner() + ownerInfo := "" + if getOwnerErr != nil { + ownerInfo = fmt.Sprintf("could not get lock owner info: %s", getOwnerErr) + } else if ownerProc != nil { + ownerInfo = fmt.Sprintf("process with id %d", ownerProc.Pid) + } + s.logger.Errorf("job %s: could not get lock. Lockfile owner: %s. Attempt %d of %d, retrying in %v", jobID, ownerInfo, retry, retryLimit, retryIn) time.Sleep(retryIn) } else { - return nil + return &l, nil } } - return err + return nil, err } -func (s *SpoolManager) releaseLock() { - err := s.lock.Unlock() +func (s *SpoolManager) releaseLock(l *lockfile.Lockfile) { + err := l.Unlock() if err != nil { s.logger.WithError(err).Error("could not release lock") } @@ -246,6 +243,8 @@ func getUniqJobRunID(jobID string, isJobFinished bool, jobStartedAt common.Times return strings.Join(parts, "_") } +// encodeJobID returns hex-encoded string for specified value. +// Max result length is len(id)*2. func encodeJobID(id string) string { return hex.EncodeToString([]byte(id)) }