diff --git a/broker/eventbridge/broker.go b/broker/eventbridge/broker.go index c6a854c..c8af37c 100644 --- a/broker/eventbridge/broker.go +++ b/broker/eventbridge/broker.go @@ -10,6 +10,7 @@ package eventbridge import ( "context" + "log/slog" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" @@ -46,6 +47,7 @@ func New(bus string, opts ...swarm.Option) (swarm.Broker, error) { ctx, can := context.WithCancel(context.Background()) + slog.Info("created broker", "type", "eventbridge") return &broker{ config: conf, client: cli, diff --git a/broker/eventbridge/eventbridge.go b/broker/eventbridge/eventbridge.go index 1a0bd98..2c88eea 100644 --- a/broker/eventbridge/eventbridge.go +++ b/broker/eventbridge/eventbridge.go @@ -48,7 +48,7 @@ func newService(conf *swarm.Config) (EventBridge, error) { aws, err := config.LoadDefaultConfig(context.Background()) if err != nil { - return nil, err + return nil, swarm.ErrServiceIO.New(err) } return eventbridge.NewFromConfig(aws), nil @@ -72,7 +72,7 @@ func (cli *client) Enq(bag swarm.Bag) error { }, ) if err != nil { - return err + return swarm.ErrEnqueue.New(err) } if ret.FailedEntryCount > 0 { diff --git a/broker/events3/broker.go b/broker/events3/broker.go index b7c9d5e..e46c37a 100644 --- a/broker/events3/broker.go +++ b/broker/events3/broker.go @@ -10,6 +10,7 @@ package events3 import ( "context" + "log/slog" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" @@ -27,6 +28,7 @@ func New(queue string, opts ...swarm.Option) (swarm.Broker, error) { ctx, can := context.WithCancel(context.Background()) + slog.Info("created broker", "type", "event-s3") return &broker{ config: conf, channels: swarm.NewChannels(), diff --git a/broker/eventsqs/broker.go b/broker/eventsqs/broker.go index 191831b..d767cd2 100644 --- a/broker/eventsqs/broker.go +++ b/broker/eventsqs/broker.go @@ -9,6 +9,8 @@ package eventsqs import ( + "log/slog" + "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" "github.com/fogfish/swarm" @@ -28,6 +30,7 @@ func New(queue string, opts ...swarm.Option) (swarm.Broker, error) { return nil, err } + slog.Info("created broker", "type", "event-sqs") return &broker{ Broker: bro, config: conf, diff --git a/broker/sqs/broker.go b/broker/sqs/broker.go index 6db3757..6acdfbf 100644 --- a/broker/sqs/broker.go +++ b/broker/sqs/broker.go @@ -10,6 +10,7 @@ package sqs import ( "context" + "log/slog" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/fogfish/swarm" @@ -47,6 +48,7 @@ func New(queue string, opts ...swarm.Option) (swarm.Broker, error) { ctx, can := context.WithCancel(context.Background()) + slog.Info("created broker", "type", "sqs") return &broker{ config: conf, client: cli, diff --git a/broker/sqs/sqs.go b/broker/sqs/sqs.go index b74c3f6..9888214 100644 --- a/broker/sqs/sqs.go +++ b/broker/sqs/sqs.go @@ -41,7 +41,7 @@ func newClient(queue string, config *swarm.Config) (*client, error) { }, ) if err != nil { - return nil, err + return nil, swarm.ErrServiceIO.New(err) } return &client{ @@ -62,7 +62,7 @@ func newService(conf *swarm.Config) (SQS, error) { aws, err := config.LoadDefaultConfig(context.Background()) if err != nil { - return nil, err + return nil, swarm.ErrServiceIO.New(err) } return sqs.NewFromConfig(aws), nil @@ -89,7 +89,11 @@ func (cli *client) Enq(bag swarm.Bag) error { QueueUrl: cli.queue, }, ) - return err + if err != nil { + return swarm.ErrEnqueue.New(err) + } + + return nil } // Ack acknowledges message to broker @@ -103,7 +107,11 @@ func (cli *client) Ack(bag swarm.Bag) error { ReceiptHandle: aws.String(string(bag.Digest)), }, ) - return err + if err != nil { + return swarm.ErrServiceIO.New(err) + } + + return nil } // Deq dequeues message from broker @@ -120,7 +128,7 @@ func (cli client) Deq(cat string) (swarm.Bag, error) { }, ) if err != nil { - return swarm.Bag{}, err + return swarm.Bag{}, swarm.ErrDequeue.New(err) } if len(result.Messages) == 0 { diff --git a/config.go b/config.go index 09c705e..39520fc 100644 --- a/config.go +++ b/config.go @@ -9,9 +9,11 @@ package swarm import ( + "log/slog" "time" "github.com/fogfish/swarm/internal/backoff" + "github.com/fogfish/swarm/internal/pipe" ) // Grade of Service Policy @@ -127,13 +129,26 @@ func WithRetry(backoff Retry) Option { } } -// Configure Channel for global errors +// Configure broker to route global errors to channel func WithStdErr(stderr chan<- error) Option { return func(conf *Config) { conf.StdErr = stderr } } +// Configure broker to log standard errors +func WithLogStdErr() Option { + err := make(chan error) + + pipe.ForEach(err, func(err error) { + slog.Error("Broker fialed", "error", err) + }) + + return func(conf *Config) { + conf.StdErr = err + } +} + // Frequency to poll broker api func WithPollFrequency(t time.Duration) Option { return func(conf *Config) { diff --git a/errors.go b/errors.go new file mode 100644 index 0000000..05a83be --- /dev/null +++ b/errors.go @@ -0,0 +1,9 @@ +package swarm + +import "github.com/fogfish/faults" + +const ( + ErrServiceIO = faults.Type("service i/o failed") + ErrEnqueue = faults.Type("enqueue is failed") + ErrDequeue = faults.Type("dequeue is failed") +) diff --git a/event.go b/event.go index 0007073..b7b431a 100644 --- a/event.go +++ b/event.go @@ -70,3 +70,9 @@ type Event[T any] struct { func (Event[T]) HKT1(EventType) {} func (Event[T]) HKT2(T) {} + +// Fail Event with error +func (evt *Event[T]) Fail(err error) *Event[T] { + evt.Err = err + return evt +} diff --git a/examples/bytes/dequeue/bytes.go b/examples/bytes/dequeue/bytes.go index 2076410..bbf5c11 100644 --- a/examples/bytes/dequeue/bytes.go +++ b/examples/bytes/dequeue/bytes.go @@ -9,7 +9,7 @@ package main import ( - "fmt" + "log/slog" "github.com/fogfish/swarm" "github.com/fogfish/swarm/broker/sqs" @@ -18,7 +18,7 @@ import ( ) func main() { - q := queue.Must(sqs.New("swarm-test")) + q := queue.Must(sqs.New("swarm-test", swarm.WithLogStdErr())) go actor("user").handle(bytes.Dequeue(q, "User")) go actor("note").handle(bytes.Dequeue(q, "Note")) @@ -27,12 +27,11 @@ func main() { q.Await() } -// type actor string func (a actor) handle(rcv <-chan *swarm.Msg[[]byte], ack chan<- *swarm.Msg[[]byte]) { for msg := range rcv { - fmt.Printf("event on %s > %s\n", a, msg.Object) + slog.Info("Event", "type", a, "msg", msg.Object) ack <- msg } } diff --git a/examples/bytes/enqueue/bytes.go b/examples/bytes/enqueue/bytes.go index 2d83451..dabba47 100644 --- a/examples/bytes/enqueue/bytes.go +++ b/examples/bytes/enqueue/bytes.go @@ -9,17 +9,18 @@ package main import ( + "github.com/fogfish/swarm" "github.com/fogfish/swarm/broker/sqs" "github.com/fogfish/swarm/queue" "github.com/fogfish/swarm/queue/bytes" ) func main() { - q := queue.Must(sqs.New("swarm-test")) + q := queue.Must(sqs.New("swarm-test", swarm.WithLogStdErr())) - user, _ := bytes.Enqueue(q, "User") - note, _ := bytes.Enqueue(q, "Note") - like, _ := bytes.Enqueue(q, "Like") + user := queue.LogDeadLetters(bytes.Enqueue(q, "User")) + note := queue.LogDeadLetters(bytes.Enqueue(q, "Note")) + like := queue.LogDeadLetters(bytes.Enqueue(q, "Like")) user <- []byte("user|some text by user") diff --git a/examples/eventbridge/dequeue/eventbridge.go b/examples/eventbridge/dequeue/eventbridge.go index 771398c..4c0a583 100644 --- a/examples/eventbridge/dequeue/eventbridge.go +++ b/examples/eventbridge/dequeue/eventbridge.go @@ -9,7 +9,7 @@ package main import ( - "fmt" + "log/slog" "github.com/fogfish/swarm" "github.com/fogfish/swarm/broker/eventbridge" @@ -32,7 +32,7 @@ type Like struct { } func main() { - q := queue.Must(eventbridge.New("swarm-example-eventbridge")) + q := queue.Must(eventbridge.New("swarm-example-eventbridge", swarm.WithLogStdErr())) go actor[User]("user").handle(queue.Dequeue[User](q)) go actor[Note]("note").handle(queue.Dequeue[Note](q)) @@ -45,7 +45,7 @@ type actor[T any] string func (a actor[T]) handle(rcv <-chan *swarm.Msg[T], ack chan<- *swarm.Msg[T]) { for msg := range rcv { - fmt.Printf("event on %s > %+v\n", a, msg.Object) + slog.Info("Event", "type", a, "msg", msg.Object) ack <- msg } } diff --git a/examples/eventbridge/enqueue/eventbridge.go b/examples/eventbridge/enqueue/eventbridge.go index 88bd971..b8bec35 100644 --- a/examples/eventbridge/enqueue/eventbridge.go +++ b/examples/eventbridge/enqueue/eventbridge.go @@ -32,11 +32,12 @@ type Like struct { func main() { q := queue.Must(eventbridge.New("swarm-example-eventbridge-latest", swarm.WithSource("swarm-example-eventbridge"), + swarm.WithLogStdErr(), )) - user, _ := queue.Enqueue[*User](q) - note, _ := queue.Enqueue[*Note](q) - like, _ := queue.Enqueue[*Like](q) + user := queue.LogDeadLetters(queue.Enqueue[*User](q)) + note := queue.LogDeadLetters(queue.Enqueue[*Note](q)) + like := queue.LogDeadLetters(queue.Enqueue[*Like](q)) user <- &User{ID: "user", Text: "some text"} note <- &Note{ID: "note", Text: "some text"} diff --git a/examples/events/dequeue/events.go b/examples/events/dequeue/events.go index 9053faa..5392289 100644 --- a/examples/events/dequeue/events.go +++ b/examples/events/dequeue/events.go @@ -51,7 +51,7 @@ func (EventNote) HKT1(swarm.EventType) {} func (EventNote) HKT2(*Note) {} func main() { - q := queue.Must(sqs.New("swarm-test")) + q := queue.Must(sqs.New("swarm-test", swarm.WithLogStdErr())) go create(events.Dequeue[*User, EventCreateUser](q)) go update(events.Dequeue[*User, EventUpdateUser](q)) diff --git a/examples/events/enqueue/events.go b/examples/events/enqueue/events.go index d2dcc8a..9ac2883 100644 --- a/examples/events/enqueue/events.go +++ b/examples/events/enqueue/events.go @@ -47,12 +47,12 @@ func (EventNote) HKT1(swarm.EventType) {} func (EventNote) HKT2(*Note) {} func main() { - q := queue.Must(sqs.New("swarm-test")) + q := queue.Must(sqs.New("swarm-test", swarm.WithLogStdErr())) - userCreated, _ := events.Enqueue[*User, EventCreateUser](q) - userUpdated, _ := events.Enqueue[*User, EventUpdateUser](q) - userRemoved, _ := events.Enqueue[*User, EventRemoveUser](q) - note, _ := events.Enqueue[*Note, EventNote](q) + userCreated := queue.LogDeadLetters(events.Enqueue[*User, EventCreateUser](q)) + userUpdated := queue.LogDeadLetters(events.Enqueue[*User, EventUpdateUser](q)) + userRemoved := queue.LogDeadLetters(events.Enqueue[*User, EventRemoveUser](q)) + note := queue.LogDeadLetters(events.Enqueue[*Note, EventNote](q)) // // Multiple channels emits events diff --git a/examples/events3/dequeue/events3.go b/examples/events3/dequeue/events3.go index d3af5ca..50386cf 100644 --- a/examples/events3/dequeue/events3.go +++ b/examples/events3/dequeue/events3.go @@ -12,12 +12,13 @@ import ( "encoding/json" "fmt" + "github.com/fogfish/swarm" "github.com/fogfish/swarm/broker/events3" "github.com/fogfish/swarm/queue" ) func main() { - q := queue.Must(events3.New("swarm-test")) + q := queue.Must(events3.New("swarm-test", swarm.WithLogStdErr())) go common(events3.Dequeue(q)) diff --git a/examples/eventsqs/dequeue/eventsqs.go b/examples/eventsqs/dequeue/eventsqs.go index 6f6320b..62e507b 100644 --- a/examples/eventsqs/dequeue/eventsqs.go +++ b/examples/eventsqs/dequeue/eventsqs.go @@ -9,7 +9,7 @@ package main import ( - "fmt" + "log/slog" "github.com/fogfish/swarm" "github.com/fogfish/swarm/broker/eventsqs" @@ -32,7 +32,7 @@ type Like struct { } func main() { - q := queue.Must(eventsqs.New("swarm-example-sqs-latest")) + q := queue.Must(eventsqs.New("swarm-example-sqs-latest", swarm.WithLogStdErr())) go actor[User]("user").handle(queue.Dequeue[User](q)) go actor[Note]("note").handle(queue.Dequeue[Note](q)) @@ -41,12 +41,11 @@ func main() { q.Await() } -// type actor[T any] string func (a actor[T]) handle(rcv <-chan *swarm.Msg[T], ack chan<- *swarm.Msg[T]) { for msg := range rcv { - fmt.Printf("event on %s > %+v", a, msg.Object) + slog.Info("Event", "type", a, "msg", msg.Object) ack <- msg } } diff --git a/examples/eventsqs/enqueue/eventsqs.go b/examples/eventsqs/enqueue/eventsqs.go index b873886..51c527f 100644 --- a/examples/eventsqs/enqueue/eventsqs.go +++ b/examples/eventsqs/enqueue/eventsqs.go @@ -9,6 +9,7 @@ package main import ( + "github.com/fogfish/swarm" "github.com/fogfish/swarm/broker/eventsqs" "github.com/fogfish/swarm/queue" ) @@ -29,7 +30,7 @@ type Like struct { } func main() { - q := queue.Must(eventsqs.New("swarm-example-sqs-latest")) + q := queue.Must(eventsqs.New("swarm-example-sqs-latest", swarm.WithLogStdErr())) user, _ := queue.Enqueue[*User](q) note, _ := queue.Enqueue[*Note](q) diff --git a/examples/sqs/dequeue/sqs.go b/examples/sqs/dequeue/sqs.go index 14b8f41..292eb5a 100644 --- a/examples/sqs/dequeue/sqs.go +++ b/examples/sqs/dequeue/sqs.go @@ -9,7 +9,8 @@ package main import ( - "fmt" + "log/slog" + "os" "github.com/fogfish/swarm" "github.com/fogfish/swarm/broker/sqs" @@ -32,7 +33,17 @@ type Like struct { } func main() { - q := queue.Must(sqs.New("swarm-test")) + slog.SetDefault( + slog.New( + slog.NewTextHandler(os.Stdout, + &slog.HandlerOptions{ + Level: slog.LevelDebug, + }, + ), + ), + ) + + q := queue.Must(sqs.New("swarm-test", swarm.WithLogStdErr())) go actor[User]("user").handle(queue.Dequeue[User](q)) go actor[Note]("note").handle(queue.Dequeue[Note](q)) @@ -41,12 +52,11 @@ func main() { q.Await() } -// type actor[T any] string func (a actor[T]) handle(rcv <-chan *swarm.Msg[T], ack chan<- *swarm.Msg[T]) { for msg := range rcv { - fmt.Printf("event on %s > %+v\n", a, msg.Object) + slog.Info("Event", "type", a, "msg", msg.Object) ack <- msg } } diff --git a/examples/sqs/enqueue/sqs.go b/examples/sqs/enqueue/sqs.go index 8ed1fec..4855eff 100644 --- a/examples/sqs/enqueue/sqs.go +++ b/examples/sqs/enqueue/sqs.go @@ -9,6 +9,10 @@ package main import ( + "log/slog" + "os" + + "github.com/fogfish/swarm" "github.com/fogfish/swarm/broker/sqs" "github.com/fogfish/swarm/queue" ) @@ -29,11 +33,21 @@ type Like struct { } func main() { - q := queue.Must(sqs.New("swarm-test")) - - user, _ := queue.Enqueue[*User](q) - note, _ := queue.Enqueue[*Note](q) - like, _ := queue.Enqueue[*Like](q) + slog.SetDefault( + slog.New( + slog.NewTextHandler(os.Stdout, + &slog.HandlerOptions{ + Level: slog.LevelDebug, + }, + ), + ), + ) + + q := queue.Must(sqs.New("swarm-test", swarm.WithLogStdErr())) + + user := queue.LogDeadLetters(queue.Enqueue[*User](q)) + note := queue.LogDeadLetters(queue.Enqueue[*Note](q)) + like := queue.LogDeadLetters(queue.Enqueue[*Like](q)) user <- &User{ID: "user", Text: "some text by user"} diff --git a/go.mod b/go.mod index 63bf0c2..2b73d11 100644 --- a/go.mod +++ b/go.mod @@ -37,6 +37,7 @@ require ( github.com/cdklabs/awscdk-asset-kubectl-go/kubectlv20/v2 v2.1.2 // indirect github.com/cdklabs/awscdk-asset-node-proxy-agent-go/nodeproxyagentv6/v2 v2.0.1 // indirect github.com/fatih/color v1.15.0 // indirect + github.com/fogfish/faults v0.2.0 // indirect github.com/fogfish/golem/hseq v1.1.1 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.19 // indirect diff --git a/go.sum b/go.sum index 354f891..eaa554a 100644 --- a/go.sum +++ b/go.sum @@ -50,6 +50,8 @@ github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs= github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw= github.com/fogfish/curie v1.8.2 h1:+4CezyjZ5uszSXUZAV27gfKwv58w3lKTH0JbQwh3S9A= github.com/fogfish/curie v1.8.2/go.mod h1:jPv7pg4hHd8Ug/USG29ZA2bAwlRfh/iinY90/30ATGg= +github.com/fogfish/faults v0.2.0 h1:3KHvZN3cgv2omAGw0MCVH/AbrqxfNag+TFGpgUp6m1w= +github.com/fogfish/faults v0.2.0/go.mod h1:PtvzLt9TP4IF/hRkwRp4dZub42oaMrLbxdS6vmSCJOs= github.com/fogfish/golem/hseq v1.1.1 h1:AV8Ziu5wavpvO31NpecxWk56Un7ahGZaUfbcDKtJLV0= github.com/fogfish/golem/hseq v1.1.1/go.mod h1:N5y7RLLJyL8iNxFOcD6mkciIBx5TJ9mT1fRszr+ByhQ= github.com/fogfish/golem/optics v0.11.0 h1:sIEe6cV/DzpJgobTyWd/GKSXg3XHgNFCsz/huUFqCWc= diff --git a/internal/router/router.go b/internal/router/router.go index 460794d..2f27b3b 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -10,6 +10,7 @@ package router import ( "fmt" + "log/slog" "sync" "time" @@ -40,6 +41,7 @@ func (router *Router) Register(category string) { defer router.Unlock() router.sock[category] = make(chan swarm.Bag, router.config.DequeueCapacity) + slog.Debug("Registered channel for category", "category", category) } func (router *Router) Ack(bag swarm.Bag) error { @@ -83,7 +85,7 @@ func (router *Router) Await(d time.Duration) error { return router.onAck(bag) }) if err != nil { - return err + return swarm.ErrServiceIO.New(err) } } diff --git a/queue/bytes/dequeue.go b/queue/bytes/dequeue.go index 864a8bb..ef8281c 100644 --- a/queue/bytes/dequeue.go +++ b/queue/bytes/dequeue.go @@ -9,6 +9,8 @@ package bytes import ( + "log/slog" + "github.com/fogfish/swarm" "github.com/fogfish/swarm/internal/pipe" ) @@ -56,5 +58,7 @@ func Dequeue(q swarm.Broker, cat string) (<-chan *swarm.Msg[[]byte], chan<- *swa return msg, nil }) + slog.Debug("Created dequeue channels: rcv, ack", "kind", "bytes", "category", cat) + return ch.Msg, ch.Ack } diff --git a/queue/bytes/enqueue.go b/queue/bytes/enqueue.go index 8c3cf09..e552186 100644 --- a/queue/bytes/enqueue.go +++ b/queue/bytes/enqueue.go @@ -9,13 +9,13 @@ package bytes import ( + "log/slog" + "github.com/fogfish/swarm" "github.com/fogfish/swarm/internal/pipe" ) -/* -Enqueue creates pair of channels to send messages and dead-letter queue -*/ +// Enqueue creates pair of channels to send messages and dead-letter queue func Enqueue(q swarm.Broker, cat string) (chan<- []byte, <-chan []byte) { conf := q.Config() ch := swarm.NewMsgEnqCh[[]byte](conf.EnqueueCapacity) @@ -36,5 +36,7 @@ func Enqueue(q swarm.Broker, cat string) (chan<- []byte, <-chan []byte) { } }) + slog.Debug("Created enqueue channels: out, err", "kind", "bytes", "category", cat) + return ch.Msg, ch.Err } diff --git a/queue/bytes/queue.go b/queue/bytes/queue.go index b1a475a..8540325 100644 --- a/queue/bytes/queue.go +++ b/queue/bytes/queue.go @@ -9,6 +9,8 @@ package bytes import ( + "log/slog" + "github.com/fogfish/swarm" ) @@ -16,7 +18,6 @@ type Queue interface { Enqueue([]byte) error } -// type queue struct { cat string conf swarm.Config @@ -36,10 +37,11 @@ func (q queue) Enqueue(object []byte) error { return nil } -// func New(q swarm.Broker, category string) Queue { queue := &queue{cat: category, conf: q.Config()} queue.sock = q.Enqueue(category, queue) + slog.Debug("Created sync emitter", "kind", "bytes", "category", category) + return queue } diff --git a/queue/dequeue.go b/queue/dequeue.go index fae575d..b343cc4 100644 --- a/queue/dequeue.go +++ b/queue/dequeue.go @@ -10,6 +10,7 @@ package queue import ( "encoding/json" + "log/slog" "github.com/fogfish/swarm" "github.com/fogfish/swarm/internal/pipe" @@ -68,5 +69,7 @@ func Dequeue[T any](q swarm.Broker, category ...string) (<-chan *swarm.Msg[T], c return msg, nil }) + slog.Debug("Created dequeue channels: rcv, ack", "category", cat) + return ch.Msg, ch.Ack } diff --git a/queue/dlq.go b/queue/dlq.go new file mode 100644 index 0000000..a056d74 --- /dev/null +++ b/queue/dlq.go @@ -0,0 +1,26 @@ +// +// Copyright (C) 2021 - 2022 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the Apache License Version 2.0. See the LICENSE file for details. +// https://github.com/fogfish/swarm +// + +package queue + +import ( + "log/slog" + + "github.com/fogfish/swarm/internal/pipe" +) + +// Consumes dead letter messages +// +// queue.LogDeadLetters(queue.Enqueue(...)) +func LogDeadLetters[T any](out chan<- T, err <-chan T) chan<- T { + pipe.ForEach[T](err, func(t T) { + slog.Error("Fail to emit", "msg", t) + }) + + return out +} diff --git a/queue/enqueue.go b/queue/enqueue.go index a1e1392..de84726 100644 --- a/queue/enqueue.go +++ b/queue/enqueue.go @@ -10,6 +10,7 @@ package queue import ( "encoding/json" + "log/slog" "reflect" "strings" @@ -57,6 +58,8 @@ func Enqueue[T any](q swarm.Broker, category ...string) (chan<- T, <-chan T) { } }) + slog.Debug("Created enqueue channels: out, err", "kind", "typed", "category", cat) + return ch.Msg, ch.Err } diff --git a/queue/events/dequeue.go b/queue/events/dequeue.go index 2354214..bdc038d 100644 --- a/queue/events/dequeue.go +++ b/queue/events/dequeue.go @@ -10,6 +10,7 @@ package events import ( "encoding/json" + "log/slog" "github.com/fogfish/golem/optics" "github.com/fogfish/swarm" @@ -80,5 +81,7 @@ func Dequeue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (< return evt, nil }) + slog.Debug("Created dequeue channels: rcv, ack", "kind", "event", "category", catE) + return ch.Msg, ch.Ack } diff --git a/queue/events/enqueue.go b/queue/events/enqueue.go index 9b8a7a5..a0eda75 100644 --- a/queue/events/enqueue.go +++ b/queue/events/enqueue.go @@ -10,6 +10,7 @@ package events import ( "encoding/json" + "log/slog" "reflect" "strings" "time" @@ -73,6 +74,8 @@ func Enqueue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (c } }) + slog.Debug("Created enqueue channels: out, err", "kind", "event", "category", catE) + return ch.Msg, ch.Err } diff --git a/queue/events/queue.go b/queue/events/queue.go index bdfa0b7..b273ddb 100644 --- a/queue/events/queue.go +++ b/queue/events/queue.go @@ -10,6 +10,7 @@ package events import ( "encoding/json" + "log/slog" "time" "github.com/fogfish/curie" @@ -74,5 +75,7 @@ func New[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) Queue[ } queue.sock = q.Enqueue(catE, queue) + slog.Debug("Created sync emitter", "kind", "event", "category", catE) + return queue } diff --git a/queue/queue.go b/queue/queue.go index 3f47d92..6b529e7 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -10,6 +10,7 @@ package queue import ( "encoding/json" + "log/slog" "github.com/fogfish/swarm" ) @@ -51,5 +52,7 @@ func New[T any](q swarm.Broker, category ...string) Queue[T] { queue := &queue[T]{cat: cat, conf: q.Config()} queue.sock = q.Enqueue(cat, queue) + slog.Debug("Created sync emitter", "kind", "typed", "category", cat) + return queue }