Skip to content

Commit

Permalink
Merge pull request #15 from jfontan/improvement/add-finite-memory-queue
Browse files Browse the repository at this point in the history
Add finite memory queue
  • Loading branch information
jfontan authored Oct 3, 2018
2 parents 3eb0eac + dfb1ed6 commit 6214d25
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 7 deletions.
36 changes: 29 additions & 7 deletions memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,39 @@ func init() {
queue.Register("memory", func(uri string) (queue.Broker, error) {
return New(), nil
})

queue.Register("memoryfinite", func(uri string) (queue.Broker, error) {
return NewFinite(true), nil
})
}

// Broker is a in-memory implementation of Broker.
type Broker struct {
queues map[string]queue.Queue
finite bool
}

// New creates a new Broker for an in-memory queue.
func New() queue.Broker {
return &Broker{make(map[string]queue.Queue)}
return NewFinite(false)
}

// NewFinite creates a new Broker for an in-memory queue. The argument
// specifies if the JobIter stops on EOF or not.
func NewFinite(finite bool) queue.Broker {
return &Broker{
queues: make(map[string]queue.Queue),
finite: finite,
}
}

// Queue returns the queue with the given name.
func (b *Broker) Queue(name string) (queue.Queue, error) {
if _, ok := b.queues[name]; !ok {
b.queues[name] = &Queue{jobs: make([]*queue.Job, 0, 10)}
b.queues[name] = &Queue{
jobs: make([]*queue.Job, 0, 10),
finite: b.finite,
}
}

return b.queues[name], nil
Expand All @@ -45,6 +62,7 @@ type Queue struct {
sync.RWMutex
idx int
publishImmediately bool
finite bool
}

// Publish publishes a Job to the queue.
Expand Down Expand Up @@ -101,13 +119,14 @@ func (q *Queue) Transaction(txcb queue.TxCallback) error {

// Consume implements Queue. MemoryQueues have infinite advertised window.
func (q *Queue) Consume(_ int) (queue.JobIter, error) {
return &JobIter{q: q, RWMutex: &q.RWMutex}, nil
return &JobIter{q: q, RWMutex: &q.RWMutex, finite: q.finite}, nil
}

// JobIter implements a queue.JobIter interface.
type JobIter struct {
q *Queue
closed bool
finite bool
*sync.RWMutex
}

Expand Down Expand Up @@ -149,12 +168,15 @@ func (i *JobIter) Next() (*queue.Job, error) {
}

j, err := i.next()
if err != nil {
time.Sleep(1 * time.Second)
continue
if err == nil {
return j, nil
}

if err == io.EOF && i.finite {
return nil, err
}

return j, nil
time.Sleep(1 * time.Second)
}
}

Expand Down
32 changes: 32 additions & 0 deletions memory/memory_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package memory

import (
"io"
"testing"

"gopkg.in/src-d/go-queue.v1"
Expand Down Expand Up @@ -65,3 +66,34 @@ func (s *MemorySuite) TestIntegration() {
err = iter.Close()
assert.NoError(err)
}

func (s *MemorySuite) TestFinite() {
assert := assert.New(s.T())

b, err := queue.NewBroker("memoryfinite://")
assert.NoError(err)

qName := test.NewName()
q, err := b.Queue(qName)
assert.NoError(err)
assert.NotNil(q)

j, err := queue.NewJob()
assert.NoError(err)

j.Encode(true)
err = q.Publish(j)
assert.NoError(err)

advertisedWindow := 0 // ignored by memory brokers
iter, err := q.Consume(advertisedWindow)
assert.NoError(err)

retrievedJob, err := iter.Next()
assert.NoError(err)
assert.NoError(retrievedJob.Ack())

retrievedJob, err = iter.Next()
assert.Equal(io.EOF, err)
assert.Nil(retrievedJob)
}

0 comments on commit 6214d25

Please sign in to comment.