Skip to content

Commit

Permalink
Update eventbridge to kernel v0.20.0 (#80)
Browse files Browse the repository at this point in the history
* Update eventbridge to kernel v0.20.0
* Examples of sending binary data
* Increase test coverage
  • Loading branch information
fogfish authored Sep 29, 2024
1 parent 803b8ab commit 56892d0
Show file tree
Hide file tree
Showing 18 changed files with 576 additions and 343 deletions.
3 changes: 2 additions & 1 deletion broker/eventbridge/awscdk.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (C) 2021 - 2022 Dmitry Kolesnikov
// Copyright (C) 2021 - 2024 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the Apache License Version 2.0. See the LICENSE file for details.
Expand Down Expand Up @@ -79,6 +79,7 @@ func NewSink(scope constructs.Construct, id *string, props *SinkProps) *Sink {
)

if props.Function != nil {
props.Function.Setenv(EnvEventBridge, *props.System.EventBusName())
sink.Handler = scud.NewFunction(sink.Construct, jsii.String("Func"), props.Function)

sink.Rule.AddTarget(awseventstargets.NewLambdaFunction(
Expand Down
7 changes: 4 additions & 3 deletions broker/eventbridge/awscdk_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (C) 2021 - 2022 Dmitry Kolesnikov
// Copyright (C) 2021 - 2024 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the Apache License Version 2.0. See the LICENSE file for details.
Expand Down Expand Up @@ -27,10 +27,11 @@ func TestEventBridgeCDK(t *testing.T) {

broker.NewSink(
&eventbridge.SinkProps{
Source: []string{"swarm-example-eventbridge"},
Source: []string{"swarm-example-eventbridge"},
Categories: []string{"category"},
Function: &scud.FunctionGoProps{
SourceCodeModule: "github.com/fogfish/swarm/broker/eventbridge",
SourceCodeLambda: "examples/dequeue",
SourceCodeLambda: "examples/dequeue/typed",
},
},
)
Expand Down
52 changes: 52 additions & 0 deletions broker/eventbridge/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//
// Copyright (C) 2021 - 2024 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the Apache License Version 2.0. See the LICENSE file for details.
// https://github.com/fogfish/swarm
//

package eventbridge

import (
"os"
"time"

"github.com/fogfish/swarm"
"github.com/fogfish/swarm/kernel/encoding"
)

type Option func(*Client)

var defs = []Option{WithConfig(), WithEnv()}

func WithConfig(opts ...swarm.Option) Option {
return func(c *Client) {
config := swarm.NewConfig()
for _, opt := range opts {
opt(&config)
}

// Mandatory overrides
config.PollFrequency = 5 * time.Microsecond
config.Codec = encoding.NewCodecPacket()

c.config = config
}
}

func WithService(service EventBridge) Option {
return func(c *Client) {
c.service = service
}
}

const EnvEventBridge = "CONFIG_SWARM_EVENT_BRIDGE"

func WithEnv() Option {
return func(c *Client) {
if val, has := os.LookupEnv(EnvEventBridge); has {
c.bus = val
}
}
}
107 changes: 43 additions & 64 deletions broker/eventbridge/eventbridge.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (C) 2021 - 2022 Dmitry Kolesnikov
// Copyright (C) 2021 - 2024 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the Apache License Version 2.0. See the LICENSE file for details.
Expand Down Expand Up @@ -27,75 +27,63 @@ type EventBridge interface {
PutEvents(context.Context, *eventbridge.PutEventsInput, ...func(*eventbridge.Options)) (*eventbridge.PutEventsOutput, error)
}

// EventBridge client
type Client struct {
service EventBridge
bus string
config swarm.Config
}

func New(queue string, opts ...swarm.Option) (swarm.Broker, error) {
cli, err := NewEventBridge(queue, opts...)
// Create writer to AWS EventBridge
func NewWriter(queue string, opts ...Option) (*kernel.Enqueuer, error) {
cli, err := newEventBridge(queue, opts...)
if err != nil {
return nil, err
}

config := swarm.NewConfig()
for _, opt := range opts {
opt(&config)
}
return kernel.NewEnqueuer(cli, cli.config), nil
}

starter := lambda.Start
// Create reader from AWS EventBridge
func NewReader(queue string, opts ...Option) (*kernel.Dequeuer, error) {
c := &Client{bus: queue}

type Mock interface{ Start(interface{}) }
if config.Service != nil {
service, ok := config.Service.(Mock)
if ok {
starter = service.Start
}
for _, opt := range defs {
opt(c)
}

sls := spawner{f: starter, c: config}

return kernel.New(cli, sls, config), err
}

func NewEventBridge(bus string, opts ...swarm.Option) (*Client, error) {
config := swarm.NewConfig()
for _, opt := range opts {
opt(&config)
opt(c)
}

api, err := newService(&config)
if err != nil {
return nil, err
}
bridge := &bridge{kernel.NewBridge(c.config.TimeToFlight)}

return &Client{
service: api,
bus: bus,
config: config,
}, nil
return kernel.NewDequeuer(bridge, c.config), nil
}

func newService(conf *swarm.Config) (EventBridge, error) {
if conf.Service != nil {
service, ok := conf.Service.(EventBridge)
if ok {
return service, nil
}
func newEventBridge(queue string, opts ...Option) (*Client, error) {
c := &Client{bus: queue}

for _, opt := range defs {
opt(c)
}
for _, opt := range opts {
opt(c)
}

aws, err := config.LoadDefaultConfig(context.Background())
if err != nil {
return nil, swarm.ErrServiceIO.New(err)
if c.service == nil {
aws, err := config.LoadDefaultConfig(context.Background())
if err != nil {
return nil, swarm.ErrServiceIO.New(err)
}
c.service = eventbridge.NewFromConfig(aws)
}

return eventbridge.NewFromConfig(aws), nil
return c, nil
}

// Enq enqueues message to broker
func (cli *Client) Enq(bag swarm.Bag) error {
ctx, cancel := context.WithTimeout(context.Background(), cli.config.NetworkTimeout)
func (cli *Client) Enq(ctx context.Context, bag swarm.Bag) error {
ctx, cancel := context.WithTimeout(ctx, cli.config.NetworkTimeout)
defer cancel()

ret, err := cli.service.PutEvents(ctx,
Expand All @@ -104,7 +92,7 @@ func (cli *Client) Enq(bag swarm.Bag) error {
{
EventBusName: aws.String(cli.bus),
Source: aws.String(cli.config.Source),
DetailType: aws.String(bag.Ctx.Category),
DetailType: aws.String(bag.Category),
Detail: aws.String(string(bag.Object)),
},
},
Expand All @@ -126,26 +114,17 @@ func (cli *Client) Enq(bag swarm.Bag) error {

//------------------------------------------------------------------------------

type spawner struct {
c swarm.Config
f func(any)
}
type bridge struct{ *kernel.Bridge }

func (s spawner) Spawn(k *kernel.Kernel) error {
s.f(
func(evt events.CloudWatchEvent) error {
bag := make([]swarm.Bag, 1)
bag[0] = swarm.Bag{
Ctx: swarm.NewContext(context.Background(), evt.DetailType, evt.ID),
Object: evt.Detail,
}
func (s bridge) Run() { lambda.Start(s.run) }

return k.Dispatch(bag, s.c.TimeToFlight)
},
)
func (s bridge) run(evt events.CloudWatchEvent) error {
bag := make([]swarm.Bag, 1)
bag[0] = swarm.Bag{
Category: evt.DetailType,
Digest: evt.ID,
Object: evt.Detail,
}

return nil
return s.Bridge.Dispatch(bag)
}

func (s spawner) Ack(digest string) error { return nil }
func (s spawner) Ask() ([]swarm.Bag, error) { return nil, nil }
Loading

0 comments on commit 56892d0

Please sign in to comment.