Skip to content

Commit

Permalink
Convert to use go 1.18 generics
Browse files Browse the repository at this point in the history
  • Loading branch information
sidecut committed Jan 25, 2022
1 parent 66f8f15 commit 79f644e
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 62 deletions.
4 changes: 2 additions & 2 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
const topic = "topic"

func Example() {
ps := pubsub.New(0)
ps := pubsub.New[string](0)
ch := ps.Sub(topic)
go publish(ps)

Expand All @@ -35,7 +35,7 @@ func Example() {
// Received message, 6 times.
}

func publish(ps *pubsub.PubSub) {
func publish(ps *pubsub.PubSub[string]) {
for {
ps.Pub("message", topic)
}
Expand Down
84 changes: 42 additions & 42 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,71 +25,71 @@ const (
)

// PubSub is a collection of topics.
type PubSub struct {
cmdChan chan cmd
type PubSub[Item any] struct {
cmdChan chan cmd[Item]
capacity int
}

type cmd struct {
type cmd[Item any] struct {
op operation
topics []string
ch chan interface{}
msg interface{}
ch chan Item
msg Item
}

// New creates a new PubSub and starts a goroutine for handling operations.
// The capacity of the channels created by Sub and SubOnce will be as specified.
func New(capacity int) *PubSub {
ps := &PubSub{make(chan cmd), capacity}
func New[Item any](capacity int) *PubSub[Item] {
ps := &PubSub[Item]{make(chan cmd[Item]), capacity}
go ps.start()
return ps
}

// Sub returns a channel on which messages published on any of
// the specified topics can be received.
func (ps *PubSub) Sub(topics ...string) chan interface{} {
func (ps *PubSub[Item]) Sub(topics ...string) chan Item {
return ps.sub(sub, topics...)
}

// SubOnce is similar to Sub, but only the first message published, after subscription,
// on any of the specified topics can be received.
func (ps *PubSub) SubOnce(topics ...string) chan interface{} {
func (ps *PubSub[Item]) SubOnce(topics ...string) chan Item {
return ps.sub(subOnce, topics...)
}

// SubOnceEach returns a channel on which callers receive, at most, one message
// for each topic.
func (ps *PubSub) SubOnceEach(topics ...string) chan interface{} {
func (ps *PubSub[Item]) SubOnceEach(topics ...string) chan Item {
return ps.sub(subOnceEach, topics...)
}

func (ps *PubSub) sub(op operation, topics ...string) chan interface{} {
ch := make(chan interface{}, ps.capacity)
ps.cmdChan <- cmd{op: op, topics: topics, ch: ch}
func (ps *PubSub[Item]) sub(op operation, topics ...string) chan Item {
ch := make(chan Item, ps.capacity)
ps.cmdChan <- cmd[Item]{op: op, topics: topics, ch: ch}
return ch
}

// AddSub adds subscriptions to an existing channel.
func (ps *PubSub) AddSub(ch chan interface{}, topics ...string) {
ps.cmdChan <- cmd{op: sub, topics: topics, ch: ch}
func (ps *PubSub[Item]) AddSub(ch chan Item, topics ...string) {
ps.cmdChan <- cmd[Item]{op: sub, topics: topics, ch: ch}
}

// AddSubOnceEach adds subscriptions to an existing channel with SubOnceEach
// behavior.
func (ps *PubSub) AddSubOnceEach(ch chan interface{}, topics ...string) {
ps.cmdChan <- cmd{op: subOnceEach, topics: topics, ch: ch}
func (ps *PubSub[Item]) AddSubOnceEach(ch chan Item, topics ...string) {
ps.cmdChan <- cmd[Item]{op: subOnceEach, topics: topics, ch: ch}
}

// Pub publishes the given message to all subscribers of
// the specified topics.
func (ps *PubSub) Pub(msg interface{}, topics ...string) {
ps.cmdChan <- cmd{op: pub, topics: topics, msg: msg}
func (ps *PubSub[Item]) Pub(msg Item, topics ...string) {
ps.cmdChan <- cmd[Item]{op: pub, topics: topics, msg: msg}
}

// TryPub publishes the given message to all subscribers of
// the specified topics if the topic has buffer space.
func (ps *PubSub) TryPub(msg interface{}, topics ...string) {
ps.cmdChan <- cmd{op: tryPub, topics: topics, msg: msg}
func (ps *PubSub[Item]) TryPub(msg Item, topics ...string) {
ps.cmdChan <- cmd[Item]{op: tryPub, topics: topics, msg: msg}
}

// Unsub unsubscribes the given channel from the specified
Expand All @@ -99,31 +99,31 @@ func (ps *PubSub) TryPub(msg interface{}, topics ...string) {
// Unsub must be called from a goroutine that is different from the subscriber.
// The subscriber must consume messages from the channel until it reaches the
// end. Not doing so can result in a deadlock.
func (ps *PubSub) Unsub(ch chan interface{}, topics ...string) {
func (ps *PubSub[Item]) Unsub(ch chan Item, topics ...string) {
if len(topics) == 0 {
ps.cmdChan <- cmd{op: unsubAll, ch: ch}
ps.cmdChan <- cmd[Item]{op: unsubAll, ch: ch}
return
}

ps.cmdChan <- cmd{op: unsub, topics: topics, ch: ch}
ps.cmdChan <- cmd[Item]{op: unsub, topics: topics, ch: ch}
}

// Close closes all channels currently subscribed to the specified topics.
// If a channel is subscribed to multiple topics, some of which is
// not specified, it is not closed.
func (ps *PubSub) Close(topics ...string) {
ps.cmdChan <- cmd{op: closeTopic, topics: topics}
func (ps *PubSub[Item]) Close(topics ...string) {
ps.cmdChan <- cmd[Item]{op: closeTopic, topics: topics}
}

// Shutdown closes all subscribed channels and terminates the goroutine.
func (ps *PubSub) Shutdown() {
ps.cmdChan <- cmd{op: shutdown}
func (ps *PubSub[Item]) Shutdown() {
ps.cmdChan <- cmd[Item]{op: shutdown}
}

func (ps *PubSub) start() {
reg := registry{
topics: make(map[string]map[chan interface{}]subType),
revTopics: make(map[chan interface{}]map[string]bool),
func (ps *PubSub[Item]) start() {
reg := registry[Item]{
topics: make(map[string]map[chan Item]subType),
revTopics: make(map[chan Item]map[string]bool),
}

loop:
Expand Down Expand Up @@ -175,9 +175,9 @@ loop:

// registry maintains the current subscription state. It's not
// safe to access a registry from multiple goroutines simultaneously.
type registry struct {
topics map[string]map[chan interface{}]subType
revTopics map[chan interface{}]map[string]bool
type registry[Item any] struct {
topics map[string]map[chan Item]subType
revTopics map[chan Item]map[string]bool
}

type subType int
Expand All @@ -188,9 +188,9 @@ const (
normal
)

func (reg *registry) add(topic string, ch chan interface{}, st subType) {
func (reg *registry[Item]) add(topic string, ch chan Item, st subType) {
if reg.topics[topic] == nil {
reg.topics[topic] = make(map[chan interface{}]subType)
reg.topics[topic] = make(map[chan Item]subType)
}
reg.topics[topic][ch] = st

Expand All @@ -200,7 +200,7 @@ func (reg *registry) add(topic string, ch chan interface{}, st subType) {
reg.revTopics[ch][topic] = true
}

func (reg *registry) send(topic string, msg interface{}) {
func (reg *registry[Item]) send(topic string, msg Item) {
for ch, st := range reg.topics[topic] {
ch <- msg
switch st {
Expand All @@ -214,7 +214,7 @@ func (reg *registry) send(topic string, msg interface{}) {
}
}

func (reg *registry) sendNoWait(topic string, msg interface{}) {
func (reg *registry[Item]) sendNoWait(topic string, msg Item) {
for ch, st := range reg.topics[topic] {
select {
case ch <- msg:
Expand All @@ -232,19 +232,19 @@ func (reg *registry) sendNoWait(topic string, msg interface{}) {
}
}

func (reg *registry) removeTopic(topic string) {
func (reg *registry[Item]) removeTopic(topic string) {
for ch := range reg.topics[topic] {
reg.remove(topic, ch)
}
}

func (reg *registry) removeChannel(ch chan interface{}) {
func (reg *registry[Item]) removeChannel(ch chan Item) {
for topic := range reg.revTopics[ch] {
reg.remove(topic, ch)
}
}

func (reg *registry) remove(topic string, ch chan interface{}) {
func (reg *registry[Item]) remove(topic string, ch chan Item) {
if _, ok := reg.topics[topic]; !ok {
return
}
Expand Down
36 changes: 18 additions & 18 deletions pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

func TestSub(t *testing.T) {
ps := New(1)
ps := New[string](1)
ch1 := ps.Sub("t1")
ch2 := ps.Sub("t1")
ch3 := ps.Sub("t2")
Expand All @@ -28,7 +28,7 @@ func TestSub(t *testing.T) {
}

func TestSubOnce(t *testing.T) {
ps := New(1)
ps := New[string](1)
defer ps.Shutdown()

ch := ps.SubOnce("t1")
Expand All @@ -38,7 +38,7 @@ func TestSubOnce(t *testing.T) {
}

func TestAddSub(t *testing.T) {
ps := New(3)
ps := New[string](3)
ch1 := ps.Sub("t1")
ch2 := ps.Sub("t2")

Expand All @@ -56,7 +56,7 @@ func TestAddSub(t *testing.T) {
}

func TestUnsub(t *testing.T) {
ps := New(1)
ps := New[string](1)
defer ps.Shutdown()

ch := ps.Sub("t1")
Expand All @@ -67,7 +67,7 @@ func TestUnsub(t *testing.T) {
}

func TestUnsubAll(t *testing.T) {
ps := New(1)
ps := New[string](1)
ch1 := ps.Sub("t1", "t2", "t3")
ch2 := ps.Sub("t1", "t3")

Expand All @@ -81,7 +81,7 @@ func TestUnsubAll(t *testing.T) {
}

func TestClose(t *testing.T) {
ps := New(1)
ps := New[string](1)
ch1 := ps.Sub("t1")
ch2 := ps.Sub("t1")
ch3 := ps.Sub("t2")
Expand All @@ -102,7 +102,7 @@ func TestClose(t *testing.T) {
}

func TestUnsubAfterClose(t *testing.T) {
ps := New(1)
ps := New[string](1)
ch := ps.Sub("t1")
defer func() {
ps.Unsub(ch, "t1")
Expand All @@ -115,15 +115,15 @@ func TestUnsubAfterClose(t *testing.T) {

func TestShutdown(t *testing.T) {
start := runtime.NumGoroutine()
New(10).Shutdown()
New[string](10).Shutdown()
time.Sleep(1 * time.Millisecond)
if current := runtime.NumGoroutine(); current != start {
t.Fatalf("Goroutine leak! Expected: %d, but there were: %d.", start, current)
}
}

func TestMultiSub(t *testing.T) {
ps := New(2)
ps := New[string](2)
ch := ps.Sub("t1", "t2")

ps.Pub("hi", "t1")
Expand All @@ -134,7 +134,7 @@ func TestMultiSub(t *testing.T) {
}

func TestMultiSubOnce(t *testing.T) {
ps := New(1)
ps := New[string](1)
defer ps.Shutdown()

ch := ps.SubOnce("t1", "t2")
Expand All @@ -146,7 +146,7 @@ func TestMultiSubOnce(t *testing.T) {
}

func TestMultiSubOnceEach(t *testing.T) {
ps := New(2)
ps := New[string](2)
ch := ps.SubOnceEach("t1", "t2")

ps.Pub("hi", "t1")
Expand All @@ -158,7 +158,7 @@ func TestMultiSubOnceEach(t *testing.T) {
}

func TestMultiPub(t *testing.T) {
ps := New(2)
ps := New[string](2)
ch1 := ps.Sub("t1")
ch2 := ps.Sub("t2")

Expand All @@ -170,7 +170,7 @@ func TestMultiPub(t *testing.T) {
}

func TestTryPub(t *testing.T) {
ps := New(1)
ps := New[string](1)
defer ps.Shutdown()

ch := ps.Sub("t1")
Expand All @@ -191,7 +191,7 @@ func TestTryPub(t *testing.T) {
}

func TestMultiUnsub(t *testing.T) {
ps := New(1)
ps := New[string](1)
defer ps.Shutdown()

ch := ps.Sub("t1", "t2", "t3")
Expand All @@ -205,7 +205,7 @@ func TestMultiUnsub(t *testing.T) {
}

func TestMultiClose(t *testing.T) {
ps := New(2)
ps := New[string](2)
defer ps.Shutdown()

ch := ps.Sub("t1", "t2")
Expand All @@ -219,10 +219,10 @@ func TestMultiClose(t *testing.T) {
checkContents(t, ch, []string{"hi", "hello"})
}

func checkContents(t *testing.T, ch chan interface{}, vals []string) {
contents := []string{}
func checkContents[Item any](t *testing.T, ch chan Item, vals []string) {
contents := []Item{}
for v := range ch {
contents = append(contents, v.(string))
contents = append(contents, v)
}

if !reflect.DeepEqual(contents, vals) {
Expand Down

0 comments on commit 79f644e

Please sign in to comment.