diff --git a/broker/sqs/config.go b/broker/sqs/config.go new file mode 100644 index 0000000..913f5e7 --- /dev/null +++ b/broker/sqs/config.go @@ -0,0 +1,42 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package sqs + +import ( + "github.com/fogfish/swarm" +) + +type Option func(*Client) + +var defs = []Option{WithConfig()} + +func WithConfig(opts ...swarm.Option) Option { + return func(c *Client) { + config := swarm.NewConfig() + for _, opt := range opts { + opt(&config) + } + + c.batchSize = 1 + + c.config = config + } +} + +func WithService(service SQS) Option { + return func(c *Client) { + c.service = service + } +} + +func WithBatchSize(batch int) Option { + return func(c *Client) { + c.batchSize = batch + } +} diff --git a/broker/sqs/examples/bytes/dequeue/bytes.go b/broker/sqs/examples/dequeue/bytes/sqs.go similarity index 60% rename from broker/sqs/examples/bytes/dequeue/bytes.go rename to broker/sqs/examples/dequeue/bytes/sqs.go index 8ccf4c6..38e87a5 100644 --- a/broker/sqs/examples/bytes/dequeue/bytes.go +++ b/broker/sqs/examples/dequeue/bytes/sqs.go @@ -13,16 +13,23 @@ import ( "github.com/fogfish/swarm" "github.com/fogfish/swarm/broker/sqs" - "github.com/fogfish/swarm/queue" - "github.com/fogfish/swarm/queue/bytes" + "github.com/fogfish/swarm/dequeue" ) func main() { - q := queue.Must(sqs.New("swarm-test", swarm.WithLogStdErr())) + q, err := sqs.NewDequeuer("swarm-test", + sqs.WithConfig( + swarm.WithLogStdErr(), + ), + ) + if err != nil { + slog.Error("sqs reader has failed", "err", err) + return + } - go actor("user").handle(bytes.Dequeue(q, "User")) - go actor("note").handle(bytes.Dequeue(q, "Note")) - go actor("like").handle(bytes.Dequeue(q, "Like")) + go actor("user").handle(dequeue.Bytes(q, "User")) + go actor("note").handle(dequeue.Bytes(q, "Note")) + go actor("like").handle(dequeue.Bytes(q, "Like")) q.Await() } diff --git a/broker/sqs/examples/dequeue/event/sqs.go b/broker/sqs/examples/dequeue/event/sqs.go new file mode 100644 index 0000000..56016bb --- /dev/null +++ b/broker/sqs/examples/dequeue/event/sqs.go @@ -0,0 +1,105 @@ +// +// Copyright (C) 2021 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package main + +import ( + "encoding/json" + "fmt" + "log/slog" + + "github.com/fogfish/swarm" + "github.com/fogfish/swarm/broker/sqs" + "github.com/fogfish/swarm/dequeue" +) + +// Date type (object) affected by events +type User struct { + ID string `json:"id"` + Text string `json:"text"` +} + +type CreatedUser User + +type EventCreateUser = swarm.Event[swarm.Meta, CreatedUser] + +type UpdatedUser User + +type EventUpdateUser = swarm.Event[swarm.Meta, UpdatedUser] + +type RemovedUser User + +type EventRemoveUser = swarm.Event[swarm.Meta, RemovedUser] + +type Note struct { + ID string `json:"id"` + Text string `json:"text"` +} + +type EventNote = swarm.Event[swarm.Meta, Note] + +func main() { + q, err := sqs.NewDequeuer("swarm-test", + sqs.WithConfig( + swarm.WithLogStdErr(), + ), + ) + if err != nil { + slog.Error("sqs reader has failed", "err", err) + return + } + + go create(dequeue.Event[swarm.Meta, CreatedUser](q)) + go update(dequeue.Event[swarm.Meta, UpdatedUser](q)) + go remove(dequeue.Event[swarm.Meta, RemovedUser](q)) + go common(dequeue.Event[swarm.Meta, Note](q)) + + q.Await() +} + +func create(rcv <-chan swarm.Msg[EventCreateUser], ack chan<- swarm.Msg[EventCreateUser]) { + for msg := range rcv { + v, _ := json.MarshalIndent(msg, "+ |", " ") + fmt.Printf("create user > \n %s\n", v) + ack <- msg + } +} + +func update(rcv <-chan swarm.Msg[EventUpdateUser], ack chan<- swarm.Msg[EventUpdateUser]) { + for msg := range rcv { + v, _ := json.MarshalIndent(msg, "~ |", " ") + fmt.Printf("update user > \n %s\n", v) + ack <- msg + } +} + +func remove(rcv <-chan swarm.Msg[EventRemoveUser], ack chan<- swarm.Msg[EventRemoveUser]) { + for msg := range rcv { + v, _ := json.MarshalIndent(msg, "- |", " ") + fmt.Printf("remove user > \n %s\n", v) + ack <- msg + } +} + +func common(rcv <-chan swarm.Msg[EventNote], ack chan<- swarm.Msg[EventNote]) { + for msg := range rcv { + prefix := "" + switch string(msg.Object.Meta.Type) { + case "note:EventCreateNote": + prefix = "+ |" + case "note:EventUpdateNote": + prefix = "~ |" + case "note:EventRemoveNote": + prefix = "- |" + } + + v, _ := json.MarshalIndent(msg, prefix, " ") + fmt.Printf("common note > \n %s\n", v) + ack <- msg + } +} diff --git a/broker/sqs/examples/dequeue/sqs.go b/broker/sqs/examples/dequeue/typed/sqs.go similarity index 63% rename from broker/sqs/examples/dequeue/sqs.go rename to broker/sqs/examples/dequeue/typed/sqs.go index 5e4947e..5b1d2b6 100644 --- a/broker/sqs/examples/dequeue/sqs.go +++ b/broker/sqs/examples/dequeue/typed/sqs.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2021 Dmitry Kolesnikov +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov // // This file may be modified and distributed under the terms // of the Apache License Version 2.0. See the LICENSE file for details. @@ -13,7 +13,7 @@ import ( "github.com/fogfish/swarm" "github.com/fogfish/swarm/broker/sqs" - "github.com/fogfish/swarm/queue" + "github.com/fogfish/swarm/dequeue" ) type User struct { @@ -32,11 +32,19 @@ type Like struct { } func main() { - q := queue.Must(sqs.New("swarm-test", swarm.WithLogStdErr())) + q, err := sqs.NewDequeuer("swarm-test", + sqs.WithConfig( + swarm.WithLogStdErr(), + ), + ) + if err != nil { + slog.Error("sqs reader has failed", "err", err) + return + } - go actor[User]("user").handle(queue.Dequeue[User](q)) - go actor[Note]("note").handle(queue.Dequeue[Note](q)) - go actor[Like]("like").handle(queue.Dequeue[Like](q)) + go actor[User]("user").handle(dequeue.Typed[User](q)) + go actor[Note]("note").handle(dequeue.Typed[Note](q)) + go actor[Like]("like").handle(dequeue.Typed[Like](q)) q.Await() } diff --git a/broker/sqs/examples/bytes/enqueue/bytes.go b/broker/sqs/examples/enqueue/bytes/sqs.go similarity index 54% rename from broker/sqs/examples/bytes/enqueue/bytes.go rename to broker/sqs/examples/enqueue/bytes/sqs.go index 7ad54f7..c6c2a38 100644 --- a/broker/sqs/examples/bytes/enqueue/bytes.go +++ b/broker/sqs/examples/enqueue/bytes/sqs.go @@ -9,18 +9,27 @@ package main import ( + "log/slog" + "github.com/fogfish/swarm" "github.com/fogfish/swarm/broker/sqs" - "github.com/fogfish/swarm/queue" - "github.com/fogfish/swarm/queue/bytes" + "github.com/fogfish/swarm/enqueue" ) func main() { - q := queue.Must(sqs.New("swarm-test", swarm.WithLogStdErr())) - - user := swarm.LogDeadLetters(bytes.Enqueue(q, "User")) - note := swarm.LogDeadLetters(bytes.Enqueue(q, "Note")) - like := swarm.LogDeadLetters(bytes.Enqueue(q, "Like")) + q, err := sqs.NewEnqueuer("swarm-test", + sqs.WithConfig( + swarm.WithLogStdErr(), + ), + ) + if err != nil { + slog.Error("sqs writer has failed", "err", err) + return + } + + user := swarm.LogDeadLetters(enqueue.Bytes(q, "User")) + note := swarm.LogDeadLetters(enqueue.Bytes(q, "Note")) + like := swarm.LogDeadLetters(enqueue.Bytes(q, "Like")) user <- []byte("user|some text by user") diff --git a/broker/sqs/examples/enqueue/event/sqs.go b/broker/sqs/examples/enqueue/event/sqs.go new file mode 100644 index 0000000..3bcf23f --- /dev/null +++ b/broker/sqs/examples/enqueue/event/sqs.go @@ -0,0 +1,116 @@ +// +// Copyright (C) 2021 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package main + +import ( + "log/slog" + + "github.com/fogfish/swarm" + "github.com/fogfish/swarm/broker/sqs" + "github.com/fogfish/swarm/enqueue" +) + +// Date type (object) affected by events +type User struct { + ID string `json:"id"` + Text string `json:"text"` +} + +type CreatedUser User + +type EventCreateUser = swarm.Event[swarm.Meta, CreatedUser] + +type UpdatedUser User + +type EventUpdateUser = swarm.Event[swarm.Meta, UpdatedUser] + +type RemovedUser User + +type EventRemoveUser = swarm.Event[swarm.Meta, RemovedUser] + +type Note struct { + ID string `json:"id"` + Text string `json:"text"` +} + +type EventNote = swarm.Event[swarm.Meta, Note] + +func main() { + q, err := sqs.NewEnqueuer("swarm-test", + sqs.WithConfig( + swarm.WithLogStdErr(), + ), + ) + if err != nil { + slog.Error("sqs writer has failed", "err", err) + return + } + + userCreated := swarm.LogDeadLetters(enqueue.Event[swarm.Meta, CreatedUser](q)) + userUpdated := swarm.LogDeadLetters(enqueue.Event[swarm.Meta, UpdatedUser](q)) + userRemoved := swarm.LogDeadLetters(enqueue.Event[swarm.Meta, RemovedUser](q)) + note := swarm.LogDeadLetters(enqueue.Event[swarm.Meta, Note](q)) + + // + // Multiple channels emits events + userCreated <- EventCreateUser{ + Meta: &swarm.Meta{ + Agent: "example", + Participant: "user", + }, + Data: &CreatedUser{ID: "user", Text: "some text"}, + } + + userUpdated <- EventUpdateUser{ + Meta: &swarm.Meta{ + Agent: "example", + Participant: "user", + }, + Data: &UpdatedUser{ID: "user", Text: "some text with changes"}, + } + + userRemoved <- swarm.Event[swarm.Meta, RemovedUser]{ + Meta: &swarm.Meta{ + Agent: "example", + Participant: "user", + }, + Data: &RemovedUser{ID: "user"}, + } + + // + // Single channel emits event + note <- EventNote{ + Meta: &swarm.Meta{ + Type: "note:EventCreateNote", + Agent: "example", + Participant: "user", + }, + Data: &Note{ID: "note", Text: "some text"}, + } + + note <- EventNote{ + Meta: &swarm.Meta{ + Type: "note:EventUpdateNote", + Agent: "example", + Participant: "user", + }, + Data: &Note{ID: "note", Text: "some text with changes"}, + } + + note <- EventNote{ + Meta: &swarm.Meta{ + Type: "note:EventRemoveNote", + Agent: "example", + Participant: "user", + }, + Data: &Note{ID: "note"}, + } + + q.Close() +} diff --git a/broker/sqs/examples/enqueue/sqs.go b/broker/sqs/examples/enqueue/typed/sqs.go similarity index 65% rename from broker/sqs/examples/enqueue/sqs.go rename to broker/sqs/examples/enqueue/typed/sqs.go index 39c2189..9f264a2 100644 --- a/broker/sqs/examples/enqueue/sqs.go +++ b/broker/sqs/examples/enqueue/typed/sqs.go @@ -9,9 +9,11 @@ package main import ( + "log/slog" + "github.com/fogfish/swarm" "github.com/fogfish/swarm/broker/sqs" - "github.com/fogfish/swarm/queue" + "github.com/fogfish/swarm/enqueue" ) type User struct { @@ -30,11 +32,19 @@ type Like struct { } func main() { - q := queue.Must(sqs.New("swarm-test", swarm.WithLogStdErr())) - - user := swarm.LogDeadLetters(queue.Enqueue[*User](q)) - note := swarm.LogDeadLetters(queue.Enqueue[*Note](q)) - like := swarm.LogDeadLetters(queue.Enqueue[*Like](q)) + q, err := sqs.NewEnqueuer("swarm-test", + sqs.WithConfig( + swarm.WithLogStdErr(), + ), + ) + if err != nil { + slog.Error("sqs writer has failed", "err", err) + return + } + + user := swarm.LogDeadLetters(enqueue.Typed[*User](q)) + note := swarm.LogDeadLetters(enqueue.Typed[*Note](q)) + like := swarm.LogDeadLetters(enqueue.Typed[*Like](q)) user <- &User{ID: "user", Text: "some text by user"} diff --git a/broker/sqs/examples/events/dequeue/events.go b/broker/sqs/examples/events/dequeue/events.go deleted file mode 100644 index edead12..0000000 --- a/broker/sqs/examples/events/dequeue/events.go +++ /dev/null @@ -1,104 +0,0 @@ -// -// Copyright (C) 2021 Dmitry Kolesnikov -// -// This file may be modified and distributed under the terms -// of the Apache License Version 2.0. See the LICENSE file for details. -// https://github.com/fogfish/swarm -// - -package main - -import ( - "encoding/json" - "fmt" - - "github.com/fogfish/swarm" - "github.com/fogfish/swarm/broker/sqs" - "github.com/fogfish/swarm/queue" - "github.com/fogfish/swarm/queue/events" -) - -// Date type (object) affected by events -type User struct { - ID string `json:"id"` - Text string `json:"text"` -} - -type Note struct { - ID string `json:"id"` - Text string `json:"text"` -} - -// Events -type EventCreateUser swarm.Event[*User] - -func (EventCreateUser) HKT1(swarm.EventType) {} -func (EventCreateUser) HKT2(*User) {} - -type EventUpdateUser swarm.Event[*User] - -func (EventUpdateUser) HKT1(swarm.EventType) {} -func (EventUpdateUser) HKT2(*User) {} - -type EventRemoveUser swarm.Event[*User] - -func (EventRemoveUser) HKT1(swarm.EventType) {} -func (EventRemoveUser) HKT2(*User) {} - -type EventNote swarm.Event[*Note] - -func (EventNote) HKT1(swarm.EventType) {} -func (EventNote) HKT2(*Note) {} - -func main() { - q := queue.Must(sqs.New("swarm-test", swarm.WithLogStdErr())) - - go create(events.Dequeue[*User, EventCreateUser](q)) - go update(events.Dequeue[*User, EventUpdateUser](q)) - go remove(events.Dequeue[*User, EventRemoveUser](q)) - go common(events.Dequeue[*Note, EventNote](q)) - - q.Await() -} - -func create(rcv <-chan swarm.Msg[*EventCreateUser], ack chan<- swarm.Msg[*EventCreateUser]) { - for msg := range rcv { - v, _ := json.MarshalIndent(msg, "+ |", " ") - fmt.Printf("create user > \n %s\n", v) - ack <- msg - } -} - -func update(rcv <-chan swarm.Msg[*EventUpdateUser], ack chan<- swarm.Msg[*EventUpdateUser]) { - for msg := range rcv { - v, _ := json.MarshalIndent(msg, "~ |", " ") - fmt.Printf("update user > \n %s\n", v) - ack <- msg - } -} - -func remove(rcv <-chan swarm.Msg[*EventRemoveUser], ack chan<- swarm.Msg[*EventRemoveUser]) { - for msg := range rcv { - v, _ := json.MarshalIndent(msg, "- |", " ") - fmt.Printf("remove user > \n %s\n", v) - ack <- msg - } -} - -func common(rcv <-chan swarm.Msg[*EventNote], ack chan<- swarm.Msg[*EventNote]) { - for msg := range rcv { - prefix := "" - switch string(msg.Object.Type) { - case "note:EventCreateNote": - prefix = "+ |" - case "note:EventUpdateNote": - prefix = "~ |" - case "note:EventRemoveNote": - prefix = "- |" - } - - v, _ := json.MarshalIndent(msg, prefix, " ") - fmt.Printf("common note > \n %s\n", v) - ack <- msg - } -} diff --git a/broker/sqs/examples/events/enqueue/events.go b/broker/sqs/examples/events/enqueue/events.go deleted file mode 100644 index b5ff66c..0000000 --- a/broker/sqs/examples/events/enqueue/events.go +++ /dev/null @@ -1,101 +0,0 @@ -// -// Copyright (C) 2021 Dmitry Kolesnikov -// -// This file may be modified and distributed under the terms -// of the Apache License Version 2.0. See the LICENSE file for details. -// https://github.com/fogfish/swarm -// - -package main - -import ( - "github.com/fogfish/swarm" - "github.com/fogfish/swarm/broker/sqs" - "github.com/fogfish/swarm/queue" - "github.com/fogfish/swarm/queue/events" -) - -// Date type (object) affected by events -type User struct { - ID string `json:"id"` - Text string `json:"text"` -} - -type Note struct { - ID string `json:"id"` - Text string `json:"text"` -} - -type EventCreateUser swarm.Event[*User] - -func (EventCreateUser) HKT1(swarm.EventType) {} -func (EventCreateUser) HKT2(*User) {} - -type EventUpdateUser swarm.Event[*User] - -func (EventUpdateUser) HKT1(swarm.EventType) {} -func (EventUpdateUser) HKT2(*User) {} - -type EventRemoveUser swarm.Event[*User] - -func (EventRemoveUser) HKT1(swarm.EventType) {} -func (EventRemoveUser) HKT2(*User) {} - -type EventNote swarm.Event[*Note] - -func (EventNote) HKT1(swarm.EventType) {} -func (EventNote) HKT2(*Note) {} - -func main() { - q := queue.Must(sqs.New("swarm-test", swarm.WithLogStdErr())) - - userCreated := swarm.LogDeadLetters(events.Enqueue[*User, EventCreateUser](q)) - userUpdated := swarm.LogDeadLetters(events.Enqueue[*User, EventUpdateUser](q)) - userRemoved := swarm.LogDeadLetters(events.Enqueue[*User, EventRemoveUser](q)) - note := swarm.LogDeadLetters(events.Enqueue[*Note, EventNote](q)) - - // - // Multiple channels emits events - userCreated <- &EventCreateUser{ - Agent: "example", - Participant: "user", - Object: &User{ID: "user", Text: "some text"}, - } - - userUpdated <- &EventUpdateUser{ - Agent: "example", - Participant: "user", - Object: &User{ID: "user", Text: "some text with changes"}, - } - - userRemoved <- &EventRemoveUser{ - Agent: "example", - Participant: "user", - Object: &User{ID: "note"}, - } - - // - // Single channel emits event - note <- &EventNote{ - Type: "note:EventCreateNote", - Agent: "example", - Participant: "user", - Object: &Note{ID: "note", Text: "some text"}, - } - - note <- &EventNote{ - Type: "note:EventUpdateNote", - Agent: "example", - Participant: "user", - Object: &Note{ID: "note", Text: "some text with changes"}, - } - - note <- &EventNote{ - Type: "note:EventRemoveNote", - Agent: "example", - Participant: "user", - Object: &Note{ID: "note"}, - } - - q.Close() -} diff --git a/broker/sqs/go.mod b/broker/sqs/go.mod index 248c351..3723cb9 100644 --- a/broker/sqs/go.mod +++ b/broker/sqs/go.mod @@ -6,9 +6,8 @@ require ( github.com/aws/aws-sdk-go-v2 v1.31.0 github.com/aws/aws-sdk-go-v2/config v1.27.37 github.com/aws/aws-sdk-go-v2/service/sqs v1.35.1 - github.com/fogfish/swarm v0.16.0 - github.com/fogfish/swarm/qtest v0.16.1 - github.com/fogfish/swarm/queue v0.16.1 + github.com/fogfish/it/v2 v2.0.2 + github.com/fogfish/swarm v0.20.0 ) require ( @@ -27,7 +26,5 @@ require ( github.com/fogfish/faults v0.2.0 // indirect github.com/fogfish/golem/hseq v1.2.0 // indirect github.com/fogfish/golem/optics v0.13.0 // indirect - github.com/fogfish/golem/pure v0.10.1 // indirect github.com/fogfish/guid/v2 v2.0.4 // indirect - github.com/fogfish/it/v2 v2.0.2 // indirect ) diff --git a/broker/sqs/go.sum b/broker/sqs/go.sum index fc2ad57..7c1d8a6 100644 --- a/broker/sqs/go.sum +++ b/broker/sqs/go.sum @@ -34,17 +34,13 @@ github.com/fogfish/golem/hseq v1.2.0 h1:B6yrzOHQNoTqSlhLb+AvK7dhEAELjHThrCQTF/uq github.com/fogfish/golem/hseq v1.2.0/go.mod h1:17XORt8nNKl6KOhF43MHSmjK8NksbkBsohAoJGiinUs= github.com/fogfish/golem/optics v0.13.0 h1:U3htppjVTMbICQIzPTTe151+WziSGEppNVmkanKa440= github.com/fogfish/golem/optics v0.13.0/go.mod h1:U1y90OVcXF/A61dIP3abQ0x2GweTmzVHPC15pv0pcM0= -github.com/fogfish/golem/pure v0.10.1 h1:0+cnvdaV9zF+0NN8SZMgR5bgFM6yNfBHU4rynYSDfmE= -github.com/fogfish/golem/pure v0.10.1/go.mod h1:kLPfgu5uKP0CrwVap7jejisRwV7vo1q8Eyqnc/Z0qyw= github.com/fogfish/guid/v2 v2.0.4 h1:EZiPlM4UAghqf7DU5/nLEF+iRH7ODe0AiFuYOMRvITQ= github.com/fogfish/guid/v2 v2.0.4/go.mod h1:KkZ5T4EE3BqWQJFZBPLSHV/tBe23Xq4KvuPfwtNtepU= github.com/fogfish/it v1.0.0 h1:kiwFHZcrkRLUydZoIoY0gTuMfj38trwvLo0YRyIkeG8= github.com/fogfish/it v1.0.0/go.mod h1:NQJG4Ygvek85y7zGj0Gny8+6ygAnHjfBORhI7TdQhp4= github.com/fogfish/it/v2 v2.0.2 h1:UR6yVemf8zD3WVs6Bq0zE6LJwapZ8urv9zvU5VB5E6o= github.com/fogfish/it/v2 v2.0.2/go.mod h1:HHwufnTaZTvlRVnSesPl49HzzlMrQtweKbf+8Co/ll4= -github.com/fogfish/swarm v0.16.0 h1:6AviPpPbrSraLcjH2GwW0oVUb7tb6IZMqdT+FdXOCHU= -github.com/fogfish/swarm v0.16.0/go.mod h1:u5uJmXu3xHz1OTzyxhu9viI6eq+GmkJJkDqdrf8IlJw= -github.com/fogfish/swarm/qtest v0.16.1 h1:bSWownAij0hhifA4IimEoFBM5xKyNt9tskCz0tPGjHk= -github.com/fogfish/swarm/qtest v0.16.1/go.mod h1:79qfqu256c40UntI1PzYEIToaXr6oJBr48gkAocupYw= -github.com/fogfish/swarm/queue v0.16.1 h1:bReiuwm2a1h0hP4ElSzETksqs1gT2yKRHtA6aVHQnKY= -github.com/fogfish/swarm/queue v0.16.1/go.mod h1:tiSPnknVI/nHs2TL8H17YszUvgBT1E2ksaOGBQV/E6Q= +github.com/fogfish/logger/v3 v3.1.1 h1:awmTNpBWRvSj086H3RWIUnc+FSu9qXHJgBa49wYpNCE= +github.com/fogfish/logger/v3 v3.1.1/go.mod h1:hsucoJz/3OX90UdYrXykcKvjjteBnPcYSTr4Rie0ZqU= +github.com/fogfish/swarm v0.20.0 h1:eUlNXFsePfBo72iFNvY3eJ6YIQP0ttzflGF6tNAxhQ8= +github.com/fogfish/swarm v0.20.0/go.mod h1:cdIviTojE3DT+FOIIOeOg6tyMqhyanfy2TZHTtKlOmo= diff --git a/broker/sqs/sqs.go b/broker/sqs/sqs.go index ee0f804..943275d 100644 --- a/broker/sqs/sqs.go +++ b/broker/sqs/sqs.go @@ -29,41 +29,68 @@ type SQS interface { } type Client struct { - service SQS - config swarm.Config - queue *string - isFIFO bool + service SQS + config swarm.Config + queue *string + isFIFO bool + batchSize int } -func New(queue string, opts ...swarm.Option) (swarm.Broker, error) { - cli, err := NewSQS(queue, opts...) +// Create enqueue routine to AWS SQS +func NewEnqueuer(queue string, opts ...Option) (*kernel.Enqueuer, error) { + cli, err := newSQS(queue, opts...) if err != nil { return nil, err } - config := swarm.NewConfig() - for _, opt := range opts { - opt(&config) + return kernel.NewEnqueuer(cli, cli.config), nil +} + +// Create dequeue routine to AWS SQS +func NewDequeuer(queue string, opts ...Option) (*kernel.Dequeuer, error) { + cli, err := newSQS(queue, opts...) + if err != nil { + return nil, err } - return kernel.New(cli, cli, config), err + return kernel.NewDequeuer(cli, cli.config), nil } -func NewSQS(queue string, opts ...swarm.Option) (*Client, error) { - config := swarm.NewConfig() +// Create enqueue & dequeue routine to AWS SQS +func New(queue string, opts ...Option) (*kernel.Kernel, error) { + cli, err := newSQS(queue, opts...) + if err != nil { + return nil, err + } + + return kernel.New( + kernel.NewEnqueuer(cli, cli.config), + kernel.NewDequeuer(cli, cli.config), + ), nil +} + +func newSQS(queue string, opts ...Option) (*Client, error) { + c := &Client{} + + for _, opt := range defs { + opt(c) + } for _, opt := range opts { - opt(&config) + opt(c) } - api, err := newService(&config) - if err != nil { - return nil, err + if c.service == nil { + aws, err := config.LoadDefaultConfig(context.Background()) + if err != nil { + return nil, swarm.ErrServiceIO.New(err) + } + c.service = sqs.NewFromConfig(aws) } - ctx, cancel := context.WithTimeout(context.Background(), config.NetworkTimeout) + ctx, cancel := context.WithTimeout(context.Background(), c.config.NetworkTimeout) defer cancel() - spec, err := api.GetQueueUrl(ctx, + spec, err := c.service.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{ QueueName: aws.String(queue), }, @@ -72,45 +99,27 @@ func NewSQS(queue string, opts ...swarm.Option) (*Client, error) { return nil, swarm.ErrServiceIO.New(err) } - return &Client{ - service: api, - config: config, - queue: spec.QueueUrl, - isFIFO: strings.HasSuffix(queue, ".fifo"), - }, nil -} - -func newService(conf *swarm.Config) (SQS, error) { - if conf.Service != nil { - service, ok := conf.Service.(SQS) - if ok { - return service, nil - } - } + c.queue = spec.QueueUrl + c.isFIFO = strings.HasSuffix(queue, ".fifo") - aws, err := config.LoadDefaultConfig(context.Background()) - if err != nil { - return nil, swarm.ErrServiceIO.New(err) - } - - return sqs.NewFromConfig(aws), nil + return c, nil } // Enq enqueues message to broker -func (cli *Client) Enq(bag swarm.Bag) error { - ctx, cancel := context.WithTimeout(context.Background(), cli.config.NetworkTimeout) +func (cli *Client) Enq(ctx context.Context, bag swarm.Bag) error { + ctx, cancel := context.WithTimeout(ctx, cli.config.NetworkTimeout) defer cancel() var idMsgGroup *string if cli.isFIFO { - idMsgGroup = aws.String(bag.Ctx.Category) + idMsgGroup = aws.String(bag.Category) } _, err := cli.service.SendMessage(ctx, &sqs.SendMessageInput{ MessageAttributes: map[string]types.MessageAttributeValue{ "Source": {StringValue: aws.String(cli.config.Source), DataType: aws.String("String")}, - "Category": {StringValue: aws.String(bag.Ctx.Category), DataType: aws.String("String")}, + "Category": {StringValue: aws.String(bag.Category), DataType: aws.String("String")}, }, MessageGroupId: idMsgGroup, MessageBody: aws.String(string(bag.Object)), @@ -124,8 +133,8 @@ func (cli *Client) Enq(bag swarm.Bag) error { return nil } -func (cli *Client) Ack(digest string) error { - ctx, cancel := context.WithTimeout(context.Background(), cli.config.NetworkTimeout) +func (cli *Client) Ack(ctx context.Context, digest string) error { + ctx, cancel := context.WithTimeout(ctx, cli.config.NetworkTimeout) defer cancel() _, err := cli.service.DeleteMessage(ctx, @@ -141,16 +150,21 @@ func (cli *Client) Ack(digest string) error { return nil } +func (cli *Client) Err(ctx context.Context, digest string, err error) error { + // Note: do nothing, AWS SQS makes the magic + return nil +} + // Deq dequeues message from broker -func (cli Client) Ask() ([]swarm.Bag, error) { - ctx, cancel := context.WithTimeout(context.Background(), cli.config.NetworkTimeout*2) +func (cli Client) Ask(ctx context.Context) ([]swarm.Bag, error) { + ctx, cancel := context.WithTimeout(ctx, cli.config.NetworkTimeout*2) defer cancel() result, err := cli.service.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{ MessageAttributeNames: []string{string(types.QueueAttributeNameAll)}, QueueUrl: cli.queue, - MaxNumberOfMessages: 1, // TODO + MaxNumberOfMessages: int32(cli.batchSize), WaitTimeSeconds: int32(cli.config.NetworkTimeout.Seconds()), }, ) @@ -162,14 +176,16 @@ func (cli Client) Ask() ([]swarm.Bag, error) { return nil, nil } - head := result.Messages[0] + bag := make([]swarm.Bag, len(result.Messages)) + for i, msg := range result.Messages { + bag[i] = swarm.Bag{ + Category: attr(&msg, "Category"), + Digest: aws.ToString(msg.ReceiptHandle), + Object: []byte(aws.ToString(msg.Body)), + } + } - return []swarm.Bag{ - { - Ctx: swarm.NewContext(context.Background(), attr(&head, "Category"), *head.ReceiptHandle), - Object: []byte(*head.Body), - }, - }, nil + return bag, nil } func attr(msg *types.Message, key string) string { diff --git a/broker/sqs/sqs_test.go b/broker/sqs/sqs_test.go index 9b2415e..f26c4c2 100644 --- a/broker/sqs/sqs_test.go +++ b/broker/sqs/sqs_test.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2021 - 2022 Dmitry Kolesnikov +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov // // This file may be modified and distributed under the terms // of the Apache License Version 2.0. See the LICENSE file for details. @@ -10,117 +10,122 @@ package sqs_test import ( "context" - "fmt" - "strings" "testing" + "time" "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/service/sqs" + awssqs "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/aws/aws-sdk-go-v2/service/sqs/types" + "github.com/fogfish/it/v2" "github.com/fogfish/swarm" - sut "github.com/fogfish/swarm/broker/sqs" - "github.com/fogfish/swarm/qtest" - "github.com/fogfish/swarm/queue" + "github.com/fogfish/swarm/broker/sqs" + "github.com/fogfish/swarm/dequeue" ) -func TestEnqueueSQS(t *testing.T) { - qtest.TestEnqueueTyped(t, newMockEnqueue) - qtest.TestEnqueueBytes(t, newMockEnqueue) - qtest.TestEnqueueEvent(t, newMockEnqueue) -} +func TestEnqueuer(t *testing.T) { + t.Run("NewEnqueuer", func(t *testing.T) { + mock := &mockEnqueue{} + q, err := sqs.NewEnqueuer("test", sqs.WithService(mock)) + it.Then(t).Should(it.Nil(err)) + q.Close() + }) + + t.Run("Enqueue", func(t *testing.T) { + mock := &mockEnqueue{} + + q, err := sqs.NewEnqueuer("test", sqs.WithService(mock)) + it.Then(t).Should(it.Nil(err)) -func TestDequeueSQS(t *testing.T) { - qtest.TestDequeueTyped(t, newMockDequeue) - qtest.TestDequeueBytes(t, newMockDequeue) - qtest.TestDequeueEvent(t, newMockDequeue) + err = q.Emitter.Enq(context.Background(), + swarm.Bag{ + Category: "cat", + Object: []byte(`value`), + }, + ) + it.Then(t).Should( + it.Nil(err), + it.Equal(*mock.req.MessageAttributes["Category"].StringValue, "cat"), + it.Equal(*mock.req.MessageBody, "value"), + ) + + q.Close() + }) } -// Mock AWS SQS Enqueue -type mockEnqueue struct { - sut.SQS - expectCategory string - loopback chan string +func TestDequeuer(t *testing.T) { + t.Run("NewDequeuer", func(t *testing.T) { + mock := &mockDequeue{} + q, err := sqs.NewDequeuer("test", sqs.WithService(mock)) + it.Then(t).Should(it.Nil(err)) + q.Close() + }) + + t.Run("Dequeue", func(t *testing.T) { + mock := &mockDequeue{} + + q, err := sqs.NewDequeuer("test", sqs.WithService(mock)) + it.Then(t).Should(it.Nil(err)) + + rcv, ack := dequeue.Bytes(q, "test") + go func() { + ack <- <-rcv + + time.Sleep(5 * time.Millisecond) + q.Close() + }() + + q.Await() + + it.Then(t).Should( + it.Equal(*mock.req.ReceiptHandle, "1"), + ) + }) } -func newMockEnqueue( - loopback chan string, - queueName string, - expectCategory string, - opts ...swarm.Option, -) swarm.Broker { - mock := &mockEnqueue{expectCategory: expectCategory, loopback: loopback} - conf := append(opts, swarm.WithService(mock)) - return queue.Must(sut.New(queueName, conf...)) +//------------------------------------------------------------------------------ + +type mockEnqueue struct { + sqs.SQS + req *awssqs.SendMessageInput } -func (m *mockEnqueue) GetQueueUrl(ctx context.Context, req *sqs.GetQueueUrlInput, opts ...func(*sqs.Options)) (*sqs.GetQueueUrlOutput, error) { - return &sqs.GetQueueUrlOutput{ +func (m *mockEnqueue) GetQueueUrl(ctx context.Context, req *awssqs.GetQueueUrlInput, opts ...func(*awssqs.Options)) (*awssqs.GetQueueUrlOutput, error) { + return &awssqs.GetQueueUrlOutput{ QueueUrl: aws.String("https://sqs.eu-west-1.amazonaws.com/000000000000/mock"), }, nil } -func (m *mockEnqueue) SendMessage(ctx context.Context, req *sqs.SendMessageInput, opts ...func(*sqs.Options)) (*sqs.SendMessageOutput, error) { - cat, exists := req.MessageAttributes["Category"] - if !exists { - return nil, fmt.Errorf("Bad message attributes") - } - - if !strings.HasPrefix(*cat.StringValue, m.expectCategory) { - return nil, fmt.Errorf("Bad message category") - } - - m.loopback <- aws.ToString(req.MessageBody) - return &sqs.SendMessageOutput{}, nil +func (m *mockEnqueue) SendMessage(ctx context.Context, req *awssqs.SendMessageInput, opts ...func(*awssqs.Options)) (*awssqs.SendMessageOutput, error) { + m.req = req + return &awssqs.SendMessageOutput{}, nil } -// Mock AWS SQS Dequeue type mockDequeue struct { - sut.SQS - returnCategory string - returnMessage string - returnReceipt string - loopback chan string -} - -func newMockDequeue( - loopback chan string, - queueName string, - returnCategory string, - returnMessage string, - returnReceipt string, - opts ...swarm.Option, -) swarm.Broker { - mock := &mockDequeue{ - loopback: loopback, - returnCategory: returnCategory, - returnMessage: returnMessage, - returnReceipt: returnReceipt, - } - conf := append(opts, swarm.WithService(mock)) - return queue.Must(sut.New(queueName, conf...)) + sqs.SQS + req *awssqs.DeleteMessageInput } -func (m *mockDequeue) GetQueueUrl(ctx context.Context, req *sqs.GetQueueUrlInput, opts ...func(*sqs.Options)) (*sqs.GetQueueUrlOutput, error) { - return &sqs.GetQueueUrlOutput{ +func (m *mockDequeue) GetQueueUrl(ctx context.Context, req *awssqs.GetQueueUrlInput, opts ...func(*awssqs.Options)) (*awssqs.GetQueueUrlOutput, error) { + return &awssqs.GetQueueUrlOutput{ QueueUrl: aws.String("https://sqs.eu-west-1.amazonaws.com/000000000000/mock"), }, nil } -func (m *mockDequeue) ReceiveMessage(ctx context.Context, req *sqs.ReceiveMessageInput, opts ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) { - return &sqs.ReceiveMessageOutput{ +func (m *mockDequeue) ReceiveMessage(ctx context.Context, req *awssqs.ReceiveMessageInput, opts ...func(*awssqs.Options)) (*awssqs.ReceiveMessageOutput, error) { + return &awssqs.ReceiveMessageOutput{ Messages: []types.Message{ { MessageAttributes: map[string]types.MessageAttributeValue{ - "Category": {StringValue: aws.String(m.returnCategory)}, + "Category": {StringValue: aws.String("test")}, }, - Body: aws.String(m.returnMessage), - ReceiptHandle: aws.String(m.returnReceipt), + Body: aws.String(`{"sut":"test"}`), + ReceiptHandle: aws.String(`1`), }, }, }, nil } -func (m *mockDequeue) DeleteMessage(ctx context.Context, req *sqs.DeleteMessageInput, opts ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error) { - m.loopback <- aws.ToString(req.ReceiptHandle) - return &sqs.DeleteMessageOutput{}, nil +func (m *mockDequeue) DeleteMessage(ctx context.Context, req *awssqs.DeleteMessageInput, opts ...func(*awssqs.Options)) (*awssqs.DeleteMessageOutput, error) { + m.req = req + return &awssqs.DeleteMessageOutput{}, nil }