Skip to content

Commit

Permalink
feat: Check AdminState in Cron Scheduler Service
Browse files Browse the repository at this point in the history
closes: #4856

- The scheduled job should be stopped if adminState is set to LOCKED
- Modify the timestamp in postgres DB

Signed-off-by: Jack Chen <[email protected]>
  • Loading branch information
jackchenjc committed Aug 9, 2024
1 parent 2b98b00 commit 2583494
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 26 deletions.
8 changes: 4 additions & 4 deletions cmd/support-cron-scheduler/res/db/sql/01-tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ CREATE TABLE IF NOT EXISTS scheduler.schedule_job (
id UUID PRIMARY KEY,
name TEXT NOT NULL,
content JSONB NOT NULL,
created timestamptz NOT NULL DEFAULT now(),
modified timestamptz NOT NULL DEFAULT now()
created timestamp NOT NULL DEFAULT now(),
modified timestamp NOT NULL DEFAULT now()
);

-- scheduler.schedule_action_record is used to store the schedule action record
Expand All @@ -19,6 +19,6 @@ CREATE TABLE IF NOT EXISTS scheduler.schedule_action_record (
job_name TEXT NOT NULL,
action JSONB NOT NULL,
status TEXT NOT NULL,
scheduled_at timestamptz NOT NULL,
created timestamptz NOT NULL
scheduled_at timestamp NOT NULL,
created timestamp NOT NULL
);
15 changes: 8 additions & 7 deletions internal/pkg/infrastructure/postgres/schedulejob.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/edgexfoundry/go-mod-core-contracts/v3/errors"
model "github.com/edgexfoundry/go-mod-core-contracts/v3/models"

pkgCommon "github.com/edgexfoundry/edgex-go/internal/pkg/common"
pgClient "github.com/edgexfoundry/edgex-go/internal/pkg/db/postgres"
)

Expand Down Expand Up @@ -114,15 +113,16 @@ func addScheduleJob(ctx context.Context, connPool *pgxpool.Pool, scheduleJob mod
}

func updateScheduleJob(ctx context.Context, connPool *pgxpool.Pool, updatedScheduleJob model.ScheduleJob) errors.EdgeX {
updatedScheduleJob.Modified = pkgCommon.MakeTimestamp()
modified := time.Now()
updatedScheduleJob.Modified = modified.UnixMilli()

// Marshal the scheduleJob to store it in the database
updatedScheduleJobJSONBytes, err := json.Marshal(updatedScheduleJob)
if err != nil {
return errors.NewCommonEdgeX(errors.KindContractInvalid, "unable to JSON marshal schedule job for Postgres persistence", err)
}

_, err = connPool.Exec(ctx, sqlUpdateContentByName(scheduleJobTable), updatedScheduleJobJSONBytes, updatedScheduleJob.Modified, updatedScheduleJob.Name)
_, err = connPool.Exec(ctx, sqlUpdateContentByName(scheduleJobTable), updatedScheduleJobJSONBytes, modified, updatedScheduleJob.Name)
if err != nil {
return pgClient.WrapDBError("failed to update schedule job", err)
}
Expand Down Expand Up @@ -156,8 +156,8 @@ func queryScheduleJob(ctx context.Context, connPool *pgxpool.Pool, sql string, a
return job, pgClient.WrapDBError("failed to query scheduler.schedule_job table", err)
}

job.Created = created.Unix()
job.Modified = modified.Unix()
job.Created = created.UnixMilli()
job.Modified = modified.UnixMilli()

job, err = toScheduleJobsModel(job, scheduleJobJSONBytes)
if err != nil {
Expand All @@ -184,8 +184,8 @@ func queryScheduleJobs(ctx context.Context, connPool *pgxpool.Pool, sql string,
return nil, pgClient.WrapDBError("failed to scan schedule job", err)
}

job.Created = created.Unix()
job.Modified = modified.Unix()
job.Created = created.UnixMilli()
job.Modified = modified.UnixMilli()

job, err = toScheduleJobsModel(job, scheduleJobJSONBytes)
if err != nil {
Expand Down Expand Up @@ -231,6 +231,7 @@ func toScheduleJobsModel(scheduleJobs model.ScheduleJob, scheduleJobJSONBytes []
}

scheduleJobs.Actions = storedJob.Actions
scheduleJobs.AdminState = storedJob.AdminState
scheduleJobs.Definition = storedJob.Definition
scheduleJobs.Labels = storedJob.Labels
scheduleJobs.Properties = storedJob.Properties
Expand Down
23 changes: 19 additions & 4 deletions internal/support/cronscheduler/application/schedulejob.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,14 @@ func AddScheduleJob(ctx context.Context, job models.ScheduleJob, dic *di.Contain
return "", errors.NewCommonEdgeXWrapper(err)
}

err = schedulerManager.StartScheduleJobByName(job.Name, correlationId)
if err != nil {
return "", errors.NewCommonEdgeXWrapper(err)
if job.AdminState == models.Unlocked {
err = schedulerManager.StartScheduleJobByName(job.Name, correlationId)
if err != nil {
return "", errors.NewCommonEdgeXWrapper(err)
}
} else {
lc.Debugf("The scheduled job is created but not started because the admin state is locked. ScheduleJob ID: %s, Correlation-ID: %s", addedJob.Id, correlationId)
return addedJob.Id, nil
}

lc.Debugf("Successfully created the scheduled job. ScheduleJob ID: %s, Correlation-ID: %s", addedJob.Id, correlationId)
Expand Down Expand Up @@ -125,6 +130,16 @@ func PatchScheduleJob(ctx context.Context, dto dtos.UpdateScheduleJob, dic *di.C
return errors.NewCommonEdgeXWrapper(err)
}

if job.AdminState == models.Unlocked {
err = schedulerManager.StartScheduleJobByName(job.Name, correlationId)
if err != nil {
return errors.NewCommonEdgeXWrapper(err)
}
} else {
lc.Debugf("The scheduled job is updated but not started because the admin state is locked. ScheduleJob ID: %s, Correlation-ID: %s", job.Id, correlationId)
return nil
}

lc.Debugf("Successfully patched the scheduled job: %s. ScheduleJob ID: %s, Correlation-ID: %s", job.Name, job.Id, correlationId)
return nil
}
Expand All @@ -143,7 +158,7 @@ func scheduleJobByDTO(ctx context.Context, dbClient interfaces.DBClient, dto dto
}
}
if dto.Name != nil && *dto.Name != job.Name {
return job, errors.NewCommonEdgeX(errors.KindContractInvalid, fmt.Sprintf("schedule job name '%s' not match the exsting '%s' ", *dto.Name, job.Name), nil)
return job, errors.NewCommonEdgeX(errors.KindContractInvalid, fmt.Sprintf("scheduled job name '%s' not match the exsting '%s' ", *dto.Name, job.Name), nil)
}
return job, nil
}
Expand Down
22 changes: 11 additions & 11 deletions internal/support/cronscheduler/infrastructure/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ func NewManager(lc logger.LoggingClient, dic *di.Container, config *config.Confi
func (m *manager) AddScheduleJob(job models.ScheduleJob, correlationId string) errors.EdgeX {
if _, err := m.getSchedulerByJobName(job.Name); err == nil {
return errors.NewCommonEdgeX(errors.KindStatusConflict,
fmt.Sprintf("the schedule job with name: %s already exists", job.Name), nil)
fmt.Sprintf("the scheduled job with name: %s already exists", job.Name), nil)
}

if err := m.addNewJob(job); err != nil {
return errors.NewCommonEdgeXWrapper(err)
}

m.lc.Infof("New schedule job %s was added into the scheduler manager. ScheduleJob ID: %s, Correlation-ID: %s", job.Name, job.Id, correlationId)
m.lc.Infof("New scheduled job %s was added into the scheduler manager. ScheduleJob ID: %s, Correlation-ID: %s", job.Name, job.Id, correlationId)
return nil
}

Expand All @@ -69,13 +69,13 @@ func (m *manager) UpdateScheduleJob(job models.ScheduleJob, correlationId string
if err := m.DeleteScheduleJobByName(job.Name, correlationId); err != nil {
return errors.NewCommonEdgeXWrapper(err)
}
m.lc.Debugf("The old schedule job %s was removed from the scheduler manager while updating it. ScheduleJob ID: %s, Correlation-ID: %s", job.Name, job.Id, correlationId)
m.lc.Debugf("The old scheduled job %s was removed from the scheduler manager while updating it. ScheduleJob ID: %s, Correlation-ID: %s", job.Name, job.Id, correlationId)

if err := m.addNewJob(job); err != nil {
return errors.NewCommonEdgeXWrapper(err)
}

m.lc.Debugf("Schedule job %s was updated into the scheduler manager. ScheduleJob ID: %s, Correlation-ID: %s", job.Name, job.Id, correlationId)
m.lc.Debugf("Scheduled job %s was updated into the scheduler manager. ScheduleJob ID: %s, Correlation-ID: %s", job.Name, job.Id, correlationId)
return nil
}

Expand All @@ -91,7 +91,7 @@ func (m *manager) DeleteScheduleJobByName(name, correlationId string) errors.Edg
fmt.Sprintf("failed to shutdown and delete the scheduler for job: %s", name), err)
}

m.lc.Debugf("The schedule job %s was stopped and removed from the scheduler manager. Correlation-ID: %s", name, correlationId)
m.lc.Debugf("The scheduled job %s was stopped and removed from the scheduler manager. Correlation-ID: %s", name, correlationId)
return nil
}

Expand All @@ -103,7 +103,7 @@ func (m *manager) StartScheduleJobByName(name, correlationId string) errors.Edge
}

scheduler.Start()
m.lc.Debugf("The schedule job %s was started. Correlation-ID: %s", name, correlationId)
m.lc.Debugf("The scheduled job %s was started. Correlation-ID: %s", name, correlationId)
return nil
}

Expand All @@ -118,7 +118,7 @@ func (m *manager) StopScheduleJobByName(name, correlationId string) errors.EdgeX
return errors.NewCommonEdgeX(errors.KindServerError, fmt.Sprintf("failed to stop all the actions for job: %s", name), err)
}

m.lc.Debugf("The schedule job %s was stopped in the scheduler manager. Correlation-ID: %s", name, correlationId)
m.lc.Debugf("The scheduled job %s was stopped in the scheduler manager. Correlation-ID: %s", name, correlationId)
return nil
}

Expand All @@ -135,7 +135,7 @@ func (m *manager) TriggerScheduleJobByName(name, correlationId string) errors.Ed
}
}

m.lc.Debugf("The schedule job %s has been triggerred manually. Correlation-ID: %s", name, correlationId)
m.lc.Debugf("The scheduled job %s has been triggerred manually. Correlation-ID: %s", name, correlationId)
return nil
}

Expand All @@ -147,7 +147,7 @@ func (m *manager) Shutdown(correlationId string) errors.EdgeX {
}
}

m.lc.Debugf("All schedule jobs were stopped and removed from the scheduler manager. Correlation-ID: %s", correlationId)
m.lc.Debugf("All scheduled jobs were stopped and removed from the scheduler manager. Correlation-ID: %s", correlationId)
return nil
}

Expand All @@ -157,7 +157,7 @@ func (m *manager) getSchedulerByJobName(name string) (gocron.Scheduler, errors.E
scheduler, exists := m.schedulers[name]
if !exists {
return nil, errors.NewCommonEdgeX(errors.KindStatusConflict,
fmt.Sprintf("the schedule job: %s does not exist", name), nil)
fmt.Sprintf("the scheduled job: %s does not exist", name), nil)
}
return scheduler, nil
}
Expand All @@ -184,7 +184,7 @@ func (m *manager) addNewJob(job models.ScheduleJob) errors.EdgeX {
_, err := scheduler.NewJob(definition, task)
if err != nil {
return errors.NewCommonEdgeX(errors.KindServerError,
fmt.Sprintf("failed to create new schedule aciton for job: %s", job.Name), err)
fmt.Sprintf("failed to create new scheduled aciton for job: %s", job.Name), err)
}
}

Expand Down

0 comments on commit 2583494

Please sign in to comment.