Skip to content

Commit

Permalink
Update all broker to kernel v0.20.1 (#97)
Browse files Browse the repository at this point in the history
  • Loading branch information
fogfish authored Sep 30, 2024
1 parent 4e037e0 commit dd34094
Show file tree
Hide file tree
Showing 27 changed files with 84 additions and 91 deletions.
30 changes: 22 additions & 8 deletions .github/labeler.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,35 @@
- '*.md'
- 'doc/*'
- 'doc/**/*'
- 'examples/**/*'

"[#] core":
- '*.go'
- 'queue/*'
- 'queue/**/*'
- 'internal/*'
- 'internal/**/*'
- 'kernel/*'
- 'kernel/**/*'
- 'dequeue/*'
- 'dequeue/**/*'
- 'enqueue/*'
- 'enqueue/**/*'

"[#] eventbridge":
- 'broker/eventbridge/*'
- 'broker/eventbridge/**/*'

"[#] eventddb":
- 'broker/eventddb/*'
- 'broker/eventddb/**/*'

"[#] events3":
- 'broker/events3/*'
- 'broker/events3/**/*'

"[#] eventsqs":
- 'broker/eventsqs/*'
- 'broker/eventsqs/**/*'

"[#] sqs":
- 'broker/sqs/*'
- 'broker/eventsqs/*'

"[#] eventbridge":
- 'broker/eventbridge/*'

"[#] websocket":
- 'broker/websocket/*'
1 change: 0 additions & 1 deletion broker/eventbridge/awscdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ func NewSink(scope constructs.Construct, id *string, props *SinkProps) *Sink {
)

if props.Function != nil {
props.Function.Setenv(EnvEventBridge, *props.System.EventBusName())
sink.Handler = scud.NewFunction(sink.Construct, jsii.String("Func"), props.Function)

sink.Rule.AddTarget(awseventstargets.NewLambdaFunction(
Expand Down
16 changes: 4 additions & 12 deletions broker/eventbridge/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,18 @@
package eventbridge

import (
"os"
"time"

"github.com/fogfish/swarm"
"github.com/fogfish/swarm/kernel/encoding"
)

// Environment variable to config event source
const EnvConfigSourceEventBridge = "CONFIG_SWARM_SOURCE_EVENTBRIDGE"

type Option func(*Client)

var defs = []Option{WithConfig(), WithEnv()}
var defs = []Option{WithConfig()}

func WithConfig(opts ...swarm.Option) Option {
return func(c *Client) {
Expand All @@ -40,13 +42,3 @@ func WithService(service EventBridge) Option {
c.service = service
}
}

const EnvEventBridge = "CONFIG_SWARM_EVENT_BRIDGE"

func WithEnv() Option {
return func(c *Client) {
if val, has := os.LookupEnv(EnvEventBridge); has {
c.bus = val
}
}
}
8 changes: 4 additions & 4 deletions broker/eventbridge/eventbridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ type Client struct {
}

// Create writer to AWS EventBridge
func NewEnqueuer(queue string, opts ...Option) (*kernel.Enqueuer, error) {
cli, err := newEventBridge(queue, opts...)
func NewEnqueuer(bus string, opts ...Option) (*kernel.Enqueuer, error) {
cli, err := newEventBridge(bus, opts...)
if err != nil {
return nil, err
}
Expand All @@ -45,8 +45,8 @@ func NewEnqueuer(queue string, opts ...Option) (*kernel.Enqueuer, error) {
}

// Create reader from AWS EventBridge
func NewDequeuer(queue string, opts ...Option) (*kernel.Dequeuer, error) {
c := &Client{bus: queue}
func NewDequeuer(bus string, opts ...Option) (*kernel.Dequeuer, error) {
c := &Client{bus: bus}

for _, opt := range defs {
opt(c)
Expand Down
2 changes: 1 addition & 1 deletion broker/eventbridge/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@

package eventbridge

const Version = "broker/eventbridge/v0.20.1"
const Version = "broker/eventbridge/v0.20.2"
3 changes: 3 additions & 0 deletions broker/eventddb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (
"github.com/fogfish/swarm"
)

// Environment variable to config event source
const EnvConfigSourceDynamoDB = "CONFIG_SWARM_SOURCE_DYNAMODB"

type Option func(*Client)

var defs = []Option{WithConfig()}
Expand Down
2 changes: 1 addition & 1 deletion broker/eventddb/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@

package eventddb

const Version = "broker/eventddb/v0.20.1"
const Version = "broker/eventddb/v0.20.2"
1 change: 0 additions & 1 deletion broker/events3/awscdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ type SinkProps struct {
func NewSink(scope constructs.Construct, id *string, props *SinkProps) *Sink {
sink := &Sink{Construct: constructs.NewConstruct(scope, id)}

props.Function.Setenv(EnvSourceEventS3, *props.Bucket.BucketName())
sink.Handler = scud.NewFunction(sink.Construct, jsii.String("Func"), props.Function)

eventsource := &awslambdaeventsources.S3EventSourceProps{
Expand Down
22 changes: 4 additions & 18 deletions broker/events3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,17 @@
package events3

import (
"os"
"time"

"github.com/fogfish/swarm"
)

// Environment variable to config event source
const EnvConfigSourceS3 = "CONFIG_SWARM_SOURCE_S3"

type Option func(*Client)

var defs = []Option{WithConfig(), WithEnv()}
var defs = []Option{WithConfig()}

func WithConfig(opts ...swarm.Option) Option {
return func(c *Client) {
Expand All @@ -38,19 +40,3 @@ func WithConfig(opts ...swarm.Option) Option {
// c.service = service
// }
// }

const EnvSourceEventS3 = "CONFIG_SWARM_EVENT_S3"

func WithEnv() Option {
return func(c *Client) {
if val, has := os.LookupEnv(EnvSourceEventS3); has {
c.bucket = val
}
}
}

func WithBucket(bucket string) Option {
return func(c *Client) {
c.bucket = bucket
}
}
3 changes: 1 addition & 2 deletions broker/events3/events3.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@ import (
)

type Client struct {
bucket string
config swarm.Config
}

// Create reader from AWS S3 Events
func NewReader(opts ...Option) (*kernel.Dequeuer, error) {
func NewDequeuer(opts ...Option) (*kernel.Dequeuer, error) {
c := &Client{}

for _, opt := range defs {
Expand Down
2 changes: 1 addition & 1 deletion broker/events3/events3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestReader(t *testing.T) {
bridge := &bridge{kernel.NewBridge(100 * time.Millisecond)}

t.Run("New", func(t *testing.T) {
q, err := NewReader()
q, err := NewDequeuer()
it.Then(t).Should(it.Nil(err))
q.Close()
})
Expand Down
2 changes: 1 addition & 1 deletion broker/events3/examples/dequeue/typed/events3.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
)

func main() {
q, err := events3.NewReader(
q, err := events3.NewDequeuer(
events3.WithConfig(
swarm.WithLogStdErr(),
),
Expand Down
2 changes: 1 addition & 1 deletion broker/events3/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/fogfish/guid/v2 v2.0.4
github.com/fogfish/it/v2 v2.0.2
github.com/fogfish/scud v0.10.2
github.com/fogfish/swarm v0.20.0
github.com/fogfish/swarm v0.20.1
)

require (
Expand Down
4 changes: 2 additions & 2 deletions broker/events3/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ github.com/fogfish/logger/v3 v3.1.1 h1:awmTNpBWRvSj086H3RWIUnc+FSu9qXHJgBa49wYpN
github.com/fogfish/logger/v3 v3.1.1/go.mod h1:hsucoJz/3OX90UdYrXykcKvjjteBnPcYSTr4Rie0ZqU=
github.com/fogfish/scud v0.10.2 h1:cFupgZ4brqeGr/HCURnyDaBUNJIVEJTfKRwxEEUrO3w=
github.com/fogfish/scud v0.10.2/go.mod h1:IVtHIfQMsb9lPKFeCI/OGcT2ssmd6onOZdpXgj/ORgs=
github.com/fogfish/swarm v0.20.0 h1:eUlNXFsePfBo72iFNvY3eJ6YIQP0ttzflGF6tNAxhQ8=
github.com/fogfish/swarm v0.20.0/go.mod h1:cdIviTojE3DT+FOIIOeOg6tyMqhyanfy2TZHTtKlOmo=
github.com/fogfish/swarm v0.20.1 h1:XzHkTHxgLVbctkTAcT4dVoGdp7mNKihdzz1Io6YGig0=
github.com/fogfish/swarm v0.20.1/go.mod h1:cdIviTojE3DT+FOIIOeOg6tyMqhyanfy2TZHTtKlOmo=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
Expand Down
2 changes: 1 addition & 1 deletion broker/events3/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@

package events3

const Version = "broker/events3/v0.20.0"
const Version = "broker/events3/v0.20.1"
3 changes: 3 additions & 0 deletions broker/eventsqs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (
"github.com/fogfish/swarm"
)

// Environment variable to config event source
const EnvConfigSourceSQS = "CONFIG_SWARM_SOURCE_EVENT_SQS"

type Option func(*Client)

var defs = []Option{WithConfig()}
Expand Down
4 changes: 2 additions & 2 deletions broker/eventsqs/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ require (
github.com/aws/constructs-go/constructs/v10 v10.3.0
github.com/aws/jsii-runtime-go v1.103.1
github.com/fogfish/it/v2 v2.0.2
github.com/fogfish/scud v0.10.1
github.com/fogfish/swarm v0.20.0
github.com/fogfish/scud v0.10.2
github.com/fogfish/swarm v0.20.1
)

require (
Expand Down
8 changes: 4 additions & 4 deletions broker/eventsqs/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ github.com/fogfish/it/v2 v2.0.2 h1:UR6yVemf8zD3WVs6Bq0zE6LJwapZ8urv9zvU5VB5E6o=
github.com/fogfish/it/v2 v2.0.2/go.mod h1:HHwufnTaZTvlRVnSesPl49HzzlMrQtweKbf+8Co/ll4=
github.com/fogfish/logger/v3 v3.1.1 h1:awmTNpBWRvSj086H3RWIUnc+FSu9qXHJgBa49wYpNCE=
github.com/fogfish/logger/v3 v3.1.1/go.mod h1:hsucoJz/3OX90UdYrXykcKvjjteBnPcYSTr4Rie0ZqU=
github.com/fogfish/scud v0.10.1 h1:eJI/1zQamihBTTwDhgn2VTXlG+74B1qVAOnGGDv4u7E=
github.com/fogfish/scud v0.10.1/go.mod h1:IVtHIfQMsb9lPKFeCI/OGcT2ssmd6onOZdpXgj/ORgs=
github.com/fogfish/swarm v0.20.0 h1:eUlNXFsePfBo72iFNvY3eJ6YIQP0ttzflGF6tNAxhQ8=
github.com/fogfish/swarm v0.20.0/go.mod h1:cdIviTojE3DT+FOIIOeOg6tyMqhyanfy2TZHTtKlOmo=
github.com/fogfish/scud v0.10.2 h1:cFupgZ4brqeGr/HCURnyDaBUNJIVEJTfKRwxEEUrO3w=
github.com/fogfish/scud v0.10.2/go.mod h1:IVtHIfQMsb9lPKFeCI/OGcT2ssmd6onOZdpXgj/ORgs=
github.com/fogfish/swarm v0.20.1 h1:XzHkTHxgLVbctkTAcT4dVoGdp7mNKihdzz1Io6YGig0=
github.com/fogfish/swarm v0.20.1/go.mod h1:cdIviTojE3DT+FOIIOeOg6tyMqhyanfy2TZHTtKlOmo=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
Expand Down
3 changes: 3 additions & 0 deletions broker/sqs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import (
"github.com/fogfish/swarm"
)

// Environment variable to config event source
const EnvConfigSourceSQS = "CONFIG_SWARM_SOURCE_SQS"

type Option func(*Client)

var defs = []Option{WithConfig()}
Expand Down
2 changes: 1 addition & 1 deletion broker/sqs/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.27.37
github.com/aws/aws-sdk-go-v2/service/sqs v1.35.1
github.com/fogfish/it/v2 v2.0.2
github.com/fogfish/swarm v0.20.0
github.com/fogfish/swarm v0.20.1
)

require (
Expand Down
4 changes: 2 additions & 2 deletions broker/sqs/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,5 @@ github.com/fogfish/it/v2 v2.0.2 h1:UR6yVemf8zD3WVs6Bq0zE6LJwapZ8urv9zvU5VB5E6o=
github.com/fogfish/it/v2 v2.0.2/go.mod h1:HHwufnTaZTvlRVnSesPl49HzzlMrQtweKbf+8Co/ll4=
github.com/fogfish/logger/v3 v3.1.1 h1:awmTNpBWRvSj086H3RWIUnc+FSu9qXHJgBa49wYpNCE=
github.com/fogfish/logger/v3 v3.1.1/go.mod h1:hsucoJz/3OX90UdYrXykcKvjjteBnPcYSTr4Rie0ZqU=
github.com/fogfish/swarm v0.20.0 h1:eUlNXFsePfBo72iFNvY3eJ6YIQP0ttzflGF6tNAxhQ8=
github.com/fogfish/swarm v0.20.0/go.mod h1:cdIviTojE3DT+FOIIOeOg6tyMqhyanfy2TZHTtKlOmo=
github.com/fogfish/swarm v0.20.1 h1:XzHkTHxgLVbctkTAcT4dVoGdp7mNKihdzz1Io6YGig0=
github.com/fogfish/swarm v0.20.1/go.mod h1:cdIviTojE3DT+FOIIOeOg6tyMqhyanfy2TZHTtKlOmo=
12 changes: 12 additions & 0 deletions broker/websocket/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# AWS WebSocket API Broker

The sub-module implements swarm broker for AWS WebSocket API. See [the library documentation](../../README.md) for details.

Note the broker implements only infrastructure required for making serverless applications using WebSocket APIs.

Use cli https://github.com/vi/websocat for testing purposes.

```bash
websocat wss://0000000000.execute-api.eu-west-1.amazonaws.com/ws/\?apikey=dGVzdDp0ZXN0
{"action":"User", "id":"xxx", "text":"some text"}
```
27 changes: 3 additions & 24 deletions broker/websocket/awscdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ type SinkProps struct {
func NewSink(scope constructs.Construct, id *string, props *SinkProps) *Sink {
sink := &Sink{Construct: constructs.NewConstruct(scope, id)}

defaultEnvironment(props)
props.Function.Setenv(EnvConfigEventType, props.Route)
props.Function.Setenv(EnvConfigSourceWebSocket, aws.ToString(props.Gateway.ApiEndpoint())+"/"+stage)

sink.Handler = scud.NewFunction(sink.Construct, jsii.String("Func"), props.Function)

it := integrations.NewWebSocketLambdaIntegration(jsii.String(props.Route), sink.Handler,
Expand All @@ -66,29 +68,6 @@ func NewSink(scope constructs.Construct, id *string, props *SinkProps) *Sink {
return sink
}

func defaultEnvironment(props *SinkProps) {
switch f := props.Function.(type) {
case *scud.FunctionGoProps:
if f.FunctionProps == nil {
f.FunctionProps = &awslambda.FunctionProps{}
}
if f.FunctionProps.Environment == nil {
f.FunctionProps.Environment = &map[string]*string{}
}
(*f.FunctionProps.Environment)["CONFIG_SWARM_WS_EVENT_TYPE"] = jsii.String(props.Route)
(*f.FunctionProps.Environment)["CONFIG_SWARM_WS_URL"] = jsii.String(aws.ToString(props.Gateway.ApiEndpoint()) + "/" + stage)
case *scud.ContainerGoProps:
if f.DockerImageFunctionProps == nil {
f.DockerImageFunctionProps = &awslambda.DockerImageFunctionProps{}
}
if f.DockerImageFunctionProps.Environment == nil {
f.DockerImageFunctionProps.Environment = &map[string]*string{}
}
(*f.DockerImageFunctionProps.Environment)["CONFIG_SWARM_WS_EVENT_TYPE"] = jsii.String(props.Route)
(*f.DockerImageFunctionProps.Environment)["CONFIG_SWARM_WS_URL"] = jsii.String(aws.ToString(props.Gateway.ApiEndpoint()) + "/" + stage)
}
}

//------------------------------------------------------------------------------
//
// AWS CDK Stack Construct
Expand Down
4 changes: 4 additions & 0 deletions broker/websocket/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ import (
"github.com/fogfish/swarm"
)

// Environment variable to config event source
const EnvConfigEventType = "CONFIG_SWARM_WS_EVENT_TYPE"
const EnvConfigSourceWebSocket = "CONFIG_SWARM_WS_URL"

type Option func(*Client)

var defs = []Option{WithConfig()}
Expand Down
2 changes: 1 addition & 1 deletion broker/websocket/examples/dequeue/typed/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type User struct {
}

func main() {
q, err := websocket.New(os.Getenv("CONFIG_SWARM_WS_URL"),
q, err := websocket.New(os.Getenv(websocket.EnvConfigSourceWebSocket),
websocket.WithConfig(
swarm.WithLogStdErr(),
),
Expand Down
2 changes: 1 addition & 1 deletion broker/websocket/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/fogfish/it/v2 v2.0.2
github.com/fogfish/logger/v3 v3.1.1
github.com/fogfish/scud v0.10.2
github.com/fogfish/swarm v0.20.0
github.com/fogfish/swarm v0.20.1
)

require (
Expand Down
4 changes: 2 additions & 2 deletions broker/websocket/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ github.com/fogfish/logger/v3 v3.1.1 h1:awmTNpBWRvSj086H3RWIUnc+FSu9qXHJgBa49wYpN
github.com/fogfish/logger/v3 v3.1.1/go.mod h1:hsucoJz/3OX90UdYrXykcKvjjteBnPcYSTr4Rie0ZqU=
github.com/fogfish/scud v0.10.2 h1:cFupgZ4brqeGr/HCURnyDaBUNJIVEJTfKRwxEEUrO3w=
github.com/fogfish/scud v0.10.2/go.mod h1:IVtHIfQMsb9lPKFeCI/OGcT2ssmd6onOZdpXgj/ORgs=
github.com/fogfish/swarm v0.20.0 h1:eUlNXFsePfBo72iFNvY3eJ6YIQP0ttzflGF6tNAxhQ8=
github.com/fogfish/swarm v0.20.0/go.mod h1:cdIviTojE3DT+FOIIOeOg6tyMqhyanfy2TZHTtKlOmo=
github.com/fogfish/swarm v0.20.1 h1:XzHkTHxgLVbctkTAcT4dVoGdp7mNKihdzz1Io6YGig0=
github.com/fogfish/swarm v0.20.1/go.mod h1:cdIviTojE3DT+FOIIOeOg6tyMqhyanfy2TZHTtKlOmo=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
Expand Down

0 comments on commit dd34094

Please sign in to comment.