diff --git a/midgen.go b/midgen.go new file mode 100644 index 00000000..03293294 --- /dev/null +++ b/midgen.go @@ -0,0 +1,32 @@ +package pubsub + +import "sync" + +type msgIDGenerator struct { + defGen MsgIdFunction + + topicGens map[string]MsgIdFunction + topicGensLk sync.RWMutex +} + +func (m *msgIDGenerator) Add(topic string, gen MsgIdFunction) { + m.topicGensLk.Lock() + m.topicGens[topic] = gen + m.topicGensLk.Unlock() +} + +func (m *msgIDGenerator) GenID(msg *Message) string { + if msg.ID != "" { + return msg.ID + } + + m.topicGensLk.RLock() + gen, ok := m.topicGens[msg.GetTopic()] + m.topicGensLk.RUnlock() + if !ok { + gen = m.defGen + } + + msg.ID = gen(msg.Message) + return msg.ID +} diff --git a/pubsub.go b/pubsub.go index fdfa755b..cba16c59 100644 --- a/pubsub.go +++ b/pubsub.go @@ -213,6 +213,7 @@ const ( type Message struct { *pb.Message + ID string ReceivedFrom peer.ID ValidatorData interface{} } @@ -1047,8 +1048,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) { continue } - msg := &Message{pmsg, rpc.from, nil} - p.pushMsg(msg) + p.pushMsg(&Message{pmsg, "", rpc.from, nil}) } } diff --git a/topic.go b/topic.go index 8de88c3c..edec94ff 100644 --- a/topic.go +++ b/topic.go @@ -283,7 +283,7 @@ func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error } } - return t.p.val.PushLocal(&Message{m, t.p.host.ID(), nil}) + return t.p.val.PushLocal(&Message{m, "", t.p.host.ID(), nil}) } // WithReadiness returns a publishing option for only publishing when the router is ready.