Skip to content

Commit

Permalink
Merge pull request #68 from pubnub/CE-3632-Clean-up-fixes
Browse files Browse the repository at this point in the history
Destroy now cleans up all goroutines opened by the SDK
  • Loading branch information
crimsonred authored Jun 20, 2019
2 parents 207b5d6 + 6ada628 commit 7846fbc
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 48 deletions.
9 changes: 8 additions & 1 deletion .pubnub.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
---
changelog:
-
changes:
-
text: "Destroy now cleans up all goroutines opened by the SDK"
type: bug
date: Jun 20, 19
version: v4.2.4
-
changes:
-
Expand Down Expand Up @@ -356,4 +363,4 @@ supported-platforms:
- "Mac OS X 10.8 or later, amd64"
- "Windows 7 or later, amd64, 386"
version: "PubNub Go SDK"
version: v4.2.3
version: v4.2.4
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

# PubNub 4.2.3 client for Go
# PubNub 4.2.4 client for Go
* Go (1.9+)

# Please direct all Support Questions and Concerns to [email protected]
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
4.2.3
4.2.4
6 changes: 4 additions & 2 deletions listener_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,12 @@ func (m *ListenerManager) announceStatus(status *PNStatus) {
go func() {
m.RLock()
m.pubnub.Config.Log.Println("announceStatus lock")
AnnounceStatusLabel:
for l := range m.listeners {
select {
case <-m.exitListener:
m.pubnub.Config.Log.Println("announceStatus exitListener")
break
break AnnounceStatusLabel
case l.Status <- status:
}
}
Expand All @@ -79,11 +80,12 @@ func (m *ListenerManager) announceStatus(status *PNStatus) {
func (m *ListenerManager) announceMessage(message *PNMessage) {
go func() {
m.RLock()
AnnounceMessageLabel:
for l := range m.listeners {
select {
case <-m.exitListenerAnnounce:
m.pubnub.Config.Log.Println("announceMessage exitListenerAnnounce")
break
break AnnounceMessageLabel
case l.Message <- message:
}
}
Expand Down
19 changes: 7 additions & 12 deletions pubnub.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
// Default constants
const (
// Version :the version of the SDK
Version = "4.2.3"
Version = "4.2.4"
// MaxSequence for publish messages
MaxSequence = 65535
)
Expand Down Expand Up @@ -338,22 +338,17 @@ func (pn *PubNub) DeleteMessagesWithContext(ctx Context) *historyDeleteBuilder {
}

func (pn *PubNub) Destroy() {
pn.requestWorkers.Close()

close(pn.jobQueue)
pn.Config.Log.Println("Calling Destroy")
pn.cancel()

if pn.subscriptionManager != nil {
pn.subscriptionManager.Destroy()
pn.Config.Log.Println("after subscription manager Destroy")
}

pn.telemetryManager.RLock()
telManagerRunning := pn.telemetryManager.IsRunning
pn.telemetryManager.RUnlock()
if (pn.telemetryManager.ExitTelemetryManager != nil) && (telManagerRunning) {
pn.Config.Log.Println("calling exitTelemetryManager")
pn.telemetryManager.ExitTelemetryManager <- true
pn.Config.Log.Println("after exitTelemetryManager")
}
pn.Config.Log.Println("calling subscriptionManager Destroy")
if pn.heartbeatManager != nil {
pn.heartbeatManager.Destroy()
Expand Down Expand Up @@ -399,12 +394,12 @@ func NewPubNub(pnconf *Config) *PubNub {
pn.heartbeatManager = newHeartbeatManager(pn, ctx)
pn.telemetryManager = newTelemetryManager(pnconf.MaximumLatencyDataAge, ctx)
pn.jobQueue = make(chan *JobQItem)
pn.requestWorkers = pn.newNonSubQueueProcessor(pnconf.MaxWorkers)
pn.requestWorkers = pn.newNonSubQueueProcessor(pnconf.MaxWorkers, ctx)

return pn
}

func (pn *PubNub) newNonSubQueueProcessor(maxWorkers int) *RequestWorkers {
func (pn *PubNub) newNonSubQueueProcessor(maxWorkers int, ctx Context) *RequestWorkers {
workers := make(chan chan *JobQItem, maxWorkers)

pn.Config.Log.Printf("Init RequestWorkers: workers %d", maxWorkers)
Expand All @@ -413,7 +408,7 @@ func (pn *PubNub) newNonSubQueueProcessor(maxWorkers int) *RequestWorkers {
Workers: workers,
MaxWorkers: maxWorkers,
}
p.Start(pn)
p.Start(pn, ctx)
return p
}

Expand Down
48 changes: 37 additions & 11 deletions request_workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,42 +29,63 @@ type RequestWorkers struct {
type Worker struct {
Workers chan chan *JobQItem
JobChannel chan *JobQItem
ctx Context
id int
}

func NewRequestWorkers(workers chan chan *JobQItem, id int) Worker {
var workers []Worker

func newRequestWorkers(workers chan chan *JobQItem, id int, ctx Context) Worker {
return Worker{
Workers: workers,
JobChannel: make(chan *JobQItem),
ctx: ctx,
id: id,
}
}

// Process runs a goroutine for the worker
func (pw Worker) Process(pubnub *PubNub) {
go func() {
ProcessLabel:
for {
pw.Workers <- pw.JobChannel
job := <-pw.JobChannel
res, err := job.Client.Do(job.Req)
jqr := &JobQResponse{
Error: err,
Resp: res,
select {
case pw.Workers <- pw.JobChannel:
job := <-pw.JobChannel
if job != nil {
res, err := job.Client.Do(job.Req)
jqr := &JobQResponse{
Error: err,
Resp: res,
}
job.JobResponse <- jqr
pubnub.Config.Log.Println("Request sent using worker id ", pw.id)
}
case <-pw.ctx.Done():
pubnub.Config.Log.Println("Exiting Worker Process by worker ctx, id ", pw.id)
break ProcessLabel
case <-pubnub.ctx.Done():
pubnub.Config.Log.Println("Exiting Worker Process by PN ctx, id ", pw.id)
break ProcessLabel
}
job.JobResponse <- jqr
}
}()
}

func (p *RequestWorkers) Start(pubnub *PubNub) {
// Start starts the workers
func (p *RequestWorkers) Start(pubnub *PubNub, ctx Context) {
pubnub.Config.Log.Println("Start: Running with workers ", p.MaxWorkers)
workers = make([]Worker, p.MaxWorkers)
for i := 0; i < p.MaxWorkers; i++ {
pubnub.Config.Log.Println("Start: StartNonSubWorker ", i)
worker := NewRequestWorkers(p.Workers, i)
worker := newRequestWorkers(p.Workers, i, ctx)
worker.Process(pubnub)
workers[i] = worker
}
go p.ReadQueue(pubnub)
}

// ReadQueue reads the queue and passes on the job to the workers
func (p *RequestWorkers) ReadQueue(pubnub *PubNub) {
for job := range pubnub.jobQueue {
pubnub.Config.Log.Println("ReadQueue: Got job for channel ", job.Req)
Expand All @@ -76,6 +97,11 @@ func (p *RequestWorkers) ReadQueue(pubnub *PubNub) {
pubnub.Config.Log.Println("ReadQueue: Exit")
}

// Close closes the workers
func (p *RequestWorkers) Close() {
close(p.Workers)

for _, w := range workers {
close(w.JobChannel)
w.ctx.Done()
}
}
28 changes: 8 additions & 20 deletions telemetry_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,15 @@ type TelemetryManager struct {

cleanUpTimer *time.Ticker

maxLatencyDataAge int
ExitTelemetryManager chan bool
IsRunning bool
maxLatencyDataAge int
IsRunning bool
}

func newTelemetryManager(maxLatencyDataAge int, ctx Context) *TelemetryManager {
manager := &TelemetryManager{
maxLatencyDataAge: maxLatencyDataAge,
operations: make(map[string][]LatencyEntry),
ctx: ctx,
ExitTelemetryManager: make(chan bool),
maxLatencyDataAge: maxLatencyDataAge,
operations: make(map[string][]LatencyEntry),
ctx: ctx,
}

go manager.startCleanUpTimer()
Expand Down Expand Up @@ -109,6 +107,7 @@ func (m *TelemetryManager) CleanUpTelemetryData() {
delete(m.operations, endpoint)
}
}
m.ctx.Done()
m.Unlock()
}

Expand All @@ -118,27 +117,16 @@ func (m *TelemetryManager) startCleanUpTimer() {
cleanUpInterval*cleanUpIntervalMultiplier) * time.Millisecond)

go func() {
CleanUpTimerLabel:
for {
m.Lock()
m.IsRunning = true
m.Unlock()
timerCh := m.cleanUpTimer.C

select {
case <-timerCh:
m.CleanUpTelemetryData()
case <-m.ctx.Done():
m.Lock()
m.IsRunning = false
m.Unlock()
m.cleanUpTimer.Stop()
break
case <-m.ExitTelemetryManager:
m.Lock()
m.IsRunning = false
m.Unlock()
m.cleanUpTimer.Stop()
break
break CleanUpTimerLabel
}
}
}()
Expand Down

0 comments on commit 7846fbc

Please sign in to comment.