Skip to content

Commit

Permalink
Use slog for logging
Browse files Browse the repository at this point in the history
  • Loading branch information
fogfish committed Nov 16, 2023
1 parent df2652d commit bb1c0c3
Show file tree
Hide file tree
Showing 33 changed files with 179 additions and 49 deletions.
2 changes: 2 additions & 0 deletions broker/eventbridge/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package eventbridge

import (
"context"
"log/slog"

"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
Expand Down Expand Up @@ -46,6 +47,7 @@ func New(bus string, opts ...swarm.Option) (swarm.Broker, error) {

ctx, can := context.WithCancel(context.Background())

slog.Info("created broker", "type", "eventbridge")
return &broker{
config: conf,
client: cli,
Expand Down
4 changes: 2 additions & 2 deletions broker/eventbridge/eventbridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func newService(conf *swarm.Config) (EventBridge, error) {

aws, err := config.LoadDefaultConfig(context.Background())
if err != nil {
return nil, err
return nil, swarm.ErrServiceIO.New(err)
}

return eventbridge.NewFromConfig(aws), nil
Expand All @@ -72,7 +72,7 @@ func (cli *client) Enq(bag swarm.Bag) error {
},
)
if err != nil {
return err
return swarm.ErrEnqueue.New(err)
}

if ret.FailedEntryCount > 0 {
Expand Down
2 changes: 2 additions & 0 deletions broker/events3/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package events3

import (
"context"
"log/slog"

"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
Expand All @@ -27,6 +28,7 @@ func New(queue string, opts ...swarm.Option) (swarm.Broker, error) {

ctx, can := context.WithCancel(context.Background())

slog.Info("created broker", "type", "event-s3")
return &broker{
config: conf,
channels: swarm.NewChannels(),
Expand Down
3 changes: 3 additions & 0 deletions broker/eventsqs/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
package eventsqs

import (
"log/slog"

"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/fogfish/swarm"
Expand All @@ -28,6 +30,7 @@ func New(queue string, opts ...swarm.Option) (swarm.Broker, error) {
return nil, err
}

slog.Info("created broker", "type", "event-sqs")
return &broker{
Broker: bro,
config: conf,
Expand Down
2 changes: 2 additions & 0 deletions broker/sqs/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package sqs

import (
"context"
"log/slog"

"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/fogfish/swarm"
Expand Down Expand Up @@ -47,6 +48,7 @@ func New(queue string, opts ...swarm.Option) (swarm.Broker, error) {

ctx, can := context.WithCancel(context.Background())

slog.Info("created broker", "type", "sqs")
return &broker{
config: conf,
client: cli,
Expand Down
18 changes: 13 additions & 5 deletions broker/sqs/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func newClient(queue string, config *swarm.Config) (*client, error) {
},
)
if err != nil {
return nil, err
return nil, swarm.ErrServiceIO.New(err)
}

return &client{
Expand All @@ -62,7 +62,7 @@ func newService(conf *swarm.Config) (SQS, error) {

aws, err := config.LoadDefaultConfig(context.Background())
if err != nil {
return nil, err
return nil, swarm.ErrServiceIO.New(err)
}

return sqs.NewFromConfig(aws), nil
Expand All @@ -89,7 +89,11 @@ func (cli *client) Enq(bag swarm.Bag) error {
QueueUrl: cli.queue,
},
)
return err
if err != nil {
return swarm.ErrEnqueue.New(err)
}

return nil
}

// Ack acknowledges message to broker
Expand All @@ -103,7 +107,11 @@ func (cli *client) Ack(bag swarm.Bag) error {
ReceiptHandle: aws.String(string(bag.Digest)),
},
)
return err
if err != nil {
return swarm.ErrServiceIO.New(err)
}

return nil
}

// Deq dequeues message from broker
Expand All @@ -120,7 +128,7 @@ func (cli client) Deq(cat string) (swarm.Bag, error) {
},
)
if err != nil {
return swarm.Bag{}, err
return swarm.Bag{}, swarm.ErrDequeue.New(err)
}

if len(result.Messages) == 0 {
Expand Down
17 changes: 16 additions & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
package swarm

import (
"log/slog"

Check failure on line 12 in config.go

View workflow job for this annotation

GitHub Actions / unit

package log/slog is not in GOROOT (/opt/hostedtoolcache/go/1.19.13/x64/src/log/slog)

Check failure on line 12 in config.go

View workflow job for this annotation

GitHub Actions / code

package log/slog is not in GOROOT (/opt/hostedtoolcache/go/1.19.13/x64/src/log/slog) (compile)
"time"

"github.com/fogfish/swarm/internal/backoff"
"github.com/fogfish/swarm/internal/pipe"
)

// Grade of Service Policy
Expand Down Expand Up @@ -127,13 +129,26 @@ func WithRetry(backoff Retry) Option {
}
}

// Configure Channel for global errors
// Configure broker to route global errors to channel
func WithStdErr(stderr chan<- error) Option {
return func(conf *Config) {
conf.StdErr = stderr
}
}

// Configure broker to log standard errors
func WithLogStdErr() Option {
err := make(chan error)

pipe.ForEach(err, func(err error) {
slog.Error("Broker fialed", "error", err)
})

return func(conf *Config) {
conf.StdErr = err
}
}

// Frequency to poll broker api
func WithPollFrequency(t time.Duration) Option {
return func(conf *Config) {
Expand Down
9 changes: 9 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package swarm

import "github.com/fogfish/faults"

const (
ErrServiceIO = faults.Type("service i/o failed")
ErrEnqueue = faults.Type("enqueue is failed")
ErrDequeue = faults.Type("dequeue is failed")
)
6 changes: 6 additions & 0 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,9 @@ type Event[T any] struct {

func (Event[T]) HKT1(EventType) {}
func (Event[T]) HKT2(T) {}

// Fail Event with error
func (evt *Event[T]) Fail(err error) *Event[T] {
evt.Err = err
return evt
}
7 changes: 3 additions & 4 deletions examples/bytes/dequeue/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
package main

import (
"fmt"
"log/slog"

"github.com/fogfish/swarm"
"github.com/fogfish/swarm/broker/sqs"
Expand All @@ -18,7 +18,7 @@ import (
)

func main() {
q := queue.Must(sqs.New("swarm-test"))
q := queue.Must(sqs.New("swarm-test", swarm.WithLogStdErr()))

go actor("user").handle(bytes.Dequeue(q, "User"))
go actor("note").handle(bytes.Dequeue(q, "Note"))
Expand All @@ -27,12 +27,11 @@ func main() {
q.Await()
}

//
type actor string

func (a actor) handle(rcv <-chan *swarm.Msg[[]byte], ack chan<- *swarm.Msg[[]byte]) {
for msg := range rcv {
fmt.Printf("event on %s > %s\n", a, msg.Object)
slog.Info("Event", "type", a, "msg", msg.Object)
ack <- msg
}
}
9 changes: 5 additions & 4 deletions examples/bytes/enqueue/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,18 @@
package main

import (
"github.com/fogfish/swarm"
"github.com/fogfish/swarm/broker/sqs"
"github.com/fogfish/swarm/queue"
"github.com/fogfish/swarm/queue/bytes"
)

func main() {
q := queue.Must(sqs.New("swarm-test"))
q := queue.Must(sqs.New("swarm-test", swarm.WithLogStdErr()))

user, _ := bytes.Enqueue(q, "User")
note, _ := bytes.Enqueue(q, "Note")
like, _ := bytes.Enqueue(q, "Like")
user := queue.LogDeadLetters(bytes.Enqueue(q, "User"))
note := queue.LogDeadLetters(bytes.Enqueue(q, "Note"))
like := queue.LogDeadLetters(bytes.Enqueue(q, "Like"))

user <- []byte("user|some text by user")

Expand Down
6 changes: 3 additions & 3 deletions examples/eventbridge/dequeue/eventbridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
package main

import (
"fmt"
"log/slog"

"github.com/fogfish/swarm"
"github.com/fogfish/swarm/broker/eventbridge"
Expand All @@ -32,7 +32,7 @@ type Like struct {
}

func main() {
q := queue.Must(eventbridge.New("swarm-example-eventbridge"))
q := queue.Must(eventbridge.New("swarm-example-eventbridge", swarm.WithLogStdErr()))

go actor[User]("user").handle(queue.Dequeue[User](q))
go actor[Note]("note").handle(queue.Dequeue[Note](q))
Expand All @@ -45,7 +45,7 @@ type actor[T any] string

func (a actor[T]) handle(rcv <-chan *swarm.Msg[T], ack chan<- *swarm.Msg[T]) {
for msg := range rcv {
fmt.Printf("event on %s > %+v\n", a, msg.Object)
slog.Info("Event", "type", a, "msg", msg.Object)
ack <- msg
}
}
7 changes: 4 additions & 3 deletions examples/eventbridge/enqueue/eventbridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ type Like struct {
func main() {
q := queue.Must(eventbridge.New("swarm-example-eventbridge-latest",
swarm.WithSource("swarm-example-eventbridge"),
swarm.WithLogStdErr(),
))

user, _ := queue.Enqueue[*User](q)
note, _ := queue.Enqueue[*Note](q)
like, _ := queue.Enqueue[*Like](q)
user := queue.LogDeadLetters(queue.Enqueue[*User](q))
note := queue.LogDeadLetters(queue.Enqueue[*Note](q))
like := queue.LogDeadLetters(queue.Enqueue[*Like](q))

user <- &User{ID: "user", Text: "some text"}
note <- &Note{ID: "note", Text: "some text"}
Expand Down
2 changes: 1 addition & 1 deletion examples/events/dequeue/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (EventNote) HKT1(swarm.EventType) {}
func (EventNote) HKT2(*Note) {}

func main() {
q := queue.Must(sqs.New("swarm-test"))
q := queue.Must(sqs.New("swarm-test", swarm.WithLogStdErr()))

go create(events.Dequeue[*User, EventCreateUser](q))
go update(events.Dequeue[*User, EventUpdateUser](q))
Expand Down
10 changes: 5 additions & 5 deletions examples/events/enqueue/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ func (EventNote) HKT1(swarm.EventType) {}
func (EventNote) HKT2(*Note) {}

func main() {
q := queue.Must(sqs.New("swarm-test"))
q := queue.Must(sqs.New("swarm-test", swarm.WithLogStdErr()))

userCreated, _ := events.Enqueue[*User, EventCreateUser](q)
userUpdated, _ := events.Enqueue[*User, EventUpdateUser](q)
userRemoved, _ := events.Enqueue[*User, EventRemoveUser](q)
note, _ := events.Enqueue[*Note, EventNote](q)
userCreated := queue.LogDeadLetters(events.Enqueue[*User, EventCreateUser](q))
userUpdated := queue.LogDeadLetters(events.Enqueue[*User, EventUpdateUser](q))
userRemoved := queue.LogDeadLetters(events.Enqueue[*User, EventRemoveUser](q))
note := queue.LogDeadLetters(events.Enqueue[*Note, EventNote](q))

//
// Multiple channels emits events
Expand Down
3 changes: 2 additions & 1 deletion examples/events3/dequeue/events3.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ import (
"encoding/json"
"fmt"

"github.com/fogfish/swarm"
"github.com/fogfish/swarm/broker/events3"
"github.com/fogfish/swarm/queue"
)

func main() {
q := queue.Must(events3.New("swarm-test"))
q := queue.Must(events3.New("swarm-test", swarm.WithLogStdErr()))

go common(events3.Dequeue(q))

Expand Down
7 changes: 3 additions & 4 deletions examples/eventsqs/dequeue/eventsqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
package main

import (
"fmt"
"log/slog"

"github.com/fogfish/swarm"
"github.com/fogfish/swarm/broker/eventsqs"
Expand All @@ -32,7 +32,7 @@ type Like struct {
}

func main() {
q := queue.Must(eventsqs.New("swarm-example-sqs-latest"))
q := queue.Must(eventsqs.New("swarm-example-sqs-latest", swarm.WithLogStdErr()))

go actor[User]("user").handle(queue.Dequeue[User](q))
go actor[Note]("note").handle(queue.Dequeue[Note](q))
Expand All @@ -41,12 +41,11 @@ func main() {
q.Await()
}

//
type actor[T any] string

func (a actor[T]) handle(rcv <-chan *swarm.Msg[T], ack chan<- *swarm.Msg[T]) {
for msg := range rcv {
fmt.Printf("event on %s > %+v", a, msg.Object)
slog.Info("Event", "type", a, "msg", msg.Object)
ack <- msg
}
}
3 changes: 2 additions & 1 deletion examples/eventsqs/enqueue/eventsqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package main

import (
"github.com/fogfish/swarm"
"github.com/fogfish/swarm/broker/eventsqs"
"github.com/fogfish/swarm/queue"
)
Expand All @@ -29,7 +30,7 @@ type Like struct {
}

func main() {
q := queue.Must(eventsqs.New("swarm-example-sqs-latest"))
q := queue.Must(eventsqs.New("swarm-example-sqs-latest", swarm.WithLogStdErr()))

user, _ := queue.Enqueue[*User](q)
note, _ := queue.Enqueue[*Note](q)
Expand Down
Loading

0 comments on commit bb1c0c3

Please sign in to comment.