diff --git a/README.md b/README.md index d26cec0..e06e52a 100644 --- a/README.md +++ b/README.md @@ -242,6 +242,8 @@ user <- &User{ID: "C", Text: "some text by A"} The library guarantees following clauses `A before C` and `C after A` because both messages are produced to single channel `user`. It do not guarantee clauses `A before B`, `B before C` or `C after B` because multiple channels are used. +The library does not provide any higher guarantee than underlying message broker. For example, using SQS would not guarantee any ordering while SQS FIFO makes sure that messages of same type is ordered. + ### Octet Streams diff --git a/broker/sqs/sqs.go b/broker/sqs/sqs.go index 8d58905..b74c3f6 100644 --- a/broker/sqs/sqs.go +++ b/broker/sqs/sqs.go @@ -10,6 +10,7 @@ package sqs import ( "context" + "strings" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" @@ -22,6 +23,7 @@ type client struct { service SQS config *swarm.Config queue *string + isFIFO bool } func newClient(queue string, config *swarm.Config) (*client, error) { @@ -46,6 +48,7 @@ func newClient(queue string, config *swarm.Config) (*client, error) { service: api, config: config, queue: spec.QueueUrl, + isFIFO: strings.HasSuffix(queue, ".fifo"), }, nil } @@ -70,14 +73,20 @@ func (cli *client) Enq(bag swarm.Bag) error { ctx, cancel := context.WithTimeout(context.Background(), cli.config.NetworkTimeout) defer cancel() + var idMsgGroup *string + if cli.isFIFO { + idMsgGroup = aws.String(bag.Category) + } + _, err := cli.service.SendMessage(ctx, &sqs.SendMessageInput{ MessageAttributes: map[string]types.MessageAttributeValue{ "Source": {StringValue: aws.String(cli.config.Source), DataType: aws.String("String")}, "Category": {StringValue: aws.String(bag.Category), DataType: aws.String("String")}, }, - MessageBody: aws.String(string(bag.Object)), - QueueUrl: cli.queue, + MessageGroupId: idMsgGroup, + MessageBody: aws.String(string(bag.Object)), + QueueUrl: cli.queue, }, ) return err diff --git a/broker/sqs/sqs_test.go b/broker/sqs/sqs_test.go index 3296929..e88ece0 100644 --- a/broker/sqs/sqs_test.go +++ b/broker/sqs/sqs_test.go @@ -35,9 +35,7 @@ func TestDequeueSQS(t *testing.T) { qtest.TestDequeueEvent(t, newMockDequeue) } -// // Mock AWS SQS Enqueue -// type mockEnqueue struct { sut.SQS expectCategory string @@ -75,9 +73,7 @@ func (m *mockEnqueue) SendMessage(ctx context.Context, req *sqs.SendMessageInput return &sqs.SendMessageOutput{}, nil } -// // Mock AWS SQS Dequeue -// type mockDequeue struct { sut.SQS returnCategory string diff --git a/examples/events/dequeue/events.go b/examples/events/dequeue/events.go index d38b2cd..9053faa 100644 --- a/examples/events/dequeue/events.go +++ b/examples/events/dequeue/events.go @@ -18,7 +18,6 @@ import ( "github.com/fogfish/swarm/queue/events" ) -// // Date type (object) affected by events type User struct { ID string `json:"id"` @@ -30,7 +29,6 @@ type Note struct { Text string `json:"text"` } -// // Events type EventCreateUser swarm.Event[*User] @@ -47,13 +45,18 @@ type EventRemoveUser swarm.Event[*User] func (EventRemoveUser) HKT1(swarm.EventType) {} func (EventRemoveUser) HKT2(*User) {} +type EventNote swarm.Event[*Note] + +func (EventNote) HKT1(swarm.EventType) {} +func (EventNote) HKT2(*Note) {} + func main() { q := queue.Must(sqs.New("swarm-test")) go create(events.Dequeue[*User, EventCreateUser](q)) go update(events.Dequeue[*User, EventUpdateUser](q)) go remove(events.Dequeue[*User, EventRemoveUser](q)) - go common(events.Dequeue[*Note, swarm.Event[*Note]](q)) + go common(events.Dequeue[*Note, EventNote](q)) q.Await() } @@ -82,7 +85,7 @@ func remove(rcv <-chan *EventRemoveUser, ack chan<- *EventRemoveUser) { } } -func common(rcv <-chan *swarm.Event[*Note], ack chan<- *swarm.Event[*Note]) { +func common(rcv <-chan *EventNote, ack chan<- *EventNote) { for msg := range rcv { prefix := "" switch string(msg.Type) { diff --git a/examples/events/enqueue/events.go b/examples/events/enqueue/events.go index 8a82606..d2dcc8a 100644 --- a/examples/events/enqueue/events.go +++ b/examples/events/enqueue/events.go @@ -26,7 +26,6 @@ type Note struct { Text string `json:"text"` } -// Events type EventCreateUser swarm.Event[*User] func (EventCreateUser) HKT1(swarm.EventType) {} @@ -42,13 +41,18 @@ type EventRemoveUser swarm.Event[*User] func (EventRemoveUser) HKT1(swarm.EventType) {} func (EventRemoveUser) HKT2(*User) {} +type EventNote swarm.Event[*Note] + +func (EventNote) HKT1(swarm.EventType) {} +func (EventNote) HKT2(*Note) {} + func main() { q := queue.Must(sqs.New("swarm-test")) userCreated, _ := events.Enqueue[*User, EventCreateUser](q) userUpdated, _ := events.Enqueue[*User, EventUpdateUser](q) userRemoved, _ := events.Enqueue[*User, EventRemoveUser](q) - note, _ := events.Enqueue[*Note, swarm.Event[*Note]](q) + note, _ := events.Enqueue[*Note, EventNote](q) // // Multiple channels emits events @@ -72,21 +76,21 @@ func main() { // // Single channel emits event - note <- &swarm.Event[*Note]{ + note <- &EventNote{ Type: "note:EventCreateNote", Agent: "example", Participant: "user", Object: &Note{ID: "note", Text: "some text"}, } - note <- &swarm.Event[*Note]{ + note <- &EventNote{ Type: "note:EventUpdateNote", Agent: "example", Participant: "user", Object: &Note{ID: "note", Text: "some text with changes"}, } - note <- &swarm.Event[*Note]{ + note <- &EventNote{ Type: "note:EventRemoveNote", Agent: "example", Participant: "user", diff --git a/go.mod b/go.mod index db20b74..63bf0c2 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/fogfish/swarm -go 1.18 +go 1.21 require ( github.com/aws/aws-cdk-go/awscdk/v2 v2.106.1 diff --git a/go.sum b/go.sum index 8c8ee18..354f891 100644 --- a/go.sum +++ b/go.sum @@ -45,6 +45,7 @@ github.com/cdklabs/awscdk-asset-kubectl-go/kubectlv20/v2 v2.1.2/go.mod h1:CvFHBo github.com/cdklabs/awscdk-asset-node-proxy-agent-go/nodeproxyagentv6/v2 v2.0.1 h1:MBBQNKKPJ5GArbctgwpiCy7KmwGjHDjUUH5wEzwIq8w= github.com/cdklabs/awscdk-asset-node-proxy-agent-go/nodeproxyagentv6/v2 v2.0.1/go.mod h1:/2WiXEft9s8ViJjD01CJqDuyJ8HXBjhBLtK5OvJfdSc= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs= github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw= github.com/fogfish/curie v1.8.2 h1:+4CezyjZ5uszSXUZAV27gfKwv58w3lKTH0JbQwh3S9A= @@ -64,13 +65,16 @@ github.com/fogfish/it/v2 v2.0.1/go.mod h1:h5FdKaEQT4sUEykiVkB8VV4jX27XabFVeWhoDZ github.com/fogfish/scud v0.6.0 h1:sJsWAvvRcX4kRYYUXbOTw9hyZV+ax01TxpXlHKeTJGg= github.com/fogfish/scud v0.6.0/go.mod h1:7EH9GAGQK4oux9sTMhtSEfEVbism2ED+2gTb/UNFqvs= github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= +github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/yuin/goldmark v1.5.3 h1:3HUJmBFbQW9fhQOzMgseU134xfi6hU+mjWywx5Ty+/M= github.com/yuin/goldmark v1.5.3/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -84,6 +88,7 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ= +golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -96,3 +101,4 @@ golang.org/x/tools v0.14.0 h1:jvNa2pY0M4r62jkRQ6RwEZZyPcymeL9XZMLBbV7U2nc= golang.org/x/tools v0.14.0/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/qtest/qtest.go b/internal/qtest/qtest.go index b66631f..685bf4e 100644 --- a/internal/qtest/qtest.go +++ b/internal/qtest/qtest.go @@ -27,7 +27,7 @@ const ( Event = "" Receipt = "0x123456789abcdef" - EventCategory = "note:Event[*github.com/fogfish/swarm/internal/qtest.Note]" + EventCategory = "Event[Note]" ) var ( @@ -140,7 +140,7 @@ func TestEnqueueEvent(t *testing.T, factory enqueue) { it.Then(t). Should(it.Nil(err)). Should(it.Equal(*val.Object, Note{Some: "message"})). - Should(it.Equal(val.Type, "note:Event[*github.com/fogfish/swarm/internal/qtest.Note]")). + Should(it.Equal(val.Type, "note:Event[Note]")). ShouldNot(it.Equal(len(val.ID), 0)). ShouldNot(it.True(val.Created.IsZero())) diff --git a/queue/dequeue.go b/queue/dequeue.go index 87ba3f6..fae575d 100644 --- a/queue/dequeue.go +++ b/queue/dequeue.go @@ -24,7 +24,7 @@ func Dequeue[T any](q swarm.Broker, category ...string) (<-chan *swarm.Msg[T], c conf := q.Config() ch := swarm.NewMsgDeqCh[T](conf.DequeueCapacity) - cat := typeOf[T]() + cat := categoryOf[T]() if len(category) > 0 { cat = category[0] } diff --git a/queue/enqueue.go b/queue/enqueue.go index 0921440..a1e1392 100644 --- a/queue/enqueue.go +++ b/queue/enqueue.go @@ -11,6 +11,7 @@ package queue import ( "encoding/json" "reflect" + "strings" "github.com/fogfish/swarm" "github.com/fogfish/swarm/internal/pipe" @@ -26,7 +27,7 @@ func Enqueue[T any](q swarm.Broker, category ...string) (chan<- T, <-chan T) { conf := q.Config() ch := swarm.NewMsgEnqCh[T](conf.EnqueueCapacity) - cat := typeOf[T]() + cat := categoryOf[T]() if len(category) > 0 { cat = category[0] } @@ -59,19 +60,22 @@ func Enqueue[T any](q swarm.Broker, category ...string) (chan<- T, <-chan T) { return ch.Msg, ch.Err } -func typeOf[T any]() string { - // - // TODO: fix - // Action[*swarm.User] if container type is used - // - - typ := reflect.TypeOf(*new(T)) +// normalized type name +func categoryOf[T any]() string { + typ := reflect.TypeOf(new(T)).Elem() cat := typ.Name() if typ.Kind() == reflect.Ptr { cat = typ.Elem().Name() } - return cat + seq := strings.Split(strings.Trim(cat, "]"), "[") + tkn := make([]string, len(seq)) + for i, s := range seq { + r := strings.Split(s, ".") + tkn[i] = r[len(r)-1] + } + + return strings.Join(tkn, "[") + strings.Repeat("]", len(tkn)-1) } func Must(broker swarm.Broker, err error) swarm.Broker { diff --git a/queue/events/dequeue.go b/queue/events/dequeue.go index 20ddf52..2354214 100644 --- a/queue/events/dequeue.go +++ b/queue/events/dequeue.go @@ -10,7 +10,6 @@ package events import ( "encoding/json" - "strings" "github.com/fogfish/golem/optics" "github.com/fogfish/swarm" @@ -24,21 +23,21 @@ func Dequeue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (< conf := q.Config() ch := swarm.NewEvtDeqCh[T, E](conf.DequeueCapacity) - cat := strings.ToLower(typeOf[T]()) + ":" + typeOf[E]() + catE := categoryOf[E]() if len(category) > 0 { - cat = category[0] + catE = category[0] } shape := optics.ForShape2[E, string, error]("Digest", "Err") - sock := q.Dequeue(cat, &ch) + sock := q.Dequeue(catE, &ch) pipe.ForEach(ch.Ack, func(object *E) { digest, fail := shape.Get(object) err := conf.Backoff.Retry(func() error { return sock.Ack(swarm.Bag{ - Category: cat, + Category: catE, Digest: digest, Err: fail, }) @@ -51,7 +50,7 @@ func Dequeue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (< pipe.Emit(ch.Msg, q.Config().PollFrequency, func() (*E, error) { var bag swarm.Bag err := conf.Backoff.Retry(func() (err error) { - bag, err = sock.Deq(cat) + bag, err = sock.Deq(catE) return }) if err != nil { diff --git a/queue/events/enqueue.go b/queue/events/enqueue.go index 4cba6df..9b8a7a5 100644 --- a/queue/events/enqueue.go +++ b/queue/events/enqueue.go @@ -28,14 +28,15 @@ func Enqueue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (c conf := q.Config() ch := swarm.NewEvtEnqCh[T, E](conf.EnqueueCapacity) - cat := strings.ToLower(typeOf[T]()) + ":" + typeOf[E]() + catT := strings.ToLower(categoryOf[T]()) + catE := categoryOf[E]() if len(category) > 0 { - cat = category[0] + catE = category[0] } shape := optics.ForShape4[E, string, curie.IRI, curie.IRI, time.Time]("ID", "Type", "Agent", "Created") - sock := q.Enqueue(cat, &ch) + sock := q.Enqueue(catE, &ch) pipe.ForEach(ch.Msg, func(object *E) { ch.Busy.Lock() @@ -43,7 +44,7 @@ func Enqueue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (c _, knd, src, _ := shape.Get(object) if knd == "" { - knd = curie.IRI(cat) + knd = curie.IRI(catT + ":" + catE) } if src == "" { @@ -62,7 +63,7 @@ func Enqueue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (c return } - bag := swarm.Bag{Category: cat, Object: msg} + bag := swarm.Bag{Category: catE, Object: msg} err = conf.Backoff.Retry(func() error { return sock.Enq(bag) }) if err != nil { ch.Err <- object @@ -75,17 +76,20 @@ func Enqueue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (c return ch.Msg, ch.Err } -func typeOf[T any]() string { - // - // TODO: fix - // Action[*swarm.User] if container type is used - // - - typ := reflect.TypeOf(*new(T)) +// normalized type name +func categoryOf[T any]() string { + typ := reflect.TypeOf(new(T)).Elem() cat := typ.Name() if typ.Kind() == reflect.Ptr { cat = typ.Elem().Name() } - return cat + seq := strings.Split(strings.Trim(cat, "]"), "[") + tkn := make([]string, len(seq)) + for i, s := range seq { + r := strings.Split(s, ".") + tkn[i] = r[len(r)-1] + } + + return strings.Join(tkn, "[") + strings.Repeat("]", len(tkn)-1) } diff --git a/queue/events/queue.go b/queue/events/queue.go index 9563ccc..bdfa0b7 100644 --- a/queue/events/queue.go +++ b/queue/events/queue.go @@ -10,10 +10,10 @@ package events import ( "encoding/json" - "strings" "time" - "unsafe" + "github.com/fogfish/curie" + "github.com/fogfish/golem/optics" "github.com/fogfish/guid" "github.com/fogfish/swarm" ) @@ -22,37 +22,28 @@ type Queue[T any, E swarm.EventKind[T]] interface { Enqueue(*E) error } -// type queue[T any, E swarm.EventKind[T]] struct { - cat string - conf swarm.Config - sock swarm.Enqueue - - offID, offType, offCreated uintptr + cat string + conf swarm.Config + sock swarm.Enqueue + shape optics.Lens4[E, string, curie.IRI, curie.IRI, time.Time] } func (q queue[T, E]) Sync() {} func (q queue[T, E]) Close() {} func (q queue[T, E]) Enqueue(object *E) error { - evt := unsafe.Pointer(object) - - // patch ID - if len(*(*string)(unsafe.Pointer(uintptr(evt) + q.offID))) == 0 { - id := guid.G.K(guid.Clock).String() - *(*string)(unsafe.Pointer(uintptr(evt) + q.offID)) = id + _, knd, src, _ := q.shape.Get(object) + if knd == "" { + knd = curie.IRI(q.cat) } - // patch Type - if len(*(*string)(unsafe.Pointer(uintptr(evt) + q.offType))) == 0 { - *(*string)(unsafe.Pointer(uintptr(evt) + q.offType)) = q.cat + if src == "" { + src = curie.IRI(q.conf.Source) } - // patch Created - if len(*(*string)(unsafe.Pointer(uintptr(evt) + q.offCreated))) == 0 { - t := time.Now().Format(time.RFC3339) - *(*string)(unsafe.Pointer(uintptr(evt) + q.offCreated)) = t - } + // TODO: migrate to v2 + q.shape.Put(object, guid.G.K(guid.Clock).String(), knd, src, time.Now()) msg, err := json.Marshal(object) if err != nil { @@ -68,29 +59,20 @@ func (q queue[T, E]) Enqueue(object *E) error { return nil } -// func New[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) Queue[T, E] { - cat := strings.ToLower(typeOf[T]()) + ":" + typeOf[E]() + catE := categoryOf[E]() if len(category) > 0 { - cat = category[0] + catE = category[0] } - // - // building memory layout to make unsafe struct reading - kindT := swarm.Event[T]{} - offID, offType, offCreated := - unsafe.Offsetof(kindT.ID), - unsafe.Offsetof(kindT.Type), - unsafe.Offsetof(kindT.Created) + shape := optics.ForShape4[E, string, curie.IRI, curie.IRI, time.Time]("ID", "Type", "Agent", "Created") queue := &queue[T, E]{ - cat: cat, - conf: q.Config(), - offID: offID, - offType: offType, - offCreated: offCreated, + cat: catE, + conf: q.Config(), + shape: shape, } - queue.sock = q.Enqueue(cat, queue) + queue.sock = q.Enqueue(catE, queue) return queue } diff --git a/queue/queue.go b/queue/queue.go index 67cb0d5..3f47d92 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -18,7 +18,6 @@ type Queue[T any] interface { Enqueue(T) error } -// type queue[T any] struct { cat string conf swarm.Config @@ -43,9 +42,8 @@ func (q queue[T]) Enqueue(object T) error { return nil } -// func New[T any](q swarm.Broker, category ...string) Queue[T] { - cat := typeOf[T]() + cat := categoryOf[T]() if len(category) > 0 { cat = category[0] }