Skip to content

Commit

Permalink
dequeue api for applications
Browse files Browse the repository at this point in the history
  • Loading branch information
fogfish committed Sep 28, 2024
1 parent 4f1f7d5 commit 94a8af0
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 0 deletions.
41 changes: 41 additions & 0 deletions dequeue/dequeue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
//
// Copyright (C) 2021 - 2022 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the Apache License Version 2.0. See the LICENSE file for details.
// https://github.com/fogfish/swarm
//

package dequeue

import (
"github.com/fogfish/swarm"
"github.com/fogfish/swarm/kernel"
"github.com/fogfish/swarm/kernel/encoding"
)

// Creates pair of channels to receive and acknowledge messages of type T
func Type[T any](q *kernel.Dequeuer, category ...string) (rcv <-chan swarm.Msg[T], ack chan<- swarm.Msg[T]) {
return kernel.Dequeue(q,
swarm.TypeOf[T](category...),
encoding.NewCodecJson[T](),
)
}

// Creates pair of channels to receive and acknowledge events of type T
func Event[T any, E swarm.EventKind[T]](q *kernel.Dequeuer, category ...string) (<-chan swarm.Msg[*E], chan<- swarm.Msg[*E]) {
cat := swarm.TypeOf[E](category...)

return kernel.Dequeue(q, cat,
encoding.NewCodecEvent[T, E](q.Config.Source, cat),
)
}

// Create pair of channels to receive and acknowledge pure binary
func Bytes(q *kernel.Dequeuer, cat string) (<-chan swarm.Msg[[]byte], chan<- swarm.Msg[[]byte]) {
if q.Config.Codec != nil {

Check failure on line 36 in dequeue/dequeue.go

View workflow job for this annotation

GitHub Actions / unit (.)

q.Config.Codec undefined (type swarm.Config has no field or method Codec)
return kernel.Dequeue(q, cat, q.Config.Codec)

Check failure on line 37 in dequeue/dequeue.go

View workflow job for this annotation

GitHub Actions / unit (.)

q.Config.Codec undefined (type swarm.Config has no field or method Codec)
}

return kernel.Dequeue(q, cat, encoding.NewCodecByte())
}
104 changes: 104 additions & 0 deletions dequeue/dequeue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package dequeue_test

import (
"context"
"encoding/json"
"testing"
"time"

"github.com/fogfish/it/v2"
"github.com/fogfish/swarm"
"github.com/fogfish/swarm/dequeue"
"github.com/fogfish/swarm/kernel"
)

// controls yield time before kernel is closed
const yield_before_close = 5 * time.Millisecond

type User struct {
ID string `json:"id"`
Text string `json:"text"`
}

func TestDequeueType(t *testing.T) {
user := User{ID: "id", Text: "user"}

k := kernel.NewDequeuer(mockCathode(user), swarm.Config{})
go func() {
time.Sleep(yield_before_close)
k.Close()
}()

var msg swarm.Msg[User]
rcv, ack := dequeue.Type[User](k)

go func() {
msg = <-rcv
ack <- msg
}()
k.Await()

it.Then(t).Should(
it.Equal(msg.Ctx.Category, "User"),
it.Equal(msg.Ctx.Digest, "1"),
it.Equal(msg.Object.ID, "id"),
it.Equal(msg.Object.Text, "user"),
)
}

func TestDequeueBytes(t *testing.T) {
user := User{ID: "id", Text: "user"}

k := kernel.NewDequeuer(mockCathode(user), swarm.Config{})
go func() {
time.Sleep(yield_before_close)
k.Close()
}()

var msg swarm.Msg[[]byte]
rcv, ack := dequeue.Bytes(k, "User")

go func() {
msg = <-rcv
ack <- msg
}()
k.Await()

it.Then(t).Should(
it.Equal(msg.Ctx.Category, "User"),
it.Equal(msg.Ctx.Digest, "1"),
it.Equal(string(msg.Object), `{"id":"id","text":"user"}`),
)
}

//------------------------------------------------------------------------------

type cathode struct {
cat string
user User
}

func mockCathode(user User) cathode {
return cathode{
cat: swarm.TypeOf[User](),
user: user,
}
}

func (c cathode) Ack(ctx context.Context, digest string) error {
return nil
}

func (c cathode) Err(ctx context.Context, digest string, err error) error {
return nil
}

func (c cathode) Ask(context.Context) ([]swarm.Bag, error) {
data, err := json.Marshal(c.user)
if err != nil {
return nil, err
}

bag := []swarm.Bag{{Ctx: &swarm.Context{Category: c.cat, Digest: "1"}, Object: data}}
return bag, nil
}

0 comments on commit 94a8af0

Please sign in to comment.