Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update all broker to kernel v0.20.1 #97

Merged
merged 2 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions .github/labeler.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,35 @@
- '*.md'
- 'doc/*'
- 'doc/**/*'
- 'examples/**/*'

"[#] core":
- '*.go'
- 'queue/*'
- 'queue/**/*'
- 'internal/*'
- 'internal/**/*'
- 'kernel/*'
- 'kernel/**/*'
- 'dequeue/*'
- 'dequeue/**/*'
- 'enqueue/*'
- 'enqueue/**/*'

"[#] eventbridge":
- 'broker/eventbridge/*'
- 'broker/eventbridge/**/*'

"[#] eventddb":
- 'broker/eventddb/*'
- 'broker/eventddb/**/*'

"[#] events3":
- 'broker/events3/*'
- 'broker/events3/**/*'

"[#] eventsqs":
- 'broker/eventsqs/*'
- 'broker/eventsqs/**/*'

"[#] sqs":
- 'broker/sqs/*'
- 'broker/eventsqs/*'

"[#] eventbridge":
- 'broker/eventbridge/*'

"[#] websocket":
- 'broker/websocket/*'
1 change: 0 additions & 1 deletion broker/eventbridge/awscdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ func NewSink(scope constructs.Construct, id *string, props *SinkProps) *Sink {
)

if props.Function != nil {
props.Function.Setenv(EnvEventBridge, *props.System.EventBusName())
sink.Handler = scud.NewFunction(sink.Construct, jsii.String("Func"), props.Function)

sink.Rule.AddTarget(awseventstargets.NewLambdaFunction(
Expand Down
16 changes: 4 additions & 12 deletions broker/eventbridge/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,18 @@
package eventbridge

import (
"os"
"time"

"github.com/fogfish/swarm"
"github.com/fogfish/swarm/kernel/encoding"
)

// Environment variable to config event source
const EnvConfigSourceEventBridge = "CONFIG_SWARM_SOURCE_EVENTBRIDGE"

type Option func(*Client)

var defs = []Option{WithConfig(), WithEnv()}
var defs = []Option{WithConfig()}

func WithConfig(opts ...swarm.Option) Option {
return func(c *Client) {
Expand All @@ -40,13 +42,3 @@ func WithService(service EventBridge) Option {
c.service = service
}
}

const EnvEventBridge = "CONFIG_SWARM_EVENT_BRIDGE"

func WithEnv() Option {
return func(c *Client) {
if val, has := os.LookupEnv(EnvEventBridge); has {
c.bus = val
}
}
}
8 changes: 4 additions & 4 deletions broker/eventbridge/eventbridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ type Client struct {
}

// Create writer to AWS EventBridge
func NewEnqueuer(queue string, opts ...Option) (*kernel.Enqueuer, error) {
cli, err := newEventBridge(queue, opts...)
func NewEnqueuer(bus string, opts ...Option) (*kernel.Enqueuer, error) {
cli, err := newEventBridge(bus, opts...)
if err != nil {
return nil, err
}
Expand All @@ -45,8 +45,8 @@ func NewEnqueuer(queue string, opts ...Option) (*kernel.Enqueuer, error) {
}

// Create reader from AWS EventBridge
func NewDequeuer(queue string, opts ...Option) (*kernel.Dequeuer, error) {
c := &Client{bus: queue}
func NewDequeuer(bus string, opts ...Option) (*kernel.Dequeuer, error) {
c := &Client{bus: bus}

for _, opt := range defs {
opt(c)
Expand Down
2 changes: 1 addition & 1 deletion broker/eventbridge/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@

package eventbridge

const Version = "broker/eventbridge/v0.20.1"
const Version = "broker/eventbridge/v0.20.2"
3 changes: 3 additions & 0 deletions broker/eventddb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (
"github.com/fogfish/swarm"
)

// Environment variable to config event source
const EnvConfigSourceDynamoDB = "CONFIG_SWARM_SOURCE_DYNAMODB"

type Option func(*Client)

var defs = []Option{WithConfig()}
Expand Down
2 changes: 1 addition & 1 deletion broker/eventddb/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@

package eventddb

const Version = "broker/eventddb/v0.20.1"
const Version = "broker/eventddb/v0.20.2"
1 change: 0 additions & 1 deletion broker/events3/awscdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ type SinkProps struct {
func NewSink(scope constructs.Construct, id *string, props *SinkProps) *Sink {
sink := &Sink{Construct: constructs.NewConstruct(scope, id)}

props.Function.Setenv(EnvSourceEventS3, *props.Bucket.BucketName())
sink.Handler = scud.NewFunction(sink.Construct, jsii.String("Func"), props.Function)

eventsource := &awslambdaeventsources.S3EventSourceProps{
Expand Down
22 changes: 4 additions & 18 deletions broker/events3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,17 @@
package events3

import (
"os"
"time"

"github.com/fogfish/swarm"
)

// Environment variable to config event source
const EnvConfigSourceS3 = "CONFIG_SWARM_SOURCE_S3"

type Option func(*Client)

var defs = []Option{WithConfig(), WithEnv()}
var defs = []Option{WithConfig()}

func WithConfig(opts ...swarm.Option) Option {
return func(c *Client) {
Expand All @@ -38,19 +40,3 @@ func WithConfig(opts ...swarm.Option) Option {
// c.service = service
// }
// }

const EnvSourceEventS3 = "CONFIG_SWARM_EVENT_S3"

func WithEnv() Option {
return func(c *Client) {
if val, has := os.LookupEnv(EnvSourceEventS3); has {
c.bucket = val
}
}
}

func WithBucket(bucket string) Option {
return func(c *Client) {
c.bucket = bucket
}
}
3 changes: 1 addition & 2 deletions broker/events3/events3.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@ import (
)

type Client struct {
bucket string
config swarm.Config
}

// Create reader from AWS S3 Events
func NewReader(opts ...Option) (*kernel.Dequeuer, error) {
func NewDequeuer(opts ...Option) (*kernel.Dequeuer, error) {
c := &Client{}

for _, opt := range defs {
Expand Down
2 changes: 1 addition & 1 deletion broker/events3/events3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestReader(t *testing.T) {
bridge := &bridge{kernel.NewBridge(100 * time.Millisecond)}

t.Run("New", func(t *testing.T) {
q, err := NewReader()
q, err := NewDequeuer()
it.Then(t).Should(it.Nil(err))
q.Close()
})
Expand Down
2 changes: 1 addition & 1 deletion broker/events3/examples/dequeue/typed/events3.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
)

func main() {
q, err := events3.NewReader(
q, err := events3.NewDequeuer(
events3.WithConfig(
swarm.WithLogStdErr(),
),
Expand Down
2 changes: 1 addition & 1 deletion broker/events3/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/fogfish/guid/v2 v2.0.4
github.com/fogfish/it/v2 v2.0.2
github.com/fogfish/scud v0.10.2
github.com/fogfish/swarm v0.20.0
github.com/fogfish/swarm v0.20.1
)

require (
Expand Down
4 changes: 2 additions & 2 deletions broker/events3/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ github.com/fogfish/logger/v3 v3.1.1 h1:awmTNpBWRvSj086H3RWIUnc+FSu9qXHJgBa49wYpN
github.com/fogfish/logger/v3 v3.1.1/go.mod h1:hsucoJz/3OX90UdYrXykcKvjjteBnPcYSTr4Rie0ZqU=
github.com/fogfish/scud v0.10.2 h1:cFupgZ4brqeGr/HCURnyDaBUNJIVEJTfKRwxEEUrO3w=
github.com/fogfish/scud v0.10.2/go.mod h1:IVtHIfQMsb9lPKFeCI/OGcT2ssmd6onOZdpXgj/ORgs=
github.com/fogfish/swarm v0.20.0 h1:eUlNXFsePfBo72iFNvY3eJ6YIQP0ttzflGF6tNAxhQ8=
github.com/fogfish/swarm v0.20.0/go.mod h1:cdIviTojE3DT+FOIIOeOg6tyMqhyanfy2TZHTtKlOmo=
github.com/fogfish/swarm v0.20.1 h1:XzHkTHxgLVbctkTAcT4dVoGdp7mNKihdzz1Io6YGig0=
github.com/fogfish/swarm v0.20.1/go.mod h1:cdIviTojE3DT+FOIIOeOg6tyMqhyanfy2TZHTtKlOmo=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
Expand Down
2 changes: 1 addition & 1 deletion broker/events3/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@

package events3

const Version = "broker/events3/v0.20.0"
const Version = "broker/events3/v0.20.1"
3 changes: 3 additions & 0 deletions broker/eventsqs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (
"github.com/fogfish/swarm"
)

// Environment variable to config event source
const EnvConfigSourceSQS = "CONFIG_SWARM_SOURCE_EVENT_SQS"

type Option func(*Client)

var defs = []Option{WithConfig()}
Expand Down
4 changes: 2 additions & 2 deletions broker/eventsqs/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ require (
github.com/aws/constructs-go/constructs/v10 v10.3.0
github.com/aws/jsii-runtime-go v1.103.1
github.com/fogfish/it/v2 v2.0.2
github.com/fogfish/scud v0.10.1
github.com/fogfish/swarm v0.20.0
github.com/fogfish/scud v0.10.2
github.com/fogfish/swarm v0.20.1
)

require (
Expand Down
8 changes: 4 additions & 4 deletions broker/eventsqs/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ github.com/fogfish/it/v2 v2.0.2 h1:UR6yVemf8zD3WVs6Bq0zE6LJwapZ8urv9zvU5VB5E6o=
github.com/fogfish/it/v2 v2.0.2/go.mod h1:HHwufnTaZTvlRVnSesPl49HzzlMrQtweKbf+8Co/ll4=
github.com/fogfish/logger/v3 v3.1.1 h1:awmTNpBWRvSj086H3RWIUnc+FSu9qXHJgBa49wYpNCE=
github.com/fogfish/logger/v3 v3.1.1/go.mod h1:hsucoJz/3OX90UdYrXykcKvjjteBnPcYSTr4Rie0ZqU=
github.com/fogfish/scud v0.10.1 h1:eJI/1zQamihBTTwDhgn2VTXlG+74B1qVAOnGGDv4u7E=
github.com/fogfish/scud v0.10.1/go.mod h1:IVtHIfQMsb9lPKFeCI/OGcT2ssmd6onOZdpXgj/ORgs=
github.com/fogfish/swarm v0.20.0 h1:eUlNXFsePfBo72iFNvY3eJ6YIQP0ttzflGF6tNAxhQ8=
github.com/fogfish/swarm v0.20.0/go.mod h1:cdIviTojE3DT+FOIIOeOg6tyMqhyanfy2TZHTtKlOmo=
github.com/fogfish/scud v0.10.2 h1:cFupgZ4brqeGr/HCURnyDaBUNJIVEJTfKRwxEEUrO3w=
github.com/fogfish/scud v0.10.2/go.mod h1:IVtHIfQMsb9lPKFeCI/OGcT2ssmd6onOZdpXgj/ORgs=
github.com/fogfish/swarm v0.20.1 h1:XzHkTHxgLVbctkTAcT4dVoGdp7mNKihdzz1Io6YGig0=
github.com/fogfish/swarm v0.20.1/go.mod h1:cdIviTojE3DT+FOIIOeOg6tyMqhyanfy2TZHTtKlOmo=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
Expand Down
3 changes: 3 additions & 0 deletions broker/sqs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import (
"github.com/fogfish/swarm"
)

// Environment variable to config event source
const EnvConfigSourceSQS = "CONFIG_SWARM_SOURCE_SQS"

type Option func(*Client)

var defs = []Option{WithConfig()}
Expand Down
2 changes: 1 addition & 1 deletion broker/sqs/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.27.37
github.com/aws/aws-sdk-go-v2/service/sqs v1.35.1
github.com/fogfish/it/v2 v2.0.2
github.com/fogfish/swarm v0.20.0
github.com/fogfish/swarm v0.20.1
)

require (
Expand Down
4 changes: 2 additions & 2 deletions broker/sqs/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,5 @@ github.com/fogfish/it/v2 v2.0.2 h1:UR6yVemf8zD3WVs6Bq0zE6LJwapZ8urv9zvU5VB5E6o=
github.com/fogfish/it/v2 v2.0.2/go.mod h1:HHwufnTaZTvlRVnSesPl49HzzlMrQtweKbf+8Co/ll4=
github.com/fogfish/logger/v3 v3.1.1 h1:awmTNpBWRvSj086H3RWIUnc+FSu9qXHJgBa49wYpNCE=
github.com/fogfish/logger/v3 v3.1.1/go.mod h1:hsucoJz/3OX90UdYrXykcKvjjteBnPcYSTr4Rie0ZqU=
github.com/fogfish/swarm v0.20.0 h1:eUlNXFsePfBo72iFNvY3eJ6YIQP0ttzflGF6tNAxhQ8=
github.com/fogfish/swarm v0.20.0/go.mod h1:cdIviTojE3DT+FOIIOeOg6tyMqhyanfy2TZHTtKlOmo=
github.com/fogfish/swarm v0.20.1 h1:XzHkTHxgLVbctkTAcT4dVoGdp7mNKihdzz1Io6YGig0=
github.com/fogfish/swarm v0.20.1/go.mod h1:cdIviTojE3DT+FOIIOeOg6tyMqhyanfy2TZHTtKlOmo=
12 changes: 12 additions & 0 deletions broker/websocket/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# AWS WebSocket API Broker

The sub-module implements swarm broker for AWS WebSocket API. See [the library documentation](../../README.md) for details.

Note the broker implements only infrastructure required for making serverless applications using WebSocket APIs.

Use cli https://github.com/vi/websocat for testing purposes.

```bash
websocat wss://0000000000.execute-api.eu-west-1.amazonaws.com/ws/\?apikey=dGVzdDp0ZXN0
{"action":"User", "id":"xxx", "text":"some text"}
```
27 changes: 3 additions & 24 deletions broker/websocket/awscdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ type SinkProps struct {
func NewSink(scope constructs.Construct, id *string, props *SinkProps) *Sink {
sink := &Sink{Construct: constructs.NewConstruct(scope, id)}

defaultEnvironment(props)
props.Function.Setenv(EnvConfigEventType, props.Route)
props.Function.Setenv(EnvConfigSourceWebSocket, aws.ToString(props.Gateway.ApiEndpoint())+"/"+stage)

sink.Handler = scud.NewFunction(sink.Construct, jsii.String("Func"), props.Function)

it := integrations.NewWebSocketLambdaIntegration(jsii.String(props.Route), sink.Handler,
Expand All @@ -66,29 +68,6 @@ func NewSink(scope constructs.Construct, id *string, props *SinkProps) *Sink {
return sink
}

func defaultEnvironment(props *SinkProps) {
switch f := props.Function.(type) {
case *scud.FunctionGoProps:
if f.FunctionProps == nil {
f.FunctionProps = &awslambda.FunctionProps{}
}
if f.FunctionProps.Environment == nil {
f.FunctionProps.Environment = &map[string]*string{}
}
(*f.FunctionProps.Environment)["CONFIG_SWARM_WS_EVENT_TYPE"] = jsii.String(props.Route)
(*f.FunctionProps.Environment)["CONFIG_SWARM_WS_URL"] = jsii.String(aws.ToString(props.Gateway.ApiEndpoint()) + "/" + stage)
case *scud.ContainerGoProps:
if f.DockerImageFunctionProps == nil {
f.DockerImageFunctionProps = &awslambda.DockerImageFunctionProps{}
}
if f.DockerImageFunctionProps.Environment == nil {
f.DockerImageFunctionProps.Environment = &map[string]*string{}
}
(*f.DockerImageFunctionProps.Environment)["CONFIG_SWARM_WS_EVENT_TYPE"] = jsii.String(props.Route)
(*f.DockerImageFunctionProps.Environment)["CONFIG_SWARM_WS_URL"] = jsii.String(aws.ToString(props.Gateway.ApiEndpoint()) + "/" + stage)
}
}

//------------------------------------------------------------------------------
//
// AWS CDK Stack Construct
Expand Down
4 changes: 4 additions & 0 deletions broker/websocket/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ import (
"github.com/fogfish/swarm"
)

// Environment variable to config event source
const EnvConfigEventType = "CONFIG_SWARM_WS_EVENT_TYPE"
const EnvConfigSourceWebSocket = "CONFIG_SWARM_WS_URL"

type Option func(*Client)

var defs = []Option{WithConfig()}
Expand Down
2 changes: 1 addition & 1 deletion broker/websocket/examples/dequeue/typed/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type User struct {
}

func main() {
q, err := websocket.New(os.Getenv("CONFIG_SWARM_WS_URL"),
q, err := websocket.New(os.Getenv(websocket.EnvConfigSourceWebSocket),
websocket.WithConfig(
swarm.WithLogStdErr(),
),
Expand Down
2 changes: 1 addition & 1 deletion broker/websocket/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/fogfish/it/v2 v2.0.2
github.com/fogfish/logger/v3 v3.1.1
github.com/fogfish/scud v0.10.2
github.com/fogfish/swarm v0.20.0
github.com/fogfish/swarm v0.20.1
)

require (
Expand Down
4 changes: 2 additions & 2 deletions broker/websocket/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ github.com/fogfish/logger/v3 v3.1.1 h1:awmTNpBWRvSj086H3RWIUnc+FSu9qXHJgBa49wYpN
github.com/fogfish/logger/v3 v3.1.1/go.mod h1:hsucoJz/3OX90UdYrXykcKvjjteBnPcYSTr4Rie0ZqU=
github.com/fogfish/scud v0.10.2 h1:cFupgZ4brqeGr/HCURnyDaBUNJIVEJTfKRwxEEUrO3w=
github.com/fogfish/scud v0.10.2/go.mod h1:IVtHIfQMsb9lPKFeCI/OGcT2ssmd6onOZdpXgj/ORgs=
github.com/fogfish/swarm v0.20.0 h1:eUlNXFsePfBo72iFNvY3eJ6YIQP0ttzflGF6tNAxhQ8=
github.com/fogfish/swarm v0.20.0/go.mod h1:cdIviTojE3DT+FOIIOeOg6tyMqhyanfy2TZHTtKlOmo=
github.com/fogfish/swarm v0.20.1 h1:XzHkTHxgLVbctkTAcT4dVoGdp7mNKihdzz1Io6YGig0=
github.com/fogfish/swarm v0.20.1/go.mod h1:cdIviTojE3DT+FOIIOeOg6tyMqhyanfy2TZHTtKlOmo=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
Expand Down
Loading