Skip to content

Commit

Permalink
Codec for binary serialization into json protocol (#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
fogfish authored Sep 25, 2024
1 parent e5819f5 commit 2eca42c
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 37 deletions.
5 changes: 4 additions & 1 deletion broker/eventbridge/eventbridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,10 @@ func (cli *Client) Enq(bag swarm.Bag) error {
}

if ret.FailedEntryCount > 0 {
return fmt.Errorf("%v: %v", ret.Entries[0].ErrorCode, ret.Entries[0].ErrorMessage)
return fmt.Errorf("%s: %s",
aws.ToString(ret.Entries[0].ErrorCode),
aws.ToString(ret.Entries[0].ErrorMessage),
)
}

return nil
Expand Down
48 changes: 15 additions & 33 deletions broker/websocket/awscdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/constructs-go/constructs/v10"
"github.com/aws/jsii-runtime-go"
"github.com/fogfish/golem/optics"
"github.com/fogfish/scud"
)

Expand Down Expand Up @@ -67,43 +66,26 @@ func NewSink(scope constructs.Construct, id *string, props *SinkProps) *Sink {
return sink
}

var (
lensFunction = optics.ForProduct1[scud.FunctionGoProps, *map[string]*string]("Environment")
lensContainer = optics.ForProduct1[scud.ContainerGoProps, *map[string]*string]("Environment")
)

func defineLambdaEnvironment[T any](lens optics.Lens[T, *map[string]*string], props *SinkProps, fprops *T) {
env := lens.Get(fprops)

if env == nil {
env = &map[string]*string{}
}

if _, has := (*env)["CONFIG_SWARM_WS_EVENT_TYPE"]; !has {
(*env)["CONFIG_SWARM_WS_EVENT_TYPE"] = jsii.String(props.Route)
}

if _, has := (*env)["CONFIG_SWARM_WS_URL"]; !has {
url := aws.ToString(props.Gateway.ApiEndpoint()) + "/" + stage
(*env)["CONFIG_SWARM_WS_URL"] = aws.String(url)
}

lens.Put(fprops, env)
}

func defaultEnvironment(props *SinkProps) {
switch fprops := props.Function.(type) {
switch f := props.Function.(type) {
case *scud.FunctionGoProps:
if fprops.FunctionProps == nil {
fprops.FunctionProps = &awslambda.FunctionProps{}
if f.FunctionProps == nil {
f.FunctionProps = &awslambda.FunctionProps{}
}

defineLambdaEnvironment(lensFunction, props, fprops)
if f.FunctionProps.Environment == nil {
f.FunctionProps.Environment = &map[string]*string{}
}
(*f.FunctionProps.Environment)["CONFIG_SWARM_WS_EVENT_TYPE"] = jsii.String(props.Route)
(*f.FunctionProps.Environment)["CONFIG_SWARM_WS_URL"] = jsii.String(aws.ToString(props.Gateway.ApiEndpoint()) + "/" + stage)
case *scud.ContainerGoProps:
if fprops.DockerImageFunctionProps == nil {
fprops.DockerImageFunctionProps = &awslambda.DockerImageFunctionProps{}
if f.DockerImageFunctionProps == nil {
f.DockerImageFunctionProps = &awslambda.DockerImageFunctionProps{}
}
if f.DockerImageFunctionProps.Environment == nil {
f.DockerImageFunctionProps.Environment = &map[string]*string{}
}
defineLambdaEnvironment(lensContainer, props, fprops)
(*f.DockerImageFunctionProps.Environment)["CONFIG_SWARM_WS_EVENT_TYPE"] = jsii.String(props.Route)
(*f.DockerImageFunctionProps.Environment)["CONFIG_SWARM_WS_URL"] = jsii.String(aws.ToString(props.Gateway.ApiEndpoint()) + "/" + stage)
}
}

Expand Down
21 changes: 21 additions & 0 deletions codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,27 @@ func NewCodecByte() CodecByte { return CodecByte{} }

//------------------------------------------------------------------------------

// Encode Bytes as "JSON packet"
type CodecPacket struct{}

type packet struct {
Octets []byte `json:"p,omitempty"`
}

func (CodecPacket) Encode(x []byte) ([]byte, error) {
b, err := json.Marshal(packet{Octets: x})
return b, err
}
func (CodecPacket) Decode(x []byte) ([]byte, error) {
var pckt packet
err := json.Unmarshal(x, &pckt)
return pckt.Octets, err
}

func NewCodecPacket() CodecPacket { return CodecPacket{} }

//------------------------------------------------------------------------------

// Event codec for I/O kernel
type CodecEvent[T any, E EventKind[T]] struct {
source string
Expand Down
2 changes: 1 addition & 1 deletion internal/kernel/kernel.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type Kernel struct {
sync.WaitGroup
sync.RWMutex

// Kernel configuartion
// Kernel configuration
Config swarm.Config

// Control-plane stop channel. It is used to notify the kernel to terminate.
Expand Down
9 changes: 7 additions & 2 deletions queue/bytes/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,15 @@ type queue struct {
func (q queue) Put(object []byte) error { return q.Enq(q.cat, object) }

func (q queue) Enq(cat string, object []byte) error {
obj, err := q.codec.Encode(object)
if err != nil {
return err
}

ctx := swarm.NewContext(context.Background(), cat, "")
bag := swarm.Bag{Ctx: ctx, Object: object}
bag := swarm.Bag{Ctx: ctx, Object: obj}

err := q.emit.Enq(bag)
err = q.emit.Enq(bag)
if err != nil {
return err
}
Expand Down

0 comments on commit 2eca42c

Please sign in to comment.