Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
evlekht committed Oct 15, 2024
1 parent 90156dc commit 4f09194
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 64 deletions.
21 changes: 11 additions & 10 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func New(logger *zap.SugaredLogger, storage Storage, clock clockwork.Clock) Sche
storage: storage,
logger: logger,
registry: make(map[string]func()),
timers: make(map[string]*timer),
timers: make(map[string]*Timer),
clock: clock,
}
}
Expand All @@ -56,9 +56,9 @@ type scheduler struct {
logger *zap.SugaredLogger
storage Storage
registry map[string]func()
timers map[string]*timer
timers map[string]*Timer
registryLock sync.RWMutex
timersLock sync.RWMutex
timersLock sync.Mutex
clock clockwork.Clock
}

Expand Down Expand Up @@ -102,7 +102,7 @@ func (s *scheduler) Start(ctx context.Context) error {
jobHandler()
}

timer := newTimer(s.clock)
timer := NewTimer(s.clock)
doneCh := timer.StartOnce(timeUntilFirstExecution, handler)
go func() {
<-doneCh
Expand All @@ -116,12 +116,13 @@ func (s *scheduler) Start(ctx context.Context) error {
}

func (s *scheduler) Stop() error {
s.timersLock.RLock()
for _, timer := range s.timers {
timer.Stop()
s.timersLock.Lock()
for jobName, timer := range s.timers {
timer.Stop() // will block until timer is stopped, but not until the job handler is finished
delete(s.timers, jobName)
}
s.timersLock.RUnlock()
// TODO @evlekht await all ongoing job handlers to finish
s.timersLock.Unlock()
// TODO @evlekht await all ongoing job handlers to finish ?
return nil
}

Expand Down Expand Up @@ -209,7 +210,7 @@ func (s *scheduler) getJobHandler(jobName string) (func(), error) {
return jobHandler, nil
}

func (s *scheduler) setJobTimer(jobName string, t *timer) {
func (s *scheduler) setJobTimer(jobName string, t *Timer) {
s.timersLock.Lock()
s.timers[jobName] = t
s.timersLock.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestScheduler_Start(t *testing.T) {
ctrl := gomock.NewController(t)
storage := NewMockStorage(ctrl)
epsilon := time.Millisecond
timeout := 10000 * time.Millisecond
timeout := 10 * time.Millisecond

earlyJobExecuted := make(chan string)
nowJobExecuted := make(chan string)
Expand Down
110 changes: 60 additions & 50 deletions pkg/scheduler/timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,76 +7,86 @@ import (
"github.com/jonboulle/clockwork"
)

func newTimer(clock clockwork.Clock) *timer {
t := clock.NewTimer(time.Second)
t.Stop()
return &timer{
timer: t,
stopCh: make(chan struct{}, 1),
func NewTimer(clock clockwork.Clock) *Timer {
timer := &Timer{
clock: clock,
timer: clock.NewTimer(time.Second),
stopChan: make(chan time.Time),
}
timer.timer.Stop()
timer.stopped.Store(true)
return timer
}

type timer struct {
timer clockwork.Timer
stopped atomic.Bool
stopCh chan struct{}
type Timer struct {
clock clockwork.Clock
timer clockwork.Timer
stopped atomic.Bool
stopChan chan time.Time
once bool
}

// StartOnce starts the timer once and starts goroutine with [f] call when the timer expires.
// Returns a channel that is closed upon completion.
// Returns a channel that will receive stop-time and will be closed after timer is stopped.
//
// Should not be called on already running timer.
func (t *timer) StartOnce(d time.Duration, f func()) chan struct{} {
t.timer.Reset(d)
stopSignalCh := make(chan struct{})
go func() {
defer close(stopSignalCh)
for {
select {
case <-t.stopCh:
t.stopped.Store(true) // TODO@ test
return
case <-t.timer.Chan():
go f()
t.Stop()
}
}
}()
return stopSignalCh
// Should not be called on already running timer, will panic.
func (t *Timer) StartOnce(d time.Duration, f func()) chan time.Time {
if !t.stopped.Load() {
panic("timer is already running")
}
t.once = true
return t.start(d, f)
}

// Start starts the timer and starts goroutine with [f] call when the timer ticks.
// Returns a channel that is closed after timer is stopped.
// Returns a channel that will receive stop-time and will be closed after timer is stopped.
//
// Should not be called on already running timer.
func (t *timer) Start(d time.Duration, f func()) chan struct{} {
// Should not be called on already running timer, will panic.
func (t *Timer) Start(d time.Duration, f func()) chan time.Time {
if !t.stopped.Load() {
panic("timer is already running")
}
t.once = false
return t.start(d, f)
}

// Stop stops the timer. If the timer is already stopped, it panics.
func (t *Timer) Stop() {
if t.stopped.Load() {
panic("timer is already stopped")
}
t.stopChan <- t.clock.Now()
}

// IsStopped returns true if the timer is stopped.
func (t *Timer) IsStopped() bool {
return t.stopped.Load()
}

func (t *Timer) start(d time.Duration, f func()) chan time.Time {
t.stopped.Store(false)
t.timer.Reset(d)
stopSignalCh := make(chan struct{})
stopTimeChan := make(chan time.Time)

go func() {
defer close(stopSignalCh)
for {
select {
case <-t.stopCh:
t.stopped.Store(true) // TODO@ test
case stopTime := <-t.stopChan:
t.timer.Stop()
t.stopped.Store(true)
stopTimeChan <- stopTime
close(stopTimeChan) // will be blocked until stopTimeCh read, but its fine
return
case <-t.timer.Chan():
go f()
t.timer.Reset(d)
if t.once {
t.Stop()
} else {
t.timer.Reset(d)
}
}
}
}()
return stopSignalCh
}

// Stop stops the timer.
//
// Stop should not be called on already stopped timer.
func (t *timer) Stop() {
t.stopCh <- struct{}{}
t.timer.Stop()
}

// TODO@ test
func (t *timer) IsStopped() bool {
return t.stopped.Load()
return stopTimeChan
}
8 changes: 5 additions & 3 deletions pkg/scheduler/timer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import (
"github.com/stretchr/testify/require"
)

// TODO@ update tests after implementation changes

func TestTimer_StartOnce(t *testing.T) {
t.Run("function is called after duration", func(t *testing.T) {
require := require.New(t)
clock := clockwork.NewFakeClockAt(time.Unix(0, 100))
timer := newTimer(clock)
timer := NewTimer(clock)
duration := time.Millisecond
timeout := 10 * time.Millisecond
epsilon := time.Millisecond
Expand Down Expand Up @@ -54,7 +56,7 @@ func TestTimer_StartOnce(t *testing.T) {
t.Run("timer is stopped manually", func(t *testing.T) {
require := require.New(t)
clock := clockwork.NewFakeClockAt(time.Unix(0, 100))
timer := newTimer(clock)
timer := NewTimer(clock)
duration := time.Millisecond
timeout := 10 * time.Millisecond
epsilon := time.Millisecond
Expand Down Expand Up @@ -89,7 +91,7 @@ func TestTimer_StartOnce(t *testing.T) {
func TestTimer_Start(t *testing.T) {
require := require.New(t)
clock := clockwork.NewFakeClockAt(time.Unix(0, 100))
timer := newTimer(clock)
timer := NewTimer(clock)

duration := time.Millisecond
timeout := 10 * time.Millisecond
Expand Down

0 comments on commit 4f09194

Please sign in to comment.