From 3f4863925872aa5aaa8966468bf10e20d8e0faa2 Mon Sep 17 00:00:00 2001 From: Dmitry Kolesnikov Date: Sun, 7 Apr 2024 17:55:56 +0300 Subject: [PATCH] align CDK constuct for all brokers --- broker/websocket/awscdk.go | 64 ++++++++++++++++++++++++---- broker/websocket/lambda/auth/auth.go | 17 +++++--- queue/dequeue.go | 2 +- queue/enqueue.go | 4 +- queue/events/dequeue.go | 2 +- queue/events/enqueue.go | 4 +- queue/events/queue.go | 2 +- queue/queue.go | 2 +- 8 files changed, 75 insertions(+), 22 deletions(-) diff --git a/broker/websocket/awscdk.go b/broker/websocket/awscdk.go index 280015f..f8c95f6 100644 --- a/broker/websocket/awscdk.go +++ b/broker/websocket/awscdk.go @@ -16,7 +16,10 @@ import ( "github.com/aws/aws-cdk-go/awscdk/v2/awsapigatewayv2" authorizers "github.com/aws/aws-cdk-go/awscdk/v2/awsapigatewayv2authorizers" integrations "github.com/aws/aws-cdk-go/awscdk/v2/awsapigatewayv2integrations" + "github.com/aws/aws-cdk-go/awscdk/v2/awscertificatemanager" "github.com/aws/aws-cdk-go/awscdk/v2/awslambda" + "github.com/aws/aws-cdk-go/awscdk/v2/awsroute53" + "github.com/aws/aws-cdk-go/awscdk/v2/awsroute53targets" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/constructs-go/constructs/v10" "github.com/aws/jsii-runtime-go" @@ -91,6 +94,8 @@ type Broker struct { constructs.Construct Gateway awsapigatewayv2.WebSocketApi Authorizer awsapigatewayv2.IWebSocketRouteAuthorizer + domain awsapigatewayv2.DomainName + dns awsroute53.ARecord acc int } @@ -225,10 +230,7 @@ func (broker *Broker) NewAuthorizerUniversal(props *AuthorizerUniversalProps) aw jsii.String("default"), handler, &authorizers.WebSocketLambdaAuthorizerProps{ - IdentitySource: jsii.Strings( - "route.request.querystring.token", - "route.request.querystring.apikey", - ), + IdentitySource: jsii.Strings("route.request.querystring.token"), }, ) @@ -238,6 +240,8 @@ func (broker *Broker) NewAuthorizerUniversal(props *AuthorizerUniversalProps) aw type WebSocketApiProps struct { *awsapigatewayv2.WebSocketApiProps Throttle *awsapigatewayv2.ThrottleSettings + Host *string + TlsArn *string } func (broker *Broker) NewGateway(props *WebSocketApiProps) awsapigatewayv2.WebSocketApi { @@ -265,18 +269,62 @@ func (broker *Broker) NewGateway(props *WebSocketApiProps) awsapigatewayv2.WebSo broker.Gateway = awsapigatewayv2.NewWebSocketApi(broker.Construct, jsii.String("Gateway"), props.WebSocketApiProps) + var domain *awsapigatewayv2.DomainMappingOptions + if props.Host != nil && props.TlsArn != nil { + broker.domain = awsapigatewayv2.NewDomainName(broker.Construct, jsii.String("DomainName"), + &awsapigatewayv2.DomainNameProps{ + EndpointType: awsapigatewayv2.EndpointType_REGIONAL, + DomainName: props.Host, + Certificate: awscertificatemanager.Certificate_FromCertificateArn(broker.Construct, jsii.String("X509"), props.TlsArn), + }, + ) + + domain = &awsapigatewayv2.DomainMappingOptions{ + DomainName: broker.domain, + } + } + awsapigatewayv2.NewWebSocketStage(broker.Construct, jsii.String("Stage"), &awsapigatewayv2.WebSocketStageProps{ - AutoDeploy: jsii.Bool(true), - StageName: jsii.String(stage), - Throttle: props.Throttle, - WebSocketApi: broker.Gateway, + AutoDeploy: jsii.Bool(true), + StageName: jsii.String(stage), + Throttle: props.Throttle, + WebSocketApi: broker.Gateway, + DomainMapping: domain, }, ) + if props.Host != nil && props.TlsArn != nil { + broker.createRoute53(*props.Host) + } + return broker.Gateway } +func (broker *Broker) createRoute53(host string) { + domain := strings.Join(strings.Split(host, ".")[1:], ".") + zone := awsroute53.HostedZone_FromLookup(broker.Construct, jsii.String("HZone"), + &awsroute53.HostedZoneProviderProps{ + DomainName: jsii.String(domain), + }, + ) + + broker.dns = awsroute53.NewARecord(broker.Construct, jsii.String("ARecord"), + &awsroute53.ARecordProps{ + RecordName: jsii.String(host), + Target: awsroute53.RecordTarget_FromAlias( + awsroute53targets.NewApiGatewayv2DomainProperties( + broker.domain.RegionalDomainName(), + broker.domain.RegionalHostedZoneId(), + ), + ), + Ttl: awscdk.Duration_Seconds(jsii.Number(60)), + Zone: zone, + }, + ) + +} + func (broker *Broker) NewSink(props *SinkProps) *Sink { if broker.Gateway == nil { panic("Gatewaye is not defined.") diff --git a/broker/websocket/lambda/auth/auth.go b/broker/websocket/lambda/auth/auth.go index 8ce668b..2738650 100644 --- a/broker/websocket/lambda/auth/auth.go +++ b/broker/websocket/lambda/auth/auth.go @@ -34,10 +34,12 @@ func main() { lambda.Start( func(evt events.APIGatewayV2CustomAuthorizerV1Request) (events.APIGatewayCustomAuthorizerResponse, error) { - tkn, hastkn := evt.QueryStringParameters["token"] - key, haskey := evt.QueryStringParameters["apikey"] + tkn, has := evt.QueryStringParameters["token"] + if !has || len(tkn) == 0 { + return None, ErrForbidden + } - if jwt != nil && hastkn { + if jwt != nil && strings.HasPrefix(tkn, "ey") { principal, context, err := jwt.Validate(tkn) if err != nil { return None, ErrForbidden @@ -46,8 +48,8 @@ func main() { return AccessPolicy(principal, evt.MethodArn, context), nil } - if basic != nil && haskey { - principal, context, err := basic.Validate(key) + if basic != nil { + principal, context, err := basic.Validate(tkn) if err != nil { return None, ErrForbidden } @@ -98,7 +100,10 @@ func NewAuthBasic() (*AuthBasic, error) { return nil, errors.New("basic auth is not configured") } - return &AuthBasic{}, nil + return &AuthBasic{ + access: access, + secret: secret, + }, nil } func (auth *AuthBasic) Validate(apikey string) (string, map[string]any, error) { diff --git a/queue/dequeue.go b/queue/dequeue.go index b8e281e..76f03b9 100644 --- a/queue/dequeue.go +++ b/queue/dequeue.go @@ -15,7 +15,7 @@ import ( // Dequeue message func Dequeue[T any](q swarm.Broker, category ...string) (<-chan swarm.Msg[T], chan<- swarm.Msg[T]) { - cat := categoryOf[T]() + cat := TypeOf[T]() if len(category) > 0 { cat = category[0] } diff --git a/queue/enqueue.go b/queue/enqueue.go index a097659..4d2687c 100644 --- a/queue/enqueue.go +++ b/queue/enqueue.go @@ -18,7 +18,7 @@ import ( // Create egress and dead-letter queue channels for the category func Enqueue[T any](q swarm.Broker, category ...string) (chan<- T, <-chan T) { - cat := categoryOf[T]() + cat := TypeOf[T]() if len(category) > 0 { cat = category[0] } @@ -29,7 +29,7 @@ func Enqueue[T any](q swarm.Broker, category ...string) (chan<- T, <-chan T) { } // normalized type name -func categoryOf[T any]() string { +func TypeOf[T any]() string { typ := reflect.TypeOf(new(T)).Elem() cat := typ.Name() if typ.Kind() == reflect.Ptr { diff --git a/queue/events/dequeue.go b/queue/events/dequeue.go index d3be6e7..03299b3 100644 --- a/queue/events/dequeue.go +++ b/queue/events/dequeue.go @@ -15,7 +15,7 @@ import ( // Dequeue event func Dequeue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (<-chan swarm.Msg[*E], chan<- swarm.Msg[*E]) { - catE := categoryOf[E]() + catE := TypeOf[E]() if len(category) > 0 { catE = category[0] } diff --git a/queue/events/enqueue.go b/queue/events/enqueue.go index 5178e57..9339e5d 100644 --- a/queue/events/enqueue.go +++ b/queue/events/enqueue.go @@ -20,7 +20,7 @@ import ( // - to send messages // - failed messages (dead-letter queue) func Enqueue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (chan<- *E, <-chan *E) { - catE := categoryOf[E]() + catE := TypeOf[E]() if len(category) > 0 { catE = category[0] } @@ -32,7 +32,7 @@ func Enqueue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (c } // normalized type name -func categoryOf[T any]() string { +func TypeOf[T any]() string { typ := reflect.TypeOf(new(T)).Elem() cat := typ.String() if typ.Kind() == reflect.Ptr { diff --git a/queue/events/queue.go b/queue/events/queue.go index 99c9475..9d75b49 100644 --- a/queue/events/queue.go +++ b/queue/events/queue.go @@ -50,7 +50,7 @@ func (q queue[T, E]) Enq(cat string, object *E) error { func New[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) Enqueuer[T, E] { k := q.(*kernel.Kernel) - catE := categoryOf[E]() + catE := TypeOf[E]() if len(category) > 0 { catE = category[0] } diff --git a/queue/queue.go b/queue/queue.go index 7270399..dbe7f70 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -52,7 +52,7 @@ func (q queue[T]) Enq(cat string, object T) error { func New[T any](q swarm.Broker, category ...string) Enqueuer[T] { k := q.(*kernel.Kernel) - cat := categoryOf[T]() + cat := TypeOf[T]() if len(category) > 0 { cat = category[0] }