diff --git a/broker/eventddb/awscdk.go b/broker/eventddb/awscdk.go index 502e8d4..9da55a9 100644 --- a/broker/eventddb/awscdk.go +++ b/broker/eventddb/awscdk.go @@ -78,48 +78,6 @@ func NewBroker(scope constructs.Construct, id *string, props *BrokerProps) *Brok return broker } -// func (broker *Broker) NewTable(props *awsdynamodb.TableProps) awsdynamodb.ITable { -// if props == nil { -// props = &awsdynamodb.TableProps{} -// } - -// if props.TableName == nil { -// props.TableName = awscdk.Aws_STACK_NAME() -// } - -// if props.PartitionKey == nil && props.SortKey == nil { -// props.PartitionKey = &awsdynamodb.Attribute{ -// Type: awsdynamodb.AttributeType_STRING, -// Name: jsii.String("prefix"), -// } - -// props.SortKey = &awsdynamodb.Attribute{ -// Type: awsdynamodb.AttributeType_STRING, -// Name: jsii.String("suffix"), -// } -// } - -// if props.BillingMode == "" { -// props.BillingMode = awsdynamodb.BillingMode_PAY_PER_REQUEST -// } - -// if props.Stream == "" { -// props.Stream = awsdynamodb.StreamViewType_NEW_IMAGE -// } - -// broker.Table = awsdynamodb.NewTable(broker.Construct, jsii.String("Table"), props) - -// return broker.Table -// } - -// func (broker *Broker) AddTable(tableName string) awsdynamodb.ITable { -// broker.Table = awsdynamodb.Table_FromTableName(broker.Construct, jsii.String("Table"), -// jsii.String(tableName), -// ) - -// return broker.Table -// } - func (broker *Broker) NewTable(props *awsdynamodb.TablePropsV2) awsdynamodb.ITable { if props == nil { props = &awsdynamodb.TablePropsV2{} diff --git a/broker/eventddb/awscdk_test.go b/broker/eventddb/awscdk_test.go index 3e90880..02daf2a 100644 --- a/broker/eventddb/awscdk_test.go +++ b/broker/eventddb/awscdk_test.go @@ -29,7 +29,7 @@ func TestEventDdbCDK(t *testing.T) { &eventddb.SinkProps{ Function: &scud.FunctionGoProps{ SourceCodeModule: "github.com/fogfish/swarm/broker/eventddb", - SourceCodeLambda: "examples/dequeue", + SourceCodeLambda: "examples/dequeue/typed", }, }, ) diff --git a/broker/eventddb/config.go b/broker/eventddb/config.go new file mode 100644 index 0000000..e7e349f --- /dev/null +++ b/broker/eventddb/config.go @@ -0,0 +1,33 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package eventddb + +import ( + "time" + + "github.com/fogfish/swarm" +) + +type Option func(*Client) + +var defs = []Option{WithConfig()} + +func WithConfig(opts ...swarm.Option) Option { + return func(c *Client) { + config := swarm.NewConfig() + for _, opt := range opts { + opt(&config) + } + + // Mandatory overrides + config.PollFrequency = 5 * time.Microsecond + + c.config = config + } +} diff --git a/broker/eventddb/eventddb.go b/broker/eventddb/eventddb.go index 4381a81..593feeb 100644 --- a/broker/eventddb/eventddb.go +++ b/broker/eventddb/eventddb.go @@ -9,7 +9,6 @@ package eventddb import ( - "context" "encoding/json" "github.com/aws/aws-lambda-go/lambda" @@ -18,56 +17,47 @@ import ( "github.com/fogfish/swarm/kernel" ) +type Client struct { + config swarm.Config +} + // New creates broker for AWS EventBridge -func New(queue string, opts ...swarm.Option) (swarm.Broker, error) { - config := swarm.NewConfig() +func NewReader(opts ...Option) (*kernel.Dequeuer, error) { + c := &Client{} + + for _, opt := range defs { + opt(c) + } for _, opt := range opts { - opt(&config) + opt(c) } - starter := lambda.Start + bridge := &bridge{kernel.NewBridge(c.config.TimeToFlight)} - type Mock interface{ Start(interface{}) } - if config.Service != nil { - service, ok := config.Service.(Mock) - if ok { - starter = service.Start - } - } + return kernel.NewDequeuer(bridge, c.config), nil - sls := spawner{f: starter, c: config} - - return kernel.New(nil, sls, config), nil } //------------------------------------------------------------------------------ -type spawner struct { - c swarm.Config - f func(any) -} +type bridge struct{ *kernel.Bridge } +// Note: events.DynamoDBEvent decodes all records, the swarm kernel protocol requires bytes. type DynamoDBEvent struct { Records []json.RawMessage `json:"Records"` } -func (s spawner) Spawn(k *kernel.Kernel) error { - s.f( - func(events DynamoDBEvent) error { - bag := make([]swarm.Bag, len(events.Records)) - for i, obj := range events.Records { - bag[i] = swarm.Bag{ - Ctx: swarm.NewContext(context.Background(), Category, guid.G(guid.Clock).String()), - Object: obj, - } - } +func (s bridge) Run() { lambda.Start(s.run) } - return k.Dispatch(bag, s.c.TimeToFlight) - }, - ) +func (s bridge) run(events DynamoDBEvent) error { + bag := make([]swarm.Bag, len(events.Records)) + for i, obj := range events.Records { + bag[i] = swarm.Bag{ + Category: Category, + Digest: guid.G(guid.Clock).String(), + Object: obj, + } + } - return nil + return s.Bridge.Dispatch(bag) } - -func (s spawner) Ack(digest string) error { return nil } -func (s spawner) Ask() ([]swarm.Bag, error) { return nil, nil } diff --git a/broker/eventddb/eventddb_test.go b/broker/eventddb/eventddb_test.go new file mode 100644 index 0000000..731053a --- /dev/null +++ b/broker/eventddb/eventddb_test.go @@ -0,0 +1,73 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package eventddb + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/fogfish/it/v2" + "github.com/fogfish/swarm" + "github.com/fogfish/swarm/kernel" +) + +func TestReader(t *testing.T) { + var bag []swarm.Bag + bridge := &bridge{kernel.NewBridge(100 * time.Millisecond)} + + t.Run("NewReader", func(t *testing.T) { + q, err := NewReader( + WithConfig( + swarm.WithLogStdErr(), + ), + ) + it.Then(t).Should(it.Nil(err)) + q.Close() + }) + + t.Run("Dequeue", func(t *testing.T) { + go func() { + bag, _ = bridge.Ask(context.Background()) + for _, m := range bag { + bridge.Ack(context.Background(), m.Digest) + } + }() + + err := bridge.run( + DynamoDBEvent{ + Records: []json.RawMessage{[]byte(`{"sut":"test"}`)}, + }, + ) + + it.Then(t).Should( + it.Nil(err), + it.Equal(len(bag), 1), + it.Equal(bag[0].Category, Category), + it.Equiv(bag[0].Object, []byte(`{"sut":"test"}`)), + ) + }) + + t.Run("Dequeue.Timeout", func(t *testing.T) { + go func() { + bag, _ = bridge.Ask(context.Background()) + }() + + err := bridge.run( + DynamoDBEvent{ + Records: []json.RawMessage{[]byte(`{"sut":"test"}`)}, + }, + ) + + it.Then(t).ShouldNot( + it.Nil(err), + ) + }) +} diff --git a/broker/eventddb/examples/dequeue/eventddb.go b/broker/eventddb/examples/dequeue/typed/eventddb.go similarity index 75% rename from broker/eventddb/examples/dequeue/eventddb.go rename to broker/eventddb/examples/dequeue/typed/eventddb.go index 8741475..2d70b3d 100644 --- a/broker/eventddb/examples/dequeue/eventddb.go +++ b/broker/eventddb/examples/dequeue/typed/eventddb.go @@ -11,24 +11,31 @@ package main import ( "encoding/json" "fmt" + "log/slog" "github.com/aws/aws-lambda-go/events" "github.com/fogfish/swarm" "github.com/fogfish/swarm/broker/eventddb" - "github.com/fogfish/swarm/queue" ) func main() { - q := queue.Must(eventddb.New("swarm-test", swarm.WithLogStdErr())) + q, err := eventddb.NewReader( + eventddb.WithConfig( + swarm.WithLogStdErr(), + ), + ) + if err != nil { + slog.Error("eventddb reader has failed", "err", err) + return + } - go common(eventddb.Dequeue(q)) + go common(eventddb.Source(q)) q.Await() } func common(rcv <-chan swarm.Msg[*events.DynamoDBEventRecord], ack chan<- swarm.Msg[*events.DynamoDBEventRecord]) { for msg := range rcv { - v, _ := json.MarshalIndent(msg, "", " ") fmt.Printf("ddb event > \n %s\n", v) ack <- msg diff --git a/broker/eventddb/examples/serverless/cdk.json b/broker/eventddb/examples/serverless/cdk.json index db2c983..78afba3 100644 --- a/broker/eventddb/examples/serverless/cdk.json +++ b/broker/eventddb/examples/serverless/cdk.json @@ -1,5 +1,5 @@ { - "app": "go run main.go", + "app": "go run eventddb.go", "requireApproval": "never", "context": { "@aws-cdk/core:stackRelativeExports": true, diff --git a/broker/eventddb/examples/serverless/main.go b/broker/eventddb/examples/serverless/eventddb.go similarity index 88% rename from broker/eventddb/examples/serverless/main.go rename to broker/eventddb/examples/serverless/eventddb.go index a12dea8..47d3149 100644 --- a/broker/eventddb/examples/serverless/main.go +++ b/broker/eventddb/examples/serverless/eventddb.go @@ -34,8 +34,8 @@ func main() { broker.NewSink( &eventddb.SinkProps{ Function: &scud.FunctionGoProps{ - SourceCodeModule: "github.com/fogfish/swarm", - SourceCodeLambda: "examples/eventddb/dequeue", + SourceCodeModule: "github.com/fogfish/swarm/broker/eventddb", + SourceCodeLambda: "examples/dequeue/typed", }, }, ) diff --git a/broker/eventddb/go.mod b/broker/eventddb/go.mod index 65bda1a..3c8018b 100644 --- a/broker/eventddb/go.mod +++ b/broker/eventddb/go.mod @@ -8,9 +8,9 @@ require ( github.com/aws/constructs-go/constructs/v10 v10.3.0 github.com/aws/jsii-runtime-go v1.103.1 github.com/fogfish/guid/v2 v2.0.4 - github.com/fogfish/scud v0.10.1 - github.com/fogfish/swarm v0.16.0 - github.com/fogfish/swarm/queue v0.16.1 + github.com/fogfish/it/v2 v2.0.2 + github.com/fogfish/scud v0.10.2 + github.com/fogfish/swarm v0.20.0 ) require ( @@ -24,7 +24,6 @@ require ( github.com/fogfish/faults v0.2.0 // indirect github.com/fogfish/golem/hseq v1.2.0 // indirect github.com/fogfish/golem/optics v0.13.0 // indirect - github.com/fogfish/golem/pure v0.10.1 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/yuin/goldmark v1.5.3 // indirect diff --git a/broker/eventddb/go.sum b/broker/eventddb/go.sum index 9914bf7..cbb7061 100644 --- a/broker/eventddb/go.sum +++ b/broker/eventddb/go.sum @@ -28,20 +28,18 @@ github.com/fogfish/golem/hseq v1.2.0 h1:B6yrzOHQNoTqSlhLb+AvK7dhEAELjHThrCQTF/uq github.com/fogfish/golem/hseq v1.2.0/go.mod h1:17XORt8nNKl6KOhF43MHSmjK8NksbkBsohAoJGiinUs= github.com/fogfish/golem/optics v0.13.0 h1:U3htppjVTMbICQIzPTTe151+WziSGEppNVmkanKa440= github.com/fogfish/golem/optics v0.13.0/go.mod h1:U1y90OVcXF/A61dIP3abQ0x2GweTmzVHPC15pv0pcM0= -github.com/fogfish/golem/pure v0.10.1 h1:0+cnvdaV9zF+0NN8SZMgR5bgFM6yNfBHU4rynYSDfmE= -github.com/fogfish/golem/pure v0.10.1/go.mod h1:kLPfgu5uKP0CrwVap7jejisRwV7vo1q8Eyqnc/Z0qyw= github.com/fogfish/guid/v2 v2.0.4 h1:EZiPlM4UAghqf7DU5/nLEF+iRH7ODe0AiFuYOMRvITQ= github.com/fogfish/guid/v2 v2.0.4/go.mod h1:KkZ5T4EE3BqWQJFZBPLSHV/tBe23Xq4KvuPfwtNtepU= github.com/fogfish/it v1.0.0 h1:kiwFHZcrkRLUydZoIoY0gTuMfj38trwvLo0YRyIkeG8= github.com/fogfish/it v1.0.0/go.mod h1:NQJG4Ygvek85y7zGj0Gny8+6ygAnHjfBORhI7TdQhp4= github.com/fogfish/it/v2 v2.0.2 h1:UR6yVemf8zD3WVs6Bq0zE6LJwapZ8urv9zvU5VB5E6o= github.com/fogfish/it/v2 v2.0.2/go.mod h1:HHwufnTaZTvlRVnSesPl49HzzlMrQtweKbf+8Co/ll4= -github.com/fogfish/scud v0.10.1 h1:eJI/1zQamihBTTwDhgn2VTXlG+74B1qVAOnGGDv4u7E= -github.com/fogfish/scud v0.10.1/go.mod h1:IVtHIfQMsb9lPKFeCI/OGcT2ssmd6onOZdpXgj/ORgs= -github.com/fogfish/swarm v0.16.0 h1:6AviPpPbrSraLcjH2GwW0oVUb7tb6IZMqdT+FdXOCHU= -github.com/fogfish/swarm v0.16.0/go.mod h1:u5uJmXu3xHz1OTzyxhu9viI6eq+GmkJJkDqdrf8IlJw= -github.com/fogfish/swarm/queue v0.16.1 h1:bReiuwm2a1h0hP4ElSzETksqs1gT2yKRHtA6aVHQnKY= -github.com/fogfish/swarm/queue v0.16.1/go.mod h1:tiSPnknVI/nHs2TL8H17YszUvgBT1E2ksaOGBQV/E6Q= +github.com/fogfish/logger/v3 v3.1.1 h1:awmTNpBWRvSj086H3RWIUnc+FSu9qXHJgBa49wYpNCE= +github.com/fogfish/logger/v3 v3.1.1/go.mod h1:hsucoJz/3OX90UdYrXykcKvjjteBnPcYSTr4Rie0ZqU= +github.com/fogfish/scud v0.10.2 h1:cFupgZ4brqeGr/HCURnyDaBUNJIVEJTfKRwxEEUrO3w= +github.com/fogfish/scud v0.10.2/go.mod h1:IVtHIfQMsb9lPKFeCI/OGcT2ssmd6onOZdpXgj/ORgs= +github.com/fogfish/swarm v0.20.0 h1:eUlNXFsePfBo72iFNvY3eJ6YIQP0ttzflGF6tNAxhQ8= +github.com/fogfish/swarm v0.20.0/go.mod h1:cdIviTojE3DT+FOIIOeOg6tyMqhyanfy2TZHTtKlOmo= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= diff --git a/broker/eventddb/type.go b/broker/eventddb/type.go index eb715aa..1d86be4 100644 --- a/broker/eventddb/type.go +++ b/broker/eventddb/type.go @@ -11,11 +11,16 @@ package eventddb import ( "github.com/aws/aws-lambda-go/events" "github.com/fogfish/swarm" - queue "github.com/fogfish/swarm/queue" + "github.com/fogfish/swarm/dequeue" + "github.com/fogfish/swarm/kernel" ) const Category = "DynamoDBEventRecord" -func Dequeue(q swarm.Broker) (<-chan swarm.Msg[*events.DynamoDBEventRecord], chan<- swarm.Msg[*events.DynamoDBEventRecord]) { - return queue.Dequeue[*events.DynamoDBEventRecord](q) +// The broker produces only [events.DynamoDBEventRecord], the function is helper. +func Source(q *kernel.Dequeuer) ( + <-chan swarm.Msg[*events.DynamoDBEventRecord], + chan<- swarm.Msg[*events.DynamoDBEventRecord], +) { + return dequeue.Typed[*events.DynamoDBEventRecord](q) } diff --git a/broker/eventddb/version.go b/broker/eventddb/version.go index 9b7d905..aaeb5ef 100644 --- a/broker/eventddb/version.go +++ b/broker/eventddb/version.go @@ -8,4 +8,4 @@ package eventddb -const Version = "broker/eventddb/v0.16.1" +const Version = "broker/eventddb/v0.20.0"