Skip to content

Commit

Permalink
Update eventsqs to kernel v0.20.0
Browse files Browse the repository at this point in the history
  • Loading branch information
fogfish committed Sep 29, 2024
1 parent 6a074f3 commit 4d25de2
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 148 deletions.
2 changes: 1 addition & 1 deletion broker/eventsqs/awscdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestEventBridgeCDK(t *testing.T) {
&eventsqs.SinkProps{
Function: &scud.FunctionGoProps{
SourceCodeModule: "github.com/fogfish/swarm/broker/eventsqs",
SourceCodeLambda: "examples/dequeue",
SourceCodeLambda: "examples/dequeue/typed",
},
},
)
Expand Down
33 changes: 33 additions & 0 deletions broker/eventsqs/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
//
// Copyright (C) 2021 - 2024 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the Apache License Version 2.0. See the LICENSE file for details.
// https://github.com/fogfish/swarm
//

package eventsqs

import (
"time"

"github.com/fogfish/swarm"
)

type Option func(*Client)

var defs = []Option{WithConfig()}

func WithConfig(opts ...swarm.Option) Option {
return func(c *Client) {
config := swarm.NewConfig()
for _, opt := range opts {
opt(&config)
}

// Mandatory overrides
config.PollFrequency = 5 * time.Microsecond

c.config = config
}
}
67 changes: 24 additions & 43 deletions broker/eventsqs/eventsqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,70 +9,51 @@
package eventsqs

import (
"context"

"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/fogfish/swarm"
"github.com/fogfish/swarm/broker/sqs"
"github.com/fogfish/swarm/kernel"
)

type Client struct {
config swarm.Config
}

// New creates broker for AWS SQS (serverless events)
func New(queue string, opts ...swarm.Option) (swarm.Broker, error) {
cli, err := sqs.NewSQS(queue, opts...)
if err != nil {
return nil, err
}
func NewDequeuer(opts ...Option) (*kernel.Dequeuer, error) {
c := &Client{}

config := swarm.NewConfig()
for _, opt := range opts {
opt(&config)
for _, opt := range defs {
opt(c)
}

starter := lambda.Start

type Mock interface{ Start(interface{}) }
if config.Service != nil {
service, ok := config.Service.(Mock)
if ok {
starter = service.Start
}
for _, opt := range opts {
opt(c)
}

sls := spawner{f: starter, c: config}
bridge := &bridge{kernel.NewBridge(c.config.TimeToFlight)}

return kernel.New(cli, sls, config), nil
return kernel.NewDequeuer(bridge, c.config), nil
}

//------------------------------------------------------------------------------

type spawner struct {
c swarm.Config
f func(any)
}
type bridge struct{ *kernel.Bridge }

func (s spawner) Spawn(k *kernel.Kernel) error {
s.f(
func(events events.SQSEvent) error {
bag := make([]swarm.Bag, len(events.Records))
for i, evt := range events.Records {
bag[i] = swarm.Bag{
Ctx: swarm.NewContext(context.Background(), attr(&evt, "Category"), evt.ReceiptHandle),
Object: []byte(evt.Body),
}
}
func (s bridge) Run() { lambda.Start(s.run) }

return k.Dispatch(bag, s.c.TimeToFlight)
},
)
func (s bridge) run(events events.SQSEvent) error {
bag := make([]swarm.Bag, len(events.Records))
for i, evt := range events.Records {
bag[i] = swarm.Bag{
Category: attr(&evt, "Category"),
Digest: evt.ReceiptHandle,
Object: []byte(evt.Body),
}
}

return nil
return s.Bridge.Dispatch(bag)
}

func (s spawner) Ack(digest string) error { return nil }
func (s spawner) Ask() ([]swarm.Bag, error) { return nil, nil }

func attr(msg *events.SQSMessage, key string) string {
val, exists := msg.MessageAttributes[key]
if !exists {
Expand Down
86 changes: 85 additions & 1 deletion broker/eventsqs/eventsqs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,89 @@
// https://github.com/fogfish/swarm
//

package eventsqs_test
package eventsqs

import (
"context"
"testing"
"time"

"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/fogfish/it/v2"
"github.com/fogfish/swarm"
"github.com/fogfish/swarm/kernel"
)

func TestReader(t *testing.T) {
var bag []swarm.Bag
bridge := &bridge{kernel.NewBridge(100 * time.Millisecond)}

t.Run("New", func(t *testing.T) {
q, err := NewDequeuer()
it.Then(t).Should(it.Nil(err))
q.Close()
})

t.Run("Dequeue", func(t *testing.T) {
go func() {
bag, _ = bridge.Ask(context.Background())
for _, m := range bag {
bridge.Ack(context.Background(), m.Digest)
}
}()

err := bridge.run(
events.SQSEvent{
Records: []events.SQSMessage{
{
MessageId: "abc-def",
ReceiptHandle: "receipt",
Body: `{"sut":"test"}`,
MessageAttributes: map[string]events.SQSMessageAttribute{
"Category": {StringValue: aws.String("cat")},
},
},
},
},
)

it.Then(t).Should(
it.Nil(err),
it.Equal(len(bag), 1),
it.Equal(bag[0].Category, "cat"),
it.Equal(bag[0].Digest, "receipt"),
it.Equiv(bag[0].Object, []byte(`{"sut":"test"}`)),
)
})

t.Run("Dequeue.Timeout", func(t *testing.T) {
go func() {
bag, _ = bridge.Ask(context.Background())
}()

err := bridge.run(
events.SQSEvent{
Records: []events.SQSMessage{
{
MessageId: "abc-def",
ReceiptHandle: "receipt",
Body: `{"sut":"test"}`,
MessageAttributes: map[string]events.SQSMessageAttribute{
"Category": {StringValue: aws.String("cat")},
},
},
},
},
)

it.Then(t).ShouldNot(
it.Nil(err),
)
})
}

/*
import (
"context"
Expand Down Expand Up @@ -86,3 +168,5 @@ func (mock *mockLambda) Start(handler interface{}) {
mock.loopback <- mock.returnReceipt
}
*/
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

"github.com/fogfish/swarm"
"github.com/fogfish/swarm/broker/eventsqs"
"github.com/fogfish/swarm/queue"
"github.com/fogfish/swarm/dequeue"
)

type User struct {
Expand All @@ -32,11 +32,19 @@ type Like struct {
}

func main() {
q := queue.Must(eventsqs.New("swarm-example-sqs-latest", swarm.WithLogStdErr()))
q, err := eventsqs.NewDequeuer(
eventsqs.WithConfig(
swarm.WithLogStdErr(),
),
)
if err != nil {
slog.Error("eventsqs reader has failed", "err", err)
return
}

go actor[User]("user").handle(queue.Dequeue[User](q))
go actor[Note]("note").handle(queue.Dequeue[Note](q))
go actor[Like]("like").handle(queue.Dequeue[Like](q))
go actor[User]("user").handle(dequeue.Typed[User](q))
go actor[Note]("note").handle(dequeue.Typed[Note](q))
go actor[Like]("like").handle(dequeue.Typed[Like](q))

q.Await()
}
Expand Down
46 changes: 0 additions & 46 deletions broker/eventsqs/examples/enqueue/eventsqs.go

This file was deleted.

20 changes: 2 additions & 18 deletions broker/eventsqs/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,15 @@ require (
github.com/aws/aws-cdk-go/awscdk/v2 v2.160.0
github.com/aws/aws-lambda-go v1.47.0
github.com/aws/aws-sdk-go-v2 v1.31.0
github.com/aws/aws-sdk-go-v2/service/sqs v1.35.1
github.com/aws/constructs-go/constructs/v10 v10.3.0
github.com/aws/jsii-runtime-go v1.103.1
github.com/fogfish/it/v2 v2.0.2
github.com/fogfish/scud v0.10.1
github.com/fogfish/swarm v0.16.0
github.com/fogfish/swarm/broker/sqs v0.16.1
github.com/fogfish/swarm/qtest v0.16.1
github.com/fogfish/swarm/queue v0.16.1
github.com/fogfish/swarm v0.20.0
)

require (
github.com/Masterminds/semver/v3 v3.2.1 // indirect
github.com/aws/aws-sdk-go-v2/config v1.27.37 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.35 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.14 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.18 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.18 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.5 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.20 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.23.1 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.27.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.31.1 // indirect
github.com/aws/smithy-go v1.21.0 // indirect
github.com/cdklabs/awscdk-asset-awscli-go/awscliv1/v2 v2.2.202 // indirect
github.com/cdklabs/awscdk-asset-kubectl-go/kubectlv20/v2 v2.1.2 // indirect
Expand All @@ -39,9 +25,7 @@ require (
github.com/fogfish/faults v0.2.0 // indirect
github.com/fogfish/golem/hseq v1.2.0 // indirect
github.com/fogfish/golem/optics v0.13.0 // indirect
github.com/fogfish/golem/pure v0.10.1 // indirect
github.com/fogfish/guid/v2 v2.0.4 // indirect
github.com/fogfish/it/v2 v2.0.2 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/yuin/goldmark v1.5.3 // indirect
Expand Down
Loading

0 comments on commit 4d25de2

Please sign in to comment.