Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
evlekht committed Nov 28, 2024
1 parent 01b61db commit cc10514
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 7 deletions.
5 changes: 5 additions & 0 deletions migrations/scheduler/2_last_executed_at.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
ALTER TABLE jobs
DROP last_executed_at;

ALTER TABLE jobs
ADD execute_at BIGINT NOT NULL;
5 changes: 5 additions & 0 deletions migrations/scheduler/2_last_executed_at.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
ALTER TABLE jobs
DROP execute_at;

ALTER TABLE jobs
ADD last_executed_at BIGINT NOT NULL DEFAULT 0;
22 changes: 20 additions & 2 deletions pkg/database/sqlite/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func New(logger *zap.SugaredLogger, cfg DBConfig, dbName string) (*DB, error) {
DB: db,
}

if err := s.migrate(dbName, cfg.MigrationsPath); err != nil {
if err := s.migrate(dbName, cfg.MigrationsPath, false); err != nil {
return nil, err
}

Expand All @@ -55,7 +55,21 @@ func (s *DB) Close() error {
return nil
}

func (s *DB) migrate(dbName, migrationsPath string) error {
var _ migrate.Logger = (*migrationLogger)(nil)

type migrationLogger struct {
*zap.SugaredLogger
}

func (l *migrationLogger) Printf(format string, v ...interface{}) {
l.Infof(format, v...)
}

func (l *migrationLogger) Verbose() bool {
return false
}

func (s *DB) migrate(dbName, migrationsPath string, logMigrations bool) error {
s.Logger.Infof("Performing db migrations...")

driver, err := sqlite3.WithInstance(s.DB.DB, &sqlite3.Config{})
Expand All @@ -70,6 +84,10 @@ func (s *DB) migrate(dbName, migrationsPath string) error {
return err
}

if logMigrations {
migration.Log = &migrationLogger{s.Logger}
}

version, dirty, err := migration.Version()
if err != nil && !errors.Is(err, migrate.ErrNilVersion) {
s.Logger.Error(err)
Expand Down
5 changes: 3 additions & 2 deletions pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ func TestScheduler_Start(t *testing.T) {
// job that was never executed
// and next execution should be scheduled right when scheduler starts
freshJob := Job{
Name: "fresh_job",
Period: time.Duration(1005),
Name: "fresh_job",
LastExecutedAt: time.Unix(0, 0),
Period: time.Duration(1005),
}

// job that was executed before scheduler starts
Expand Down
8 changes: 5 additions & 3 deletions pkg/scheduler/storage/sqlite/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,17 @@ func (s *storage) prepareJobsStmts(ctx context.Context) error {
upsertJob, err := s.base.DB.PrepareNamedContext(ctx, fmt.Sprintf(`
INSERT INTO %s (
name,
execute_at,
last_executed_at,
period
) VALUES (
:name,
:execute_at,
:last_executed_at,
:period
)
ON CONFLICT(name)
DO UPDATE SET period = excluded.period
DO UPDATE SET
period = excluded.period,
last_executed_at = excluded.last_executed_at
`, jobsTableName))
if err != nil {
s.base.Logger.Error(err)
Expand Down

0 comments on commit cc10514

Please sign in to comment.