From 8a6ad62fbf19adde84e3d2a528b8fa185dc3117d Mon Sep 17 00:00:00 2001 From: Dmitry Kolesnikov Date: Mon, 30 Sep 2024 23:24:05 +0300 Subject: [PATCH] kernel v0.20.1 with minor improvements --- README.md | 443 +++++++++++++++++++++++++++-------------- bag.go | 15 +- config.go | 55 +++-- dequeue/dequeue.go | 4 +- enqueue/enqueue.go | 4 +- enqueue/writer.go | 4 +- errors.go | 27 ++- kernel/bridge.go | 3 +- kernel/cathode.go | 8 + kernel/cathode_test.go | 24 ++- version.go | 2 +- 11 files changed, 410 insertions(+), 179 deletions(-) diff --git a/README.md b/README.md index 1df0da3..ed8d525 100644 --- a/README.md +++ b/README.md @@ -4,14 +4,6 @@

Go channels for distributed queueing and event-driven systems

- - - - - - - - @@ -29,48 +21,168 @@

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
sub-moduledocfeaturesabout
+ + + + + API & Kernel +
+ + + + + + + + AWS EventBridge +
+ + + + + + + + AWS DynamoDB Stream +
+ + + + + + + + AWS S3 Event +
+ + + + + + + + AWS SQS Events +
+ + + + + + + AWS SQS and SQS FIFO +
+ + + + + + + + AWS WebSocket API +
+ + + AWS SNS +
+ + + + AWS Kinesis Events +
+ + + AWS Kinesis +
+ + + AWS ElastiCache +
+ + + MQTT +

--- -Today's wrong abstractions lead to complexity on maintainability in the future. Usage of synchronous interfaces to reflect asynchronous nature of messaging queues is a good example of inaccurate abstraction. Usage of pure Go channels is a proper solution to distills asynchronous semantic of queueing systems into the idiomatic native Golang code. The library adapts Go channels for various systems and interface. - - -api | examples ---- | --- -**AWS EventBridge** | ![serverless](https://img.shields.io/badge/serverless-e999b8?style=platic) - || [aws cdk](examples/eventbridge/serverless/main.go) - || [enqueue](examples/eventbridge/enqueue/eventbridge.go) - || [dequeue](examples/eventbridge/dequeue/eventbridge.go) -**AWS SQS** | ![serverless](https://img.shields.io/badge/serverless-e999b8?style=platic) - || [aws cdk](examples/eventsqs/serverless/main.go) - || [enqueue](examples/eventsqs/enqueue/eventsqs.go) - || [dequeue](examples/eventsqs/dequeue/eventsqs.go) -**AWS SQS** | - || [enqueue](examples/sqs/enqueue/sqs.go) - || [dequeue](examples/sqs/dequeue/sqs.go) -**AWS SNS** | ![coming soon](https://img.shields.io/badge/coming%20soon-00b150?style=platic) - || enqueue -**AWS S3 Event** | ![serverless](https://img.shields.io/badge/serverless-e999b8?style=platic) - || [aws cdk](./examples/events3/serverless/main.go) - || [dequeue](./examples/events3/dequeue/events3.go) -**AWS DynamoDB Streams** | ![serverless](https://img.shields.io/badge/serverless-e999b8?style=platic) - || [aws cdk](./examples/eventddb/serverless/main.go) - || [dequeue](./examples/eventddb/dequeue/eventddb.go) -**AWS WebSocket API** | ![serverless](https://img.shields.io/badge/serverless-e999b8?style=platic) - || [aws cdk](./examples/websocket/serverless/main.go) - || [dequeue](./examples/websocket/dequeue/websocket.go) -**AWS Kinesis** | ![serverless](https://img.shields.io/badge/serverless-e999b8?style=platic) ![coming soon](https://img.shields.io/badge/coming%20soon-00b150?style=platic) - || aws cdk - || enqueue - || dequeue -**AWS Kinesis** | ![coming soon](https://img.shields.io/badge/coming%20soon-00b150?style=platic) - || enqueue - || dequeue -**AWS Redis** | ![help needed](https://img.shields.io/badge/help%20needed-035392?style=platic) -**MQTT API** | ![help needed](https://img.shields.io/badge/help%20needed-035392?style=platic) - -Please let us know via [GitHub issues](https://github.com/fogfish/swarm/issue) your needs about queuing technologies. +Today's wrong abstractions lead to complexity on maintainability in the future. Usage of synchronous interfaces to reflect asynchronous nature of messaging queues is a good example of inaccurate abstraction. Usage of pure Go channels is a proper solution to distills asynchronous semantic of queueing systems into the idiomatic native Golang code. The library adapts Go channels for various systems and interface. Please let us know via [GitHub issues](https://github.com/fogfish/swarm/issue) your needs about queuing technologies. ## Inspiration @@ -94,30 +206,79 @@ Use `go get` to retrieve the library and add it as dependency to your applicatio go get -u github.com/fogfish/swarm ``` -- [Getting Started](#getting-started) +- [Inspiration](#inspiration) +- [Getting started](#getting-started) + - [Quick example](#quick-example) - [Produce (enqueue) messages](#produce-enqueue-messages) - [Consume (dequeue) messages](#consume-dequeue-messages) - [Configure library behavior](#configure-library-behavior) - [Message Delivery Guarantees](#message-delivery-guarantees) - [Delayed Guarantee vs Guarantee](#delayed-guarantee-vs-guarantee) - [Order of Messages](#order-of-messages) - - [Octet streams](#octet-streams) + - [Octet Streams](#octet-streams) - [Generic events](#generic-events) - - [Error handling](#error-handling) + - [Error Handling](#error-handling) - [Fail Fast](#fail-fast) - [Serverless](#serverless) - [Race condition in Serverless](#race-condition-in-serverless) +- [How To Contribute](#how-to-contribute) + - [commit message](#commit-message) + - [bugs](#bugs) +- [Bring Your Own Queue](#bring-your-own-queue) +- [License](#license) -### Produce (enqueue) messages +### Quick example -Please [see and try examples](examples). Its cover all basic use-cases with runnable code snippets, check the design pattern [Distributed event-driven Golang channels](./doc/pattern.md) for deep-dive into library philosophy. +Example below is most simplest illustration of enqueuing and dequeuing message +from AWS SQS. + +```go +package main + +import ( + "log/slog" + + "github.com/fogfish/swarm" + "github.com/fogfish/swarm/broker/sqs" + "github.com/fogfish/swarm/enqueue" + "github.com/fogfish/swarm/dequeue" +) + +func main() { + // create broker for AWS SQS + q, err := sqs.New("aws-sqs-queue-name") + if err != nil { + slog.Error("sqs broker has failed", "err", err) + return + } + + // create Golang channels + rcv, ack := dequeue.Typed[string](q) + out := swarm.LogDeadLetters(enqueue.Typed[string](q)) + + // use Golang channels for I/O + go func() { + for msg := range rcv { + out <- msg.Object + ack <- msg + } + }() + + q.Await() +} +``` + +Check the design pattern [Distributed event-driven Golang channels](./doc/pattern.md) for deep-dive into library philosophy. Also note, each supported broker comes with runnable examples that shows the library. + + +### Produce (enqueue) messages The following code snippet shows a typical flow of producing the messages using the library. ```go import ( "github.com/fogfish/swarm/broker/sqs" - "github.com/fogfish/swarm/queue" + "github.com/fogfish/swarm/enqueue" ) // Use pure Golang struct to define semantic of messages and events @@ -127,13 +288,13 @@ type Note struct { } // Spawn a new instance of the messaging broker -q := swarm.Must(sqs.New("name-of-the-queue"), /* config options */) +q, err := sqs.New("name-of-the-queue"), /* config options */) // creates pair Golang channels dedicated for publishing -// messages of type Note through the messaging broker. The first channel +// messages of type [Note] through the messaging broker. The first channel // is dedicated to emit messages. The second one is the dead letter queue that // contains failed transmissions. -enq, dlq := queue.Enqueue[Note](q) +enq, dlq := enqueue.Typed[Note](q) // Enqueue message of type Note enq <- Note{ID: "note", Text: "some text"} @@ -144,14 +305,12 @@ q.Close() ### Consume (dequeue) messages -Please [see and try examples](examples). Its cover all basic use-cases with runnable code snippets, check the design pattern [Distributed event-driven Golang channels](./doc/pattern.md) for deep-dive into library philosophy. - The following code snippet shows a typical flow of consuming the messages using the library. ```go import ( "github.com/fogfish/swarm/broker/sqs" - "github.com/fogfish/swarm/queue" + "github.com/fogfish/swarm/dequeue" ) // Use pure Golang struct to define semantic of messages and events @@ -161,19 +320,21 @@ type Note struct { } // Spawn a new instance of the messaging broker -q := swarm.Must(sqs.New("name-of-the-queue", /* config options */)) +q, err := sqs.New("name-of-the-queue", /* config options */) // Create pair Golang channels dedicated for consuming // messages of type Note from the messaging broker. The first channel // is dedicated to receive messages. The second one is the channel to // acknowledge consumption -deq, ack := queue.Dequeue[Note](q) +deq, ack := dequeue.Typed[Note](q) // consume messages and then acknowledge it -for msg := range deq { - /* ... do something with msg.Object ...*/ - ack <- msg -} +go func() { + for msg := range deq { + /* ... do something with msg.Object and ack the message ...*/ + ack <- msg + } +}() // Await messages from the broker q.Await() @@ -181,24 +342,30 @@ q.Await() ### Configure library behavior -The library uses "option pattern" for the configuration. See all available [configuration options](./config.go), which are passed into the broker. Please note that each configuration options has `With` prefix: +The library uses "option pattern" for the configuration, which is divided into two parts: a generic I/O kernel configuration and a broker-specific configuration. Please note that each configuration option is prefixed with `With` and implemented in [config.go](./config.go) files. + ```go q, err := sqs.New("name-of-the-queue", - swarm.WithSource("name-of-my-component"), - swarm.WithRetryConstant(10 * time.Millisecond, 3), - swarm.WithPollFrequency(10 * time.Second), - /* ... */ + // WithXXX performs broker configuration + sqs.WithBatchSize(5), + // WithConfig performs generic kernel configuration + sqs.WithConfig( + swarm.WithSource("name-of-my-component"), + swarm.WithRetryConstant(10 * time.Millisecond, 3), + swarm.WithPollFrequency(10 * time.Second), + /* ... */ + ), ) ``` ### Message Delivery Guarantees -Usage of Golang channels as an abstraction raises a concern about grade of service on the message delivery guarantees. The library ensures exactly same grade of service as the underlying queueing system or event broker. Messages are delivered according to the promise once they are accepted by the remote side of queuing system. The library's built-in retry logic protects losses from temporary unavailability of the remote peer. However, Golang channels are sophisticated "in-memory buffers", which introduce a lag of few milliseconds between scheduling a message to the channel and dispatching message to the remote peer. Use one of the following policy to either accept or protect from the loss all the in-the-flight messages in case of catastrophic failures. +Usage of Golang channels as an abstraction raises a concern about grade of service on the message delivery guarantees. The library ensures exactly same grade of service as the underlying queueing system or event broker. Messages are delivered according to the promise once they are accepted by the remote side of queuing system. The library's built-in retry logic protects losses from temporary unavailability of the remote peer. However, Golang channels function as sophisticated "in-memory buffers," which can introduce a delay of a few milliseconds between scheduling a message to the channel and dispatching it to the remote peer. To handle catastrophic failures, choose one of the following policies to either accept or safeguard in-flight messages from potential loss. **At Most Once** is best effort policy, where a message is published without any formal acknowledgement of receipt, and it isn't replayed. Some messages can be lost as subscribers are not required to acknowledge receipt. -The library implements asymmetric approaches. The **enqueue** path uses buffered Golang channels for emitter and dead-letter queues. The **dequeue** path also uses buffered Golang channels for delivery message to consumer. The messages are automatically acknowledged to the broker upon successful scheduling. This means that information will be lost if the consumer crashes before it has finished processing the message. +The library implements asymmetric approaches for message handling. In the **enqueue** path, buffered Golang channels are used for both message emission and managing dead-letter queues. Similarly, the **dequeue** path uses buffered Golang channels to deliver messages to the consumer. ```go // Spawn a new instance of the messaging broker using At Most Once policy. @@ -209,16 +376,18 @@ q, err := sqs.New("name-of-the-queue", // for compatibility reasons two channels are returned on the enqueue path but // dead-letter-queue is nil -enq, dlq := queue.Enqueue[Note](q) +enq, dlq := enqueue.Typed[Note](q) // for compatibility reasons two channels are returned on the dequeue path but // ack channel acts as /dev/null discards any sent message -deq, ack := queue.Dequeue[Note](q) +deq, ack := dequeue.Typed[Note](q) ``` **At Least Once** is the default policy used by the library. The policy assume usage of "acknowledgement" protocol, which guarantees a message will be re-sent until it is formally acknowledged by a recipient. Messages should never be lost but it might be delivered more than once causing duplicate work to consumer. -The library implements also asymmetric approaches. The **enqueue** path uses unbuffered Golang channels to emit messages and handle dead-letter queue, which leads to a delayed guarantee. The delayed guarantee in this context implies that enqueueing of other messages is blocked until dead-letter queue is resolved. Alternatively, the application can use synchronous protocol to enqueue message. The **dequeue** path also uses unbuffered Golang channels for delivery message to consumer and acknowledge its processing. The acknowledgement of message by consumer guarantee reliable delivery of the message but might cause duplicates. +The library also implements asymmetric approaches for message handling. In the **enqueue** path, unbuffered Golang channels are used to emit messages and manage the dead-letter queue, resulting in a delayed guarantee. This means that enqueuing additional messages is blocked until the dead-letter queue is fully resolved. Alternatively, the application can opt for a synchronous protocol to enqueue messages. + +In the **dequeue** path, buffered Golang channels are used to deliver messages to the consumer and acknowledge their processing. While consumer acknowledgment ensures reliable message delivery, it may lead to message duplication. ```go // Spawn a new instance of the messaging broker using At Least Once policy. @@ -230,10 +399,10 @@ q, err := sqs.New("name-of-the-queue", ) // both channels are unbuffered -enq, dlq := queue.Enqueue[Note](q) +enq, dlq := enqueue.Typed[Note](q) // buffered channels of capacity n -deq, ack := queue.Dequeue[Note](q) +deq, ack := dequeue.Typed[Note](q) ``` **Exactly Once** is not supported by the library yet. @@ -241,25 +410,25 @@ deq, ack := queue.Dequeue[Note](q) ### Delayed Guarantee vs Guarantee -Usage of "At Least Once" policy (unbuffered channels) provides the delayed guarantee for producers. Let's consider the following example. If queue broker fails to send message `A` then the channel `enq` is blocked at sending message `B` until the program consumes message `A` from the dead-letter queue channel. +Usage of **At Least Once** policy (unbuffered channels) provides the delayed guarantee for producers. Let's consider the following example. If queue broker fails to send message `A` then the channel `enq` is blocked at sending message `B` until the program consumes message `A` from the dead-letter queue channel. ```go -enq, dlq := queue.Enqueue[*User](q) +enq, dlq := enqueue.Typed[User](q) -enq <- &User{ID: "A", Text: "some text by A"} // failed to send -enq <- &User{ID: "B", Text: "some text by B"} // blocked until dlq is processed -enq <- &User{ID: "C", Text: "some text by C"} +enq <- User{ID: "A", Text: "some text by A"} // failed to send +enq <- User{ID: "B", Text: "some text by B"} // blocked until dlq is processed +enq <- User{ID: "C", Text: "some text by C"} ``` The delayed guarantee is efficient on batch processing, pipelining but might cause complication at transactional processing. Therefore, the library also support a synchronous variant to producing a message: ```go // Creates "synchronous" variant of the queue -user := queue.New[User](q) +user := enqueue.NewTyped[User](q) // Synchronously enqueue the message. It ensure that message is scheduled for // delivery to remote peer once function successfully returns. -if err := user.Put(&User{ID: "A", Text: "some text by A"}); err != nil { +if err := user.Enq(context.Background(), &User{ID: "A", Text: "some text by A"}); err != nil { // handle error } ``` @@ -269,8 +438,8 @@ if err := user.Put(&User{ID: "A", Text: "some text by A"}); err != nil { The library guarantee ordering of the messages when they are produced over same Golang channel. Let's consider a following example: ```go -user, _ := queue.Enqueue[*User](q) -note, _ := queue.Enqueue[*Note](q) +user, _ := enqueue.Typed[User](q) +note, _ := enqueue.Typed[Note](q) user <- &User{ID: "A", Text: "some text by A"} note <- &Note{ID: "B", Text: "some note A"} @@ -291,46 +460,42 @@ import ( queue "github.com/fogfish/swarm/queue/bytes" ) -enq, dlq := queue.Enqueue(q, "Note") -deq, ack := queue.Dequeue(q, "Note") +enq, dlq := enqueue.Bytes(q, "Note") +deq, ack := enqueue.Bytes(q, "Note") ``` -Please see example about [binary](./examples/bytes) consumer/producer. +Please see example about binary [producer](./broker/sqs/examples/enqueue/bytes/sqs.go) and [consumer](./broker/sqs/examples/dequeue/bytes/sqs.go). + ### Generic events -Event defines immutable fact(s) placed into the queueing system. -Event resembles the concept of [Action](https://schema.org/Action) as it is defined by schema.org. +An event represents an immutable fact placed into the queuing system. It is conceptually similar to the [Action](https://schema.org/Action) defined by schema.org. > An action performed by a direct agent and indirect participants upon a direct object. -This type supports development of event-driven solutions that treat data as -a collection of immutable facts, which are queried and processed in real-time. -These applications processes logical log of events, each event defines a change -to current state of the object, i.e. which attributes were inserted, -updated or deleted (a kind of diff). The event identifies the object that was -changed together with using unique identifier. +This type facilitates the development of event-driven solutions that treat data as a collection of immutable facts, which can be queried and processed in real-time. These applications process a logical event log, where each event represents a change to the current state of an object, such as which attributes were inserted, updated, or deleted (essentially a diff). Each event uniquely identifies the affected object using a unique identifier. -The library support this concept through generic type `swarm.Event[T]` using [the Higher-Kinded Type Classes abstraction](https://github.com/fogfish/golem/blob/main/doc/typeclass.md). This abstraction allows to "overload" well-defined behavior of `swarm.Event[T]` with application specific type: +Unlike other solutions, this approach does not use an envelope for events. Instead, it pairs metadata and data side by side, making it more extendable. ```go -import ( - "github.com/fogfish/swarm" - queue "github.com/fogfish/swarm/queue/events" -) -// declares the application specific event type. -type EventCreateNote swarm.Event[*Note] +type Meta struct { + swarm.Meta + About string `json:"about"` +} -func (EventCreateNote) HKT1(swarm.EventType) {} -func (EventCreateNote) HKT2(*Note) {} +type User struct { + ID string `json:"id"` + Text string `json:"text"` +} // creates Golang channels to produce / consume messages -enq, dlq := queue.Enqueue[*Note, EventCreateNote](q) -deq, ack := queue.Dequeue[*Note, EventCreateNote](q) +enq, dlq := enqueue.Event[Meta, User](q) +deq, ack := enqueue.Event[Meta, User](q) ``` -Please see example about [event-driven](./examples/events/) consumer/producer. +Please see example about event [producer](./broker/sqs/examples/enqueue/event/sqs.go) and [consumer](./broker/sqs/examples/dequeue/event/sqs.go). + ### Error Handling @@ -338,21 +503,26 @@ The error handling on channel level is governed either by [dead-letter queue](#m ```go stderr := make(chan error) -q := queue.Must(sqs.New("swarm-test", swarm.WithStdErr(stderr))) +q, err := sqs.New("swarm-test", + sqs.WithConfig( + swarm.WithStdErr(stderr), + ), +) for err := range stderr { // error handling loop } ``` + ### Fail Fast -Existing message routing architecture assumes that micro-batch of messages is read from broker. These messages are dispatched to channels and it waits for acks. New micro-batch is not read until all messages are acknowledged or `TimeToFlight` timer is expired. In the time critical system or serverless application "fail fast" is the best strategy (e.g. lambda function does not need to idle for timeout). +The existing message routing architecture assumes that a micro-batch of messages is read from the broker, dispatched to channels, and then waits for acknowledgments. A new micro-batch is not read until all messages are acknowledged, or the `TimeToFlight` timer expires. In time-critical systems or serverless applications, a "fail fast" strategy is more effective (e.g., a Lambda function doesn't need to idle until the timeout). Send negative acknowledgement to `ack` channel to indicate error on message processing. ```go -deq, ack := queue.Dequeue[Note](q) +deq, ack := dequeue.Typed[Note](q) // consume messages and then acknowledge it for msg := range deq { @@ -368,7 +538,9 @@ for msg := range deq { ### Serverless -The library support development of serverless event-driven application using AWS service. The library provides AWS CDK Golang constructs to spawn consumers. See example of [serverless consumer](./examples/eventbridge/dequeue/eventbridge.go) and corresponding AWS CDK [application](./examples/eventbridge/serverless/main.go). +The library primarily support development of serverless event-driven application using AWS service. The library provides AWS CDK Golang constructs to spawn consumers. See example of [serverless consumer](./broker/eventbridge/examples/dequeue/typed/eventbridge.go) and corresponding AWS CDK [application](./broker/eventbridge/examples/serverless/eventbridge.go). + +It consistently implements a pattern - "create Broker, attach Sinks". ```go package main @@ -389,15 +561,16 @@ func main() { }, ) + // create broker broker := eventbridge.NewBroker(stack, jsii.String("Broker"), nil) broker.NewEventBus(nil) broker.NewSink( &eventbridge.SinkProps{ Source: []string{"swarm-example-eventbridge"}, - Lambda: &scud.FunctionGoProps{ - SourceCodePackage: "github.com/fogfish/swarm", - SourceCodeLambda: "examples/eventbridge/dequeue", + Function: &scud.FunctionGoProps{ + SourceCodeModule: "github.com/fogfish/swarm/broker/eventbridge", + SourceCodeLambda: "examples/dequeue/typed", }, }, ) @@ -406,37 +579,15 @@ func main() { } ``` -Note: **AWS Event Bridge** has a feature that allows to [match execution of consumer to the pattern](https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/CloudWatchEventsandEventPatterns.html#CloudWatchEventsPatterns) of JSON object. Use `swarm.Event[T]` type to build reliable matching of incoming events: - -```go -/* -enq <- &swarm.Event[*User]{ - Agent: "swarm:example", - Participant: "user", - Object: &User{ID: "user", Text: "some text"}, -} -*/ - -stack.NewSink( - &eventbridge.SinkProps{ - Pattern: map[string]interface{}{ - "@type": []string{"[user:Event[*swarm.User]]"}, - "agent": []string{"[swarm:example]"}, - "participant": []string{"[user]"}, - }, - /* ... */ - }, -) -``` ### Race condition in Serverless -In serverless environment doing dequeue and enqueue might cause a raise condition. The dequeue loop might finish earlier than other messages emitted. +In a serverless environment, performing dequeue and enqueue operations can lead to race conditions. Specifically, the dequeue loop may complete before other emitted messages are processed. ```go -rcv, ack := queue.Dequeue[/* .. */](broker1) -snd, dlq := queue.Enqueue[/* .. */](broker2) +rcv, ack := dequeue.Typed[/* .. */](broker1) +snd, dlq := enqueue.Typed[/* .. */](broker2) for msg := range rcv { snd <- // ... @@ -448,9 +599,9 @@ for msg := range rcv { } ``` -Use one of the following techniques to overcome the issue -1. Add sleep before ack -2. Use sync version of sender +Unfortunately, the library does not provide yet ultimate solution. Either sleep of sync senders are required. + + ## How To Contribute @@ -480,12 +631,6 @@ The commit message helps us to write a good release note, speed-up review proces If you experience any issues with the library, please let us know via [GitHub issues](https://github.com/fogfish/swarm/issue). We appreciate detailed and accurate reports that help us to identity and replicate the issue. -### benchmarking - -```bash -cd queue/sqs -go test -run=^$ -bench=. -benchtime 100x -``` ## Bring Your Own Queue diff --git a/bag.go b/bag.go index 360625d..0449a27 100644 --- a/bag.go +++ b/bag.go @@ -25,6 +25,9 @@ type Msg[T any] struct { // Error on the message processing Error error + // I/O Context of the message, as obtained from broker + IOContext any + // Message decoded content Object T } @@ -47,16 +50,20 @@ type Bag struct { // Error on the message processing Error error + // I/O Context of the message, as obtained from broker + IOContext any + // Message raw content Object []byte } func ToMsg[T any](bag Bag, object T) Msg[T] { return Msg[T]{ - Category: bag.Category, - Digest: bag.Digest, - Error: bag.Error, - Object: object, + Category: bag.Category, + Digest: bag.Digest, + Error: bag.Error, + IOContext: bag.IOContext, + Object: object, } } diff --git a/config.go b/config.go index ce348a0..c444ca8 100644 --- a/config.go +++ b/config.go @@ -17,6 +17,13 @@ import ( "github.com/fogfish/swarm/kernel/backoff" ) +// Environment variable to config kernel +const ( + EnvConfigPollFrequency = "CONFIG_SWARM_POLL_FREQUENCY" + EnvConfigTimeToFlight = "CONFIG_SWARM_TIME_TO_FLIGHT" + EnvConfigNetworkTimeout = "CONFIG_SWARM_NETWORK_TIMEOUT" +) + // Grade of Service Policy type Policy int @@ -64,22 +71,26 @@ type Config struct { // Timeout for any network operations NetworkTimeout time.Duration - // Codec for binary packets - Codec Codec + // Fail fast the message if category is not known to kernel. + FailOnUnknownCategory bool + + // PacketCodec for binary packets + PacketCodec Codec } func NewConfig() Config { return Config{ - Source: "github.com/fogfish/swarm", - Policy: PolicyAtLeastOnce, - CapOut: 0, - CapDlq: 0, - CapRcv: 0, - CapAck: 0, - Backoff: backoff.Exp(10*time.Millisecond, 10, 0.5), - PollFrequency: 10 * time.Millisecond, - TimeToFlight: 5 * time.Second, - NetworkTimeout: 5 * time.Second, + Source: "github.com/fogfish/swarm", + Policy: PolicyAtLeastOnce, + CapOut: 0, + CapDlq: 0, + CapRcv: 0, + CapAck: 0, + Backoff: backoff.Exp(10*time.Millisecond, 10, 0.5), + PollFrequency: 10 * time.Millisecond, + TimeToFlight: 5 * time.Second, + NetworkTimeout: 5 * time.Second, + FailOnUnknownCategory: false, } } @@ -179,9 +190,9 @@ func WithNetworkTimeout(t time.Duration) Option { // - CONFIG_SWARM_NETWORK_TIMEOUT func WithConfigFromEnv() Option { return func(conf *Config) { - conf.PollFrequency = durationFromEnv("CONFIG_SWARM_POLL_FREQUENCY", conf.PollFrequency) - conf.TimeToFlight = durationFromEnv("CONFIG_SWARM_TIME_TO_FLIGHT", conf.TimeToFlight) - conf.NetworkTimeout = durationFromEnv("CONFIG_SWARM_NETWORK_TIMEOUT", conf.NetworkTimeout) + conf.PollFrequency = durationFromEnv(EnvConfigPollFrequency, conf.PollFrequency) + conf.TimeToFlight = durationFromEnv(EnvConfigTimeToFlight, conf.TimeToFlight) + conf.NetworkTimeout = durationFromEnv(EnvConfigNetworkTimeout, conf.NetworkTimeout) } } @@ -225,3 +236,17 @@ func WithPolicyAtLeastOnce(n int) Option { conf.CapAck = n } } + +// Configure codec for binary packets +func WithPacketCodec(codec Codec) Option { + return func(conf *Config) { + conf.PacketCodec = codec + } +} + +// Fail fast the message if category is not known to kernel. +func WithFailOnUnknownCategory() Option { + return func(conf *Config) { + conf.FailOnUnknownCategory = true + } +} diff --git a/dequeue/dequeue.go b/dequeue/dequeue.go index 6c74059..eaacdcd 100644 --- a/dequeue/dequeue.go +++ b/dequeue/dequeue.go @@ -33,8 +33,8 @@ func Event[M, T any](q *kernel.Dequeuer, category ...string) (<-chan swarm.Msg[s // Create pair of channels to receive and acknowledge pure binary func Bytes(q *kernel.Dequeuer, cat string) (<-chan swarm.Msg[[]byte], chan<- swarm.Msg[[]byte]) { - if q.Config.Codec != nil { - return kernel.Dequeue(q, cat, q.Config.Codec) + if q.Config.PacketCodec != nil { + return kernel.Dequeue(q, cat, q.Config.PacketCodec) } return kernel.Dequeue(q, cat, encoding.NewCodecByte()) diff --git a/enqueue/enqueue.go b/enqueue/enqueue.go index a38f60d..b57eabe 100644 --- a/enqueue/enqueue.go +++ b/enqueue/enqueue.go @@ -33,8 +33,8 @@ func Event[M, T any](q *kernel.Enqueuer, category ...string) (snd chan<- swarm.E // Create pair of channels to emit pure binaries func Bytes(q *kernel.Enqueuer, cat string) (snd chan<- []byte, dlq <-chan []byte) { - if q.Config.Codec != nil { - return kernel.Enqueue(q, cat, q.Config.Codec) + if q.Config.PacketCodec != nil { + return kernel.Enqueue(q, cat, q.Config.PacketCodec) } return kernel.Enqueue(q, cat, encoding.NewCodecByte()) diff --git a/enqueue/writer.go b/enqueue/writer.go index 923969a..96b7fdd 100644 --- a/enqueue/writer.go +++ b/enqueue/writer.go @@ -119,8 +119,8 @@ type EmitterBytes struct { // Creates synchronous emitter func NewBytes(q *kernel.Enqueuer, category string) *EmitterBytes { var codec swarm.Codec - if q.Config.Codec != nil { - codec = q.Config.Codec + if q.Config.PacketCodec != nil { + codec = q.Config.PacketCodec } else { codec = encoding.NewCodecByte() } diff --git a/errors.go b/errors.go index 08bbc77..ec685ff 100644 --- a/errors.go +++ b/errors.go @@ -8,10 +8,35 @@ package swarm -import "github.com/fogfish/faults" +import ( + "fmt" + "time" + + "github.com/fogfish/faults" +) const ( ErrServiceIO = faults.Type("service i/o failed") ErrEnqueue = faults.Type("enqueue is failed") ErrDequeue = faults.Type("dequeue is failed") ) + +type errTimeout struct { + op string + timer time.Duration +} + +func ErrTimeout(op string, timer time.Duration) error { + return errTimeout{ + op: op, + timer: timer, + } +} + +func (err errTimeout) Error() string { + return fmt.Sprintf("timeout %s after %s", err.op, err.timer) +} + +func (err errTimeout) Timeout() time.Duration { + return err.timer +} diff --git a/kernel/bridge.go b/kernel/bridge.go index 2d38815..5121924 100644 --- a/kernel/bridge.go +++ b/kernel/bridge.go @@ -10,7 +10,6 @@ package kernel import ( "context" - "fmt" "time" "github.com/fogfish/swarm" @@ -52,7 +51,7 @@ func (s *Bridge) Dispatch(seq []swarm.Bag) error { case err := <-s.session: return err case <-time.After(s.timeToFlight): - return fmt.Errorf("ack timeout") + return swarm.ErrTimeout("ack", s.timeToFlight) } } diff --git a/kernel/cathode.go b/kernel/cathode.go index 4a43787..d73a08d 100644 --- a/kernel/cathode.go +++ b/kernel/cathode.go @@ -10,6 +10,7 @@ package kernel import ( "context" + "fmt" "log/slog" "sync" "time" @@ -102,6 +103,13 @@ func (k *Dequeuer) receive() { k.Config.StdErr <- err return } + } else { + if k.Config.FailOnUnknownCategory { + slog.Error("Unknown category", "cat", bag.Category, "kernel", k.Config.Source) + k.Cathode.Err(k.context, bag.Digest, swarm.ErrDequeue.New(fmt.Errorf("unknown category %s ", bag.Category))) + } else { + slog.Warn("Unknown category", "cat", bag.Category, "kernel", k.Config.Source) + } } } } diff --git a/kernel/cathode_test.go b/kernel/cathode_test.go index a8fae4f..7f4c352 100644 --- a/kernel/cathode_test.go +++ b/kernel/cathode_test.go @@ -22,7 +22,14 @@ func TestDequeuer(t *testing.T) { codec := encoding.NewCodecJson[string]() none := mockCathode(nil, nil) pass := mockCathode(make(chan string), - []swarm.Bag{{Category: "test", Digest: "1", Object: []byte(`"1"`)}}, + []swarm.Bag{ + { + Category: "test", + Digest: "1", + IOContext: "context", + Object: []byte(`"1"`), + }, + }, ) t.Run("Kernel", func(t *testing.T) { @@ -60,6 +67,21 @@ func TestDequeuer(t *testing.T) { k.Close() }) + t.Run("Dequeue.1.Context", func(t *testing.T) { + k := NewDequeuer(pass, swarm.Config{PollFrequency: 10 * time.Millisecond}) + rcv, ack := Dequeue(k, "test", codec) + go k.Await() + + msg := <-rcv + ack <- msg + it.Then(t).Should( + it.Equal(string(<-pass.ack), `1`), + it.Equal(msg.IOContext.(string), "context"), + ) + + k.Close() + }) + t.Run("Backlog", func(t *testing.T) { k := NewDequeuer(pass, swarm.Config{CapAck: 4, PollFrequency: 1 * time.Millisecond}) rcv, ack := Dequeue(k, "test", codec) diff --git a/version.go b/version.go index 063e0f1..c16284b 100644 --- a/version.go +++ b/version.go @@ -8,4 +8,4 @@ package swarm -const Version = "v0.20.0-alpha" +const Version = "v0.20.1"