From 4ab0c6521e60f47055c16b8d9b1ac98e8643f6b5 Mon Sep 17 00:00:00 2001 From: fogfish Date: Sat, 18 Nov 2023 17:08:13 +0200 Subject: [PATCH] AWS DynamoDB Streams (#43) * support AWS DynamoDB Streams * fix S3 events * update CI/CD --- .github/workflows/check-code.yml | 4 +- .github/workflows/check-test.yml | 4 +- README.md | 6 + broker/eventddb/awscdk.go | 231 ++++++++++++++++++++++++++ broker/eventddb/awscdk_test.go | 46 +++++ broker/eventddb/broker.go | 106 ++++++++++++ broker/eventddb/type.go | 26 +++ broker/events3/awscdk.go | 1 + broker/events3/awscdk_test.go | 2 +- broker/events3/type.go | 2 +- examples/README.md | 6 + examples/eventddb/dequeue/eventddb.go | 38 +++++ examples/eventddb/serverless/cdk.json | 8 + examples/eventddb/serverless/main.go | 32 ++++ examples/events3/serverless/main.go | 2 +- internal/qtest/qtest.go | 18 +- queue/events/enqueue.go | 17 +- 17 files changed, 523 insertions(+), 26 deletions(-) create mode 100644 broker/eventddb/awscdk.go create mode 100644 broker/eventddb/awscdk_test.go create mode 100644 broker/eventddb/broker.go create mode 100644 broker/eventddb/type.go create mode 100644 examples/eventddb/dequeue/eventddb.go create mode 100644 examples/eventddb/serverless/cdk.json create mode 100644 examples/eventddb/serverless/main.go diff --git a/.github/workflows/check-code.yml b/.github/workflows/check-code.yml index 292d850..319b6a3 100644 --- a/.github/workflows/check-code.yml +++ b/.github/workflows/check-code.yml @@ -14,11 +14,11 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/setup-go@v2 + - uses: actions/setup-go@v4 with: go-version: "1.21" - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - uses: dominikh/staticcheck-action@v1.3.0 with: diff --git a/.github/workflows/check-test.yml b/.github/workflows/check-test.yml index 5d41c86..9a3439e 100644 --- a/.github/workflows/check-test.yml +++ b/.github/workflows/check-test.yml @@ -19,11 +19,11 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/setup-go@v2 + - uses: actions/setup-go@v4 with: go-version: "1.21" - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: go build run: | diff --git a/README.md b/README.md index e06e52a..8837ddb 100644 --- a/README.md +++ b/README.md @@ -404,6 +404,12 @@ stack.NewSink( - [x] [consume message](examples/sqs/dequeue/sqs.go) - [ ] AWS SNS - [ ] produce message +- [x] AWS S3 Event (serverless) + - [x] [consume message](./examples/events3/dequeue/events3.go) using aws lambda + - [x] [aws cdk construct](./examples/events3/serverless/main.go) +- [x] AWS DynamoDB Streams (serverless) + - [x] [consume message](./examples/events3/dequeue/events3.go) using aws lambda + - [x] [aws cdk construct](./examples/events3/serverless/main.go) - [ ] AWS Kinesis (serverless) - [ ] produce message - [ ] consume message using aws lambda diff --git a/broker/eventddb/awscdk.go b/broker/eventddb/awscdk.go new file mode 100644 index 0000000..4dad904 --- /dev/null +++ b/broker/eventddb/awscdk.go @@ -0,0 +1,231 @@ +// +// Copyright (C) 2021 - 2022 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package eventddb + +import ( + "os" + "strconv" + "strings" + + "github.com/aws/aws-cdk-go/awscdk/v2" + "github.com/aws/aws-cdk-go/awscdk/v2/awsdynamodb" + "github.com/aws/aws-cdk-go/awscdk/v2/awslambda" + "github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources" + "github.com/aws/constructs-go/constructs/v10" + "github.com/aws/jsii-runtime-go" + "github.com/fogfish/scud" +) + +//------------------------------------------------------------------------------ +// +// AWS CDK Sink Construct +// +//------------------------------------------------------------------------------ + +type Sink struct { + constructs.Construct + Handler awslambda.IFunction +} + +type SinkProps struct { + Table awsdynamodb.ITable + Lambda *scud.FunctionGoProps + EventSource *awslambdaeventsources.DynamoEventSourceProps +} + +func NewSink(scope constructs.Construct, id *string, props *SinkProps) *Sink { + sink := &Sink{Construct: constructs.NewConstruct(scope, id)} + + sink.Handler = scud.NewFunctionGo(sink.Construct, jsii.String("Func"), props.Lambda) + + eventsource := &awslambdaeventsources.DynamoEventSourceProps{ + StartingPosition: awslambda.StartingPosition_LATEST, + } + if props.EventSource != nil { + eventsource = props.EventSource + } + + source := awslambdaeventsources.NewDynamoEventSource(props.Table, eventsource) + + sink.Handler.AddEventSource(source) + + return sink +} + +//------------------------------------------------------------------------------ +// +// AWS CDK Stack Construct +// +//------------------------------------------------------------------------------ + +type ServerlessStackProps struct { + *awscdk.StackProps + Version string + System string +} + +type ServerlessStack struct { + awscdk.Stack + acc int + removalPolicy awscdk.RemovalPolicy + Table awsdynamodb.ITable +} + +func NewServerlessStack(app awscdk.App, id *string, props *ServerlessStackProps) *ServerlessStack { + sid := *id + if props.Version != "" { + sid = sid + "-" + props.Version + } + + stack := &ServerlessStack{ + Stack: awscdk.NewStack(app, jsii.String(sid), props.StackProps), + removalPolicy: awscdk.RemovalPolicy_RETAIN, + } + + if strings.HasPrefix(props.Version, "pr") { + stack.removalPolicy = awscdk.RemovalPolicy_DESTROY + } + + return stack +} + +func (stack *ServerlessStack) NewTable(tableName ...string) awsdynamodb.ITable { + name := awscdk.Aws_STACK_NAME() + if len(tableName) > 0 { + name = &tableName[0] + } + + stack.Table = awsdynamodb.NewTable(stack.Stack, jsii.String("Table"), + &awsdynamodb.TableProps{ + TableName: name, + PartitionKey: &awsdynamodb.Attribute{ + Type: awsdynamodb.AttributeType_STRING, + Name: jsii.String("prefix"), + }, + SortKey: &awsdynamodb.Attribute{ + Type: awsdynamodb.AttributeType_STRING, + Name: jsii.String("suffix"), + }, + BillingMode: awsdynamodb.BillingMode_PAY_PER_REQUEST, + RemovalPolicy: stack.removalPolicy, + Stream: awsdynamodb.StreamViewType_NEW_IMAGE, + }, + ) + + return stack.Table +} + +func (stack *ServerlessStack) AddTable(tableName string) awsdynamodb.ITable { + stack.Table = awsdynamodb.Table_FromTableName(stack.Stack, jsii.String("Table"), + jsii.String(tableName), + ) + + return stack.Table +} + +func (stack *ServerlessStack) NewGlobalTable(tableName ...string) awsdynamodb.ITable { + name := awscdk.Aws_STACK_NAME() + if len(tableName) > 0 { + name = &tableName[0] + } + + stack.Table = awsdynamodb.NewTableV2(stack.Stack, jsii.String("Table"), + &awsdynamodb.TablePropsV2{ + TableName: name, + PartitionKey: &awsdynamodb.Attribute{ + Type: awsdynamodb.AttributeType_STRING, + Name: jsii.String("prefix"), + }, + SortKey: &awsdynamodb.Attribute{ + Type: awsdynamodb.AttributeType_STRING, + Name: jsii.String("suffix"), + }, + Billing: awsdynamodb.Billing_OnDemand(), + RemovalPolicy: stack.removalPolicy, + DynamoStream: awsdynamodb.StreamViewType_NEW_IMAGE, + }, + ) + + return stack.Table +} + +func (stack *ServerlessStack) AddGlobalTable(tableName string) awsdynamodb.ITable { + stack.Table = awsdynamodb.TableV2_FromTableName(stack.Stack, jsii.String("Table"), + jsii.String(tableName), + ) + + return stack.Table +} + +func (stack *ServerlessStack) NewSink(props *SinkProps) *Sink { + if stack.Table == nil { + panic("Table is not defined.") + } + + props.Table = stack.Table + + stack.acc++ + name := "Sink" + strconv.Itoa(stack.acc) + sink := NewSink(stack.Stack, jsii.String(name), props) + + return sink +} + +//------------------------------------------------------------------------------ +// +// AWS CDK App Construct +// +//------------------------------------------------------------------------------ + +type ServerlessApp struct { + awscdk.App +} + +func NewServerlessApp() *ServerlessApp { + app := awscdk.NewApp(nil) + return &ServerlessApp{App: app} +} + +func (app *ServerlessApp) NewStack(name string, props ...*awscdk.StackProps) *ServerlessStack { + config := &awscdk.StackProps{ + Env: &awscdk.Environment{ + Account: jsii.String(os.Getenv("CDK_DEFAULT_ACCOUNT")), + Region: jsii.String(os.Getenv("CDK_DEFAULT_REGION")), + }, + } + + if len(props) == 1 { + config = props[0] + } + + return NewServerlessStack(app.App, jsii.String(name), &ServerlessStackProps{ + StackProps: config, + Version: FromContextVsn(app), + System: name, + }) +} + +func FromContext(app awscdk.App, key string) string { + val := app.Node().TryGetContext(jsii.String(key)) + switch v := val.(type) { + case string: + return v + default: + return "" + } +} + +func FromContextVsn(app awscdk.App) string { + vsn := FromContext(app, "vsn") + if vsn == "" { + return "latest" + } + + return vsn +} diff --git a/broker/eventddb/awscdk_test.go b/broker/eventddb/awscdk_test.go new file mode 100644 index 0000000..5290868 --- /dev/null +++ b/broker/eventddb/awscdk_test.go @@ -0,0 +1,46 @@ +// +// Copyright (C) 2021 - 2022 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package eventddb_test + +import ( + "testing" + + "github.com/aws/aws-cdk-go/awscdk/v2/assertions" + "github.com/aws/jsii-runtime-go" + "github.com/fogfish/scud" + "github.com/fogfish/swarm/broker/eventddb" +) + +func TestEventDdbCDK(t *testing.T) { + app := eventddb.NewServerlessApp() + stack := app.NewStack("swarm-example-eventddb", nil) + stack.NewGlobalTable() + + stack.NewSink( + &eventddb.SinkProps{ + Lambda: &scud.FunctionGoProps{ + SourceCodePackage: "github.com/fogfish/swarm", + SourceCodeLambda: "examples/eventddb/dequeue", + }, + }, + ) + + require := map[*string]*float64{ + jsii.String("AWS::DynamoDB::GlobalTable"): jsii.Number(1), + jsii.String("AWS::Lambda::EventSourceMapping"): jsii.Number(1), + jsii.String("AWS::IAM::Role"): jsii.Number(2), + jsii.String("AWS::Lambda::Function"): jsii.Number(2), + jsii.String("Custom::LogRetention"): jsii.Number(1), + } + + template := assertions.Template_FromStack(stack.Stack, nil) + for key, val := range require { + template.ResourceCountIs(key, val) + } +} diff --git a/broker/eventddb/broker.go b/broker/eventddb/broker.go new file mode 100644 index 0000000..004a8b3 --- /dev/null +++ b/broker/eventddb/broker.go @@ -0,0 +1,106 @@ +// +// Copyright (C) 2021 - 2022 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package eventddb + +import ( + "context" + "log/slog" + + "github.com/aws/aws-lambda-go/events" + "github.com/aws/aws-lambda-go/lambda" + "github.com/fogfish/curie" + "github.com/fogfish/swarm" + "github.com/fogfish/swarm/internal/router" +) + +// New creates broker for AWS EventBridge +func New(queue string, opts ...swarm.Option) (swarm.Broker, error) { + conf := swarm.NewConfig() + for _, opt := range opts { + opt(&conf) + } + + ctx, can := context.WithCancel(context.Background()) + + slog.Info("Broker is created", "type", "ddbstream") + return &broker{ + config: conf, + channels: swarm.NewChannels(), + context: ctx, + cancel: can, + router: router.New(&conf, nil), + }, nil +} + +type broker struct { + config swarm.Config + channels *swarm.Channels + context context.Context + cancel context.CancelFunc + router *router.Router +} + +func (b *broker) Config() swarm.Config { + return b.config +} + +func (b *broker) Close() { + b.channels.Sync() + b.channels.Close() + b.cancel() +} + +func (b *broker) DSync() { + b.channels.Sync() +} + +func (b *broker) Enqueue(category string, channel swarm.Channel) swarm.Enqueue { + panic("not implemented") +} + +func (b *broker) Dequeue(category string, channel swarm.Channel) swarm.Dequeue { + b.channels.Attach(category, channel) + b.router.Register(category) + + return b.router +} + +func (b *broker) Await() { + starter := lambda.Start + + type Mock interface{ Start(interface{}) } + if b.config.Service != nil { + service, ok := b.config.Service.(Mock) + if ok { + starter = service.Start + } + } + + starter( + func(events events.DynamoDBEvent) error { + for _, evt := range events.Records { + bag := swarm.Bag{ + Category: Category, + Event: &Event{ + ID: evt.EventID, + Type: curie.IRI(evt.EventName), + Agent: curie.IRI(evt.EventSourceArn), + Object: &evt, + }, + Digest: swarm.Digest{Brief: evt.EventID}, + } + if err := b.router.Dispatch(bag); err != nil { + return err + } + } + + return b.router.Await(b.config.TimeToFlight) + }, + ) +} diff --git a/broker/eventddb/type.go b/broker/eventddb/type.go new file mode 100644 index 0000000..dead4e3 --- /dev/null +++ b/broker/eventddb/type.go @@ -0,0 +1,26 @@ +// +// Copyright (C) 2021 - 2022 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package eventddb + +import ( + "github.com/aws/aws-lambda-go/events" + "github.com/fogfish/swarm" + queue "github.com/fogfish/swarm/queue/events" +) + +const Category = "eventddb.Event" + +type Event swarm.Event[*events.DynamoDBEventRecord] + +func (Event) HKT1(swarm.EventType) {} +func (Event) HKT2(*events.DynamoDBEventRecord) {} + +func Dequeue(q swarm.Broker) (<-chan *Event, chan<- *Event) { + return queue.Dequeue[*events.DynamoDBEventRecord, Event](q) +} diff --git a/broker/events3/awscdk.go b/broker/events3/awscdk.go index 5ee3cdb..923a92d 100644 --- a/broker/events3/awscdk.go +++ b/broker/events3/awscdk.go @@ -47,6 +47,7 @@ func NewSink(scope constructs.Construct, id *string, props *SinkProps) *Sink { eventsource := &awslambdaeventsources.S3EventSourceProps{ Events: &[]awss3.EventType{ awss3.EventType_OBJECT_CREATED, + awss3.EventType_OBJECT_REMOVED, }, } if props.EventSource != nil { diff --git a/broker/events3/awscdk_test.go b/broker/events3/awscdk_test.go index cb04d68..16f2331 100644 --- a/broker/events3/awscdk_test.go +++ b/broker/events3/awscdk_test.go @@ -17,7 +17,7 @@ import ( "github.com/fogfish/swarm/broker/events3" ) -func TestEventBridgeCDK(t *testing.T) { +func TestEventS3CDK(t *testing.T) { app := events3.NewServerlessApp() stack := app.NewStack("swarm-example-events3", nil) stack.NewBucket() diff --git a/broker/events3/type.go b/broker/events3/type.go index 395e4ab..8dc09c4 100644 --- a/broker/events3/type.go +++ b/broker/events3/type.go @@ -6,7 +6,7 @@ import ( queue "github.com/fogfish/swarm/queue/events" ) -const Category = "s3eventrecord:Event" +const Category = "events3.Event" type Event swarm.Event[*events.S3EventRecord] diff --git a/examples/README.md b/examples/README.md index 10f9a63..cdc1e05 100644 --- a/examples/README.md +++ b/examples/README.md @@ -13,6 +13,12 @@ - AWS SQS - [produce message](examples/sqs/enqueue/sqs.go) - [consume message](examples/sqs/dequeue/sqs.go) +- AWS S3 Serverless + - [consume message](./events3/dequeue/ddbstream.go) + - [serverless app](./events3/serverless/main.go) +- AWS DynamoDB Stream Serverless + - [consume message](./eventddb/dequeue/ddbstream.go) + - [serverless app](./eventddb/serverless/main.go) ## Examples about different data types diff --git a/examples/eventddb/dequeue/eventddb.go b/examples/eventddb/dequeue/eventddb.go new file mode 100644 index 0000000..9e99ef4 --- /dev/null +++ b/examples/eventddb/dequeue/eventddb.go @@ -0,0 +1,38 @@ +// +// Copyright (C) 2021 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package main + +import ( + "encoding/json" + "fmt" + + "github.com/fogfish/swarm" + "github.com/fogfish/swarm/broker/eventddb" + "github.com/fogfish/swarm/internal/qtest" + "github.com/fogfish/swarm/queue" +) + +func main() { + qtest.NewLogger() + + q := queue.Must(eventddb.New("swarm-test", swarm.WithLogStdErr())) + + go common(eventddb.Dequeue(q)) + + q.Await() +} + +func common(rcv <-chan *eventddb.Event, ack chan<- *eventddb.Event) { + for msg := range rcv { + + v, _ := json.MarshalIndent(msg, "", " ") + fmt.Printf("ddb event > \n %s\n", v) + ack <- msg + } +} diff --git a/examples/eventddb/serverless/cdk.json b/examples/eventddb/serverless/cdk.json new file mode 100644 index 0000000..db2c983 --- /dev/null +++ b/examples/eventddb/serverless/cdk.json @@ -0,0 +1,8 @@ +{ + "app": "go run main.go", + "requireApproval": "never", + "context": { + "@aws-cdk/core:stackRelativeExports": true, + "@aws-cdk/aws-lambda:recognizeVersionProps": true + } +} \ No newline at end of file diff --git a/examples/eventddb/serverless/main.go b/examples/eventddb/serverless/main.go new file mode 100644 index 0000000..de780bd --- /dev/null +++ b/examples/eventddb/serverless/main.go @@ -0,0 +1,32 @@ +// +// Copyright (C) 2021 - 2022 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package main + +import ( + "github.com/fogfish/scud" + "github.com/fogfish/swarm/broker/eventddb" +) + +func main() { + app := eventddb.NewServerlessApp() + + stack := app.NewStack("swarm-example-eventddb") + stack.NewGlobalTable() + + stack.NewSink( + &eventddb.SinkProps{ + Lambda: &scud.FunctionGoProps{ + SourceCodePackage: "github.com/fogfish/swarm", + SourceCodeLambda: "examples/eventddb/dequeue", + }, + }, + ) + + app.Synth(nil) +} diff --git a/examples/events3/serverless/main.go b/examples/events3/serverless/main.go index d3714e1..4b9c0f8 100644 --- a/examples/events3/serverless/main.go +++ b/examples/events3/serverless/main.go @@ -29,7 +29,7 @@ func main() { stack.NewSink( &events3.SinkProps{ - // Note: the default property of EventSource captures OBJECT_CREATED events + // Note: the default property of EventSource captures OBJECT_CREATED and OBJECT_REMOVED events Lambda: &scud.FunctionGoProps{ SourceCodePackage: "github.com/fogfish/swarm", SourceCodeLambda: "examples/events3/dequeue", diff --git a/internal/qtest/qtest.go b/internal/qtest/qtest.go index 4b2fc27..0703594 100644 --- a/internal/qtest/qtest.go +++ b/internal/qtest/qtest.go @@ -24,10 +24,9 @@ import ( const ( Category = "Note" Message = "{\"some\":\"message\"}" - Event = "" Receipt = "0x123456789abcdef" - EventCategory = "Event[Note]" + EventCategory = "qtest.EventNote" ) var ( @@ -42,6 +41,11 @@ type User struct { Some string `json:"some"` } +type EventNote swarm.Event[*Note] + +func (EventNote) HKT1(swarm.EventType) {} +func (EventNote) HKT2(*Note) {} + type effect = chan string type queueName = string type category = string @@ -124,11 +128,11 @@ func TestEnqueueEvent(t *testing.T, factory enqueue) { q := factory(eff, "test-queue", EventCategory, retry200ms) - note, _ := events.Enqueue[*Note, swarm.Event[*Note]](q) + note, _ := events.Enqueue[*Note, EventNote](q) user, dlq := events.Enqueue[*User, swarm.Event[*User]](q) t.Run("Enqueue", func(t *testing.T) { - note <- &swarm.Event[*Note]{ + note <- &EventNote{ Object: &Note{Some: "message"}, } @@ -140,7 +144,7 @@ func TestEnqueueEvent(t *testing.T, factory enqueue) { it.Then(t). Should(it.Nil(err)). Should(it.Equal(*val.Object, Note{Some: "message"})). - Should(it.Equal(val.Type, "note:Event[Note]")). + Should(it.Equal(val.Type, "qtest.EventNote")). ShouldNot(it.Equal(len(val.ID), 0)). ShouldNot(it.True(val.Created.IsZero())) @@ -231,7 +235,7 @@ func TestDequeueEvent(t *testing.T, factory dequeue) { eff := make(chan string, 1) t.Run("Typed", func(t *testing.T) { - event := swarm.Event[*Note]{ + event := &EventNote{ ID: "id", Type: "type", Agent: "agent", @@ -243,7 +247,7 @@ func TestDequeueEvent(t *testing.T, factory dequeue) { q := factory(eff, "test-queue", EventCategory, string(message), Receipt, retry200ms) - msg, ack := events.Dequeue[*Note, swarm.Event[*Note]](q) + msg, ack := events.Dequeue[*Note, EventNote](q) go q.Await() val := <-msg diff --git a/queue/events/enqueue.go b/queue/events/enqueue.go index 6a44349..227e95d 100644 --- a/queue/events/enqueue.go +++ b/queue/events/enqueue.go @@ -29,7 +29,6 @@ func Enqueue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (c conf := q.Config() ch := swarm.NewEvtEnqCh[T, E](conf.EnqueueCapacity) - catT := strings.ToLower(categoryOf[T]()) catE := categoryOf[E]() if len(category) > 0 { catE = category[0] @@ -45,7 +44,7 @@ func Enqueue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (c _, knd, src, _ := shape.Get(object) if knd == "" { - knd = curie.IRI(catT + ":" + catE) + knd = curie.IRI(catE) } if src == "" { @@ -83,17 +82,11 @@ func Enqueue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (c // normalized type name func categoryOf[T any]() string { typ := reflect.TypeOf(new(T)).Elem() - cat := typ.Name() + cat := typ.String() if typ.Kind() == reflect.Ptr { - cat = typ.Elem().Name() + cat = typ.Elem().String() } - seq := strings.Split(strings.Trim(cat, "]"), "[") - tkn := make([]string, len(seq)) - for i, s := range seq { - r := strings.Split(s, ".") - tkn[i] = r[len(r)-1] - } - - return strings.Join(tkn, "[") + strings.Repeat("]", len(tkn)-1) + seq := strings.Split(cat, "[") + return seq[0] }