diff --git a/broker/websocket/awscdk.go b/broker/websocket/awscdk.go index 178b605..f8c95f6 100644 --- a/broker/websocket/awscdk.go +++ b/broker/websocket/awscdk.go @@ -16,7 +16,10 @@ import ( "github.com/aws/aws-cdk-go/awscdk/v2/awsapigatewayv2" authorizers "github.com/aws/aws-cdk-go/awscdk/v2/awsapigatewayv2authorizers" integrations "github.com/aws/aws-cdk-go/awscdk/v2/awsapigatewayv2integrations" + "github.com/aws/aws-cdk-go/awscdk/v2/awscertificatemanager" "github.com/aws/aws-cdk-go/awscdk/v2/awslambda" + "github.com/aws/aws-cdk-go/awscdk/v2/awsroute53" + "github.com/aws/aws-cdk-go/awscdk/v2/awsroute53targets" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/constructs-go/constructs/v10" "github.com/aws/jsii-runtime-go" @@ -91,6 +94,8 @@ type Broker struct { constructs.Construct Gateway awsapigatewayv2.WebSocketApi Authorizer awsapigatewayv2.IWebSocketRouteAuthorizer + domain awsapigatewayv2.DomainName + dns awsroute53.ARecord acc int } @@ -100,19 +105,28 @@ func NewBroker(scope constructs.Construct, id *string, props *BrokerProps) *Brok return broker } -func (broker *Broker) NewAuthorizerApiKey(access, secret string) awsapigatewayv2.IWebSocketRouteAuthorizer { +type AuthorizerApiKeyProps struct { + Access string + Secret string +} + +func (broker *Broker) NewAuthorizerApiKey(props *AuthorizerApiKeyProps) awsapigatewayv2.IWebSocketRouteAuthorizer { if broker.Gateway != nil { panic("Authorizer MUST be defined before the gateway is instantiated.") } + if props.Access == "" || props.Secret == "" { + panic("Authorizer MUST define access and secret api keys") + } + handler := scud.NewFunctionGo(broker.Construct, jsii.String("Authorizer"), &scud.FunctionGoProps{ SourceCodePackage: "github.com/fogfish/swarm", - SourceCodeLambda: "broker/websocket/lambda/authkey", + SourceCodeLambda: "broker/websocket/lambda/auth", FunctionProps: &awslambda.FunctionProps{ Environment: &map[string]*string{ - "CONFIG_SWARM_WS_AUTHORIZER_ACCESS": jsii.String(secret), - "CONFIG_SWARM_WS_AUTHORIZER_SECRET": jsii.String(secret), + "CONFIG_SWARM_WS_AUTHORIZER_ACCESS": jsii.String(props.Access), + "CONFIG_SWARM_WS_AUTHORIZER_SECRET": jsii.String(props.Secret), }, }, }, @@ -129,27 +143,84 @@ func (broker *Broker) NewAuthorizerApiKey(access, secret string) awsapigatewayv2 return broker.Authorizer } -func (broker *Broker) NewAuthorizerJWT(issuer, audience string) awsapigatewayv2.IWebSocketRouteAuthorizer { +type AuthorizerJwtProps struct { + Issuer string + Audience string +} + +func (broker *Broker) NewAuthorizerJwt(props *AuthorizerJwtProps) awsapigatewayv2.IWebSocketRouteAuthorizer { + if broker.Gateway != nil { + panic("Authorizer MUST be defined before the gateway is instantiated.") + } + + if !strings.HasPrefix(props.Issuer, "https://") { + panic("Issuer URL MUST start with https://") + } + + if !strings.HasSuffix(props.Issuer, "/") { + props.Issuer += "/" + } + + handler := scud.NewFunctionGo(broker.Construct, jsii.String("Authorizer"), + &scud.FunctionGoProps{ + SourceCodePackage: "github.com/fogfish/swarm", + SourceCodeLambda: "broker/websocket/lambda/auth", + FunctionProps: &awslambda.FunctionProps{ + Environment: &map[string]*string{ + "CONFIG_SWARM_WS_AUTHORIZER_ISS": jsii.String(props.Issuer), + "CONFIG_SWARM_WS_AUTHORIZER_AUD": jsii.String(props.Audience), + }, + }, + }, + ) + + broker.Authorizer = authorizers.NewWebSocketLambdaAuthorizer( + jsii.String("default"), + handler, + &authorizers.WebSocketLambdaAuthorizerProps{ + IdentitySource: jsii.Strings("route.request.querystring.token"), + }, + ) + + return broker.Authorizer +} + +type AuthorizerUniversalProps struct { + AuthorizerApiKey *AuthorizerApiKeyProps + AuthorizerJwt *AuthorizerJwtProps +} + +func (broker *Broker) NewAuthorizerUniversal(props *AuthorizerUniversalProps) awsapigatewayv2.IWebSocketRouteAuthorizer { if broker.Gateway != nil { panic("Authorizer MUST be defined before the gateway is instantiated.") } - if !strings.HasPrefix(issuer, "https://") { + if props.AuthorizerApiKey == nil || props.AuthorizerJwt == nil { + panic("Universal Authorizer requires definition of all members") + } + + if props.AuthorizerApiKey.Access == "" || props.AuthorizerApiKey.Secret == "" { + panic("Authorizer MUST define access and secret api keys") + } + + if !strings.HasPrefix(props.AuthorizerJwt.Issuer, "https://") { panic("Issuer URL MUST start with https://") } - if !strings.HasSuffix(issuer, "/") { - issuer += "/" + if !strings.HasSuffix(props.AuthorizerJwt.Issuer, "/") { + props.AuthorizerJwt.Issuer += "/" } handler := scud.NewFunctionGo(broker.Construct, jsii.String("Authorizer"), &scud.FunctionGoProps{ SourceCodePackage: "github.com/fogfish/swarm", - SourceCodeLambda: "broker/websocket/lambda/authjwt", + SourceCodeLambda: "broker/websocket/lambda/auth", FunctionProps: &awslambda.FunctionProps{ Environment: &map[string]*string{ - "CONFIG_SWARM_WS_AUTHORIZER_ISS": jsii.String(issuer), - "CONFIG_SWARM_WS_AUTHORIZER_AUD": jsii.String(audience), + "CONFIG_SWARM_WS_AUTHORIZER_ACCESS": jsii.String(props.AuthorizerApiKey.Access), + "CONFIG_SWARM_WS_AUTHORIZER_SECRET": jsii.String(props.AuthorizerApiKey.Secret), + "CONFIG_SWARM_WS_AUTHORIZER_ISS": jsii.String(props.AuthorizerJwt.Issuer), + "CONFIG_SWARM_WS_AUTHORIZER_AUD": jsii.String(props.AuthorizerJwt.Audience), }, }, }, @@ -169,6 +240,8 @@ func (broker *Broker) NewAuthorizerJWT(issuer, audience string) awsapigatewayv2. type WebSocketApiProps struct { *awsapigatewayv2.WebSocketApiProps Throttle *awsapigatewayv2.ThrottleSettings + Host *string + TlsArn *string } func (broker *Broker) NewGateway(props *WebSocketApiProps) awsapigatewayv2.WebSocketApi { @@ -196,18 +269,62 @@ func (broker *Broker) NewGateway(props *WebSocketApiProps) awsapigatewayv2.WebSo broker.Gateway = awsapigatewayv2.NewWebSocketApi(broker.Construct, jsii.String("Gateway"), props.WebSocketApiProps) + var domain *awsapigatewayv2.DomainMappingOptions + if props.Host != nil && props.TlsArn != nil { + broker.domain = awsapigatewayv2.NewDomainName(broker.Construct, jsii.String("DomainName"), + &awsapigatewayv2.DomainNameProps{ + EndpointType: awsapigatewayv2.EndpointType_REGIONAL, + DomainName: props.Host, + Certificate: awscertificatemanager.Certificate_FromCertificateArn(broker.Construct, jsii.String("X509"), props.TlsArn), + }, + ) + + domain = &awsapigatewayv2.DomainMappingOptions{ + DomainName: broker.domain, + } + } + awsapigatewayv2.NewWebSocketStage(broker.Construct, jsii.String("Stage"), &awsapigatewayv2.WebSocketStageProps{ - AutoDeploy: jsii.Bool(true), - StageName: jsii.String(stage), - Throttle: props.Throttle, - WebSocketApi: broker.Gateway, + AutoDeploy: jsii.Bool(true), + StageName: jsii.String(stage), + Throttle: props.Throttle, + WebSocketApi: broker.Gateway, + DomainMapping: domain, }, ) + if props.Host != nil && props.TlsArn != nil { + broker.createRoute53(*props.Host) + } + return broker.Gateway } +func (broker *Broker) createRoute53(host string) { + domain := strings.Join(strings.Split(host, ".")[1:], ".") + zone := awsroute53.HostedZone_FromLookup(broker.Construct, jsii.String("HZone"), + &awsroute53.HostedZoneProviderProps{ + DomainName: jsii.String(domain), + }, + ) + + broker.dns = awsroute53.NewARecord(broker.Construct, jsii.String("ARecord"), + &awsroute53.ARecordProps{ + RecordName: jsii.String(host), + Target: awsroute53.RecordTarget_FromAlias( + awsroute53targets.NewApiGatewayv2DomainProperties( + broker.domain.RegionalDomainName(), + broker.domain.RegionalHostedZoneId(), + ), + ), + Ttl: awscdk.Duration_Seconds(jsii.Number(60)), + Zone: zone, + }, + ) + +} + func (broker *Broker) NewSink(props *SinkProps) *Sink { if broker.Gateway == nil { panic("Gatewaye is not defined.") diff --git a/broker/websocket/lambda/auth/auth.go b/broker/websocket/lambda/auth/auth.go new file mode 100644 index 0000000..2738650 --- /dev/null +++ b/broker/websocket/lambda/auth/auth.go @@ -0,0 +1,200 @@ +package main + +import ( + "context" + "crypto/sha256" + "crypto/subtle" + "encoding/base64" + "errors" + "log/slog" + "net/url" + "os" + "strings" + "time" + + "github.com/auth0/go-jwt-middleware/v2/jwks" + "github.com/auth0/go-jwt-middleware/v2/validator" + "github.com/aws/aws-lambda-go/events" + "github.com/aws/aws-lambda-go/lambda" + _ "github.com/fogfish/logger/v3" +) + +func main() { + basic, err := NewAuthBasic() + if err != nil { + slog.Warn("Basic Auth disabled.") + basic = nil + } + + jwt, err := NewAuthJWT() + if err != nil { + slog.Warn("JWT Auth disabled.") + jwt = nil + } + + lambda.Start( + func(evt events.APIGatewayV2CustomAuthorizerV1Request) (events.APIGatewayCustomAuthorizerResponse, error) { + tkn, has := evt.QueryStringParameters["token"] + if !has || len(tkn) == 0 { + return None, ErrForbidden + } + + if jwt != nil && strings.HasPrefix(tkn, "ey") { + principal, context, err := jwt.Validate(tkn) + if err != nil { + return None, ErrForbidden + } + + return AccessPolicy(principal, evt.MethodArn, context), nil + } + + if basic != nil { + principal, context, err := basic.Validate(tkn) + if err != nil { + return None, ErrForbidden + } + + return AccessPolicy(principal, evt.MethodArn, context), nil + } + + return None, ErrForbidden + }, + ) + +} + +var ( + None = events.APIGatewayCustomAuthorizerResponse{} + ErrForbidden = errors.New("forbidden") +) + +//------------------------------------------------------------------------------ + +// Grant the access to WebSocket with the policy +func AccessPolicy(principal, method string, context map[string]any) events.APIGatewayCustomAuthorizerResponse { + return events.APIGatewayCustomAuthorizerResponse{ + PrincipalID: principal, + PolicyDocument: events.APIGatewayCustomAuthorizerPolicy{ + Version: "2012-10-17", + Statement: []events.IAMPolicyStatement{ + { + Action: []string{"execute-api:*"}, + Effect: "Allow", + Resource: []string{method}, + }, + }, + }, + Context: context, + } +} + +//------------------------------------------------------------------------------ + +type AuthBasic struct{ access, secret string } + +func NewAuthBasic() (*AuthBasic, error) { + access := os.Getenv("CONFIG_SWARM_WS_AUTHORIZER_ACCESS") + secret := os.Getenv("CONFIG_SWARM_WS_AUTHORIZER_SECRET") + + if access == "" || secret == "" { + return nil, errors.New("basic auth is not configured") + } + + return &AuthBasic{ + access: access, + secret: secret, + }, nil +} + +func (auth *AuthBasic) Validate(apikey string) (string, map[string]any, error) { + c, err := base64.RawStdEncoding.DecodeString(apikey) + if err != nil { + return "", nil, ErrForbidden + } + + access, secret, ok := strings.Cut(string(c), ":") + if !ok { + return "", nil, ErrForbidden + } + + gaccess := sha256.Sum256([]byte(access)) + gsecret := sha256.Sum256([]byte(secret)) + haccess := sha256.Sum256([]byte(auth.access)) + hsecret := sha256.Sum256([]byte(auth.secret)) + + accessMatch := (subtle.ConstantTimeCompare(gaccess[:], haccess[:]) == 1) + secretMatch := (subtle.ConstantTimeCompare(gsecret[:], hsecret[:]) == 1) + + if accessMatch && secretMatch { + return access, map[string]any{"auth": "basic"}, nil + } + + return "", nil, ErrForbidden +} + +//------------------------------------------------------------------------------ + +type AuthJWT struct { + *validator.Validator +} + +type Claims struct { + Scope string `json:"scope"` +} + +func (c Claims) Validate(ctx context.Context) error { return nil } + +func NewAuthJWT() (*AuthJWT, error) { + iss := os.Getenv("CONFIG_SWARM_WS_AUTHORIZER_ISS") + aud := os.Getenv("CONFIG_SWARM_WS_AUTHORIZER_AUD") + + if iss == "" || aud == "" { + return nil, errors.New("jwt auth is not configured") + } + + issuer, err := url.Parse(iss) + if err != nil { + return nil, err + } + + provider := jwks.NewCachingProvider(issuer, 5*time.Minute) + + auth, err := validator.New( + provider.KeyFunc, + validator.RS256, + iss, + []string{aud}, + validator.WithCustomClaims(func() validator.CustomClaims { return &Claims{} }), + validator.WithAllowedClockSkew(time.Minute), + ) + if err != nil { + return nil, err + } + + return &AuthJWT{Validator: auth}, nil +} + +func (auth *AuthJWT) Validate(token string) (string, map[string]any, error) { + claims, err := auth.ValidateToken(context.Background(), token) + if err != nil { + return "", nil, ErrForbidden + } + + switch c := claims.(type) { + case *validator.ValidatedClaims: + ctx := map[string]any{ + "iss": c.RegisteredClaims.Issuer, + "sub": c.RegisteredClaims.Subject, + // "aud": c.RegisteredClaims.Audience, + "exp": c.RegisteredClaims.Expiry, + "nbf": c.RegisteredClaims.NotBefore, + "iat": c.RegisteredClaims.IssuedAt, + "scope": c.CustomClaims.(*Claims).Scope, + "auth": "jwt", + } + + return c.RegisteredClaims.Subject, ctx, nil + default: + return "", nil, ErrForbidden + } +} diff --git a/broker/websocket/lambda/authjwt/authjwt.go b/broker/websocket/lambda/authjwt/authjwt.go deleted file mode 100644 index 8be09b4..0000000 --- a/broker/websocket/lambda/authjwt/authjwt.go +++ /dev/null @@ -1,98 +0,0 @@ -package main - -import ( - "context" - "errors" - "log/slog" - "net/url" - "os" - "time" - - "github.com/auth0/go-jwt-middleware/v2/jwks" - "github.com/auth0/go-jwt-middleware/v2/validator" - "github.com/aws/aws-lambda-go/events" - "github.com/aws/aws-lambda-go/lambda" -) - -var ( - none = events.APIGatewayCustomAuthorizerResponse{} - errForbidden = errors.New("forbidden") -) - -type Claims struct { - Scope string `json:"scope"` -} - -func (c Claims) Validate(ctx context.Context) error { return nil } - -func main() { - uri := os.Getenv("CONFIG_SWARM_WS_AUTHORIZER_ISS") - issuer, err := url.Parse(uri) - if err != nil { - slog.Error("Invalid issuer url.", "err", err, "url", uri) - panic(err) - } - - provider := jwks.NewCachingProvider(issuer, 5*time.Minute) - - auth, err := validator.New( - provider.KeyFunc, - validator.RS256, - uri, - []string{os.Getenv("CONFIG_SWARM_WS_AUTHORIZER_AUD")}, - validator.WithCustomClaims(func() validator.CustomClaims { return &Claims{} }), - validator.WithAllowedClockSkew(time.Minute), - ) - if err != nil { - slog.Error("Validator failed.", "err", err) - panic(err) - } - - lambda.Start( - func(evt events.APIGatewayV2CustomAuthorizerV1Request) (events.APIGatewayCustomAuthorizerResponse, error) { - token, has := evt.QueryStringParameters["token"] - if !has { - return none, errForbidden - } - - claims, err := auth.ValidateToken(context.Background(), token) - if err != nil { - return none, errForbidden - } - - switch c := claims.(type) { - case *validator.ValidatedClaims: - ctx := map[string]any{ - "iss": c.RegisteredClaims.Issuer, - "sub": c.RegisteredClaims.Subject, - // "aud": c.RegisteredClaims.Audience, - "exp": c.RegisteredClaims.Expiry, - "nbf": c.RegisteredClaims.NotBefore, - "iat": c.RegisteredClaims.IssuedAt, - "scope": c.CustomClaims.(*Claims).Scope, - } - - return accessPolicy(c.RegisteredClaims.Subject, evt.MethodArn, ctx), nil - default: - return none, errForbidden - } - }, - ) -} - -func accessPolicy(principal, method string, context map[string]any) events.APIGatewayCustomAuthorizerResponse { - return events.APIGatewayCustomAuthorizerResponse{ - PrincipalID: principal, - PolicyDocument: events.APIGatewayCustomAuthorizerPolicy{ - Version: "2012-10-17", - Statement: []events.IAMPolicyStatement{ - { - Action: []string{"execute-api:*"}, - Effect: "Allow", - Resource: []string{method}, - }, - }, - }, - Context: context, - } -} diff --git a/broker/websocket/lambda/authkey/authkey.go b/broker/websocket/lambda/authkey/authkey.go deleted file mode 100644 index 7f59f66..0000000 --- a/broker/websocket/lambda/authkey/authkey.go +++ /dev/null @@ -1,88 +0,0 @@ -// -// Copyright (C) 2021 - 2022 Dmitry Kolesnikov -// -// This file may be modified and distributed under the terms -// of the Apache License Version 2.0. See the LICENSE file for details. -// https://github.com/fogfish/swarm -// - -package main - -import ( - "crypto/sha256" - "crypto/subtle" - "encoding/base64" - "errors" - "os" - "strings" - - "github.com/aws/aws-lambda-go/events" - "github.com/aws/aws-lambda-go/lambda" -) - -var ( - none = events.APIGatewayCustomAuthorizerResponse{} - errForbidden = errors.New("forbidden") -) - -// Inspired by https://gist.github.com/praveen001/1b045d1c31cd9c72e4e6638e9f883f83 - -func main() { - lambda.Start( - func(evt events.APIGatewayV2CustomAuthorizerV1Request) (events.APIGatewayCustomAuthorizerResponse, error) { - key, has := evt.QueryStringParameters["apikey"] - if !has { - return none, errForbidden - } - - if principal, context, err := validate(key); err == nil { - return accessPolicy(principal, evt.MethodArn, context), nil - } - return none, errForbidden - - }, - ) -} - -func validate(apikey string) (string, map[string]any, error) { - c, err := base64.RawStdEncoding.DecodeString(apikey) - if err != nil { - return "", nil, errForbidden - } - - access, secret, ok := strings.Cut(string(c), ":") - if !ok { - return "", nil, errForbidden - } - - gaccess := sha256.Sum256([]byte(access)) - gsecret := sha256.Sum256([]byte(secret)) - haccess := sha256.Sum256([]byte(os.Getenv("CONFIG_SWARM_WS_AUTHORIZER_ACCESS"))) - hsecret := sha256.Sum256([]byte(os.Getenv("CONFIG_SWARM_WS_AUTHORIZER_SECRET"))) - - accessMatch := (subtle.ConstantTimeCompare(gaccess[:], haccess[:]) == 1) - secretMatch := (subtle.ConstantTimeCompare(gsecret[:], hsecret[:]) == 1) - - if accessMatch && secretMatch { - return access, map[string]any{}, nil - } - - return "", nil, errForbidden -} - -func accessPolicy(principal, method string, context map[string]any) events.APIGatewayCustomAuthorizerResponse { - return events.APIGatewayCustomAuthorizerResponse{ - PrincipalID: principal, - PolicyDocument: events.APIGatewayCustomAuthorizerPolicy{ - Version: "2012-10-17", - Statement: []events.IAMPolicyStatement{ - { - Action: []string{"execute-api:*"}, - Effect: "Allow", - Resource: []string{method}, - }, - }, - }, - Context: context, - } -} diff --git a/examples/websocket/serverless/main.go b/examples/websocket/serverless/main.go index 1192d37..93c1a3a 100644 --- a/examples/websocket/serverless/main.go +++ b/examples/websocket/serverless/main.go @@ -30,7 +30,12 @@ func main() { ) broker := websocket.NewBroker(stack, jsii.String("Broker"), nil) - broker.NewAuthorizerApiKey("test", "test") + broker.NewAuthorizerApiKey( + &websocket.AuthorizerApiKeyProps{ + Access: "test", + Secret: "test", + }, + ) broker.NewGateway(&websocket.WebSocketApiProps{ Throttle: &awsapigatewayv2.ThrottleSettings{ diff --git a/go.mod b/go.mod index d7a324b..b7d0150 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( github.com/fogfish/guid/v2 v2.0.4 github.com/fogfish/it v1.0.0 github.com/fogfish/it/v2 v2.0.1 + github.com/fogfish/logger/v3 v3.1.0 github.com/fogfish/scud v0.6.1 ) diff --git a/go.sum b/go.sum index 51a475a..1eeaa17 100644 --- a/go.sum +++ b/go.sum @@ -70,6 +70,8 @@ github.com/fogfish/it v1.0.0 h1:kiwFHZcrkRLUydZoIoY0gTuMfj38trwvLo0YRyIkeG8= github.com/fogfish/it v1.0.0/go.mod h1:NQJG4Ygvek85y7zGj0Gny8+6ygAnHjfBORhI7TdQhp4= github.com/fogfish/it/v2 v2.0.1 h1:vu3kV2xzYDPHoMHMABxXeu5CoMcTfRc4gkWkzOUkRJY= github.com/fogfish/it/v2 v2.0.1/go.mod h1:h5FdKaEQT4sUEykiVkB8VV4jX27XabFVeWhoDZaRZtE= +github.com/fogfish/logger/v3 v3.1.0 h1:Kl3atzqBxfOkUDdFJqG8wFpzvzfhm+DHwSBoFtA6xPI= +github.com/fogfish/logger/v3 v3.1.0/go.mod h1:hsucoJz/3OX90UdYrXykcKvjjteBnPcYSTr4Rie0ZqU= github.com/fogfish/scud v0.6.1 h1:Z799o7WlbFLZWcM7ynE0Td+h+Lc7893Ui+CYxSzTcd4= github.com/fogfish/scud v0.6.1/go.mod h1:jM6+Iufr6K9ScxkySmGgoAyL1Udi8bCaYw2vsGvfh9Q= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= diff --git a/queue/dequeue.go b/queue/dequeue.go index b8e281e..76f03b9 100644 --- a/queue/dequeue.go +++ b/queue/dequeue.go @@ -15,7 +15,7 @@ import ( // Dequeue message func Dequeue[T any](q swarm.Broker, category ...string) (<-chan swarm.Msg[T], chan<- swarm.Msg[T]) { - cat := categoryOf[T]() + cat := TypeOf[T]() if len(category) > 0 { cat = category[0] } diff --git a/queue/enqueue.go b/queue/enqueue.go index a097659..4d2687c 100644 --- a/queue/enqueue.go +++ b/queue/enqueue.go @@ -18,7 +18,7 @@ import ( // Create egress and dead-letter queue channels for the category func Enqueue[T any](q swarm.Broker, category ...string) (chan<- T, <-chan T) { - cat := categoryOf[T]() + cat := TypeOf[T]() if len(category) > 0 { cat = category[0] } @@ -29,7 +29,7 @@ func Enqueue[T any](q swarm.Broker, category ...string) (chan<- T, <-chan T) { } // normalized type name -func categoryOf[T any]() string { +func TypeOf[T any]() string { typ := reflect.TypeOf(new(T)).Elem() cat := typ.Name() if typ.Kind() == reflect.Ptr { diff --git a/queue/events/dequeue.go b/queue/events/dequeue.go index d3be6e7..03299b3 100644 --- a/queue/events/dequeue.go +++ b/queue/events/dequeue.go @@ -15,7 +15,7 @@ import ( // Dequeue event func Dequeue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (<-chan swarm.Msg[*E], chan<- swarm.Msg[*E]) { - catE := categoryOf[E]() + catE := TypeOf[E]() if len(category) > 0 { catE = category[0] } diff --git a/queue/events/enqueue.go b/queue/events/enqueue.go index 5178e57..9339e5d 100644 --- a/queue/events/enqueue.go +++ b/queue/events/enqueue.go @@ -20,7 +20,7 @@ import ( // - to send messages // - failed messages (dead-letter queue) func Enqueue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (chan<- *E, <-chan *E) { - catE := categoryOf[E]() + catE := TypeOf[E]() if len(category) > 0 { catE = category[0] } @@ -32,7 +32,7 @@ func Enqueue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (c } // normalized type name -func categoryOf[T any]() string { +func TypeOf[T any]() string { typ := reflect.TypeOf(new(T)).Elem() cat := typ.String() if typ.Kind() == reflect.Ptr { diff --git a/queue/events/queue.go b/queue/events/queue.go index 99c9475..9d75b49 100644 --- a/queue/events/queue.go +++ b/queue/events/queue.go @@ -50,7 +50,7 @@ func (q queue[T, E]) Enq(cat string, object *E) error { func New[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) Enqueuer[T, E] { k := q.(*kernel.Kernel) - catE := categoryOf[E]() + catE := TypeOf[E]() if len(category) > 0 { catE = category[0] } diff --git a/queue/queue.go b/queue/queue.go index 7270399..dbe7f70 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -52,7 +52,7 @@ func (q queue[T]) Enq(cat string, object T) error { func New[T any](q swarm.Broker, category ...string) Enqueuer[T] { k := q.(*kernel.Kernel) - cat := categoryOf[T]() + cat := TypeOf[T]() if len(category) > 0 { cat = category[0] }