diff --git a/go.mod b/go.mod index 72a82fe..f3165ef 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,8 @@ require ( github.com/fogfish/golem/pure v0.10.1 github.com/fogfish/guid/v2 v2.0.4 github.com/fogfish/it v1.0.0 + github.com/fogfish/it/v2 v2.0.2 + github.com/fogfish/logger/v3 v3.1.1 ) require github.com/fogfish/golem/hseq v1.2.0 // indirect diff --git a/go.sum b/go.sum index a942ac3..35d27be 100644 --- a/go.sum +++ b/go.sum @@ -14,3 +14,5 @@ github.com/fogfish/it v1.0.0 h1:kiwFHZcrkRLUydZoIoY0gTuMfj38trwvLo0YRyIkeG8= github.com/fogfish/it v1.0.0/go.mod h1:NQJG4Ygvek85y7zGj0Gny8+6ygAnHjfBORhI7TdQhp4= github.com/fogfish/it/v2 v2.0.2 h1:UR6yVemf8zD3WVs6Bq0zE6LJwapZ8urv9zvU5VB5E6o= github.com/fogfish/it/v2 v2.0.2/go.mod h1:HHwufnTaZTvlRVnSesPl49HzzlMrQtweKbf+8Co/ll4= +github.com/fogfish/logger/v3 v3.1.1 h1:awmTNpBWRvSj086H3RWIUnc+FSu9qXHJgBa49wYpNCE= +github.com/fogfish/logger/v3 v3.1.1/go.mod h1:hsucoJz/3OX90UdYrXykcKvjjteBnPcYSTr4Rie0ZqU= diff --git a/kernel/bridge.go b/kernel/bridge.go new file mode 100644 index 0000000..a5fb03d --- /dev/null +++ b/kernel/bridge.go @@ -0,0 +1,74 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package kernel + +import ( + "context" + "fmt" + "time" + + "github.com/fogfish/swarm" +) + +// Bridge Lambda's main function to [Cathode] interface +type Bridge struct { + timeToFlight time.Duration + inflight map[string]struct{} + session chan error + ch chan []swarm.Bag +} + +func NewBridge(timeToFlight time.Duration) *Bridge { + return &Bridge{ + ch: make(chan []swarm.Bag), + session: make(chan error), + timeToFlight: timeToFlight, + } +} + +// Dispatch the batch of messages in the context of Lambda handler +func (s *Bridge) Dispatch(seq []swarm.Bag) error { + s.inflight = map[string]struct{}{} + for _, bag := range seq { + s.inflight[bag.Ctx.Digest] = struct{}{} + } + + s.ch <- seq + + select { + case err := <-s.session: + return err + case <-time.After(s.timeToFlight): + return fmt.Errorf("ack timeout") + } +} + +func (s *Bridge) Ask(ctx context.Context) ([]swarm.Bag, error) { + select { + case <-ctx.Done(): + return nil, nil + case bag := <-s.ch: + return bag, nil + } +} + +func (s *Bridge) Ack(ctx context.Context, digest string) error { + delete(s.inflight, digest) + if len(s.inflight) == 0 { + s.session <- nil + } + + return nil +} + +func (s *Bridge) Err(ctx context.Context, digest string, err error) error { + delete(s.inflight, digest) + s.session <- err + return nil +} diff --git a/kernel/bridge_test.go b/kernel/bridge_test.go new file mode 100644 index 0000000..ea30172 --- /dev/null +++ b/kernel/bridge_test.go @@ -0,0 +1,108 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package kernel + +import ( + "context" + "log/slog" + "testing" + "time" + + "github.com/fogfish/it/v2" + "github.com/fogfish/logger/v3" + "github.com/fogfish/swarm" +) + +func init() { + slog.SetDefault( + logger.New( + logger.WithSourceShorten(), + logger.WithoutTimestamp(), + logger.WithLogLevel(slog.LevelDebug), + ), + ) +} + +func TestBridge(t *testing.T) { + + seq := []swarm.Bag{ + {Ctx: &swarm.Context{Category: "test", Digest: "1"}, Object: []byte(`"1"`)}, + {Ctx: &swarm.Context{Category: "test", Digest: "2"}, Object: []byte(`"2"`)}, + } + + t.Run("None", func(t *testing.T) { + k := NewDequeuer(mockSpawner(seq), swarm.Config{PollFrequency: 1 * time.Second}) + go k.Await() + k.Close() + }) + + t.Run("Dequeue.1", func(t *testing.T) { + mock := mockSpawner(seq) + k := NewDequeuer(mock, swarm.Config{PollFrequency: 10 * time.Millisecond}) + rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]()) + go k.Await() + + ack <- <-rcv + ack <- <-rcv + + k.Close() + + it.Then(t).Should( + it.Seq(mock.ack).Equal(`1`, `2`), + ) + }) + + t.Run("Timeout", func(t *testing.T) { + mock := mockSpawner(seq) + k := NewDequeuer(mock, swarm.Config{PollFrequency: 0 * time.Millisecond}) + rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]()) + go k.Await() + + ack <- <-rcv + <-rcv + time.Sleep(1 * time.Millisecond) + + k.Close() + time.Sleep(1 * time.Millisecond) + + it.Then(t).ShouldNot( + it.Nil(mock.err), + ) + }) + +} + +//------------------------------------------------------------------------------ + +type spawner struct { + *Bridge + seq []swarm.Bag + ack []string + err error +} + +func mockSpawner(seq []swarm.Bag) *spawner { + return &spawner{ + Bridge: NewBridge(100 * time.Millisecond), + seq: seq, + } +} + +func (s *spawner) Ack(ctx context.Context, digest string) error { + if err := s.Bridge.Ack(ctx, digest); err != nil { + return err + } + + s.ack = append(s.ack, digest) + return nil +} + +func (s *spawner) Run() { + s.err = s.Bridge.Dispatch(s.seq) +} diff --git a/kernel/cathode.go b/kernel/cathode.go new file mode 100644 index 0000000..09ff5bd --- /dev/null +++ b/kernel/cathode.go @@ -0,0 +1,191 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package kernel + +import ( + "context" + "log/slog" + "sync" + "time" + + "github.com/fogfish/swarm" +) + +type Cathode interface { + Ack(ctx context.Context, digest string) error + Err(ctx context.Context, digest string, err error) error + Ask(ctx context.Context) ([]swarm.Bag, error) +} + +// Decode message from wire format +type Decoder[T any] interface{ Decode([]byte) (T, error) } + +type Dequeuer struct { + sync.WaitGroup + sync.RWMutex + + // Control-plane stop channel used by go routines to stop I/O on data channels + context context.Context + cancel context.CancelFunc + + // Kernel configuration + Config swarm.Config + + // event router, binds category with destination channel + router map[string]interface{ Route(swarm.Bag) error } + + // Cathode is the reader port on message broker + Cathode Cathode +} + +func NewDequeuer(cathode Cathode, config swarm.Config) *Dequeuer { + ctx, can := context.WithCancel(context.Background()) + + return &Dequeuer{ + Config: config, + context: ctx, + cancel: can, + router: map[string]interface{ Route(swarm.Bag) error }{}, + Cathode: cathode, + } +} + +// Close enqueuer +func (k *Dequeuer) Close() { + k.cancel() + k.WaitGroup.Wait() +} + +func (k *Dequeuer) Await() { + if spawner, ok := k.Cathode.(interface{ Run() }); ok { + go spawner.Run() + } + + k.receive() + <-k.context.Done() + k.WaitGroup.Wait() +} + +// internal infinite receive loop. +// waiting for message from event buses and queues and schedules it for delivery. +func (k *Dequeuer) receive() { + asker := func() { + seq, err := k.Cathode.Ask(k.context) + if k.Config.StdErr != nil && err != nil { + k.Config.StdErr <- err + return + } + + for i := 0; i < len(seq); i++ { + bag := seq[i] + + k.RWMutex.RLock() + r, has := k.router[bag.Ctx.Category] + k.RWMutex.RUnlock() + + if has { + err := r.Route(bag) + if k.Config.StdErr != nil && err != nil { + k.Config.StdErr <- err + return + } + } + } + } + + k.WaitGroup.Add(1) + go func() { + slog.Debug("kernel dequeue loop started") + + exit: + for { + select { + case <-k.context.Done(): + break exit + default: + } + + select { + case <-k.context.Done(): + break exit + case <-time.After(k.Config.PollFrequency): + asker() + } + } + + k.WaitGroup.Done() + slog.Debug("kernel dequeue loop stopped") + }() +} + +// Dequeue creates pair of channels within kernel to enqueue messages +func Dequeue[T any](k *Dequeuer, cat string, codec Decoder[T]) ( /*rcv*/ <-chan swarm.Msg[T] /*ack*/, chan<- swarm.Msg[T]) { + rcv := make(chan swarm.Msg[T], k.Config.CapRcv) + ack := make(chan swarm.Msg[T], k.Config.CapAck) + + k.RWMutex.Lock() + k.router[cat] = router[T]{ch: rcv, codec: codec} + k.RWMutex.Unlock() + + // emitter routine + acks := func(msg swarm.Msg[T]) { + if msg.Ctx.Error == nil { + err := k.Cathode.Ack(k.context, msg.Ctx.Digest) + if k.Config.StdErr != nil && err != nil { + k.Config.StdErr <- err + } + } else { + err := k.Cathode.Err(k.context, msg.Ctx.Digest, msg.Ctx.Error) + if k.Config.StdErr != nil && err != nil { + k.Config.StdErr <- err + } + } + } + + k.WaitGroup.Add(1) + go func() { + slog.Debug("kernel dequeue started", "cat", cat) + + exit: + for { + // The try-receive operation here is to + // try to exit the sender goroutine as + // early as possible. Try-receive and + // try-send select blocks are specially + // optimized by the standard Go + // compiler, so they are very efficient. + select { + case <-k.context.Done(): + break exit + default: + } + + select { + case <-k.context.Done(): + break exit + case msg := <-ack: + acks(msg) + } + } + + backlog := len(ack) + close(ack) + + if backlog != 0 { + for msg := range ack { + acks(msg) + } + } + + k.WaitGroup.Done() + slog.Debug("kernel dequeue stopped", "cat", cat) + }() + + return rcv, ack +} diff --git a/kernel/cathode_test.go b/kernel/cathode_test.go new file mode 100644 index 0000000..35149ba --- /dev/null +++ b/kernel/cathode_test.go @@ -0,0 +1,95 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package kernel + +import ( + "context" + "testing" + "time" + + "github.com/fogfish/it/v2" + "github.com/fogfish/swarm" +) + +func TestDequeuer(t *testing.T) { + none := mockCathode(nil, nil) + pass := mockCathode(make(chan string), + []swarm.Bag{{Ctx: &swarm.Context{Category: "test", Digest: "1"}, Object: []byte(`"1"`)}}, + ) + + t.Run("None", func(t *testing.T) { + k := NewDequeuer(none, swarm.Config{PollFrequency: 1 * time.Second}) + go k.Await() + k.Close() + }) + + t.Run("Idle", func(t *testing.T) { + k := NewDequeuer(none, swarm.Config{PollFrequency: 1 * time.Second}) + Dequeue(k, "test", swarm.NewCodecJson[string]()) + go k.Await() + k.Close() + }) + + t.Run("Dequeue.1", func(t *testing.T) { + k := NewDequeuer(pass, swarm.Config{PollFrequency: 10 * time.Millisecond}) + rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]()) + go k.Await() + + ack <- <-rcv + it.Then(t).Should( + it.Equal(string(<-pass.ack), `1`), + ) + + k.Close() + }) + + t.Run("Backlog", func(t *testing.T) { + k := NewDequeuer(pass, swarm.Config{CapAck: 4, PollFrequency: 1 * time.Millisecond}) + rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]()) + go k.Await() + + ack <- <-rcv + ack <- <-rcv + ack <- <-rcv + ack <- <-rcv + go k.Close() + + it.Then(t).Should( + it.Equal(string(<-pass.ack), `1`), + it.Equal(string(<-pass.ack), `1`), + it.Equal(string(<-pass.ack), `1`), + it.Equal(string(<-pass.ack), `1`), + ) + }) +} + +//------------------------------------------------------------------------------ + +type cathode struct { + seq []swarm.Bag + ack chan string +} + +func mockCathode(ack chan string, seq []swarm.Bag) cathode { + return cathode{seq: seq, ack: ack} +} + +func (c cathode) Ack(ctx context.Context, digest string) error { + c.ack <- digest + return nil +} + +func (c cathode) Err(ctx context.Context, digest string, err error) error { + c.ack <- digest + return nil +} + +func (c cathode) Ask(context.Context) ([]swarm.Bag, error) { + return c.seq, nil +} diff --git a/kernel/emitter.go b/kernel/emitter.go new file mode 100644 index 0000000..801c812 --- /dev/null +++ b/kernel/emitter.go @@ -0,0 +1,129 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package kernel + +import ( + "context" + "sync" + + "github.com/fogfish/swarm" +) + +// Emitter defines on-the-wire protocol for [swarm.Bag] +type Emitter interface { + Enq(context.Context, swarm.Bag) error +} + +// Encodes message into wire format +type Encoder[T any] interface{ Encode(T) ([]byte, error) } + +// Messaging Egress port +type Enqueuer struct { + sync.WaitGroup + + // Control-plane stop channel used by go routines to stop I/O on data channels + context context.Context + cancel context.CancelFunc + + // Kernel configuration + Config swarm.Config + + // Emitter is the writer port on message broker + Emitter Emitter +} + +func NewEnqueuer(emitter Emitter, config swarm.Config) *Enqueuer { + ctx, can := context.WithCancel(context.Background()) + + return &Enqueuer{ + Config: config, + context: ctx, + cancel: can, + Emitter: emitter, + } +} + +// Close enqueuer +func (k *Enqueuer) Close() { + k.cancel() + k.WaitGroup.Wait() +} + +// Await enqueue +func (k *Enqueuer) Await() { + <-k.context.Done() + k.WaitGroup.Wait() +} + +// Enqueue creates pair of channels within kernel to enqueue messages +func Enqueue[T any](k *Enqueuer, cat string, codec Encoder[T]) ( /*snd*/ chan<- T /*dlq*/, <-chan T) { + snd := make(chan T, k.Config.CapOut) + dlq := make(chan T, k.Config.CapDLQ) + + // emitter routine + emit := func(obj T) { + msg, err := codec.Encode(obj) + if err != nil { + dlq <- obj + if k.Config.StdErr != nil { + k.Config.StdErr <- err + } + return + } + + ctx := swarm.NewContext(context.Background(), cat, "") + bag := swarm.Bag{Ctx: ctx, Object: msg} + + if err := k.Emitter.Enq(context.Background(), bag); err != nil { + dlq <- obj + if k.Config.StdErr != nil { + k.Config.StdErr <- err + } + return + } + } + + k.WaitGroup.Add(1) + go func() { + exit: + for { + // The try-receive operation here is to + // try to exit the sender goroutine as + // early as possible. Try-receive and + // try-send select blocks are specially + // optimized by the standard Go + // compiler, so they are very efficient. + select { + case <-k.context.Done(): + break exit + default: + } + + select { + case <-k.context.Done(): + break exit + case obj := <-snd: + emit(obj) + } + } + + backlog := len(snd) + close(snd) + + if backlog != 0 { + for obj := range snd { + emit(obj) + } + } + + k.WaitGroup.Done() + }() + + return snd, dlq +} diff --git a/kernel/emitter_test.go b/kernel/emitter_test.go new file mode 100644 index 0000000..44817ec --- /dev/null +++ b/kernel/emitter_test.go @@ -0,0 +1,134 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package kernel + +import ( + "context" + "testing" + "time" + + "github.com/fogfish/it/v2" + "github.com/fogfish/swarm" +) + +func TestEnqueuer(t *testing.T) { + none := mockEmitter(0, nil) + pass := mockEmitter(10, make(chan []byte)) + + t.Run("None", func(t *testing.T) { + k := NewEnqueuer(none, swarm.Config{}) + k.Close() + }) + + t.Run("Idle", func(t *testing.T) { + k := NewEnqueuer(none, swarm.Config{}) + Enqueue(k, "test", swarm.NewCodecJson[string]()) + k.Close() + }) + + t.Run("Enqueue.1", func(t *testing.T) { + k := NewEnqueuer(pass, swarm.Config{}) + snd, _ := Enqueue(k, "test", swarm.NewCodecJson[string]()) + + snd <- "1" + it.Then(t).Should( + it.Equal(string(<-pass.ch), `"1"`), + ) + + k.Close() + }) + + t.Run("Enqueue.N", func(t *testing.T) { + k := NewEnqueuer(pass, swarm.Config{}) + snd, _ := Enqueue(k, "test", swarm.NewCodecJson[string]()) + + snd <- "1" + it.Then(t).Should( + it.Equal(string(<-pass.ch), `"1"`), + ) + + snd <- "2" + it.Then(t).Should( + it.Equal(string(<-pass.ch), `"2"`), + ) + + snd <- "3" + it.Then(t).Should( + it.Equal(string(<-pass.ch), `"3"`), + ) + + snd <- "4" + it.Then(t).Should( + it.Equal(string(<-pass.ch), `"4"`), + ) + + k.Close() + }) + + t.Run("Backlog", func(t *testing.T) { + k := NewEnqueuer(pass, swarm.Config{CapOut: 4}) + snd, _ := Enqueue(k, "test", swarm.NewCodecJson[string]()) + snd <- "1" + snd <- "2" + snd <- "3" + snd <- "4" + go k.Close() + + it.Then(t).Should( + it.Equal(string(<-pass.ch), `"1"`), + it.Equal(string(<-pass.ch), `"2"`), + it.Equal(string(<-pass.ch), `"3"`), + it.Equal(string(<-pass.ch), `"4"`), + ) + }) + + t.Run("Queues.N", func(t *testing.T) { + k := NewEnqueuer(pass, swarm.Config{}) + a, _ := Enqueue(k, "a", swarm.NewCodecJson[string]()) + b, _ := Enqueue(k, "b", swarm.NewCodecJson[string]()) + c, _ := Enqueue(k, "c", swarm.NewCodecJson[string]()) + + a <- "a" + it.Then(t).Should( + it.Equal(string(<-pass.ch), `"a"`), + ) + + b <- "b" + it.Then(t).Should( + it.Equal(string(<-pass.ch), `"b"`), + ) + + c <- "c" + it.Then(t).Should( + it.Equal(string(<-pass.ch), `"c"`), + ) + + k.Close() + }) +} + +//------------------------------------------------------------------------------ + +type emitter struct { + ms int + ch chan []byte +} + +func mockEmitter(ms int, ch chan []byte) emitter { + return emitter{ + ms: ms, + ch: ch, + } +} + +func (e emitter) Enq(ctx context.Context, bag swarm.Bag) error { + time.Sleep(time.Duration(e.ms) * time.Microsecond) + e.ch <- bag.Object + return nil +} diff --git a/kernel/kernel.go b/kernel/kernel.go index f28adb8..1a6bfb9 100644 --- a/kernel/kernel.go +++ b/kernel/kernel.go @@ -8,363 +8,24 @@ package kernel -import ( - "context" - "errors" - "log/slog" - "sync" - "time" - - "github.com/fogfish/swarm" -) - -type Codec[T any] interface { - Encode(T) ([]byte, error) - Decode([]byte) (T, error) -} - -type Emitter interface { - Enq(swarm.Bag) error -} - -type Cathode interface { - Ack(digest string) error - Ask() ([]swarm.Bag, error) -} - -type Spawner interface { - Spawn(*Kernel) error -} - -// Event Kernel builds an infrastructure for integrating message brokers, -// events busses into Golang channel environment. -// The implementation follows the pattern, defined by -// https://go101.org/article/channel-closing.html type Kernel struct { - sync.WaitGroup - sync.RWMutex - - // Kernel configuration - Config swarm.Config - - // Control-plane stop channel. It is used to notify the kernel to terminate. - // Kernel notifies control plane of individual routines. - mainStop chan struct{} - - // Control-plane stop channel used by go routines to stop I/O on data channels - ctrlStop chan struct{} - - // Control-plane for ack of batch elements - ctrlAcks chan *swarm.Context - - // event router, binds category with destination channel - router map[string]interface{ Route(swarm.Bag) error } - - // The flag indicates if Await loop is started - waiting bool - - // On the wire protocol emitter (writer) and cathode (receiver) - Emitter Emitter - Cathode Cathode + *Enqueuer + *Dequeuer } -// New routing and dispatch kernel -func New(emitter Emitter, cathode Cathode, config swarm.Config) *Kernel { +func New(enqueuer *Enqueuer, dequeuer *Dequeuer) *Kernel { return &Kernel{ - Config: config, - mainStop: make(chan struct{}, 1), // MUST BE buffered - ctrlStop: make(chan struct{}), - - router: map[string]interface{ Route(swarm.Bag) error }{}, - - Emitter: emitter, - Cathode: cathode, - } -} - -// internal infinite receive loop. -// waiting for message from event buses and queues and schedules it for delivery. -func (k *Kernel) receive() { - k.WaitGroup.Add(1) - - asker := func() { - seq, err := k.Cathode.Ask() - if k.Config.StdErr != nil && err != nil { - k.Config.StdErr <- err - return - } - - for i := 0; i < len(seq); i++ { - bag := seq[i] - - k.RWMutex.RLock() - r, has := k.router[bag.Ctx.Category] - k.RWMutex.RUnlock() - - if has { - err := r.Route(bag) - if k.Config.StdErr != nil && err != nil { - k.Config.StdErr <- err - return - } - } - } + Enqueuer: enqueuer, + Dequeuer: dequeuer, } - - go func() { - exit: - for { - select { - case <-k.ctrlStop: - break exit - default: - } - - select { - case <-k.ctrlStop: - break exit - case <-time.After(k.Config.PollFrequency): - asker() - } - } - - slog.Debug("Free kernel infinite loop") - k.WaitGroup.Done() - }() - - slog.Debug("Init kernel infinite loop") } -// Close event delivery infrastructure func (k *Kernel) Close() { - k.mainStop <- struct{}{} - if !k.waiting { - close(k.ctrlStop) - k.WaitGroup.Wait() - } + k.Dequeuer.Close() + k.Enqueuer.Close() } -// Await for event delivery func (k *Kernel) Await() { - k.waiting = true - - if spawner, ok := k.Cathode.(Spawner); ok { - spawner.Spawn(k) - } else { - k.receive() - } - - <-k.mainStop - close(k.ctrlStop) - k.WaitGroup.Wait() -} - -// Dispatches batch of messages -func (k *Kernel) Dispatch(seq []swarm.Bag, timeout time.Duration) error { - k.WaitGroup.Add(1) - k.ctrlAcks = make(chan *swarm.Context, len(seq)) - - wnd := map[string]struct{}{} - for i := 0; i < len(seq); i++ { - bag := seq[i] - wnd[bag.Ctx.Digest] = struct{}{} - - k.RWMutex.RLock() - r, has := k.router[bag.Ctx.Category] - k.RWMutex.RUnlock() - - if has { - err := r.Route(bag) - if k.Config.StdErr != nil && err != nil { - k.Config.StdErr <- err - continue - } - } - } - - var err error - -exit: - for { - select { - case <-k.ctrlStop: - break exit - default: - } - - select { - case <-k.ctrlStop: - break exit - case ack := <-k.ctrlAcks: - if err == nil && ack.Error != nil { - err = ack.Error - } - - delete(wnd, ack.Digest) - if len(wnd) == 0 { - break exit - } - case <-time.After(timeout): - err = errors.New("ack timeout") - break exit - } - } - - close(k.ctrlAcks) - k.ctrlAcks = nil - - return err -} - -// Enqueue channels for kernel -func Enqueue[T any](k *Kernel, cat string, codec Codec[T]) ( /*snd*/ chan<- T /*dlq*/, <-chan T) { - snd := make(chan T, k.Config.CapOut) - dlq := make(chan T, k.Config.CapDLQ) - - // emitter routine - emit := func(obj T) { - msg, err := codec.Encode(obj) - if err != nil { - dlq <- obj - if k.Config.StdErr != nil { - k.Config.StdErr <- err - } - return - } - - ctx := swarm.NewContext(context.Background(), cat, "") - bag := swarm.Bag{Ctx: ctx, Object: msg} - - if err := k.Emitter.Enq(bag); err != nil { - dlq <- obj - if k.Config.StdErr != nil { - k.Config.StdErr <- err - } - return - } - - slog.Debug("emitted ", "cat", cat, "object", obj) - } - - k.WaitGroup.Add(1) - go func() { - exit: - for { - // The try-receive operation here is to - // try to exit the sender goroutine as - // early as possible. Try-receive and - // try-send select blocks are specially - // optimized by the standard Go - // compiler, so they are very efficient. - select { - case <-k.ctrlStop: - break exit - default: - } - - select { - case <-k.ctrlStop: - break exit - case obj := <-snd: - emit(obj) - } - } - - backlog := len(snd) - close(snd) - - slog.Debug("Free enqueue kernel", "cat", cat, "backlog", backlog) - if backlog != 0 { - for obj := range snd { - emit(obj) - } - } - k.WaitGroup.Done() - }() - - slog.Debug("Init enqueue kernel", "cat", cat) - - return snd, dlq -} - -type router[T any] struct { - ch chan swarm.Msg[T] - codec Codec[T] -} - -func (a router[T]) Route(bag swarm.Bag) error { - obj, err := a.codec.Decode(bag.Object) - if err != nil { - return err - } - - msg := swarm.Msg[T]{Ctx: bag.Ctx, Object: obj} - a.ch <- msg - return nil -} - -// Enqueue channels for kernel -func Dequeue[T any](k *Kernel, cat string, codec Codec[T]) ( /*rcv*/ <-chan swarm.Msg[T] /*ack*/, chan<- swarm.Msg[T]) { - rcv := make(chan swarm.Msg[T], k.Config.CapRcv) - ack := make(chan swarm.Msg[T], k.Config.CapAck) - - k.RWMutex.Lock() - k.router[cat] = router[T]{ch: rcv, codec: codec} - k.RWMutex.Unlock() - - // emitter routine - acks := func(msg swarm.Msg[T]) { - if msg.Ctx.Error == nil { - err := k.Cathode.Ack(msg.Ctx.Digest) - if k.Config.StdErr != nil && err != nil { - k.Config.StdErr <- err - } - - slog.Debug("acked ", "cat", cat, "object", msg.Object) - } - - if k.ctrlAcks != nil { - k.ctrlAcks <- msg.Ctx - } - } - - k.WaitGroup.Add(1) - go func() { - exit: - for { - // The try-receive operation here is to - // try to exit the sender goroutine as - // early as possible. Try-receive and - // try-send select blocks are specially - // optimized by the standard Go - // compiler, so they are very efficient. - select { - case <-k.ctrlStop: - break exit - default: - } - - select { - case <-k.ctrlStop: - break exit - case msg := <-ack: - acks(msg) - } - } - - backlog := len(ack) - close(ack) - - slog.Debug("Free dequeue kernel", "cat", cat, "backlog", backlog) - if backlog != 0 { - for msg := range ack { - acks(msg) - } - } - k.WaitGroup.Done() - }() - - slog.Debug("Init dequeue kernel", "cat", cat) - - return rcv, ack + k.Dequeuer.Await() + k.Enqueuer.Await() } diff --git a/kernel/router.go b/kernel/router.go new file mode 100644 index 0000000..cda5c60 --- /dev/null +++ b/kernel/router.go @@ -0,0 +1,28 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package kernel + +import "github.com/fogfish/swarm" + +// Router is typed pair of message channel and codec +type router[T any] struct { + ch chan swarm.Msg[T] + codec Decoder[T] +} + +func (a router[T]) Route(bag swarm.Bag) error { + obj, err := a.codec.Decode(bag.Object) + if err != nil { + return err + } + + msg := swarm.Msg[T]{Ctx: bag.Ctx, Object: obj} + a.ch <- msg + return nil +} diff --git a/kernel/router_test.go b/kernel/router_test.go new file mode 100644 index 0000000..7c03262 --- /dev/null +++ b/kernel/router_test.go @@ -0,0 +1,28 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package kernel + +import ( + "testing" + + "github.com/fogfish/it/v2" + "github.com/fogfish/swarm" +) + +func TestRoute(t *testing.T) { + r := router[string]{ + ch: make(chan swarm.Msg[string], 1), + codec: swarm.NewCodecJson[string](), + } + + r.Route(swarm.Bag{Object: []byte(`"1"`)}) + it.Then(t).Should( + it.Equal((<-r.ch).Object, `1`), + ) +} diff --git a/version.go b/version.go index 667fd39..df0021d 100644 --- a/version.go +++ b/version.go @@ -8,4 +8,4 @@ package swarm -const Version = "v0.16.0" +const Version = "v0.19.0"