Skip to content

Commit

Permalink
feat: introduce msgIdGenerator and add ID field to Message wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
Wondertan authored and vyzo committed Jan 23, 2022
1 parent b57bcc8 commit 957fc4f
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 3 deletions.
32 changes: 32 additions & 0 deletions midgen.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package pubsub

import "sync"

type msgIDGenerator struct {
defGen MsgIdFunction

topicGens map[string]MsgIdFunction
topicGensLk sync.RWMutex
}

func (m *msgIDGenerator) Add(topic string, gen MsgIdFunction) {
m.topicGensLk.Lock()
m.topicGens[topic] = gen
m.topicGensLk.Unlock()
}

func (m *msgIDGenerator) GenID(msg *Message) string {
if msg.ID != "" {
return msg.ID
}

m.topicGensLk.RLock()
gen, ok := m.topicGens[msg.GetTopic()]
m.topicGensLk.RUnlock()
if !ok {
gen = m.defGen
}

msg.ID = gen(msg.Message)
return msg.ID
}
4 changes: 2 additions & 2 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ const (

type Message struct {
*pb.Message
ID string
ReceivedFrom peer.ID
ValidatorData interface{}
}
Expand Down Expand Up @@ -1047,8 +1048,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) {
continue
}

msg := &Message{pmsg, rpc.from, nil}
p.pushMsg(msg)
p.pushMsg(&Message{pmsg, "", rpc.from, nil})
}
}

Expand Down
2 changes: 1 addition & 1 deletion topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error
}
}

return t.p.val.PushLocal(&Message{m, t.p.host.ID(), nil})
return t.p.val.PushLocal(&Message{m, "", t.p.host.ID(), nil})
}

// WithReadiness returns a publishing option for only publishing when the router is ready.
Expand Down

0 comments on commit 957fc4f

Please sign in to comment.