Skip to content

Commit

Permalink
enhance logging
Browse files Browse the repository at this point in the history
  • Loading branch information
fogfish committed Nov 17, 2023
1 parent a94a6b2 commit 4c9e8fd
Show file tree
Hide file tree
Showing 14 changed files with 45 additions and 13 deletions.
5 changes: 2 additions & 3 deletions .github/workflows/check-code.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ jobs:

- uses: actions/setup-go@v2
with:
go-version: 1.21
go-version: "1.21"

- uses: actions/checkout@v3

- uses: dominikh/staticcheck-action@v1.2.0
- uses: dominikh/staticcheck-action@v1.3.0
with:
version: "2022.1"
install-go: false
2 changes: 1 addition & 1 deletion .github/workflows/check-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:

- uses: actions/setup-go@v2
with:
go-version: 1.21
go-version: "1.21"

- uses: actions/checkout@v2

Expand Down
2 changes: 1 addition & 1 deletion broker/sqs/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func New(queue string, opts ...swarm.Option) (swarm.Broker, error) {

ctx, can := context.WithCancel(context.Background())

slog.Info("created broker", "type", "sqs")
slog.Info("created broker", "broker", "sqs")
return &broker{
config: conf,
client: cli,
Expand Down
13 changes: 13 additions & 0 deletions examples/bytes/enqueue/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,26 @@
package main

import (
"log/slog"
"os"

"github.com/fogfish/swarm"
"github.com/fogfish/swarm/broker/sqs"
"github.com/fogfish/swarm/queue"
"github.com/fogfish/swarm/queue/bytes"
)

func main() {
slog.SetDefault(
slog.New(
slog.NewTextHandler(os.Stdout,
&slog.HandlerOptions{
Level: slog.LevelDebug,
},
),
),
)

q := queue.Must(sqs.New("swarm-test", swarm.WithLogStdErr()))

user := queue.LogDeadLetters(bytes.Enqueue(q, "User"))
Expand Down
9 changes: 6 additions & 3 deletions queue/bytes/dequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ import (
"github.com/fogfish/swarm/internal/pipe"
)

/*
Dequeue ...
*/
// Dequeue bytes
func Dequeue(q swarm.Broker, cat string) (<-chan *swarm.Msg[[]byte], chan<- *swarm.Msg[[]byte]) {
conf := q.Config()
ch := swarm.NewMsgDeqCh[[]byte](conf.DequeueCapacity)
Expand All @@ -34,7 +32,10 @@ func Dequeue(q swarm.Broker, cat string) (<-chan *swarm.Msg[[]byte], chan<- *swa
})
if err != nil && conf.StdErr != nil {
conf.StdErr <- err
return
}

slog.Debug("Broker ack'ed object", "kind", "bytes", "category", cat, "object", object)
})

pipe.Emit(ch.Msg, q.Config().PollFrequency, func() (*swarm.Msg[[]byte], error) {
Expand All @@ -55,6 +56,8 @@ func Dequeue(q swarm.Broker, cat string) (<-chan *swarm.Msg[[]byte], chan<- *swa
Digest: bag.Digest,
}

slog.Debug("Broker received object", "kind", "bytes", "category", cat, "object", bag.Object)

return msg, nil
})

Expand Down
2 changes: 2 additions & 0 deletions queue/bytes/enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ func Enqueue(q swarm.Broker, cat string) (chan<- []byte, <-chan []byte) {
conf.StdErr <- err
}
}

slog.Debug("Enqueued", "kind", "bytes", "category", bag.Category, "object", object)
})

slog.Debug("Created enqueue channels: out, err", "kind", "bytes", "category", cat)
Expand Down
1 change: 1 addition & 0 deletions queue/bytes/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func (q queue) Enqueue(object []byte) error {
return err
}

slog.Debug("Enqueued bytes", "category", bag.Category, "object", object)
return nil
}

Expand Down
7 changes: 6 additions & 1 deletion queue/dequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ func Dequeue[T any](q swarm.Broker, category ...string) (<-chan *swarm.Msg[T], c
})
if err != nil && conf.StdErr != nil {
conf.StdErr <- err
return
}

slog.Debug("Broker ack'ed object", "kind", "typed", "category", cat, "object", object.Object)
})

pipe.Emit(ch.Msg, q.Config().PollFrequency, func() (*swarm.Msg[T], error) {
Expand All @@ -66,10 +69,12 @@ func Dequeue[T any](q swarm.Broker, category ...string) (<-chan *swarm.Msg[T], c
return nil, err
}

slog.Debug("Broker received object", "kind", "typed", "category", cat, "object", msg.Object)

return msg, nil
})

slog.Debug("Created dequeue channels: rcv, ack", "category", cat)
slog.Debug("Created dequeue channels: rcv, ack", "kind", "typed", "category", cat)

return ch.Msg, ch.Ack
}
2 changes: 1 addition & 1 deletion queue/dlq.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
// queue.LogDeadLetters(queue.Enqueue(...))
func LogDeadLetters[T any](out chan<- T, err <-chan T) chan<- T {
pipe.ForEach[T](err, func(t T) {
slog.Error("Fail to emit", "msg", t)
slog.Error("Fail to emit", "object", t)
})

return out
Expand Down
2 changes: 2 additions & 0 deletions queue/enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func Enqueue[T any](q swarm.Broker, category ...string) (chan<- T, <-chan T) {
conf.StdErr <- err
}
}

slog.Debug("Enqueued message", "kind", "typed", "category", bag.Category, "object", object)
})

slog.Debug("Created enqueue channels: out, err", "kind", "typed", "category", cat)
Expand Down
9 changes: 6 additions & 3 deletions queue/events/dequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ import (
"github.com/fogfish/swarm/internal/pipe"
)

/*
Dequeue ...
*/
// Dequeue event
func Dequeue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (<-chan *E, chan<- *E) {
conf := q.Config()
ch := swarm.NewEvtDeqCh[T, E](conf.DequeueCapacity)
Expand All @@ -45,7 +43,10 @@ func Dequeue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (<
})
if err != nil && conf.StdErr != nil {
conf.StdErr <- err
return
}

slog.Debug("Broker ack'ed object", "kind", "event", "category", catE, "object", object)
})

pipe.Emit(ch.Msg, q.Config().PollFrequency, func() (*E, error) {
Expand Down Expand Up @@ -78,6 +79,8 @@ func Dequeue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (<

shape.Put(evt, bag.Digest, nil)

slog.Debug("Broker received object", "kind", "event", "category", catE, "object", evt)

return evt, nil
})

Expand Down
2 changes: 2 additions & 0 deletions queue/events/enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ func Enqueue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (c
conf.StdErr <- err
}
}

slog.Debug("Enqueued event", "kind", "event", "category", bag.Category, "object", object)
})

slog.Debug("Created enqueue channels: out, err", "kind", "event", "category", catE)
Expand Down
1 change: 1 addition & 0 deletions queue/events/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func (q queue[T, E]) Enqueue(object *E) error {
return err
}

slog.Debug("Enqueued event", "category", bag.Category, "object", object)
return nil
}

Expand Down
1 change: 1 addition & 0 deletions queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (q queue[T]) Enqueue(object T) error {
return err
}

slog.Debug("Enqueued message", "category", bag.Category, "object", object)
return nil
}

Expand Down

0 comments on commit 4c9e8fd

Please sign in to comment.