From 2eca42c2c680ee337688f01f56e3b4c89412ae21 Mon Sep 17 00:00:00 2001 From: fogfish Date: Wed, 25 Sep 2024 18:52:41 +0300 Subject: [PATCH] Codec for binary serialization into json protocol (#72) --- broker/eventbridge/eventbridge.go | 5 +++- broker/websocket/awscdk.go | 48 ++++++++++--------------------- codec.go | 21 ++++++++++++++ internal/kernel/kernel.go | 2 +- queue/bytes/queue.go | 9 ++++-- 5 files changed, 48 insertions(+), 37 deletions(-) diff --git a/broker/eventbridge/eventbridge.go b/broker/eventbridge/eventbridge.go index 55a6cac..5772bac 100644 --- a/broker/eventbridge/eventbridge.go +++ b/broker/eventbridge/eventbridge.go @@ -115,7 +115,10 @@ func (cli *Client) Enq(bag swarm.Bag) error { } if ret.FailedEntryCount > 0 { - return fmt.Errorf("%v: %v", ret.Entries[0].ErrorCode, ret.Entries[0].ErrorMessage) + return fmt.Errorf("%s: %s", + aws.ToString(ret.Entries[0].ErrorCode), + aws.ToString(ret.Entries[0].ErrorMessage), + ) } return nil diff --git a/broker/websocket/awscdk.go b/broker/websocket/awscdk.go index 5eac1bc..b453380 100644 --- a/broker/websocket/awscdk.go +++ b/broker/websocket/awscdk.go @@ -23,7 +23,6 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/constructs-go/constructs/v10" "github.com/aws/jsii-runtime-go" - "github.com/fogfish/golem/optics" "github.com/fogfish/scud" ) @@ -67,43 +66,26 @@ func NewSink(scope constructs.Construct, id *string, props *SinkProps) *Sink { return sink } -var ( - lensFunction = optics.ForProduct1[scud.FunctionGoProps, *map[string]*string]("Environment") - lensContainer = optics.ForProduct1[scud.ContainerGoProps, *map[string]*string]("Environment") -) - -func defineLambdaEnvironment[T any](lens optics.Lens[T, *map[string]*string], props *SinkProps, fprops *T) { - env := lens.Get(fprops) - - if env == nil { - env = &map[string]*string{} - } - - if _, has := (*env)["CONFIG_SWARM_WS_EVENT_TYPE"]; !has { - (*env)["CONFIG_SWARM_WS_EVENT_TYPE"] = jsii.String(props.Route) - } - - if _, has := (*env)["CONFIG_SWARM_WS_URL"]; !has { - url := aws.ToString(props.Gateway.ApiEndpoint()) + "/" + stage - (*env)["CONFIG_SWARM_WS_URL"] = aws.String(url) - } - - lens.Put(fprops, env) -} - func defaultEnvironment(props *SinkProps) { - switch fprops := props.Function.(type) { + switch f := props.Function.(type) { case *scud.FunctionGoProps: - if fprops.FunctionProps == nil { - fprops.FunctionProps = &awslambda.FunctionProps{} + if f.FunctionProps == nil { + f.FunctionProps = &awslambda.FunctionProps{} } - - defineLambdaEnvironment(lensFunction, props, fprops) + if f.FunctionProps.Environment == nil { + f.FunctionProps.Environment = &map[string]*string{} + } + (*f.FunctionProps.Environment)["CONFIG_SWARM_WS_EVENT_TYPE"] = jsii.String(props.Route) + (*f.FunctionProps.Environment)["CONFIG_SWARM_WS_URL"] = jsii.String(aws.ToString(props.Gateway.ApiEndpoint()) + "/" + stage) case *scud.ContainerGoProps: - if fprops.DockerImageFunctionProps == nil { - fprops.DockerImageFunctionProps = &awslambda.DockerImageFunctionProps{} + if f.DockerImageFunctionProps == nil { + f.DockerImageFunctionProps = &awslambda.DockerImageFunctionProps{} + } + if f.DockerImageFunctionProps.Environment == nil { + f.DockerImageFunctionProps.Environment = &map[string]*string{} } - defineLambdaEnvironment(lensContainer, props, fprops) + (*f.DockerImageFunctionProps.Environment)["CONFIG_SWARM_WS_EVENT_TYPE"] = jsii.String(props.Route) + (*f.DockerImageFunctionProps.Environment)["CONFIG_SWARM_WS_URL"] = jsii.String(aws.ToString(props.Gateway.ApiEndpoint()) + "/" + stage) } } diff --git a/codec.go b/codec.go index 62e99eb..94a1fe5 100644 --- a/codec.go +++ b/codec.go @@ -45,6 +45,27 @@ func NewCodecByte() CodecByte { return CodecByte{} } //------------------------------------------------------------------------------ +// Encode Bytes as "JSON packet" +type CodecPacket struct{} + +type packet struct { + Octets []byte `json:"p,omitempty"` +} + +func (CodecPacket) Encode(x []byte) ([]byte, error) { + b, err := json.Marshal(packet{Octets: x}) + return b, err +} +func (CodecPacket) Decode(x []byte) ([]byte, error) { + var pckt packet + err := json.Unmarshal(x, &pckt) + return pckt.Octets, err +} + +func NewCodecPacket() CodecPacket { return CodecPacket{} } + +//------------------------------------------------------------------------------ + // Event codec for I/O kernel type CodecEvent[T any, E EventKind[T]] struct { source string diff --git a/internal/kernel/kernel.go b/internal/kernel/kernel.go index 21333b6..f28adb8 100644 --- a/internal/kernel/kernel.go +++ b/internal/kernel/kernel.go @@ -44,7 +44,7 @@ type Kernel struct { sync.WaitGroup sync.RWMutex - // Kernel configuartion + // Kernel configuration Config swarm.Config // Control-plane stop channel. It is used to notify the kernel to terminate. diff --git a/queue/bytes/queue.go b/queue/bytes/queue.go index 11e80c0..f9ba234 100644 --- a/queue/bytes/queue.go +++ b/queue/bytes/queue.go @@ -30,10 +30,15 @@ type queue struct { func (q queue) Put(object []byte) error { return q.Enq(q.cat, object) } func (q queue) Enq(cat string, object []byte) error { + obj, err := q.codec.Encode(object) + if err != nil { + return err + } + ctx := swarm.NewContext(context.Background(), cat, "") - bag := swarm.Bag{Ctx: ctx, Object: object} + bag := swarm.Bag{Ctx: ctx, Object: obj} - err := q.emit.Enq(bag) + err = q.emit.Enq(bag) if err != nil { return err }