Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use optics library to manipulate event struct #37

Merged
merged 1 commit into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bag.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package swarm

// TODO: Digest Type combining Digest & Error

// Msg is a generic envelop type for incoming messages.
// It contains both decoded object and its digest used to acknowledge message.
type Msg[T any] struct {
Expand Down
13 changes: 8 additions & 5 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
package swarm

import (
"time"

"github.com/fogfish/curie"
"github.com/fogfish/golem/pure"
)
Expand Down Expand Up @@ -41,6 +43,7 @@ type Event[T any] struct {

//
// Direct performer of the event, a software service that emits action to the stream.
// It is automatically defined by the library upon the transmission
Agent curie.IRI `json:"agent,omitempty"`

//
Expand All @@ -50,7 +53,11 @@ type Event[T any] struct {
//
// ISO8601 timestamps when action has been created
// It is automatically defined by the library upon the transmission
Created string `json:"created,omitempty"`
Created time.Time `json:"created,omitempty"`

//
// The object upon which the event is carried out.
Object T `json:"object,omitempty"`

//
// The digest of received event (used internally to ack processing)
Expand All @@ -59,10 +66,6 @@ type Event[T any] struct {
//
// The error of event handling (used internally to ack processing)
Err error `json:"-"`

//
// The object upon which the event is carried out.
Object T `json:"object,omitempty"`
}

func (Event[T]) HKT1(EventType) {}
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/aws/constructs-go/constructs/v10 v10.3.0
github.com/aws/jsii-runtime-go v1.91.0
github.com/fogfish/curie v1.8.2
github.com/fogfish/golem/optics v0.11.0
github.com/fogfish/golem/pure v0.10.1
github.com/fogfish/guid v1.1.0
github.com/fogfish/it v1.0.0
Expand All @@ -36,6 +37,7 @@ require (
github.com/cdklabs/awscdk-asset-kubectl-go/kubectlv20/v2 v2.1.2 // indirect
github.com/cdklabs/awscdk-asset-node-proxy-agent-go/nodeproxyagentv6/v2 v2.0.1 // indirect
github.com/fatih/color v1.15.0 // indirect
github.com/fogfish/golem/hseq v1.1.1 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/yuin/goldmark v1.5.3 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs=
github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw=
github.com/fogfish/curie v1.8.2 h1:+4CezyjZ5uszSXUZAV27gfKwv58w3lKTH0JbQwh3S9A=
github.com/fogfish/curie v1.8.2/go.mod h1:jPv7pg4hHd8Ug/USG29ZA2bAwlRfh/iinY90/30ATGg=
github.com/fogfish/golem/hseq v1.1.1 h1:AV8Ziu5wavpvO31NpecxWk56Un7ahGZaUfbcDKtJLV0=
github.com/fogfish/golem/hseq v1.1.1/go.mod h1:N5y7RLLJyL8iNxFOcD6mkciIBx5TJ9mT1fRszr+ByhQ=
github.com/fogfish/golem/optics v0.11.0 h1:sIEe6cV/DzpJgobTyWd/GKSXg3XHgNFCsz/huUFqCWc=
github.com/fogfish/golem/optics v0.11.0/go.mod h1:UO1L+9FWrO6yz1DiqBTEynmM0orI07sw7QBzAGWDtOE=
github.com/fogfish/golem/pure v0.10.1 h1:0+cnvdaV9zF+0NN8SZMgR5bgFM6yNfBHU4rynYSDfmE=
github.com/fogfish/golem/pure v0.10.1/go.mod h1:kLPfgu5uKP0CrwVap7jejisRwV7vo1q8Eyqnc/Z0qyw=
github.com/fogfish/guid v1.1.0 h1:C2dk990rZ+irgUxTISYPlxuyNs323beVo0s4TzdYjzM=
Expand Down
6 changes: 3 additions & 3 deletions internal/qtest/qtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestEnqueueEvent(t *testing.T, factory enqueue) {
Should(it.Equal(*val.Object, Note{Some: "message"})).
Should(it.Equal(val.Type, "note:Event[*github.com/fogfish/swarm/internal/qtest.Note]")).
ShouldNot(it.Equal(len(val.ID), 0)).
ShouldNot(it.Equal(len(val.Created), 0))
ShouldNot(it.True(val.Created.IsZero()))

case <-time.After(50 * time.Millisecond):
t.Error("failed to send message")
Expand Down Expand Up @@ -236,7 +236,7 @@ func TestDequeueEvent(t *testing.T, factory dequeue) {
Type: "type",
Agent: "agent",
Participant: "user",
Created: "created",
Created: time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC),
Object: &Note{Some: "message"},
}
message, _ := json.Marshal(event)
Expand All @@ -255,7 +255,7 @@ func TestDequeueEvent(t *testing.T, factory dequeue) {
Should(it.Equal(val.Participant, "user")).
Should(it.Equal(val.Type, "type")).
Should(it.Equal(val.ID, "id")).
Should(it.Equal(val.Created, "created")).
Should(it.Equal(val.Created, time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC))).
Should(it.Equal(<-eff, Receipt))

q.Close()
Expand Down
15 changes: 4 additions & 11 deletions queue/events/dequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ package events
import (
"encoding/json"
"strings"
"unsafe"

"github.com/fogfish/golem/optics"
"github.com/fogfish/swarm"
"github.com/fogfish/swarm/internal/pipe"
)
Expand All @@ -29,18 +29,12 @@ func Dequeue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (<
cat = category[0]
}

//
// building memory layout to make unsafe struct reading
kindT := swarm.Event[T]{}
offDigest, offFail := unsafe.Offsetof(kindT.Digest),
unsafe.Offsetof(kindT.Err)
shape := optics.ForShape2[E, string, error]("Digest", "Err")

sock := q.Dequeue(cat, &ch)

pipe.ForEach(ch.Ack, func(object *E) {
evt := unsafe.Pointer(object)
digest := *(*string)(unsafe.Pointer(uintptr(evt) + offDigest))
fail := *(*error)(unsafe.Pointer(uintptr(evt) + offFail))
digest, fail := shape.Get(object)

err := conf.Backoff.Retry(func() error {
return sock.Ack(swarm.Bag{
Expand Down Expand Up @@ -82,8 +76,7 @@ func Dequeue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (<
}
}

ptr := unsafe.Pointer(evt)
*(*string)(unsafe.Pointer(uintptr(ptr) + offDigest)) = bag.Digest
shape.Put(evt, bag.Digest, nil)

return evt, nil
})
Expand Down
38 changes: 13 additions & 25 deletions queue/events/enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,17 @@ import (
"reflect"
"strings"
"time"
"unsafe"

"github.com/fogfish/curie"
"github.com/fogfish/golem/optics"
"github.com/fogfish/guid"
"github.com/fogfish/swarm"
"github.com/fogfish/swarm/internal/pipe"
)

/*
Enqueue creates pair of channels to send messages and dead-letter queue
*/
// Enqueue creates pair of channels
// - to send messages
// - failed messages (dead-letter queue)
func Enqueue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (chan<- *E, <-chan *E) {
conf := q.Config()
ch := swarm.NewEvtEnqCh[T, E](conf.EnqueueCapacity)
Expand All @@ -32,38 +33,25 @@ func Enqueue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (c
cat = category[0]
}

//
// building memory layout to make unsafe struct reading
kindT := swarm.Event[T]{}
offID, offType, offCreated :=
unsafe.Offsetof(kindT.ID),
unsafe.Offsetof(kindT.Type),
unsafe.Offsetof(kindT.Created)
shape := optics.ForShape4[E, string, curie.IRI, curie.IRI, time.Time]("ID", "Type", "Agent", "Created")

sock := q.Enqueue(cat, &ch)

pipe.ForEach(ch.Msg, func(object *E) {
ch.Busy.Lock()
defer ch.Busy.Unlock()

evt := unsafe.Pointer(object)

// patch ID
if len(*(*string)(unsafe.Pointer(uintptr(evt) + offID))) == 0 {
id := guid.G.K(guid.Clock).String()
*(*string)(unsafe.Pointer(uintptr(evt) + offID)) = id
_, knd, src, _ := shape.Get(object)
if knd == "" {
knd = curie.IRI(cat)
}

// patch Type
if len(*(*string)(unsafe.Pointer(uintptr(evt) + offType))) == 0 {
*(*string)(unsafe.Pointer(uintptr(evt) + offType)) = cat
if src == "" {
src = curie.IRI(q.Config().Source)
}

// patch Created
if len(*(*string)(unsafe.Pointer(uintptr(evt) + offCreated))) == 0 {
t := time.Now().Format(time.RFC3339)
*(*string)(unsafe.Pointer(uintptr(evt) + offCreated)) = t
}
// TODO: migrate to v2
shape.Put(object, guid.G.K(guid.Clock).String(), knd, src, time.Now())

msg, err := json.Marshal(object)
if err != nil {
Expand Down
Loading