diff --git a/config.go b/config.go index c444ca8..5343ff5 100644 --- a/config.go +++ b/config.go @@ -62,6 +62,9 @@ type Config struct { // Standard Error I/O channel StdErr chan<- error + // Size of poller pool in the system + PollerPool int + // Frequency to poll broker api PollFrequency time.Duration @@ -87,6 +90,7 @@ func NewConfig() Config { CapRcv: 0, CapAck: 0, Backoff: backoff.Exp(10*time.Millisecond, 10, 0.5), + PollerPool: 1, PollFrequency: 10 * time.Millisecond, TimeToFlight: 5 * time.Second, NetworkTimeout: 5 * time.Second, @@ -163,6 +167,13 @@ func WithLogStdErr() Option { } } +// Number of poller in the system +func WithPollerPool(n int) Option { + return func(conf *Config) { + conf.PollerPool = n + } +} + // Frequency to poll broker api func WithPollFrequency(t time.Duration) Option { return func(conf *Config) { diff --git a/kernel/cathode.go b/kernel/cathode.go index d73a08d..0095c50 100644 --- a/kernel/cathode.go +++ b/kernel/cathode.go @@ -54,6 +54,11 @@ type Dequeuer struct { func NewDequeuer(cathode Cathode, config swarm.Config) *Dequeuer { ctx, can := context.WithCancel(context.Background()) + // Must not be 0 + if config.PollerPool == 0 { + config.PollerPool = 1 + } + return &Dequeuer{ Config: config, context: ctx, @@ -114,29 +119,31 @@ func (k *Dequeuer) receive() { } } - k.WaitGroup.Add(1) - go func() { - slog.Debug("kernel receive loop started") - - exit: - for { - select { - case <-k.context.Done(): - break exit - default: - } + for pid := 0; pid < k.Config.PollerPool; pid++ { + k.WaitGroup.Add(1) + go func() { + slog.Debug("kernel poller started", "pid", pid) + + exit: + for { + select { + case <-k.context.Done(): + break exit + default: + } - select { - case <-k.context.Done(): - break exit - case <-time.After(k.Config.PollFrequency): - asker() + select { + case <-k.context.Done(): + break exit + case <-time.After(k.Config.PollFrequency): + asker() + } } - } - k.WaitGroup.Done() - slog.Debug("kernel receive loop stopped") - }() + k.WaitGroup.Done() + slog.Debug("kernel poller stopped", "pid", pid) + }() + } } // Dequeue creates pair of channels within kernel to enqueue messages diff --git a/version.go b/version.go index c16284b..c83c9d7 100644 --- a/version.go +++ b/version.go @@ -8,4 +8,4 @@ package swarm -const Version = "v0.20.1" +const Version = "v0.20.2"