From 803b8ab41da7698ae9c48f62efa02b591e90a509 Mon Sep 17 00:00:00 2001 From: fogfish Date: Sat, 28 Sep 2024 22:24:54 +0300 Subject: [PATCH] Split kernel on distinct emitter (writer) and cathode (reader). (#79) * Split kernel on distinct emitter (writer) and cathode (reader). * Release high-level dequeue api * Release high-level enqueue api * Release high-level synchronous enqueue api * Improve test coverage for kernel * Refactor codecs into kernel * Refactor definition of message context * Refactor event using side-car for meta & data (envelop is not scalable) * Remove WithService config option * Rename config CapDlq --- bag.go | 79 ++++-- config.go | 26 +- dequeue/dequeue.go | 41 +++ dequeue/dequeue_test.go | 112 +++++++++ enqueue/enqueue.go | 41 +++ enqueue/enqueue_test.go | 105 ++++++++ enqueue/writer.go | 159 ++++++++++++ enqueue/writer_test.go | 71 ++++++ event.go | 40 ++- go.mod | 3 +- go.sum | 4 +- kernel/bridge.go | 84 +++++++ kernel/bridge_test.go | 269 ++++++++++++++++++++ kernel/cathode.go | 198 +++++++++++++++ kernel/cathode_test.go | 106 ++++++++ kernel/emitter.go | 129 ++++++++++ kernel/emitter_test.go | 212 ++++++++++++++++ codec.go => kernel/encoding/codec.go | 25 +- kernel/kernel.go | 357 ++------------------------- kernel/router.go | 38 +++ kernel/router_test.go | 30 +++ service.go | 6 - version.go | 2 +- 23 files changed, 1714 insertions(+), 423 deletions(-) create mode 100644 dequeue/dequeue.go create mode 100644 dequeue/dequeue_test.go create mode 100644 enqueue/enqueue.go create mode 100644 enqueue/enqueue_test.go create mode 100644 enqueue/writer.go create mode 100644 enqueue/writer_test.go create mode 100644 kernel/bridge.go create mode 100644 kernel/bridge_test.go create mode 100644 kernel/cathode.go create mode 100644 kernel/cathode_test.go create mode 100644 kernel/emitter.go create mode 100644 kernel/emitter_test.go rename codec.go => kernel/encoding/codec.go (75%) create mode 100644 kernel/router.go create mode 100644 kernel/router_test.go diff --git a/bag.go b/bag.go index c1fa5f1..360625d 100644 --- a/bag.go +++ b/bag.go @@ -8,12 +8,14 @@ package swarm -import "context" - -// Global context for the message -type Context struct { - context.Context +import ( + "reflect" + "strings" +) +// Msg is a generic envelop type for incoming messages. +// It contains both decoded object and its digest used to acknowledge message. +type Msg[T any] struct { // Message category ~ topic Category string @@ -22,32 +24,69 @@ type Context struct { // Error on the message processing Error error -} - -func NewContext(ctx context.Context, cat, digest string) *Context { - return &Context{ - Context: ctx, - Category: cat, - Digest: digest, - } -} -// Msg is a generic envelop type for incoming messages. -// It contains both decoded object and its digest used to acknowledge message. -type Msg[T any] struct { - Ctx *Context + // Message decoded content Object T } // Fail message with error func (msg Msg[T]) Fail(err error) Msg[T] { - msg.Ctx.Error = err + msg.Error = err return msg } // Bag is an abstract container for octet stream. // Bag is used by the transport to abstract message on the wire. type Bag struct { - Ctx *Context + // Message category ~ topic + Category string + + // Unique brief summary of the message + Digest string + + // Error on the message processing + Error error + + // Message raw content Object []byte } + +func ToMsg[T any](bag Bag, object T) Msg[T] { + return Msg[T]{ + Category: bag.Category, + Digest: bag.Digest, + Error: bag.Error, + Object: object, + } +} + +// func ToBag[T any](msg Msg[T], object []byte) Bag { +// return Bag{ +// Category: msg.Category, +// Digest: msg.Digest, +// Error: msg.Error, +// Object: object, +// } +// } + +// TypeOf returns normalized name of the type T. +func TypeOf[T any](category ...string) string { + if len(category) > 0 { + return category[0] + } + + typ := reflect.TypeOf(new(T)).Elem() + cat := typ.Name() + if typ.Kind() == reflect.Ptr { + cat = typ.Elem().Name() + } + + seq := strings.Split(strings.Trim(cat, "]"), "[") + tkn := make([]string, len(seq)) + for i, s := range seq { + r := strings.Split(s, ".") + tkn[i] = r[len(r)-1] + } + + return strings.Join(tkn, "[") + strings.Repeat("]", len(tkn)-1) +} diff --git a/config.go b/config.go index 2c5f1b8..ce348a0 100644 --- a/config.go +++ b/config.go @@ -30,10 +30,12 @@ type Retry interface { Retry(f func() error) error } -type Config struct { - // Instance of AWS Service, used to overwrite default client - Service any +type Codec interface { + Encode([]byte) ([]byte, error) + Decode([]byte) ([]byte, error) +} +type Config struct { // Source is a direct performer of the event. // A software service that emits action to the stream. Source string @@ -43,7 +45,7 @@ type Config struct { // Queue capacity (enhance with individual capacities) CapOut int - CapDLQ int + CapDlq int CapRcv int CapAck int @@ -61,6 +63,9 @@ type Config struct { // Timeout for any network operations NetworkTimeout time.Duration + + // Codec for binary packets + Codec Codec } func NewConfig() Config { @@ -68,7 +73,7 @@ func NewConfig() Config { Source: "github.com/fogfish/swarm", Policy: PolicyAtLeastOnce, CapOut: 0, - CapDLQ: 0, + CapDlq: 0, CapRcv: 0, CapAck: 0, Backoff: backoff.Exp(10*time.Millisecond, 10, 0.5), @@ -81,13 +86,6 @@ func NewConfig() Config { // Configuration option for queueing broker type Option func(conf *Config) -// Configure AWS Service for broker instance -func WithService(service any) Option { - return func(conf *Config) { - conf.Service = service - } -} - // Source is a direct performer of the event. // A software service that emits action to the stream. func WithSource(agent string) Option { @@ -209,7 +207,7 @@ func WithPolicyAtMostOnce(n int) Option { return func(conf *Config) { conf.Policy = PolicyAtMostOnce conf.CapOut = n - conf.CapDLQ = n + conf.CapDlq = n conf.CapRcv = n conf.CapAck = n } @@ -222,7 +220,7 @@ func WithPolicyAtLeastOnce(n int) Option { return func(conf *Config) { conf.Policy = PolicyAtLeastOnce conf.CapOut = 0 - conf.CapDLQ = 0 + conf.CapDlq = 0 conf.CapRcv = n conf.CapAck = n } diff --git a/dequeue/dequeue.go b/dequeue/dequeue.go new file mode 100644 index 0000000..6c74059 --- /dev/null +++ b/dequeue/dequeue.go @@ -0,0 +1,41 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package dequeue + +import ( + "github.com/fogfish/swarm" + "github.com/fogfish/swarm/kernel" + "github.com/fogfish/swarm/kernel/encoding" +) + +// Creates pair of channels to receive and acknowledge messages of type T +func Typed[T any](q *kernel.Dequeuer, category ...string) (rcv <-chan swarm.Msg[T], ack chan<- swarm.Msg[T]) { + return kernel.Dequeue(q, + swarm.TypeOf[T](category...), + encoding.NewCodecJson[T](), + ) +} + +// Creates pair of channels to receive and acknowledge events of type T +func Event[M, T any](q *kernel.Dequeuer, category ...string) (<-chan swarm.Msg[swarm.Event[M, T]], chan<- swarm.Msg[swarm.Event[M, T]]) { + cat := swarm.TypeOf[T](category...) + + return kernel.Dequeue(q, cat, + encoding.NewCodecEvent[M, T](q.Config.Source, cat), + ) +} + +// Create pair of channels to receive and acknowledge pure binary +func Bytes(q *kernel.Dequeuer, cat string) (<-chan swarm.Msg[[]byte], chan<- swarm.Msg[[]byte]) { + if q.Config.Codec != nil { + return kernel.Dequeue(q, cat, q.Config.Codec) + } + + return kernel.Dequeue(q, cat, encoding.NewCodecByte()) +} diff --git a/dequeue/dequeue_test.go b/dequeue/dequeue_test.go new file mode 100644 index 0000000..e262808 --- /dev/null +++ b/dequeue/dequeue_test.go @@ -0,0 +1,112 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package dequeue_test + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/fogfish/it/v2" + "github.com/fogfish/swarm" + "github.com/fogfish/swarm/dequeue" + "github.com/fogfish/swarm/kernel" +) + +// controls yield time before kernel is closed +const yield_before_close = 5 * time.Millisecond + +type User struct { + ID string `json:"id"` + Text string `json:"text"` +} + +func TestDequeueType(t *testing.T) { + user := User{ID: "id", Text: "user"} + + k := kernel.NewDequeuer(mockCathode(user), swarm.Config{}) + go func() { + time.Sleep(yield_before_close) + k.Close() + }() + + var msg swarm.Msg[User] + rcv, ack := dequeue.Typed[User](k) + + go func() { + msg = <-rcv + ack <- msg + }() + k.Await() + + it.Then(t).Should( + it.Equal(msg.Category, "User"), + it.Equal(msg.Digest, "1"), + it.Equal(msg.Object.ID, "id"), + it.Equal(msg.Object.Text, "user"), + ) +} + +func TestDequeueBytes(t *testing.T) { + user := User{ID: "id", Text: "user"} + + k := kernel.NewDequeuer(mockCathode(user), swarm.Config{}) + go func() { + time.Sleep(yield_before_close) + k.Close() + }() + + var msg swarm.Msg[[]byte] + rcv, ack := dequeue.Bytes(k, "User") + + go func() { + msg = <-rcv + ack <- msg + }() + k.Await() + + it.Then(t).Should( + it.Equal(msg.Category, "User"), + it.Equal(msg.Digest, "1"), + it.Equal(string(msg.Object), `{"id":"id","text":"user"}`), + ) +} + +//------------------------------------------------------------------------------ + +type cathode struct { + cat string + user User +} + +func mockCathode(user User) cathode { + return cathode{ + cat: swarm.TypeOf[User](), + user: user, + } +} + +func (c cathode) Ack(ctx context.Context, digest string) error { + return nil +} + +func (c cathode) Err(ctx context.Context, digest string, err error) error { + return nil +} + +func (c cathode) Ask(context.Context) ([]swarm.Bag, error) { + data, err := json.Marshal(c.user) + if err != nil { + return nil, err + } + + bag := []swarm.Bag{{Category: c.cat, Digest: "1", Object: data}} + return bag, nil +} diff --git a/enqueue/enqueue.go b/enqueue/enqueue.go new file mode 100644 index 0000000..a38f60d --- /dev/null +++ b/enqueue/enqueue.go @@ -0,0 +1,41 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package enqueue + +import ( + "github.com/fogfish/swarm" + "github.com/fogfish/swarm/kernel" + "github.com/fogfish/swarm/kernel/encoding" +) + +// Creates pair of channels to emit messages of type T +func Typed[T any](q *kernel.Enqueuer, category ...string) (snd chan<- T, dlq <-chan T) { + return kernel.Enqueue(q, + swarm.TypeOf[T](category...), + encoding.NewCodecJson[T](), + ) +} + +// Creates pair of channels to emit events of type T +func Event[M, T any](q *kernel.Enqueuer, category ...string) (snd chan<- swarm.Event[M, T], dlq <-chan swarm.Event[M, T]) { + cat := swarm.TypeOf[T](category...) + + return kernel.Enqueue(q, cat, + encoding.NewCodecEvent[M, T](q.Config.Source, cat), + ) +} + +// Create pair of channels to emit pure binaries +func Bytes(q *kernel.Enqueuer, cat string) (snd chan<- []byte, dlq <-chan []byte) { + if q.Config.Codec != nil { + return kernel.Enqueue(q, cat, q.Config.Codec) + } + + return kernel.Enqueue(q, cat, encoding.NewCodecByte()) +} diff --git a/enqueue/enqueue_test.go b/enqueue/enqueue_test.go new file mode 100644 index 0000000..f666e19 --- /dev/null +++ b/enqueue/enqueue_test.go @@ -0,0 +1,105 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package enqueue_test + +import ( + "context" + "testing" + "time" + + "github.com/fogfish/it/v2" + "github.com/fogfish/swarm" + "github.com/fogfish/swarm/enqueue" + "github.com/fogfish/swarm/kernel" +) + +// controls yield time before kernel is closed +const yield_before_close = 5 * time.Millisecond + +type User struct { + ID string `json:"id"` + Text string `json:"text"` +} + +func TestType(t *testing.T) { + mock := mockEmitter(10) + k := kernel.NewEnqueuer(mock, swarm.Config{}) + go func() { + time.Sleep(yield_before_close) + k.Close() + }() + + snd, _ := enqueue.Typed[User](k) + snd <- User{ID: "id", Text: "user"} + + k.Await() + + it.Then(t).Should( + it.Equal(mock.val, `{"id":"id","text":"user"}`), + ) +} + +func TestEvent(t *testing.T) { + mock := mockEmitter(10) + k := kernel.NewEnqueuer(mock, swarm.Config{}) + go func() { + time.Sleep(yield_before_close) + k.Close() + }() + + snd, _ := enqueue.Event[swarm.Meta, User](k) + snd <- swarm.Event[swarm.Meta, User]{ + Meta: &swarm.Meta{}, + Data: &User{ID: "id", Text: "user"}, + } + + k.Await() + + it.Then(t).Should( + it.String(mock.val).Contain(`"meta":`), + it.String(mock.val).Contain(`"data":`), + it.String(mock.val).Contain(`"id":`), + it.String(mock.val).Contain(`"type":"[User]"`), + it.String(mock.val).Contain(`"created":`), + it.String(mock.val).Contain(`{"id":"id","text":"user"}`), + ) +} + +func TestBytes(t *testing.T) { + mock := mockEmitter(10) + k := kernel.NewEnqueuer(mock, swarm.Config{}) + go func() { + time.Sleep(yield_before_close) + k.Close() + }() + + snd, _ := enqueue.Bytes(k, "User") + snd <- []byte(`{"id":"id","text":"user"}`) + + k.Await() + + it.Then(t).Should( + it.Equal(mock.val, `{"id":"id","text":"user"}`), + ) +} + +//------------------------------------------------------------------------------ + +type emitter struct { + val string +} + +func mockEmitter(wait int) *emitter { + return &emitter{} +} + +func (e *emitter) Enq(ctx context.Context, bag swarm.Bag) error { + e.val = string(bag.Object) + return nil +} diff --git a/enqueue/writer.go b/enqueue/writer.go new file mode 100644 index 0000000..923969a --- /dev/null +++ b/enqueue/writer.go @@ -0,0 +1,159 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package enqueue + +import ( + "context" + + "github.com/fogfish/swarm" + "github.com/fogfish/swarm/kernel" + "github.com/fogfish/swarm/kernel/encoding" +) + +// Synchronous emitter of typed messages to the broker. +// It blocks the routine until the message is accepted by the broker. +type EmitterTyped[T any] struct { + cat string + codec kernel.Encoder[T] + kernel *kernel.Enqueuer +} + +// Creates synchronous typed emitter +func NewTyped[T any](q *kernel.Enqueuer, category ...string) *EmitterTyped[T] { + return &EmitterTyped[T]{ + cat: swarm.TypeOf[T](category...), + codec: encoding.NewCodecJson[T](), + kernel: q, + } +} + +// Synchronously enqueue message to broker. +// It guarantees message to be send after return +func (q *EmitterTyped[T]) Enq(ctx context.Context, object T, cat ...string) error { + msg, err := q.codec.Encode(object) + if err != nil { + return err + } + + category := q.cat + if len(cat) > 0 { + category = cat[0] + } + + bag := swarm.Bag{ + Category: category, + Object: msg, + } + + err = q.kernel.Emitter.Enq(ctx, bag) + if err != nil { + return err + } + + return nil +} + +//------------------------------------------------------------------------------ + +// Synchronous emitter of events to the broker. +// It blocks the routine until the event is accepted by the broker. +type EmitterEvent[M, T any] struct { + cat string + codec kernel.Encoder[swarm.Event[M, T]] + kernel *kernel.Enqueuer +} + +// Creates synchronous event emitter +func NewEvent[M, T any](q *kernel.Enqueuer, category ...string) *EmitterEvent[M, T] { + cat := swarm.TypeOf[T](category...) + + return &EmitterEvent[M, T]{ + cat: cat, + codec: encoding.NewCodecEvent[M, T](q.Config.Source, cat), + kernel: q, + } +} + +// Synchronously enqueue event to broker. +// It guarantees event to be send after return. +func (q *EmitterEvent[M, T]) Enq(ctx context.Context, object swarm.Event[M, T], cat ...string) error { + msg, err := q.codec.Encode(object) + if err != nil { + return err + } + + category := q.cat + if len(cat) > 0 { + category = cat[0] + } + + bag := swarm.Bag{ + Category: category, + Object: msg, + } + + err = q.kernel.Emitter.Enq(ctx, bag) + if err != nil { + return err + } + + return nil +} + +//------------------------------------------------------------------------------ + +// Synchronous emitter of raw packets (bytes) to the broker. +// It blocks the routine until the message is accepted by the broker. +type EmitterBytes struct { + cat string + codec kernel.Encoder[[]byte] + kernel *kernel.Enqueuer +} + +// Creates synchronous emitter +func NewBytes(q *kernel.Enqueuer, category string) *EmitterBytes { + var codec swarm.Codec + if q.Config.Codec != nil { + codec = q.Config.Codec + } else { + codec = encoding.NewCodecByte() + } + + return &EmitterBytes{ + cat: category, + codec: codec, + kernel: q, + } +} + +// Synchronously enqueue bytes to broker. +// It guarantees message to be send after return +func (q *EmitterBytes) Enq(ctx context.Context, object []byte, cat ...string) error { + msg, err := q.codec.Encode(object) + if err != nil { + return err + } + + category := q.cat + if len(cat) > 0 { + category = cat[0] + } + + bag := swarm.Bag{ + Category: category, + Object: msg, + } + + err = q.kernel.Emitter.Enq(ctx, bag) + if err != nil { + return err + } + + return nil +} diff --git a/enqueue/writer_test.go b/enqueue/writer_test.go new file mode 100644 index 0000000..2a03111 --- /dev/null +++ b/enqueue/writer_test.go @@ -0,0 +1,71 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package enqueue_test + +import ( + "context" + "testing" + + "github.com/fogfish/it/v2" + "github.com/fogfish/swarm" + "github.com/fogfish/swarm/enqueue" + "github.com/fogfish/swarm/kernel" +) + +func TestNewTypes(t *testing.T) { + mock := mockEmitter(10) + k := kernel.NewEnqueuer(mock, swarm.Config{}) + + q := enqueue.NewTyped[User](k) + q.Enq(context.Background(), User{ID: "id", Text: "user"}) + + it.Then(t).Should( + it.Equal(mock.val, `{"id":"id","text":"user"}`), + ) + + k.Close() +} + +func TestNewEvent(t *testing.T) { + mock := mockEmitter(10) + k := kernel.NewEnqueuer(mock, swarm.Config{}) + + q := enqueue.NewEvent[swarm.Meta, User](k) + q.Enq(context.Background(), + swarm.Event[swarm.Meta, User]{ + Meta: &swarm.Meta{}, + Data: &User{ID: "id", Text: "user"}, + }, + ) + + it.Then(t).Should( + it.String(mock.val).Contain(`"meta":`), + it.String(mock.val).Contain(`"data":`), + it.String(mock.val).Contain(`"id":`), + it.String(mock.val).Contain(`"type":"[User]"`), + it.String(mock.val).Contain(`"created":`), + it.String(mock.val).Contain(`{"id":"id","text":"user"}`), + ) + + k.Close() +} + +func TestNewBytes(t *testing.T) { + mock := mockEmitter(10) + k := kernel.NewEnqueuer(mock, swarm.Config{}) + + q := enqueue.NewBytes(k, "User") + q.Enq(context.Background(), []byte(`{"id":"id","text":"user"}`)) + + it.Then(t).Should( + it.Equal(mock.val, `{"id":"id","text":"user"}`), + ) + + k.Close() +} diff --git a/event.go b/event.go index 7d03997..b3b5ff7 100644 --- a/event.go +++ b/event.go @@ -12,13 +12,8 @@ import ( "time" "github.com/fogfish/curie" - "github.com/fogfish/golem/pure" ) -type EventType any - -type EventKind[A any] pure.HKT[EventType, A] - // Event defines immutable fact(s) placed into the queueing system. // Event resembles the concept of Action as it is defined by schema.org. // @@ -29,8 +24,17 @@ type EventKind[A any] pure.HKT[EventType, A] // These applications processes logical log of events, each event defines a change // to current state of the object, i.e. which attributes were inserted, // updated or deleted (a kind of diff). The event identifies the object that was -// changed together with using unique identifier. -type Event[T any] struct { +// changed together with using unique identifier. +// +// In contrast with other solutions, the event does not uses envelop approach. +// Instead, it side-car meta and data each other, making extendible +type Event[M, T any] struct { + Meta *M `json:"meta,omitempty"` + Data *T `json:"data,omitempty"` +} + +// The default metadata associated with event. +type Meta struct { // // Unique identity for event. // It is automatically defined by the library upon the transmission unless @@ -50,28 +54,16 @@ type Event[T any] struct { // unless defined by sender. Agent curie.IRI `json:"agent,omitempty"` - // - // Indicates target performer of the event, a software service that is able to - Target curie.IRI `json:"target,omitempty"` - - // - // Indirect participants, a user who initiated an event. - Participant curie.IRI `json:"participant,omitempty"` - // // ISO8601 timestamps when action has been created // It is automatically defined by the library upon the transmission Created time.Time `json:"created,omitempty"` // - // The object upon which the event is carried out. - Object T `json:"object,omitempty"` - - // Status (Pending | Success | Failure) - // Deadline before after | nbf NotBefore - // Target + // Indicates target performer of the event, a software service that is able to + Target curie.IRI `json:"target,omitempty"` + // + // Indirect participants, a user who initiated an event. + Participant curie.IRI `json:"participant,omitempty"` } - -func (Event[T]) HKT1(EventType) {} -func (Event[T]) HKT2(T) {} diff --git a/go.mod b/go.mod index 72a82fe..db6c2e5 100644 --- a/go.mod +++ b/go.mod @@ -6,9 +6,10 @@ require ( github.com/fogfish/curie v1.8.2 github.com/fogfish/faults v0.2.0 github.com/fogfish/golem/optics v0.13.0 - github.com/fogfish/golem/pure v0.10.1 github.com/fogfish/guid/v2 v2.0.4 github.com/fogfish/it v1.0.0 + github.com/fogfish/it/v2 v2.0.2 + github.com/fogfish/logger/v3 v3.1.1 ) require github.com/fogfish/golem/hseq v1.2.0 // indirect diff --git a/go.sum b/go.sum index a942ac3..76a1b4d 100644 --- a/go.sum +++ b/go.sum @@ -6,11 +6,11 @@ github.com/fogfish/golem/hseq v1.2.0 h1:B6yrzOHQNoTqSlhLb+AvK7dhEAELjHThrCQTF/uq github.com/fogfish/golem/hseq v1.2.0/go.mod h1:17XORt8nNKl6KOhF43MHSmjK8NksbkBsohAoJGiinUs= github.com/fogfish/golem/optics v0.13.0 h1:U3htppjVTMbICQIzPTTe151+WziSGEppNVmkanKa440= github.com/fogfish/golem/optics v0.13.0/go.mod h1:U1y90OVcXF/A61dIP3abQ0x2GweTmzVHPC15pv0pcM0= -github.com/fogfish/golem/pure v0.10.1 h1:0+cnvdaV9zF+0NN8SZMgR5bgFM6yNfBHU4rynYSDfmE= -github.com/fogfish/golem/pure v0.10.1/go.mod h1:kLPfgu5uKP0CrwVap7jejisRwV7vo1q8Eyqnc/Z0qyw= github.com/fogfish/guid/v2 v2.0.4 h1:EZiPlM4UAghqf7DU5/nLEF+iRH7ODe0AiFuYOMRvITQ= github.com/fogfish/guid/v2 v2.0.4/go.mod h1:KkZ5T4EE3BqWQJFZBPLSHV/tBe23Xq4KvuPfwtNtepU= github.com/fogfish/it v1.0.0 h1:kiwFHZcrkRLUydZoIoY0gTuMfj38trwvLo0YRyIkeG8= github.com/fogfish/it v1.0.0/go.mod h1:NQJG4Ygvek85y7zGj0Gny8+6ygAnHjfBORhI7TdQhp4= github.com/fogfish/it/v2 v2.0.2 h1:UR6yVemf8zD3WVs6Bq0zE6LJwapZ8urv9zvU5VB5E6o= github.com/fogfish/it/v2 v2.0.2/go.mod h1:HHwufnTaZTvlRVnSesPl49HzzlMrQtweKbf+8Co/ll4= +github.com/fogfish/logger/v3 v3.1.1 h1:awmTNpBWRvSj086H3RWIUnc+FSu9qXHJgBa49wYpNCE= +github.com/fogfish/logger/v3 v3.1.1/go.mod h1:hsucoJz/3OX90UdYrXykcKvjjteBnPcYSTr4Rie0ZqU= diff --git a/kernel/bridge.go b/kernel/bridge.go new file mode 100644 index 0000000..2d38815 --- /dev/null +++ b/kernel/bridge.go @@ -0,0 +1,84 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package kernel + +import ( + "context" + "fmt" + "time" + + "github.com/fogfish/swarm" +) + +// Bridge Lambda's main function to [Cathode] interface +type Bridge struct { + timeToFlight time.Duration + inflight map[string]struct{} + session chan error + ch chan []swarm.Bag +} + +func NewBridge(timeToFlight time.Duration) *Bridge { + return &Bridge{ + ch: make(chan []swarm.Bag), + session: make(chan error), + timeToFlight: timeToFlight, + } +} + +// Dispatch the batch of messages in the context of Lambda handler. +// +// lambda.Start( +// func(evt events.CloudWatchEvent) error { +// ... +// bridge.Dispatch(bag) +// } +// ) +func (s *Bridge) Dispatch(seq []swarm.Bag) error { + s.inflight = map[string]struct{}{} + for _, bag := range seq { + s.inflight[bag.Digest] = struct{}{} + } + + s.ch <- seq + + select { + case err := <-s.session: + return err + case <-time.After(s.timeToFlight): + return fmt.Errorf("ack timeout") + } +} + +// Ask converts input of Lambda handler to the context of the kernel +func (s *Bridge) Ask(ctx context.Context) ([]swarm.Bag, error) { + select { + case <-ctx.Done(): + return nil, nil + case bag := <-s.ch: + return bag, nil + } +} + +// Acknowledge processed message, allowing lambda handler progress +func (s *Bridge) Ack(ctx context.Context, digest string) error { + delete(s.inflight, digest) + if len(s.inflight) == 0 { + s.session <- nil + } + + return nil +} + +// Acknowledge error, allowing lambda handler progress +func (s *Bridge) Err(ctx context.Context, digest string, err error) error { + delete(s.inflight, digest) + s.session <- err + return nil +} diff --git a/kernel/bridge_test.go b/kernel/bridge_test.go new file mode 100644 index 0000000..b388374 --- /dev/null +++ b/kernel/bridge_test.go @@ -0,0 +1,269 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package kernel + +import ( + "context" + "fmt" + "log/slog" + "strconv" + "testing" + "time" + + "github.com/fogfish/it/v2" + "github.com/fogfish/logger/v3" + "github.com/fogfish/swarm" + "github.com/fogfish/swarm/kernel/encoding" +) + +func init() { + slog.SetDefault( + logger.New( + logger.WithSourceShorten(), + logger.WithoutTimestamp(), + logger.WithLogLevel(slog.LevelDebug), + ), + ) +} + +// controls yield time before kernel is closed +const yield_before_close = 5 * time.Millisecond + +func TestBridge(t *testing.T) { + codec := encoding.NewCodecJson[string]() + config := swarm.Config{PollFrequency: 0 * time.Millisecond} + + // + mockit := func(n int) (*Dequeuer, *bridge) { + seq := []swarm.Bag{} + for i := 0; i < n; i++ { + val := strconv.Itoa(i + 1) + seq = append(seq, + swarm.Bag{ + Category: "test", + Digest: val, + Object: []byte(fmt.Sprintf(`"%s"`, val)), // JSON is expected + }, + ) + } + + brdg := mockBridge(seq) + k := NewDequeuer(brdg, config) + go func() { + time.Sleep(yield_before_close) + k.Close() + }() + + return k, brdg + } + + t.Run("None", func(t *testing.T) { + k, _ := mockit(1) + Dequeue(k, "test", codec) + k.Await() + }) + + t.Run("Dequeue.1", func(t *testing.T) { + k, brdg := mockit(1) + rcv, ack := Dequeue(k, "test", codec) + + // Note: in real apps receive loop is always go function + go func() { ack <- <-rcv }() + k.Await() + + it.Then(t).Should( + it.Seq(brdg.ack).Equal(`1`), + ) + }) + + t.Run("Dequeue.N", func(t *testing.T) { + k, brdg := mockit(3) + rcv, ack := Dequeue(k, "test", codec) + + // Note: in real apps receive loop is always go function + go func() { + ack <- <-rcv + ack <- <-rcv + ack <- <-rcv + }() + k.Await() + + it.Then(t).Should( + it.Seq(brdg.ack).Equal(`1`, `2`, `3`), + ) + }) + + t.Run("Error.1", func(t *testing.T) { + k, brdg := mockit(1) + rcv, ack := Dequeue(k, "test", codec) + + // Note: in real apps receive loop is always go function + go func() { + x := <-rcv + ack <- x.Fail(fmt.Errorf("failed")) + }() + k.Await() + + it.Then(t).Should( + it.Fail(brdg.Status).Contain("failed"), + ) + }) + + t.Run("Error.N.1", func(t *testing.T) { + k, brdg := mockit(3) + rcv, ack := Dequeue(k, "test", codec) + + // Note: in real apps receive loop is always go function + go func() { + x := <-rcv + ack <- x.Fail(fmt.Errorf("failed")) + }() + k.Await() + + it.Then(t).Should( + it.Fail(brdg.Status).Contain("failed"), + ) + }) + + t.Run("Error.N.2", func(t *testing.T) { + k, brdg := mockit(3) + rcv, ack := Dequeue(k, "test", codec) + + // Note: in real apps receive loop is always go function + go func() { + ack <- <-rcv + x := <-rcv + ack <- x.Fail(fmt.Errorf("failed")) + }() + k.Await() + + it.Then(t).Should( + it.Fail(brdg.Status).Contain("failed"), + ) + }) + + t.Run("Error.N.3", func(t *testing.T) { + k, brdg := mockit(3) + rcv, ack := Dequeue(k, "test", codec) + + // Note: in real apps receive loop is always go function + go func() { + ack <- <-rcv + ack <- <-rcv + x := <-rcv + ack <- x.Fail(fmt.Errorf("failed")) + }() + k.Await() + + it.Then(t).Should( + it.Fail(brdg.Status).Contain("failed"), + ) + }) + + t.Run("Timeout.1", func(t *testing.T) { + k, brdg := mockit(1) + rcv, _ := Dequeue(k, "test", codec) + + // Note: in real apps receive loop is always go function + go func() { <-rcv }() + k.Await() + + it.Then(t).Should( + it.Fail(brdg.Status).Contain("timeout"), + ) + }) + + t.Run("Timeout.N.1", func(t *testing.T) { + k, brdg := mockit(3) + rcv, _ := Dequeue(k, "test", codec) + + // Note: in real apps receive loop is always go function + go func() { + <-rcv + }() + k.Await() + + it.Then(t).Should( + it.Fail(brdg.Status).Contain("timeout"), + ) + }) + + t.Run("Timeout.N.2", func(t *testing.T) { + k, brdg := mockit(3) + rcv, ack := Dequeue(k, "test", codec) + + // Note: in real apps receive loop is always go function + go func() { + ack <- <-rcv + <-rcv + }() + k.Await() + + it.Then(t).Should( + it.Fail(brdg.Status).Contain("timeout"), + ) + }) + + t.Run("Timeout.N.3", func(t *testing.T) { + k, brdg := mockit(3) + rcv, ack := Dequeue(k, "test", codec) + + // Note: in real apps receive loop is always go function + go func() { + ack <- <-rcv + ack <- <-rcv + <-rcv + }() + k.Await() + + it.Then(t).Should( + it.Fail(brdg.Status).Contain("timeout"), + ) + }) +} + +//------------------------------------------------------------------------------ + +// bridge mock +type bridge struct { + *Bridge + seq []swarm.Bag + ack []string + err error +} + +func mockBridge(seq []swarm.Bag) *bridge { + return &bridge{ + Bridge: NewBridge(2 * time.Millisecond), + seq: seq, + } +} + +func (s *bridge) Ack(ctx context.Context, digest string) error { + if err := s.Bridge.Ack(ctx, digest); err != nil { + return err + } + + s.ack = append(s.ack, digest) + return nil +} + +func (s *bridge) Run() { + s.err = s.Bridge.Dispatch(s.seq) +} + +// Note: simplify assertion +func (s *bridge) Status() error { + // Note: due to faked "handler" there is raise on setting s.err + // in Lambda the Dispatch returns value directly to lambda handler + if s.err == nil { + time.Sleep(10 * yield_before_close) + } + return s.err +} diff --git a/kernel/cathode.go b/kernel/cathode.go new file mode 100644 index 0000000..4a43787 --- /dev/null +++ b/kernel/cathode.go @@ -0,0 +1,198 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package kernel + +import ( + "context" + "log/slog" + "sync" + "time" + + "github.com/fogfish/swarm" +) + +// Cathode defines on-the-wire protocol for [swarm.Bag], covering the ingress. +type Cathode interface { + Ack(ctx context.Context, digest string) error + Err(ctx context.Context, digest string, err error) error + Ask(ctx context.Context) ([]swarm.Bag, error) +} + +// Decode message from wire format +type Decoder[T any] interface{ Decode([]byte) (T, error) } + +type Router = interface { + Route(context.Context, swarm.Bag) error +} + +type Dequeuer struct { + sync.WaitGroup + sync.RWMutex + + // Control-plane stop channel used by go routines to stop I/O on data channels + context context.Context + cancel context.CancelFunc + + // Kernel configuration + Config swarm.Config + + // event router, binds category with destination channel + router map[string]Router + + // Cathode is the reader port on message broker + Cathode Cathode +} + +// Creates instance of broker reader +func NewDequeuer(cathode Cathode, config swarm.Config) *Dequeuer { + ctx, can := context.WithCancel(context.Background()) + + return &Dequeuer{ + Config: config, + context: ctx, + cancel: can, + router: make(map[string]Router), + Cathode: cathode, + } +} + +// Closes broker reader, gracefully shutdowns all I/O +func (k *Dequeuer) Close() { + k.cancel() + k.WaitGroup.Wait() +} + +// Await reader to complete +func (k *Dequeuer) Await() { + if spawner, ok := k.Cathode.(interface{ Run() }); ok { + go spawner.Run() + } + + k.receive() + <-k.context.Done() + k.WaitGroup.Wait() +} + +// internal infinite receive loop. +// waiting for message from event buses and queues and schedules it for delivery. +func (k *Dequeuer) receive() { + asker := func() { + seq, err := k.Cathode.Ask(k.context) + if k.Config.StdErr != nil && err != nil { + k.Config.StdErr <- err + return + } + + for i := 0; i < len(seq); i++ { + bag := seq[i] + + k.RWMutex.RLock() + r, has := k.router[bag.Category] + k.RWMutex.RUnlock() + + if has { + err := r.Route(k.context, bag) + if k.Config.StdErr != nil && err != nil { + k.Config.StdErr <- err + return + } + } + } + } + + k.WaitGroup.Add(1) + go func() { + slog.Debug("kernel receive loop started") + + exit: + for { + select { + case <-k.context.Done(): + break exit + default: + } + + select { + case <-k.context.Done(): + break exit + case <-time.After(k.Config.PollFrequency): + asker() + } + } + + k.WaitGroup.Done() + slog.Debug("kernel receive loop stopped") + }() +} + +// Dequeue creates pair of channels within kernel to enqueue messages +func Dequeue[T any](k *Dequeuer, cat string, codec Decoder[T]) ( /*rcv*/ <-chan swarm.Msg[T] /*ack*/, chan<- swarm.Msg[T]) { + rcv := make(chan swarm.Msg[T], k.Config.CapRcv) + ack := make(chan swarm.Msg[T], k.Config.CapAck) + + k.RWMutex.Lock() + k.router[cat] = router[T]{ch: rcv, codec: codec} + k.RWMutex.Unlock() + + // emitter routine + acks := func(msg swarm.Msg[T]) { + if msg.Error == nil { + err := k.Cathode.Ack(k.context, msg.Digest) + if k.Config.StdErr != nil && err != nil { + k.Config.StdErr <- err + } + } else { + err := k.Cathode.Err(k.context, msg.Digest, msg.Error) + if k.Config.StdErr != nil && err != nil { + k.Config.StdErr <- err + } + } + } + + k.WaitGroup.Add(1) + go func() { + slog.Debug("kernel dequeue started", "cat", cat) + + exit: + for { + // The try-receive operation here is to + // try to exit the sender goroutine as + // early as possible. Try-receive and + // try-send select blocks are specially + // optimized by the standard Go + // compiler, so they are very efficient. + select { + case <-k.context.Done(): + break exit + default: + } + + select { + case <-k.context.Done(): + break exit + case msg := <-ack: + acks(msg) + } + } + + backlog := len(ack) + close(ack) + + if backlog != 0 { + for msg := range ack { + acks(msg) + } + } + + k.WaitGroup.Done() + slog.Debug("kernel dequeue stopped", "cat", cat) + }() + + return rcv, ack +} diff --git a/kernel/cathode_test.go b/kernel/cathode_test.go new file mode 100644 index 0000000..a8fae4f --- /dev/null +++ b/kernel/cathode_test.go @@ -0,0 +1,106 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package kernel + +import ( + "context" + "testing" + "time" + + "github.com/fogfish/it/v2" + "github.com/fogfish/swarm" + "github.com/fogfish/swarm/kernel/encoding" +) + +func TestDequeuer(t *testing.T) { + codec := encoding.NewCodecJson[string]() + none := mockCathode(nil, nil) + pass := mockCathode(make(chan string), + []swarm.Bag{{Category: "test", Digest: "1", Object: []byte(`"1"`)}}, + ) + + t.Run("Kernel", func(t *testing.T) { + k := New(nil, NewDequeuer(mockCathode(nil, nil), swarm.Config{})) + go func() { + time.Sleep(yield_before_close) + k.Close() + }() + k.Await() + }) + + t.Run("None", func(t *testing.T) { + k := NewDequeuer(none, swarm.Config{PollFrequency: 1 * time.Second}) + go k.Await() + k.Close() + }) + + t.Run("Idle", func(t *testing.T) { + k := NewDequeuer(none, swarm.Config{PollFrequency: 1 * time.Second}) + Dequeue(k, "test", codec) + go k.Await() + k.Close() + }) + + t.Run("Dequeue.1", func(t *testing.T) { + k := NewDequeuer(pass, swarm.Config{PollFrequency: 10 * time.Millisecond}) + rcv, ack := Dequeue(k, "test", codec) + go k.Await() + + ack <- <-rcv + it.Then(t).Should( + it.Equal(string(<-pass.ack), `1`), + ) + + k.Close() + }) + + t.Run("Backlog", func(t *testing.T) { + k := NewDequeuer(pass, swarm.Config{CapAck: 4, PollFrequency: 1 * time.Millisecond}) + rcv, ack := Dequeue(k, "test", codec) + go k.Await() + + ack <- <-rcv + ack <- <-rcv + ack <- <-rcv + ack <- <-rcv + go k.Close() + + it.Then(t).Should( + it.Equal(string(<-pass.ack), `1`), + it.Equal(string(<-pass.ack), `1`), + it.Equal(string(<-pass.ack), `1`), + it.Equal(string(<-pass.ack), `1`), + ) + }) +} + +//------------------------------------------------------------------------------ + +type cathode struct { + seq []swarm.Bag + ack chan string +} + +func mockCathode(ack chan string, seq []swarm.Bag) cathode { + return cathode{seq: seq, ack: ack} +} + +func (c cathode) Ack(ctx context.Context, digest string) error { + c.ack <- digest + return nil +} + +func (c cathode) Err(ctx context.Context, digest string, err error) error { + c.ack <- digest + return nil +} + +func (c cathode) Ask(context.Context) ([]swarm.Bag, error) { + return c.seq, nil +} diff --git a/kernel/emitter.go b/kernel/emitter.go new file mode 100644 index 0000000..158910d --- /dev/null +++ b/kernel/emitter.go @@ -0,0 +1,129 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package kernel + +import ( + "context" + "sync" + + "github.com/fogfish/swarm" +) + +// Emitter defines on-the-wire protocol for [swarm.Bag], covering egress. +type Emitter interface { + Enq(context.Context, swarm.Bag) error +} + +// Encodes message into wire format +type Encoder[T any] interface{ Encode(T) ([]byte, error) } + +// Messaging Egress port +type Enqueuer struct { + sync.WaitGroup + + // Control-plane stop channel used by go routines to stop I/O on data channels + context context.Context + cancel context.CancelFunc + + // Kernel configuration + Config swarm.Config + + // Emitter is the writer port on message broker + Emitter Emitter +} + +// Creates instance of broker writer +func NewEnqueuer(emitter Emitter, config swarm.Config) *Enqueuer { + ctx, can := context.WithCancel(context.Background()) + + return &Enqueuer{ + Config: config, + context: ctx, + cancel: can, + Emitter: emitter, + } +} + +// Close enqueuer +func (k *Enqueuer) Close() { + k.cancel() + k.WaitGroup.Wait() +} + +// Await enqueue +func (k *Enqueuer) Await() { + <-k.context.Done() + k.WaitGroup.Wait() +} + +// Enqueue creates pair of channels within kernel to enqueue messages +func Enqueue[T any](k *Enqueuer, cat string, codec Encoder[T]) ( /*snd*/ chan<- T /*dlq*/, <-chan T) { + snd := make(chan T, k.Config.CapOut) + dlq := make(chan T, k.Config.CapDlq) + + // emitter routine + emit := func(obj T) { + msg, err := codec.Encode(obj) + if err != nil { + dlq <- obj + if k.Config.StdErr != nil { + k.Config.StdErr <- err + } + return + } + + bag := swarm.Bag{Category: cat, Object: msg} + + if err := k.Emitter.Enq(context.Background(), bag); err != nil { + dlq <- obj + if k.Config.StdErr != nil { + k.Config.StdErr <- err + } + return + } + } + + k.WaitGroup.Add(1) + go func() { + exit: + for { + // The try-receive operation here is to + // try to exit the sender goroutine as + // early as possible. Try-receive and + // try-send select blocks are specially + // optimized by the standard Go + // compiler, so they are very efficient. + select { + case <-k.context.Done(): + break exit + default: + } + + select { + case <-k.context.Done(): + break exit + case obj := <-snd: + emit(obj) + } + } + + backlog := len(snd) + close(snd) + + if backlog != 0 { + for obj := range snd { + emit(obj) + } + } + + k.WaitGroup.Done() + }() + + return snd, dlq +} diff --git a/kernel/emitter_test.go b/kernel/emitter_test.go new file mode 100644 index 0000000..044c9fc --- /dev/null +++ b/kernel/emitter_test.go @@ -0,0 +1,212 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package kernel + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/fogfish/it/v2" + "github.com/fogfish/swarm" + "github.com/fogfish/swarm/kernel/encoding" +) + +func TestEnqueuer(t *testing.T) { + codec := encoding.NewCodecJson[string]() + mockit := func(config swarm.Config) (*Enqueuer, *emitter) { + mock := mockEmitter(10) + k := NewEnqueuer(mock, config) + + go func() { + time.Sleep(yield_before_close) + k.Close() + }() + + return k, mock + } + + t.Run("Kernel", func(t *testing.T) { + k := New(NewEnqueuer(mockEmitter(10), swarm.Config{}), nil) + go func() { + time.Sleep(yield_before_close) + k.Close() + }() + k.Await() + }) + + t.Run("None", func(t *testing.T) { + k, _ := mockit(swarm.Config{}) + Enqueue(k, "test", codec) + k.Await() + }) + + t.Run("Enqueue.1", func(t *testing.T) { + k, e := mockit(swarm.Config{}) + snd, _ := Enqueue(k, "test", codec) + + snd <- "1" + it.Then(t).Should( + it.Equal(<-e.val, `"1"`), + ) + + k.Await() + }) + + t.Run("Enqueue.1.Shut", func(t *testing.T) { + k, e := mockit(swarm.Config{}) + snd, _ := Enqueue(k, "test", codec) + + snd <- "1" + k.Await() + + it.Then(t).Should( + it.Seq(e.seq).Equal(`"1"`), + ) + }) + + t.Run("Enqueue.1.Error", func(t *testing.T) { + err := make(chan error) + k := NewEnqueuer(looser{}, swarm.Config{StdErr: err}) + snd, dlq := Enqueue(k, "test", codec) + + snd <- "1" + it.Then(t).Should( + it.Equal(<-dlq, "1"), + it.Fail(func() error { return <-err }).Contain("lost"), + ) + + k.Close() + }) + + t.Run("Enqueue.1.Codec", func(t *testing.T) { + err := make(chan error) + k := NewEnqueuer(mockEmitter(10), swarm.Config{StdErr: err}) + snd, dlq := Enqueue(k, "test", looser{}) + + snd <- "1" + it.Then(t).Should( + it.Equal(<-dlq, "1"), + it.Fail(func() error { return <-err }).Contain("invalid"), + ) + + k.Close() + }) + + t.Run("Enqueue.N", func(t *testing.T) { + k, e := mockit(swarm.Config{}) + snd, _ := Enqueue(k, "test", codec) + + snd <- "1" + it.Then(t).Should( + it.Equal(<-e.val, `"1"`), + ) + + snd <- "2" + it.Then(t).Should( + it.Equal(<-e.val, `"2"`), + ) + + snd <- "3" + it.Then(t).Should( + it.Equal(<-e.val, `"3"`), + ) + + k.Await() + }) + + t.Run("Enqueue.N.Shut", func(t *testing.T) { + k, e := mockit(swarm.Config{}) + snd, _ := Enqueue(k, "test", codec) + + snd <- "1" + snd <- "2" + snd <- "3" + + k.Await() + + it.Then(t).Should( + it.Seq(e.seq).Equal(`"1"`, `"2"`, `"3"`), + ) + }) + + t.Run("Enqueue.N.Backlog", func(t *testing.T) { + e := mockEmitter(10) + k := NewEnqueuer(e, swarm.Config{CapOut: 4}) + snd, _ := Enqueue(k, "test", codec) + + snd <- "1" + snd <- "2" + snd <- "3" + + k.Close() + + it.Then(t).Should( + it.Seq(e.seq).Equal(`"1"`, `"2"`, `"3"`), + ) + }) + + t.Run("Enqueue.N.Error", func(t *testing.T) { + err := make(chan error) + k := NewEnqueuer(looser{}, swarm.Config{CapOut: 4, CapDlq: 4, StdErr: err}) + snd, dlq := Enqueue(k, "test", codec) + + snd <- "1" + snd <- "2" + snd <- "3" + + it.Then(t).Should( + it.Equal(<-dlq, "1"), + it.Fail(func() error { return <-err }).Contain("lost"), + + it.Equal(<-dlq, "2"), + it.Fail(func() error { return <-err }).Contain("lost"), + + it.Equal(<-dlq, "3"), + it.Fail(func() error { return <-err }).Contain("lost"), + ) + + k.Close() + }) +} + +//------------------------------------------------------------------------------ + +type emitter struct { + wait int + seq []string + val chan string +} + +func mockEmitter(wait int) *emitter { + return &emitter{ + wait: wait, + seq: make([]string, 0), + val: make(chan string, 1000), + } +} + +func (e *emitter) Enq(ctx context.Context, bag swarm.Bag) error { + time.Sleep(time.Duration(e.wait) * time.Microsecond) + e.seq = append(e.seq, string(bag.Object)) + + e.val <- string(bag.Object) + return nil +} + +type looser struct{} + +func (e looser) Enq(ctx context.Context, bag swarm.Bag) error { + return fmt.Errorf("lost") +} + +func (e looser) Encode(x string) ([]byte, error) { + return nil, fmt.Errorf("invalid") +} diff --git a/codec.go b/kernel/encoding/codec.go similarity index 75% rename from codec.go rename to kernel/encoding/codec.go index 94a1fe5..7519882 100644 --- a/codec.go +++ b/kernel/encoding/codec.go @@ -6,7 +6,7 @@ // https://github.com/fogfish/swarm // -package swarm +package encoding import ( "encoding/json" @@ -15,6 +15,7 @@ import ( "github.com/fogfish/curie" "github.com/fogfish/golem/optics" "github.com/fogfish/guid/v2" + "github.com/fogfish/swarm" ) //------------------------------------------------------------------------------ @@ -67,14 +68,14 @@ func NewCodecPacket() CodecPacket { return CodecPacket{} } //------------------------------------------------------------------------------ // Event codec for I/O kernel -type CodecEvent[T any, E EventKind[T]] struct { +type CodecEvent[M, T any] struct { source string cat string - shape optics.Lens4[E, string, curie.IRI, curie.IRI, time.Time] + shape optics.Lens4[M, string, curie.IRI, curie.IRI, time.Time] } -func (c CodecEvent[T, E]) Encode(obj *E) ([]byte, error) { - _, knd, src, _ := c.shape.Get(obj) +func (c CodecEvent[M, T]) Encode(obj swarm.Event[M, T]) ([]byte, error) { + _, knd, src, _ := c.shape.Get(obj.Meta) if knd == "" { knd = curie.IRI(c.cat) } @@ -83,22 +84,22 @@ func (c CodecEvent[T, E]) Encode(obj *E) ([]byte, error) { src = curie.IRI(c.source) } - c.shape.Put(obj, guid.G(guid.Clock).String(), knd, src, time.Now()) + c.shape.Put(obj.Meta, guid.G(guid.Clock).String(), knd, src, time.Now()) return json.Marshal(obj) } -func (c CodecEvent[T, E]) Decode(b []byte) (*E, error) { - x := new(E) - err := json.Unmarshal(b, x) +func (c CodecEvent[M, T]) Decode(b []byte) (swarm.Event[M, T], error) { + var x swarm.Event[M, T] + err := json.Unmarshal(b, &x) return x, err } -func NewCodecEvent[T any, E EventKind[T]](source, cat string) CodecEvent[T, E] { - return CodecEvent[T, E]{ +func NewCodecEvent[M, T any](source, cat string) CodecEvent[M, T] { + return CodecEvent[M, T]{ source: source, cat: cat, - shape: optics.ForShape4[E, string, curie.IRI, curie.IRI, time.Time]("ID", "Type", "Agent", "Created"), + shape: optics.ForShape4[M, string, curie.IRI, curie.IRI, time.Time]("ID", "Type", "Agent", "Created"), } } diff --git a/kernel/kernel.go b/kernel/kernel.go index f28adb8..9703e94 100644 --- a/kernel/kernel.go +++ b/kernel/kernel.go @@ -8,363 +8,34 @@ package kernel -import ( - "context" - "errors" - "log/slog" - "sync" - "time" - - "github.com/fogfish/swarm" -) - -type Codec[T any] interface { - Encode(T) ([]byte, error) - Decode([]byte) (T, error) -} - -type Emitter interface { - Enq(swarm.Bag) error -} - -type Cathode interface { - Ack(digest string) error - Ask() ([]swarm.Bag, error) -} - -type Spawner interface { - Spawn(*Kernel) error -} - -// Event Kernel builds an infrastructure for integrating message brokers, -// events busses into Golang channel environment. -// The implementation follows the pattern, defined by -// https://go101.org/article/channel-closing.html type Kernel struct { - sync.WaitGroup - sync.RWMutex - - // Kernel configuration - Config swarm.Config - - // Control-plane stop channel. It is used to notify the kernel to terminate. - // Kernel notifies control plane of individual routines. - mainStop chan struct{} - - // Control-plane stop channel used by go routines to stop I/O on data channels - ctrlStop chan struct{} - - // Control-plane for ack of batch elements - ctrlAcks chan *swarm.Context - - // event router, binds category with destination channel - router map[string]interface{ Route(swarm.Bag) error } - - // The flag indicates if Await loop is started - waiting bool - - // On the wire protocol emitter (writer) and cathode (receiver) - Emitter Emitter - Cathode Cathode + *Enqueuer + *Dequeuer } -// New routing and dispatch kernel -func New(emitter Emitter, cathode Cathode, config swarm.Config) *Kernel { +func New(enqueuer *Enqueuer, dequeuer *Dequeuer) *Kernel { return &Kernel{ - Config: config, - mainStop: make(chan struct{}, 1), // MUST BE buffered - ctrlStop: make(chan struct{}), - - router: map[string]interface{ Route(swarm.Bag) error }{}, - - Emitter: emitter, - Cathode: cathode, + Enqueuer: enqueuer, + Dequeuer: dequeuer, } } -// internal infinite receive loop. -// waiting for message from event buses and queues and schedules it for delivery. -func (k *Kernel) receive() { - k.WaitGroup.Add(1) - - asker := func() { - seq, err := k.Cathode.Ask() - if k.Config.StdErr != nil && err != nil { - k.Config.StdErr <- err - return - } - - for i := 0; i < len(seq); i++ { - bag := seq[i] - - k.RWMutex.RLock() - r, has := k.router[bag.Ctx.Category] - k.RWMutex.RUnlock() - - if has { - err := r.Route(bag) - if k.Config.StdErr != nil && err != nil { - k.Config.StdErr <- err - return - } - } - } - } - - go func() { - exit: - for { - select { - case <-k.ctrlStop: - break exit - default: - } - - select { - case <-k.ctrlStop: - break exit - case <-time.After(k.Config.PollFrequency): - asker() - } - } - - slog.Debug("Free kernel infinite loop") - k.WaitGroup.Done() - }() - - slog.Debug("Init kernel infinite loop") -} - -// Close event delivery infrastructure func (k *Kernel) Close() { - k.mainStop <- struct{}{} - if !k.waiting { - close(k.ctrlStop) - k.WaitGroup.Wait() + if k.Dequeuer != nil { + k.Dequeuer.Close() } -} -// Await for event delivery -func (k *Kernel) Await() { - k.waiting = true - - if spawner, ok := k.Cathode.(Spawner); ok { - spawner.Spawn(k) - } else { - k.receive() + if k.Enqueuer != nil { + k.Enqueuer.Close() } - - <-k.mainStop - close(k.ctrlStop) - k.WaitGroup.Wait() } -// Dispatches batch of messages -func (k *Kernel) Dispatch(seq []swarm.Bag, timeout time.Duration) error { - k.WaitGroup.Add(1) - k.ctrlAcks = make(chan *swarm.Context, len(seq)) - - wnd := map[string]struct{}{} - for i := 0; i < len(seq); i++ { - bag := seq[i] - wnd[bag.Ctx.Digest] = struct{}{} - - k.RWMutex.RLock() - r, has := k.router[bag.Ctx.Category] - k.RWMutex.RUnlock() - - if has { - err := r.Route(bag) - if k.Config.StdErr != nil && err != nil { - k.Config.StdErr <- err - continue - } - } - } - - var err error - -exit: - for { - select { - case <-k.ctrlStop: - break exit - default: - } - - select { - case <-k.ctrlStop: - break exit - case ack := <-k.ctrlAcks: - if err == nil && ack.Error != nil { - err = ack.Error - } - - delete(wnd, ack.Digest) - if len(wnd) == 0 { - break exit - } - case <-time.After(timeout): - err = errors.New("ack timeout") - break exit - } - } - - close(k.ctrlAcks) - k.ctrlAcks = nil - - return err -} - -// Enqueue channels for kernel -func Enqueue[T any](k *Kernel, cat string, codec Codec[T]) ( /*snd*/ chan<- T /*dlq*/, <-chan T) { - snd := make(chan T, k.Config.CapOut) - dlq := make(chan T, k.Config.CapDLQ) - - // emitter routine - emit := func(obj T) { - msg, err := codec.Encode(obj) - if err != nil { - dlq <- obj - if k.Config.StdErr != nil { - k.Config.StdErr <- err - } - return - } - - ctx := swarm.NewContext(context.Background(), cat, "") - bag := swarm.Bag{Ctx: ctx, Object: msg} - - if err := k.Emitter.Enq(bag); err != nil { - dlq <- obj - if k.Config.StdErr != nil { - k.Config.StdErr <- err - } - return - } - - slog.Debug("emitted ", "cat", cat, "object", obj) - } - - k.WaitGroup.Add(1) - go func() { - exit: - for { - // The try-receive operation here is to - // try to exit the sender goroutine as - // early as possible. Try-receive and - // try-send select blocks are specially - // optimized by the standard Go - // compiler, so they are very efficient. - select { - case <-k.ctrlStop: - break exit - default: - } - - select { - case <-k.ctrlStop: - break exit - case obj := <-snd: - emit(obj) - } - } - - backlog := len(snd) - close(snd) - - slog.Debug("Free enqueue kernel", "cat", cat, "backlog", backlog) - if backlog != 0 { - for obj := range snd { - emit(obj) - } - } - k.WaitGroup.Done() - }() - - slog.Debug("Init enqueue kernel", "cat", cat) - - return snd, dlq -} - -type router[T any] struct { - ch chan swarm.Msg[T] - codec Codec[T] -} - -func (a router[T]) Route(bag swarm.Bag) error { - obj, err := a.codec.Decode(bag.Object) - if err != nil { - return err +func (k *Kernel) Await() { + if k.Dequeuer != nil { + k.Dequeuer.Await() } - msg := swarm.Msg[T]{Ctx: bag.Ctx, Object: obj} - a.ch <- msg - return nil -} - -// Enqueue channels for kernel -func Dequeue[T any](k *Kernel, cat string, codec Codec[T]) ( /*rcv*/ <-chan swarm.Msg[T] /*ack*/, chan<- swarm.Msg[T]) { - rcv := make(chan swarm.Msg[T], k.Config.CapRcv) - ack := make(chan swarm.Msg[T], k.Config.CapAck) - - k.RWMutex.Lock() - k.router[cat] = router[T]{ch: rcv, codec: codec} - k.RWMutex.Unlock() - - // emitter routine - acks := func(msg swarm.Msg[T]) { - if msg.Ctx.Error == nil { - err := k.Cathode.Ack(msg.Ctx.Digest) - if k.Config.StdErr != nil && err != nil { - k.Config.StdErr <- err - } - - slog.Debug("acked ", "cat", cat, "object", msg.Object) - } - - if k.ctrlAcks != nil { - k.ctrlAcks <- msg.Ctx - } + if k.Enqueuer != nil { + k.Enqueuer.Await() } - - k.WaitGroup.Add(1) - go func() { - exit: - for { - // The try-receive operation here is to - // try to exit the sender goroutine as - // early as possible. Try-receive and - // try-send select blocks are specially - // optimized by the standard Go - // compiler, so they are very efficient. - select { - case <-k.ctrlStop: - break exit - default: - } - - select { - case <-k.ctrlStop: - break exit - case msg := <-ack: - acks(msg) - } - } - - backlog := len(ack) - close(ack) - - slog.Debug("Free dequeue kernel", "cat", cat, "backlog", backlog) - if backlog != 0 { - for msg := range ack { - acks(msg) - } - } - k.WaitGroup.Done() - }() - - slog.Debug("Init dequeue kernel", "cat", cat) - - return rcv, ack } diff --git a/kernel/router.go b/kernel/router.go new file mode 100644 index 0000000..8a78253 --- /dev/null +++ b/kernel/router.go @@ -0,0 +1,38 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package kernel + +import ( + "context" + "fmt" + + "github.com/fogfish/swarm" +) + +// Router is typed pair of message channel and codec +type router[T any] struct { + ch chan swarm.Msg[T] + codec Decoder[T] +} + +func (a router[T]) Route(ctx context.Context, bag swarm.Bag) error { + obj, err := a.codec.Decode(bag.Object) + if err != nil { + return err + } + + msg := swarm.ToMsg(bag, obj) + + select { + case <-ctx.Done(): + return fmt.Errorf("routing cancelled: category %s", bag.Category) + case a.ch <- msg: + return nil + } +} diff --git a/kernel/router_test.go b/kernel/router_test.go new file mode 100644 index 0000000..59242c3 --- /dev/null +++ b/kernel/router_test.go @@ -0,0 +1,30 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package kernel + +import ( + "context" + "testing" + + "github.com/fogfish/it/v2" + "github.com/fogfish/swarm" + "github.com/fogfish/swarm/kernel/encoding" +) + +func TestRoute(t *testing.T) { + r := router[string]{ + ch: make(chan swarm.Msg[string], 1), + codec: encoding.NewCodecJson[string](), + } + + r.Route(context.Background(), swarm.Bag{Object: []byte(`"1"`)}) + it.Then(t).Should( + it.Equal((<-r.ch).Object, `1`), + ) +} diff --git a/service.go b/service.go index 5202850..a392a47 100644 --- a/service.go +++ b/service.go @@ -12,12 +12,6 @@ import ( "log/slog" ) -// Message broker -type Broker interface { - Close() - Await() -} - // Consumes dead letter messages // // swarm.LogDeadLetters(queue.Enqueue(...)) diff --git a/version.go b/version.go index 667fd39..063e0f1 100644 --- a/version.go +++ b/version.go @@ -8,4 +8,4 @@ package swarm -const Version = "v0.16.0" +const Version = "v0.20.0-alpha"