Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kernel v0.20.1 with minor improvements #94

Merged
merged 1 commit into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
443 changes: 294 additions & 149 deletions README.md

Large diffs are not rendered by default.

15 changes: 11 additions & 4 deletions bag.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ type Msg[T any] struct {
// Error on the message processing
Error error

// I/O Context of the message, as obtained from broker
IOContext any

// Message decoded content
Object T
}
Expand All @@ -47,16 +50,20 @@ type Bag struct {
// Error on the message processing
Error error

// I/O Context of the message, as obtained from broker
IOContext any

// Message raw content
Object []byte
}

func ToMsg[T any](bag Bag, object T) Msg[T] {
return Msg[T]{
Category: bag.Category,
Digest: bag.Digest,
Error: bag.Error,
Object: object,
Category: bag.Category,
Digest: bag.Digest,
Error: bag.Error,
IOContext: bag.IOContext,
Object: object,
}
}

Expand Down
55 changes: 40 additions & 15 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ import (
"github.com/fogfish/swarm/kernel/backoff"
)

// Environment variable to config kernel
const (
EnvConfigPollFrequency = "CONFIG_SWARM_POLL_FREQUENCY"
EnvConfigTimeToFlight = "CONFIG_SWARM_TIME_TO_FLIGHT"
EnvConfigNetworkTimeout = "CONFIG_SWARM_NETWORK_TIMEOUT"
)

// Grade of Service Policy
type Policy int

Expand Down Expand Up @@ -64,22 +71,26 @@ type Config struct {
// Timeout for any network operations
NetworkTimeout time.Duration

// Codec for binary packets
Codec Codec
// Fail fast the message if category is not known to kernel.
FailOnUnknownCategory bool

// PacketCodec for binary packets
PacketCodec Codec
}

func NewConfig() Config {
return Config{
Source: "github.com/fogfish/swarm",
Policy: PolicyAtLeastOnce,
CapOut: 0,
CapDlq: 0,
CapRcv: 0,
CapAck: 0,
Backoff: backoff.Exp(10*time.Millisecond, 10, 0.5),
PollFrequency: 10 * time.Millisecond,
TimeToFlight: 5 * time.Second,
NetworkTimeout: 5 * time.Second,
Source: "github.com/fogfish/swarm",
Policy: PolicyAtLeastOnce,
CapOut: 0,
CapDlq: 0,
CapRcv: 0,
CapAck: 0,
Backoff: backoff.Exp(10*time.Millisecond, 10, 0.5),
PollFrequency: 10 * time.Millisecond,
TimeToFlight: 5 * time.Second,
NetworkTimeout: 5 * time.Second,
FailOnUnknownCategory: false,
}
}

Expand Down Expand Up @@ -179,9 +190,9 @@ func WithNetworkTimeout(t time.Duration) Option {
// - CONFIG_SWARM_NETWORK_TIMEOUT
func WithConfigFromEnv() Option {
return func(conf *Config) {
conf.PollFrequency = durationFromEnv("CONFIG_SWARM_POLL_FREQUENCY", conf.PollFrequency)
conf.TimeToFlight = durationFromEnv("CONFIG_SWARM_TIME_TO_FLIGHT", conf.TimeToFlight)
conf.NetworkTimeout = durationFromEnv("CONFIG_SWARM_NETWORK_TIMEOUT", conf.NetworkTimeout)
conf.PollFrequency = durationFromEnv(EnvConfigPollFrequency, conf.PollFrequency)
conf.TimeToFlight = durationFromEnv(EnvConfigTimeToFlight, conf.TimeToFlight)
conf.NetworkTimeout = durationFromEnv(EnvConfigNetworkTimeout, conf.NetworkTimeout)
}
}

Expand Down Expand Up @@ -225,3 +236,17 @@ func WithPolicyAtLeastOnce(n int) Option {
conf.CapAck = n
}
}

// Configure codec for binary packets
func WithPacketCodec(codec Codec) Option {
return func(conf *Config) {
conf.PacketCodec = codec
}
}

// Fail fast the message if category is not known to kernel.
func WithFailOnUnknownCategory() Option {
return func(conf *Config) {
conf.FailOnUnknownCategory = true
}
}
4 changes: 2 additions & 2 deletions dequeue/dequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func Event[M, T any](q *kernel.Dequeuer, category ...string) (<-chan swarm.Msg[s

// Create pair of channels to receive and acknowledge pure binary
func Bytes(q *kernel.Dequeuer, cat string) (<-chan swarm.Msg[[]byte], chan<- swarm.Msg[[]byte]) {
if q.Config.Codec != nil {
return kernel.Dequeue(q, cat, q.Config.Codec)
if q.Config.PacketCodec != nil {
return kernel.Dequeue(q, cat, q.Config.PacketCodec)
}

return kernel.Dequeue(q, cat, encoding.NewCodecByte())
Expand Down
4 changes: 2 additions & 2 deletions enqueue/enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func Event[M, T any](q *kernel.Enqueuer, category ...string) (snd chan<- swarm.E

// Create pair of channels to emit pure binaries
func Bytes(q *kernel.Enqueuer, cat string) (snd chan<- []byte, dlq <-chan []byte) {
if q.Config.Codec != nil {
return kernel.Enqueue(q, cat, q.Config.Codec)
if q.Config.PacketCodec != nil {
return kernel.Enqueue(q, cat, q.Config.PacketCodec)
}

return kernel.Enqueue(q, cat, encoding.NewCodecByte())
Expand Down
4 changes: 2 additions & 2 deletions enqueue/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ type EmitterBytes struct {
// Creates synchronous emitter
func NewBytes(q *kernel.Enqueuer, category string) *EmitterBytes {
var codec swarm.Codec
if q.Config.Codec != nil {
codec = q.Config.Codec
if q.Config.PacketCodec != nil {
codec = q.Config.PacketCodec
} else {
codec = encoding.NewCodecByte()
}
Expand Down
27 changes: 26 additions & 1 deletion errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,35 @@

package swarm

import "github.com/fogfish/faults"
import (
"fmt"
"time"

"github.com/fogfish/faults"
)

const (
ErrServiceIO = faults.Type("service i/o failed")
ErrEnqueue = faults.Type("enqueue is failed")
ErrDequeue = faults.Type("dequeue is failed")
)

type errTimeout struct {
op string
timer time.Duration
}

func ErrTimeout(op string, timer time.Duration) error {
return errTimeout{
op: op,
timer: timer,
}
}

func (err errTimeout) Error() string {
return fmt.Sprintf("timeout %s after %s", err.op, err.timer)
}

func (err errTimeout) Timeout() time.Duration {
return err.timer
}
3 changes: 1 addition & 2 deletions kernel/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ package kernel

import (
"context"
"fmt"
"time"

"github.com/fogfish/swarm"
Expand Down Expand Up @@ -52,7 +51,7 @@ func (s *Bridge) Dispatch(seq []swarm.Bag) error {
case err := <-s.session:
return err
case <-time.After(s.timeToFlight):
return fmt.Errorf("ack timeout")
return swarm.ErrTimeout("ack", s.timeToFlight)
}
}

Expand Down
8 changes: 8 additions & 0 deletions kernel/cathode.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package kernel

import (
"context"
"fmt"
"log/slog"
"sync"
"time"
Expand Down Expand Up @@ -102,6 +103,13 @@ func (k *Dequeuer) receive() {
k.Config.StdErr <- err
return
}
} else {
if k.Config.FailOnUnknownCategory {
slog.Error("Unknown category", "cat", bag.Category, "kernel", k.Config.Source)
k.Cathode.Err(k.context, bag.Digest, swarm.ErrDequeue.New(fmt.Errorf("unknown category %s ", bag.Category)))
} else {
slog.Warn("Unknown category", "cat", bag.Category, "kernel", k.Config.Source)
}
}
}
}
Expand Down
24 changes: 23 additions & 1 deletion kernel/cathode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,14 @@ func TestDequeuer(t *testing.T) {
codec := encoding.NewCodecJson[string]()
none := mockCathode(nil, nil)
pass := mockCathode(make(chan string),
[]swarm.Bag{{Category: "test", Digest: "1", Object: []byte(`"1"`)}},
[]swarm.Bag{
{
Category: "test",
Digest: "1",
IOContext: "context",
Object: []byte(`"1"`),
},
},
)

t.Run("Kernel", func(t *testing.T) {
Expand Down Expand Up @@ -60,6 +67,21 @@ func TestDequeuer(t *testing.T) {
k.Close()
})

t.Run("Dequeue.1.Context", func(t *testing.T) {
k := NewDequeuer(pass, swarm.Config{PollFrequency: 10 * time.Millisecond})
rcv, ack := Dequeue(k, "test", codec)
go k.Await()

msg := <-rcv
ack <- msg
it.Then(t).Should(
it.Equal(string(<-pass.ack), `1`),
it.Equal(msg.IOContext.(string), "context"),
)

k.Close()
})

t.Run("Backlog", func(t *testing.T) {
k := NewDequeuer(pass, swarm.Config{CapAck: 4, PollFrequency: 1 * time.Millisecond})
rcv, ack := Dequeue(k, "test", codec)
Expand Down
2 changes: 1 addition & 1 deletion version.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@

package swarm

const Version = "v0.20.0-alpha"
const Version = "v0.20.1"
Loading