Skip to content

Commit

Permalink
Update eventddb to kernel v0.20.0 (#89)
Browse files Browse the repository at this point in the history
  • Loading branch information
fogfish authored Sep 29, 2024
1 parent 51a1683 commit 881a660
Show file tree
Hide file tree
Showing 12 changed files with 165 additions and 102 deletions.
42 changes: 0 additions & 42 deletions broker/eventddb/awscdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,48 +78,6 @@ func NewBroker(scope constructs.Construct, id *string, props *BrokerProps) *Brok
return broker
}

// func (broker *Broker) NewTable(props *awsdynamodb.TableProps) awsdynamodb.ITable {
// if props == nil {
// props = &awsdynamodb.TableProps{}
// }

// if props.TableName == nil {
// props.TableName = awscdk.Aws_STACK_NAME()
// }

// if props.PartitionKey == nil && props.SortKey == nil {
// props.PartitionKey = &awsdynamodb.Attribute{
// Type: awsdynamodb.AttributeType_STRING,
// Name: jsii.String("prefix"),
// }

// props.SortKey = &awsdynamodb.Attribute{
// Type: awsdynamodb.AttributeType_STRING,
// Name: jsii.String("suffix"),
// }
// }

// if props.BillingMode == "" {
// props.BillingMode = awsdynamodb.BillingMode_PAY_PER_REQUEST
// }

// if props.Stream == "" {
// props.Stream = awsdynamodb.StreamViewType_NEW_IMAGE
// }

// broker.Table = awsdynamodb.NewTable(broker.Construct, jsii.String("Table"), props)

// return broker.Table
// }

// func (broker *Broker) AddTable(tableName string) awsdynamodb.ITable {
// broker.Table = awsdynamodb.Table_FromTableName(broker.Construct, jsii.String("Table"),
// jsii.String(tableName),
// )

// return broker.Table
// }

func (broker *Broker) NewTable(props *awsdynamodb.TablePropsV2) awsdynamodb.ITable {
if props == nil {
props = &awsdynamodb.TablePropsV2{}
Expand Down
2 changes: 1 addition & 1 deletion broker/eventddb/awscdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestEventDdbCDK(t *testing.T) {
&eventddb.SinkProps{
Function: &scud.FunctionGoProps{
SourceCodeModule: "github.com/fogfish/swarm/broker/eventddb",
SourceCodeLambda: "examples/dequeue",
SourceCodeLambda: "examples/dequeue/typed",
},
},
)
Expand Down
33 changes: 33 additions & 0 deletions broker/eventddb/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
//
// Copyright (C) 2021 - 2024 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the Apache License Version 2.0. See the LICENSE file for details.
// https://github.com/fogfish/swarm
//

package eventddb

import (
"time"

"github.com/fogfish/swarm"
)

type Option func(*Client)

var defs = []Option{WithConfig()}

func WithConfig(opts ...swarm.Option) Option {
return func(c *Client) {
config := swarm.NewConfig()
for _, opt := range opts {
opt(&config)
}

// Mandatory overrides
config.PollFrequency = 5 * time.Microsecond

c.config = config
}
}
62 changes: 26 additions & 36 deletions broker/eventddb/eventddb.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package eventddb

import (
"context"
"encoding/json"

"github.com/aws/aws-lambda-go/lambda"
Expand All @@ -18,56 +17,47 @@ import (
"github.com/fogfish/swarm/kernel"
)

type Client struct {
config swarm.Config
}

// New creates broker for AWS EventBridge
func New(queue string, opts ...swarm.Option) (swarm.Broker, error) {
config := swarm.NewConfig()
func NewReader(opts ...Option) (*kernel.Dequeuer, error) {
c := &Client{}

for _, opt := range defs {
opt(c)
}
for _, opt := range opts {
opt(&config)
opt(c)
}

starter := lambda.Start
bridge := &bridge{kernel.NewBridge(c.config.TimeToFlight)}

type Mock interface{ Start(interface{}) }
if config.Service != nil {
service, ok := config.Service.(Mock)
if ok {
starter = service.Start
}
}
return kernel.NewDequeuer(bridge, c.config), nil

sls := spawner{f: starter, c: config}

return kernel.New(nil, sls, config), nil
}

//------------------------------------------------------------------------------

type spawner struct {
c swarm.Config
f func(any)
}
type bridge struct{ *kernel.Bridge }

// Note: events.DynamoDBEvent decodes all records, the swarm kernel protocol requires bytes.
type DynamoDBEvent struct {
Records []json.RawMessage `json:"Records"`
}

func (s spawner) Spawn(k *kernel.Kernel) error {
s.f(
func(events DynamoDBEvent) error {
bag := make([]swarm.Bag, len(events.Records))
for i, obj := range events.Records {
bag[i] = swarm.Bag{
Ctx: swarm.NewContext(context.Background(), Category, guid.G(guid.Clock).String()),
Object: obj,
}
}
func (s bridge) Run() { lambda.Start(s.run) }

return k.Dispatch(bag, s.c.TimeToFlight)
},
)
func (s bridge) run(events DynamoDBEvent) error {
bag := make([]swarm.Bag, len(events.Records))
for i, obj := range events.Records {
bag[i] = swarm.Bag{
Category: Category,
Digest: guid.G(guid.Clock).String(),
Object: obj,
}
}

return nil
return s.Bridge.Dispatch(bag)
}

func (s spawner) Ack(digest string) error { return nil }
func (s spawner) Ask() ([]swarm.Bag, error) { return nil, nil }
73 changes: 73 additions & 0 deletions broker/eventddb/eventddb_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
//
// Copyright (C) 2021 - 2024 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the Apache License Version 2.0. See the LICENSE file for details.
// https://github.com/fogfish/swarm
//

package eventddb

import (
"context"
"encoding/json"
"testing"
"time"

"github.com/fogfish/it/v2"
"github.com/fogfish/swarm"
"github.com/fogfish/swarm/kernel"
)

func TestReader(t *testing.T) {
var bag []swarm.Bag
bridge := &bridge{kernel.NewBridge(100 * time.Millisecond)}

t.Run("NewReader", func(t *testing.T) {
q, err := NewReader(
WithConfig(
swarm.WithLogStdErr(),
),
)
it.Then(t).Should(it.Nil(err))
q.Close()
})

t.Run("Dequeue", func(t *testing.T) {
go func() {
bag, _ = bridge.Ask(context.Background())
for _, m := range bag {
bridge.Ack(context.Background(), m.Digest)
}
}()

err := bridge.run(
DynamoDBEvent{
Records: []json.RawMessage{[]byte(`{"sut":"test"}`)},
},
)

it.Then(t).Should(
it.Nil(err),
it.Equal(len(bag), 1),
it.Equal(bag[0].Category, Category),
it.Equiv(bag[0].Object, []byte(`{"sut":"test"}`)),
)
})

t.Run("Dequeue.Timeout", func(t *testing.T) {
go func() {
bag, _ = bridge.Ask(context.Background())
}()

err := bridge.run(
DynamoDBEvent{
Records: []json.RawMessage{[]byte(`{"sut":"test"}`)},
},
)

it.Then(t).ShouldNot(
it.Nil(err),
)
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,31 @@ package main
import (
"encoding/json"
"fmt"
"log/slog"

"github.com/aws/aws-lambda-go/events"
"github.com/fogfish/swarm"
"github.com/fogfish/swarm/broker/eventddb"
"github.com/fogfish/swarm/queue"
)

func main() {
q := queue.Must(eventddb.New("swarm-test", swarm.WithLogStdErr()))
q, err := eventddb.NewReader(
eventddb.WithConfig(
swarm.WithLogStdErr(),
),
)
if err != nil {
slog.Error("eventddb reader has failed", "err", err)
return
}

go common(eventddb.Dequeue(q))
go common(eventddb.Source(q))

q.Await()
}

func common(rcv <-chan swarm.Msg[*events.DynamoDBEventRecord], ack chan<- swarm.Msg[*events.DynamoDBEventRecord]) {
for msg := range rcv {

v, _ := json.MarshalIndent(msg, "", " ")
fmt.Printf("ddb event > \n %s\n", v)
ack <- msg
Expand Down
2 changes: 1 addition & 1 deletion broker/eventddb/examples/serverless/cdk.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"app": "go run main.go",
"app": "go run eventddb.go",
"requireApproval": "never",
"context": {
"@aws-cdk/core:stackRelativeExports": true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ func main() {
broker.NewSink(
&eventddb.SinkProps{
Function: &scud.FunctionGoProps{
SourceCodeModule: "github.com/fogfish/swarm",
SourceCodeLambda: "examples/eventddb/dequeue",
SourceCodeModule: "github.com/fogfish/swarm/broker/eventddb",
SourceCodeLambda: "examples/dequeue/typed",
},
},
)
Expand Down
7 changes: 3 additions & 4 deletions broker/eventddb/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ require (
github.com/aws/constructs-go/constructs/v10 v10.3.0
github.com/aws/jsii-runtime-go v1.103.1
github.com/fogfish/guid/v2 v2.0.4
github.com/fogfish/scud v0.10.1
github.com/fogfish/swarm v0.16.0
github.com/fogfish/swarm/queue v0.16.1
github.com/fogfish/it/v2 v2.0.2
github.com/fogfish/scud v0.10.2
github.com/fogfish/swarm v0.20.0
)

require (
Expand All @@ -24,7 +24,6 @@ require (
github.com/fogfish/faults v0.2.0 // indirect
github.com/fogfish/golem/hseq v1.2.0 // indirect
github.com/fogfish/golem/optics v0.13.0 // indirect
github.com/fogfish/golem/pure v0.10.1 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/yuin/goldmark v1.5.3 // indirect
Expand Down
14 changes: 6 additions & 8 deletions broker/eventddb/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,18 @@ github.com/fogfish/golem/hseq v1.2.0 h1:B6yrzOHQNoTqSlhLb+AvK7dhEAELjHThrCQTF/uq
github.com/fogfish/golem/hseq v1.2.0/go.mod h1:17XORt8nNKl6KOhF43MHSmjK8NksbkBsohAoJGiinUs=
github.com/fogfish/golem/optics v0.13.0 h1:U3htppjVTMbICQIzPTTe151+WziSGEppNVmkanKa440=
github.com/fogfish/golem/optics v0.13.0/go.mod h1:U1y90OVcXF/A61dIP3abQ0x2GweTmzVHPC15pv0pcM0=
github.com/fogfish/golem/pure v0.10.1 h1:0+cnvdaV9zF+0NN8SZMgR5bgFM6yNfBHU4rynYSDfmE=
github.com/fogfish/golem/pure v0.10.1/go.mod h1:kLPfgu5uKP0CrwVap7jejisRwV7vo1q8Eyqnc/Z0qyw=
github.com/fogfish/guid/v2 v2.0.4 h1:EZiPlM4UAghqf7DU5/nLEF+iRH7ODe0AiFuYOMRvITQ=
github.com/fogfish/guid/v2 v2.0.4/go.mod h1:KkZ5T4EE3BqWQJFZBPLSHV/tBe23Xq4KvuPfwtNtepU=
github.com/fogfish/it v1.0.0 h1:kiwFHZcrkRLUydZoIoY0gTuMfj38trwvLo0YRyIkeG8=
github.com/fogfish/it v1.0.0/go.mod h1:NQJG4Ygvek85y7zGj0Gny8+6ygAnHjfBORhI7TdQhp4=
github.com/fogfish/it/v2 v2.0.2 h1:UR6yVemf8zD3WVs6Bq0zE6LJwapZ8urv9zvU5VB5E6o=
github.com/fogfish/it/v2 v2.0.2/go.mod h1:HHwufnTaZTvlRVnSesPl49HzzlMrQtweKbf+8Co/ll4=
github.com/fogfish/scud v0.10.1 h1:eJI/1zQamihBTTwDhgn2VTXlG+74B1qVAOnGGDv4u7E=
github.com/fogfish/scud v0.10.1/go.mod h1:IVtHIfQMsb9lPKFeCI/OGcT2ssmd6onOZdpXgj/ORgs=
github.com/fogfish/swarm v0.16.0 h1:6AviPpPbrSraLcjH2GwW0oVUb7tb6IZMqdT+FdXOCHU=
github.com/fogfish/swarm v0.16.0/go.mod h1:u5uJmXu3xHz1OTzyxhu9viI6eq+GmkJJkDqdrf8IlJw=
github.com/fogfish/swarm/queue v0.16.1 h1:bReiuwm2a1h0hP4ElSzETksqs1gT2yKRHtA6aVHQnKY=
github.com/fogfish/swarm/queue v0.16.1/go.mod h1:tiSPnknVI/nHs2TL8H17YszUvgBT1E2ksaOGBQV/E6Q=
github.com/fogfish/logger/v3 v3.1.1 h1:awmTNpBWRvSj086H3RWIUnc+FSu9qXHJgBa49wYpNCE=
github.com/fogfish/logger/v3 v3.1.1/go.mod h1:hsucoJz/3OX90UdYrXykcKvjjteBnPcYSTr4Rie0ZqU=
github.com/fogfish/scud v0.10.2 h1:cFupgZ4brqeGr/HCURnyDaBUNJIVEJTfKRwxEEUrO3w=
github.com/fogfish/scud v0.10.2/go.mod h1:IVtHIfQMsb9lPKFeCI/OGcT2ssmd6onOZdpXgj/ORgs=
github.com/fogfish/swarm v0.20.0 h1:eUlNXFsePfBo72iFNvY3eJ6YIQP0ttzflGF6tNAxhQ8=
github.com/fogfish/swarm v0.20.0/go.mod h1:cdIviTojE3DT+FOIIOeOg6tyMqhyanfy2TZHTtKlOmo=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
Expand Down
11 changes: 8 additions & 3 deletions broker/eventddb/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,16 @@ package eventddb
import (
"github.com/aws/aws-lambda-go/events"
"github.com/fogfish/swarm"
queue "github.com/fogfish/swarm/queue"
"github.com/fogfish/swarm/dequeue"
"github.com/fogfish/swarm/kernel"
)

const Category = "DynamoDBEventRecord"

func Dequeue(q swarm.Broker) (<-chan swarm.Msg[*events.DynamoDBEventRecord], chan<- swarm.Msg[*events.DynamoDBEventRecord]) {
return queue.Dequeue[*events.DynamoDBEventRecord](q)
// The broker produces only [events.DynamoDBEventRecord], the function is helper.
func Source(q *kernel.Dequeuer) (
<-chan swarm.Msg[*events.DynamoDBEventRecord],
chan<- swarm.Msg[*events.DynamoDBEventRecord],
) {
return dequeue.Typed[*events.DynamoDBEventRecord](q)
}
2 changes: 1 addition & 1 deletion broker/eventddb/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@

package eventddb

const Version = "broker/eventddb/v0.16.1"
const Version = "broker/eventddb/v0.20.0"

0 comments on commit 881a660

Please sign in to comment.