Skip to content

Commit

Permalink
kernel v0.20.1 with minor improvements (#94)
Browse files Browse the repository at this point in the history
  • Loading branch information
fogfish authored Sep 30, 2024
1 parent 8907d68 commit c50a685
Show file tree
Hide file tree
Showing 11 changed files with 410 additions and 179 deletions.
443 changes: 294 additions & 149 deletions README.md

Large diffs are not rendered by default.

15 changes: 11 additions & 4 deletions bag.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ type Msg[T any] struct {
// Error on the message processing
Error error

// I/O Context of the message, as obtained from broker
IOContext any

// Message decoded content
Object T
}
Expand All @@ -47,16 +50,20 @@ type Bag struct {
// Error on the message processing
Error error

// I/O Context of the message, as obtained from broker
IOContext any

// Message raw content
Object []byte
}

func ToMsg[T any](bag Bag, object T) Msg[T] {
return Msg[T]{
Category: bag.Category,
Digest: bag.Digest,
Error: bag.Error,
Object: object,
Category: bag.Category,
Digest: bag.Digest,
Error: bag.Error,
IOContext: bag.IOContext,
Object: object,
}
}

Expand Down
55 changes: 40 additions & 15 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ import (
"github.com/fogfish/swarm/kernel/backoff"
)

// Environment variable to config kernel
const (
EnvConfigPollFrequency = "CONFIG_SWARM_POLL_FREQUENCY"
EnvConfigTimeToFlight = "CONFIG_SWARM_TIME_TO_FLIGHT"
EnvConfigNetworkTimeout = "CONFIG_SWARM_NETWORK_TIMEOUT"
)

// Grade of Service Policy
type Policy int

Expand Down Expand Up @@ -64,22 +71,26 @@ type Config struct {
// Timeout for any network operations
NetworkTimeout time.Duration

// Codec for binary packets
Codec Codec
// Fail fast the message if category is not known to kernel.
FailOnUnknownCategory bool

// PacketCodec for binary packets
PacketCodec Codec
}

func NewConfig() Config {
return Config{
Source: "github.com/fogfish/swarm",
Policy: PolicyAtLeastOnce,
CapOut: 0,
CapDlq: 0,
CapRcv: 0,
CapAck: 0,
Backoff: backoff.Exp(10*time.Millisecond, 10, 0.5),
PollFrequency: 10 * time.Millisecond,
TimeToFlight: 5 * time.Second,
NetworkTimeout: 5 * time.Second,
Source: "github.com/fogfish/swarm",
Policy: PolicyAtLeastOnce,
CapOut: 0,
CapDlq: 0,
CapRcv: 0,
CapAck: 0,
Backoff: backoff.Exp(10*time.Millisecond, 10, 0.5),
PollFrequency: 10 * time.Millisecond,
TimeToFlight: 5 * time.Second,
NetworkTimeout: 5 * time.Second,
FailOnUnknownCategory: false,
}
}

Expand Down Expand Up @@ -179,9 +190,9 @@ func WithNetworkTimeout(t time.Duration) Option {
// - CONFIG_SWARM_NETWORK_TIMEOUT
func WithConfigFromEnv() Option {
return func(conf *Config) {
conf.PollFrequency = durationFromEnv("CONFIG_SWARM_POLL_FREQUENCY", conf.PollFrequency)
conf.TimeToFlight = durationFromEnv("CONFIG_SWARM_TIME_TO_FLIGHT", conf.TimeToFlight)
conf.NetworkTimeout = durationFromEnv("CONFIG_SWARM_NETWORK_TIMEOUT", conf.NetworkTimeout)
conf.PollFrequency = durationFromEnv(EnvConfigPollFrequency, conf.PollFrequency)
conf.TimeToFlight = durationFromEnv(EnvConfigTimeToFlight, conf.TimeToFlight)
conf.NetworkTimeout = durationFromEnv(EnvConfigNetworkTimeout, conf.NetworkTimeout)
}
}

Expand Down Expand Up @@ -225,3 +236,17 @@ func WithPolicyAtLeastOnce(n int) Option {
conf.CapAck = n
}
}

// Configure codec for binary packets
func WithPacketCodec(codec Codec) Option {
return func(conf *Config) {
conf.PacketCodec = codec
}
}

// Fail fast the message if category is not known to kernel.
func WithFailOnUnknownCategory() Option {
return func(conf *Config) {
conf.FailOnUnknownCategory = true
}
}
4 changes: 2 additions & 2 deletions dequeue/dequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func Event[M, T any](q *kernel.Dequeuer, category ...string) (<-chan swarm.Msg[s

// Create pair of channels to receive and acknowledge pure binary
func Bytes(q *kernel.Dequeuer, cat string) (<-chan swarm.Msg[[]byte], chan<- swarm.Msg[[]byte]) {
if q.Config.Codec != nil {
return kernel.Dequeue(q, cat, q.Config.Codec)
if q.Config.PacketCodec != nil {
return kernel.Dequeue(q, cat, q.Config.PacketCodec)
}

return kernel.Dequeue(q, cat, encoding.NewCodecByte())
Expand Down
4 changes: 2 additions & 2 deletions enqueue/enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func Event[M, T any](q *kernel.Enqueuer, category ...string) (snd chan<- swarm.E

// Create pair of channels to emit pure binaries
func Bytes(q *kernel.Enqueuer, cat string) (snd chan<- []byte, dlq <-chan []byte) {
if q.Config.Codec != nil {
return kernel.Enqueue(q, cat, q.Config.Codec)
if q.Config.PacketCodec != nil {
return kernel.Enqueue(q, cat, q.Config.PacketCodec)
}

return kernel.Enqueue(q, cat, encoding.NewCodecByte())
Expand Down
4 changes: 2 additions & 2 deletions enqueue/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ type EmitterBytes struct {
// Creates synchronous emitter
func NewBytes(q *kernel.Enqueuer, category string) *EmitterBytes {
var codec swarm.Codec
if q.Config.Codec != nil {
codec = q.Config.Codec
if q.Config.PacketCodec != nil {
codec = q.Config.PacketCodec
} else {
codec = encoding.NewCodecByte()
}
Expand Down
27 changes: 26 additions & 1 deletion errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,35 @@

package swarm

import "github.com/fogfish/faults"
import (
"fmt"
"time"

"github.com/fogfish/faults"
)

const (
ErrServiceIO = faults.Type("service i/o failed")
ErrEnqueue = faults.Type("enqueue is failed")
ErrDequeue = faults.Type("dequeue is failed")
)

type errTimeout struct {
op string
timer time.Duration
}

func ErrTimeout(op string, timer time.Duration) error {
return errTimeout{
op: op,
timer: timer,
}
}

func (err errTimeout) Error() string {
return fmt.Sprintf("timeout %s after %s", err.op, err.timer)
}

func (err errTimeout) Timeout() time.Duration {
return err.timer
}
3 changes: 1 addition & 2 deletions kernel/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ package kernel

import (
"context"
"fmt"
"time"

"github.com/fogfish/swarm"
Expand Down Expand Up @@ -52,7 +51,7 @@ func (s *Bridge) Dispatch(seq []swarm.Bag) error {
case err := <-s.session:
return err
case <-time.After(s.timeToFlight):
return fmt.Errorf("ack timeout")
return swarm.ErrTimeout("ack", s.timeToFlight)
}
}

Expand Down
8 changes: 8 additions & 0 deletions kernel/cathode.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package kernel

import (
"context"
"fmt"
"log/slog"
"sync"
"time"
Expand Down Expand Up @@ -102,6 +103,13 @@ func (k *Dequeuer) receive() {
k.Config.StdErr <- err
return
}
} else {
if k.Config.FailOnUnknownCategory {
slog.Error("Unknown category", "cat", bag.Category, "kernel", k.Config.Source)
k.Cathode.Err(k.context, bag.Digest, swarm.ErrDequeue.New(fmt.Errorf("unknown category %s ", bag.Category)))
} else {
slog.Warn("Unknown category", "cat", bag.Category, "kernel", k.Config.Source)
}
}
}
}
Expand Down
24 changes: 23 additions & 1 deletion kernel/cathode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,14 @@ func TestDequeuer(t *testing.T) {
codec := encoding.NewCodecJson[string]()
none := mockCathode(nil, nil)
pass := mockCathode(make(chan string),
[]swarm.Bag{{Category: "test", Digest: "1", Object: []byte(`"1"`)}},
[]swarm.Bag{
{
Category: "test",
Digest: "1",
IOContext: "context",
Object: []byte(`"1"`),
},
},
)

t.Run("Kernel", func(t *testing.T) {
Expand Down Expand Up @@ -60,6 +67,21 @@ func TestDequeuer(t *testing.T) {
k.Close()
})

t.Run("Dequeue.1.Context", func(t *testing.T) {
k := NewDequeuer(pass, swarm.Config{PollFrequency: 10 * time.Millisecond})
rcv, ack := Dequeue(k, "test", codec)
go k.Await()

msg := <-rcv
ack <- msg
it.Then(t).Should(
it.Equal(string(<-pass.ack), `1`),
it.Equal(msg.IOContext.(string), "context"),
)

k.Close()
})

t.Run("Backlog", func(t *testing.T) {
k := NewDequeuer(pass, swarm.Config{CapAck: 4, PollFrequency: 1 * time.Millisecond})
rcv, ack := Dequeue(k, "test", codec)
Expand Down
2 changes: 1 addition & 1 deletion version.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@

package swarm

const Version = "v0.20.0-alpha"
const Version = "v0.20.1"

0 comments on commit c50a685

Please sign in to comment.