diff --git a/README.md b/README.md index e06e52a..8837ddb 100644 --- a/README.md +++ b/README.md @@ -404,6 +404,12 @@ stack.NewSink( - [x] [consume message](examples/sqs/dequeue/sqs.go) - [ ] AWS SNS - [ ] produce message +- [x] AWS S3 Event (serverless) + - [x] [consume message](./examples/events3/dequeue/events3.go) using aws lambda + - [x] [aws cdk construct](./examples/events3/serverless/main.go) +- [x] AWS DynamoDB Streams (serverless) + - [x] [consume message](./examples/events3/dequeue/events3.go) using aws lambda + - [x] [aws cdk construct](./examples/events3/serverless/main.go) - [ ] AWS Kinesis (serverless) - [ ] produce message - [ ] consume message using aws lambda diff --git a/broker/eventddb/awscdk.go b/broker/eventddb/awscdk.go new file mode 100644 index 0000000..83a14c0 --- /dev/null +++ b/broker/eventddb/awscdk.go @@ -0,0 +1,223 @@ +package eventddb + +import ( + "os" + "strconv" + "strings" + + "github.com/aws/aws-cdk-go/awscdk/v2" + "github.com/aws/aws-cdk-go/awscdk/v2/awsdynamodb" + "github.com/aws/aws-cdk-go/awscdk/v2/awslambda" + "github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources" + "github.com/aws/constructs-go/constructs/v10" + "github.com/aws/jsii-runtime-go" + "github.com/fogfish/scud" +) + +//------------------------------------------------------------------------------ +// +// AWS CDK Sink Construct +// +//------------------------------------------------------------------------------ + +type Sink struct { + constructs.Construct + Handler awslambda.IFunction +} + +type SinkProps struct { + Table awsdynamodb.ITable + Lambda *scud.FunctionGoProps + EventSource *awslambdaeventsources.DynamoEventSourceProps +} + +func NewSink(scope constructs.Construct, id *string, props *SinkProps) *Sink { + sink := &Sink{Construct: constructs.NewConstruct(scope, id)} + + sink.Handler = scud.NewFunctionGo(sink.Construct, jsii.String("Func"), props.Lambda) + + eventsource := &awslambdaeventsources.DynamoEventSourceProps{ + StartingPosition: awslambda.StartingPosition_LATEST, + } + if props.EventSource != nil { + eventsource = props.EventSource + } + + source := awslambdaeventsources.NewDynamoEventSource(props.Table, eventsource) + + sink.Handler.AddEventSource(source) + + return sink +} + +//------------------------------------------------------------------------------ +// +// AWS CDK Stack Construct +// +//------------------------------------------------------------------------------ + +type ServerlessStackProps struct { + *awscdk.StackProps + Version string + System string +} + +type ServerlessStack struct { + awscdk.Stack + acc int + removalPolicy awscdk.RemovalPolicy + Table awsdynamodb.ITable +} + +func NewServerlessStack(app awscdk.App, id *string, props *ServerlessStackProps) *ServerlessStack { + sid := *id + if props.Version != "" { + sid = sid + "-" + props.Version + } + + stack := &ServerlessStack{ + Stack: awscdk.NewStack(app, jsii.String(sid), props.StackProps), + removalPolicy: awscdk.RemovalPolicy_RETAIN, + } + + if strings.HasPrefix(props.Version, "pr") { + stack.removalPolicy = awscdk.RemovalPolicy_DESTROY + } + + return stack +} + +func (stack *ServerlessStack) NewTable(tableName ...string) awsdynamodb.ITable { + name := awscdk.Aws_STACK_NAME() + if len(tableName) > 0 { + name = &tableName[0] + } + + stack.Table = awsdynamodb.NewTable(stack.Stack, jsii.String("Table"), + &awsdynamodb.TableProps{ + TableName: name, + PartitionKey: &awsdynamodb.Attribute{ + Type: awsdynamodb.AttributeType_STRING, + Name: jsii.String("prefix"), + }, + SortKey: &awsdynamodb.Attribute{ + Type: awsdynamodb.AttributeType_STRING, + Name: jsii.String("suffix"), + }, + BillingMode: awsdynamodb.BillingMode_PAY_PER_REQUEST, + RemovalPolicy: stack.removalPolicy, + Stream: awsdynamodb.StreamViewType_NEW_IMAGE, + }, + ) + + return stack.Table +} + +func (stack *ServerlessStack) AddTable(tableName string) awsdynamodb.ITable { + stack.Table = awsdynamodb.Table_FromTableName(stack.Stack, jsii.String("Table"), + jsii.String(tableName), + ) + + return stack.Table +} + +func (stack *ServerlessStack) NewGlobalTable(tableName ...string) awsdynamodb.ITable { + name := awscdk.Aws_STACK_NAME() + if len(tableName) > 0 { + name = &tableName[0] + } + + stack.Table = awsdynamodb.NewTableV2(stack.Stack, jsii.String("Table"), + &awsdynamodb.TablePropsV2{ + TableName: name, + PartitionKey: &awsdynamodb.Attribute{ + Type: awsdynamodb.AttributeType_STRING, + Name: jsii.String("prefix"), + }, + SortKey: &awsdynamodb.Attribute{ + Type: awsdynamodb.AttributeType_STRING, + Name: jsii.String("suffix"), + }, + Billing: awsdynamodb.Billing_OnDemand(), + RemovalPolicy: stack.removalPolicy, + DynamoStream: awsdynamodb.StreamViewType_NEW_IMAGE, + }, + ) + + return stack.Table +} + +func (stack *ServerlessStack) AddGlobalTable(tableName string) awsdynamodb.ITable { + stack.Table = awsdynamodb.TableV2_FromTableName(stack.Stack, jsii.String("Table"), + jsii.String(tableName), + ) + + return stack.Table +} + +func (stack *ServerlessStack) NewSink(props *SinkProps) *Sink { + if stack.Table == nil { + panic("Table is not defined.") + } + + props.Table = stack.Table + + stack.acc++ + name := "Sink" + strconv.Itoa(stack.acc) + sink := NewSink(stack.Stack, jsii.String(name), props) + + return sink +} + +//------------------------------------------------------------------------------ +// +// AWS CDK App Construct +// +//------------------------------------------------------------------------------ + +type ServerlessApp struct { + awscdk.App +} + +func NewServerlessApp() *ServerlessApp { + app := awscdk.NewApp(nil) + return &ServerlessApp{App: app} +} + +func (app *ServerlessApp) NewStack(name string, props ...*awscdk.StackProps) *ServerlessStack { + config := &awscdk.StackProps{ + Env: &awscdk.Environment{ + Account: jsii.String(os.Getenv("CDK_DEFAULT_ACCOUNT")), + Region: jsii.String(os.Getenv("CDK_DEFAULT_REGION")), + }, + } + + if len(props) == 1 { + config = props[0] + } + + return NewServerlessStack(app.App, jsii.String(name), &ServerlessStackProps{ + StackProps: config, + Version: FromContextVsn(app), + System: name, + }) +} + +func FromContext(app awscdk.App, key string) string { + val := app.Node().TryGetContext(jsii.String(key)) + switch v := val.(type) { + case string: + return v + default: + return "" + } +} + +func FromContextVsn(app awscdk.App) string { + vsn := FromContext(app, "vsn") + if vsn == "" { + return "latest" + } + + return vsn +} diff --git a/broker/eventddb/awscdk_test.go b/broker/eventddb/awscdk_test.go new file mode 100644 index 0000000..5290868 --- /dev/null +++ b/broker/eventddb/awscdk_test.go @@ -0,0 +1,46 @@ +// +// Copyright (C) 2021 - 2022 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package eventddb_test + +import ( + "testing" + + "github.com/aws/aws-cdk-go/awscdk/v2/assertions" + "github.com/aws/jsii-runtime-go" + "github.com/fogfish/scud" + "github.com/fogfish/swarm/broker/eventddb" +) + +func TestEventDdbCDK(t *testing.T) { + app := eventddb.NewServerlessApp() + stack := app.NewStack("swarm-example-eventddb", nil) + stack.NewGlobalTable() + + stack.NewSink( + &eventddb.SinkProps{ + Lambda: &scud.FunctionGoProps{ + SourceCodePackage: "github.com/fogfish/swarm", + SourceCodeLambda: "examples/eventddb/dequeue", + }, + }, + ) + + require := map[*string]*float64{ + jsii.String("AWS::DynamoDB::GlobalTable"): jsii.Number(1), + jsii.String("AWS::Lambda::EventSourceMapping"): jsii.Number(1), + jsii.String("AWS::IAM::Role"): jsii.Number(2), + jsii.String("AWS::Lambda::Function"): jsii.Number(2), + jsii.String("Custom::LogRetention"): jsii.Number(1), + } + + template := assertions.Template_FromStack(stack.Stack, nil) + for key, val := range require { + template.ResourceCountIs(key, val) + } +} diff --git a/broker/eventddb/broker.go b/broker/eventddb/broker.go new file mode 100644 index 0000000..004a8b3 --- /dev/null +++ b/broker/eventddb/broker.go @@ -0,0 +1,106 @@ +// +// Copyright (C) 2021 - 2022 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package eventddb + +import ( + "context" + "log/slog" + + "github.com/aws/aws-lambda-go/events" + "github.com/aws/aws-lambda-go/lambda" + "github.com/fogfish/curie" + "github.com/fogfish/swarm" + "github.com/fogfish/swarm/internal/router" +) + +// New creates broker for AWS EventBridge +func New(queue string, opts ...swarm.Option) (swarm.Broker, error) { + conf := swarm.NewConfig() + for _, opt := range opts { + opt(&conf) + } + + ctx, can := context.WithCancel(context.Background()) + + slog.Info("Broker is created", "type", "ddbstream") + return &broker{ + config: conf, + channels: swarm.NewChannels(), + context: ctx, + cancel: can, + router: router.New(&conf, nil), + }, nil +} + +type broker struct { + config swarm.Config + channels *swarm.Channels + context context.Context + cancel context.CancelFunc + router *router.Router +} + +func (b *broker) Config() swarm.Config { + return b.config +} + +func (b *broker) Close() { + b.channels.Sync() + b.channels.Close() + b.cancel() +} + +func (b *broker) DSync() { + b.channels.Sync() +} + +func (b *broker) Enqueue(category string, channel swarm.Channel) swarm.Enqueue { + panic("not implemented") +} + +func (b *broker) Dequeue(category string, channel swarm.Channel) swarm.Dequeue { + b.channels.Attach(category, channel) + b.router.Register(category) + + return b.router +} + +func (b *broker) Await() { + starter := lambda.Start + + type Mock interface{ Start(interface{}) } + if b.config.Service != nil { + service, ok := b.config.Service.(Mock) + if ok { + starter = service.Start + } + } + + starter( + func(events events.DynamoDBEvent) error { + for _, evt := range events.Records { + bag := swarm.Bag{ + Category: Category, + Event: &Event{ + ID: evt.EventID, + Type: curie.IRI(evt.EventName), + Agent: curie.IRI(evt.EventSourceArn), + Object: &evt, + }, + Digest: swarm.Digest{Brief: evt.EventID}, + } + if err := b.router.Dispatch(bag); err != nil { + return err + } + } + + return b.router.Await(b.config.TimeToFlight) + }, + ) +} diff --git a/broker/eventddb/type.go b/broker/eventddb/type.go new file mode 100644 index 0000000..dead4e3 --- /dev/null +++ b/broker/eventddb/type.go @@ -0,0 +1,26 @@ +// +// Copyright (C) 2021 - 2022 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package eventddb + +import ( + "github.com/aws/aws-lambda-go/events" + "github.com/fogfish/swarm" + queue "github.com/fogfish/swarm/queue/events" +) + +const Category = "eventddb.Event" + +type Event swarm.Event[*events.DynamoDBEventRecord] + +func (Event) HKT1(swarm.EventType) {} +func (Event) HKT2(*events.DynamoDBEventRecord) {} + +func Dequeue(q swarm.Broker) (<-chan *Event, chan<- *Event) { + return queue.Dequeue[*events.DynamoDBEventRecord, Event](q) +} diff --git a/broker/events3/awscdk_test.go b/broker/events3/awscdk_test.go index cb04d68..16f2331 100644 --- a/broker/events3/awscdk_test.go +++ b/broker/events3/awscdk_test.go @@ -17,7 +17,7 @@ import ( "github.com/fogfish/swarm/broker/events3" ) -func TestEventBridgeCDK(t *testing.T) { +func TestEventS3CDK(t *testing.T) { app := events3.NewServerlessApp() stack := app.NewStack("swarm-example-events3", nil) stack.NewBucket() diff --git a/broker/events3/type.go b/broker/events3/type.go index 395e4ab..680d08f 100644 --- a/broker/events3/type.go +++ b/broker/events3/type.go @@ -6,7 +6,7 @@ import ( queue "github.com/fogfish/swarm/queue/events" ) -const Category = "s3eventrecord:Event" +const Category = "events3:Event" type Event swarm.Event[*events.S3EventRecord] diff --git a/examples/README.md b/examples/README.md index 10f9a63..cdc1e05 100644 --- a/examples/README.md +++ b/examples/README.md @@ -13,6 +13,12 @@ - AWS SQS - [produce message](examples/sqs/enqueue/sqs.go) - [consume message](examples/sqs/dequeue/sqs.go) +- AWS S3 Serverless + - [consume message](./events3/dequeue/ddbstream.go) + - [serverless app](./events3/serverless/main.go) +- AWS DynamoDB Stream Serverless + - [consume message](./eventddb/dequeue/ddbstream.go) + - [serverless app](./eventddb/serverless/main.go) ## Examples about different data types diff --git a/examples/eventddb/dequeue/eventddb.go b/examples/eventddb/dequeue/eventddb.go new file mode 100644 index 0000000..9e99ef4 --- /dev/null +++ b/examples/eventddb/dequeue/eventddb.go @@ -0,0 +1,38 @@ +// +// Copyright (C) 2021 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package main + +import ( + "encoding/json" + "fmt" + + "github.com/fogfish/swarm" + "github.com/fogfish/swarm/broker/eventddb" + "github.com/fogfish/swarm/internal/qtest" + "github.com/fogfish/swarm/queue" +) + +func main() { + qtest.NewLogger() + + q := queue.Must(eventddb.New("swarm-test", swarm.WithLogStdErr())) + + go common(eventddb.Dequeue(q)) + + q.Await() +} + +func common(rcv <-chan *eventddb.Event, ack chan<- *eventddb.Event) { + for msg := range rcv { + + v, _ := json.MarshalIndent(msg, "", " ") + fmt.Printf("ddb event > \n %s\n", v) + ack <- msg + } +} diff --git a/examples/eventddb/serverless/cdk.json b/examples/eventddb/serverless/cdk.json new file mode 100644 index 0000000..db2c983 --- /dev/null +++ b/examples/eventddb/serverless/cdk.json @@ -0,0 +1,8 @@ +{ + "app": "go run main.go", + "requireApproval": "never", + "context": { + "@aws-cdk/core:stackRelativeExports": true, + "@aws-cdk/aws-lambda:recognizeVersionProps": true + } +} \ No newline at end of file diff --git a/examples/eventddb/serverless/main.go b/examples/eventddb/serverless/main.go new file mode 100644 index 0000000..b7c4b51 --- /dev/null +++ b/examples/eventddb/serverless/main.go @@ -0,0 +1,32 @@ +// +// Copyright (C) 2021 - 2022 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package main + +import ( + "github.com/fogfish/scud" + "github.com/fogfish/swarm/broker/eventddb" +) + +func main() { + app := eventddb.NewServerlessApp() + + stack := app.NewStack("swarm-example-ddbstream") + stack.NewGlobalTable() + + stack.NewSink( + &eventddb.SinkProps{ + Lambda: &scud.FunctionGoProps{ + SourceCodePackage: "github.com/fogfish/swarm", + SourceCodeLambda: "examples/eventddb/dequeue", + }, + }, + ) + + app.Synth(nil) +} diff --git a/queue/events/enqueue.go b/queue/events/enqueue.go index 6a44349..848fa32 100644 --- a/queue/events/enqueue.go +++ b/queue/events/enqueue.go @@ -83,17 +83,9 @@ func Enqueue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (c // normalized type name func categoryOf[T any]() string { typ := reflect.TypeOf(new(T)).Elem() - cat := typ.Name() if typ.Kind() == reflect.Ptr { - cat = typ.Elem().Name() + return typ.Elem().String() } - seq := strings.Split(strings.Trim(cat, "]"), "[") - tkn := make([]string, len(seq)) - for i, s := range seq { - r := strings.Split(s, ".") - tkn[i] = r[len(r)-1] - } - - return strings.Join(tkn, "[") + strings.Repeat("]", len(tkn)-1) + return typ.String() }