Skip to content

Commit

Permalink
Update sqs to kernel v0.20.0
Browse files Browse the repository at this point in the history
  • Loading branch information
fogfish committed Sep 29, 2024
1 parent 043fcde commit 67dfaef
Show file tree
Hide file tree
Showing 13 changed files with 482 additions and 376 deletions.
42 changes: 42 additions & 0 deletions broker/sqs/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//
// Copyright (C) 2021 - 2024 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the Apache License Version 2.0. See the LICENSE file for details.
// https://github.com/fogfish/swarm
//

package sqs

import (
"github.com/fogfish/swarm"
)

type Option func(*Client)

var defs = []Option{WithConfig()}

func WithConfig(opts ...swarm.Option) Option {
return func(c *Client) {
config := swarm.NewConfig()
for _, opt := range opts {
opt(&config)
}

c.batchSize = 1

c.config = config
}
}

func WithService(service SQS) Option {
return func(c *Client) {
c.service = service
}
}

func WithBatchSize(batch int) Option {
return func(c *Client) {
c.batchSize = batch
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,23 @@ import (

"github.com/fogfish/swarm"
"github.com/fogfish/swarm/broker/sqs"
"github.com/fogfish/swarm/queue"
"github.com/fogfish/swarm/queue/bytes"
"github.com/fogfish/swarm/dequeue"
)

func main() {
q := queue.Must(sqs.New("swarm-test", swarm.WithLogStdErr()))
q, err := sqs.NewDequeuer("swarm-test",
sqs.WithConfig(
swarm.WithLogStdErr(),
),
)
if err != nil {
slog.Error("sqs reader has failed", "err", err)
return
}

go actor("user").handle(bytes.Dequeue(q, "User"))
go actor("note").handle(bytes.Dequeue(q, "Note"))
go actor("like").handle(bytes.Dequeue(q, "Like"))
go actor("user").handle(dequeue.Bytes(q, "User"))
go actor("note").handle(dequeue.Bytes(q, "Note"))
go actor("like").handle(dequeue.Bytes(q, "Like"))

q.Await()
}
Expand Down
105 changes: 105 additions & 0 deletions broker/sqs/examples/dequeue/event/sqs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
//
// Copyright (C) 2021 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the Apache License Version 2.0. See the LICENSE file for details.
// https://github.com/fogfish/swarm
//

package main

import (
"encoding/json"
"fmt"
"log/slog"

"github.com/fogfish/swarm"
"github.com/fogfish/swarm/broker/sqs"
"github.com/fogfish/swarm/dequeue"
)

// Date type (object) affected by events
type User struct {
ID string `json:"id"`
Text string `json:"text"`
}

type CreatedUser User

type EventCreateUser = swarm.Event[swarm.Meta, CreatedUser]

type UpdatedUser User

type EventUpdateUser = swarm.Event[swarm.Meta, UpdatedUser]

type RemovedUser User

type EventRemoveUser = swarm.Event[swarm.Meta, RemovedUser]

type Note struct {
ID string `json:"id"`
Text string `json:"text"`
}

type EventNote = swarm.Event[swarm.Meta, Note]

func main() {
q, err := sqs.NewDequeuer("swarm-test",
sqs.WithConfig(
swarm.WithLogStdErr(),
),
)
if err != nil {
slog.Error("sqs reader has failed", "err", err)
return
}

go create(dequeue.Event[swarm.Meta, CreatedUser](q))
go update(dequeue.Event[swarm.Meta, UpdatedUser](q))
go remove(dequeue.Event[swarm.Meta, RemovedUser](q))
go common(dequeue.Event[swarm.Meta, Note](q))

q.Await()
}

func create(rcv <-chan swarm.Msg[EventCreateUser], ack chan<- swarm.Msg[EventCreateUser]) {
for msg := range rcv {
v, _ := json.MarshalIndent(msg, "+ |", " ")
fmt.Printf("create user > \n %s\n", v)
ack <- msg
}
}

func update(rcv <-chan swarm.Msg[EventUpdateUser], ack chan<- swarm.Msg[EventUpdateUser]) {
for msg := range rcv {
v, _ := json.MarshalIndent(msg, "~ |", " ")
fmt.Printf("update user > \n %s\n", v)
ack <- msg
}
}

func remove(rcv <-chan swarm.Msg[EventRemoveUser], ack chan<- swarm.Msg[EventRemoveUser]) {
for msg := range rcv {
v, _ := json.MarshalIndent(msg, "- |", " ")
fmt.Printf("remove user > \n %s\n", v)
ack <- msg
}
}

func common(rcv <-chan swarm.Msg[EventNote], ack chan<- swarm.Msg[EventNote]) {
for msg := range rcv {
prefix := ""
switch string(msg.Object.Meta.Type) {
case "note:EventCreateNote":
prefix = "+ |"
case "note:EventUpdateNote":
prefix = "~ |"
case "note:EventRemoveNote":
prefix = "- |"
}

v, _ := json.MarshalIndent(msg, prefix, " ")
fmt.Printf("common note > \n %s\n", v)
ack <- msg
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (C) 2021 Dmitry Kolesnikov
// Copyright (C) 2021 - 2024 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the Apache License Version 2.0. See the LICENSE file for details.
Expand All @@ -13,7 +13,7 @@ import (

"github.com/fogfish/swarm"
"github.com/fogfish/swarm/broker/sqs"
"github.com/fogfish/swarm/queue"
"github.com/fogfish/swarm/dequeue"
)

type User struct {
Expand All @@ -32,11 +32,19 @@ type Like struct {
}

func main() {
q := queue.Must(sqs.New("swarm-test", swarm.WithLogStdErr()))
q, err := sqs.NewDequeuer("swarm-test",
sqs.WithConfig(
swarm.WithLogStdErr(),
),
)
if err != nil {
slog.Error("sqs reader has failed", "err", err)
return
}

go actor[User]("user").handle(queue.Dequeue[User](q))
go actor[Note]("note").handle(queue.Dequeue[Note](q))
go actor[Like]("like").handle(queue.Dequeue[Like](q))
go actor[User]("user").handle(dequeue.Typed[User](q))
go actor[Note]("note").handle(dequeue.Typed[Note](q))
go actor[Like]("like").handle(dequeue.Typed[Like](q))

q.Await()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,27 @@
package main

import (
"log/slog"

"github.com/fogfish/swarm"
"github.com/fogfish/swarm/broker/sqs"
"github.com/fogfish/swarm/queue"
"github.com/fogfish/swarm/queue/bytes"
"github.com/fogfish/swarm/enqueue"
)

func main() {
q := queue.Must(sqs.New("swarm-test", swarm.WithLogStdErr()))

user := swarm.LogDeadLetters(bytes.Enqueue(q, "User"))
note := swarm.LogDeadLetters(bytes.Enqueue(q, "Note"))
like := swarm.LogDeadLetters(bytes.Enqueue(q, "Like"))
q, err := sqs.NewEnqueuer("swarm-test",
sqs.WithConfig(
swarm.WithLogStdErr(),
),
)
if err != nil {
slog.Error("sqs writer has failed", "err", err)
return
}

user := swarm.LogDeadLetters(enqueue.Bytes(q, "User"))
note := swarm.LogDeadLetters(enqueue.Bytes(q, "Note"))
like := swarm.LogDeadLetters(enqueue.Bytes(q, "Like"))

user <- []byte("user|some text by user")

Expand Down
116 changes: 116 additions & 0 deletions broker/sqs/examples/enqueue/event/sqs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
//
// Copyright (C) 2021 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the Apache License Version 2.0. See the LICENSE file for details.
// https://github.com/fogfish/swarm
//

package main

import (
"log/slog"

"github.com/fogfish/swarm"
"github.com/fogfish/swarm/broker/sqs"
"github.com/fogfish/swarm/enqueue"
)

// Date type (object) affected by events
type User struct {
ID string `json:"id"`
Text string `json:"text"`
}

type CreatedUser User

type EventCreateUser = swarm.Event[swarm.Meta, CreatedUser]

type UpdatedUser User

type EventUpdateUser = swarm.Event[swarm.Meta, UpdatedUser]

type RemovedUser User

type EventRemoveUser = swarm.Event[swarm.Meta, RemovedUser]

type Note struct {
ID string `json:"id"`
Text string `json:"text"`
}

type EventNote = swarm.Event[swarm.Meta, Note]

func main() {
q, err := sqs.NewEnqueuer("swarm-test",
sqs.WithConfig(
swarm.WithLogStdErr(),
),
)
if err != nil {
slog.Error("sqs writer has failed", "err", err)
return
}

userCreated := swarm.LogDeadLetters(enqueue.Event[swarm.Meta, CreatedUser](q))
userUpdated := swarm.LogDeadLetters(enqueue.Event[swarm.Meta, UpdatedUser](q))
userRemoved := swarm.LogDeadLetters(enqueue.Event[swarm.Meta, RemovedUser](q))
note := swarm.LogDeadLetters(enqueue.Event[swarm.Meta, Note](q))

//
// Multiple channels emits events
userCreated <- EventCreateUser{
Meta: &swarm.Meta{
Agent: "example",
Participant: "user",
},
Data: &CreatedUser{ID: "user", Text: "some text"},
}

userUpdated <- EventUpdateUser{
Meta: &swarm.Meta{
Agent: "example",
Participant: "user",
},
Data: &UpdatedUser{ID: "user", Text: "some text with changes"},
}

userRemoved <- swarm.Event[swarm.Meta, RemovedUser]{
Meta: &swarm.Meta{
Agent: "example",
Participant: "user",
},
Data: &RemovedUser{ID: "user"},
}

//
// Single channel emits event
note <- EventNote{
Meta: &swarm.Meta{
Type: "note:EventCreateNote",
Agent: "example",
Participant: "user",
},
Data: &Note{ID: "note", Text: "some text"},
}

note <- EventNote{
Meta: &swarm.Meta{
Type: "note:EventUpdateNote",
Agent: "example",
Participant: "user",
},
Data: &Note{ID: "note", Text: "some text with changes"},
}

note <- EventNote{
Meta: &swarm.Meta{
Type: "note:EventRemoveNote",
Agent: "example",
Participant: "user",
},
Data: &Note{ID: "note"},
}

q.Close()
}
Loading

0 comments on commit 67dfaef

Please sign in to comment.