diff --git a/migrations/scheduler/2_last_executed_at.down.sql b/migrations/scheduler/2_last_executed_at.down.sql new file mode 100644 index 00000000..249e69c4 --- /dev/null +++ b/migrations/scheduler/2_last_executed_at.down.sql @@ -0,0 +1,5 @@ +ALTER TABLE jobs +DROP last_executed_at; + +ALTER TABLE jobs +ADD execute_at BIGINT NOT NULL; \ No newline at end of file diff --git a/migrations/scheduler/2_last_executed_at.up.sql b/migrations/scheduler/2_last_executed_at.up.sql new file mode 100644 index 00000000..4c0d0566 --- /dev/null +++ b/migrations/scheduler/2_last_executed_at.up.sql @@ -0,0 +1,5 @@ +ALTER TABLE jobs +DROP execute_at; + +ALTER TABLE jobs +ADD last_executed_at BIGINT NOT NULL DEFAULT 0; diff --git a/pkg/database/sqlite/storage.go b/pkg/database/sqlite/storage.go index 305bad0a..2f783488 100644 --- a/pkg/database/sqlite/storage.go +++ b/pkg/database/sqlite/storage.go @@ -35,7 +35,7 @@ func New(logger *zap.SugaredLogger, cfg DBConfig, dbName string) (*DB, error) { DB: db, } - if err := s.migrate(dbName, cfg.MigrationsPath); err != nil { + if err := s.migrate(dbName, cfg.MigrationsPath, false); err != nil { return nil, err } @@ -55,7 +55,21 @@ func (s *DB) Close() error { return nil } -func (s *DB) migrate(dbName, migrationsPath string) error { +var _ migrate.Logger = (*migrationLogger)(nil) + +type migrationLogger struct { + *zap.SugaredLogger +} + +func (l *migrationLogger) Printf(format string, v ...interface{}) { + l.Infof(format, v...) +} + +func (l *migrationLogger) Verbose() bool { + return false +} + +func (s *DB) migrate(dbName, migrationsPath string, logMigrations bool) error { s.Logger.Infof("Performing db migrations...") driver, err := sqlite3.WithInstance(s.DB.DB, &sqlite3.Config{}) @@ -70,6 +84,10 @@ func (s *DB) migrate(dbName, migrationsPath string) error { return err } + if logMigrations { + migration.Log = &migrationLogger{s.Logger} + } + version, dirty, err := migration.Version() if err != nil && !errors.Is(err, migrate.ErrNilVersion) { s.Logger.Error(err) diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 993ab44c..704e1ed5 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -49,8 +49,9 @@ func TestScheduler_Start(t *testing.T) { // job that was never executed // and next execution should be scheduled right when scheduler starts freshJob := Job{ - Name: "fresh_job", - Period: time.Duration(1005), + Name: "fresh_job", + LastExecutedAt: time.Unix(0, 0), + Period: time.Duration(1005), } // job that was executed before scheduler starts diff --git a/pkg/scheduler/storage/sqlite/jobs.go b/pkg/scheduler/storage/sqlite/jobs.go index ef3279df..39d5042c 100644 --- a/pkg/scheduler/storage/sqlite/jobs.go +++ b/pkg/scheduler/storage/sqlite/jobs.go @@ -103,15 +103,17 @@ func (s *storage) prepareJobsStmts(ctx context.Context) error { upsertJob, err := s.base.DB.PrepareNamedContext(ctx, fmt.Sprintf(` INSERT INTO %s ( name, - execute_at, + last_executed_at, period ) VALUES ( :name, - :execute_at, + :last_executed_at, :period ) ON CONFLICT(name) - DO UPDATE SET period = excluded.period + DO UPDATE SET + period = excluded.period, + last_executed_at = excluded.last_executed_at `, jobsTableName)) if err != nil { s.base.Logger.Error(err)