From 4f1f7d5d821e175a4b1244b8a76f4f5016948f21 Mon Sep 17 00:00:00 2001 From: Dmitry Kolesnikov Date: Sat, 28 Sep 2024 15:54:41 +0300 Subject: [PATCH] move codec within kernel --- kernel/bridge_test.go | 24 +++++++++++++----------- kernel/cathode_test.go | 8 +++++--- kernel/emitter_test.go | 18 ++++++++++-------- codec.go => kernel/encoding/codec.go | 7 ++++--- kernel/router_test.go | 3 ++- 5 files changed, 34 insertions(+), 26 deletions(-) rename codec.go => kernel/encoding/codec.go (92%) diff --git a/kernel/bridge_test.go b/kernel/bridge_test.go index 7a959e5..03756ea 100644 --- a/kernel/bridge_test.go +++ b/kernel/bridge_test.go @@ -19,6 +19,7 @@ import ( "github.com/fogfish/it/v2" "github.com/fogfish/logger/v3" "github.com/fogfish/swarm" + "github.com/fogfish/swarm/kernel/encoding" ) func init() { @@ -35,6 +36,7 @@ func init() { const yield_before_close = 5 * time.Millisecond func TestBridge(t *testing.T) { + codec := encoding.NewCodecJson[string]() config := swarm.Config{PollFrequency: 0 * time.Millisecond} // @@ -62,13 +64,13 @@ func TestBridge(t *testing.T) { t.Run("None", func(t *testing.T) { k, _ := mockit(1) - Dequeue(k, "test", swarm.NewCodecJson[string]()) + Dequeue(k, "test", codec) k.Await() }) t.Run("Dequeue.1", func(t *testing.T) { k, brdg := mockit(1) - rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]()) + rcv, ack := Dequeue(k, "test", codec) // Note: in real apps receive loop is always go function go func() { ack <- <-rcv }() @@ -81,7 +83,7 @@ func TestBridge(t *testing.T) { t.Run("Dequeue.N", func(t *testing.T) { k, brdg := mockit(3) - rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]()) + rcv, ack := Dequeue(k, "test", codec) // Note: in real apps receive loop is always go function go func() { @@ -98,7 +100,7 @@ func TestBridge(t *testing.T) { t.Run("Error.1", func(t *testing.T) { k, brdg := mockit(1) - rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]()) + rcv, ack := Dequeue(k, "test", codec) // Note: in real apps receive loop is always go function go func() { @@ -114,7 +116,7 @@ func TestBridge(t *testing.T) { t.Run("Error.N.1", func(t *testing.T) { k, brdg := mockit(3) - rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]()) + rcv, ack := Dequeue(k, "test", codec) // Note: in real apps receive loop is always go function go func() { @@ -130,7 +132,7 @@ func TestBridge(t *testing.T) { t.Run("Error.N.2", func(t *testing.T) { k, brdg := mockit(3) - rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]()) + rcv, ack := Dequeue(k, "test", codec) // Note: in real apps receive loop is always go function go func() { @@ -147,7 +149,7 @@ func TestBridge(t *testing.T) { t.Run("Error.N.3", func(t *testing.T) { k, brdg := mockit(3) - rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]()) + rcv, ack := Dequeue(k, "test", codec) // Note: in real apps receive loop is always go function go func() { @@ -165,7 +167,7 @@ func TestBridge(t *testing.T) { t.Run("Timeout.1", func(t *testing.T) { k, brdg := mockit(1) - rcv, _ := Dequeue(k, "test", swarm.NewCodecJson[string]()) + rcv, _ := Dequeue(k, "test", codec) // Note: in real apps receive loop is always go function go func() { <-rcv }() @@ -178,7 +180,7 @@ func TestBridge(t *testing.T) { t.Run("Timeout.N.1", func(t *testing.T) { k, brdg := mockit(3) - rcv, _ := Dequeue(k, "test", swarm.NewCodecJson[string]()) + rcv, _ := Dequeue(k, "test", codec) // Note: in real apps receive loop is always go function go func() { @@ -193,7 +195,7 @@ func TestBridge(t *testing.T) { t.Run("Timeout.N.2", func(t *testing.T) { k, brdg := mockit(3) - rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]()) + rcv, ack := Dequeue(k, "test", codec) // Note: in real apps receive loop is always go function go func() { @@ -209,7 +211,7 @@ func TestBridge(t *testing.T) { t.Run("Timeout.N.3", func(t *testing.T) { k, brdg := mockit(3) - rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]()) + rcv, ack := Dequeue(k, "test", codec) // Note: in real apps receive loop is always go function go func() { diff --git a/kernel/cathode_test.go b/kernel/cathode_test.go index 1f16a76..0f3c54a 100644 --- a/kernel/cathode_test.go +++ b/kernel/cathode_test.go @@ -15,9 +15,11 @@ import ( "github.com/fogfish/it/v2" "github.com/fogfish/swarm" + "github.com/fogfish/swarm/kernel/encoding" ) func TestDequeuer(t *testing.T) { + codec := encoding.NewCodecJson[string]() none := mockCathode(nil, nil) pass := mockCathode(make(chan string), []swarm.Bag{{Ctx: &swarm.Context{Category: "test", Digest: "1"}, Object: []byte(`"1"`)}}, @@ -40,14 +42,14 @@ func TestDequeuer(t *testing.T) { t.Run("Idle", func(t *testing.T) { k := NewDequeuer(none, swarm.Config{PollFrequency: 1 * time.Second}) - Dequeue(k, "test", swarm.NewCodecJson[string]()) + Dequeue(k, "test", codec) go k.Await() k.Close() }) t.Run("Dequeue.1", func(t *testing.T) { k := NewDequeuer(pass, swarm.Config{PollFrequency: 10 * time.Millisecond}) - rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]()) + rcv, ack := Dequeue(k, "test", codec) go k.Await() ack <- <-rcv @@ -60,7 +62,7 @@ func TestDequeuer(t *testing.T) { t.Run("Backlog", func(t *testing.T) { k := NewDequeuer(pass, swarm.Config{CapAck: 4, PollFrequency: 1 * time.Millisecond}) - rcv, ack := Dequeue(k, "test", swarm.NewCodecJson[string]()) + rcv, ack := Dequeue(k, "test", codec) go k.Await() ack <- <-rcv diff --git a/kernel/emitter_test.go b/kernel/emitter_test.go index 093ca12..29e1c28 100644 --- a/kernel/emitter_test.go +++ b/kernel/emitter_test.go @@ -16,9 +16,11 @@ import ( "github.com/fogfish/it/v2" "github.com/fogfish/swarm" + "github.com/fogfish/swarm/kernel/encoding" ) func TestEnqueuer(t *testing.T) { + codec := encoding.NewCodecJson[string]() mockit := func(config swarm.Config) (*Enqueuer, *emitter) { mock := mockEmitter(10) k := NewEnqueuer(mock, config) @@ -42,13 +44,13 @@ func TestEnqueuer(t *testing.T) { t.Run("None", func(t *testing.T) { k, _ := mockit(swarm.Config{}) - Enqueue(k, "test", swarm.NewCodecJson[string]()) + Enqueue(k, "test", codec) k.Await() }) t.Run("Enqueue.1", func(t *testing.T) { k, e := mockit(swarm.Config{}) - snd, _ := Enqueue(k, "test", swarm.NewCodecJson[string]()) + snd, _ := Enqueue(k, "test", codec) snd <- "1" it.Then(t).Should( @@ -60,7 +62,7 @@ func TestEnqueuer(t *testing.T) { t.Run("Enqueue.1.Shut", func(t *testing.T) { k, e := mockit(swarm.Config{}) - snd, _ := Enqueue(k, "test", swarm.NewCodecJson[string]()) + snd, _ := Enqueue(k, "test", codec) snd <- "1" k.Await() @@ -73,7 +75,7 @@ func TestEnqueuer(t *testing.T) { t.Run("Enqueue.1.Error", func(t *testing.T) { err := make(chan error) k := NewEnqueuer(looser{}, swarm.Config{StdErr: err}) - snd, dlq := Enqueue(k, "test", swarm.NewCodecJson[string]()) + snd, dlq := Enqueue(k, "test", codec) snd <- "1" it.Then(t).Should( @@ -100,7 +102,7 @@ func TestEnqueuer(t *testing.T) { t.Run("Enqueue.N", func(t *testing.T) { k, e := mockit(swarm.Config{}) - snd, _ := Enqueue(k, "test", swarm.NewCodecJson[string]()) + snd, _ := Enqueue(k, "test", codec) snd <- "1" it.Then(t).Should( @@ -122,7 +124,7 @@ func TestEnqueuer(t *testing.T) { t.Run("Enqueue.N.Shut", func(t *testing.T) { k, e := mockit(swarm.Config{}) - snd, _ := Enqueue(k, "test", swarm.NewCodecJson[string]()) + snd, _ := Enqueue(k, "test", codec) snd <- "1" snd <- "2" @@ -138,7 +140,7 @@ func TestEnqueuer(t *testing.T) { t.Run("Enqueue.N.Backlog", func(t *testing.T) { e := mockEmitter(10) k := NewEnqueuer(e, swarm.Config{CapOut: 4}) - snd, _ := Enqueue(k, "test", swarm.NewCodecJson[string]()) + snd, _ := Enqueue(k, "test", codec) snd <- "1" snd <- "2" @@ -154,7 +156,7 @@ func TestEnqueuer(t *testing.T) { t.Run("Enqueue.N.Error", func(t *testing.T) { err := make(chan error) k := NewEnqueuer(looser{}, swarm.Config{CapOut: 4, CapDLQ: 4, StdErr: err}) - snd, dlq := Enqueue(k, "test", swarm.NewCodecJson[string]()) + snd, dlq := Enqueue(k, "test", codec) snd <- "1" snd <- "2" diff --git a/codec.go b/kernel/encoding/codec.go similarity index 92% rename from codec.go rename to kernel/encoding/codec.go index 94a1fe5..cee0ea0 100644 --- a/codec.go +++ b/kernel/encoding/codec.go @@ -6,7 +6,7 @@ // https://github.com/fogfish/swarm // -package swarm +package encoding import ( "encoding/json" @@ -15,6 +15,7 @@ import ( "github.com/fogfish/curie" "github.com/fogfish/golem/optics" "github.com/fogfish/guid/v2" + "github.com/fogfish/swarm" ) //------------------------------------------------------------------------------ @@ -67,7 +68,7 @@ func NewCodecPacket() CodecPacket { return CodecPacket{} } //------------------------------------------------------------------------------ // Event codec for I/O kernel -type CodecEvent[T any, E EventKind[T]] struct { +type CodecEvent[T any, E swarm.EventKind[T]] struct { source string cat string shape optics.Lens4[E, string, curie.IRI, curie.IRI, time.Time] @@ -95,7 +96,7 @@ func (c CodecEvent[T, E]) Decode(b []byte) (*E, error) { return x, err } -func NewCodecEvent[T any, E EventKind[T]](source, cat string) CodecEvent[T, E] { +func NewCodecEvent[T any, E swarm.EventKind[T]](source, cat string) CodecEvent[T, E] { return CodecEvent[T, E]{ source: source, cat: cat, diff --git a/kernel/router_test.go b/kernel/router_test.go index b2745a8..59242c3 100644 --- a/kernel/router_test.go +++ b/kernel/router_test.go @@ -14,12 +14,13 @@ import ( "github.com/fogfish/it/v2" "github.com/fogfish/swarm" + "github.com/fogfish/swarm/kernel/encoding" ) func TestRoute(t *testing.T) { r := router[string]{ ch: make(chan swarm.Msg[string], 1), - codec: swarm.NewCodecJson[string](), + codec: encoding.NewCodecJson[string](), } r.Route(context.Background(), swarm.Bag{Object: []byte(`"1"`)})