Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Conditionally Enabled Work Types #125

Merged
merged 1 commit into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions pkg/rsqueue/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,17 @@ type Queue interface {
type QueueSupportedTypes interface {
Enabled() []uint64
SetEnabled(typeId uint64, enabled bool)
SetEnabledConditional(typeId uint64, enabled func() bool)
DisableAll()
}

type Enabled struct {
Always bool
Conditional func() bool
}

type DefaultQueueSupportedTypes struct {
types map[uint64]bool
types map[uint64]Enabled
mutex sync.RWMutex
}

Expand All @@ -94,7 +100,7 @@ func (d *DefaultQueueSupportedTypes) Enabled() []uint64 {
defer d.mutex.RUnlock()
results := make([]uint64, 0)
for i, enabled := range d.types {
if enabled {
if enabled.Always || (enabled.Conditional != nil && enabled.Conditional()) {
results = append(results, i)
}
}
Expand All @@ -103,18 +109,27 @@ func (d *DefaultQueueSupportedTypes) Enabled() []uint64 {

func (d *DefaultQueueSupportedTypes) SetEnabled(typeId uint64, enabled bool) {
if d.types == nil {
d.types = make(map[uint64]bool)
d.types = make(map[uint64]Enabled)
}
d.mutex.Lock()
defer d.mutex.Unlock()
d.types[typeId] = Enabled{Always: enabled}
}

func (d *DefaultQueueSupportedTypes) SetEnabledConditional(typeId uint64, enabled func() bool) {
if d.types == nil {
d.types = make(map[uint64]Enabled)
}
d.mutex.Lock()
defer d.mutex.Unlock()
d.types[typeId] = enabled
d.types[typeId] = Enabled{Conditional: enabled}
}

func (d *DefaultQueueSupportedTypes) DisableAll() {
d.mutex.Lock()
defer d.mutex.Unlock()
for i := range d.types {
d.types[i] = false
d.types[i] = Enabled{Always: false}
}
}

Expand Down
11 changes: 9 additions & 2 deletions pkg/rsqueue/runnerfactory/runnerfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,24 @@ func (r *RunnerFactory) Add(workType uint64, runner queue.WorkRunner) {
r.types.SetEnabled(workType, true)
}

func (r *RunnerFactory) AddConditional(workType uint64, enabled func() bool, runner queue.WorkRunner) {
r.runners[workType] = runner
r.types.SetEnabledConditional(workType, enabled)
}

// Run runs work if the work type is configured. Note that this doesn't check to
// see if the work type is enabled (in r.types).
func (r *RunnerFactory) Run(work queue.RecursableWork) error {

runner, ok := r.runners[work.WorkType]
if !ok {
return fmt.Errorf("Invalid work type %d", work.WorkType)
return fmt.Errorf("invalid work type %d", work.WorkType)
}

return runner.Run(work)
}

// Stops all the runners in the factory. After each runner is stopped,
// Stop all the runners in the factory. After each runner is stopped,
// it is marked as disabled so that we won't attempt to grab future
// work for that runner from the queue.
func (r *RunnerFactory) Stop(timeout time.Duration) error {
Expand Down
26 changes: 25 additions & 1 deletion pkg/rsqueue/runnerfactory/runnerfactory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,31 @@ func (s *RunnerFactorySuite) TestNewRunner(c *check.C) {
Work: []byte{},
WorkType: 2,
})
c.Assert(err, check.ErrorMatches, "Invalid work type 2")
c.Assert(err, check.ErrorMatches, "invalid work type 2")
}

func (s *RunnerFactorySuite) TestRunnerConditional(c *check.C) {
types := &queue.DefaultQueueSupportedTypes{}
r := NewRunnerFactory(RunnerFactoryConfig{SupportedTypes: types})
c.Check(r, check.DeepEquals, &RunnerFactory{
runners: make(map[uint64]queue.WorkRunner),
types: types,
})

yes := func() bool {
return true
}
no := func() bool {
return false
}

// Add a runner
fr1 := &FakeRunnerOne{}
fr2 := &FakeRunnerTwo{}
r.AddConditional(0, yes, fr1)
r.AddConditional(1, no, fr2)
c.Check(r.runners, check.HasLen, 2)
c.Check(r.types.Enabled(), check.DeepEquals, []uint64{0})
}

func (s *RunnerFactorySuite) TestStop(c *check.C) {
Expand Down
Loading