Skip to content

Commit

Permalink
Update eventbridge to kernel v0.20.1
Browse files Browse the repository at this point in the history
  • Loading branch information
fogfish committed Sep 30, 2024
1 parent c50a685 commit 75d8d12
Show file tree
Hide file tree
Showing 13 changed files with 51 additions and 16 deletions.
35 changes: 35 additions & 0 deletions broker/eventbridge/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# AWS EventBridge Broker

The sub-module implements swarm broker for AWS EventBridge. See [the library documentation](../../README.md) for details.


## Serverless


**AWS Event Bridge** has a feature that allows to [match execution of consumer to the pattern](https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/CloudWatchEventsandEventPatterns.html#CloudWatchEventsPatterns) of JSON object. Use it to build reliable matching of incoming events:

```go
/*
enq <- &swarm.Event[User]{
Meta: &swarm.Meta{Agent: "swarm:example", Participant: "user"},
Data: &User{ID: "user", Text: "some text"},
}
*/

stack.NewSink(
&eventbridge.SinkProps{
Pattern: map[string]interface{}{
"@type": []string{"[User]"},
"agent": []string{"[swarm:example]"},
"participant": []string{"[user]"},
},
/* ... */
},
)
```


## Limitation

AWS EventBridge cannot transmit bytes. They have to be enveloped as JSON object.
The library implements automatic encapsulation using built-in `kernel/encoding/CodecPacket`. Use these codec (`WithPacketCodec`) if you automatically source events from EventBridge to SQS.
2 changes: 1 addition & 1 deletion broker/eventbridge/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func WithConfig(opts ...swarm.Option) Option {

// Mandatory overrides
config.PollFrequency = 5 * time.Microsecond
config.Codec = encoding.NewCodecPacket()
config.PacketCodec = encoding.NewCodecPacket()

c.config = config
}
Expand Down
4 changes: 2 additions & 2 deletions broker/eventbridge/eventbridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Client struct {
}

// Create writer to AWS EventBridge
func NewWriter(queue string, opts ...Option) (*kernel.Enqueuer, error) {
func NewEnqueuer(queue string, opts ...Option) (*kernel.Enqueuer, error) {
cli, err := newEventBridge(queue, opts...)
if err != nil {
return nil, err
Expand All @@ -45,7 +45,7 @@ func NewWriter(queue string, opts ...Option) (*kernel.Enqueuer, error) {
}

// Create reader from AWS EventBridge
func NewReader(queue string, opts ...Option) (*kernel.Dequeuer, error) {
func NewDequeuer(queue string, opts ...Option) (*kernel.Dequeuer, error) {
c := &Client{bus: queue}

for _, opt := range defs {
Expand Down
6 changes: 3 additions & 3 deletions broker/eventbridge/eventbridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestReader(t *testing.T) {
bridge := &bridge{kernel.NewBridge(100 * time.Millisecond)}

t.Run("New", func(t *testing.T) {
q, err := NewReader("test")
q, err := NewDequeuer("test")
it.Then(t).Should(it.Nil(err))
q.Close()
})
Expand Down Expand Up @@ -78,15 +78,15 @@ func TestReader(t *testing.T) {

func TestWriter(t *testing.T) {
t.Run("New", func(t *testing.T) {
q, err := NewWriter("test")
q, err := NewEnqueuer("test")
it.Then(t).Should(it.Nil(err))
q.Close()
})

t.Run("Enqueue", func(t *testing.T) {
mock := &mockEventBridge{}

q, err := NewWriter("test", WithService(mock))
q, err := NewEnqueuer("test", WithService(mock))
it.Then(t).Should(it.Nil(err))

err = q.Emitter.Enq(context.Background(),
Expand Down
2 changes: 1 addition & 1 deletion broker/eventbridge/examples/dequeue/bytes/eventbridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

func main() {
q, err := eventbridge.NewReader("swarm-example-eventbridge",
q, err := eventbridge.NewDequeuer("swarm-example-eventbridge",
eventbridge.WithConfig(
swarm.WithLogStdErr(),
),
Expand Down
2 changes: 1 addition & 1 deletion broker/eventbridge/examples/dequeue/event/eventbridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type EventNote struct {
}

func main() {
q, err := eventbridge.NewReader("swarm-example-eventbridge",
q, err := eventbridge.NewDequeuer("swarm-example-eventbridge",
eventbridge.WithConfig(
swarm.WithLogStdErr(),
),
Expand Down
2 changes: 1 addition & 1 deletion broker/eventbridge/examples/dequeue/typed/eventbridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Like struct {
}

func main() {
q, err := eventbridge.NewReader("swarm-example-eventbridge",
q, err := eventbridge.NewDequeuer("swarm-example-eventbridge",
eventbridge.WithConfig(
swarm.WithLogStdErr(),
),
Expand Down
2 changes: 1 addition & 1 deletion broker/eventbridge/examples/enqueue/bytes/eventbridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

func main() {
q, err := eventbridge.NewWriter("swarm-example-eventbridge",
q, err := eventbridge.NewEnqueuer("swarm-example-eventbridge",
eventbridge.WithConfig(
swarm.WithSource("swarm-example-eventbridge"),
swarm.WithLogStdErr(),
Expand Down
2 changes: 1 addition & 1 deletion broker/eventbridge/examples/enqueue/event/eventbridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type EventNote struct {
}

func main() {
q, err := eventbridge.NewWriter("swarm-example-eventbridge",
q, err := eventbridge.NewEnqueuer("swarm-example-eventbridge",
eventbridge.WithConfig(
swarm.WithSource("swarm-example-eventbridge"),
swarm.WithLogStdErr(),
Expand Down
2 changes: 1 addition & 1 deletion broker/eventbridge/examples/enqueue/typed/eventbridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Like struct {
}

func main() {
q, err := eventbridge.NewWriter("swarm-example-eventbridge",
q, err := eventbridge.NewEnqueuer("swarm-example-eventbridge",
eventbridge.WithConfig(
swarm.WithSource("swarm-example-eventbridge"),
swarm.WithLogStdErr(),
Expand Down
2 changes: 1 addition & 1 deletion broker/eventbridge/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/aws/jsii-runtime-go v1.103.1
github.com/fogfish/it/v2 v2.0.2
github.com/fogfish/scud v0.10.2
github.com/fogfish/swarm v0.20.0
github.com/fogfish/swarm v0.20.1
)

require (
Expand Down
4 changes: 2 additions & 2 deletions broker/eventbridge/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ github.com/fogfish/logger/v3 v3.1.1 h1:awmTNpBWRvSj086H3RWIUnc+FSu9qXHJgBa49wYpN
github.com/fogfish/logger/v3 v3.1.1/go.mod h1:hsucoJz/3OX90UdYrXykcKvjjteBnPcYSTr4Rie0ZqU=
github.com/fogfish/scud v0.10.2 h1:cFupgZ4brqeGr/HCURnyDaBUNJIVEJTfKRwxEEUrO3w=
github.com/fogfish/scud v0.10.2/go.mod h1:IVtHIfQMsb9lPKFeCI/OGcT2ssmd6onOZdpXgj/ORgs=
github.com/fogfish/swarm v0.20.0 h1:eUlNXFsePfBo72iFNvY3eJ6YIQP0ttzflGF6tNAxhQ8=
github.com/fogfish/swarm v0.20.0/go.mod h1:cdIviTojE3DT+FOIIOeOg6tyMqhyanfy2TZHTtKlOmo=
github.com/fogfish/swarm v0.20.1 h1:XzHkTHxgLVbctkTAcT4dVoGdp7mNKihdzz1Io6YGig0=
github.com/fogfish/swarm v0.20.1/go.mod h1:cdIviTojE3DT+FOIIOeOg6tyMqhyanfy2TZHTtKlOmo=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
Expand Down
2 changes: 1 addition & 1 deletion broker/eventbridge/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@

package eventbridge

const Version = "broker/eventbridge/v0.20.0"
const Version = "broker/eventbridge/v0.20.1"

0 comments on commit 75d8d12

Please sign in to comment.