From 4c9e8fd6d2692d85a8a2256af2ba6076c808859a Mon Sep 17 00:00:00 2001 From: Dmitry Kolesnikov Date: Fri, 17 Nov 2023 18:42:11 +0200 Subject: [PATCH] enhance logging --- .github/workflows/check-code.yml | 5 ++--- .github/workflows/check-test.yml | 2 +- broker/sqs/broker.go | 2 +- examples/bytes/enqueue/bytes.go | 13 +++++++++++++ queue/bytes/dequeue.go | 9 ++++++--- queue/bytes/enqueue.go | 2 ++ queue/bytes/queue.go | 1 + queue/dequeue.go | 7 ++++++- queue/dlq.go | 2 +- queue/enqueue.go | 2 ++ queue/events/dequeue.go | 9 ++++++--- queue/events/enqueue.go | 2 ++ queue/events/queue.go | 1 + queue/queue.go | 1 + 14 files changed, 45 insertions(+), 13 deletions(-) diff --git a/.github/workflows/check-code.yml b/.github/workflows/check-code.yml index f491bc2..292d850 100644 --- a/.github/workflows/check-code.yml +++ b/.github/workflows/check-code.yml @@ -16,11 +16,10 @@ jobs: - uses: actions/setup-go@v2 with: - go-version: 1.21 + go-version: "1.21" - uses: actions/checkout@v3 - - uses: dominikh/staticcheck-action@v1.2.0 + - uses: dominikh/staticcheck-action@v1.3.0 with: - version: "2022.1" install-go: false diff --git a/.github/workflows/check-test.yml b/.github/workflows/check-test.yml index 58e13fa..5d41c86 100644 --- a/.github/workflows/check-test.yml +++ b/.github/workflows/check-test.yml @@ -21,7 +21,7 @@ jobs: - uses: actions/setup-go@v2 with: - go-version: 1.21 + go-version: "1.21" - uses: actions/checkout@v2 diff --git a/broker/sqs/broker.go b/broker/sqs/broker.go index 6acdfbf..cf64545 100644 --- a/broker/sqs/broker.go +++ b/broker/sqs/broker.go @@ -48,7 +48,7 @@ func New(queue string, opts ...swarm.Option) (swarm.Broker, error) { ctx, can := context.WithCancel(context.Background()) - slog.Info("created broker", "type", "sqs") + slog.Info("created broker", "broker", "sqs") return &broker{ config: conf, client: cli, diff --git a/examples/bytes/enqueue/bytes.go b/examples/bytes/enqueue/bytes.go index dabba47..a67b0b2 100644 --- a/examples/bytes/enqueue/bytes.go +++ b/examples/bytes/enqueue/bytes.go @@ -9,6 +9,9 @@ package main import ( + "log/slog" + "os" + "github.com/fogfish/swarm" "github.com/fogfish/swarm/broker/sqs" "github.com/fogfish/swarm/queue" @@ -16,6 +19,16 @@ import ( ) func main() { + slog.SetDefault( + slog.New( + slog.NewTextHandler(os.Stdout, + &slog.HandlerOptions{ + Level: slog.LevelDebug, + }, + ), + ), + ) + q := queue.Must(sqs.New("swarm-test", swarm.WithLogStdErr())) user := queue.LogDeadLetters(bytes.Enqueue(q, "User")) diff --git a/queue/bytes/dequeue.go b/queue/bytes/dequeue.go index ef8281c..c8f428f 100644 --- a/queue/bytes/dequeue.go +++ b/queue/bytes/dequeue.go @@ -15,9 +15,7 @@ import ( "github.com/fogfish/swarm/internal/pipe" ) -/* -Dequeue ... -*/ +// Dequeue bytes func Dequeue(q swarm.Broker, cat string) (<-chan *swarm.Msg[[]byte], chan<- *swarm.Msg[[]byte]) { conf := q.Config() ch := swarm.NewMsgDeqCh[[]byte](conf.DequeueCapacity) @@ -34,7 +32,10 @@ func Dequeue(q swarm.Broker, cat string) (<-chan *swarm.Msg[[]byte], chan<- *swa }) if err != nil && conf.StdErr != nil { conf.StdErr <- err + return } + + slog.Debug("Broker ack'ed object", "kind", "bytes", "category", cat, "object", object) }) pipe.Emit(ch.Msg, q.Config().PollFrequency, func() (*swarm.Msg[[]byte], error) { @@ -55,6 +56,8 @@ func Dequeue(q swarm.Broker, cat string) (<-chan *swarm.Msg[[]byte], chan<- *swa Digest: bag.Digest, } + slog.Debug("Broker received object", "kind", "bytes", "category", cat, "object", bag.Object) + return msg, nil }) diff --git a/queue/bytes/enqueue.go b/queue/bytes/enqueue.go index e552186..cb4b39f 100644 --- a/queue/bytes/enqueue.go +++ b/queue/bytes/enqueue.go @@ -34,6 +34,8 @@ func Enqueue(q swarm.Broker, cat string) (chan<- []byte, <-chan []byte) { conf.StdErr <- err } } + + slog.Debug("Enqueued", "kind", "bytes", "category", bag.Category, "object", object) }) slog.Debug("Created enqueue channels: out, err", "kind", "bytes", "category", cat) diff --git a/queue/bytes/queue.go b/queue/bytes/queue.go index 8540325..0ceabe6 100644 --- a/queue/bytes/queue.go +++ b/queue/bytes/queue.go @@ -34,6 +34,7 @@ func (q queue) Enqueue(object []byte) error { return err } + slog.Debug("Enqueued bytes", "category", bag.Category, "object", object) return nil } diff --git a/queue/dequeue.go b/queue/dequeue.go index b343cc4..d8915f3 100644 --- a/queue/dequeue.go +++ b/queue/dequeue.go @@ -42,7 +42,10 @@ func Dequeue[T any](q swarm.Broker, category ...string) (<-chan *swarm.Msg[T], c }) if err != nil && conf.StdErr != nil { conf.StdErr <- err + return } + + slog.Debug("Broker ack'ed object", "kind", "typed", "category", cat, "object", object.Object) }) pipe.Emit(ch.Msg, q.Config().PollFrequency, func() (*swarm.Msg[T], error) { @@ -66,10 +69,12 @@ func Dequeue[T any](q swarm.Broker, category ...string) (<-chan *swarm.Msg[T], c return nil, err } + slog.Debug("Broker received object", "kind", "typed", "category", cat, "object", msg.Object) + return msg, nil }) - slog.Debug("Created dequeue channels: rcv, ack", "category", cat) + slog.Debug("Created dequeue channels: rcv, ack", "kind", "typed", "category", cat) return ch.Msg, ch.Ack } diff --git a/queue/dlq.go b/queue/dlq.go index a056d74..a110842 100644 --- a/queue/dlq.go +++ b/queue/dlq.go @@ -19,7 +19,7 @@ import ( // queue.LogDeadLetters(queue.Enqueue(...)) func LogDeadLetters[T any](out chan<- T, err <-chan T) chan<- T { pipe.ForEach[T](err, func(t T) { - slog.Error("Fail to emit", "msg", t) + slog.Error("Fail to emit", "object", t) }) return out diff --git a/queue/enqueue.go b/queue/enqueue.go index de84726..74c1ad3 100644 --- a/queue/enqueue.go +++ b/queue/enqueue.go @@ -56,6 +56,8 @@ func Enqueue[T any](q swarm.Broker, category ...string) (chan<- T, <-chan T) { conf.StdErr <- err } } + + slog.Debug("Enqueued message", "kind", "typed", "category", bag.Category, "object", object) }) slog.Debug("Created enqueue channels: out, err", "kind", "typed", "category", cat) diff --git a/queue/events/dequeue.go b/queue/events/dequeue.go index bdc038d..4019c96 100644 --- a/queue/events/dequeue.go +++ b/queue/events/dequeue.go @@ -17,9 +17,7 @@ import ( "github.com/fogfish/swarm/internal/pipe" ) -/* -Dequeue ... -*/ +// Dequeue event func Dequeue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (<-chan *E, chan<- *E) { conf := q.Config() ch := swarm.NewEvtDeqCh[T, E](conf.DequeueCapacity) @@ -45,7 +43,10 @@ func Dequeue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (< }) if err != nil && conf.StdErr != nil { conf.StdErr <- err + return } + + slog.Debug("Broker ack'ed object", "kind", "event", "category", catE, "object", object) }) pipe.Emit(ch.Msg, q.Config().PollFrequency, func() (*E, error) { @@ -78,6 +79,8 @@ func Dequeue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (< shape.Put(evt, bag.Digest, nil) + slog.Debug("Broker received object", "kind", "event", "category", catE, "object", evt) + return evt, nil }) diff --git a/queue/events/enqueue.go b/queue/events/enqueue.go index a0eda75..088b53c 100644 --- a/queue/events/enqueue.go +++ b/queue/events/enqueue.go @@ -72,6 +72,8 @@ func Enqueue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (c conf.StdErr <- err } } + + slog.Debug("Enqueued event", "kind", "event", "category", bag.Category, "object", object) }) slog.Debug("Created enqueue channels: out, err", "kind", "event", "category", catE) diff --git a/queue/events/queue.go b/queue/events/queue.go index b273ddb..c7684af 100644 --- a/queue/events/queue.go +++ b/queue/events/queue.go @@ -57,6 +57,7 @@ func (q queue[T, E]) Enqueue(object *E) error { return err } + slog.Debug("Enqueued event", "category", bag.Category, "object", object) return nil } diff --git a/queue/queue.go b/queue/queue.go index 6b529e7..279753e 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -40,6 +40,7 @@ func (q queue[T]) Enqueue(object T) error { return err } + slog.Debug("Enqueued message", "category", bag.Category, "object", object) return nil }