diff --git a/bag.go b/bag.go index a17d7a5..360625d 100644 --- a/bag.go +++ b/bag.go @@ -9,15 +9,13 @@ package swarm import ( - "context" "reflect" "strings" ) -// Global context for the message -type Context struct { - context.Context - +// Msg is a generic envelop type for incoming messages. +// It contains both decoded object and its digest used to acknowledge message. +type Msg[T any] struct { // Message category ~ topic Category string @@ -26,36 +24,51 @@ type Context struct { // Error on the message processing Error error -} - -func NewContext(ctx context.Context, cat, digest string) *Context { - return &Context{ - Context: ctx, - Category: cat, - Digest: digest, - } -} -// Msg is a generic envelop type for incoming messages. -// It contains both decoded object and its digest used to acknowledge message. -type Msg[T any] struct { - Ctx *Context + // Message decoded content Object T } // Fail message with error func (msg Msg[T]) Fail(err error) Msg[T] { - msg.Ctx.Error = err + msg.Error = err return msg } // Bag is an abstract container for octet stream. // Bag is used by the transport to abstract message on the wire. type Bag struct { - Ctx *Context + // Message category ~ topic + Category string + + // Unique brief summary of the message + Digest string + + // Error on the message processing + Error error + + // Message raw content Object []byte } +func ToMsg[T any](bag Bag, object T) Msg[T] { + return Msg[T]{ + Category: bag.Category, + Digest: bag.Digest, + Error: bag.Error, + Object: object, + } +} + +// func ToBag[T any](msg Msg[T], object []byte) Bag { +// return Bag{ +// Category: msg.Category, +// Digest: msg.Digest, +// Error: msg.Error, +// Object: object, +// } +// } + // TypeOf returns normalized name of the type T. func TypeOf[T any](category ...string) string { if len(category) > 0 { diff --git a/dequeue/dequeue.go b/dequeue/dequeue.go index 165a0df..0242a1f 100644 --- a/dequeue/dequeue.go +++ b/dequeue/dequeue.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2021 - 2022 Dmitry Kolesnikov +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov // // This file may be modified and distributed under the terms // of the Apache License Version 2.0. See the LICENSE file for details. diff --git a/dequeue/dequeue_test.go b/dequeue/dequeue_test.go index 76fbc48..05c7237 100644 --- a/dequeue/dequeue_test.go +++ b/dequeue/dequeue_test.go @@ -39,8 +39,8 @@ func TestDequeueType(t *testing.T) { k.Await() it.Then(t).Should( - it.Equal(msg.Ctx.Category, "User"), - it.Equal(msg.Ctx.Digest, "1"), + it.Equal(msg.Category, "User"), + it.Equal(msg.Digest, "1"), it.Equal(msg.Object.ID, "id"), it.Equal(msg.Object.Text, "user"), ) @@ -65,8 +65,8 @@ func TestDequeueBytes(t *testing.T) { k.Await() it.Then(t).Should( - it.Equal(msg.Ctx.Category, "User"), - it.Equal(msg.Ctx.Digest, "1"), + it.Equal(msg.Category, "User"), + it.Equal(msg.Digest, "1"), it.Equal(string(msg.Object), `{"id":"id","text":"user"}`), ) } @@ -99,6 +99,6 @@ func (c cathode) Ask(context.Context) ([]swarm.Bag, error) { return nil, err } - bag := []swarm.Bag{{Ctx: &swarm.Context{Category: c.cat, Digest: "1"}, Object: data}} + bag := []swarm.Bag{{Category: c.cat, Digest: "1", Object: data}} return bag, nil } diff --git a/enqueue/writer.go b/enqueue/writer.go index 22e1681..35edbe5 100644 --- a/enqueue/writer.go +++ b/enqueue/writer.go @@ -39,8 +39,8 @@ func (q *EmitterTyped[T]) Enq(ctx context.Context, object T, cat ...string) erro } bag := swarm.Bag{ - Ctx: swarm.NewContext(context.Background(), category, ""), - Object: msg, + Category: category, + Object: msg, } err = q.kernel.Emitter.Enq(ctx, bag) @@ -86,8 +86,8 @@ func (q *EmitterEvent[T, E]) Enq(ctx context.Context, object *E, cat ...string) } bag := swarm.Bag{ - Ctx: swarm.NewContext(context.Background(), category, ""), - Object: msg, + Category: category, + Object: msg, } err = q.kernel.Emitter.Enq(ctx, bag) @@ -138,8 +138,8 @@ func (q *EmitterBytes) Enq(ctx context.Context, object []byte, cat ...string) er } bag := swarm.Bag{ - Ctx: swarm.NewContext(context.Background(), category, ""), - Object: msg, + Category: category, + Object: msg, } err = q.kernel.Emitter.Enq(ctx, bag) diff --git a/kernel/bridge.go b/kernel/bridge.go index 630f9c1..2d38815 100644 --- a/kernel/bridge.go +++ b/kernel/bridge.go @@ -43,7 +43,7 @@ func NewBridge(timeToFlight time.Duration) *Bridge { func (s *Bridge) Dispatch(seq []swarm.Bag) error { s.inflight = map[string]struct{}{} for _, bag := range seq { - s.inflight[bag.Ctx.Digest] = struct{}{} + s.inflight[bag.Digest] = struct{}{} } s.ch <- seq diff --git a/kernel/bridge_test.go b/kernel/bridge_test.go index 03756ea..b388374 100644 --- a/kernel/bridge_test.go +++ b/kernel/bridge_test.go @@ -46,8 +46,9 @@ func TestBridge(t *testing.T) { val := strconv.Itoa(i + 1) seq = append(seq, swarm.Bag{ - Ctx: &swarm.Context{Category: "test", Digest: val}, - Object: []byte(fmt.Sprintf(`"%s"`, val)), // JSON is expected + Category: "test", + Digest: val, + Object: []byte(fmt.Sprintf(`"%s"`, val)), // JSON is expected }, ) } diff --git a/kernel/cathode.go b/kernel/cathode.go index 0448c39..4a43787 100644 --- a/kernel/cathode.go +++ b/kernel/cathode.go @@ -93,7 +93,7 @@ func (k *Dequeuer) receive() { bag := seq[i] k.RWMutex.RLock() - r, has := k.router[bag.Ctx.Category] + r, has := k.router[bag.Category] k.RWMutex.RUnlock() if has { @@ -142,13 +142,13 @@ func Dequeue[T any](k *Dequeuer, cat string, codec Decoder[T]) ( /*rcv*/ <-chan // emitter routine acks := func(msg swarm.Msg[T]) { - if msg.Ctx.Error == nil { - err := k.Cathode.Ack(k.context, msg.Ctx.Digest) + if msg.Error == nil { + err := k.Cathode.Ack(k.context, msg.Digest) if k.Config.StdErr != nil && err != nil { k.Config.StdErr <- err } } else { - err := k.Cathode.Err(k.context, msg.Ctx.Digest, msg.Ctx.Error) + err := k.Cathode.Err(k.context, msg.Digest, msg.Error) if k.Config.StdErr != nil && err != nil { k.Config.StdErr <- err } diff --git a/kernel/cathode_test.go b/kernel/cathode_test.go index 0f3c54a..a8fae4f 100644 --- a/kernel/cathode_test.go +++ b/kernel/cathode_test.go @@ -22,7 +22,7 @@ func TestDequeuer(t *testing.T) { codec := encoding.NewCodecJson[string]() none := mockCathode(nil, nil) pass := mockCathode(make(chan string), - []swarm.Bag{{Ctx: &swarm.Context{Category: "test", Digest: "1"}, Object: []byte(`"1"`)}}, + []swarm.Bag{{Category: "test", Digest: "1", Object: []byte(`"1"`)}}, ) t.Run("Kernel", func(t *testing.T) { diff --git a/kernel/emitter.go b/kernel/emitter.go index b33bb07..1acea99 100644 --- a/kernel/emitter.go +++ b/kernel/emitter.go @@ -78,8 +78,7 @@ func Enqueue[T any](k *Enqueuer, cat string, codec Encoder[T]) ( /*snd*/ chan<- return } - ctx := swarm.NewContext(context.Background(), cat, "") - bag := swarm.Bag{Ctx: ctx, Object: msg} + bag := swarm.Bag{Category: cat, Object: msg} if err := k.Emitter.Enq(context.Background(), bag); err != nil { dlq <- obj diff --git a/kernel/router.go b/kernel/router.go index d23ce48..8a78253 100644 --- a/kernel/router.go +++ b/kernel/router.go @@ -27,11 +27,11 @@ func (a router[T]) Route(ctx context.Context, bag swarm.Bag) error { return err } - msg := swarm.Msg[T]{Ctx: bag.Ctx, Object: obj} + msg := swarm.ToMsg(bag, obj) select { case <-ctx.Done(): - return fmt.Errorf("routing cancelled: category %s", bag.Ctx.Category) + return fmt.Errorf("routing cancelled: category %s", bag.Category) case a.ch <- msg: return nil } diff --git a/service.go b/service.go index 5202850..a392a47 100644 --- a/service.go +++ b/service.go @@ -12,12 +12,6 @@ import ( "log/slog" ) -// Message broker -type Broker interface { - Close() - Await() -} - // Consumes dead letter messages // // swarm.LogDeadLetters(queue.Enqueue(...))