Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix routing in bridge
Browse files Browse the repository at this point in the history
fogfish committed Sep 28, 2024
1 parent f417dd2 commit f48cfab
Showing 4 changed files with 218 additions and 40 deletions.
223 changes: 193 additions & 30 deletions kernel/bridge_test.go
Original file line number Diff line number Diff line change
@@ -10,7 +10,9 @@ package kernel

import (
"context"
"fmt"
"log/slog"
"strconv"
"testing"
"time"

@@ -29,72 +31,223 @@ func init() {
)
}

// controls yield time before kernel is closed
const yield_before_close = 1 * time.Millisecond

func TestBridge(t *testing.T) {
// Use-case
// 1. Recv - Ack
// 2. Recv - Timeout
// 3. Recv - Error
// 4. Recv batch

config := swarm.Config{PollFrequency: 0 * time.Millisecond}

//
mockit := func(n int) (*Dequeuer, *bridge) {
seq := []swarm.Bag{}
for i := 0; i < n; i++ {
val := strconv.Itoa(i + 1)
seq = append(seq,
swarm.Bag{
Ctx: &swarm.Context{Category: "test", Digest: val},
Object: []byte(fmt.Sprintf(`"%s"`, val)), // JSON is expected
},
)
}

brdg := mockBridge(seq)
k := NewDequeuer(brdg, config)
go func() {
time.Sleep(yield_before_close)
k.Close()
}()

seq := []swarm.Bag{
{Ctx: &swarm.Context{Category: "test", Digest: "1"}, Object: []byte(`"1"`)},
{Ctx: &swarm.Context{Category: "test", Digest: "2"}, Object: []byte(`"2"`)},
return k, brdg
}

t.Run("None", func(t *testing.T) {
k := NewDequeuer(mockSpawner(seq), swarm.Config{PollFrequency: 1 * time.Second})
go k.Await()
k.Close()
k, _ := mockit(1)
k.Await()
})

t.Run("Dequeue.1", func(t *testing.T) {
mock := mockSpawner(seq)
k := NewDequeuer(mock, swarm.Config{PollFrequency: 10 * time.Millisecond})
k, brdg := mockit(1)
rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]())
go k.Await()

ack <- <-rcv
ack <- <-rcv
// Note: in real apps receive loop is always go function
go func() { ack <- <-rcv }()
k.Await()

it.Then(t).Should(
it.Seq(brdg.ack).Equal(`1`),
)
})

t.Run("Dequeue.N", func(t *testing.T) {
k, brdg := mockit(3)
rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]())

// Note: in real apps receive loop is always go function
go func() {
ack <- <-rcv
ack <- <-rcv
ack <- <-rcv
}()
k.Await()

it.Then(t).Should(
it.Seq(brdg.ack).Equal(`1`, `2`, `3`),
)
})

t.Run("Error.1", func(t *testing.T) {
k, brdg := mockit(1)
rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]())

k.Close()
// Note: in real apps receive loop is always go function
go func() {
x := <-rcv
ack <- x.Fail(fmt.Errorf("failed"))
}()
k.Await()

it.Then(t).Should(
it.Seq(mock.ack).Equal(`1`, `2`),
it.Fail(brdg.Status).Contain("failed"),
)
})

t.Run("Timeout", func(t *testing.T) {
mock := mockSpawner(seq)
k := NewDequeuer(mock, swarm.Config{PollFrequency: 0 * time.Millisecond})
t.Run("Error.N.1", func(t *testing.T) {
k, brdg := mockit(3)
rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]())
go k.Await()

ack <- <-rcv
<-rcv
time.Sleep(1 * time.Millisecond)
// Note: in real apps receive loop is always go function
go func() {
x := <-rcv
ack <- x.Fail(fmt.Errorf("failed"))
}()
k.Await()

it.Then(t).Should(
it.Fail(brdg.Status).Contain("failed"),
)
})

t.Run("Error.N.2", func(t *testing.T) {
k, brdg := mockit(3)
rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]())

k.Close()
time.Sleep(1 * time.Millisecond)
// Note: in real apps receive loop is always go function
go func() {
ack <- <-rcv
x := <-rcv
ack <- x.Fail(fmt.Errorf("failed"))
}()
k.Await()

it.Then(t).ShouldNot(
it.Nil(mock.err),
it.Then(t).Should(
it.Fail(brdg.Status).Contain("failed"),
)
})

t.Run("Error.N.3", func(t *testing.T) {
k, brdg := mockit(3)
rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]())

// Note: in real apps receive loop is always go function
go func() {
ack <- <-rcv
ack <- <-rcv
x := <-rcv
ack <- x.Fail(fmt.Errorf("failed"))
}()
k.Await()

it.Then(t).Should(
it.Fail(brdg.Status).Contain("failed"),
)
})

t.Run("Timeout.1", func(t *testing.T) {
k, brdg := mockit(1)
rcv, _ := Dequeue(k, "test", swarm.NewCodecJson[string]())

// Note: in real apps receive loop is always go function
go func() { <-rcv }()
k.Await()

it.Then(t).Should(
it.Fail(brdg.Status).Contain("timeout"),
)
})

t.Run("Timeout.N.1", func(t *testing.T) {
k, brdg := mockit(3)
rcv, _ := Dequeue(k, "test", swarm.NewCodecJson[string]())

// Note: in real apps receive loop is always go function
go func() {
<-rcv
}()
k.Await()

it.Then(t).Should(
it.Fail(brdg.Status).Contain("timeout"),
)
})

t.Run("Timeout.N.2", func(t *testing.T) {
k, brdg := mockit(3)
rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]())

// Note: in real apps receive loop is always go function
go func() {
ack <- <-rcv
<-rcv
}()
k.Await()

it.Then(t).Should(
it.Fail(brdg.Status).Contain("timeout"),
)
})

t.Run("Timeout.N.3", func(t *testing.T) {
k, brdg := mockit(3)
rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]())

// Note: in real apps receive loop is always go function
go func() {
ack <- <-rcv
ack <- <-rcv
<-rcv
}()
k.Await()

it.Then(t).Should(
it.Fail(brdg.Status).Contain("timeout"),
)
})
}

//------------------------------------------------------------------------------

type spawner struct {
// bridge mock
type bridge struct {
*Bridge
seq []swarm.Bag
ack []string
err error
}

func mockSpawner(seq []swarm.Bag) *spawner {
return &spawner{
Bridge: NewBridge(1 * time.Millisecond),
func mockBridge(seq []swarm.Bag) *bridge {
return &bridge{
Bridge: NewBridge(2 * time.Millisecond),
seq: seq,
}
}

func (s *spawner) Ack(ctx context.Context, digest string) error {
func (s *bridge) Ack(ctx context.Context, digest string) error {
if err := s.Bridge.Ack(ctx, digest); err != nil {
return err
}
@@ -103,6 +256,16 @@ func (s *spawner) Ack(ctx context.Context, digest string) error {
return nil
}

func (s *spawner) Run() {
func (s *bridge) Run() {
s.err = s.Bridge.Dispatch(s.seq)
}

// Note: simplify assertion
func (s *bridge) Status() error {
// Note: due to faked "handler" there is raise on setting s.err
// in Lambda the Dispatch returns value directly to lambda handler
if s.err == nil {
time.Sleep(10 * yield_before_close)
}
return s.err
}
14 changes: 9 additions & 5 deletions kernel/cathode.go
Original file line number Diff line number Diff line change
@@ -26,6 +26,10 @@ type Cathode interface {
// Decode message from wire format
type Decoder[T any] interface{ Decode([]byte) (T, error) }

type Router = interface {
Route(context.Context, swarm.Bag) error
}

type Dequeuer struct {
sync.WaitGroup
sync.RWMutex
@@ -38,7 +42,7 @@ type Dequeuer struct {
Config swarm.Config

// event router, binds category with destination channel
router map[string]interface{ Route(swarm.Bag) error }
router map[string]Router

// Cathode is the reader port on message broker
Cathode Cathode
@@ -51,7 +55,7 @@ func NewDequeuer(cathode Cathode, config swarm.Config) *Dequeuer {
Config: config,
context: ctx,
cancel: can,
router: map[string]interface{ Route(swarm.Bag) error }{},
router: make(map[string]Router),
Cathode: cathode,
}
}
@@ -90,7 +94,7 @@ func (k *Dequeuer) receive() {
k.RWMutex.RUnlock()

if has {
err := r.Route(bag)
err := r.Route(k.context, bag)
if k.Config.StdErr != nil && err != nil {
k.Config.StdErr <- err
return
@@ -101,7 +105,7 @@ func (k *Dequeuer) receive() {

k.WaitGroup.Add(1)
go func() {
slog.Debug("kernel dequeue loop started")
slog.Debug("kernel receive loop started")

exit:
for {
@@ -120,7 +124,7 @@ func (k *Dequeuer) receive() {
}

k.WaitGroup.Done()
slog.Debug("kernel dequeue loop stopped")
slog.Debug("kernel receive loop stopped")
}()
}

18 changes: 14 additions & 4 deletions kernel/router.go
Original file line number Diff line number Diff line change
@@ -8,21 +8,31 @@

package kernel

import "github.com/fogfish/swarm"
import (
"context"
"fmt"

"github.com/fogfish/swarm"
)

// Router is typed pair of message channel and codec
type router[T any] struct {
ch chan swarm.Msg[T]
codec Decoder[T]
}

func (a router[T]) Route(bag swarm.Bag) error {
func (a router[T]) Route(ctx context.Context, bag swarm.Bag) error {
obj, err := a.codec.Decode(bag.Object)
if err != nil {
return err
}

msg := swarm.Msg[T]{Ctx: bag.Ctx, Object: obj}
a.ch <- msg
return nil

select {
case <-ctx.Done():
return fmt.Errorf("routing cancelled: category %s", bag.Ctx.Category)
case a.ch <- msg:
return nil
}
}
3 changes: 2 additions & 1 deletion kernel/router_test.go
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@
package kernel

import (
"context"
"testing"

"github.com/fogfish/it/v2"
@@ -21,7 +22,7 @@ func TestRoute(t *testing.T) {
codec: swarm.NewCodecJson[string](),
}

r.Route(swarm.Bag{Object: []byte(`"1"`)})
r.Route(context.Background(), swarm.Bag{Object: []byte(`"1"`)})
it.Then(t).Should(
it.Equal((<-r.ch).Object, `1`),
)

0 comments on commit f48cfab

Please sign in to comment.