Skip to content

Commit

Permalink
move codec within kernel
Browse files Browse the repository at this point in the history
  • Loading branch information
fogfish committed Sep 28, 2024
1 parent 14412eb commit 4f1f7d5
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 26 deletions.
24 changes: 13 additions & 11 deletions kernel/bridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/fogfish/it/v2"
"github.com/fogfish/logger/v3"
"github.com/fogfish/swarm"
"github.com/fogfish/swarm/kernel/encoding"
)

func init() {
Expand All @@ -35,6 +36,7 @@ func init() {
const yield_before_close = 5 * time.Millisecond

func TestBridge(t *testing.T) {
codec := encoding.NewCodecJson[string]()
config := swarm.Config{PollFrequency: 0 * time.Millisecond}

//
Expand Down Expand Up @@ -62,13 +64,13 @@ func TestBridge(t *testing.T) {

t.Run("None", func(t *testing.T) {
k, _ := mockit(1)
Dequeue(k, "test", swarm.NewCodecJson[string]())
Dequeue(k, "test", codec)
k.Await()
})

t.Run("Dequeue.1", func(t *testing.T) {
k, brdg := mockit(1)
rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]())
rcv, ack := Dequeue(k, "test", codec)

// Note: in real apps receive loop is always go function
go func() { ack <- <-rcv }()
Expand All @@ -81,7 +83,7 @@ func TestBridge(t *testing.T) {

t.Run("Dequeue.N", func(t *testing.T) {
k, brdg := mockit(3)
rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]())
rcv, ack := Dequeue(k, "test", codec)

// Note: in real apps receive loop is always go function
go func() {
Expand All @@ -98,7 +100,7 @@ func TestBridge(t *testing.T) {

t.Run("Error.1", func(t *testing.T) {
k, brdg := mockit(1)
rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]())
rcv, ack := Dequeue(k, "test", codec)

// Note: in real apps receive loop is always go function
go func() {
Expand All @@ -114,7 +116,7 @@ func TestBridge(t *testing.T) {

t.Run("Error.N.1", func(t *testing.T) {
k, brdg := mockit(3)
rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]())
rcv, ack := Dequeue(k, "test", codec)

// Note: in real apps receive loop is always go function
go func() {
Expand All @@ -130,7 +132,7 @@ func TestBridge(t *testing.T) {

t.Run("Error.N.2", func(t *testing.T) {
k, brdg := mockit(3)
rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]())
rcv, ack := Dequeue(k, "test", codec)

// Note: in real apps receive loop is always go function
go func() {
Expand All @@ -147,7 +149,7 @@ func TestBridge(t *testing.T) {

t.Run("Error.N.3", func(t *testing.T) {
k, brdg := mockit(3)
rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]())
rcv, ack := Dequeue(k, "test", codec)

// Note: in real apps receive loop is always go function
go func() {
Expand All @@ -165,7 +167,7 @@ func TestBridge(t *testing.T) {

t.Run("Timeout.1", func(t *testing.T) {
k, brdg := mockit(1)
rcv, _ := Dequeue(k, "test", swarm.NewCodecJson[string]())
rcv, _ := Dequeue(k, "test", codec)

// Note: in real apps receive loop is always go function
go func() { <-rcv }()
Expand All @@ -178,7 +180,7 @@ func TestBridge(t *testing.T) {

t.Run("Timeout.N.1", func(t *testing.T) {
k, brdg := mockit(3)
rcv, _ := Dequeue(k, "test", swarm.NewCodecJson[string]())
rcv, _ := Dequeue(k, "test", codec)

// Note: in real apps receive loop is always go function
go func() {
Expand All @@ -193,7 +195,7 @@ func TestBridge(t *testing.T) {

t.Run("Timeout.N.2", func(t *testing.T) {
k, brdg := mockit(3)
rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]())
rcv, ack := Dequeue(k, "test", codec)

// Note: in real apps receive loop is always go function
go func() {
Expand All @@ -209,7 +211,7 @@ func TestBridge(t *testing.T) {

t.Run("Timeout.N.3", func(t *testing.T) {
k, brdg := mockit(3)
rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]())
rcv, ack := Dequeue(k, "test", codec)

// Note: in real apps receive loop is always go function
go func() {
Expand Down
8 changes: 5 additions & 3 deletions kernel/cathode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ import (

"github.com/fogfish/it/v2"
"github.com/fogfish/swarm"
"github.com/fogfish/swarm/kernel/encoding"
)

func TestDequeuer(t *testing.T) {
codec := encoding.NewCodecJson[string]()
none := mockCathode(nil, nil)
pass := mockCathode(make(chan string),
[]swarm.Bag{{Ctx: &swarm.Context{Category: "test", Digest: "1"}, Object: []byte(`"1"`)}},
Expand All @@ -40,14 +42,14 @@ func TestDequeuer(t *testing.T) {

t.Run("Idle", func(t *testing.T) {
k := NewDequeuer(none, swarm.Config{PollFrequency: 1 * time.Second})
Dequeue(k, "test", swarm.NewCodecJson[string]())
Dequeue(k, "test", codec)
go k.Await()
k.Close()
})

t.Run("Dequeue.1", func(t *testing.T) {
k := NewDequeuer(pass, swarm.Config{PollFrequency: 10 * time.Millisecond})
rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]())
rcv, ack := Dequeue(k, "test", codec)
go k.Await()

ack <- <-rcv
Expand All @@ -60,7 +62,7 @@ func TestDequeuer(t *testing.T) {

t.Run("Backlog", func(t *testing.T) {
k := NewDequeuer(pass, swarm.Config{CapAck: 4, PollFrequency: 1 * time.Millisecond})
rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]())
rcv, ack := Dequeue(k, "test", codec)
go k.Await()

ack <- <-rcv
Expand Down
18 changes: 10 additions & 8 deletions kernel/emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ import (

"github.com/fogfish/it/v2"
"github.com/fogfish/swarm"
"github.com/fogfish/swarm/kernel/encoding"
)

func TestEnqueuer(t *testing.T) {
codec := encoding.NewCodecJson[string]()
mockit := func(config swarm.Config) (*Enqueuer, *emitter) {
mock := mockEmitter(10)
k := NewEnqueuer(mock, config)
Expand All @@ -42,13 +44,13 @@ func TestEnqueuer(t *testing.T) {

t.Run("None", func(t *testing.T) {
k, _ := mockit(swarm.Config{})
Enqueue(k, "test", swarm.NewCodecJson[string]())
Enqueue(k, "test", codec)
k.Await()
})

t.Run("Enqueue.1", func(t *testing.T) {
k, e := mockit(swarm.Config{})
snd, _ := Enqueue(k, "test", swarm.NewCodecJson[string]())
snd, _ := Enqueue(k, "test", codec)

snd <- "1"
it.Then(t).Should(
Expand All @@ -60,7 +62,7 @@ func TestEnqueuer(t *testing.T) {

t.Run("Enqueue.1.Shut", func(t *testing.T) {
k, e := mockit(swarm.Config{})
snd, _ := Enqueue(k, "test", swarm.NewCodecJson[string]())
snd, _ := Enqueue(k, "test", codec)

snd <- "1"
k.Await()
Expand All @@ -73,7 +75,7 @@ func TestEnqueuer(t *testing.T) {
t.Run("Enqueue.1.Error", func(t *testing.T) {
err := make(chan error)
k := NewEnqueuer(looser{}, swarm.Config{StdErr: err})
snd, dlq := Enqueue(k, "test", swarm.NewCodecJson[string]())
snd, dlq := Enqueue(k, "test", codec)

snd <- "1"
it.Then(t).Should(
Expand All @@ -100,7 +102,7 @@ func TestEnqueuer(t *testing.T) {

t.Run("Enqueue.N", func(t *testing.T) {
k, e := mockit(swarm.Config{})
snd, _ := Enqueue(k, "test", swarm.NewCodecJson[string]())
snd, _ := Enqueue(k, "test", codec)

snd <- "1"
it.Then(t).Should(
Expand All @@ -122,7 +124,7 @@ func TestEnqueuer(t *testing.T) {

t.Run("Enqueue.N.Shut", func(t *testing.T) {
k, e := mockit(swarm.Config{})
snd, _ := Enqueue(k, "test", swarm.NewCodecJson[string]())
snd, _ := Enqueue(k, "test", codec)

snd <- "1"
snd <- "2"
Expand All @@ -138,7 +140,7 @@ func TestEnqueuer(t *testing.T) {
t.Run("Enqueue.N.Backlog", func(t *testing.T) {
e := mockEmitter(10)
k := NewEnqueuer(e, swarm.Config{CapOut: 4})
snd, _ := Enqueue(k, "test", swarm.NewCodecJson[string]())
snd, _ := Enqueue(k, "test", codec)

snd <- "1"
snd <- "2"
Expand All @@ -154,7 +156,7 @@ func TestEnqueuer(t *testing.T) {
t.Run("Enqueue.N.Error", func(t *testing.T) {
err := make(chan error)
k := NewEnqueuer(looser{}, swarm.Config{CapOut: 4, CapDLQ: 4, StdErr: err})
snd, dlq := Enqueue(k, "test", swarm.NewCodecJson[string]())
snd, dlq := Enqueue(k, "test", codec)

snd <- "1"
snd <- "2"
Expand Down
7 changes: 4 additions & 3 deletions codec.go → kernel/encoding/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
// https://github.com/fogfish/swarm
//

package swarm
package encoding

import (
"encoding/json"
Expand All @@ -15,6 +15,7 @@ import (
"github.com/fogfish/curie"
"github.com/fogfish/golem/optics"
"github.com/fogfish/guid/v2"
"github.com/fogfish/swarm"
)

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -67,7 +68,7 @@ func NewCodecPacket() CodecPacket { return CodecPacket{} }
//------------------------------------------------------------------------------

// Event codec for I/O kernel
type CodecEvent[T any, E EventKind[T]] struct {
type CodecEvent[T any, E swarm.EventKind[T]] struct {
source string
cat string
shape optics.Lens4[E, string, curie.IRI, curie.IRI, time.Time]
Expand Down Expand Up @@ -95,7 +96,7 @@ func (c CodecEvent[T, E]) Decode(b []byte) (*E, error) {
return x, err
}

func NewCodecEvent[T any, E EventKind[T]](source, cat string) CodecEvent[T, E] {
func NewCodecEvent[T any, E swarm.EventKind[T]](source, cat string) CodecEvent[T, E] {
return CodecEvent[T, E]{
source: source,
cat: cat,
Expand Down
3 changes: 2 additions & 1 deletion kernel/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ import (

"github.com/fogfish/it/v2"
"github.com/fogfish/swarm"
"github.com/fogfish/swarm/kernel/encoding"
)

func TestRoute(t *testing.T) {
r := router[string]{
ch: make(chan swarm.Msg[string], 1),
codec: swarm.NewCodecJson[string](),
codec: encoding.NewCodecJson[string](),
}

r.Route(context.Background(), swarm.Bag{Object: []byte(`"1"`)})
Expand Down

0 comments on commit 4f1f7d5

Please sign in to comment.