Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable N-pollers in the kernel #104

Merged
merged 1 commit into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ type Config struct {
// Standard Error I/O channel
StdErr chan<- error

// Size of poller pool in the system
PollerPool int

// Frequency to poll broker api
PollFrequency time.Duration

Expand All @@ -87,6 +90,7 @@ func NewConfig() Config {
CapRcv: 0,
CapAck: 0,
Backoff: backoff.Exp(10*time.Millisecond, 10, 0.5),
PollerPool: 1,
PollFrequency: 10 * time.Millisecond,
TimeToFlight: 5 * time.Second,
NetworkTimeout: 5 * time.Second,
Expand Down Expand Up @@ -163,6 +167,13 @@ func WithLogStdErr() Option {
}
}

// Number of poller in the system
func WithPollerPool(n int) Option {
return func(conf *Config) {
conf.PollerPool = n
}
}

// Frequency to poll broker api
func WithPollFrequency(t time.Duration) Option {
return func(conf *Config) {
Expand Down
47 changes: 27 additions & 20 deletions kernel/cathode.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ type Dequeuer struct {
func NewDequeuer(cathode Cathode, config swarm.Config) *Dequeuer {
ctx, can := context.WithCancel(context.Background())

// Must not be 0
if config.PollerPool == 0 {
config.PollerPool = 1
}

return &Dequeuer{
Config: config,
context: ctx,
Expand Down Expand Up @@ -114,29 +119,31 @@ func (k *Dequeuer) receive() {
}
}

k.WaitGroup.Add(1)
go func() {
slog.Debug("kernel receive loop started")

exit:
for {
select {
case <-k.context.Done():
break exit
default:
}
for pid := 0; pid < k.Config.PollerPool; pid++ {
k.WaitGroup.Add(1)
go func() {
slog.Debug("kernel poller started", "pid", pid)

exit:
for {
select {
case <-k.context.Done():
break exit
default:
}

select {
case <-k.context.Done():
break exit
case <-time.After(k.Config.PollFrequency):
asker()
select {
case <-k.context.Done():
break exit
case <-time.After(k.Config.PollFrequency):
asker()
}
}
}

k.WaitGroup.Done()
slog.Debug("kernel receive loop stopped")
}()
k.WaitGroup.Done()
slog.Debug("kernel poller stopped", "pid", pid)
}()
}
}

// Dequeue creates pair of channels within kernel to enqueue messages
Expand Down
2 changes: 1 addition & 1 deletion version.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@

package swarm

const Version = "v0.20.1"
const Version = "v0.20.2"
Loading