diff --git a/go.mod b/go.mod index 20c855bc94..a3191bab0b 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/eclipse/paho.mqtt.golang v1.5.0 github.com/edgexfoundry/go-mod-bootstrap/v3 v3.2.0-dev.50 github.com/edgexfoundry/go-mod-configuration/v3 v3.2.0-dev.12 - github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.37 + github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.38 github.com/edgexfoundry/go-mod-messaging/v3 v3.2.0-dev.31 github.com/edgexfoundry/go-mod-secrets/v3 v3.2.0-dev.9 github.com/fxamacker/cbor/v2 v2.7.0 diff --git a/go.sum b/go.sum index 2419f2315f..926cc45296 100644 --- a/go.sum +++ b/go.sum @@ -88,8 +88,8 @@ github.com/edgexfoundry/go-mod-bootstrap/v3 v3.2.0-dev.50 h1:i6VyieS5P7olGlhG1Wt github.com/edgexfoundry/go-mod-bootstrap/v3 v3.2.0-dev.50/go.mod h1:oOuvWXdu6YaB2J17pe4X0ey66AZFyTzOmAZDQxPGGmM= github.com/edgexfoundry/go-mod-configuration/v3 v3.2.0-dev.12 h1:JGZ9fsyCZOgbNkg+qdW9JN63NKIEX95v5zJhCVdlp10= github.com/edgexfoundry/go-mod-configuration/v3 v3.2.0-dev.12/go.mod h1:v7CvWGVmTh8dKItDNtfdBnYTeLhfZP5YmFiLsGJL9KU= -github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.37 h1:+LgoPgGqubjTrxCQBBms054gGBvnWSWfto666JRBL5Q= -github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.37/go.mod h1:d/FCa9Djq/pb7RYGEEhrR7fnKo+JK5IQ2YGW4LIHAqE= +github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.38 h1:tLhD+x0wvvMr11KFof7E95pKMBuntxCYjwC6XEhCJxg= +github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.38/go.mod h1:d/FCa9Djq/pb7RYGEEhrR7fnKo+JK5IQ2YGW4LIHAqE= github.com/edgexfoundry/go-mod-messaging/v3 v3.2.0-dev.31 h1:mC0ZguoK8HjVxeD7dIiXRqKswM0y7gnPQJt1fLOh/v4= github.com/edgexfoundry/go-mod-messaging/v3 v3.2.0-dev.31/go.mod h1:gcHtufkjd6oa3ZLqfzp66bCyCPx8MZe8Pwzh+2ITFnw= github.com/edgexfoundry/go-mod-registry/v3 v3.2.0-dev.13 h1:LkaF2eOpSz4eUiGpah4a9r+cB/A0Pea3Nh7aTU9hlKs= diff --git a/internal/support/cronscheduler/application/schedulejob.go b/internal/support/cronscheduler/application/schedulejob.go index 9a182cba10..511c05b849 100644 --- a/internal/support/cronscheduler/application/schedulejob.go +++ b/internal/support/cronscheduler/application/schedulejob.go @@ -8,6 +8,7 @@ package application import ( "context" "fmt" + "time" bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/container" "github.com/edgexfoundry/go-mod-bootstrap/v3/di" @@ -38,15 +39,7 @@ func AddScheduleJob(ctx context.Context, job models.ScheduleJob, dic *di.Contain return "", errors.NewCommonEdgeXWrapper(err) } - if job.AdminState == models.Unlocked { - err = schedulerManager.StartScheduleJobByName(job.Name, correlationId) - if err != nil { - return "", errors.NewCommonEdgeXWrapper(err) - } - } else { - lc.Debugf("The scheduled job is created but not started because the admin state is locked. ScheduleJob ID: %s, Correlation-ID: %s", addedJob.Id, correlationId) - return addedJob.Id, nil - } + arrangeScheduleJob(ctx, job, dic) lc.Debugf("Successfully created the scheduled job. ScheduleJob ID: %s, Correlation-ID: %s", addedJob.Id, correlationId) return addedJob.Id, nil @@ -130,15 +123,7 @@ func PatchScheduleJob(ctx context.Context, dto dtos.UpdateScheduleJob, dic *di.C return errors.NewCommonEdgeXWrapper(err) } - if job.AdminState == models.Unlocked { - err = schedulerManager.StartScheduleJobByName(job.Name, correlationId) - if err != nil { - return errors.NewCommonEdgeXWrapper(err) - } - } else { - lc.Debugf("The scheduled job is updated but not started because the admin state is locked. ScheduleJob ID: %s, Correlation-ID: %s", job.Id, correlationId) - return nil - } + arrangeScheduleJob(ctx, job, dic) lc.Debugf("Successfully patched the scheduled job: %s. ScheduleJob ID: %s, Correlation-ID: %s", job.Name, job.Id, correlationId) return nil @@ -209,24 +194,66 @@ func LoadScheduleJobsToSchedulerManager(ctx context.Context, dic *di.Container) return errors.NewCommonEdgeXWrapper(err) } - if job.AdminState == models.Unlocked { - err := schedulerManager.StartScheduleJobByName(job.Name, correlationId) - if err != nil { - return errors.NewCommonEdgeXWrapper(err) - } + arrangeScheduleJob(ctx, job, dic) + if job.AdminState == models.Unlocked { // Get the latest schedule action records by job name and generate missed schedule action records latestRecords := getLatestRecordsByJobName(job.Name, allLatestRecords) err = GenerateMissedScheduleActionRecords(ctx, dic, job, latestRecords) if err != nil { return errors.NewCommonEdgeXWrapper(err) } - } else { - lc.Debugf("The scheduled job is loaded but not started because the admin state is locked. ScheduleJob ID: %s, Correlation-ID: %s", job.Id, correlationId) - return nil } - lc.Debugf("Successfully loaded and started the existing scheduled job: %s. Correlation-ID: %s", job.Name, correlationId) + + lc.Debugf("Successfully loaded the existing scheduled job: %s. Correlation-ID: %s", job.Name, correlationId) } return nil } + +// handleScheduleJob handles the schedule job based on the startTimestamp and endTimestamp +func arrangeScheduleJob(ctx context.Context, job models.ScheduleJob, dic *di.Container) { + schedulerManager := container.SchedulerManagerFrom(dic.Get) + lc := bootstrapContainer.LoggingClientFrom(dic.Get) + correlationId := correlation.FromContext(ctx) + + if job.AdminState != models.Unlocked { + lc.Debugf("The scheduled job is ready but not started because the admin state is locked. ScheduleJob ID: %s, Correlation-ID: %s", job.Id, correlationId) + return + } + + startTimestamp := job.Definition.GetBaseScheduleDef().StartTimestamp + endTimestamp := job.Definition.GetBaseScheduleDef().EndTimestamp + + durationUntilStart := time.Until(time.UnixMilli(startTimestamp)) + if durationUntilStart < 0 { + lc.Debugf("The startTimestamp is expired for the scheduled job: %s, which will be started immediately. Correlation-ID: %s", job.Name, correlationId) + durationUntilStart = 0 + } else if durationUntilStart > 0 { + lc.Debugf("The scheduled job: %s will be started at %v (timestamp: %v). Correlation-ID: %s", job.Name, time.UnixMilli(startTimestamp), startTimestamp, correlationId) + } + + time.AfterFunc(durationUntilStart, func() { + err := schedulerManager.StartScheduleJobByName(job.Name, correlationId) + if err != nil { + lc.Errorf("Failed to start the scheduled job: %s based on startTimestamp. Error: %v, Correlation-ID: %s", job.Name, err, correlationId) + } + }) + + if endTimestamp != 0 { + durationUntilEnd := time.Until(time.UnixMilli(endTimestamp)) + if durationUntilEnd < 0 { + lc.Debugf("The endTimestamp is expired for the scheduled job: %s, which will be stopped immediately. Correlation-ID: %s", job.Name, correlationId) + durationUntilEnd = 0 + } else if durationUntilEnd > 0 { + lc.Debugf("The scheduled job: %s will be stopped at %v (timestamp: %v). Correlation-ID: %s", job.Name, time.UnixMilli(endTimestamp), endTimestamp, correlationId) + } + + time.AfterFunc(durationUntilEnd, func() { + err := schedulerManager.StopScheduleJobByName(job.Name, correlationId) + if err != nil { + lc.Errorf("Failed to stop the scheduled job: %s based on endTimestamp. Error: %v, Correlation-ID: %s", job.Name, err, correlationId) + } + }) + } +}