diff --git a/example_test.go b/example_test.go index 95e726a..92a0cf9 100644 --- a/example_test.go +++ b/example_test.go @@ -9,7 +9,7 @@ import ( const topic = "topic" func Example() { - ps := pubsub.New(0) + ps := pubsub.New[string](0) ch := ps.Sub(topic) go publish(ps) @@ -35,7 +35,7 @@ func Example() { // Received message, 6 times. } -func publish(ps *pubsub.PubSub) { +func publish(ps *pubsub.PubSub[string]) { for { ps.Pub("message", topic) } diff --git a/pubsub.go b/pubsub.go index 352ef6f..3d20b41 100644 --- a/pubsub.go +++ b/pubsub.go @@ -25,71 +25,71 @@ const ( ) // PubSub is a collection of topics. -type PubSub struct { - cmdChan chan cmd +type PubSub[Item any] struct { + cmdChan chan cmd[Item] capacity int } -type cmd struct { +type cmd[Item any] struct { op operation topics []string - ch chan interface{} - msg interface{} + ch chan Item + msg Item } // New creates a new PubSub and starts a goroutine for handling operations. // The capacity of the channels created by Sub and SubOnce will be as specified. -func New(capacity int) *PubSub { - ps := &PubSub{make(chan cmd), capacity} +func New[Item any](capacity int) *PubSub[Item] { + ps := &PubSub[Item]{make(chan cmd[Item]), capacity} go ps.start() return ps } // Sub returns a channel on which messages published on any of // the specified topics can be received. -func (ps *PubSub) Sub(topics ...string) chan interface{} { +func (ps *PubSub[Item]) Sub(topics ...string) chan Item { return ps.sub(sub, topics...) } // SubOnce is similar to Sub, but only the first message published, after subscription, // on any of the specified topics can be received. -func (ps *PubSub) SubOnce(topics ...string) chan interface{} { +func (ps *PubSub[Item]) SubOnce(topics ...string) chan Item { return ps.sub(subOnce, topics...) } // SubOnceEach returns a channel on which callers receive, at most, one message // for each topic. -func (ps *PubSub) SubOnceEach(topics ...string) chan interface{} { +func (ps *PubSub[Item]) SubOnceEach(topics ...string) chan Item { return ps.sub(subOnceEach, topics...) } -func (ps *PubSub) sub(op operation, topics ...string) chan interface{} { - ch := make(chan interface{}, ps.capacity) - ps.cmdChan <- cmd{op: op, topics: topics, ch: ch} +func (ps *PubSub[Item]) sub(op operation, topics ...string) chan Item { + ch := make(chan Item, ps.capacity) + ps.cmdChan <- cmd[Item]{op: op, topics: topics, ch: ch} return ch } // AddSub adds subscriptions to an existing channel. -func (ps *PubSub) AddSub(ch chan interface{}, topics ...string) { - ps.cmdChan <- cmd{op: sub, topics: topics, ch: ch} +func (ps *PubSub[Item]) AddSub(ch chan Item, topics ...string) { + ps.cmdChan <- cmd[Item]{op: sub, topics: topics, ch: ch} } // AddSubOnceEach adds subscriptions to an existing channel with SubOnceEach // behavior. -func (ps *PubSub) AddSubOnceEach(ch chan interface{}, topics ...string) { - ps.cmdChan <- cmd{op: subOnceEach, topics: topics, ch: ch} +func (ps *PubSub[Item]) AddSubOnceEach(ch chan Item, topics ...string) { + ps.cmdChan <- cmd[Item]{op: subOnceEach, topics: topics, ch: ch} } // Pub publishes the given message to all subscribers of // the specified topics. -func (ps *PubSub) Pub(msg interface{}, topics ...string) { - ps.cmdChan <- cmd{op: pub, topics: topics, msg: msg} +func (ps *PubSub[Item]) Pub(msg Item, topics ...string) { + ps.cmdChan <- cmd[Item]{op: pub, topics: topics, msg: msg} } // TryPub publishes the given message to all subscribers of // the specified topics if the topic has buffer space. -func (ps *PubSub) TryPub(msg interface{}, topics ...string) { - ps.cmdChan <- cmd{op: tryPub, topics: topics, msg: msg} +func (ps *PubSub[Item]) TryPub(msg Item, topics ...string) { + ps.cmdChan <- cmd[Item]{op: tryPub, topics: topics, msg: msg} } // Unsub unsubscribes the given channel from the specified @@ -99,31 +99,31 @@ func (ps *PubSub) TryPub(msg interface{}, topics ...string) { // Unsub must be called from a goroutine that is different from the subscriber. // The subscriber must consume messages from the channel until it reaches the // end. Not doing so can result in a deadlock. -func (ps *PubSub) Unsub(ch chan interface{}, topics ...string) { +func (ps *PubSub[Item]) Unsub(ch chan Item, topics ...string) { if len(topics) == 0 { - ps.cmdChan <- cmd{op: unsubAll, ch: ch} + ps.cmdChan <- cmd[Item]{op: unsubAll, ch: ch} return } - ps.cmdChan <- cmd{op: unsub, topics: topics, ch: ch} + ps.cmdChan <- cmd[Item]{op: unsub, topics: topics, ch: ch} } // Close closes all channels currently subscribed to the specified topics. // If a channel is subscribed to multiple topics, some of which is // not specified, it is not closed. -func (ps *PubSub) Close(topics ...string) { - ps.cmdChan <- cmd{op: closeTopic, topics: topics} +func (ps *PubSub[Item]) Close(topics ...string) { + ps.cmdChan <- cmd[Item]{op: closeTopic, topics: topics} } // Shutdown closes all subscribed channels and terminates the goroutine. -func (ps *PubSub) Shutdown() { - ps.cmdChan <- cmd{op: shutdown} +func (ps *PubSub[Item]) Shutdown() { + ps.cmdChan <- cmd[Item]{op: shutdown} } -func (ps *PubSub) start() { - reg := registry{ - topics: make(map[string]map[chan interface{}]subType), - revTopics: make(map[chan interface{}]map[string]bool), +func (ps *PubSub[Item]) start() { + reg := registry[Item]{ + topics: make(map[string]map[chan Item]subType), + revTopics: make(map[chan Item]map[string]bool), } loop: @@ -175,9 +175,9 @@ loop: // registry maintains the current subscription state. It's not // safe to access a registry from multiple goroutines simultaneously. -type registry struct { - topics map[string]map[chan interface{}]subType - revTopics map[chan interface{}]map[string]bool +type registry[Item any] struct { + topics map[string]map[chan Item]subType + revTopics map[chan Item]map[string]bool } type subType int @@ -188,9 +188,9 @@ const ( normal ) -func (reg *registry) add(topic string, ch chan interface{}, st subType) { +func (reg *registry[Item]) add(topic string, ch chan Item, st subType) { if reg.topics[topic] == nil { - reg.topics[topic] = make(map[chan interface{}]subType) + reg.topics[topic] = make(map[chan Item]subType) } reg.topics[topic][ch] = st @@ -200,7 +200,7 @@ func (reg *registry) add(topic string, ch chan interface{}, st subType) { reg.revTopics[ch][topic] = true } -func (reg *registry) send(topic string, msg interface{}) { +func (reg *registry[Item]) send(topic string, msg Item) { for ch, st := range reg.topics[topic] { ch <- msg switch st { @@ -214,7 +214,7 @@ func (reg *registry) send(topic string, msg interface{}) { } } -func (reg *registry) sendNoWait(topic string, msg interface{}) { +func (reg *registry[Item]) sendNoWait(topic string, msg Item) { for ch, st := range reg.topics[topic] { select { case ch <- msg: @@ -232,19 +232,19 @@ func (reg *registry) sendNoWait(topic string, msg interface{}) { } } -func (reg *registry) removeTopic(topic string) { +func (reg *registry[Item]) removeTopic(topic string) { for ch := range reg.topics[topic] { reg.remove(topic, ch) } } -func (reg *registry) removeChannel(ch chan interface{}) { +func (reg *registry[Item]) removeChannel(ch chan Item) { for topic := range reg.revTopics[ch] { reg.remove(topic, ch) } } -func (reg *registry) remove(topic string, ch chan interface{}) { +func (reg *registry[Item]) remove(topic string, ch chan Item) { if _, ok := reg.topics[topic]; !ok { return } diff --git a/pubsub_test.go b/pubsub_test.go index 7930704..8b7f440 100644 --- a/pubsub_test.go +++ b/pubsub_test.go @@ -12,7 +12,7 @@ import ( ) func TestSub(t *testing.T) { - ps := New(1) + ps := New[string](1) ch1 := ps.Sub("t1") ch2 := ps.Sub("t1") ch3 := ps.Sub("t2") @@ -28,7 +28,7 @@ func TestSub(t *testing.T) { } func TestSubOnce(t *testing.T) { - ps := New(1) + ps := New[string](1) defer ps.Shutdown() ch := ps.SubOnce("t1") @@ -38,7 +38,7 @@ func TestSubOnce(t *testing.T) { } func TestAddSub(t *testing.T) { - ps := New(3) + ps := New[string](3) ch1 := ps.Sub("t1") ch2 := ps.Sub("t2") @@ -56,7 +56,7 @@ func TestAddSub(t *testing.T) { } func TestUnsub(t *testing.T) { - ps := New(1) + ps := New[string](1) defer ps.Shutdown() ch := ps.Sub("t1") @@ -67,7 +67,7 @@ func TestUnsub(t *testing.T) { } func TestUnsubAll(t *testing.T) { - ps := New(1) + ps := New[string](1) ch1 := ps.Sub("t1", "t2", "t3") ch2 := ps.Sub("t1", "t3") @@ -81,7 +81,7 @@ func TestUnsubAll(t *testing.T) { } func TestClose(t *testing.T) { - ps := New(1) + ps := New[string](1) ch1 := ps.Sub("t1") ch2 := ps.Sub("t1") ch3 := ps.Sub("t2") @@ -102,7 +102,7 @@ func TestClose(t *testing.T) { } func TestUnsubAfterClose(t *testing.T) { - ps := New(1) + ps := New[string](1) ch := ps.Sub("t1") defer func() { ps.Unsub(ch, "t1") @@ -115,7 +115,7 @@ func TestUnsubAfterClose(t *testing.T) { func TestShutdown(t *testing.T) { start := runtime.NumGoroutine() - New(10).Shutdown() + New[string](10).Shutdown() time.Sleep(1 * time.Millisecond) if current := runtime.NumGoroutine(); current != start { t.Fatalf("Goroutine leak! Expected: %d, but there were: %d.", start, current) @@ -123,7 +123,7 @@ func TestShutdown(t *testing.T) { } func TestMultiSub(t *testing.T) { - ps := New(2) + ps := New[string](2) ch := ps.Sub("t1", "t2") ps.Pub("hi", "t1") @@ -134,7 +134,7 @@ func TestMultiSub(t *testing.T) { } func TestMultiSubOnce(t *testing.T) { - ps := New(1) + ps := New[string](1) defer ps.Shutdown() ch := ps.SubOnce("t1", "t2") @@ -146,7 +146,7 @@ func TestMultiSubOnce(t *testing.T) { } func TestMultiSubOnceEach(t *testing.T) { - ps := New(2) + ps := New[string](2) ch := ps.SubOnceEach("t1", "t2") ps.Pub("hi", "t1") @@ -158,7 +158,7 @@ func TestMultiSubOnceEach(t *testing.T) { } func TestMultiPub(t *testing.T) { - ps := New(2) + ps := New[string](2) ch1 := ps.Sub("t1") ch2 := ps.Sub("t2") @@ -170,7 +170,7 @@ func TestMultiPub(t *testing.T) { } func TestTryPub(t *testing.T) { - ps := New(1) + ps := New[string](1) defer ps.Shutdown() ch := ps.Sub("t1") @@ -191,7 +191,7 @@ func TestTryPub(t *testing.T) { } func TestMultiUnsub(t *testing.T) { - ps := New(1) + ps := New[string](1) defer ps.Shutdown() ch := ps.Sub("t1", "t2", "t3") @@ -205,7 +205,7 @@ func TestMultiUnsub(t *testing.T) { } func TestMultiClose(t *testing.T) { - ps := New(2) + ps := New[string](2) defer ps.Shutdown() ch := ps.Sub("t1", "t2") @@ -219,10 +219,10 @@ func TestMultiClose(t *testing.T) { checkContents(t, ch, []string{"hi", "hello"}) } -func checkContents(t *testing.T, ch chan interface{}, vals []string) { - contents := []string{} +func checkContents[Item any](t *testing.T, ch chan Item, vals []string) { + contents := []Item{} for v := range ch { - contents = append(contents, v.(string)) + contents = append(contents, v) } if !reflect.DeepEqual(contents, vals) {