Skip to content

Commit

Permalink
Merge pull request #280 from cloudradar-monitoring/improve-jobmon-spo…
Browse files Browse the repository at this point in the history
…ol-multitasking

Improve jobmon spool multitasking
  • Loading branch information
nikita-vanyasin authored Jan 29, 2021
2 parents 2c18320 + 4b47fb6 commit 8a1ebf0
Showing 1 changed file with 27 additions and 35 deletions.
62 changes: 27 additions & 35 deletions pkg/jobmon/spool.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,21 @@ var ErrJobAlreadyRunning = errors.New("A job with same ID is already running")

type SpoolManager struct {
dirPath string
lock *lockfile.Lockfile
logger *logrus.Logger
}

// NewSpoolManager creates a new object to manage jobmon spool
// dirPath must be absolute path
func NewSpoolManager(dirPath string, logger *logrus.Logger) *SpoolManager {
lockFile, _ := lockfile.New(fmt.Sprintf("%s/spool.lock", dirPath))
return &SpoolManager{dirPath, &lockFile, logger}
return &SpoolManager{dirPath, logger}
}

func (s *SpoolManager) NewJob(r *JobRun, forcedRun bool) (string, error) {
err := s.getLock(r.ID)
l, err := s.getLock(r.ID)
if err != nil {
return "", err
}
defer s.releaseLock()
defer s.releaseLock(l)

duplicateRunEntries, err := s.findDuplicateRuns(r.ID)
if err != nil {
Expand Down Expand Up @@ -78,11 +76,11 @@ func (s *SpoolManager) NewJob(r *JobRun, forcedRun bool) (string, error) {
}

func (s *SpoolManager) FinishJob(uniqID string, r *JobRun) error {
err := s.getLock(r.ID)
l, err := s.getLock(r.ID)
if err != nil {
return err
}
defer s.releaseLock()
defer s.releaseLock(l)

filePath := s.getFilePath(uniqID)
newFilePath := s.getFilePath(getUniqJobRunID(r.ID, true, r.StartedAt))
Expand All @@ -95,14 +93,8 @@ func (s *SpoolManager) FinishJob(uniqID string, r *JobRun) error {
}

func (s *SpoolManager) GetFinishedJobs() ([]string, []*JobRun, error) {
err := s.getLock("")
if err != nil {
return nil, nil, err
}

pattern := fmt.Sprintf("%s/%s_*_*.%s", s.dirPath, markerFinished, jsonExtension)
fileNames, err := filepath.Glob(pattern)
s.releaseLock()
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -136,17 +128,10 @@ func (s *SpoolManager) readEntryFile(path string) (*JobRun, error) {
}

// RemoveJobs removes the spool files that belong to the given unique job run IDs.
//
// Each ID is turned into a file path with getFilePath and the whole list is
// handed to removeFiles in one call; the error reported by removeFiles is
// returned unchanged. No lockfile is acquired here.
func (s *SpoolManager) RemoveJobs(ids []string) error {
	filePaths := make([]string, 0, len(ids))
	for _, uniqID := range ids {
		filePaths = append(filePaths, s.getFilePath(uniqID))
	}
	return removeFiles(filePaths)
}

Expand Down Expand Up @@ -196,41 +181,46 @@ func (s *SpoolManager) findDuplicateRuns(jobID string) ([]string, error) {
return matches, nil
}

func (s *SpoolManager) getLock(jobID string) error {
func (s *SpoolManager) getLock(jobID string) (*lockfile.Lockfile, error) {
err := s.ensureSpoolDirExists()
if err != nil {
return err
return nil, err
}

l, err := lockfile.New(fmt.Sprintf("%s/job_%s.lock", s.dirPath, encodeJobID(jobID)))
if err != nil {
return nil, err
}

retryLimit := 20
retry := 0
retryIn := 500 * time.Millisecond
for {
err = s.lock.TryLock()
err = l.TryLock()
if err != nil {
retry++
if retry >= retryLimit {
break
}
s.logger.Errorf("job %s: could not get lock try %d/%d, retrying in %v", jobID, retry, retryLimit, retryIn)
ownerProc, getOwnerErr := l.GetOwner()
ownerInfo := "<unknown>"
if getOwnerErr != nil {
ownerInfo = fmt.Sprintf("could not get lock owner info: %s", getOwnerErr)
} else if ownerProc != nil {
ownerInfo = fmt.Sprintf("process with id %d", ownerProc.Pid)
}
s.logger.Errorf("job %s: could not get lock. Lockfile owner: %s. Attempt %d of %d, retrying in %v", jobID, ownerInfo, retry, retryLimit, retryIn)
time.Sleep(retryIn)
} else {
return nil
return &l, nil
}
}

if err != nil {
if jobID != "" {
err = errors.Wrapf(err, "job %s failed, could not get lock", jobID)
} else {
err = errors.Wrap(err, "could not get lock")
}
}
return err
return nil, err
}

func (s *SpoolManager) releaseLock() {
err := s.lock.Unlock()
func (s *SpoolManager) releaseLock(l *lockfile.Lockfile) {
err := l.Unlock()
if err != nil {
s.logger.WithError(err).Error("could not release lock")
}
Expand All @@ -253,6 +243,8 @@ func getUniqJobRunID(jobID string, isJobFinished bool, jobStartedAt common.Times
return strings.Join(parts, "_")
}

// encodeJobID converts id into its lowercase hexadecimal form.
// Every input byte becomes two characters, so the result is len(id)*2 long.
func encodeJobID(id string) string {
	src := []byte(id)
	dst := make([]byte, hex.EncodedLen(len(src)))
	hex.Encode(dst, src)
	return string(dst)
}
Expand Down

0 comments on commit 8a1ebf0

Please sign in to comment.