Skip to content

Commit

Permalink
AWS DynamoDB Streams
Browse files Browse the repository at this point in the history
  • Loading branch information
fogfish committed Nov 18, 2023
1 parent eae5c21 commit 3d9a359
Show file tree
Hide file tree
Showing 12 changed files with 495 additions and 12 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,12 @@ stack.NewSink(
- [x] [consume message](examples/sqs/dequeue/sqs.go)
- [ ] AWS SNS
- [ ] produce message
- [x] AWS S3 Event (serverless)
- [x] [consume message](./examples/events3/dequeue/events3.go) using aws lambda
- [x] [aws cdk construct](./examples/events3/serverless/main.go)
- [x] AWS DynamoDB Streams (serverless)
- [x] [consume message](./examples/events3/dequeue/events3.go) using aws lambda
- [x] [aws cdk construct](./examples/events3/serverless/main.go)
- [ ] AWS Kinesis (serverless)
- [ ] produce message
- [ ] consume message using aws lambda
Expand Down
223 changes: 223 additions & 0 deletions broker/eventddb/awscdk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
package eventddb

import (
"os"
"strconv"
"strings"

"github.com/aws/aws-cdk-go/awscdk/v2"
"github.com/aws/aws-cdk-go/awscdk/v2/awsdynamodb"
"github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
"github.com/aws/aws-cdk-go/awscdk/v2/awslambdaeventsources"
"github.com/aws/constructs-go/constructs/v10"
"github.com/aws/jsii-runtime-go"
"github.com/fogfish/scud"
)

//------------------------------------------------------------------------------
//
// AWS CDK Sink Construct
//
//------------------------------------------------------------------------------

type Sink struct {
constructs.Construct
Handler awslambda.IFunction
}

type SinkProps struct {
Table awsdynamodb.ITable
Lambda *scud.FunctionGoProps
EventSource *awslambdaeventsources.DynamoEventSourceProps
}

func NewSink(scope constructs.Construct, id *string, props *SinkProps) *Sink {
sink := &Sink{Construct: constructs.NewConstruct(scope, id)}

sink.Handler = scud.NewFunctionGo(sink.Construct, jsii.String("Func"), props.Lambda)

eventsource := &awslambdaeventsources.DynamoEventSourceProps{
StartingPosition: awslambda.StartingPosition_LATEST,
}
if props.EventSource != nil {
eventsource = props.EventSource
}

source := awslambdaeventsources.NewDynamoEventSource(props.Table, eventsource)

sink.Handler.AddEventSource(source)

return sink
}

//------------------------------------------------------------------------------
//
// AWS CDK Stack Construct
//
//------------------------------------------------------------------------------

type ServerlessStackProps struct {
*awscdk.StackProps
Version string
System string
}

type ServerlessStack struct {
awscdk.Stack
acc int
removalPolicy awscdk.RemovalPolicy
Table awsdynamodb.ITable
}

func NewServerlessStack(app awscdk.App, id *string, props *ServerlessStackProps) *ServerlessStack {
sid := *id
if props.Version != "" {
sid = sid + "-" + props.Version
}

stack := &ServerlessStack{
Stack: awscdk.NewStack(app, jsii.String(sid), props.StackProps),
removalPolicy: awscdk.RemovalPolicy_RETAIN,
}

if strings.HasPrefix(props.Version, "pr") {
stack.removalPolicy = awscdk.RemovalPolicy_DESTROY
}

return stack
}

func (stack *ServerlessStack) NewTable(tableName ...string) awsdynamodb.ITable {
name := awscdk.Aws_STACK_NAME()
if len(tableName) > 0 {
name = &tableName[0]
}

stack.Table = awsdynamodb.NewTable(stack.Stack, jsii.String("Table"),
&awsdynamodb.TableProps{
TableName: name,
PartitionKey: &awsdynamodb.Attribute{
Type: awsdynamodb.AttributeType_STRING,
Name: jsii.String("prefix"),
},
SortKey: &awsdynamodb.Attribute{
Type: awsdynamodb.AttributeType_STRING,
Name: jsii.String("suffix"),
},
BillingMode: awsdynamodb.BillingMode_PAY_PER_REQUEST,
RemovalPolicy: stack.removalPolicy,
Stream: awsdynamodb.StreamViewType_NEW_IMAGE,
},
)

return stack.Table
}

func (stack *ServerlessStack) AddTable(tableName string) awsdynamodb.ITable {
stack.Table = awsdynamodb.Table_FromTableName(stack.Stack, jsii.String("Table"),
jsii.String(tableName),
)

return stack.Table
}

func (stack *ServerlessStack) NewGlobalTable(tableName ...string) awsdynamodb.ITable {
name := awscdk.Aws_STACK_NAME()
if len(tableName) > 0 {
name = &tableName[0]
}

stack.Table = awsdynamodb.NewTableV2(stack.Stack, jsii.String("Table"),
&awsdynamodb.TablePropsV2{
TableName: name,
PartitionKey: &awsdynamodb.Attribute{
Type: awsdynamodb.AttributeType_STRING,
Name: jsii.String("prefix"),
},
SortKey: &awsdynamodb.Attribute{
Type: awsdynamodb.AttributeType_STRING,
Name: jsii.String("suffix"),
},
Billing: awsdynamodb.Billing_OnDemand(),
RemovalPolicy: stack.removalPolicy,
DynamoStream: awsdynamodb.StreamViewType_NEW_IMAGE,
},
)

return stack.Table
}

func (stack *ServerlessStack) AddGlobalTable(tableName string) awsdynamodb.ITable {
stack.Table = awsdynamodb.TableV2_FromTableName(stack.Stack, jsii.String("Table"),
jsii.String(tableName),
)

return stack.Table
}

func (stack *ServerlessStack) NewSink(props *SinkProps) *Sink {
if stack.Table == nil {
panic("Table is not defined.")
}

props.Table = stack.Table

stack.acc++
name := "Sink" + strconv.Itoa(stack.acc)
sink := NewSink(stack.Stack, jsii.String(name), props)

return sink
}

//------------------------------------------------------------------------------
//
// AWS CDK App Construct
//
//------------------------------------------------------------------------------

type ServerlessApp struct {
awscdk.App
}

func NewServerlessApp() *ServerlessApp {
app := awscdk.NewApp(nil)
return &ServerlessApp{App: app}
}

func (app *ServerlessApp) NewStack(name string, props ...*awscdk.StackProps) *ServerlessStack {
config := &awscdk.StackProps{
Env: &awscdk.Environment{
Account: jsii.String(os.Getenv("CDK_DEFAULT_ACCOUNT")),
Region: jsii.String(os.Getenv("CDK_DEFAULT_REGION")),
},
}

if len(props) == 1 {
config = props[0]
}

return NewServerlessStack(app.App, jsii.String(name), &ServerlessStackProps{
StackProps: config,
Version: FromContextVsn(app),
System: name,
})
}

func FromContext(app awscdk.App, key string) string {
val := app.Node().TryGetContext(jsii.String(key))
switch v := val.(type) {
case string:
return v
default:
return ""
}
}

func FromContextVsn(app awscdk.App) string {
vsn := FromContext(app, "vsn")
if vsn == "" {
return "latest"
}

return vsn
}
46 changes: 46 additions & 0 deletions broker/eventddb/awscdk_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
//
// Copyright (C) 2021 - 2022 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the Apache License Version 2.0. See the LICENSE file for details.
// https://github.com/fogfish/swarm
//

package eventddb_test

import (
"testing"

"github.com/aws/aws-cdk-go/awscdk/v2/assertions"
"github.com/aws/jsii-runtime-go"
"github.com/fogfish/scud"
"github.com/fogfish/swarm/broker/eventddb"
)

func TestEventDdbCDK(t *testing.T) {
app := eventddb.NewServerlessApp()
stack := app.NewStack("swarm-example-eventddb", nil)
stack.NewGlobalTable()

stack.NewSink(
&eventddb.SinkProps{
Lambda: &scud.FunctionGoProps{
SourceCodePackage: "github.com/fogfish/swarm",
SourceCodeLambda: "examples/eventddb/dequeue",
},
},
)

require := map[*string]*float64{
jsii.String("AWS::DynamoDB::GlobalTable"): jsii.Number(1),
jsii.String("AWS::Lambda::EventSourceMapping"): jsii.Number(1),
jsii.String("AWS::IAM::Role"): jsii.Number(2),
jsii.String("AWS::Lambda::Function"): jsii.Number(2),
jsii.String("Custom::LogRetention"): jsii.Number(1),
}

template := assertions.Template_FromStack(stack.Stack, nil)
for key, val := range require {
template.ResourceCountIs(key, val)
}
}
106 changes: 106 additions & 0 deletions broker/eventddb/broker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
//
// Copyright (C) 2021 - 2022 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the Apache License Version 2.0. See the LICENSE file for details.
// https://github.com/fogfish/swarm
//

package eventddb

import (
"context"
"log/slog"

"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/fogfish/curie"
"github.com/fogfish/swarm"
"github.com/fogfish/swarm/internal/router"
)

// New creates broker for AWS EventBridge
func New(queue string, opts ...swarm.Option) (swarm.Broker, error) {
conf := swarm.NewConfig()
for _, opt := range opts {
opt(&conf)
}

ctx, can := context.WithCancel(context.Background())

slog.Info("Broker is created", "type", "ddbstream")
return &broker{
config: conf,
channels: swarm.NewChannels(),
context: ctx,
cancel: can,
router: router.New(&conf, nil),
}, nil
}

type broker struct {
config swarm.Config
channels *swarm.Channels
context context.Context
cancel context.CancelFunc
router *router.Router
}

func (b *broker) Config() swarm.Config {
return b.config
}

func (b *broker) Close() {
b.channels.Sync()
b.channels.Close()
b.cancel()
}

func (b *broker) DSync() {
b.channels.Sync()
}

func (b *broker) Enqueue(category string, channel swarm.Channel) swarm.Enqueue {
panic("not implemented")
}

func (b *broker) Dequeue(category string, channel swarm.Channel) swarm.Dequeue {
b.channels.Attach(category, channel)
b.router.Register(category)

return b.router
}

func (b *broker) Await() {
starter := lambda.Start

type Mock interface{ Start(interface{}) }
if b.config.Service != nil {
service, ok := b.config.Service.(Mock)
if ok {
starter = service.Start
}
}

starter(
func(events events.DynamoDBEvent) error {
for _, evt := range events.Records {
bag := swarm.Bag{
Category: Category,
Event: &Event{
ID: evt.EventID,
Type: curie.IRI(evt.EventName),
Agent: curie.IRI(evt.EventSourceArn),
Object: &evt,
},
Digest: swarm.Digest{Brief: evt.EventID},
}
if err := b.router.Dispatch(bag); err != nil {
return err
}
}

return b.router.Await(b.config.TimeToFlight)
},
)
}
Loading

0 comments on commit 3d9a359

Please sign in to comment.