From 8314c0656e7d326ee2b4ee1d7616a579c83d5963 Mon Sep 17 00:00:00 2001 From: Dmitry Kolesnikov Date: Mon, 13 Nov 2023 21:54:34 +0200 Subject: [PATCH] use optics library to manipulate event struct --- bag.go | 2 ++ event.go | 13 ++++++++----- go.mod | 2 ++ go.sum | 4 ++++ internal/qtest/qtest.go | 6 +++--- queue/events/dequeue.go | 15 ++++----------- queue/events/enqueue.go | 38 +++++++++++++------------------------- 7 files changed, 36 insertions(+), 44 deletions(-) diff --git a/bag.go b/bag.go index 9f7c46b..c334700 100644 --- a/bag.go +++ b/bag.go @@ -8,6 +8,8 @@ package swarm +// TODO: Digest Type combining Digest & Error + // Msg is a generic envelop type for incoming messages. // It contains both decoded object and its digest used to acknowledge message. type Msg[T any] struct { diff --git a/event.go b/event.go index 33da939..0007073 100644 --- a/event.go +++ b/event.go @@ -9,6 +9,8 @@ package swarm import ( + "time" + "github.com/fogfish/curie" "github.com/fogfish/golem/pure" ) @@ -41,6 +43,7 @@ type Event[T any] struct { // // Direct performer of the event, a software service that emits action to the stream. + // It is automatically defined by the library upon the transmission Agent curie.IRI `json:"agent,omitempty"` // @@ -50,7 +53,11 @@ type Event[T any] struct { // // ISO8601 timestamps when action has been created // It is automatically defined by the library upon the transmission - Created string `json:"created,omitempty"` + Created time.Time `json:"created,omitempty"` + + // + // The object upon which the event is carried out. + Object T `json:"object,omitempty"` // // The digest of received event (used internally to ack processing) @@ -59,10 +66,6 @@ type Event[T any] struct { // // The error of event handling (used internally to ack processing) Err error `json:"-"` - - // - // The object upon which the event is carried out. - Object T `json:"object,omitempty"` } func (Event[T]) HKT1(EventType) {} diff --git a/go.mod b/go.mod index 3ddb598..db20b74 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/aws/constructs-go/constructs/v10 v10.3.0 github.com/aws/jsii-runtime-go v1.91.0 github.com/fogfish/curie v1.8.2 + github.com/fogfish/golem/optics v0.11.0 github.com/fogfish/golem/pure v0.10.1 github.com/fogfish/guid v1.1.0 github.com/fogfish/it v1.0.0 @@ -36,6 +37,7 @@ require ( github.com/cdklabs/awscdk-asset-kubectl-go/kubectlv20/v2 v2.1.2 // indirect github.com/cdklabs/awscdk-asset-node-proxy-agent-go/nodeproxyagentv6/v2 v2.0.1 // indirect github.com/fatih/color v1.15.0 // indirect + github.com/fogfish/golem/hseq v1.1.1 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.19 // indirect github.com/yuin/goldmark v1.5.3 // indirect diff --git a/go.sum b/go.sum index 808a12f..8c8ee18 100644 --- a/go.sum +++ b/go.sum @@ -49,6 +49,10 @@ github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs= github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw= github.com/fogfish/curie v1.8.2 h1:+4CezyjZ5uszSXUZAV27gfKwv58w3lKTH0JbQwh3S9A= github.com/fogfish/curie v1.8.2/go.mod h1:jPv7pg4hHd8Ug/USG29ZA2bAwlRfh/iinY90/30ATGg= +github.com/fogfish/golem/hseq v1.1.1 h1:AV8Ziu5wavpvO31NpecxWk56Un7ahGZaUfbcDKtJLV0= +github.com/fogfish/golem/hseq v1.1.1/go.mod h1:N5y7RLLJyL8iNxFOcD6mkciIBx5TJ9mT1fRszr+ByhQ= +github.com/fogfish/golem/optics v0.11.0 h1:sIEe6cV/DzpJgobTyWd/GKSXg3XHgNFCsz/huUFqCWc= +github.com/fogfish/golem/optics v0.11.0/go.mod h1:UO1L+9FWrO6yz1DiqBTEynmM0orI07sw7QBzAGWDtOE= github.com/fogfish/golem/pure v0.10.1 h1:0+cnvdaV9zF+0NN8SZMgR5bgFM6yNfBHU4rynYSDfmE= github.com/fogfish/golem/pure v0.10.1/go.mod h1:kLPfgu5uKP0CrwVap7jejisRwV7vo1q8Eyqnc/Z0qyw= github.com/fogfish/guid v1.1.0 h1:C2dk990rZ+irgUxTISYPlxuyNs323beVo0s4TzdYjzM= diff --git a/internal/qtest/qtest.go b/internal/qtest/qtest.go index 1379961..b66631f 100644 --- a/internal/qtest/qtest.go +++ b/internal/qtest/qtest.go @@ -142,7 +142,7 @@ func TestEnqueueEvent(t *testing.T, factory enqueue) { Should(it.Equal(*val.Object, Note{Some: "message"})). Should(it.Equal(val.Type, "note:Event[*github.com/fogfish/swarm/internal/qtest.Note]")). ShouldNot(it.Equal(len(val.ID), 0)). - ShouldNot(it.Equal(len(val.Created), 0)) + ShouldNot(it.True(val.Created.IsZero())) case <-time.After(50 * time.Millisecond): t.Error("failed to send message") @@ -236,7 +236,7 @@ func TestDequeueEvent(t *testing.T, factory dequeue) { Type: "type", Agent: "agent", Participant: "user", - Created: "created", + Created: time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC), Object: &Note{Some: "message"}, } message, _ := json.Marshal(event) @@ -255,7 +255,7 @@ func TestDequeueEvent(t *testing.T, factory dequeue) { Should(it.Equal(val.Participant, "user")). Should(it.Equal(val.Type, "type")). Should(it.Equal(val.ID, "id")). - Should(it.Equal(val.Created, "created")). + Should(it.Equal(val.Created, time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC))). Should(it.Equal(<-eff, Receipt)) q.Close() diff --git a/queue/events/dequeue.go b/queue/events/dequeue.go index 01ed00d..20ddf52 100644 --- a/queue/events/dequeue.go +++ b/queue/events/dequeue.go @@ -11,8 +11,8 @@ package events import ( "encoding/json" "strings" - "unsafe" + "github.com/fogfish/golem/optics" "github.com/fogfish/swarm" "github.com/fogfish/swarm/internal/pipe" ) @@ -29,18 +29,12 @@ func Dequeue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (< cat = category[0] } - // - // building memory layout to make unsafe struct reading - kindT := swarm.Event[T]{} - offDigest, offFail := unsafe.Offsetof(kindT.Digest), - unsafe.Offsetof(kindT.Err) + shape := optics.ForShape2[E, string, error]("Digest", "Err") sock := q.Dequeue(cat, &ch) pipe.ForEach(ch.Ack, func(object *E) { - evt := unsafe.Pointer(object) - digest := *(*string)(unsafe.Pointer(uintptr(evt) + offDigest)) - fail := *(*error)(unsafe.Pointer(uintptr(evt) + offFail)) + digest, fail := shape.Get(object) err := conf.Backoff.Retry(func() error { return sock.Ack(swarm.Bag{ @@ -82,8 +76,7 @@ func Dequeue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (< } } - ptr := unsafe.Pointer(evt) - *(*string)(unsafe.Pointer(uintptr(ptr) + offDigest)) = bag.Digest + shape.Put(evt, bag.Digest, nil) return evt, nil }) diff --git a/queue/events/enqueue.go b/queue/events/enqueue.go index 7ce3ede..4cba6df 100644 --- a/queue/events/enqueue.go +++ b/queue/events/enqueue.go @@ -13,16 +13,17 @@ import ( "reflect" "strings" "time" - "unsafe" + "github.com/fogfish/curie" + "github.com/fogfish/golem/optics" "github.com/fogfish/guid" "github.com/fogfish/swarm" "github.com/fogfish/swarm/internal/pipe" ) -/* -Enqueue creates pair of channels to send messages and dead-letter queue -*/ +// Enqueue creates pair of channels +// - to send messages +// - failed messages (dead-letter queue) func Enqueue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (chan<- *E, <-chan *E) { conf := q.Config() ch := swarm.NewEvtEnqCh[T, E](conf.EnqueueCapacity) @@ -32,13 +33,7 @@ func Enqueue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (c cat = category[0] } - // - // building memory layout to make unsafe struct reading - kindT := swarm.Event[T]{} - offID, offType, offCreated := - unsafe.Offsetof(kindT.ID), - unsafe.Offsetof(kindT.Type), - unsafe.Offsetof(kindT.Created) + shape := optics.ForShape4[E, string, curie.IRI, curie.IRI, time.Time]("ID", "Type", "Agent", "Created") sock := q.Enqueue(cat, &ch) @@ -46,24 +41,17 @@ func Enqueue[T any, E swarm.EventKind[T]](q swarm.Broker, category ...string) (c ch.Busy.Lock() defer ch.Busy.Unlock() - evt := unsafe.Pointer(object) - - // patch ID - if len(*(*string)(unsafe.Pointer(uintptr(evt) + offID))) == 0 { - id := guid.G.K(guid.Clock).String() - *(*string)(unsafe.Pointer(uintptr(evt) + offID)) = id + _, knd, src, _ := shape.Get(object) + if knd == "" { + knd = curie.IRI(cat) } - // patch Type - if len(*(*string)(unsafe.Pointer(uintptr(evt) + offType))) == 0 { - *(*string)(unsafe.Pointer(uintptr(evt) + offType)) = cat + if src == "" { + src = curie.IRI(q.Config().Source) } - // patch Created - if len(*(*string)(unsafe.Pointer(uintptr(evt) + offCreated))) == 0 { - t := time.Now().Format(time.RFC3339) - *(*string)(unsafe.Pointer(uintptr(evt) + offCreated)) = t - } + // TODO: migrate to v2 + shape.Put(object, guid.G.K(guid.Clock).String(), knd, src, time.Now()) msg, err := json.Marshal(object) if err != nil {