Skip to content

Commit

Permalink
use optics library to manipulate event struct
Browse files Browse the repository at this point in the history
  • Loading branch information
fogfish committed Nov 13, 2023
1 parent 9123c4e commit 8314c06
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 44 deletions.
2 changes: 2 additions & 0 deletions bag.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package swarm

// TODO: Digest Type combining Digest & Error

// Msg is a generic envelop type for incoming messages.
// It contains both decoded object and its digest used to acknowledge message.
type Msg[T any] struct {
Expand Down
13 changes: 8 additions & 5 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
package swarm

import (
"time"

"github.com/fogfish/curie"
"github.com/fogfish/golem/pure"
)
Expand Down Expand Up @@ -41,6 +43,7 @@ type Event[T any] struct {

//
// Direct performer of the event, a software service that emits action to the stream.
// It is automatically defined by the library upon the transmission
Agent curie.IRI `json:"agent,omitempty"`

//
Expand All @@ -50,7 +53,11 @@ type Event[T any] struct {
//
// ISO8601 timestamps when action has been created
// It is automatically defined by the library upon the transmission
Created string `json:"created,omitempty"`
Created time.Time `json:"created,omitempty"`

//
// The object upon which the event is carried out.
Object T `json:"object,omitempty"`

//
// The digest of received event (used internally to ack processing)
Expand All @@ -59,10 +66,6 @@ type Event[T any] struct {
//
// The error of event handling (used internally to ack processing)
Err error `json:"-"`

//
// The object upon which the event is carried out.
Object T `json:"object,omitempty"`
}

func (Event[T]) HKT1(EventType) {}
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/aws/constructs-go/constructs/v10 v10.3.0
github.com/aws/jsii-runtime-go v1.91.0
github.com/fogfish/curie v1.8.2
github.com/fogfish/golem/optics v0.11.0
github.com/fogfish/golem/pure v0.10.1
github.com/fogfish/guid v1.1.0
github.com/fogfish/it v1.0.0
Expand All @@ -36,6 +37,7 @@ require (
github.com/cdklabs/awscdk-asset-kubectl-go/kubectlv20/v2 v2.1.2 // indirect
github.com/cdklabs/awscdk-asset-node-proxy-agent-go/nodeproxyagentv6/v2 v2.0.1 // indirect
github.com/fatih/color v1.15.0 // indirect
github.com/fogfish/golem/hseq v1.1.1 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/yuin/goldmark v1.5.3 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs=
github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw=
github.com/fogfish/curie v1.8.2 h1:+4CezyjZ5uszSXUZAV27gfKwv58w3lKTH0JbQwh3S9A=
github.com/fogfish/curie v1.8.2/go.mod h1:jPv7pg4hHd8Ug/USG29ZA2bAwlRfh/iinY90/30ATGg=
github.com/fogfish/golem/hseq v1.1.1 h1:AV8Ziu5wavpvO31NpecxWk56Un7ahGZaUfbcDKtJLV0=
github.com/fogfish/golem/hseq v1.1.1/go.mod h1:N5y7RLLJyL8iNxFOcD6mkciIBx5TJ9mT1fRszr+ByhQ=
github.com/fogfish/golem/optics v0.11.0 h1:sIEe6cV/DzpJgobTyWd/GKSXg3XHgNFCsz/huUFqCWc=
github.com/fogfish/golem/optics v0.11.0/go.mod h1:UO1L+9FWrO6yz1DiqBTEynmM0orI07sw7QBzAGWDtOE=
github.com/fogfish/golem/pure v0.10.1 h1:0+cnvdaV9zF+0NN8SZMgR5bgFM6yNfBHU4rynYSDfmE=
github.com/fogfish/golem/pure v0.10.1/go.mod h1:kLPfgu5uKP0CrwVap7jejisRwV7vo1q8Eyqnc/Z0qyw=
github.com/fogfish/guid v1.1.0 h1:C2dk990rZ+irgUxTISYPlxuyNs323beVo0s4TzdYjzM=
Expand Down
6 changes: 3 additions & 3 deletions internal/qtest/qtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestEnqueueEvent(t *testing.T, factory enqueue) {
Should(it.Equal(*val.Object, Note{Some: "message"})).
Should(it.Equal(val.Type, "note:Event[*github.com/fogfish/swarm/internal/qtest.Note]")).
ShouldNot(it.Equal(len(val.ID), 0)).
ShouldNot(it.Equal(len(val.Created), 0))
ShouldNot(it.True(val.Created.IsZero()))

case <-time.After(50 * time.Millisecond):
t.Error("failed to send message")
Expand Down Expand Up @@ -236,7 +236,7 @@ func TestDequeueEvent(t *testing.T, factory dequeue) {
Type: "type",
Agent: "agent",
Participant: "user",
Created: "created",
Created: time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC),
Object: &Note{Some: "message"},
}
message, _ := json.Marshal(event)
Expand All @@ -255,7 +255,7 @@ func TestDequeueEvent(t *testing.T, factory dequeue) {
Should(it.Equal(val.Participant, "user")).
Should(it.Equal(val.Type, "type")).
Should(it.Equal(val.ID, "id")).
Should(it.Equal(val.Created, "created")).
Should(it.Equal(val.Created, time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC))).
Should(it.Equal(<-eff, Receipt))

q.Close()
Expand Down
15 changes: 4 additions & 11 deletions queue/events/dequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ package events
import (
"encoding/json"
"strings"
"unsafe"

"github.com/fogfish/golem/optics"
"github.com/fogfish/swarm"
"github.com/fogfish/swarm/internal/pipe"
)
Expand All @@ -29,18 +29,12 @@ func Dequeue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (<
cat = category[0]
}

//
// building memory layout to make unsafe struct reading
kindT := swarm.Event[T]{}
offDigest, offFail := unsafe.Offsetof(kindT.Digest),
unsafe.Offsetof(kindT.Err)
shape := optics.ForShape2[E, string, error]("Digest", "Err")

sock := q.Dequeue(cat, &ch)

pipe.ForEach(ch.Ack, func(object *E) {
evt := unsafe.Pointer(object)
digest := *(*string)(unsafe.Pointer(uintptr(evt) + offDigest))
fail := *(*error)(unsafe.Pointer(uintptr(evt) + offFail))
digest, fail := shape.Get(object)

err := conf.Backoff.Retry(func() error {
return sock.Ack(swarm.Bag{
Expand Down Expand Up @@ -82,8 +76,7 @@ func Dequeue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (<
}
}

ptr := unsafe.Pointer(evt)
*(*string)(unsafe.Pointer(uintptr(ptr) + offDigest)) = bag.Digest
shape.Put(evt, bag.Digest, nil)

return evt, nil
})
Expand Down
38 changes: 13 additions & 25 deletions queue/events/enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,17 @@ import (
"reflect"
"strings"
"time"
"unsafe"

"github.com/fogfish/curie"
"github.com/fogfish/golem/optics"
"github.com/fogfish/guid"
"github.com/fogfish/swarm"
"github.com/fogfish/swarm/internal/pipe"
)

/*
Enqueue creates pair of channels to send messages and dead-letter queue
*/
// Enqueue creates pair of channels
// - to send messages
// - failed messages (dead-letter queue)
func Enqueue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (chan<- *E, <-chan *E) {
conf := q.Config()
ch := swarm.NewEvtEnqCh[T, E](conf.EnqueueCapacity)
Expand All @@ -32,38 +33,25 @@ func Enqueue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (c
cat = category[0]
}

//
// building memory layout to make unsafe struct reading
kindT := swarm.Event[T]{}
offID, offType, offCreated :=
unsafe.Offsetof(kindT.ID),
unsafe.Offsetof(kindT.Type),
unsafe.Offsetof(kindT.Created)
shape := optics.ForShape4[E, string, curie.IRI, curie.IRI, time.Time]("ID", "Type", "Agent", "Created")

sock := q.Enqueue(cat, &ch)

pipe.ForEach(ch.Msg, func(object *E) {
ch.Busy.Lock()
defer ch.Busy.Unlock()

evt := unsafe.Pointer(object)

// patch ID
if len(*(*string)(unsafe.Pointer(uintptr(evt) + offID))) == 0 {
id := guid.G.K(guid.Clock).String()
*(*string)(unsafe.Pointer(uintptr(evt) + offID)) = id
_, knd, src, _ := shape.Get(object)
if knd == "" {
knd = curie.IRI(cat)
}

// patch Type
if len(*(*string)(unsafe.Pointer(uintptr(evt) + offType))) == 0 {
*(*string)(unsafe.Pointer(uintptr(evt) + offType)) = cat
if src == "" {
src = curie.IRI(q.Config().Source)
}

// patch Created
if len(*(*string)(unsafe.Pointer(uintptr(evt) + offCreated))) == 0 {
t := time.Now().Format(time.RFC3339)
*(*string)(unsafe.Pointer(uintptr(evt) + offCreated)) = t
}
// TODO: migrate to v2
shape.Put(object, guid.G.K(guid.Clock).String(), knd, src, time.Now())

msg, err := json.Marshal(object)
if err != nil {
Expand Down

0 comments on commit 8314c06

Please sign in to comment.