Skip to content

Commit

Permalink
feat: Add StartTimestamp and EndTimestamp for Cron Scheduler (#4868)
Browse files Browse the repository at this point in the history
* feat: Add StartTimestamp and EndTimestamp for Cron Scheduler

closes #4863

Signed-off-by: Jack Chen <[email protected]>

* fix: Address Review Comments

Signed-off-by: Jack Chen <[email protected]>

---------

Signed-off-by: Jack Chen <[email protected]>
  • Loading branch information
jackchenjc authored Aug 20, 2024
1 parent 236833f commit f6f99ba
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 30 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/eclipse/paho.mqtt.golang v1.5.0
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.2.0-dev.50
github.com/edgexfoundry/go-mod-configuration/v3 v3.2.0-dev.12
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.37
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.38
github.com/edgexfoundry/go-mod-messaging/v3 v3.2.0-dev.31
github.com/edgexfoundry/go-mod-secrets/v3 v3.2.0-dev.9
github.com/fxamacker/cbor/v2 v2.7.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ github.com/edgexfoundry/go-mod-bootstrap/v3 v3.2.0-dev.50 h1:i6VyieS5P7olGlhG1Wt
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.2.0-dev.50/go.mod h1:oOuvWXdu6YaB2J17pe4X0ey66AZFyTzOmAZDQxPGGmM=
github.com/edgexfoundry/go-mod-configuration/v3 v3.2.0-dev.12 h1:JGZ9fsyCZOgbNkg+qdW9JN63NKIEX95v5zJhCVdlp10=
github.com/edgexfoundry/go-mod-configuration/v3 v3.2.0-dev.12/go.mod h1:v7CvWGVmTh8dKItDNtfdBnYTeLhfZP5YmFiLsGJL9KU=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.37 h1:+LgoPgGqubjTrxCQBBms054gGBvnWSWfto666JRBL5Q=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.37/go.mod h1:d/FCa9Djq/pb7RYGEEhrR7fnKo+JK5IQ2YGW4LIHAqE=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.38 h1:tLhD+x0wvvMr11KFof7E95pKMBuntxCYjwC6XEhCJxg=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.38/go.mod h1:d/FCa9Djq/pb7RYGEEhrR7fnKo+JK5IQ2YGW4LIHAqE=
github.com/edgexfoundry/go-mod-messaging/v3 v3.2.0-dev.31 h1:mC0ZguoK8HjVxeD7dIiXRqKswM0y7gnPQJt1fLOh/v4=
github.com/edgexfoundry/go-mod-messaging/v3 v3.2.0-dev.31/go.mod h1:gcHtufkjd6oa3ZLqfzp66bCyCPx8MZe8Pwzh+2ITFnw=
github.com/edgexfoundry/go-mod-registry/v3 v3.2.0-dev.13 h1:LkaF2eOpSz4eUiGpah4a9r+cB/A0Pea3Nh7aTU9hlKs=
Expand Down
81 changes: 54 additions & 27 deletions internal/support/cronscheduler/application/schedulejob.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package application
import (
"context"
"fmt"
"time"

bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v3/di"
Expand Down Expand Up @@ -38,15 +39,7 @@ func AddScheduleJob(ctx context.Context, job models.ScheduleJob, dic *di.Contain
return "", errors.NewCommonEdgeXWrapper(err)
}

if job.AdminState == models.Unlocked {
err = schedulerManager.StartScheduleJobByName(job.Name, correlationId)
if err != nil {
return "", errors.NewCommonEdgeXWrapper(err)
}
} else {
lc.Debugf("The scheduled job is created but not started because the admin state is locked. ScheduleJob ID: %s, Correlation-ID: %s", addedJob.Id, correlationId)
return addedJob.Id, nil
}
arrangeScheduleJob(ctx, job, dic)

lc.Debugf("Successfully created the scheduled job. ScheduleJob ID: %s, Correlation-ID: %s", addedJob.Id, correlationId)
return addedJob.Id, nil
Expand Down Expand Up @@ -130,15 +123,7 @@ func PatchScheduleJob(ctx context.Context, dto dtos.UpdateScheduleJob, dic *di.C
return errors.NewCommonEdgeXWrapper(err)
}

if job.AdminState == models.Unlocked {
err = schedulerManager.StartScheduleJobByName(job.Name, correlationId)
if err != nil {
return errors.NewCommonEdgeXWrapper(err)
}
} else {
lc.Debugf("The scheduled job is updated but not started because the admin state is locked. ScheduleJob ID: %s, Correlation-ID: %s", job.Id, correlationId)
return nil
}
arrangeScheduleJob(ctx, job, dic)

lc.Debugf("Successfully patched the scheduled job: %s. ScheduleJob ID: %s, Correlation-ID: %s", job.Name, job.Id, correlationId)
return nil
Expand Down Expand Up @@ -209,24 +194,66 @@ func LoadScheduleJobsToSchedulerManager(ctx context.Context, dic *di.Container)
return errors.NewCommonEdgeXWrapper(err)
}

if job.AdminState == models.Unlocked {
err := schedulerManager.StartScheduleJobByName(job.Name, correlationId)
if err != nil {
return errors.NewCommonEdgeXWrapper(err)
}
arrangeScheduleJob(ctx, job, dic)

if job.AdminState == models.Unlocked {
// Get the latest schedule action records by job name and generate missed schedule action records
latestRecords := getLatestRecordsByJobName(job.Name, allLatestRecords)
err = GenerateMissedScheduleActionRecords(ctx, dic, job, latestRecords)
if err != nil {
return errors.NewCommonEdgeXWrapper(err)
}
} else {
lc.Debugf("The scheduled job is loaded but not started because the admin state is locked. ScheduleJob ID: %s, Correlation-ID: %s", job.Id, correlationId)
return nil
}
lc.Debugf("Successfully loaded and started the existing scheduled job: %s. Correlation-ID: %s", job.Name, correlationId)

lc.Debugf("Successfully loaded the existing scheduled job: %s. Correlation-ID: %s", job.Name, correlationId)
}

return nil
}

// handleScheduleJob handles the schedule job based on the startTimestamp and endTimestamp
func arrangeScheduleJob(ctx context.Context, job models.ScheduleJob, dic *di.Container) {
schedulerManager := container.SchedulerManagerFrom(dic.Get)
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
correlationId := correlation.FromContext(ctx)

if job.AdminState != models.Unlocked {
lc.Debugf("The scheduled job is ready but not started because the admin state is locked. ScheduleJob ID: %s, Correlation-ID: %s", job.Id, correlationId)
return
}

startTimestamp := job.Definition.GetBaseScheduleDef().StartTimestamp
endTimestamp := job.Definition.GetBaseScheduleDef().EndTimestamp

durationUntilStart := time.Until(time.UnixMilli(startTimestamp))
if durationUntilStart < 0 {
lc.Debugf("The startTimestamp is expired for the scheduled job: %s, which will be started immediately. Correlation-ID: %s", job.Name, correlationId)
durationUntilStart = 0
} else if durationUntilStart > 0 {
lc.Debugf("The scheduled job: %s will be started at %v (timestamp: %v). Correlation-ID: %s", job.Name, time.UnixMilli(startTimestamp), startTimestamp, correlationId)
}

time.AfterFunc(durationUntilStart, func() {
err := schedulerManager.StartScheduleJobByName(job.Name, correlationId)
if err != nil {
lc.Errorf("Failed to start the scheduled job: %s based on startTimestamp. Error: %v, Correlation-ID: %s", job.Name, err, correlationId)
}
})

if endTimestamp != 0 {
durationUntilEnd := time.Until(time.UnixMilli(endTimestamp))
if durationUntilEnd < 0 {
lc.Debugf("The endTimestamp is expired for the scheduled job: %s, which will be stopped immediately. Correlation-ID: %s", job.Name, correlationId)
durationUntilEnd = 0
} else if durationUntilEnd > 0 {
lc.Debugf("The scheduled job: %s will be stopped at %v (timestamp: %v). Correlation-ID: %s", job.Name, time.UnixMilli(endTimestamp), endTimestamp, correlationId)
}

time.AfterFunc(durationUntilEnd, func() {
err := schedulerManager.StopScheduleJobByName(job.Name, correlationId)
if err != nil {
lc.Errorf("Failed to stop the scheduled job: %s based on endTimestamp. Error: %v, Correlation-ID: %s", job.Name, err, correlationId)
}
})
}
}

0 comments on commit f6f99ba

Please sign in to comment.