diff --git a/bag.go b/bag.go index b216d44..a9999ef 100644 --- a/bag.go +++ b/bag.go @@ -33,7 +33,6 @@ func (msg *Msg[T]) Fail(err error) *Msg[T] { // Bag is used by the transport to abstract message on the wire. type Bag struct { Category string - Event any Object []byte Digest Digest } diff --git a/broker.go b/broker.go deleted file mode 100644 index 338fb0c..0000000 --- a/broker.go +++ /dev/null @@ -1,27 +0,0 @@ -// -// Copyright (C) 2021 - 2022 Dmitry Kolesnikov -// -// This file may be modified and distributed under the terms -// of the Apache License Version 2.0. See the LICENSE file for details. -// https://github.com/fogfish/swarm -// - -package swarm - -type Broker interface { - Config() Config - Close() - DSync() - Await() - Enqueue(string, Channel) Enqueue - Dequeue(string, Channel) Dequeue -} - -type Enqueue interface { - Enq(Bag) error -} - -type Dequeue interface { - Deq(string) (Bag, error) - Ack(Bag) error -} diff --git a/broker/eventbridge/broker.go b/broker/eventbridge/broker.go deleted file mode 100644 index 0c3001c..0000000 --- a/broker/eventbridge/broker.go +++ /dev/null @@ -1,114 +0,0 @@ -// -// Copyright (C) 2021 - 2022 Dmitry Kolesnikov -// -// This file may be modified and distributed under the terms -// of the Apache License Version 2.0. See the LICENSE file for details. -// https://github.com/fogfish/swarm -// - -package eventbridge - -import ( - "context" - "log/slog" - - "github.com/aws/aws-lambda-go/events" - "github.com/aws/aws-lambda-go/lambda" - "github.com/aws/aws-sdk-go-v2/service/eventbridge" - "github.com/fogfish/swarm" - "github.com/fogfish/swarm/internal/router" -) - -// EventBridge declares the subset of interface from AWS SDK used by the lib. -type EventBridge interface { - PutEvents(context.Context, *eventbridge.PutEventsInput, ...func(*eventbridge.Options)) (*eventbridge.PutEventsOutput, error) -} - -type broker struct { - config swarm.Config - client *client - channels *swarm.Channels - context context.Context - cancel context.CancelFunc - router *router.Router -} - -// New create broker for AWS EventBridge service -func New(bus string, opts ...swarm.Option) (swarm.Broker, error) { - conf := swarm.NewConfig() - for _, opt := range opts { - opt(&conf) - } - - cli, err := newClient(bus, &conf) - if err != nil { - return nil, err - } - - ctx, can := context.WithCancel(context.Background()) - - slog.Info("Broker is created", "type", "eventbridge") - return &broker{ - config: conf, - client: cli, - channels: swarm.NewChannels(), - context: ctx, - cancel: can, - router: router.New(&conf, nil), - }, nil -} - -func (b *broker) Config() swarm.Config { - return b.config -} - -func (b *broker) Close() { - b.channels.Sync() - b.channels.Close() - b.cancel() -} - -func (b *broker) DSync() { - b.channels.Sync() -} - -func (b *broker) Await() { - starter := lambda.Start - - type Mock interface{ Start(interface{}) } - if b.config.Service != nil { - service, ok := b.config.Service.(Mock) - if ok { - starter = service.Start - } - } - - starter( - func(evt events.CloudWatchEvent) error { - bag := swarm.Bag{ - Category: evt.DetailType, - Object: evt.Detail, - Digest: swarm.Digest{Brief: evt.ID}, - } - - if err := b.router.Dispatch(bag); err != nil { - return err - } - - return b.router.Await(b.config.TimeToFlight) - }, - ) -} - -func (b *broker) Enqueue(category string, channel swarm.Channel) swarm.Enqueue { - b.channels.Attach(category, channel) - - return b.client -} - -func (b *broker) Dequeue(category string, channel swarm.Channel) swarm.Dequeue { - b.channels.Attach(category, channel) - b.router.Register(category) - - return b.router -} diff --git a/broker/eventbridge/eventbridge.go b/broker/eventbridge/eventbridge.go index 2c88eea..025e357 100644 --- a/broker/eventbridge/eventbridge.go +++ b/broker/eventbridge/eventbridge.go @@ -12,26 +12,65 @@ import ( "context" "fmt" + "github.com/aws/aws-lambda-go/events" + "github.com/aws/aws-lambda-go/lambda" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/eventbridge" "github.com/aws/aws-sdk-go-v2/service/eventbridge/types" "github.com/fogfish/swarm" + "github.com/fogfish/swarm/internal/kernel" ) -type client struct { +// EventBridge declares the subset of interface from AWS SDK used by the lib. +type EventBridge interface { + PutEvents(context.Context, *eventbridge.PutEventsInput, ...func(*eventbridge.Options)) (*eventbridge.PutEventsOutput, error) +} + +type Client struct { service EventBridge bus string - config *swarm.Config + config swarm.Config } -func newClient(bus string, config *swarm.Config) (*client, error) { - api, err := newService(config) +func New(queue string, opts ...swarm.Option) (swarm.Broker, error) { + cli, err := NewEventBridge(queue, opts...) if err != nil { return nil, err } - return &client{ + config := swarm.NewConfig() + for _, opt := range opts { + opt(&config) + } + + starter := lambda.Start + + type Mock interface{ Start(interface{}) } + if config.Service != nil { + service, ok := config.Service.(Mock) + if ok { + starter = service.Start + } + } + + sls := spawner{f: starter, c: config} + + return kernel.New(cli, sls, config), err +} + +func NewEventBridge(bus string, opts ...swarm.Option) (*Client, error) { + config := swarm.NewConfig() + for _, opt := range opts { + opt(&config) + } + + api, err := newService(&config) + if err != nil { + return nil, err + } + + return &Client{ service: api, bus: bus, config: config, @@ -55,7 +94,7 @@ func newService(conf *swarm.Config) (EventBridge, error) { } // Enq enqueues message to broker -func (cli *client) Enq(bag swarm.Bag) error { +func (cli *Client) Enq(bag swarm.Bag) error { ctx, cancel := context.WithTimeout(context.Background(), cli.config.NetworkTimeout) defer cancel() @@ -81,3 +120,30 @@ func (cli *client) Enq(bag swarm.Bag) error { return nil } + +//------------------------------------------------------------------------------ + +type spawner struct { + c swarm.Config + f func(any) +} + +func (s spawner) Spawn(k *kernel.Kernel) error { + s.f( + func(evt events.CloudWatchEvent) error { + bag := make([]swarm.Bag, 1) + bag[0] = swarm.Bag{ + Category: evt.DetailType, + Object: evt.Detail, + Digest: swarm.Digest{Brief: evt.ID}, + } + + return k.Dispatch(bag, s.c.TimeToFlight) + }, + ) + + return nil +} + +func (s spawner) Ack(digest string) error { return nil } +func (s spawner) Ask() ([]swarm.Bag, error) { return nil, nil } diff --git a/broker/eventddb/broker.go b/broker/eventddb/broker.go deleted file mode 100644 index 004a8b3..0000000 --- a/broker/eventddb/broker.go +++ /dev/null @@ -1,106 +0,0 @@ -// -// Copyright (C) 2021 - 2022 Dmitry Kolesnikov -// -// This file may be modified and distributed under the terms -// of the Apache License Version 2.0. See the LICENSE file for details. -// https://github.com/fogfish/swarm -// - -package eventddb - -import ( - "context" - "log/slog" - - "github.com/aws/aws-lambda-go/events" - "github.com/aws/aws-lambda-go/lambda" - "github.com/fogfish/curie" - "github.com/fogfish/swarm" - "github.com/fogfish/swarm/internal/router" -) - -// New creates broker for AWS EventBridge -func New(queue string, opts ...swarm.Option) (swarm.Broker, error) { - conf := swarm.NewConfig() - for _, opt := range opts { - opt(&conf) - } - - ctx, can := context.WithCancel(context.Background()) - - slog.Info("Broker is created", "type", "ddbstream") - return &broker{ - config: conf, - channels: swarm.NewChannels(), - context: ctx, - cancel: can, - router: router.New(&conf, nil), - }, nil -} - -type broker struct { - config swarm.Config - channels *swarm.Channels - context context.Context - cancel context.CancelFunc - router *router.Router -} - -func (b *broker) Config() swarm.Config { - return b.config -} - -func (b *broker) Close() { - b.channels.Sync() - b.channels.Close() - b.cancel() -} - -func (b *broker) DSync() { - b.channels.Sync() -} - -func (b *broker) Enqueue(category string, channel swarm.Channel) swarm.Enqueue { - panic("not implemented") -} - -func (b *broker) Dequeue(category string, channel swarm.Channel) swarm.Dequeue { - b.channels.Attach(category, channel) - b.router.Register(category) - - return b.router -} - -func (b *broker) Await() { - starter := lambda.Start - - type Mock interface{ Start(interface{}) } - if b.config.Service != nil { - service, ok := b.config.Service.(Mock) - if ok { - starter = service.Start - } - } - - starter( - func(events events.DynamoDBEvent) error { - for _, evt := range events.Records { - bag := swarm.Bag{ - Category: Category, - Event: &Event{ - ID: evt.EventID, - Type: curie.IRI(evt.EventName), - Agent: curie.IRI(evt.EventSourceArn), - Object: &evt, - }, - Digest: swarm.Digest{Brief: evt.EventID}, - } - if err := b.router.Dispatch(bag); err != nil { - return err - } - } - - return b.router.Await(b.config.TimeToFlight) - }, - ) -} diff --git a/broker/eventddb/eventddb.go b/broker/eventddb/eventddb.go new file mode 100644 index 0000000..1083194 --- /dev/null +++ b/broker/eventddb/eventddb.go @@ -0,0 +1,73 @@ +// +// Copyright (C) 2021 - 2022 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package eventddb + +import ( + "encoding/json" + + "github.com/aws/aws-lambda-go/lambda" + "github.com/fogfish/guid/v2" + "github.com/fogfish/swarm" + "github.com/fogfish/swarm/internal/kernel" +) + +// New creates broker for AWS EventBridge +func New(queue string, opts ...swarm.Option) (swarm.Broker, error) { + config := swarm.NewConfig() + for _, opt := range opts { + opt(&config) + } + + starter := lambda.Start + + type Mock interface{ Start(interface{}) } + if config.Service != nil { + service, ok := config.Service.(Mock) + if ok { + starter = service.Start + } + } + + sls := spawner{f: starter, c: config} + + return kernel.New(nil, sls, config), nil +} + +//------------------------------------------------------------------------------ + +type spawner struct { + c swarm.Config + f func(any) +} + +type DynamoDBEvent struct { + Records []json.RawMessage `json:"Records"` +} + +func (s spawner) Spawn(k *kernel.Kernel) error { + s.f( + func(events DynamoDBEvent) error { + bag := make([]swarm.Bag, len(events.Records)) + for i, obj := range events.Records { + bag[i] = swarm.Bag{ + Category: Category, + Object: obj, + Digest: swarm.Digest{Brief: guid.G(guid.Clock).String()}, + } + } + + return k.Dispatch(bag, s.c.TimeToFlight) + }, + ) + + return nil +} + +func (s spawner) Ack(digest string) error { return nil } +func (s spawner) Ask() ([]swarm.Bag, error) { return nil, nil } diff --git a/broker/eventddb/type.go b/broker/eventddb/type.go index dead4e3..eb715aa 100644 --- a/broker/eventddb/type.go +++ b/broker/eventddb/type.go @@ -11,16 +11,11 @@ package eventddb import ( "github.com/aws/aws-lambda-go/events" "github.com/fogfish/swarm" - queue "github.com/fogfish/swarm/queue/events" + queue "github.com/fogfish/swarm/queue" ) -const Category = "eventddb.Event" +const Category = "DynamoDBEventRecord" -type Event swarm.Event[*events.DynamoDBEventRecord] - -func (Event) HKT1(swarm.EventType) {} -func (Event) HKT2(*events.DynamoDBEventRecord) {} - -func Dequeue(q swarm.Broker) (<-chan *Event, chan<- *Event) { - return queue.Dequeue[*events.DynamoDBEventRecord, Event](q) +func Dequeue(q swarm.Broker) (<-chan swarm.Msg[*events.DynamoDBEventRecord], chan<- swarm.Msg[*events.DynamoDBEventRecord]) { + return queue.Dequeue[*events.DynamoDBEventRecord](q) } diff --git a/broker/events3/broker.go b/broker/events3/broker.go deleted file mode 100644 index 9c60281..0000000 --- a/broker/events3/broker.go +++ /dev/null @@ -1,101 +0,0 @@ -// -// Copyright (C) 2021 - 2022 Dmitry Kolesnikov -// -// This file may be modified and distributed under the terms -// of the Apache License Version 2.0. See the LICENSE file for details. -// https://github.com/fogfish/swarm -// - -package events3 - -import ( - "context" - "log/slog" - - "github.com/aws/aws-lambda-go/events" - "github.com/aws/aws-lambda-go/lambda" - "github.com/fogfish/guid/v2" - "github.com/fogfish/swarm" - "github.com/fogfish/swarm/internal/router" -) - -// New creates broker for AWS EventBridge -func New(queue string, opts ...swarm.Option) (swarm.Broker, error) { - conf := swarm.NewConfig() - for _, opt := range opts { - opt(&conf) - } - - ctx, can := context.WithCancel(context.Background()) - - slog.Info("Broker is created", "type", "event-s3") - return &broker{ - config: conf, - channels: swarm.NewChannels(), - context: ctx, - cancel: can, - router: router.New(&conf, nil), - }, nil -} - -type broker struct { - config swarm.Config - channels *swarm.Channels - context context.Context - cancel context.CancelFunc - router *router.Router -} - -func (b *broker) Config() swarm.Config { - return b.config -} - -func (b *broker) Close() { - b.channels.Sync() - b.channels.Close() - b.cancel() -} - -func (b *broker) DSync() { - b.channels.Sync() -} - -func (b *broker) Enqueue(category string, channel swarm.Channel) swarm.Enqueue { - panic("not implemented") -} - -func (b *broker) Dequeue(category string, channel swarm.Channel) swarm.Dequeue { - b.channels.Attach(category, channel) - b.router.Register(category) - - return b.router -} - -func (b *broker) Await() { - starter := lambda.Start - - type Mock interface{ Start(interface{}) } - if b.config.Service != nil { - service, ok := b.config.Service.(Mock) - if ok { - starter = service.Start - } - } - - starter( - func(events events.S3Event) error { - for _, evt := range events.Records { - bag := swarm.Bag{ - Category: Category, - Event: &Event{Object: &evt}, - Digest: swarm.Digest{Brief: guid.G(guid.Clock).String()}, - } - if err := b.router.Dispatch(bag); err != nil { - return err - } - } - - return b.router.Await(b.config.TimeToFlight) - }, - ) -} diff --git a/broker/events3/events3.go b/broker/events3/events3.go new file mode 100644 index 0000000..cd58509 --- /dev/null +++ b/broker/events3/events3.go @@ -0,0 +1,73 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package events3 + +import ( + "encoding/json" + + "github.com/aws/aws-lambda-go/lambda" + "github.com/fogfish/guid/v2" + "github.com/fogfish/swarm" + "github.com/fogfish/swarm/internal/kernel" +) + +// New creates broker for AWS S3 (serverless events) +func New(queue string, opts ...swarm.Option) (swarm.Broker, error) { + config := swarm.NewConfig() + for _, opt := range opts { + opt(&config) + } + + starter := lambda.Start + + type Mock interface{ Start(interface{}) } + if config.Service != nil { + service, ok := config.Service.(Mock) + if ok { + starter = service.Start + } + } + + sls := spawner{f: starter, c: config} + + return kernel.New(nil, sls, config), nil +} + +//------------------------------------------------------------------------------ + +type spawner struct { + c swarm.Config + f func(any) +} + +type S3Event struct { + Records []json.RawMessage `json:"Records"` +} + +func (s spawner) Spawn(k *kernel.Kernel) error { + s.f( + func(events S3Event) error { + bag := make([]swarm.Bag, len(events.Records)) + for i, obj := range events.Records { + bag[i] = swarm.Bag{ + Category: Category, + Object: obj, + Digest: swarm.Digest{Brief: guid.G(guid.Clock).String()}, + } + } + + return k.Dispatch(bag, s.c.TimeToFlight) + }, + ) + + return nil +} + +func (s spawner) Ack(digest string) error { return nil } +func (s spawner) Ask() ([]swarm.Bag, error) { return nil, nil } diff --git a/broker/events3/type.go b/broker/events3/type.go index 8dc09c4..64d9f8d 100644 --- a/broker/events3/type.go +++ b/broker/events3/type.go @@ -3,16 +3,11 @@ package events3 import ( "github.com/aws/aws-lambda-go/events" "github.com/fogfish/swarm" - queue "github.com/fogfish/swarm/queue/events" + queue "github.com/fogfish/swarm/queue" ) -const Category = "events3.Event" +const Category = "S3EventRecord" -type Event swarm.Event[*events.S3EventRecord] - -func (Event) HKT1(swarm.EventType) {} -func (Event) HKT2(*events.S3EventRecord) {} - -func Dequeue(q swarm.Broker) (<-chan *Event, chan<- *Event) { - return queue.Dequeue[*events.S3EventRecord, Event](q) +func Dequeue(q swarm.Broker) (<-chan swarm.Msg[*events.S3EventRecord], chan<- swarm.Msg[*events.S3EventRecord]) { + return queue.Dequeue[*events.S3EventRecord](q) } diff --git a/broker/eventsqs/broker.go b/broker/eventsqs/eventsqs.go similarity index 51% rename from broker/eventsqs/broker.go rename to broker/eventsqs/eventsqs.go index 329c2d1..393c0b0 100644 --- a/broker/eventsqs/broker.go +++ b/broker/eventsqs/eventsqs.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2021 - 2022 Dmitry Kolesnikov +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov // // This file may be modified and distributed under the terms // of the Apache License Version 2.0. See the LICENSE file for details. @@ -9,77 +9,69 @@ package eventsqs import ( - "log/slog" - "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" "github.com/fogfish/swarm" "github.com/fogfish/swarm/broker/sqs" - "github.com/fogfish/swarm/internal/router" + "github.com/fogfish/swarm/internal/kernel" ) -// New creates broker for AWS EventBridge +// New creates broker for AWS SQS (serverless events) func New(queue string, opts ...swarm.Option) (swarm.Broker, error) { - conf := swarm.NewConfig() - for _, opt := range opts { - opt(&conf) - } - - bro, err := sqs.New(queue, opts...) + cli, err := sqs.NewSQS(queue, opts...) if err != nil { return nil, err } - slog.Info("Broker is created", "type", "event-sqs") - return &broker{ - Broker: bro, - config: conf, - router: router.New(&conf, nil), - }, nil -} - -type broker struct { - swarm.Broker - config swarm.Config - router *router.Router -} - -func (b *broker) Dequeue(category string, channel swarm.Channel) swarm.Dequeue { - b.Broker.Dequeue(category, channel) - b.router.Register(category) - - return b.router -} + config := swarm.NewConfig() + for _, opt := range opts { + opt(&config) + } -func (b *broker) Await() { starter := lambda.Start type Mock interface{ Start(interface{}) } - if b.config.Service != nil { - service, ok := b.config.Service.(Mock) + if config.Service != nil { + service, ok := config.Service.(Mock) if ok { starter = service.Start } } - starter( + sls := spawner{f: starter, c: config} + + return kernel.New(cli, sls, config), nil +} + +//------------------------------------------------------------------------------ + +type spawner struct { + c swarm.Config + f func(any) +} + +func (s spawner) Spawn(k *kernel.Kernel) error { + s.f( func(events events.SQSEvent) error { - for _, evt := range events.Records { - bag := swarm.Bag{ + bag := make([]swarm.Bag, len(events.Records)) + for i, evt := range events.Records { + bag[i] = swarm.Bag{ Category: attr(&evt, "Category"), Object: []byte(evt.Body), Digest: swarm.Digest{Brief: evt.ReceiptHandle}, } - if err := b.router.Dispatch(bag); err != nil { - return err - } } - return b.router.Await(b.config.TimeToFlight) + return k.Dispatch(bag, s.c.TimeToFlight) }, ) + + return nil } +func (s spawner) Ack(digest string) error { return nil } +func (s spawner) Ask() ([]swarm.Bag, error) { return nil, nil } + func attr(msg *events.SQSMessage, key string) string { val, exists := msg.MessageAttributes[key] if !exists { diff --git a/broker/sqs/broker.go b/broker/sqs/broker.go deleted file mode 100644 index 9b90e35..0000000 --- a/broker/sqs/broker.go +++ /dev/null @@ -1,124 +0,0 @@ -// -// Copyright (C) 2021 - 2022 Dmitry Kolesnikov -// -// This file may be modified and distributed under the terms -// of the Apache License Version 2.0. See the LICENSE file for details. -// https://github.com/fogfish/swarm -// - -package sqs - -import ( - "context" - "log/slog" - - "github.com/aws/aws-sdk-go-v2/service/sqs" - "github.com/fogfish/swarm" - "github.com/fogfish/swarm/internal/router" -) - -// SQS -type SQS interface { - GetQueueUrl(context.Context, *sqs.GetQueueUrlInput, ...func(*sqs.Options)) (*sqs.GetQueueUrlOutput, error) - SendMessage(context.Context, *sqs.SendMessageInput, ...func(*sqs.Options)) (*sqs.SendMessageOutput, error) - ReceiveMessage(context.Context, *sqs.ReceiveMessageInput, ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) - DeleteMessage(context.Context, *sqs.DeleteMessageInput, ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error) -} - -type broker struct { - config swarm.Config - client *client - channels *swarm.Channels - context context.Context - cancel context.CancelFunc - router *router.Router -} - -// New creates broker for AWS SQS -func New(queue string, opts ...swarm.Option) (swarm.Broker, error) { - conf := swarm.NewConfig() - for _, opt := range opts { - opt(&conf) - } - - cli, err := newClient(queue, &conf) - if err != nil { - return nil, err - } - - ctx, can := context.WithCancel(context.Background()) - - slog.Info("Broker is created", "broker", "sqs") - return &broker{ - config: conf, - client: cli, - channels: swarm.NewChannels(), - context: ctx, - cancel: can, - router: router.New(&conf, cli.Ack), - }, nil -} - -func (b *broker) Config() swarm.Config { - return b.config -} - -func (b *broker) Close() { - b.channels.Sync() - b.channels.Close() - b.cancel() -} - -func (b *broker) DSync() { - b.channels.Sync() -} - -func (b *broker) Await() { - for { - select { - case <-b.context.Done(): - return - default: - var bag swarm.Bag - err := b.config.Backoff.Retry(func() (err error) { - bag, err = b.client.Deq("") - return - }) - if err != nil { - if b.config.StdErr != nil { - b.config.StdErr <- err - } - continue - } - - if bag.Object != nil { - if err := b.router.Dispatch(bag); err != nil { - if b.config.StdErr != nil { - b.config.StdErr <- err - } - continue - } - - if err := b.router.Await(b.config.TimeToFlight); err != nil { - if b.config.StdErr != nil { - b.config.StdErr <- err - } - continue - } - } - } - } -} - -func (b *broker) Enqueue(category string, channel swarm.Channel) swarm.Enqueue { - b.channels.Attach(category, channel) - - return b.client -} - -func (b *broker) Dequeue(category string, channel swarm.Channel) swarm.Dequeue { - b.channels.Attach(category, channel) - b.router.Register(category) - - return b.router -} diff --git a/broker/sqs/sqs.go b/broker/sqs/sqs.go index eeaaaa7..5354d57 100644 --- a/broker/sqs/sqs.go +++ b/broker/sqs/sqs.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2021 - 2022 Dmitry Kolesnikov +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov // // This file may be modified and distributed under the terms // of the Apache License Version 2.0. See the LICENSE file for details. @@ -17,17 +17,45 @@ import ( "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/aws/aws-sdk-go-v2/service/sqs/types" "github.com/fogfish/swarm" + "github.com/fogfish/swarm/internal/kernel" ) -type client struct { +// SQS +type SQS interface { + GetQueueUrl(context.Context, *sqs.GetQueueUrlInput, ...func(*sqs.Options)) (*sqs.GetQueueUrlOutput, error) + SendMessage(context.Context, *sqs.SendMessageInput, ...func(*sqs.Options)) (*sqs.SendMessageOutput, error) + ReceiveMessage(context.Context, *sqs.ReceiveMessageInput, ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) + DeleteMessage(context.Context, *sqs.DeleteMessageInput, ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error) +} + +type Client struct { service SQS - config *swarm.Config + config swarm.Config queue *string isFIFO bool } -func newClient(queue string, config *swarm.Config) (*client, error) { - api, err := newService(config) +func New(queue string, opts ...swarm.Option) (swarm.Broker, error) { + cli, err := NewSQS(queue, opts...) + if err != nil { + return nil, err + } + + config := swarm.NewConfig() + for _, opt := range opts { + opt(&config) + } + + return kernel.New(cli, cli, config), err +} + +func NewSQS(queue string, opts ...swarm.Option) (*Client, error) { + config := swarm.NewConfig() + for _, opt := range opts { + opt(&config) + } + + api, err := newService(&config) if err != nil { return nil, err } @@ -44,7 +72,7 @@ func newClient(queue string, config *swarm.Config) (*client, error) { return nil, swarm.ErrServiceIO.New(err) } - return &client{ + return &Client{ service: api, config: config, queue: spec.QueueUrl, @@ -69,7 +97,7 @@ func newService(conf *swarm.Config) (SQS, error) { } // Enq enqueues message to broker -func (cli *client) Enq(bag swarm.Bag) error { +func (cli *Client) Enq(bag swarm.Bag) error { ctx, cancel := context.WithTimeout(context.Background(), cli.config.NetworkTimeout) defer cancel() @@ -96,15 +124,14 @@ func (cli *client) Enq(bag swarm.Bag) error { return nil } -// Ack acknowledges message to broker -func (cli *client) Ack(bag swarm.Bag) error { +func (cli *Client) Ack(digest string) error { ctx, cancel := context.WithTimeout(context.Background(), cli.config.NetworkTimeout) defer cancel() _, err := cli.service.DeleteMessage(ctx, &sqs.DeleteMessageInput{ QueueUrl: cli.queue, - ReceiptHandle: aws.String(string(bag.Digest.Brief)), + ReceiptHandle: aws.String(digest), }, ) if err != nil { @@ -115,32 +142,34 @@ func (cli *client) Ack(bag swarm.Bag) error { } // Deq dequeues message from broker -func (cli client) Deq(cat string) (swarm.Bag, error) { - ctx, cancel := context.WithTimeout(context.Background(), cli.config.NetworkTimeout) +func (cli Client) Ask() ([]swarm.Bag, error) { + ctx, cancel := context.WithTimeout(context.Background(), cli.config.NetworkTimeout*2) defer cancel() result, err := cli.service.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{ MessageAttributeNames: []string{string(types.QueueAttributeNameAll)}, QueueUrl: cli.queue, - MaxNumberOfMessages: 1, // TODO - WaitTimeSeconds: 10, // TODO + MaxNumberOfMessages: 1, // TODO + WaitTimeSeconds: int32(cli.config.NetworkTimeout.Seconds()), }, ) if err != nil { - return swarm.Bag{}, swarm.ErrDequeue.New(err) + return nil, swarm.ErrDequeue.New(err) } if len(result.Messages) == 0 { - return swarm.Bag{}, nil + return nil, nil } head := result.Messages[0] - return swarm.Bag{ - Category: attr(&head, "Category"), - Object: []byte(*head.Body), - Digest: swarm.Digest{Brief: *head.ReceiptHandle}, + return []swarm.Bag{ + { + Category: attr(&head, "Category"), + Object: []byte(*head.Body), + Digest: swarm.Digest{Brief: *head.ReceiptHandle}, + }, }, nil } diff --git a/channel.go b/channel.go deleted file mode 100644 index 08bfcf7..0000000 --- a/channel.go +++ /dev/null @@ -1,195 +0,0 @@ -// -// Copyright (C) 2021 - 2022 Dmitry Kolesnikov -// -// This file may be modified and distributed under the terms -// of the Apache License Version 2.0. See the LICENSE file for details. -// https://github.com/fogfish/swarm -// - -package swarm - -import ( - "sync" - "time" -) - -// Channel is abstract concept of channel(s) -type Channel interface { - Sync() - Close() -} - -const syncInterval = 100 * time.Millisecond - -/* -MsgEnqCh is the pair of channel, exposed by the queue for enqueuing the messages -*/ -type MsgEnqCh[T any] struct { - Msg chan T // channel to send message out - Err chan T // channel to recv failed messages - Busy sync.Mutex -} - -func NewMsgEnqCh[T any](n int) MsgEnqCh[T] { - return MsgEnqCh[T]{ - Msg: make(chan T, n), - Err: make(chan T, n), - } -} - -func (ch *MsgEnqCh[T]) Sync() { - for { - time.Sleep(syncInterval) - if len(ch.Msg)+len(ch.Err) == 0 { - break - } - } - - time.Sleep(syncInterval) - ch.Busy.Lock() - defer ch.Busy.Unlock() -} - -func (ch *MsgEnqCh[T]) Close() { - ch.Sync() - - close(ch.Msg) - close(ch.Err) -} - -/* -msgRecv is the pair of channel, exposed by the queue to clients to recv messages -*/ -type MsgDeqCh[T any] struct { - Msg chan *Msg[T] // channel to recv message - Ack chan *Msg[T] // channel to send acknowledgement -} - -func NewMsgDeqCh[T any](n int) MsgDeqCh[T] { - return MsgDeqCh[T]{ - Msg: make(chan *Msg[T], n), - Ack: make(chan *Msg[T], n), - } -} - -func (ch *MsgDeqCh[T]) Sync() { - for { - time.Sleep(syncInterval) - if len(ch.Msg)+len(ch.Ack) == 0 { - break - } - } -} - -func (ch *MsgDeqCh[T]) Close() { - ch.Sync() - close(ch.Msg) - close(ch.Ack) -} - -/* -EvtEnqCh is the pair of channel, exposed by the queue to clients to send messages -*/ -type EvtEnqCh[T any, E EventKind[T]] struct { - Msg chan *E // channel to send message out - Err chan *E // channel to recv failed messages - Busy sync.Mutex -} - -func NewEvtEnqCh[T any, E EventKind[T]](n int) EvtEnqCh[T, E] { - return EvtEnqCh[T, E]{ - Msg: make(chan *E, n), - Err: make(chan *E, n), - } -} - -func (ch *EvtEnqCh[T, E]) Sync() { - for { - time.Sleep(syncInterval) - if len(ch.Msg)+len(ch.Err) == 0 { - break - } - } - - time.Sleep(syncInterval) - ch.Busy.Lock() - defer ch.Busy.Unlock() -} - -func (ch *EvtEnqCh[T, E]) Close() { - ch.Sync() - close(ch.Msg) - close(ch.Err) -} - -/* -msgRecv is the pair of channel, exposed by the queue to clients to recv messages -*/ -type EvtDeqCh[T any, E EventKind[T]] struct { - Msg chan *E // channel to recv message - Ack chan *E // channel to send acknowledgement -} - -func NewEvtDeqCh[T any, E EventKind[T]](n int) EvtDeqCh[T, E] { - return EvtDeqCh[T, E]{ - Msg: make(chan *E, n), - Ack: make(chan *E, n), - } -} - -func (ch *EvtDeqCh[T, E]) Sync() { - for { - time.Sleep(syncInterval) - if len(ch.Msg)+len(ch.Ack) == 0 { - break - } - } -} - -func (ch *EvtDeqCh[T, E]) Close() { - ch.Sync() - close(ch.Msg) - close(ch.Ack) -} - -/* -Channels -*/ -type Channels struct { - sync.Mutex - channels map[string]Channel -} - -func NewChannels() *Channels { - return &Channels{ - channels: make(map[string]Channel), - } -} - -func (chs *Channels) Length() int { - return len(chs.channels) -} - -func (chs *Channels) Attach(id string, ch Channel) { - chs.Lock() - defer chs.Unlock() - - chs.channels[id] = ch -} - -func (chs *Channels) Sync() { - for _, ch := range chs.channels { - ch.Sync() - } -} - -func (chs *Channels) Close() { - chs.Lock() - defer chs.Unlock() - - for _, ch := range chs.channels { - ch.Close() - } - - chs.channels = make(map[string]Channel) -} diff --git a/codec.go b/codec.go new file mode 100644 index 0000000..62e99eb --- /dev/null +++ b/codec.go @@ -0,0 +1,83 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package swarm + +import ( + "encoding/json" + "time" + + "github.com/fogfish/curie" + "github.com/fogfish/golem/optics" + "github.com/fogfish/guid/v2" +) + +//------------------------------------------------------------------------------ + +// Json codec for I/O kernel +type CodecJson[T any] struct{} + +func (CodecJson[T]) Encode(x T) ([]byte, error) { + return json.Marshal(x) +} + +func (CodecJson[T]) Decode(b []byte) (x T, err error) { + err = json.Unmarshal(b, &x) + return +} + +func NewCodecJson[T any]() CodecJson[T] { return CodecJson[T]{} } + +//------------------------------------------------------------------------------ + +// Byte identity codec for I/O kernet +type CodecByte struct{} + +func (CodecByte) Encode(x []byte) ([]byte, error) { return x, nil } +func (CodecByte) Decode(x []byte) ([]byte, error) { return x, nil } + +func NewCodecByte() CodecByte { return CodecByte{} } + +//------------------------------------------------------------------------------ + +// Event codec for I/O kernel +type CodecEvent[T any, E EventKind[T]] struct { + source string + cat string + shape optics.Lens4[E, string, curie.IRI, curie.IRI, time.Time] +} + +func (c CodecEvent[T, E]) Encode(obj *E) ([]byte, error) { + _, knd, src, _ := c.shape.Get(obj) + if knd == "" { + knd = curie.IRI(c.cat) + } + + if src == "" { + src = curie.IRI(c.source) + } + + c.shape.Put(obj, guid.G(guid.Clock).String(), knd, src, time.Now()) + + return json.Marshal(obj) +} + +func (c CodecEvent[T, E]) Decode(b []byte) (*E, error) { + x := new(E) + err := json.Unmarshal(b, x) + + return x, err +} + +func NewCodecEvent[T any, E EventKind[T]](source, cat string) CodecEvent[T, E] { + return CodecEvent[T, E]{ + source: source, + cat: cat, + shape: optics.ForShape4[E, string, curie.IRI, curie.IRI, time.Time]("ID", "Type", "Agent", "Created"), + } +} diff --git a/config.go b/config.go index f9d654e..6a2b4eb 100644 --- a/config.go +++ b/config.go @@ -15,7 +15,6 @@ import ( "time" "github.com/fogfish/swarm/internal/backoff" - "github.com/fogfish/swarm/internal/pipe" ) // Grade of Service Policy @@ -32,7 +31,7 @@ type Retry interface { } type Config struct { - // Instance of AWS Service, ... + // Instance of AWS Service, used to overwrite default client Service any // Source is a direct performer of the event. @@ -42,9 +41,11 @@ type Config struct { // Quality of Service Policy Policy Policy - // Queue capacity - EnqueueCapacity int - DequeueCapacity int + // Queue capacity (enhance with individual capacities) + CapOut int + CapDLQ int + CapRcv int + CapAck int // Retry Policy for service calls Backoff Retry @@ -60,21 +61,20 @@ type Config struct { // Timeout for any network operations NetworkTimeout time.Duration - - // Commit hook (executed after each loop iteration) - HookCommit func() } func NewConfig() Config { return Config{ - Source: "github.com/fogfish/swarm", - Policy: PolicyAtLeastOnce, - EnqueueCapacity: 0, - DequeueCapacity: 0, - Backoff: backoff.Exp(10*time.Millisecond, 10, 0.5), - PollFrequency: 10 * time.Millisecond, - TimeToFlight: 5 * time.Second, - NetworkTimeout: 5 * time.Second, + Source: "github.com/fogfish/swarm", + Policy: PolicyAtLeastOnce, + CapOut: 0, + CapDLQ: 0, + CapRcv: 0, + CapAck: 0, + Backoff: backoff.Exp(10*time.Millisecond, 10, 0.5), + PollFrequency: 10 * time.Millisecond, + TimeToFlight: 5 * time.Second, + NetworkTimeout: 5 * time.Second, } } @@ -142,9 +142,12 @@ func WithStdErr(stderr chan<- error) Option { func WithLogStdErr() Option { err := make(chan error) - pipe.ForEach(err, func(err error) { - slog.Error("Broker failed", "error", err) - }) + go func() { + var x error + for x = range err { + slog.Error("Broker failed", "error", x) + } + }() return func(conf *Config) { conf.StdErr = err @@ -205,8 +208,10 @@ func durationFromEnv(key string, def time.Duration) time.Duration { func WithPolicyAtMostOnce(n int) Option { return func(conf *Config) { conf.Policy = PolicyAtMostOnce - conf.EnqueueCapacity = n - conf.DequeueCapacity = n + conf.CapOut = n + conf.CapDLQ = n + conf.CapRcv = n + conf.CapAck = n } } @@ -216,13 +221,9 @@ func WithPolicyAtMostOnce(n int) Option { func WithPolicyAtLeastOnce(n int) Option { return func(conf *Config) { conf.Policy = PolicyAtLeastOnce - conf.EnqueueCapacity = 0 - conf.DequeueCapacity = n - } -} - -func WithHookCommit(hook func()) Option { - return func(conf *Config) { - conf.HookCommit = hook + conf.CapOut = 0 + conf.CapDLQ = 0 + conf.CapRcv = n + conf.CapAck = n } } diff --git a/event.go b/event.go index b6a06f2..628646e 100644 --- a/event.go +++ b/event.go @@ -58,17 +58,7 @@ type Event[T any] struct { // // The object upon which the event is carried out. Object T `json:"object,omitempty"` - - // - // The digest of received event (used internally to ack processing) - Digest Digest `json:"-"` } func (Event[T]) HKT1(EventType) {} func (Event[T]) HKT2(T) {} - -// Fail Event with error -func (evt *Event[T]) Fail(err error) *Event[T] { - evt.Digest.Error = err - return evt -} diff --git a/examples/bytes/dequeue/bytes.go b/examples/bytes/dequeue/bytes.go index 312b63f..ac701cf 100644 --- a/examples/bytes/dequeue/bytes.go +++ b/examples/bytes/dequeue/bytes.go @@ -32,7 +32,7 @@ func main() { type actor string -func (a actor) handle(rcv <-chan *swarm.Msg[[]byte], ack chan<- *swarm.Msg[[]byte]) { +func (a actor) handle(rcv <-chan swarm.Msg[[]byte], ack chan<- swarm.Msg[[]byte]) { for msg := range rcv { slog.Info("Event", "type", a, "msg", msg.Object) ack <- msg diff --git a/examples/eventbridge/dequeue/eventbridge.go b/examples/eventbridge/dequeue/eventbridge.go index 4418fb1..cab1cf2 100644 --- a/examples/eventbridge/dequeue/eventbridge.go +++ b/examples/eventbridge/dequeue/eventbridge.go @@ -55,17 +55,17 @@ func main() { type actor[T any] string -func (a actor[T]) handle(rcv <-chan *swarm.Msg[T], ack chan<- *swarm.Msg[T]) { +func (a actor[T]) handle(rcv <-chan swarm.Msg[T], ack chan<- swarm.Msg[T]) { for msg := range rcv { slog.Info("Event", "type", a, "msg", msg.Object) ack <- msg } } -func ebus(rcv <-chan *EventNote, ack chan<- *EventNote) { +func ebus(rcv <-chan swarm.Msg[*EventNote], ack chan<- swarm.Msg[*EventNote]) { for msg := range rcv { prefix := "" - switch string(msg.Type) { + switch string(msg.Object.Type) { case "note:EventCreateNote": prefix = "+ |" case "note:EventUpdateNote": diff --git a/examples/eventddb/dequeue/eventddb.go b/examples/eventddb/dequeue/eventddb.go index 9e99ef4..7ff2e1d 100644 --- a/examples/eventddb/dequeue/eventddb.go +++ b/examples/eventddb/dequeue/eventddb.go @@ -12,6 +12,7 @@ import ( "encoding/json" "fmt" + "github.com/aws/aws-lambda-go/events" "github.com/fogfish/swarm" "github.com/fogfish/swarm/broker/eventddb" "github.com/fogfish/swarm/internal/qtest" @@ -28,7 +29,7 @@ func main() { q.Await() } -func common(rcv <-chan *eventddb.Event, ack chan<- *eventddb.Event) { +func common(rcv <-chan swarm.Msg[*events.DynamoDBEventRecord], ack chan<- swarm.Msg[*events.DynamoDBEventRecord]) { for msg := range rcv { v, _ := json.MarshalIndent(msg, "", " ") diff --git a/examples/eventddb/serverless/main.go b/examples/eventddb/serverless/main.go index de780bd..086f6ae 100644 --- a/examples/eventddb/serverless/main.go +++ b/examples/eventddb/serverless/main.go @@ -17,7 +17,7 @@ func main() { app := eventddb.NewServerlessApp() stack := app.NewStack("swarm-example-eventddb") - stack.NewGlobalTable() + stack.NewTable() stack.NewSink( &eventddb.SinkProps{ diff --git a/examples/events/dequeue/events.go b/examples/events/dequeue/events.go index 7ad3f32..51b4305 100644 --- a/examples/events/dequeue/events.go +++ b/examples/events/dequeue/events.go @@ -64,7 +64,7 @@ func main() { q.Await() } -func create(rcv <-chan *EventCreateUser, ack chan<- *EventCreateUser) { +func create(rcv <-chan swarm.Msg[*EventCreateUser], ack chan<- swarm.Msg[*EventCreateUser]) { for msg := range rcv { v, _ := json.MarshalIndent(msg, "+ |", " ") fmt.Printf("create user > \n %s\n", v) @@ -72,7 +72,7 @@ func create(rcv <-chan *EventCreateUser, ack chan<- *EventCreateUser) { } } -func update(rcv <-chan *EventUpdateUser, ack chan<- *EventUpdateUser) { +func update(rcv <-chan swarm.Msg[*EventUpdateUser], ack chan<- swarm.Msg[*EventUpdateUser]) { for msg := range rcv { v, _ := json.MarshalIndent(msg, "~ |", " ") fmt.Printf("update user > \n %s\n", v) @@ -80,7 +80,7 @@ func update(rcv <-chan *EventUpdateUser, ack chan<- *EventUpdateUser) { } } -func remove(rcv <-chan *EventRemoveUser, ack chan<- *EventRemoveUser) { +func remove(rcv <-chan swarm.Msg[*EventRemoveUser], ack chan<- swarm.Msg[*EventRemoveUser]) { for msg := range rcv { v, _ := json.MarshalIndent(msg, "- |", " ") fmt.Printf("remove user > \n %s\n", v) @@ -88,10 +88,10 @@ func remove(rcv <-chan *EventRemoveUser, ack chan<- *EventRemoveUser) { } } -func common(rcv <-chan *EventNote, ack chan<- *EventNote) { +func common(rcv <-chan swarm.Msg[*EventNote], ack chan<- swarm.Msg[*EventNote]) { for msg := range rcv { prefix := "" - switch string(msg.Type) { + switch string(msg.Object.Type) { case "note:EventCreateNote": prefix = "+ |" case "note:EventUpdateNote": diff --git a/examples/events3/dequeue/events3.go b/examples/events3/dequeue/events3.go index bc4178a..ef18a74 100644 --- a/examples/events3/dequeue/events3.go +++ b/examples/events3/dequeue/events3.go @@ -12,6 +12,7 @@ import ( "encoding/json" "fmt" + "github.com/aws/aws-lambda-go/events" "github.com/fogfish/swarm" "github.com/fogfish/swarm/broker/events3" "github.com/fogfish/swarm/internal/qtest" @@ -28,7 +29,7 @@ func main() { q.Await() } -func common(rcv <-chan *events3.Event, ack chan<- *events3.Event) { +func common(rcv <-chan swarm.Msg[*events.S3EventRecord], ack chan<- swarm.Msg[*events.S3EventRecord]) { for msg := range rcv { v, _ := json.MarshalIndent(msg, "", " ") diff --git a/examples/eventsqs/dequeue/eventsqs.go b/examples/eventsqs/dequeue/eventsqs.go index 9694095..cbcc86c 100644 --- a/examples/eventsqs/dequeue/eventsqs.go +++ b/examples/eventsqs/dequeue/eventsqs.go @@ -46,7 +46,7 @@ func main() { type actor[T any] string -func (a actor[T]) handle(rcv <-chan *swarm.Msg[T], ack chan<- *swarm.Msg[T]) { +func (a actor[T]) handle(rcv <-chan swarm.Msg[T], ack chan<- swarm.Msg[T]) { for msg := range rcv { slog.Info("Event", "type", a, "msg", msg.Object) ack <- msg diff --git a/examples/sqs/dequeue/sqs.go b/examples/sqs/dequeue/sqs.go index d33e9ad..93b5eb8 100644 --- a/examples/sqs/dequeue/sqs.go +++ b/examples/sqs/dequeue/sqs.go @@ -46,7 +46,7 @@ func main() { type actor[T any] string -func (a actor[T]) handle(rcv <-chan *swarm.Msg[T], ack chan<- *swarm.Msg[T]) { +func (a actor[T]) handle(rcv <-chan swarm.Msg[T], ack chan<- swarm.Msg[T]) { for msg := range rcv { slog.Info("Event", "type", a, "msg", msg.Object) ack <- msg diff --git a/internal/kernel/kernel.go b/internal/kernel/kernel.go new file mode 100644 index 0000000..62f1112 --- /dev/null +++ b/internal/kernel/kernel.go @@ -0,0 +1,365 @@ +// +// Copyright (C) 2021 - 2024 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package kernel + +import ( + "errors" + "log/slog" + "sync" + "time" + + "github.com/fogfish/swarm" +) + +type Codec[T any] interface { + Encode(T) ([]byte, error) + Decode([]byte) (T, error) +} + +type Emitter interface { + Enq(swarm.Bag) error +} + +type Cathode interface { + Ack(digest string) error + Ask() ([]swarm.Bag, error) +} + +type Spawner interface { + Spawn(*Kernel) error +} + +// Event Kernel builds an infrastructure for integrating message brokers, +// events busses into Golang channel environment. +// The implementation follows the pattern, defined by +// https://go101.org/article/channel-closing.html +type Kernel struct { + sync.WaitGroup + sync.RWMutex + + // Kernel configuartion + Config swarm.Config + + // Control-plane stop channel. It is used to notify the kernel to terminate. + // Kernel notifies control plane of individual routines. + mainStop chan struct{} + + // Control-plane stop channel used by go routines to stop I/O on data channels + ctrlStop chan struct{} + + // Control-plane for ack of batch elements + ctrlAcks chan swarm.Digest + + // event router, binds category with destination channel + router map[string]interface{ Route(swarm.Bag) error } + + waiting bool + + // On the wire protocol emitter (writer) and cathode (receiver) + Emitter Emitter + Cathode Cathode +} + +// New routing and dispatch kernel +func New(emitter Emitter, cathode Cathode, config swarm.Config) *Kernel { + return &Kernel{ + Config: config, + mainStop: make(chan struct{}, 1), // MUST BE buffered + ctrlStop: make(chan struct{}), + + router: map[string]interface{ Route(swarm.Bag) error }{}, + Emitter: emitter, + Cathode: cathode, + } +} + +// internal infinite receive loop. +// waiting for message from event buses and queues and schedules it for delivery. +func (k *Kernel) receive() { + k.WaitGroup.Add(1) + + asker := func() { + seq, err := k.Cathode.Ask() + if k.Config.StdErr != nil && err != nil { + k.Config.StdErr <- err + return + } + + for i := 0; i < len(seq); i++ { + bag := seq[i] + + k.RWMutex.RLock() + r, has := k.router[bag.Category] + k.RWMutex.RUnlock() + + if has { + err := r.Route(bag) + if k.Config.StdErr != nil && err != nil { + k.Config.StdErr <- err + return + } + } + } + } + + go func() { + exit: + for { + select { + case <-k.ctrlStop: + break exit + default: + } + + select { + case <-k.ctrlStop: + break exit + case <-time.After(k.Config.PollFrequency): + asker() + } + } + + slog.Debug("Free kernel infinite loop") + k.WaitGroup.Done() + }() + + slog.Debug("Init kernel infinite loop") +} + +// Close event delivery infrastructure +func (k *Kernel) Close() { + k.mainStop <- struct{}{} + if !k.waiting { + close(k.ctrlStop) + k.WaitGroup.Wait() + } +} + +// Await for event delivery +func (k *Kernel) Await() { + k.waiting = true + + if spawner, ok := k.Cathode.(Spawner); ok { + spawner.Spawn(k) + } else { + k.receive() + } + + <-k.mainStop + close(k.ctrlStop) + k.WaitGroup.Wait() +} + +// Dispatches batch of messages +func (k *Kernel) Dispatch(seq []swarm.Bag, timeout time.Duration) error { + k.WaitGroup.Add(1) + k.ctrlAcks = make(chan swarm.Digest, len(seq)) + + wnd := map[string]struct{}{} + for i := 0; i < len(seq); i++ { + bag := seq[i] + wnd[bag.Digest.Brief] = struct{}{} + + k.RWMutex.RLock() + r, has := k.router[bag.Category] + k.RWMutex.RUnlock() + + if has { + err := r.Route(bag) + if k.Config.StdErr != nil && err != nil { + k.Config.StdErr <- err + continue + } + } + } + + var err error + +exit: + for { + select { + case <-k.ctrlStop: + break exit + default: + } + + select { + case <-k.ctrlStop: + break exit + case ack := <-k.ctrlAcks: + if err == nil && ack.Error != nil { + err = ack.Error + } + + delete(wnd, ack.Brief) + if len(wnd) == 0 { + break exit + } + case <-time.After(timeout): + err = errors.New("ack timeout") + break exit + } + } + + close(k.ctrlAcks) + k.ctrlAcks = nil + k.WaitGroup.Done() + + return err +} + +// Enqueue channels for kernel +func Enqueue[T any](k *Kernel, cat string, codec Codec[T]) ( /*snd*/ chan<- T /*dlq*/, <-chan T) { + snd := make(chan T, k.Config.CapOut) + dlq := make(chan T, k.Config.CapDLQ) + + // emitter routine + emit := func(obj T) { + msg, err := codec.Encode(obj) + if err != nil { + dlq <- obj + if k.Config.StdErr != nil { + k.Config.StdErr <- err + } + return + } + + bag := swarm.Bag{Category: cat, Object: msg} + if err := k.Emitter.Enq(bag); err != nil { + dlq <- obj + if k.Config.StdErr != nil { + k.Config.StdErr <- err + } + return + } + + slog.Debug("emitted ", "cat", cat, "object", obj) + } + + k.WaitGroup.Add(1) + go func() { + exit: + for { + // The try-receive operation here is to + // try to exit the sender goroutine as + // early as possible. Try-receive and + // try-send select blocks are specially + // optimized by the standard Go + // compiler, so they are very efficient. + select { + case <-k.ctrlStop: + break exit + default: + } + + select { + case <-k.ctrlStop: + break exit + case obj := <-snd: + emit(obj) + } + } + + backlog := len(snd) + close(snd) + + slog.Debug("Free enqueue kernel", "cat", cat, "backlog", backlog) + if backlog != 0 { + for obj := range snd { + emit(obj) + } + } + k.WaitGroup.Done() + }() + + slog.Debug("Init enqueue kernel", "cat", cat) + + return snd, dlq +} + +type router[T any] struct { + ch chan swarm.Msg[T] + codec Codec[T] +} + +func (a router[T]) Route(bag swarm.Bag) error { + obj, err := a.codec.Decode(bag.Object) + if err != nil { + return err + } + msg := swarm.Msg[T]{Object: obj, Digest: bag.Digest} + a.ch <- msg + return nil +} + +// Enqueue channels for kernel +func Dequeue[T any](k *Kernel, cat string, codec Codec[T]) ( /*rcv*/ <-chan swarm.Msg[T] /*ack*/, chan<- swarm.Msg[T]) { + rcv := make(chan swarm.Msg[T], k.Config.CapRcv) + ack := make(chan swarm.Msg[T], k.Config.CapAck) + + k.RWMutex.Lock() + k.router[cat] = router[T]{ch: rcv, codec: codec} + k.RWMutex.Unlock() + + // emitter routine + acks := func(msg swarm.Msg[T]) { + if msg.Digest.Error == nil { + err := k.Cathode.Ack(msg.Digest.Brief) + if k.Config.StdErr != nil && err != nil { + k.Config.StdErr <- err + } + + slog.Debug("acked ", "cat", cat, "object", msg.Object) + } + + if k.ctrlAcks != nil { + k.ctrlAcks <- msg.Digest + } + } + + k.WaitGroup.Add(1) + go func() { + exit: + for { + // The try-receive operation here is to + // try to exit the sender goroutine as + // early as possible. Try-receive and + // try-send select blocks are specially + // optimized by the standard Go + // compiler, so they are very efficient. + select { + case <-k.ctrlStop: + break exit + default: + } + + select { + case <-k.ctrlStop: + break exit + case msg := <-ack: + acks(msg) + } + } + + backlog := len(ack) + close(ack) + + slog.Debug("Free dequeue kernel", "cat", cat, "backlog", backlog) + if backlog != 0 { + for msg := range ack { + acks(msg) + } + } + k.WaitGroup.Done() + }() + + slog.Debug("Init dequeue kernel", "cat", cat) + + return rcv, ack +} diff --git a/internal/pipe/pipe.go b/internal/pipe/pipe.go deleted file mode 100644 index a2e761d..0000000 --- a/internal/pipe/pipe.go +++ /dev/null @@ -1,49 +0,0 @@ -// -// Copyright (C) 2021 - 2022 Dmitry Kolesnikov -// -// This file may be modified and distributed under the terms -// of the Apache License Version 2.0. See the LICENSE file for details. -// https://github.com/fogfish/swarm -// - -package pipe - -import ( - "time" -) - -/* -ForEach applies function for each message in the channel -*/ -func ForEach[A any](in <-chan A, f func(A)) { - go func() { - var ( - x A - ) - - for x = range in { - f(x) - } - }() -} - -/* -Emit periodically message from the function -*/ -func Emit[T any](eg chan<- T, frequency time.Duration, f func() (T, error)) { - go func() { - defer func() { - // Note: recover from panic on sending to closed channel - if recover() != nil { - return - } - }() - - for { - time.Sleep(frequency) - if x, err := f(); err == nil { - eg <- x - } - } - }() -} diff --git a/internal/qtest/qtest.go b/internal/qtest/qtest.go index 0703594..23b39cc 100644 --- a/internal/qtest/qtest.go +++ b/internal/qtest/qtest.go @@ -189,24 +189,27 @@ func TestDequeueTyped(t *testing.T, factory dequeue) { q.Close() }) - t.Run("Commit", func(t *testing.T) { - commited := false - - q := factory(eff, "test-queue", Category, Message, Receipt, retry200ms, swarm.WithHookCommit(func() { commited = true })) - - msg, ack := queue.Dequeue[Note](q) - go q.Await() - - val := <-msg - ack <- val - - it.Then(t). - Should(it.Equal(val.Object, Note{Some: "message"})). - Should(it.Equal(<-eff, Receipt)). - Should(it.True(commited)) - - q.Close() - }) + // + // Note: Commit hook deprecated, kernel implements sync + // + // t.Run("Commit", func(t *testing.T) { + // commited := false + // + // q := factory(eff, "test-queue", Category, Message, Receipt, retry200ms, swarm.WithHookCommit(func() { commited = true })) + // + // msg, ack := queue.Dequeue[Note](q) + // go q.Await() + // + // val := <-msg + // ack <- val + // + // it.Then(t). + // Should(it.Equal(val.Object, Note{Some: "message"})). + // Should(it.Equal(<-eff, Receipt)). + // Should(it.True(commited)) + + // q.Close() + // }) } func TestDequeueBytes(t *testing.T, factory dequeue) { @@ -254,12 +257,12 @@ func TestDequeueEvent(t *testing.T, factory dequeue) { ack <- val it.Then(t). - Should(it.Equal(*val.Object, Note{Some: "message"})). - Should(it.Equal(val.Agent, "agent")). - Should(it.Equal(val.Participant, "user")). - Should(it.Equal(val.Type, "type")). - Should(it.Equal(val.ID, "id")). - Should(it.Equal(val.Created, time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC))). + Should(it.Equal(*val.Object.Object, Note{Some: "message"})). + Should(it.Equal(val.Object.Agent, "agent")). + Should(it.Equal(val.Object.Participant, "user")). + Should(it.Equal(val.Object.Type, "type")). + Should(it.Equal(val.Object.ID, "id")). + Should(it.Equal(val.Object.Created, time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC))). Should(it.Equal(<-eff, Receipt)) q.Close() diff --git a/internal/router/router.go b/internal/router/router.go deleted file mode 100644 index 8af5624..0000000 --- a/internal/router/router.go +++ /dev/null @@ -1,104 +0,0 @@ -// -// Copyright (C) 2021 - 2022 Dmitry Kolesnikov -// -// This file may be modified and distributed under the terms -// of the Apache License Version 2.0. See the LICENSE file for details. -// https://github.com/fogfish/swarm -// - -package router - -import ( - "fmt" - "log/slog" - "sync" - "time" - - "github.com/fogfish/swarm" -) - -type Router struct { - sync.Mutex - config *swarm.Config - sack chan swarm.Bag - sock map[string]chan swarm.Bag - acks map[string]struct{} - onAck func(swarm.Bag) error -} - -func New(config *swarm.Config, onAck func(swarm.Bag) error) *Router { - return &Router{ - config: config, - sack: make(chan swarm.Bag, config.DequeueCapacity), - sock: make(map[string]chan swarm.Bag), - acks: map[string]struct{}{}, - onAck: onAck, - } -} - -func (router *Router) Register(category string) { - router.Lock() - defer router.Unlock() - - router.sock[category] = make(chan swarm.Bag, router.config.DequeueCapacity) - slog.Debug("Registered channel for category", "category", category) -} - -func (router *Router) Ack(bag swarm.Bag) error { - router.sack <- bag - return nil -} - -func (router *Router) Deq(category string) (swarm.Bag, error) { - bag := <-router.sock[category] - return bag, nil -} - -func (router *Router) Dispatch(bag swarm.Bag) error { - sock, exists := router.sock[bag.Category] - if !exists { - return fmt.Errorf("not found category %s", bag.Category) - } - - router.acks[bag.Digest.Brief] = struct{}{} - sock <- bag - - return nil -} - -func (router *Router) Await(d time.Duration) error { - for { - select { - case bag := <-router.sack: - // Note: existing implementation assumes invalidation of batch - // - SQS does not support batching yet (to be done later) - // - Event SQS 1 invocation for 1 message, entire batch invalidated - // - EventBridge does not support batching, 1 invocation for 1 message - // - S3 does not support batching (aws s3 api fakes it), 1 invocation for 1 message. - if err := bag.Digest.Error; err != nil { - router.acks = map[string]struct{}{} - return err - } - - if router.onAck != nil { - err := router.config.Backoff.Retry(func() error { - return router.onAck(bag) - }) - if err != nil { - return swarm.ErrServiceIO.New(err) - } - } - - delete(router.acks, bag.Digest.Brief) - if len(router.acks) == 0 { - if router.config.HookCommit != nil { - router.config.HookCommit() - } - return nil - } - case <-time.After(d): - router.acks = map[string]struct{}{} - return fmt.Errorf("timeout message ack") - } - } -} diff --git a/queue/bytes/dequeue.go b/queue/bytes/dequeue.go index 3961bac..d3a0854 100644 --- a/queue/bytes/dequeue.go +++ b/queue/bytes/dequeue.go @@ -9,58 +9,13 @@ package bytes import ( - "log/slog" - "github.com/fogfish/swarm" - "github.com/fogfish/swarm/internal/pipe" + "github.com/fogfish/swarm/internal/kernel" ) // Dequeue bytes -func Dequeue(q swarm.Broker, cat string) (<-chan *swarm.Msg[[]byte], chan<- *swarm.Msg[[]byte]) { - conf := q.Config() - ch := swarm.NewMsgDeqCh[[]byte](conf.DequeueCapacity) - - sock := q.Dequeue(cat, &ch) - - pipe.ForEach(ch.Ack, func(object *swarm.Msg[[]byte]) { - err := conf.Backoff.Retry(func() error { - return sock.Ack(swarm.Bag{ - Category: cat, - Digest: object.Digest, - }) - }) - if err != nil && conf.StdErr != nil { - conf.StdErr <- err - return - } - - slog.Debug("Broker ack'ed object", "kind", "bytes", "category", cat, "object", object.Object, "error", object.Digest.Error) - }) - - pipe.Emit(ch.Msg, q.Config().PollFrequency, func() (*swarm.Msg[[]byte], error) { - var bag swarm.Bag - err := conf.Backoff.Retry(func() (err error) { - bag, err = sock.Deq(cat) - return - }) - if err != nil { - if conf.StdErr != nil { - conf.StdErr <- err - } - return nil, err - } - - msg := &swarm.Msg[[]byte]{ - Object: bag.Object, - Digest: bag.Digest, - } - - slog.Debug("Broker received object", "kind", "bytes", "category", cat, "object", bag.Object) - - return msg, nil - }) - - slog.Debug("Created dequeue channels: rcv, ack", "kind", "bytes", "category", cat) +func Dequeue(q swarm.Broker, cat string) (<-chan swarm.Msg[[]byte], chan<- swarm.Msg[[]byte]) { + codec := swarm.NewCodecByte() - return ch.Msg, ch.Ack + return kernel.Dequeue(q.(*kernel.Kernel), cat, codec) } diff --git a/queue/bytes/enqueue.go b/queue/bytes/enqueue.go index cb4b39f..0b21f0f 100644 --- a/queue/bytes/enqueue.go +++ b/queue/bytes/enqueue.go @@ -9,36 +9,13 @@ package bytes import ( - "log/slog" - "github.com/fogfish/swarm" - "github.com/fogfish/swarm/internal/pipe" + "github.com/fogfish/swarm/internal/kernel" ) // Enqueue creates pair of channels to send messages and dead-letter queue func Enqueue(q swarm.Broker, cat string) (chan<- []byte, <-chan []byte) { - conf := q.Config() - ch := swarm.NewMsgEnqCh[[]byte](conf.EnqueueCapacity) - - sock := q.Enqueue(cat, &ch) - - pipe.ForEach(ch.Msg, func(object []byte) { - ch.Busy.Lock() - defer ch.Busy.Unlock() - - bag := swarm.Bag{Category: cat, Object: object} - err := conf.Backoff.Retry(func() error { return sock.Enq(bag) }) - if err != nil { - ch.Err <- object - if conf.StdErr != nil { - conf.StdErr <- err - } - } - - slog.Debug("Enqueued", "kind", "bytes", "category", bag.Category, "object", object) - }) - - slog.Debug("Created enqueue channels: out, err", "kind", "bytes", "category", cat) + codec := swarm.NewCodecByte() - return ch.Msg, ch.Err + return kernel.Enqueue(q.(*kernel.Kernel), cat, codec) } diff --git a/queue/bytes/queue.go b/queue/bytes/queue.go index 0ceabe6..6fe4f73 100644 --- a/queue/bytes/queue.go +++ b/queue/bytes/queue.go @@ -12,6 +12,7 @@ import ( "log/slog" "github.com/fogfish/swarm" + "github.com/fogfish/swarm/internal/kernel" ) type Queue interface { @@ -19,9 +20,9 @@ type Queue interface { } type queue struct { - cat string - conf swarm.Config - sock swarm.Enqueue + cat string + codec kernel.Codec[[]byte] + emit kernel.Emitter } func (q queue) Sync() {} @@ -29,7 +30,7 @@ func (q queue) Close() {} func (q queue) Enqueue(object []byte) error { bag := swarm.Bag{Category: q.cat, Object: object} - err := q.conf.Backoff.Retry(func() error { return q.sock.Enq(bag) }) + err := q.emit.Enq(bag) if err != nil { return err } @@ -39,8 +40,11 @@ func (q queue) Enqueue(object []byte) error { } func New(q swarm.Broker, category string) Queue { - queue := &queue{cat: category, conf: q.Config()} - queue.sock = q.Enqueue(category, queue) + k := q.(*kernel.Kernel) + + codec := swarm.NewCodecByte() + + queue := &queue{cat: category, codec: codec, emit: k.Emitter} slog.Debug("Created sync emitter", "kind", "bytes", "category", category) diff --git a/queue/dequeue.go b/queue/dequeue.go index bfb84a1..b8e281e 100644 --- a/queue/dequeue.go +++ b/queue/dequeue.go @@ -9,69 +9,18 @@ package queue import ( - "encoding/json" - "log/slog" - "github.com/fogfish/swarm" - "github.com/fogfish/swarm/internal/pipe" + "github.com/fogfish/swarm/internal/kernel" ) // Dequeue message -func Dequeue[T any](q swarm.Broker, category ...string) (<-chan *swarm.Msg[T], chan<- *swarm.Msg[T]) { - // TODO: automatically ack At Most Once, no ack channel - // make it as /dev/null - conf := q.Config() - ch := swarm.NewMsgDeqCh[T](conf.DequeueCapacity) - +func Dequeue[T any](q swarm.Broker, category ...string) (<-chan swarm.Msg[T], chan<- swarm.Msg[T]) { cat := categoryOf[T]() if len(category) > 0 { cat = category[0] } - sock := q.Dequeue(cat, &ch) - - pipe.ForEach(ch.Ack, func(object *swarm.Msg[T]) { - err := conf.Backoff.Retry(func() error { - return sock.Ack(swarm.Bag{ - Category: cat, - Digest: object.Digest, - }) - }) - if err != nil && conf.StdErr != nil { - conf.StdErr <- err - return - } - - slog.Debug("Broker ack'ed object", "kind", "typed", "category", cat, "object", object.Object, "error", object.Digest.Error) - }) - - pipe.Emit(ch.Msg, q.Config().PollFrequency, func() (*swarm.Msg[T], error) { - var bag swarm.Bag - err := conf.Backoff.Retry(func() (err error) { - bag, err = sock.Deq(cat) - return - }) - if err != nil { - if conf.StdErr != nil { - conf.StdErr <- err - } - return nil, err - } - - msg := &swarm.Msg[T]{Digest: bag.Digest} - if err := json.Unmarshal(bag.Object, &msg.Object); err != nil { - if conf.StdErr != nil { - conf.StdErr <- err - } - return nil, err - } - - slog.Debug("Broker received object", "kind", "typed", "category", cat, "object", msg.Object) - - return msg, nil - }) - - slog.Debug("Created dequeue channels: rcv, ack", "kind", "typed", "category", cat) + codec := swarm.NewCodecJson[T]() - return ch.Msg, ch.Ack + return kernel.Dequeue(q.(*kernel.Kernel), cat, codec) } diff --git a/queue/enqueue.go b/queue/enqueue.go index 74c1ad3..a097659 100644 --- a/queue/enqueue.go +++ b/queue/enqueue.go @@ -9,60 +9,23 @@ package queue import ( - "encoding/json" - "log/slog" "reflect" "strings" "github.com/fogfish/swarm" - "github.com/fogfish/swarm/internal/pipe" + "github.com/fogfish/swarm/internal/kernel" ) -/* -Enqueue creates pair of channels to send messages and dead-letter queue -*/ +// Create egress and dead-letter queue channels for the category func Enqueue[T any](q swarm.Broker, category ...string) (chan<- T, <-chan T) { - // TODO: discard dlq for At Most Once - // make it nil - - conf := q.Config() - ch := swarm.NewMsgEnqCh[T](conf.EnqueueCapacity) - cat := categoryOf[T]() if len(category) > 0 { cat = category[0] } - sock := q.Enqueue(cat, &ch) - - pipe.ForEach(ch.Msg, func(object T) { - ch.Busy.Lock() - defer ch.Busy.Unlock() - - msg, err := json.Marshal(object) - if err != nil { - ch.Err <- object - if conf.StdErr != nil { - conf.StdErr <- err - } - return - } - - bag := swarm.Bag{Category: cat, Object: msg} - err = conf.Backoff.Retry(func() error { return sock.Enq(bag) }) - if err != nil { - ch.Err <- object - if conf.StdErr != nil { - conf.StdErr <- err - } - } - - slog.Debug("Enqueued message", "kind", "typed", "category", bag.Category, "object", object) - }) - - slog.Debug("Created enqueue channels: out, err", "kind", "typed", "category", cat) + codec := swarm.NewCodecJson[T]() - return ch.Msg, ch.Err + return kernel.Enqueue(q.(*kernel.Kernel), cat, codec) } // normalized type name diff --git a/queue/events/dequeue.go b/queue/events/dequeue.go index dd47da3..d3be6e7 100644 --- a/queue/events/dequeue.go +++ b/queue/events/dequeue.go @@ -9,81 +9,18 @@ package events import ( - "encoding/json" - "log/slog" - - "github.com/fogfish/golem/optics" "github.com/fogfish/swarm" - "github.com/fogfish/swarm/internal/pipe" + "github.com/fogfish/swarm/internal/kernel" ) // Dequeue event -func Dequeue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (<-chan *E, chan<- *E) { - conf := q.Config() - ch := swarm.NewEvtDeqCh[T, E](conf.DequeueCapacity) - +func Dequeue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (<-chan swarm.Msg[*E], chan<- swarm.Msg[*E]) { catE := categoryOf[E]() if len(category) > 0 { catE = category[0] } - lens := optics.ForProduct1[E, swarm.Digest]("Digest") - - sock := q.Dequeue(catE, &ch) - - pipe.ForEach(ch.Ack, func(object *E) { - digest := lens.Get(object) - - err := conf.Backoff.Retry(func() error { - return sock.Ack(swarm.Bag{ - Category: catE, - Digest: digest, - }) - }) - if err != nil && conf.StdErr != nil { - conf.StdErr <- err - return - } - - slog.Debug("Broker ack'ed object", "kind", "event", "category", catE, "object", object, "error", digest.Error) - }) - - pipe.Emit(ch.Msg, q.Config().PollFrequency, func() (*E, error) { - var bag swarm.Bag - err := conf.Backoff.Retry(func() (err error) { - bag, err = sock.Deq(catE) - return - }) - if err != nil { - if conf.StdErr != nil { - conf.StdErr <- err - } - return nil, err - } - - evt := new(E) - - if bag.Event != nil { - evt = bag.Event.(*E) - } - - if bag.Object != nil { - if err := json.Unmarshal(bag.Object, evt); err != nil { - if conf.StdErr != nil { - conf.StdErr <- err - } - return nil, err - } - } - - lens.Put(evt, bag.Digest) - - slog.Debug("Broker received object", "kind", "event", "category", catE, "object", evt) - - return evt, nil - }) - - slog.Debug("Created dequeue channels: rcv, ack", "kind", "event", "category", catE) + codec := swarm.NewCodecEvent[T, E]("TODO", catE) - return ch.Msg, ch.Ack + return kernel.Dequeue(q.(*kernel.Kernel), catE, codec) } diff --git a/queue/events/enqueue.go b/queue/events/enqueue.go index 227e95d..5178e57 100644 --- a/queue/events/enqueue.go +++ b/queue/events/enqueue.go @@ -9,74 +9,26 @@ package events import ( - "encoding/json" - "log/slog" "reflect" "strings" - "time" - "github.com/fogfish/curie" - "github.com/fogfish/golem/optics" - "github.com/fogfish/guid/v2" "github.com/fogfish/swarm" - "github.com/fogfish/swarm/internal/pipe" + "github.com/fogfish/swarm/internal/kernel" ) // Enqueue creates pair of channels // - to send messages // - failed messages (dead-letter queue) func Enqueue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (chan<- *E, <-chan *E) { - conf := q.Config() - ch := swarm.NewEvtEnqCh[T, E](conf.EnqueueCapacity) - catE := categoryOf[E]() if len(category) > 0 { catE = category[0] } - shape := optics.ForShape4[E, string, curie.IRI, curie.IRI, time.Time]("ID", "Type", "Agent", "Created") - - sock := q.Enqueue(catE, &ch) - - pipe.ForEach(ch.Msg, func(object *E) { - ch.Busy.Lock() - defer ch.Busy.Unlock() - - _, knd, src, _ := shape.Get(object) - if knd == "" { - knd = curie.IRI(catE) - } - - if src == "" { - src = curie.IRI(q.Config().Source) - } - - shape.Put(object, guid.G(guid.Clock).String(), knd, src, time.Now()) - - msg, err := json.Marshal(object) - if err != nil { - ch.Err <- object - if conf.StdErr != nil { - conf.StdErr <- err - } - return - } - - bag := swarm.Bag{Category: catE, Object: msg} - err = conf.Backoff.Retry(func() error { return sock.Enq(bag) }) - if err != nil { - ch.Err <- object - if conf.StdErr != nil { - conf.StdErr <- err - } - } - - slog.Debug("Enqueued event", "kind", "event", "category", bag.Category, "object", object) - }) - - slog.Debug("Created enqueue channels: out, err", "kind", "event", "category", catE) + k := q.(*kernel.Kernel) + codec := swarm.NewCodecEvent[T, E](k.Config.Source, catE) - return ch.Msg, ch.Err + return kernel.Enqueue(q.(*kernel.Kernel), catE, codec) } // normalized type name diff --git a/queue/events/queue.go b/queue/events/queue.go index 5e39255..83e3dd4 100644 --- a/queue/events/queue.go +++ b/queue/events/queue.go @@ -9,14 +9,10 @@ package events import ( - "encoding/json" "log/slog" - "time" - "github.com/fogfish/curie" - "github.com/fogfish/golem/optics" - "github.com/fogfish/guid/v2" "github.com/fogfish/swarm" + "github.com/fogfish/swarm/internal/kernel" ) type Queue[T any, E swarm.EventKind[T]] interface { @@ -25,33 +21,21 @@ type Queue[T any, E swarm.EventKind[T]] interface { type queue[T any, E swarm.EventKind[T]] struct { cat string - conf swarm.Config - sock swarm.Enqueue - shape optics.Lens4[E, string, curie.IRI, curie.IRI, time.Time] + codec kernel.Codec[*E] + emit kernel.Emitter } func (q queue[T, E]) Sync() {} func (q queue[T, E]) Close() {} func (q queue[T, E]) Enqueue(object *E) error { - _, knd, src, _ := q.shape.Get(object) - if knd == "" { - knd = curie.IRI(q.cat) - } - - if src == "" { - src = curie.IRI(q.conf.Source) - } - - q.shape.Put(object, guid.G(guid.Clock).String(), knd, src, time.Now()) - - msg, err := json.Marshal(object) + msg, err := q.codec.Encode(object) if err != nil { return err } bag := swarm.Bag{Category: q.cat, Object: msg} - err = q.conf.Backoff.Retry(func() error { return q.sock.Enq(bag) }) + err = q.emit.Enq(bag) if err != nil { return err } @@ -61,19 +45,20 @@ func (q queue[T, E]) Enqueue(object *E) error { } func New[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) Queue[T, E] { + k := q.(*kernel.Kernel) + catE := categoryOf[E]() if len(category) > 0 { catE = category[0] } - shape := optics.ForShape4[E, string, curie.IRI, curie.IRI, time.Time]("ID", "Type", "Agent", "Created") + codec := swarm.NewCodecEvent[T, E](k.Config.Source, catE) queue := &queue[T, E]{ cat: catE, - conf: q.Config(), - shape: shape, + codec: codec, + emit: k.Emitter, } - queue.sock = q.Enqueue(catE, queue) slog.Debug("Created sync emitter", "kind", "event", "category", catE) diff --git a/queue/queue.go b/queue/queue.go index 279753e..f1da3ef 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -9,10 +9,10 @@ package queue import ( - "encoding/json" "log/slog" "github.com/fogfish/swarm" + "github.com/fogfish/swarm/internal/kernel" ) type Queue[T any] interface { @@ -20,22 +20,22 @@ type Queue[T any] interface { } type queue[T any] struct { - cat string - conf swarm.Config - sock swarm.Enqueue + cat string + codec kernel.Codec[T] + emit kernel.Emitter } func (q queue[T]) Sync() {} func (q queue[T]) Close() {} func (q queue[T]) Enqueue(object T) error { - msg, err := json.Marshal(object) + msg, err := q.codec.Encode(object) if err != nil { return err } bag := swarm.Bag{Category: q.cat, Object: msg} - err = q.conf.Backoff.Retry(func() error { return q.sock.Enq(bag) }) + err = q.emit.Enq(bag) if err != nil { return err } @@ -45,13 +45,16 @@ func (q queue[T]) Enqueue(object T) error { } func New[T any](q swarm.Broker, category ...string) Queue[T] { + k := q.(*kernel.Kernel) + cat := categoryOf[T]() if len(category) > 0 { cat = category[0] } - queue := &queue[T]{cat: cat, conf: q.Config()} - queue.sock = q.Enqueue(cat, queue) + codec := swarm.NewCodecJson[T]() + + queue := &queue[T]{cat: cat, codec: codec, emit: k.Emitter} slog.Debug("Created sync emitter", "kind", "typed", "category", cat) diff --git a/service.go b/service.go index c35a907..5202850 100644 --- a/service.go +++ b/service.go @@ -10,17 +10,25 @@ package swarm import ( "log/slog" - - "github.com/fogfish/swarm/internal/pipe" ) +// Message broker +type Broker interface { + Close() + Await() +} + // Consumes dead letter messages // // swarm.LogDeadLetters(queue.Enqueue(...)) func LogDeadLetters[T any](out chan<- T, err <-chan T) chan<- T { - pipe.ForEach[T](err, func(t T) { - slog.Error("Fail to emit", "object", t) - }) + go func() { + var x T + + for x = range err { + slog.Error("Fail to emit", "object", x) + } + }() return out }