Skip to content

Commit

Permalink
refactor definition of message context
Browse files Browse the repository at this point in the history
  • Loading branch information
fogfish committed Sep 28, 2024
1 parent 7fb0b63 commit 1810991
Show file tree
Hide file tree
Showing 11 changed files with 57 additions and 50 deletions.
53 changes: 33 additions & 20 deletions bag.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@
package swarm

import (
"context"
"reflect"
"strings"
)

// Global context for the message
type Context struct {
context.Context

// Msg is a generic envelop type for incoming messages.
// It contains both decoded object and its digest used to acknowledge message.
type Msg[T any] struct {
// Message category ~ topic
Category string

Expand All @@ -26,36 +24,51 @@ type Context struct {

// Error on the message processing
Error error
}

func NewContext(ctx context.Context, cat, digest string) *Context {
return &Context{
Context: ctx,
Category: cat,
Digest: digest,
}
}

// Msg is a generic envelop type for incoming messages.
// It contains both decoded object and its digest used to acknowledge message.
type Msg[T any] struct {
Ctx *Context
// Message decoded content
Object T
}

// Fail message with error
func (msg Msg[T]) Fail(err error) Msg[T] {
msg.Ctx.Error = err
msg.Error = err
return msg
}

// Bag is an abstract container for octet stream.
// Bag is used by the transport to abstract message on the wire.
type Bag struct {
Ctx *Context
// Message category ~ topic
Category string

// Unique brief summary of the message
Digest string

// Error on the message processing
Error error

// Message raw content
Object []byte
}

func ToMsg[T any](bag Bag, object T) Msg[T] {
return Msg[T]{
Category: bag.Category,
Digest: bag.Digest,
Error: bag.Error,
Object: object,
}
}

// func ToBag[T any](msg Msg[T], object []byte) Bag {
// return Bag{
// Category: msg.Category,
// Digest: msg.Digest,
// Error: msg.Error,
// Object: object,
// }
// }

// TypeOf returns normalized name of the type T.
func TypeOf[T any](category ...string) string {
if len(category) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion dequeue/dequeue.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (C) 2021 - 2022 Dmitry Kolesnikov
// Copyright (C) 2021 - 2024 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the Apache License Version 2.0. See the LICENSE file for details.
Expand Down
10 changes: 5 additions & 5 deletions dequeue/dequeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func TestDequeueType(t *testing.T) {
k.Await()

it.Then(t).Should(
it.Equal(msg.Ctx.Category, "User"),
it.Equal(msg.Ctx.Digest, "1"),
it.Equal(msg.Category, "User"),
it.Equal(msg.Digest, "1"),
it.Equal(msg.Object.ID, "id"),
it.Equal(msg.Object.Text, "user"),
)
Expand All @@ -65,8 +65,8 @@ func TestDequeueBytes(t *testing.T) {
k.Await()

it.Then(t).Should(
it.Equal(msg.Ctx.Category, "User"),
it.Equal(msg.Ctx.Digest, "1"),
it.Equal(msg.Category, "User"),
it.Equal(msg.Digest, "1"),
it.Equal(string(msg.Object), `{"id":"id","text":"user"}`),
)
}
Expand Down Expand Up @@ -99,6 +99,6 @@ func (c cathode) Ask(context.Context) ([]swarm.Bag, error) {
return nil, err
}

bag := []swarm.Bag{{Ctx: &swarm.Context{Category: c.cat, Digest: "1"}, Object: data}}
bag := []swarm.Bag{{Category: c.cat, Digest: "1", Object: data}}
return bag, nil
}
12 changes: 6 additions & 6 deletions enqueue/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func (q *EmitterTyped[T]) Enq(ctx context.Context, object T, cat ...string) erro
}

bag := swarm.Bag{
Ctx: swarm.NewContext(context.Background(), category, ""),
Object: msg,
Category: category,
Object: msg,
}

err = q.kernel.Emitter.Enq(ctx, bag)
Expand Down Expand Up @@ -86,8 +86,8 @@ func (q *EmitterEvent[T, E]) Enq(ctx context.Context, object *E, cat ...string)
}

bag := swarm.Bag{
Ctx: swarm.NewContext(context.Background(), category, ""),
Object: msg,
Category: category,
Object: msg,
}

err = q.kernel.Emitter.Enq(ctx, bag)
Expand Down Expand Up @@ -138,8 +138,8 @@ func (q *EmitterBytes) Enq(ctx context.Context, object []byte, cat ...string) er
}

bag := swarm.Bag{
Ctx: swarm.NewContext(context.Background(), category, ""),
Object: msg,
Category: category,
Object: msg,
}

err = q.kernel.Emitter.Enq(ctx, bag)
Expand Down
2 changes: 1 addition & 1 deletion kernel/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func NewBridge(timeToFlight time.Duration) *Bridge {
func (s *Bridge) Dispatch(seq []swarm.Bag) error {
s.inflight = map[string]struct{}{}
for _, bag := range seq {
s.inflight[bag.Ctx.Digest] = struct{}{}
s.inflight[bag.Digest] = struct{}{}
}

s.ch <- seq
Expand Down
5 changes: 3 additions & 2 deletions kernel/bridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ func TestBridge(t *testing.T) {
val := strconv.Itoa(i + 1)
seq = append(seq,
swarm.Bag{
Ctx: &swarm.Context{Category: "test", Digest: val},
Object: []byte(fmt.Sprintf(`"%s"`, val)), // JSON is expected
Category: "test",
Digest: val,
Object: []byte(fmt.Sprintf(`"%s"`, val)), // JSON is expected
},
)
}
Expand Down
8 changes: 4 additions & 4 deletions kernel/cathode.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (k *Dequeuer) receive() {
bag := seq[i]

k.RWMutex.RLock()
r, has := k.router[bag.Ctx.Category]
r, has := k.router[bag.Category]
k.RWMutex.RUnlock()

if has {
Expand Down Expand Up @@ -142,13 +142,13 @@ func Dequeue[T any](k *Dequeuer, cat string, codec Decoder[T]) ( /*rcv*/ <-chan

// emitter routine
acks := func(msg swarm.Msg[T]) {
if msg.Ctx.Error == nil {
err := k.Cathode.Ack(k.context, msg.Ctx.Digest)
if msg.Error == nil {
err := k.Cathode.Ack(k.context, msg.Digest)
if k.Config.StdErr != nil && err != nil {
k.Config.StdErr <- err
}
} else {
err := k.Cathode.Err(k.context, msg.Ctx.Digest, msg.Ctx.Error)
err := k.Cathode.Err(k.context, msg.Digest, msg.Error)
if k.Config.StdErr != nil && err != nil {
k.Config.StdErr <- err
}
Expand Down
2 changes: 1 addition & 1 deletion kernel/cathode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestDequeuer(t *testing.T) {
codec := encoding.NewCodecJson[string]()
none := mockCathode(nil, nil)
pass := mockCathode(make(chan string),
[]swarm.Bag{{Ctx: &swarm.Context{Category: "test", Digest: "1"}, Object: []byte(`"1"`)}},
[]swarm.Bag{{Category: "test", Digest: "1", Object: []byte(`"1"`)}},
)

t.Run("Kernel", func(t *testing.T) {
Expand Down
3 changes: 1 addition & 2 deletions kernel/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ func Enqueue[T any](k *Enqueuer, cat string, codec Encoder[T]) ( /*snd*/ chan<-
return
}

ctx := swarm.NewContext(context.Background(), cat, "")
bag := swarm.Bag{Ctx: ctx, Object: msg}
bag := swarm.Bag{Category: cat, Object: msg}

if err := k.Emitter.Enq(context.Background(), bag); err != nil {
dlq <- obj
Expand Down
4 changes: 2 additions & 2 deletions kernel/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ func (a router[T]) Route(ctx context.Context, bag swarm.Bag) error {
return err
}

msg := swarm.Msg[T]{Ctx: bag.Ctx, Object: obj}
msg := swarm.ToMsg(bag, obj)

select {
case <-ctx.Done():
return fmt.Errorf("routing cancelled: category %s", bag.Ctx.Category)
return fmt.Errorf("routing cancelled: category %s", bag.Category)
case a.ch <- msg:
return nil
}
Expand Down
6 changes: 0 additions & 6 deletions service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,6 @@ import (
"log/slog"
)

// Message broker
type Broker interface {
Close()
Await()
}

// Consumes dead letter messages
//
// swarm.LogDeadLetters(queue.Enqueue(...))
Expand Down

0 comments on commit 1810991

Please sign in to comment.