Skip to content

Commit

Permalink
AWS DynamoDB Streams (#43)
Browse files Browse the repository at this point in the history
* support AWS DynamoDB Streams
* fix S3 events
* update CI/CD
  • Loading branch information
fogfish authored Nov 18, 2023
1 parent eae5c21 commit 4ab0c65
Show file tree
Hide file tree
Showing 17 changed files with 523 additions and 26 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/check-code.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ jobs:
runs-on: ubuntu-latest
steps:

- uses: actions/setup-go@v2
- uses: actions/setup-go@v4
with:
go-version: "1.21"

- uses: actions/checkout@v3
- uses: actions/checkout@v4

- uses: dominikh/[email protected]
with:
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/check-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ jobs:
runs-on: ubuntu-latest
steps:

- uses: actions/setup-go@v2
- uses: actions/setup-go@v4
with:
go-version: "1.21"

- uses: actions/checkout@v2
- uses: actions/checkout@v4

- name: go build
run: |
Expand Down
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,12 @@ stack.NewSink(
- [x] [consume message](examples/sqs/dequeue/sqs.go)
- [ ] AWS SNS
- [ ] produce message
- [x] AWS S3 Event (serverless)
- [x] [consume message](./examples/events3/dequeue/events3.go) using aws lambda
- [x] [aws cdk construct](./examples/events3/serverless/main.go)
- [x] AWS DynamoDB Streams (serverless)
- [x] [consume message](./examples/events3/dequeue/events3.go) using aws lambda
- [x] [aws cdk construct](./examples/events3/serverless/main.go)
- [ ] AWS Kinesis (serverless)
- [ ] produce message
- [ ] consume message using aws lambda
Expand Down
231 changes: 231 additions & 0 deletions broker/eventddb/awscdk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
//
// Copyright (C) 2021 - 2022 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the Apache License Version 2.0. See the LICENSE file for details.
// https://github.com/fogfish/swarm
//

package eventddb

import (
"os"
"strconv"
"strings"

"github.com/aws/aws-cdk-go/awscdk/v2"
"github.com/aws/aws-cdk-go/awscdk/v2/awsdynamodb"
"github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
"github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
"github.com/aws/constructs-go/constructs/v10"
"github.com/aws/jsii-runtime-go"
"github.com/fogfish/scud"
)

//------------------------------------------------------------------------------
//
// AWS CDK Sink Construct
//
//------------------------------------------------------------------------------

type Sink struct {
constructs.Construct
Handler awslambda.IFunction
}

type SinkProps struct {
Table awsdynamodb.ITable
Lambda *scud.FunctionGoProps
EventSource *awslambdaeventsources.DynamoEventSourceProps
}

func NewSink(scope constructs.Construct, id *string, props *SinkProps) *Sink {
sink := &Sink{Construct: constructs.NewConstruct(scope, id)}

sink.Handler = scud.NewFunctionGo(sink.Construct, jsii.String("Func"), props.Lambda)

eventsource := &awslambdaeventsources.DynamoEventSourceProps{
StartingPosition: awslambda.StartingPosition_LATEST,
}
if props.EventSource != nil {
eventsource = props.EventSource
}

source := awslambdaeventsources.NewDynamoEventSource(props.Table, eventsource)

sink.Handler.AddEventSource(source)

return sink
}

//------------------------------------------------------------------------------
//
// AWS CDK Stack Construct
//
//------------------------------------------------------------------------------

type ServerlessStackProps struct {
*awscdk.StackProps
Version string
System string
}

type ServerlessStack struct {
awscdk.Stack
acc int
removalPolicy awscdk.RemovalPolicy
Table awsdynamodb.ITable
}

func NewServerlessStack(app awscdk.App, id *string, props *ServerlessStackProps) *ServerlessStack {
sid := *id
if props.Version != "" {
sid = sid + "-" + props.Version
}

stack := &ServerlessStack{
Stack: awscdk.NewStack(app, jsii.String(sid), props.StackProps),
removalPolicy: awscdk.RemovalPolicy_RETAIN,
}

if strings.HasPrefix(props.Version, "pr") {
stack.removalPolicy = awscdk.RemovalPolicy_DESTROY
}

return stack
}

func (stack *ServerlessStack) NewTable(tableName ...string) awsdynamodb.ITable {
name := awscdk.Aws_STACK_NAME()
if len(tableName) > 0 {
name = &tableName[0]
}

stack.Table = awsdynamodb.NewTable(stack.Stack, jsii.String("Table"),
&awsdynamodb.TableProps{
TableName: name,
PartitionKey: &awsdynamodb.Attribute{
Type: awsdynamodb.AttributeType_STRING,
Name: jsii.String("prefix"),
},
SortKey: &awsdynamodb.Attribute{
Type: awsdynamodb.AttributeType_STRING,
Name: jsii.String("suffix"),
},
BillingMode: awsdynamodb.BillingMode_PAY_PER_REQUEST,
RemovalPolicy: stack.removalPolicy,
Stream: awsdynamodb.StreamViewType_NEW_IMAGE,
},
)

return stack.Table
}

func (stack *ServerlessStack) AddTable(tableName string) awsdynamodb.ITable {
stack.Table = awsdynamodb.Table_FromTableName(stack.Stack, jsii.String("Table"),
jsii.String(tableName),
)

return stack.Table
}

func (stack *ServerlessStack) NewGlobalTable(tableName ...string) awsdynamodb.ITable {
name := awscdk.Aws_STACK_NAME()
if len(tableName) > 0 {
name = &tableName[0]
}

stack.Table = awsdynamodb.NewTableV2(stack.Stack, jsii.String("Table"),
&awsdynamodb.TablePropsV2{
TableName: name,
PartitionKey: &awsdynamodb.Attribute{
Type: awsdynamodb.AttributeType_STRING,
Name: jsii.String("prefix"),
},
SortKey: &awsdynamodb.Attribute{
Type: awsdynamodb.AttributeType_STRING,
Name: jsii.String("suffix"),
},
Billing: awsdynamodb.Billing_OnDemand(),
RemovalPolicy: stack.removalPolicy,
DynamoStream: awsdynamodb.StreamViewType_NEW_IMAGE,
},
)

return stack.Table
}

func (stack *ServerlessStack) AddGlobalTable(tableName string) awsdynamodb.ITable {
stack.Table = awsdynamodb.TableV2_FromTableName(stack.Stack, jsii.String("Table"),
jsii.String(tableName),
)

return stack.Table
}

func (stack *ServerlessStack) NewSink(props *SinkProps) *Sink {
if stack.Table == nil {
panic("Table is not defined.")
}

props.Table = stack.Table

stack.acc++
name := "Sink" + strconv.Itoa(stack.acc)
sink := NewSink(stack.Stack, jsii.String(name), props)

return sink
}

//------------------------------------------------------------------------------
//
// AWS CDK App Construct
//
//------------------------------------------------------------------------------

type ServerlessApp struct {
awscdk.App
}

func NewServerlessApp() *ServerlessApp {
app := awscdk.NewApp(nil)
return &ServerlessApp{App: app}
}

func (app *ServerlessApp) NewStack(name string, props ...*awscdk.StackProps) *ServerlessStack {
config := &awscdk.StackProps{
Env: &awscdk.Environment{
Account: jsii.String(os.Getenv("CDK_DEFAULT_ACCOUNT")),
Region: jsii.String(os.Getenv("CDK_DEFAULT_REGION")),
},
}

if len(props) == 1 {
config = props[0]
}

return NewServerlessStack(app.App, jsii.String(name), &ServerlessStackProps{
StackProps: config,
Version: FromContextVsn(app),
System: name,
})
}

func FromContext(app awscdk.App, key string) string {
val := app.Node().TryGetContext(jsii.String(key))
switch v := val.(type) {
case string:
return v
default:
return ""
}
}

func FromContextVsn(app awscdk.App) string {
vsn := FromContext(app, "vsn")
if vsn == "" {
return "latest"
}

return vsn
}
46 changes: 46 additions & 0 deletions broker/eventddb/awscdk_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
//
// Copyright (C) 2021 - 2022 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the Apache License Version 2.0. See the LICENSE file for details.
// https://github.com/fogfish/swarm
//

package eventddb_test

import (
"testing"

"github.com/aws/aws-cdk-go/awscdk/v2/assertions"
"github.com/aws/jsii-runtime-go"
"github.com/fogfish/scud"
"github.com/fogfish/swarm/broker/eventddb"
)

func TestEventDdbCDK(t *testing.T) {
app := eventddb.NewServerlessApp()
stack := app.NewStack("swarm-example-eventddb", nil)
stack.NewGlobalTable()

stack.NewSink(
&eventddb.SinkProps{
Lambda: &scud.FunctionGoProps{
SourceCodePackage: "github.com/fogfish/swarm",
SourceCodeLambda: "examples/eventddb/dequeue",
},
},
)

require := map[*string]*float64{
jsii.String("AWS::DynamoDB::GlobalTable"): jsii.Number(1),
jsii.String("AWS::Lambda::EventSourceMapping"): jsii.Number(1),
jsii.String("AWS::IAM::Role"): jsii.Number(2),
jsii.String("AWS::Lambda::Function"): jsii.Number(2),
jsii.String("Custom::LogRetention"): jsii.Number(1),
}

template := assertions.Template_FromStack(stack.Stack, nil)
for key, val := range require {
template.ResourceCountIs(key, val)
}
}
Loading

0 comments on commit 4ab0c65

Please sign in to comment.