diff --git a/broker/eventbridge/awscdk.go b/broker/eventbridge/awscdk.go index e716a57..4b569a2 100644 --- a/broker/eventbridge/awscdk.go +++ b/broker/eventbridge/awscdk.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2021 - 2022 Dmitry Kolesnikov +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov // // This file may be modified and distributed under the terms // of the Apache License Version 2.0. See the LICENSE file for details. @@ -79,6 +79,7 @@ func NewSink(scope constructs.Construct, id *string, props *SinkProps) *Sink { ) if props.Function != nil { + props.Function.Setenv(EnvEventBridge, *props.System.EventBusName()) sink.Handler = scud.NewFunction(sink.Construct, jsii.String("Func"), props.Function) sink.Rule.AddTarget(awseventstargets.NewLambdaFunction( diff --git a/broker/eventbridge/awscdk_test.go b/broker/eventbridge/awscdk_test.go index c320852..53b40df 100644 --- a/broker/eventbridge/awscdk_test.go +++ b/broker/eventbridge/awscdk_test.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2021 - 2022 Dmitry Kolesnikov +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov // // This file may be modified and distributed under the terms // of the Apache License Version 2.0. See the LICENSE file for details. @@ -27,10 +27,11 @@ func TestEventBridgeCDK(t *testing.T) { broker.NewSink( &eventbridge.SinkProps{ - Source: []string{"swarm-example-eventbridge"}, + Source: []string{"swarm-example-eventbridge"}, + Categories: []string{"category"}, Function: &scud.FunctionGoProps{ SourceCodeModule: "github.com/fogfish/swarm/broker/eventbridge", - SourceCodeLambda: "examples/dequeue", + SourceCodeLambda: "examples/dequeue/typed", }, }, ) diff --git a/broker/eventbridge/config.go b/broker/eventbridge/config.go new file mode 100644 index 0000000..db1b8f5 --- /dev/null +++ b/broker/eventbridge/config.go @@ -0,0 +1,52 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package eventbridge + +import ( + "os" + "time" + + "github.com/fogfish/swarm" + "github.com/fogfish/swarm/kernel/encoding" +) + +type Option func(*Client) + +var defs = []Option{WithConfig(), WithEnv()} + +func WithConfig(opts ...swarm.Option) Option { + return func(c *Client) { + config := swarm.NewConfig() + for _, opt := range opts { + opt(&config) + } + + // Mandatory overrides + config.PollFrequency = 5 * time.Microsecond + config.Codec = encoding.NewCodecPacket() + + c.config = config + } +} + +func WithService(service EventBridge) Option { + return func(c *Client) { + c.service = service + } +} + +const EnvEventBridge = "CONFIG_SWARM_EVENT_BRIDGE" + +func WithEnv() Option { + return func(c *Client) { + if val, has := os.LookupEnv(EnvEventBridge); has { + c.bus = val + } + } +} diff --git a/broker/eventbridge/eventbridge.go b/broker/eventbridge/eventbridge.go index 1f95328..7c2d33b 100644 --- a/broker/eventbridge/eventbridge.go +++ b/broker/eventbridge/eventbridge.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2021 - 2022 Dmitry Kolesnikov +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov // // This file may be modified and distributed under the terms // of the Apache License Version 2.0. See the LICENSE file for details. @@ -27,75 +27,63 @@ type EventBridge interface { PutEvents(context.Context, *eventbridge.PutEventsInput, ...func(*eventbridge.Options)) (*eventbridge.PutEventsOutput, error) } +// EventBridge client type Client struct { service EventBridge bus string config swarm.Config } -func New(queue string, opts ...swarm.Option) (swarm.Broker, error) { - cli, err := NewEventBridge(queue, opts...) +// Create writer to AWS EventBridge +func NewWriter(queue string, opts ...Option) (*kernel.Enqueuer, error) { + cli, err := newEventBridge(queue, opts...) if err != nil { return nil, err } - config := swarm.NewConfig() - for _, opt := range opts { - opt(&config) - } + return kernel.NewEnqueuer(cli, cli.config), nil +} - starter := lambda.Start +// Create reader from AWS EventBridge +func NewReader(queue string, opts ...Option) (*kernel.Dequeuer, error) { + c := &Client{bus: queue} - type Mock interface{ Start(interface{}) } - if config.Service != nil { - service, ok := config.Service.(Mock) - if ok { - starter = service.Start - } + for _, opt := range defs { + opt(c) } - - sls := spawner{f: starter, c: config} - - return kernel.New(cli, sls, config), err -} - -func NewEventBridge(bus string, opts ...swarm.Option) (*Client, error) { - config := swarm.NewConfig() for _, opt := range opts { - opt(&config) + opt(c) } - api, err := newService(&config) - if err != nil { - return nil, err - } + bridge := &bridge{kernel.NewBridge(c.config.TimeToFlight)} - return &Client{ - service: api, - bus: bus, - config: config, - }, nil + return kernel.NewDequeuer(bridge, c.config), nil } -func newService(conf *swarm.Config) (EventBridge, error) { - if conf.Service != nil { - service, ok := conf.Service.(EventBridge) - if ok { - return service, nil - } +func newEventBridge(queue string, opts ...Option) (*Client, error) { + c := &Client{bus: queue} + + for _, opt := range defs { + opt(c) + } + for _, opt := range opts { + opt(c) } - aws, err := config.LoadDefaultConfig(context.Background()) - if err != nil { - return nil, swarm.ErrServiceIO.New(err) + if c.service == nil { + aws, err := config.LoadDefaultConfig(context.Background()) + if err != nil { + return nil, swarm.ErrServiceIO.New(err) + } + c.service = eventbridge.NewFromConfig(aws) } - return eventbridge.NewFromConfig(aws), nil + return c, nil } // Enq enqueues message to broker -func (cli *Client) Enq(bag swarm.Bag) error { - ctx, cancel := context.WithTimeout(context.Background(), cli.config.NetworkTimeout) +func (cli *Client) Enq(ctx context.Context, bag swarm.Bag) error { + ctx, cancel := context.WithTimeout(ctx, cli.config.NetworkTimeout) defer cancel() ret, err := cli.service.PutEvents(ctx, @@ -104,7 +92,7 @@ func (cli *Client) Enq(bag swarm.Bag) error { { EventBusName: aws.String(cli.bus), Source: aws.String(cli.config.Source), - DetailType: aws.String(bag.Ctx.Category), + DetailType: aws.String(bag.Category), Detail: aws.String(string(bag.Object)), }, }, @@ -126,26 +114,17 @@ func (cli *Client) Enq(bag swarm.Bag) error { //------------------------------------------------------------------------------ -type spawner struct { - c swarm.Config - f func(any) -} +type bridge struct{ *kernel.Bridge } -func (s spawner) Spawn(k *kernel.Kernel) error { - s.f( - func(evt events.CloudWatchEvent) error { - bag := make([]swarm.Bag, 1) - bag[0] = swarm.Bag{ - Ctx: swarm.NewContext(context.Background(), evt.DetailType, evt.ID), - Object: evt.Detail, - } +func (s bridge) Run() { lambda.Start(s.run) } - return k.Dispatch(bag, s.c.TimeToFlight) - }, - ) +func (s bridge) run(evt events.CloudWatchEvent) error { + bag := make([]swarm.Bag, 1) + bag[0] = swarm.Bag{ + Category: evt.DetailType, + Digest: evt.ID, + Object: evt.Detail, + } - return nil + return s.Bridge.Dispatch(bag) } - -func (s spawner) Ack(digest string) error { return nil } -func (s spawner) Ask() ([]swarm.Bag, error) { return nil, nil } diff --git a/broker/eventbridge/eventbridge_test.go b/broker/eventbridge/eventbridge_test.go index 7b9a58d..a9ff7c0 100644 --- a/broker/eventbridge/eventbridge_test.go +++ b/broker/eventbridge/eventbridge_test.go @@ -1,116 +1,125 @@ // -// Copyright (C) 2021 - 2022 Dmitry Kolesnikov +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov // // This file may be modified and distributed under the terms // of the Apache License Version 2.0. See the LICENSE file for details. // https://github.com/fogfish/swarm // -package eventbridge_test +package eventbridge import ( "context" "encoding/json" "fmt" - "strings" "testing" + "time" "github.com/aws/aws-lambda-go/events" - "github.com/aws/aws-lambda-go/lambda" - "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/eventbridge" + "github.com/aws/aws-sdk-go-v2/service/eventbridge/types" + "github.com/fogfish/it/v2" "github.com/fogfish/swarm" - sut "github.com/fogfish/swarm/broker/eventbridge" - "github.com/fogfish/swarm/qtest" - "github.com/fogfish/swarm/queue" + "github.com/fogfish/swarm/kernel" ) -func TestEnqueueEventBridge(t *testing.T) { - qtest.TestEnqueueTyped(t, newMockEnqueue) - qtest.TestEnqueueBytes(t, newMockEnqueue) - qtest.TestEnqueueEvent(t, newMockEnqueue) -} +func TestReader(t *testing.T) { + var bag []swarm.Bag + bridge := &bridge{kernel.NewBridge(100 * time.Millisecond)} + + t.Run("New", func(t *testing.T) { + q, err := NewReader("test") + it.Then(t).Should(it.Nil(err)) + q.Close() + }) + + t.Run("Dequeue", func(t *testing.T) { + go func() { + bag, _ = bridge.Ask(context.Background()) + for _, m := range bag { + bridge.Ack(context.Background(), m.Digest) + } + }() + + err := bridge.run( + events.CloudWatchEvent{ + ID: "abc-def", + DetailType: "category", + Detail: json.RawMessage(`{"sut":"test"}`), + }, + ) + + it.Then(t).Should( + it.Nil(err), + it.Equal(len(bag), 1), + it.Equal(bag[0].Category, "category"), + it.Equiv(bag[0].Object, []byte(`{"sut":"test"}`)), + ) + }) + + t.Run("Dequeue.Timeout", func(t *testing.T) { + go func() { + bag, _ = bridge.Ask(context.Background()) + }() -func TestDequeueEventBridge(t *testing.T) { - qtest.TestDequeueTyped(t, newMockDequeue) - qtest.TestDequeueBytes(t, newMockDequeue) - qtest.TestDequeueEvent(t, newMockDequeue) + err := bridge.run( + events.CloudWatchEvent{ + ID: "abc-def", + DetailType: "category", + Detail: json.RawMessage(`{"sut":"test"}`), + }, + ) + + it.Then(t).ShouldNot( + it.Nil(err), + ) + }) } -// Mock AWS EventBridge Enqueue -type mockEnqueue struct { - sut.EventBridge - expectCategory string - loopback chan string +func TestWriter(t *testing.T) { + t.Run("New", func(t *testing.T) { + q, err := NewWriter("test") + it.Then(t).Should(it.Nil(err)) + q.Close() + }) + + t.Run("Enqueue", func(t *testing.T) { + mock := &mockEventBridge{} + + q, err := NewWriter("test", WithService(mock)) + it.Then(t).Should(it.Nil(err)) + + err = q.Emitter.Enq(context.Background(), + swarm.Bag{ + Category: "cat", + Object: []byte(`value`), + }, + ) + it.Then(t).Should( + it.Nil(err), + it.Equal(*mock.val.DetailType, "cat"), + it.Equal(*mock.val.Detail, "value"), + ) + + q.Close() + }) } -func newMockEnqueue( - loopback chan string, - queueName string, - expectCategory string, - opts ...swarm.Option, -) swarm.Broker { - mock := &mockEnqueue{expectCategory: expectCategory, loopback: loopback} - conf := append(opts, swarm.WithService(mock)) - return queue.Must(sut.New(queueName, conf...)) +//------------------------------------------------------------------------------ + +type mockEventBridge struct { + EventBridge + val types.PutEventsRequestEntry } -func (m *mockEnqueue) PutEvents(ctx context.Context, req *eventbridge.PutEventsInput, opts ...func(*eventbridge.Options)) (*eventbridge.PutEventsOutput, error) { +func (m *mockEventBridge) PutEvents(ctx context.Context, req *eventbridge.PutEventsInput, opts ...func(*eventbridge.Options)) (*eventbridge.PutEventsOutput, error) { if len(req.Entries) != 1 { return nil, fmt.Errorf("Bad request") } - if !strings.HasPrefix(*req.Entries[0].DetailType, m.expectCategory) { - return nil, fmt.Errorf("Bad message category") - } + m.val = req.Entries[0] - m.loopback <- aws.ToString(req.Entries[0].Detail) return &eventbridge.PutEventsOutput{ FailedEntryCount: 0, }, nil } - -// Mock AWS EventBridge Dequeue -func newMockDequeue( - loopback chan string, - queueName string, - returnCategory string, - returnMessage string, - returnReceipt string, - opts ...swarm.Option, -) swarm.Broker { - mock := &mockLambda{ - loopback: loopback, - returnCategory: returnCategory, - returnMessage: returnMessage, - returnReceipt: returnReceipt, - } - conf := append(opts, swarm.WithService(mock)) - return queue.Must(sut.New(queueName, conf...)) -} - -type mockLambda struct { - sut.EventBridge - loopback chan string - returnCategory string - returnMessage string - returnReceipt string -} - -func (mock *mockLambda) Start(handler interface{}) { - msg, _ := json.Marshal( - events.CloudWatchEvent{ - ID: "abc-def", - DetailType: mock.returnCategory, - Detail: json.RawMessage(mock.returnMessage), - }, - ) - - h := lambda.NewHandler(handler) - _, err := h.Invoke(context.Background(), msg) - if err != nil { - panic(err) - } - - mock.loopback <- mock.returnReceipt -} diff --git a/broker/eventbridge/examples/dequeue/bytes/eventbridge.go b/broker/eventbridge/examples/dequeue/bytes/eventbridge.go new file mode 100644 index 0000000..c1484f7 --- /dev/null +++ b/broker/eventbridge/examples/dequeue/bytes/eventbridge.go @@ -0,0 +1,45 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package main + +import ( + "log/slog" + + "github.com/fogfish/swarm" + "github.com/fogfish/swarm/broker/eventbridge" + "github.com/fogfish/swarm/dequeue" +) + +func main() { + q, err := eventbridge.NewReader("swarm-example-eventbridge", + eventbridge.WithConfig( + swarm.WithLogStdErr(), + ), + ) + if err != nil { + slog.Error("eventbridge reader has failed", "err", err) + return + } + + // + go actor("user").handle(dequeue.Bytes(q, "User")) + go actor("note").handle(dequeue.Bytes(q, "Note")) + go actor("like").handle(dequeue.Bytes(q, "Like")) + + q.Await() +} + +type actor string + +func (a actor) handle(rcv <-chan swarm.Msg[[]byte], ack chan<- swarm.Msg[[]byte]) { + for msg := range rcv { + slog.Info("Event", "type", a, "msg", string(msg.Object)) + ack <- msg + } +} diff --git a/broker/eventbridge/examples/dequeue/event/eventbridge.go b/broker/eventbridge/examples/dequeue/event/eventbridge.go new file mode 100644 index 0000000..cec8828 --- /dev/null +++ b/broker/eventbridge/examples/dequeue/event/eventbridge.go @@ -0,0 +1,61 @@ +// +// Copyright (C) 2021 - 2022 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package main + +import ( + "encoding/json" + "fmt" + "log/slog" + + "github.com/fogfish/swarm" + "github.com/fogfish/swarm/broker/eventbridge" + "github.com/fogfish/swarm/dequeue" +) + +type Event = swarm.Event[swarm.Meta, EventNote] + +type EventNote struct { + ID string `json:"id"` + Text string `json:"text"` +} + +func main() { + q, err := eventbridge.NewReader("swarm-example-eventbridge", + eventbridge.WithConfig( + swarm.WithLogStdErr(), + ), + ) + if err != nil { + slog.Error("eventbridge reader has failed", "err", err) + return + } + + go bus(dequeue.Event[swarm.Meta, EventNote](q)) + + q.Await() +} + +func bus(rcv <-chan swarm.Msg[Event], ack chan<- swarm.Msg[Event]) { + for msg := range rcv { + prefix := "" + switch string(msg.Object.Meta.Type) { + case "note:EventCreateNote": + prefix = "+ |" + case "note:EventUpdateNote": + prefix = "~ |" + case "note:EventRemoveNote": + prefix = "- |" + } + + v, _ := json.MarshalIndent(msg, prefix, " ") + fmt.Printf("event > \n %s\n", v) + + ack <- msg + } +} diff --git a/broker/eventbridge/examples/dequeue/eventbridge.go b/broker/eventbridge/examples/dequeue/eventbridge.go deleted file mode 100644 index ba6ee53..0000000 --- a/broker/eventbridge/examples/dequeue/eventbridge.go +++ /dev/null @@ -1,78 +0,0 @@ -// -// Copyright (C) 2021 - 2022 Dmitry Kolesnikov -// -// This file may be modified and distributed under the terms -// of the Apache License Version 2.0. See the LICENSE file for details. -// https://github.com/fogfish/swarm -// - -package main - -import ( - "encoding/json" - "fmt" - "log/slog" - - "github.com/fogfish/swarm" - "github.com/fogfish/swarm/broker/eventbridge" - "github.com/fogfish/swarm/queue" - "github.com/fogfish/swarm/queue/events" -) - -type User struct { - ID string `json:"id"` - Text string `json:"text"` -} - -type Note struct { - ID string `json:"id"` - Text string `json:"text"` -} - -type Like struct { - ID string `json:"id"` - Text string `json:"text"` -} - -type EventNote swarm.Event[*Note] - -func (EventNote) HKT1(swarm.EventType) {} -func (EventNote) HKT2(*Note) {} - -func main() { - q := queue.Must(eventbridge.New("swarm-example-eventbridge", swarm.WithLogStdErr())) - - go actor[User]("user").handle(queue.Dequeue[User](q)) - go actor[Note]("note").handle(queue.Dequeue[Note](q)) - go actor[Like]("like").handle(queue.Dequeue[Like](q)) - go ebus(events.Dequeue[*Note, EventNote](q)) - - q.Await() -} - -type actor[T any] string - -func (a actor[T]) handle(rcv <-chan swarm.Msg[T], ack chan<- swarm.Msg[T]) { - for msg := range rcv { - slog.Info("Event", "type", a, "msg", msg.Object) - ack <- msg - } -} - -func ebus(rcv <-chan swarm.Msg[*EventNote], ack chan<- swarm.Msg[*EventNote]) { - for msg := range rcv { - prefix := "" - switch string(msg.Object.Type) { - case "note:EventCreateNote": - prefix = "+ |" - case "note:EventUpdateNote": - prefix = "~ |" - case "note:EventRemoveNote": - prefix = "- |" - } - - v, _ := json.MarshalIndent(msg, prefix, " ") - fmt.Printf("common note > \n %s\n", v) - ack <- msg - } -} diff --git a/broker/eventbridge/examples/dequeue/typed/eventbridge.go b/broker/eventbridge/examples/dequeue/typed/eventbridge.go new file mode 100644 index 0000000..6c92c82 --- /dev/null +++ b/broker/eventbridge/examples/dequeue/typed/eventbridge.go @@ -0,0 +1,60 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package main + +import ( + "log/slog" + + "github.com/fogfish/swarm" + "github.com/fogfish/swarm/broker/eventbridge" + "github.com/fogfish/swarm/dequeue" +) + +type User struct { + ID string `json:"id"` + Text string `json:"text"` +} + +type Note struct { + ID string `json:"id"` + Text string `json:"text"` +} + +type Like struct { + ID string `json:"id"` + Text string `json:"text"` +} + +func main() { + q, err := eventbridge.NewReader("swarm-example-eventbridge", + eventbridge.WithConfig( + swarm.WithLogStdErr(), + ), + ) + if err != nil { + slog.Error("eventbridge reader has failed", "err", err) + return + } + + // + go actor[User]("user").handle(dequeue.Typed[User](q)) + go actor[Note]("note").handle(dequeue.Typed[Note](q)) + go actor[Like]("like").handle(dequeue.Typed[Like](q)) + + q.Await() +} + +type actor[T any] string + +func (a actor[T]) handle(rcv <-chan swarm.Msg[T], ack chan<- swarm.Msg[T]) { + for msg := range rcv { + slog.Info("Event", "type", a, "msg", msg.Object) + ack <- msg + } +} diff --git a/broker/eventbridge/examples/enqueue/bytes/eventbridge.go b/broker/eventbridge/examples/enqueue/bytes/eventbridge.go new file mode 100644 index 0000000..ae98608 --- /dev/null +++ b/broker/eventbridge/examples/enqueue/bytes/eventbridge.go @@ -0,0 +1,40 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package main + +import ( + "log/slog" + + "github.com/fogfish/swarm" + "github.com/fogfish/swarm/broker/eventbridge" + "github.com/fogfish/swarm/enqueue" +) + +func main() { + q, err := eventbridge.NewWriter("swarm-example-eventbridge", + eventbridge.WithConfig( + swarm.WithSource("swarm-example-eventbridge"), + swarm.WithLogStdErr(), + ), + ) + if err != nil { + slog.Error("eventbridge writer has failed", "err", err) + return + } + + user := swarm.LogDeadLetters(enqueue.Bytes(q, "User")) + note := swarm.LogDeadLetters(enqueue.Bytes(q, "Note")) + like := swarm.LogDeadLetters(enqueue.Bytes(q, "Like")) + + user <- []byte(`User Signed in`) + note <- []byte(`User wrote note`) + like <- []byte(`User liked note`) + + q.Close() +} diff --git a/broker/eventbridge/examples/enqueue/event/eventbridge.go b/broker/eventbridge/examples/enqueue/event/eventbridge.go new file mode 100644 index 0000000..ef942ac --- /dev/null +++ b/broker/eventbridge/examples/enqueue/event/eventbridge.go @@ -0,0 +1,68 @@ +// +// Copyright (C) 2021 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package main + +import ( + "log/slog" + + "github.com/fogfish/swarm" + "github.com/fogfish/swarm/broker/eventbridge" + "github.com/fogfish/swarm/enqueue" +) + +type Event = swarm.Event[swarm.Meta, EventNote] + +type EventNote struct { + ID string `json:"id"` + Text string `json:"text"` +} + +func main() { + q, err := eventbridge.NewWriter("swarm-example-eventbridge", + eventbridge.WithConfig( + swarm.WithSource("swarm-example-eventbridge"), + swarm.WithLogStdErr(), + ), + ) + if err != nil { + slog.Error("eventbridge writer has failed", "err", err) + return + } + + bus := swarm.LogDeadLetters(enqueue.Event[swarm.Meta, EventNote](q)) + + bus <- Event{ + Meta: &swarm.Meta{ + Type: "note:EventCreateNote", + Agent: "example", + Participant: "user", + }, + Data: &EventNote{ID: "note", Text: "some text"}, + } + + bus <- Event{ + Meta: &swarm.Meta{ + Type: "note:EventUpdateNote", + Agent: "example", + Participant: "user", + }, + Data: &EventNote{ID: "note", Text: "some text with changes"}, + } + + bus <- Event{ + Meta: &swarm.Meta{ + Type: "note:EventRemoveNote", + Agent: "example", + Participant: "user", + }, + Data: &EventNote{ID: "note"}, + } + + q.Close() +} diff --git a/broker/eventbridge/examples/enqueue/eventbridge.go b/broker/eventbridge/examples/enqueue/eventbridge.go deleted file mode 100644 index 3efa0f7..0000000 --- a/broker/eventbridge/examples/enqueue/eventbridge.go +++ /dev/null @@ -1,77 +0,0 @@ -// -// Copyright (C) 2021 Dmitry Kolesnikov -// -// This file may be modified and distributed under the terms -// of the Apache License Version 2.0. See the LICENSE file for details. -// https://github.com/fogfish/swarm -// - -package main - -import ( - "github.com/fogfish/swarm" - "github.com/fogfish/swarm/broker/eventbridge" - "github.com/fogfish/swarm/queue" - "github.com/fogfish/swarm/queue/events" -) - -type User struct { - ID string `json:"id"` - Text string `json:"text"` -} - -type Note struct { - ID string `json:"id"` - Text string `json:"text"` -} - -type Like struct { - ID string `json:"id"` - Text string `json:"text"` -} - -type EventNote swarm.Event[*Note] - -func (EventNote) HKT1(swarm.EventType) {} -func (EventNote) HKT2(*Note) {} - -func main() { - q := queue.Must(eventbridge.New("swarm-example-eventbridge", - swarm.WithSource("swarm-example-eventbridge"), - swarm.WithLogStdErr(), - )) - - user := swarm.LogDeadLetters(queue.Enqueue[*User](q)) - note := swarm.LogDeadLetters(queue.Enqueue[*Note](q)) - like := swarm.LogDeadLetters(queue.Enqueue[*Like](q)) - ebus := swarm.LogDeadLetters(events.Enqueue[*Note, EventNote](q)) - - user <- &User{ID: "user", Text: "some text"} - note <- &Note{ID: "note", Text: "some text"} - like <- &Like{ID: "like", Text: "some text"} - - // - // Single channel emits event - ebus <- &EventNote{ - Type: "note:EventCreateNote", - Agent: "example", - Participant: "user", - Object: &Note{ID: "note", Text: "some text"}, - } - - ebus <- &EventNote{ - Type: "note:EventUpdateNote", - Agent: "example", - Participant: "user", - Object: &Note{ID: "note", Text: "some text with changes"}, - } - - ebus <- &EventNote{ - Type: "note:EventRemoveNote", - Agent: "example", - Participant: "user", - Object: &Note{ID: "note"}, - } - - q.Close() -} diff --git a/broker/eventbridge/examples/enqueue/typed/eventbridge.go b/broker/eventbridge/examples/enqueue/typed/eventbridge.go new file mode 100644 index 0000000..2e26aa1 --- /dev/null +++ b/broker/eventbridge/examples/enqueue/typed/eventbridge.go @@ -0,0 +1,55 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package main + +import ( + "log/slog" + + "github.com/fogfish/swarm" + "github.com/fogfish/swarm/broker/eventbridge" + "github.com/fogfish/swarm/enqueue" +) + +type User struct { + ID string `json:"id"` + Text string `json:"text"` +} + +type Note struct { + ID string `json:"id"` + Text string `json:"text"` +} + +type Like struct { + ID string `json:"id"` + Text string `json:"text"` +} + +func main() { + q, err := eventbridge.NewWriter("swarm-example-eventbridge", + eventbridge.WithConfig( + swarm.WithSource("swarm-example-eventbridge"), + swarm.WithLogStdErr(), + ), + ) + if err != nil { + slog.Error("eventbridge writer has failed", "err", err) + return + } + + user := swarm.LogDeadLetters(enqueue.Typed[*User](q)) + note := swarm.LogDeadLetters(enqueue.Typed[*Note](q)) + like := swarm.LogDeadLetters(enqueue.Typed[*Like](q)) + + user <- &User{ID: "user", Text: "user signed in"} + note <- &Note{ID: "note", Text: "user wrote note"} + like <- &Like{ID: "like", Text: "user liked note"} + + q.Close() +} diff --git a/broker/eventbridge/examples/serverless/cdk.json b/broker/eventbridge/examples/serverless/cdk.json index 79691a5..1f4025a 100644 --- a/broker/eventbridge/examples/serverless/cdk.json +++ b/broker/eventbridge/examples/serverless/cdk.json @@ -1,5 +1,5 @@ { - "app": "go run main.go", + "app": "go run eventbridge.go", "requireApproval": "never", "context": { "@aws-cdk/aws-apigateway:usagePlanKeyOrderInsensitiveId": true, diff --git a/broker/eventbridge/examples/serverless/main.go b/broker/eventbridge/examples/serverless/eventbridge.go similarity index 51% rename from broker/eventbridge/examples/serverless/main.go rename to broker/eventbridge/examples/serverless/eventbridge.go index df96c81..1a35255 100644 --- a/broker/eventbridge/examples/serverless/main.go +++ b/broker/eventbridge/examples/serverless/eventbridge.go @@ -28,15 +28,39 @@ func main() { }, ) + // broker := eventbridge.NewBroker(stack, jsii.String("Broker"), nil) broker.NewEventBus(nil) broker.NewSink( &eventbridge.SinkProps{ - Source: []string{"swarm-example-eventbridge"}, + Source: []string{"swarm-example-eventbridge"}, + Categories: []string{"User", "Note", "Like"}, Function: &scud.FunctionGoProps{ - SourceCodeModule: "github.com/fogfish/swarm", - SourceCodeLambda: "examples/eventbridge/dequeue", + SourceCodeModule: "github.com/fogfish/swarm/broker/eventbridge", + SourceCodeLambda: "examples/dequeue/typed", + }, + }, + ) + + broker.NewSink( + &eventbridge.SinkProps{ + Source: []string{"swarm-example-eventbridge"}, + Categories: []string{"EventNote"}, + Function: &scud.FunctionGoProps{ + SourceCodeModule: "github.com/fogfish/swarm/broker/eventbridge", + SourceCodeLambda: "examples/dequeue/event", + }, + }, + ) + + broker.NewSink( + &eventbridge.SinkProps{ + Source: []string{"swarm-example-eventbridge"}, + Categories: []string{"User", "Note", "Like"}, + Function: &scud.FunctionGoProps{ + SourceCodeModule: "github.com/fogfish/swarm/broker/eventbridge", + SourceCodeLambda: "examples/dequeue/bytes", }, }, ) diff --git a/broker/eventbridge/go.mod b/broker/eventbridge/go.mod index c51af6e..ba646ad 100644 --- a/broker/eventbridge/go.mod +++ b/broker/eventbridge/go.mod @@ -6,19 +6,18 @@ require ( github.com/aws/aws-cdk-go/awscdk/v2 v2.160.0 github.com/aws/aws-lambda-go v1.47.0 github.com/aws/aws-sdk-go-v2 v1.31.0 - github.com/aws/aws-sdk-go-v2/config v1.27.37 - github.com/aws/aws-sdk-go-v2/service/eventbridge v1.34.1 + github.com/aws/aws-sdk-go-v2/config v1.27.39 + github.com/aws/aws-sdk-go-v2/service/eventbridge v1.34.3 github.com/aws/constructs-go/constructs/v10 v10.3.0 github.com/aws/jsii-runtime-go v1.103.1 - github.com/fogfish/scud v0.10.1 - github.com/fogfish/swarm v0.16.0 - github.com/fogfish/swarm/qtest v0.16.1 - github.com/fogfish/swarm/queue v0.16.1 + github.com/fogfish/it/v2 v2.0.2 + github.com/fogfish/scud v0.10.2 + github.com/fogfish/swarm v0.20.0 ) require ( github.com/Masterminds/semver/v3 v3.2.1 // indirect - github.com/aws/aws-sdk-go-v2/credentials v1.17.35 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.17.37 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.14 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.18 // indirect github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.18 // indirect @@ -26,9 +25,9 @@ require ( github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.18 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.5 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.20 // indirect - github.com/aws/aws-sdk-go-v2/service/sso v1.23.1 // indirect - github.com/aws/aws-sdk-go-v2/service/ssooidc v1.27.1 // indirect - github.com/aws/aws-sdk-go-v2/service/sts v1.31.1 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.23.3 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.27.3 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.31.3 // indirect github.com/aws/smithy-go v1.21.0 // indirect github.com/cdklabs/awscdk-asset-awscli-go/awscliv1/v2 v2.2.202 // indirect github.com/cdklabs/awscdk-asset-kubectl-go/kubectlv20/v2 v2.1.2 // indirect @@ -39,9 +38,7 @@ require ( github.com/fogfish/faults v0.2.0 // indirect github.com/fogfish/golem/hseq v1.2.0 // indirect github.com/fogfish/golem/optics v0.13.0 // indirect - github.com/fogfish/golem/pure v0.10.1 // indirect github.com/fogfish/guid/v2 v2.0.4 // indirect - github.com/fogfish/it/v2 v2.0.2 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/yuin/goldmark v1.5.3 // indirect diff --git a/broker/eventbridge/go.sum b/broker/eventbridge/go.sum index b54a421..c770551 100644 --- a/broker/eventbridge/go.sum +++ b/broker/eventbridge/go.sum @@ -6,10 +6,10 @@ github.com/aws/aws-lambda-go v1.47.0 h1:0H8s0vumYx/YKs4sE7YM0ktwL2eWse+kfopsRI1s github.com/aws/aws-lambda-go v1.47.0/go.mod h1:dpMpZgvWx5vuQJfBt0zqBha60q7Dd7RfgJv23DymV8A= github.com/aws/aws-sdk-go-v2 v1.31.0 h1:3V05LbxTSItI5kUqNwhJrrrY1BAXxXt0sN0l72QmG5U= github.com/aws/aws-sdk-go-v2 v1.31.0/go.mod h1:ztolYtaEUtdpf9Wftr31CJfLVjOnD/CVRkKOOYgF8hA= -github.com/aws/aws-sdk-go-v2/config v1.27.37 h1:xaoIwzHVuRWRHFI0jhgEdEGc8xE1l91KaeRDsWEIncU= -github.com/aws/aws-sdk-go-v2/config v1.27.37/go.mod h1:S2e3ax9/8KnMSyRVNd3sWTKs+1clJ2f1U6nE0lpvQRg= -github.com/aws/aws-sdk-go-v2/credentials v1.17.35 h1:7QknrZhYySEB1lEXJxGAmuD5sWwys5ZXNr4m5oEz0IE= -github.com/aws/aws-sdk-go-v2/credentials v1.17.35/go.mod h1:8Vy4kk7at4aPSmibr7K+nLTzG6qUQAUO4tW49fzUV4E= +github.com/aws/aws-sdk-go-v2/config v1.27.39 h1:FCylu78eTGzW1ynHcongXK9YHtoXD5AiiUqq3YfJYjU= +github.com/aws/aws-sdk-go-v2/config v1.27.39/go.mod h1:wczj2hbyskP4LjMKBEZwPRO1shXY+GsQleab+ZXT2ik= +github.com/aws/aws-sdk-go-v2/credentials v1.17.37 h1:G2aOH01yW8X373JK419THj5QVqu9vKEwxSEsGxihoW0= +github.com/aws/aws-sdk-go-v2/credentials v1.17.37/go.mod h1:0ecCjlb7htYCptRD45lXJ6aJDQac6D2NlKGpZqyTG6A= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.14 h1:C/d03NAmh8C4BZXhuRNboF/DqhBkBCeDiJDcaqIT5pA= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.14/go.mod h1:7I0Ju7p9mCIdlrfS+JCgqcYD0VXz/N4yozsox+0o078= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.18 h1:kYQ3H1u0ANr9KEKlGs/jTLrBFPo8P8NaH/w7A01NeeM= @@ -20,18 +20,18 @@ github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvK github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.18 h1:OWYvKL53l1rbsUmW7bQyJVsYU/Ii3bbAAQIIFNbM0Tk= github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.18/go.mod h1:CUx0G1v3wG6l01tUB+j7Y8kclA8NSqK4ef0YG79a4cg= -github.com/aws/aws-sdk-go-v2/service/eventbridge v1.34.1 h1:m6Jf7bqgAC93Z22W8JDSD+S26D1QAmPAG7jVgliDoVc= -github.com/aws/aws-sdk-go-v2/service/eventbridge v1.34.1/go.mod h1:bcL34EfmexE+PLh2o4oC1VFpP82Ev8p4dL0PqdZ13dE= +github.com/aws/aws-sdk-go-v2/service/eventbridge v1.34.3 h1:voc3mmh8nP2y+XobELnq5ge7Om5FFJQ93AnTUTMwgUQ= +github.com/aws/aws-sdk-go-v2/service/eventbridge v1.34.3/go.mod h1:bcL34EfmexE+PLh2o4oC1VFpP82Ev8p4dL0PqdZ13dE= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.5 h1:QFASJGfT8wMXtuP3D5CRmMjARHv9ZmzFUMJznHDOY3w= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.5/go.mod h1:QdZ3OmoIjSX+8D1OPAzPxDfjXASbBMDsz9qvtyIhtik= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.20 h1:Xbwbmk44URTiHNx6PNo0ujDE6ERlsCKJD3u1zfnzAPg= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.20/go.mod h1:oAfOFzUB14ltPZj1rWwRc3d/6OgD76R8KlvU3EqM9Fg= -github.com/aws/aws-sdk-go-v2/service/sso v1.23.1 h1:2jrVsMHqdLD1+PA4BA6Nh1eZp0Gsy3mFSB5MxDvcJtU= -github.com/aws/aws-sdk-go-v2/service/sso v1.23.1/go.mod h1:XRlMvmad0ZNL+75C5FYdMvbbLkd6qiqz6foR1nA1PXY= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.27.1 h1:0L7yGCg3Hb3YQqnSgBTZM5wepougtL1aEccdcdYhHME= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.27.1/go.mod h1:FnvDM4sfa+isJ3kDXIzAB9GAwVSzFzSy97uZ3IsHo4E= -github.com/aws/aws-sdk-go-v2/service/sts v1.31.1 h1:8K0UNOkZiK9Uh3HIF6Bx0rcNCftqGCeKmOaR7Gp5BSo= -github.com/aws/aws-sdk-go-v2/service/sts v1.31.1/go.mod h1:yMWe0F+XG0DkRZK5ODZhG7BEFYhLXi2dqGsv6tX0cgI= +github.com/aws/aws-sdk-go-v2/service/sso v1.23.3 h1:rs4JCczF805+FDv2tRhZ1NU0RB2H6ryAvsWPanAr72Y= +github.com/aws/aws-sdk-go-v2/service/sso v1.23.3/go.mod h1:XRlMvmad0ZNL+75C5FYdMvbbLkd6qiqz6foR1nA1PXY= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.27.3 h1:S7EPdMVZod8BGKQQPTBK+FcX9g7bKR7c4+HxWqHP7Vg= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.27.3/go.mod h1:FnvDM4sfa+isJ3kDXIzAB9GAwVSzFzSy97uZ3IsHo4E= +github.com/aws/aws-sdk-go-v2/service/sts v1.31.3 h1:VzudTFrDCIDakXtemR7l6Qzt2+JYsVqo2MxBPt5k8T8= +github.com/aws/aws-sdk-go-v2/service/sts v1.31.3/go.mod h1:yMWe0F+XG0DkRZK5ODZhG7BEFYhLXi2dqGsv6tX0cgI= github.com/aws/constructs-go/constructs/v10 v10.3.0 h1:LsjBIMiaDX/vqrXWhzTquBJ9pPdi02/H+z1DCwg0PEM= github.com/aws/constructs-go/constructs/v10 v10.3.0/go.mod h1:GgzwIwoRJ2UYsr3SU+JhAl+gq5j39bEMYf8ev3J+s9s= github.com/aws/jsii-runtime-go v1.103.1 h1:7CwjdpiSrylOeuYP1LzHu2AJKV2K65P89nuOC/8Do7g= @@ -58,22 +58,18 @@ github.com/fogfish/golem/hseq v1.2.0 h1:B6yrzOHQNoTqSlhLb+AvK7dhEAELjHThrCQTF/uq github.com/fogfish/golem/hseq v1.2.0/go.mod h1:17XORt8nNKl6KOhF43MHSmjK8NksbkBsohAoJGiinUs= github.com/fogfish/golem/optics v0.13.0 h1:U3htppjVTMbICQIzPTTe151+WziSGEppNVmkanKa440= github.com/fogfish/golem/optics v0.13.0/go.mod h1:U1y90OVcXF/A61dIP3abQ0x2GweTmzVHPC15pv0pcM0= -github.com/fogfish/golem/pure v0.10.1 h1:0+cnvdaV9zF+0NN8SZMgR5bgFM6yNfBHU4rynYSDfmE= -github.com/fogfish/golem/pure v0.10.1/go.mod h1:kLPfgu5uKP0CrwVap7jejisRwV7vo1q8Eyqnc/Z0qyw= github.com/fogfish/guid/v2 v2.0.4 h1:EZiPlM4UAghqf7DU5/nLEF+iRH7ODe0AiFuYOMRvITQ= github.com/fogfish/guid/v2 v2.0.4/go.mod h1:KkZ5T4EE3BqWQJFZBPLSHV/tBe23Xq4KvuPfwtNtepU= github.com/fogfish/it v1.0.0 h1:kiwFHZcrkRLUydZoIoY0gTuMfj38trwvLo0YRyIkeG8= github.com/fogfish/it v1.0.0/go.mod h1:NQJG4Ygvek85y7zGj0Gny8+6ygAnHjfBORhI7TdQhp4= github.com/fogfish/it/v2 v2.0.2 h1:UR6yVemf8zD3WVs6Bq0zE6LJwapZ8urv9zvU5VB5E6o= github.com/fogfish/it/v2 v2.0.2/go.mod h1:HHwufnTaZTvlRVnSesPl49HzzlMrQtweKbf+8Co/ll4= -github.com/fogfish/scud v0.10.1 h1:eJI/1zQamihBTTwDhgn2VTXlG+74B1qVAOnGGDv4u7E= -github.com/fogfish/scud v0.10.1/go.mod h1:IVtHIfQMsb9lPKFeCI/OGcT2ssmd6onOZdpXgj/ORgs= -github.com/fogfish/swarm v0.16.0 h1:6AviPpPbrSraLcjH2GwW0oVUb7tb6IZMqdT+FdXOCHU= -github.com/fogfish/swarm v0.16.0/go.mod h1:u5uJmXu3xHz1OTzyxhu9viI6eq+GmkJJkDqdrf8IlJw= -github.com/fogfish/swarm/qtest v0.16.1 h1:bSWownAij0hhifA4IimEoFBM5xKyNt9tskCz0tPGjHk= -github.com/fogfish/swarm/qtest v0.16.1/go.mod h1:79qfqu256c40UntI1PzYEIToaXr6oJBr48gkAocupYw= -github.com/fogfish/swarm/queue v0.16.1 h1:bReiuwm2a1h0hP4ElSzETksqs1gT2yKRHtA6aVHQnKY= -github.com/fogfish/swarm/queue v0.16.1/go.mod h1:tiSPnknVI/nHs2TL8H17YszUvgBT1E2ksaOGBQV/E6Q= +github.com/fogfish/logger/v3 v3.1.1 h1:awmTNpBWRvSj086H3RWIUnc+FSu9qXHJgBa49wYpNCE= +github.com/fogfish/logger/v3 v3.1.1/go.mod h1:hsucoJz/3OX90UdYrXykcKvjjteBnPcYSTr4Rie0ZqU= +github.com/fogfish/scud v0.10.2 h1:cFupgZ4brqeGr/HCURnyDaBUNJIVEJTfKRwxEEUrO3w= +github.com/fogfish/scud v0.10.2/go.mod h1:IVtHIfQMsb9lPKFeCI/OGcT2ssmd6onOZdpXgj/ORgs= +github.com/fogfish/swarm v0.20.0 h1:eUlNXFsePfBo72iFNvY3eJ6YIQP0ttzflGF6tNAxhQ8= +github.com/fogfish/swarm v0.20.0/go.mod h1:cdIviTojE3DT+FOIIOeOg6tyMqhyanfy2TZHTtKlOmo= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= diff --git a/broker/eventbridge/version.go b/broker/eventbridge/version.go index c876700..492a814 100644 --- a/broker/eventbridge/version.go +++ b/broker/eventbridge/version.go @@ -8,4 +8,4 @@ package eventbridge -const Version = "broker/eventbridge/v0.16.1" +const Version = "broker/eventbridge/v0.20.0"