diff --git a/kernel/bridge_test.go b/kernel/bridge_test.go index 4d978a2..d29bb1c 100644 --- a/kernel/bridge_test.go +++ b/kernel/bridge_test.go @@ -35,12 +35,6 @@ func init() { const yield_before_close = 1 * time.Millisecond func TestBridge(t *testing.T) { - // Use-case - // 1. Recv - Ack - // 2. Recv - Timeout - // 3. Recv - Error - // 4. Recv batch - config := swarm.Config{PollFrequency: 0 * time.Millisecond} // @@ -68,6 +62,7 @@ func TestBridge(t *testing.T) { t.Run("None", func(t *testing.T) { k, _ := mockit(1) + Dequeue(k, "test", swarm.NewCodecJson[string]()) k.Await() }) diff --git a/kernel/emitter_test.go b/kernel/emitter_test.go index 44817ec..fe6fb58 100644 --- a/kernel/emitter_test.go +++ b/kernel/emitter_test.go @@ -10,6 +10,7 @@ package kernel import ( "context" + "fmt" "testing" "time" @@ -18,95 +19,150 @@ import ( ) func TestEnqueuer(t *testing.T) { - none := mockEmitter(0, nil) - pass := mockEmitter(10, make(chan []byte)) + // none := mockEmitter(0, nil) + // pass := mockEmitter(10, make(chan []byte)) - t.Run("None", func(t *testing.T) { - k := NewEnqueuer(none, swarm.Config{}) - k.Close() - }) + mockit := func(config swarm.Config) (*Enqueuer, *emitter) { + mock := mockEmitter(10) + k := NewEnqueuer(mock, config) + + go func() { + time.Sleep(yield_before_close) + k.Close() + }() - t.Run("Idle", func(t *testing.T) { - k := NewEnqueuer(none, swarm.Config{}) + return k, mock + } + + t.Run("None", func(t *testing.T) { + k, _ := mockit(swarm.Config{}) Enqueue(k, "test", swarm.NewCodecJson[string]()) - k.Close() + k.Await() }) t.Run("Enqueue.1", func(t *testing.T) { - k := NewEnqueuer(pass, swarm.Config{}) + k, e := mockit(swarm.Config{}) + snd, _ := Enqueue(k, "test", swarm.NewCodecJson[string]()) + + snd <- "1" + it.Then(t).Should( + it.Equal(<-e.val, `"1"`), + ) + + k.Await() + }) + + t.Run("Enqueue.1.Shut", func(t *testing.T) { + k, e := mockit(swarm.Config{}) snd, _ := Enqueue(k, "test", swarm.NewCodecJson[string]()) + snd <- "1" + k.Await() + + it.Then(t).Should( + it.Seq(e.seq).Equal(`"1"`), + ) + }) + + t.Run("Enqueue.1.Error", func(t *testing.T) { + err := make(chan error) + k := NewEnqueuer(looser{}, swarm.Config{StdErr: err}) + snd, dlq := Enqueue(k, "test", swarm.NewCodecJson[string]()) + snd <- "1" it.Then(t).Should( - it.Equal(string(<-pass.ch), `"1"`), + it.Equal(<-dlq, "1"), + it.Fail(func() error { return <-err }).Contain("lost"), + ) + + k.Close() + }) + + t.Run("Enqueue.1.Codec", func(t *testing.T) { + err := make(chan error) + k := NewEnqueuer(mockEmitter(10), swarm.Config{StdErr: err}) + snd, dlq := Enqueue(k, "test", looser{}) + + snd <- "1" + it.Then(t).Should( + it.Equal(<-dlq, "1"), + it.Fail(func() error { return <-err }).Contain("invalid"), ) k.Close() }) t.Run("Enqueue.N", func(t *testing.T) { - k := NewEnqueuer(pass, swarm.Config{}) + k, e := mockit(swarm.Config{}) snd, _ := Enqueue(k, "test", swarm.NewCodecJson[string]()) snd <- "1" it.Then(t).Should( - it.Equal(string(<-pass.ch), `"1"`), + it.Equal(<-e.val, `"1"`), ) snd <- "2" it.Then(t).Should( - it.Equal(string(<-pass.ch), `"2"`), + it.Equal(<-e.val, `"2"`), ) snd <- "3" it.Then(t).Should( - it.Equal(string(<-pass.ch), `"3"`), - ) - - snd <- "4" - it.Then(t).Should( - it.Equal(string(<-pass.ch), `"4"`), + it.Equal(<-e.val, `"3"`), ) - k.Close() + k.Await() }) - t.Run("Backlog", func(t *testing.T) { - k := NewEnqueuer(pass, swarm.Config{CapOut: 4}) + t.Run("Enqueue.N.Shut", func(t *testing.T) { + k, e := mockit(swarm.Config{}) snd, _ := Enqueue(k, "test", swarm.NewCodecJson[string]()) + snd <- "1" snd <- "2" snd <- "3" - snd <- "4" - go k.Close() + + k.Await() it.Then(t).Should( - it.Equal(string(<-pass.ch), `"1"`), - it.Equal(string(<-pass.ch), `"2"`), - it.Equal(string(<-pass.ch), `"3"`), - it.Equal(string(<-pass.ch), `"4"`), + it.Seq(e.seq).Equal(`"1"`, `"2"`, `"3"`), ) }) - t.Run("Queues.N", func(t *testing.T) { - k := NewEnqueuer(pass, swarm.Config{}) - a, _ := Enqueue(k, "a", swarm.NewCodecJson[string]()) - b, _ := Enqueue(k, "b", swarm.NewCodecJson[string]()) - c, _ := Enqueue(k, "c", swarm.NewCodecJson[string]()) + t.Run("Enqueue.N.Backlog", func(t *testing.T) { + e := mockEmitter(10) + k := NewEnqueuer(e, swarm.Config{CapOut: 4}) + snd, _ := Enqueue(k, "test", swarm.NewCodecJson[string]()) - a <- "a" - it.Then(t).Should( - it.Equal(string(<-pass.ch), `"a"`), - ) + snd <- "1" + snd <- "2" + snd <- "3" + + k.Close() - b <- "b" it.Then(t).Should( - it.Equal(string(<-pass.ch), `"b"`), + it.Seq(e.seq).Equal(`"1"`, `"2"`, `"3"`), ) + }) + + t.Run("Enqueue.N.Error", func(t *testing.T) { + err := make(chan error) + k := NewEnqueuer(looser{}, swarm.Config{CapOut: 4, CapDLQ: 4, StdErr: err}) + snd, dlq := Enqueue(k, "test", swarm.NewCodecJson[string]()) + + snd <- "1" + snd <- "2" + snd <- "3" - c <- "c" it.Then(t).Should( - it.Equal(string(<-pass.ch), `"c"`), + it.Equal(<-dlq, "1"), + it.Fail(func() error { return <-err }).Contain("lost"), + + it.Equal(<-dlq, "2"), + it.Fail(func() error { return <-err }).Contain("lost"), + + it.Equal(<-dlq, "3"), + it.Fail(func() error { return <-err }).Contain("lost"), ) k.Close() @@ -116,19 +172,33 @@ func TestEnqueuer(t *testing.T) { //------------------------------------------------------------------------------ type emitter struct { - ms int - ch chan []byte + wait int + seq []string + val chan string } -func mockEmitter(ms int, ch chan []byte) emitter { - return emitter{ - ms: ms, - ch: ch, +func mockEmitter(wait int) *emitter { + return &emitter{ + wait: wait, + seq: make([]string, 0), + val: make(chan string, 1000), } } -func (e emitter) Enq(ctx context.Context, bag swarm.Bag) error { - time.Sleep(time.Duration(e.ms) * time.Microsecond) - e.ch <- bag.Object +func (e *emitter) Enq(ctx context.Context, bag swarm.Bag) error { + time.Sleep(time.Duration(e.wait) * time.Microsecond) + e.seq = append(e.seq, string(bag.Object)) + + e.val <- string(bag.Object) return nil } + +type looser struct{} + +func (e looser) Enq(ctx context.Context, bag swarm.Bag) error { + return fmt.Errorf("lost") +} + +func (e looser) Encode(x string) ([]byte, error) { + return nil, fmt.Errorf("invalid") +}