Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor broker loop #47

Merged
merged 7 commits into from
Mar 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion bag.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func (msg *Msg[T]) Fail(err error) *Msg[T] {
// Bag is used by the transport to abstract message on the wire.
type Bag struct {
Category string
Event any
Object []byte
Digest Digest
}
27 changes: 0 additions & 27 deletions broker.go

This file was deleted.

114 changes: 0 additions & 114 deletions broker/eventbridge/broker.go

This file was deleted.

78 changes: 72 additions & 6 deletions broker/eventbridge/eventbridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,65 @@ import (
"context"
"fmt"

"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/eventbridge"
"github.com/aws/aws-sdk-go-v2/service/eventbridge/types"
"github.com/fogfish/swarm"
"github.com/fogfish/swarm/internal/kernel"
)

type client struct {
// EventBridge declares the subset of interface from AWS SDK used by the lib.
type EventBridge interface {
PutEvents(context.Context, *eventbridge.PutEventsInput, ...func(*eventbridge.Options)) (*eventbridge.PutEventsOutput, error)
}

type Client struct {
service EventBridge
bus string
config *swarm.Config
config swarm.Config
}

func newClient(bus string, config *swarm.Config) (*client, error) {
api, err := newService(config)
func New(queue string, opts ...swarm.Option) (swarm.Broker, error) {
cli, err := NewEventBridge(queue, opts...)
if err != nil {
return nil, err
}

return &client{
config := swarm.NewConfig()
for _, opt := range opts {
opt(&config)
}

starter := lambda.Start

type Mock interface{ Start(interface{}) }
if config.Service != nil {
service, ok := config.Service.(Mock)
if ok {
starter = service.Start
}
}

sls := spawner{f: starter, c: config}

return kernel.New(cli, sls, config), err
}

func NewEventBridge(bus string, opts ...swarm.Option) (*Client, error) {
config := swarm.NewConfig()
for _, opt := range opts {
opt(&config)
}

api, err := newService(&config)
if err != nil {
return nil, err
}

return &Client{
service: api,
bus: bus,
config: config,
Expand All @@ -55,7 +94,7 @@ func newService(conf *swarm.Config) (EventBridge, error) {
}

// Enq enqueues message to broker
func (cli *client) Enq(bag swarm.Bag) error {
func (cli *Client) Enq(bag swarm.Bag) error {
ctx, cancel := context.WithTimeout(context.Background(), cli.config.NetworkTimeout)
defer cancel()

Expand All @@ -81,3 +120,30 @@ func (cli *client) Enq(bag swarm.Bag) error {

return nil
}

//------------------------------------------------------------------------------

type spawner struct {
c swarm.Config
f func(any)
}

func (s spawner) Spawn(k *kernel.Kernel) error {
s.f(
func(evt events.CloudWatchEvent) error {
bag := make([]swarm.Bag, 1)
bag[0] = swarm.Bag{
Category: evt.DetailType,
Object: evt.Detail,
Digest: swarm.Digest{Brief: evt.ID},
}

return k.Dispatch(bag, s.c.TimeToFlight)
},
)

return nil
}

func (s spawner) Ack(digest string) error { return nil }
func (s spawner) Ask() ([]swarm.Bag, error) { return nil, nil }
106 changes: 0 additions & 106 deletions broker/eventddb/broker.go

This file was deleted.

Loading
Loading