Skip to content

Commit

Permalink
enable N-pollers in the kernel
Browse files Browse the repository at this point in the history
  • Loading branch information
fogfish committed Oct 6, 2024
1 parent dd34094 commit 613e59d
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 21 deletions.
11 changes: 11 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ type Config struct {
// Standard Error I/O channel
StdErr chan<- error

// Size of poller pool in the system
PollerPool int

// Frequency to poll broker api
PollFrequency time.Duration

Expand All @@ -87,6 +90,7 @@ func NewConfig() Config {
CapRcv: 0,
CapAck: 0,
Backoff: backoff.Exp(10*time.Millisecond, 10, 0.5),
PollerPool: 1,
PollFrequency: 10 * time.Millisecond,
TimeToFlight: 5 * time.Second,
NetworkTimeout: 5 * time.Second,
Expand Down Expand Up @@ -163,6 +167,13 @@ func WithLogStdErr() Option {
}
}

// Number of poller in the system
func WithPollerPool(n int) Option {
return func(conf *Config) {
conf.PollerPool = n
}
}

// Frequency to poll broker api
func WithPollFrequency(t time.Duration) Option {
return func(conf *Config) {
Expand Down
47 changes: 27 additions & 20 deletions kernel/cathode.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ type Dequeuer struct {
func NewDequeuer(cathode Cathode, config swarm.Config) *Dequeuer {
ctx, can := context.WithCancel(context.Background())

// Must not be 0
if config.PollerPool == 0 {
config.PollerPool = 1
}

return &Dequeuer{
Config: config,
context: ctx,
Expand Down Expand Up @@ -114,29 +119,31 @@ func (k *Dequeuer) receive() {
}
}

k.WaitGroup.Add(1)
go func() {
slog.Debug("kernel receive loop started")

exit:
for {
select {
case <-k.context.Done():
break exit
default:
}
for pid := 0; pid < k.Config.PollerPool; pid++ {
k.WaitGroup.Add(1)
go func() {
slog.Debug("kernel poller started", "pid", pid)

exit:
for {
select {
case <-k.context.Done():
break exit
default:
}

select {
case <-k.context.Done():
break exit
case <-time.After(k.Config.PollFrequency):
asker()
select {
case <-k.context.Done():
break exit
case <-time.After(k.Config.PollFrequency):
asker()
}
}
}

k.WaitGroup.Done()
slog.Debug("kernel receive loop stopped")
}()
k.WaitGroup.Done()
slog.Debug("kernel poller stopped", "pid", pid)
}()
}
}

// Dequeue creates pair of channels within kernel to enqueue messages
Expand Down
2 changes: 1 addition & 1 deletion version.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@

package swarm

const Version = "v0.20.1"
const Version = "v0.20.2"

0 comments on commit 613e59d

Please sign in to comment.