Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
evlekht committed Oct 14, 2024
1 parent 776406a commit 2d9f410
Show file tree
Hide file tree
Showing 9 changed files with 539 additions and 67 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/google/uuid v1.6.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/jmoiron/sqlx v1.4.0
github.com/jonboulle/clockwork v0.4.0
github.com/klauspost/compress v1.17.10
github.com/mattn/go-sqlite3 v1.14.23
github.com/spf13/cobra v1.8.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7Bd
github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o=
github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY=
github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4=
github.com/jonboulle/clockwork v0.4.0/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc=
github.com/klauspost/compress v1.17.10 h1:oXAz+Vh0PMUvJczoi+flxpnBEPxoER1IaAnU/NMPtT0=
github.com/klauspost/compress v1.17.10/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
Expand Down
3 changes: 2 additions & 1 deletion internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/chain4travel/camino-messenger-bot/config"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/jonboulle/clockwork"
"maunium.net/go/mautrix/id"

"github.com/chain4travel/camino-messenger-bot/internal/compression"
Expand Down Expand Up @@ -180,7 +181,7 @@ func NewApp(ctx context.Context, cfg *config.Config, logger *zap.SugaredLogger)
return nil, err
}

scheduler := scheduler.New(logger, storage)
scheduler := scheduler.New(logger, storage, clockwork.NewRealClock())
scheduler.RegisterJobHandler(cashInJobName, func() {
_ = chequeHandler.CashIn(context.Background())
})
Expand Down
125 changes: 125 additions & 0 deletions pkg/scheduler/mock_storage.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 8 additions & 5 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/jonboulle/clockwork"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -41,12 +42,13 @@ type Scheduler interface {
RegisterJobHandler(jobName string, jobHandler func())
}

func New(logger *zap.SugaredLogger, storage Storage) Scheduler {
func New(logger *zap.SugaredLogger, storage Storage, clock clockwork.Clock) Scheduler {
return &scheduler{
storage: storage,
logger: logger,
registry: make(map[string]func()),
timers: make(map[string]*timer),
clock: clock,
}
}

Expand All @@ -57,6 +59,7 @@ type scheduler struct {
timers map[string]*timer
registryLock sync.RWMutex
timersLock sync.RWMutex
clock clockwork.Clock
}

// Start starts the scheduler. Jobs that are already due are executed immediately.
Expand Down Expand Up @@ -84,7 +87,7 @@ func (s *scheduler) Start(ctx context.Context) error {
jobName := job.Name
period := job.Period

now := time.Now()
now := s.clock.Now()
timeUntilFirstExecution := time.Duration(0)
if job.ExecuteAt.After(now) {
timeUntilFirstExecution = job.ExecuteAt.Sub(now)
Expand All @@ -99,7 +102,7 @@ func (s *scheduler) Start(ctx context.Context) error {
jobHandler()
}

timer := newTimer()
timer := newTimer(s.clock)
doneCh := timer.StartOnce(timeUntilFirstExecution, handler)
go func() {
<-doneCh
Expand Down Expand Up @@ -138,7 +141,7 @@ func (s *scheduler) Schedule(ctx context.Context, period time.Duration, jobName
return err
}

executeAt := time.Now().Add(period)
executeAt := s.clock.Now().Add(period)

if job != nil {
job.Period = period
Expand Down Expand Up @@ -181,7 +184,7 @@ func (s *scheduler) updateJobExecutionTime(ctx context.Context, jobName string)
return err
}

job.ExecuteAt = time.Now().Add(job.Period)
job.ExecuteAt = s.clock.Now().Add(job.Period)

if err := s.storage.UpsertJob(ctx, session, job); err != nil {
s.logger.Errorf("failed to store scheduled job: %v", err)
Expand Down
Loading

0 comments on commit 2d9f410

Please sign in to comment.