diff --git a/broker/eventbridge/README.md b/broker/eventbridge/README.md new file mode 100644 index 0000000..f1355e2 --- /dev/null +++ b/broker/eventbridge/README.md @@ -0,0 +1,35 @@ +# AWS EventBridge Broker + +The sub-module implements swarm broker for AWS EventBridge. See [the library documentation](../../README.md) for details. + + +## Serverless + + +**AWS Event Bridge** has a feature that allows to [match execution of consumer to the pattern](https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/CloudWatchEventsandEventPatterns.html#CloudWatchEventsPatterns) of JSON object. Use it to build reliable matching of incoming events: + +```go +/* +enq <- &swarm.Event[User]{ + Meta: &swarm.Meta{Agent: "swarm:example", Participant: "user"}, + Data: &User{ID: "user", Text: "some text"}, +} +*/ + +stack.NewSink( + &eventbridge.SinkProps{ + Pattern: map[string]interface{}{ + "@type": []string{"[User]"}, + "agent": []string{"[swarm:example]"}, + "participant": []string{"[user]"}, + }, + /* ... */ + }, +) +``` + + +## Limitation + +AWS EventBridge cannot transmit bytes. They have to be enveloped as JSON object. +The library implements automatic encapsulation using built-in `kernel/encoding/CodecPacket`. Use these codec (`WithPacketCodec`) if you automatically source events from EventBridge to SQS. diff --git a/broker/eventbridge/config.go b/broker/eventbridge/config.go index db1b8f5..b73d3fe 100644 --- a/broker/eventbridge/config.go +++ b/broker/eventbridge/config.go @@ -29,7 +29,7 @@ func WithConfig(opts ...swarm.Option) Option { // Mandatory overrides config.PollFrequency = 5 * time.Microsecond - config.Codec = encoding.NewCodecPacket() + config.PacketCodec = encoding.NewCodecPacket() c.config = config } diff --git a/broker/eventbridge/eventbridge.go b/broker/eventbridge/eventbridge.go index 7c2d33b..c42295b 100644 --- a/broker/eventbridge/eventbridge.go +++ b/broker/eventbridge/eventbridge.go @@ -35,7 +35,7 @@ type Client struct { } // Create writer to AWS EventBridge -func NewWriter(queue string, opts ...Option) (*kernel.Enqueuer, error) { +func NewEnqueuer(queue string, opts ...Option) (*kernel.Enqueuer, error) { cli, err := newEventBridge(queue, opts...) if err != nil { return nil, err @@ -45,7 +45,7 @@ func NewWriter(queue string, opts ...Option) (*kernel.Enqueuer, error) { } // Create reader from AWS EventBridge -func NewReader(queue string, opts ...Option) (*kernel.Dequeuer, error) { +func NewDequeuer(queue string, opts ...Option) (*kernel.Dequeuer, error) { c := &Client{bus: queue} for _, opt := range defs { diff --git a/broker/eventbridge/eventbridge_test.go b/broker/eventbridge/eventbridge_test.go index a9ff7c0..2acc5fa 100644 --- a/broker/eventbridge/eventbridge_test.go +++ b/broker/eventbridge/eventbridge_test.go @@ -28,7 +28,7 @@ func TestReader(t *testing.T) { bridge := &bridge{kernel.NewBridge(100 * time.Millisecond)} t.Run("New", func(t *testing.T) { - q, err := NewReader("test") + q, err := NewDequeuer("test") it.Then(t).Should(it.Nil(err)) q.Close() }) @@ -78,7 +78,7 @@ func TestReader(t *testing.T) { func TestWriter(t *testing.T) { t.Run("New", func(t *testing.T) { - q, err := NewWriter("test") + q, err := NewEnqueuer("test") it.Then(t).Should(it.Nil(err)) q.Close() }) @@ -86,7 +86,7 @@ func TestWriter(t *testing.T) { t.Run("Enqueue", func(t *testing.T) { mock := &mockEventBridge{} - q, err := NewWriter("test", WithService(mock)) + q, err := NewEnqueuer("test", WithService(mock)) it.Then(t).Should(it.Nil(err)) err = q.Emitter.Enq(context.Background(), diff --git a/broker/eventbridge/examples/dequeue/bytes/eventbridge.go b/broker/eventbridge/examples/dequeue/bytes/eventbridge.go index c1484f7..f34a062 100644 --- a/broker/eventbridge/examples/dequeue/bytes/eventbridge.go +++ b/broker/eventbridge/examples/dequeue/bytes/eventbridge.go @@ -17,7 +17,7 @@ import ( ) func main() { - q, err := eventbridge.NewReader("swarm-example-eventbridge", + q, err := eventbridge.NewDequeuer("swarm-example-eventbridge", eventbridge.WithConfig( swarm.WithLogStdErr(), ), diff --git a/broker/eventbridge/examples/dequeue/event/eventbridge.go b/broker/eventbridge/examples/dequeue/event/eventbridge.go index cec8828..9607a51 100644 --- a/broker/eventbridge/examples/dequeue/event/eventbridge.go +++ b/broker/eventbridge/examples/dequeue/event/eventbridge.go @@ -26,7 +26,7 @@ type EventNote struct { } func main() { - q, err := eventbridge.NewReader("swarm-example-eventbridge", + q, err := eventbridge.NewDequeuer("swarm-example-eventbridge", eventbridge.WithConfig( swarm.WithLogStdErr(), ), diff --git a/broker/eventbridge/examples/dequeue/typed/eventbridge.go b/broker/eventbridge/examples/dequeue/typed/eventbridge.go index 6c92c82..ad05755 100644 --- a/broker/eventbridge/examples/dequeue/typed/eventbridge.go +++ b/broker/eventbridge/examples/dequeue/typed/eventbridge.go @@ -32,7 +32,7 @@ type Like struct { } func main() { - q, err := eventbridge.NewReader("swarm-example-eventbridge", + q, err := eventbridge.NewDequeuer("swarm-example-eventbridge", eventbridge.WithConfig( swarm.WithLogStdErr(), ), diff --git a/broker/eventbridge/examples/enqueue/bytes/eventbridge.go b/broker/eventbridge/examples/enqueue/bytes/eventbridge.go index ae98608..75d9804 100644 --- a/broker/eventbridge/examples/enqueue/bytes/eventbridge.go +++ b/broker/eventbridge/examples/enqueue/bytes/eventbridge.go @@ -17,7 +17,7 @@ import ( ) func main() { - q, err := eventbridge.NewWriter("swarm-example-eventbridge", + q, err := eventbridge.NewEnqueuer("swarm-example-eventbridge", eventbridge.WithConfig( swarm.WithSource("swarm-example-eventbridge"), swarm.WithLogStdErr(), diff --git a/broker/eventbridge/examples/enqueue/event/eventbridge.go b/broker/eventbridge/examples/enqueue/event/eventbridge.go index ef942ac..84b1bd3 100644 --- a/broker/eventbridge/examples/enqueue/event/eventbridge.go +++ b/broker/eventbridge/examples/enqueue/event/eventbridge.go @@ -24,7 +24,7 @@ type EventNote struct { } func main() { - q, err := eventbridge.NewWriter("swarm-example-eventbridge", + q, err := eventbridge.NewEnqueuer("swarm-example-eventbridge", eventbridge.WithConfig( swarm.WithSource("swarm-example-eventbridge"), swarm.WithLogStdErr(), diff --git a/broker/eventbridge/examples/enqueue/typed/eventbridge.go b/broker/eventbridge/examples/enqueue/typed/eventbridge.go index 2e26aa1..7de27d5 100644 --- a/broker/eventbridge/examples/enqueue/typed/eventbridge.go +++ b/broker/eventbridge/examples/enqueue/typed/eventbridge.go @@ -32,7 +32,7 @@ type Like struct { } func main() { - q, err := eventbridge.NewWriter("swarm-example-eventbridge", + q, err := eventbridge.NewEnqueuer("swarm-example-eventbridge", eventbridge.WithConfig( swarm.WithSource("swarm-example-eventbridge"), swarm.WithLogStdErr(), diff --git a/broker/eventbridge/go.mod b/broker/eventbridge/go.mod index ba646ad..2721fff 100644 --- a/broker/eventbridge/go.mod +++ b/broker/eventbridge/go.mod @@ -12,7 +12,7 @@ require ( github.com/aws/jsii-runtime-go v1.103.1 github.com/fogfish/it/v2 v2.0.2 github.com/fogfish/scud v0.10.2 - github.com/fogfish/swarm v0.20.0 + github.com/fogfish/swarm v0.20.1 ) require ( diff --git a/broker/eventbridge/go.sum b/broker/eventbridge/go.sum index c770551..25465a2 100644 --- a/broker/eventbridge/go.sum +++ b/broker/eventbridge/go.sum @@ -68,8 +68,8 @@ github.com/fogfish/logger/v3 v3.1.1 h1:awmTNpBWRvSj086H3RWIUnc+FSu9qXHJgBa49wYpN github.com/fogfish/logger/v3 v3.1.1/go.mod h1:hsucoJz/3OX90UdYrXykcKvjjteBnPcYSTr4Rie0ZqU= github.com/fogfish/scud v0.10.2 h1:cFupgZ4brqeGr/HCURnyDaBUNJIVEJTfKRwxEEUrO3w= github.com/fogfish/scud v0.10.2/go.mod h1:IVtHIfQMsb9lPKFeCI/OGcT2ssmd6onOZdpXgj/ORgs= -github.com/fogfish/swarm v0.20.0 h1:eUlNXFsePfBo72iFNvY3eJ6YIQP0ttzflGF6tNAxhQ8= -github.com/fogfish/swarm v0.20.0/go.mod h1:cdIviTojE3DT+FOIIOeOg6tyMqhyanfy2TZHTtKlOmo= +github.com/fogfish/swarm v0.20.1 h1:XzHkTHxgLVbctkTAcT4dVoGdp7mNKihdzz1Io6YGig0= +github.com/fogfish/swarm v0.20.1/go.mod h1:cdIviTojE3DT+FOIIOeOg6tyMqhyanfy2TZHTtKlOmo= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= diff --git a/broker/eventbridge/version.go b/broker/eventbridge/version.go index 492a814..7cefeaa 100644 --- a/broker/eventbridge/version.go +++ b/broker/eventbridge/version.go @@ -8,4 +8,4 @@ package eventbridge -const Version = "broker/eventbridge/v0.20.0" +const Version = "broker/eventbridge/v0.20.1"