Skip to content

Commit

Permalink
align CDK constuct for all brokers
Browse files Browse the repository at this point in the history
  • Loading branch information
fogfish committed Apr 7, 2024
1 parent 13d5f93 commit 3f48639
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 22 deletions.
64 changes: 56 additions & 8 deletions broker/websocket/awscdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ import (
"github.com/aws/aws-cdk-go/awscdk/v2/awsapigatewayv2"
authorizers "github.com/aws/aws-cdk-go/awscdk/v2/awsapigatewayv2authorizers"
integrations "github.com/aws/aws-cdk-go/awscdk/v2/awsapigatewayv2integrations"
"github.com/aws/aws-cdk-go/awscdk/v2/awscertificatemanager"
"github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
"github.com/aws/aws-cdk-go/awscdk/v2/awsroute53"
"github.com/aws/aws-cdk-go/awscdk/v2/awsroute53targets"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/constructs-go/constructs/v10"
"github.com/aws/jsii-runtime-go"
Expand Down Expand Up @@ -91,6 +94,8 @@ type Broker struct {
constructs.Construct
Gateway awsapigatewayv2.WebSocketApi
Authorizer awsapigatewayv2.IWebSocketRouteAuthorizer
domain awsapigatewayv2.DomainName
dns awsroute53.ARecord
acc int
}

Expand Down Expand Up @@ -225,10 +230,7 @@ func (broker *Broker) NewAuthorizerUniversal(props *AuthorizerUniversalProps) aw
jsii.String("default"),
handler,
&authorizers.WebSocketLambdaAuthorizerProps{
IdentitySource: jsii.Strings(
"route.request.querystring.token",
"route.request.querystring.apikey",
),
IdentitySource: jsii.Strings("route.request.querystring.token"),
},
)

Expand All @@ -238,6 +240,8 @@ func (broker *Broker) NewAuthorizerUniversal(props *AuthorizerUniversalProps) aw
type WebSocketApiProps struct {
*awsapigatewayv2.WebSocketApiProps
Throttle *awsapigatewayv2.ThrottleSettings
Host *string
TlsArn *string
}

func (broker *Broker) NewGateway(props *WebSocketApiProps) awsapigatewayv2.WebSocketApi {
Expand Down Expand Up @@ -265,18 +269,62 @@ func (broker *Broker) NewGateway(props *WebSocketApiProps) awsapigatewayv2.WebSo

broker.Gateway = awsapigatewayv2.NewWebSocketApi(broker.Construct, jsii.String("Gateway"), props.WebSocketApiProps)

var domain *awsapigatewayv2.DomainMappingOptions
if props.Host != nil && props.TlsArn != nil {
broker.domain = awsapigatewayv2.NewDomainName(broker.Construct, jsii.String("DomainName"),
&awsapigatewayv2.DomainNameProps{
EndpointType: awsapigatewayv2.EndpointType_REGIONAL,
DomainName: props.Host,
Certificate: awscertificatemanager.Certificate_FromCertificateArn(broker.Construct, jsii.String("X509"), props.TlsArn),
},
)

domain = &awsapigatewayv2.DomainMappingOptions{
DomainName: broker.domain,
}
}

awsapigatewayv2.NewWebSocketStage(broker.Construct, jsii.String("Stage"),
&awsapigatewayv2.WebSocketStageProps{
AutoDeploy: jsii.Bool(true),
StageName: jsii.String(stage),
Throttle: props.Throttle,
WebSocketApi: broker.Gateway,
AutoDeploy: jsii.Bool(true),
StageName: jsii.String(stage),
Throttle: props.Throttle,
WebSocketApi: broker.Gateway,
DomainMapping: domain,
},
)

if props.Host != nil && props.TlsArn != nil {
broker.createRoute53(*props.Host)
}

return broker.Gateway
}

func (broker *Broker) createRoute53(host string) {
domain := strings.Join(strings.Split(host, ".")[1:], ".")
zone := awsroute53.HostedZone_FromLookup(broker.Construct, jsii.String("HZone"),
&awsroute53.HostedZoneProviderProps{
DomainName: jsii.String(domain),
},
)

broker.dns = awsroute53.NewARecord(broker.Construct, jsii.String("ARecord"),
&awsroute53.ARecordProps{
RecordName: jsii.String(host),
Target: awsroute53.RecordTarget_FromAlias(
awsroute53targets.NewApiGatewayv2DomainProperties(
broker.domain.RegionalDomainName(),
broker.domain.RegionalHostedZoneId(),
),
),
Ttl: awscdk.Duration_Seconds(jsii.Number(60)),
Zone: zone,
},
)

}

func (broker *Broker) NewSink(props *SinkProps) *Sink {
if broker.Gateway == nil {
panic("Gatewaye is not defined.")
Expand Down
17 changes: 11 additions & 6 deletions broker/websocket/lambda/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ func main() {

lambda.Start(
func(evt events.APIGatewayV2CustomAuthorizerV1Request) (events.APIGatewayCustomAuthorizerResponse, error) {
tkn, hastkn := evt.QueryStringParameters["token"]
key, haskey := evt.QueryStringParameters["apikey"]
tkn, has := evt.QueryStringParameters["token"]
if !has || len(tkn) == 0 {
return None, ErrForbidden
}

if jwt != nil && hastkn {
if jwt != nil && strings.HasPrefix(tkn, "ey") {
principal, context, err := jwt.Validate(tkn)
if err != nil {
return None, ErrForbidden
Expand All @@ -46,8 +48,8 @@ func main() {
return AccessPolicy(principal, evt.MethodArn, context), nil
}

if basic != nil && haskey {
principal, context, err := basic.Validate(key)
if basic != nil {
principal, context, err := basic.Validate(tkn)
if err != nil {
return None, ErrForbidden
}
Expand Down Expand Up @@ -98,7 +100,10 @@ func NewAuthBasic() (*AuthBasic, error) {
return nil, errors.New("basic auth is not configured")
}

return &AuthBasic{}, nil
return &AuthBasic{
access: access,
secret: secret,
}, nil
}

func (auth *AuthBasic) Validate(apikey string) (string, map[string]any, error) {
Expand Down
2 changes: 1 addition & 1 deletion queue/dequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

// Dequeue message
func Dequeue[T any](q swarm.Broker, category ...string) (<-chan swarm.Msg[T], chan<- swarm.Msg[T]) {
cat := categoryOf[T]()
cat := TypeOf[T]()
if len(category) > 0 {
cat = category[0]
}
Expand Down
4 changes: 2 additions & 2 deletions queue/enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

// Create egress and dead-letter queue channels for the category
func Enqueue[T any](q swarm.Broker, category ...string) (chan<- T, <-chan T) {
cat := categoryOf[T]()
cat := TypeOf[T]()
if len(category) > 0 {
cat = category[0]
}
Expand All @@ -29,7 +29,7 @@ func Enqueue[T any](q swarm.Broker, category ...string) (chan<- T, <-chan T) {
}

// normalized type name
func categoryOf[T any]() string {
func TypeOf[T any]() string {
typ := reflect.TypeOf(new(T)).Elem()
cat := typ.Name()
if typ.Kind() == reflect.Ptr {
Expand Down
2 changes: 1 addition & 1 deletion queue/events/dequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

// Dequeue event
func Dequeue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (<-chan swarm.Msg[*E], chan<- swarm.Msg[*E]) {
catE := categoryOf[E]()
catE := TypeOf[E]()
if len(category) > 0 {
catE = category[0]
}
Expand Down
4 changes: 2 additions & 2 deletions queue/events/enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
// - to send messages
// - failed messages (dead-letter queue)
func Enqueue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (chan<- *E, <-chan *E) {
catE := categoryOf[E]()
catE := TypeOf[E]()
if len(category) > 0 {
catE = category[0]
}
Expand All @@ -32,7 +32,7 @@ func Enqueue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (c
}

// normalized type name
func categoryOf[T any]() string {
func TypeOf[T any]() string {
typ := reflect.TypeOf(new(T)).Elem()
cat := typ.String()
if typ.Kind() == reflect.Ptr {
Expand Down
2 changes: 1 addition & 1 deletion queue/events/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (q queue[T, E]) Enq(cat string, object *E) error {
func New[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) Enqueuer[T, E] {
k := q.(*kernel.Kernel)

catE := categoryOf[E]()
catE := TypeOf[E]()
if len(category) > 0 {
catE = category[0]
}
Expand Down
2 changes: 1 addition & 1 deletion queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (q queue[T]) Enq(cat string, object T) error {
func New[T any](q swarm.Broker, category ...string) Enqueuer[T] {
k := q.(*kernel.Kernel)

cat := categoryOf[T]()
cat := TypeOf[T]()
if len(category) > 0 {
cat = category[0]
}
Expand Down

0 comments on commit 3f48639

Please sign in to comment.