Skip to content

Commit

Permalink
While doing GC all SPQueues must be sorted by SP
Browse files Browse the repository at this point in the history
Priority queues and delayed message exchange queues were not, so now they're copied and sorted before iterated over in the GC algo
  • Loading branch information
snichme authored Jan 22, 2021
1 parent 4a243f6 commit d2cfe37
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 8 deletions.
6 changes: 3 additions & 3 deletions spec/queue_sortedreadyqueue_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ require "spec"
require "../src/avalanchemq/segment_position"
require "../src/avalanchemq/queue/ready"

describe AvalancheMQ::Queue::SortedReadyQueue do
describe AvalancheMQ::Queue::ExpirationReadyQueue do
it "should insert SegmentPosition sorted" do
q = AvalancheMQ::Queue::SortedReadyQueue.new
q = AvalancheMQ::Queue::ExpirationReadyQueue.new
sps = [
AvalancheMQ::SegmentPosition.new(10,10,5u32),
AvalancheMQ::SegmentPosition.new(10,10,1u32),
Expand All @@ -21,7 +21,7 @@ describe AvalancheMQ::Queue::SortedReadyQueue do
end

it "should return SegmentPosition with lowest expiration ts" do
q = AvalancheMQ::Queue::SortedReadyQueue.new
q = AvalancheMQ::Queue::ExpirationReadyQueue.new
sps = [
AvalancheMQ::SegmentPosition.new(10,10,5u32),
AvalancheMQ::SegmentPosition.new(10,10,1u32),
Expand Down
60 changes: 60 additions & 0 deletions spec/segment_position_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,64 @@ describe AvalancheMQ::SegmentPosition do
sp.expiration_ts.should eq 0
sp.priority.should eq 0
end

describe AvalancheMQ::VHost::ReferencedSPs do
subject = AvalancheMQ::SegmentPosition

it "it should always be sorted (ReadyQueue)" do
offsets = (0..4).to_a.map(&.to_i64)
sps = offsets.map { |i| AvalancheMQ::SegmentPosition.from_i64(i) }

ready = AvalancheMQ::Queue::ReadyQueue.new(sps.size)
ready.insert(sps)

ref_sps = AvalancheMQ::VHost::ReferencedSPs.new(1)
ref_sps << AvalancheMQ::VHost::SPQueue.new(ready)

ready.to_a.map(&.to_i64).should eq offsets
ref_sps.to_a.map(&.to_i64).should eq offsets
end

it "should always be sorted (ExpirationReadyQueue)" do
sps = [
AvalancheMQ::SegmentPosition.new(0, 0, expiration_ts: 1_i64),
AvalancheMQ::SegmentPosition.new(0, 1, expiration_ts: 3_i64),
AvalancheMQ::SegmentPosition.new(0, 2, expiration_ts: 1_i64),
AvalancheMQ::SegmentPosition.new(0, 3, expiration_ts: 5_i64),
AvalancheMQ::SegmentPosition.new(0, 4, expiration_ts: 4_i64)
]
ready = AvalancheMQ::Queue::ExpirationReadyQueue.new(sps.size)
ready.insert(sps)

ref_sps = AvalancheMQ::VHost::ReferencedSPs.new(1)
ref_sps << AvalancheMQ::VHost::SPQueue.new(ready)

# The ExpirationReadyQueue queue should always be ordered by expiration timestamp
ready.to_a.map(&.to_i64).should eq [0, 2, 1, 4, 3].map(&.to_i64)

# The same ready queue but from ReferencesSPs should be ordered by SP (segment + offset)
ref_sps.to_a.map(&.to_i64).should eq (0..4).to_a.map(&.to_i64)
end

it "should always be sorted (PriorityReadyQueue)" do
sps = [
AvalancheMQ::SegmentPosition.new(0, 0, priority: 1_u8),
AvalancheMQ::SegmentPosition.new(0, 1, priority: 3_u8),
AvalancheMQ::SegmentPosition.new(0, 2, priority: 1_u8),
AvalancheMQ::SegmentPosition.new(0, 3, priority: 5_u8),
AvalancheMQ::SegmentPosition.new(0, 4, priority: 4_u8)
]
ready = AvalancheMQ::Queue::PriorityReadyQueue.new(sps.size)
ready.insert(sps)

ref_sps = AvalancheMQ::VHost::ReferencedSPs.new(1)
ref_sps << AvalancheMQ::VHost::SPQueue.new(ready)

# The PriorityReadyQueue queue should always be ordered by priority (highest prio first)
ready.to_a.map(&.to_i64).should eq [3, 4, 1, 0, 2].map(&.to_i64)

# The same ready queue but from ReferencesSPs should be ordered by SP (segment + offset)
ref_sps.to_a.map(&.to_i64).should eq (0..4).to_a.map(&.to_i64)
end
end
end
2 changes: 1 addition & 1 deletion src/avalanchemq/queue/delayed_exchange_queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ require "./durable_queue"

module AvalancheMQ
module DelayedExchangeQueueMixin
@ready : Queue::ReadyQueue = Queue::SortedReadyQueue.new
@ready : Queue::ReadyQueue = Queue::ExpirationReadyQueue.new
@internal = true
end

Expand Down
14 changes: 10 additions & 4 deletions src/avalanchemq/queue/ready.cr
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,13 @@ module AvalancheMQ
def unlock
@lock.unlock
end

def to_a
@ready.to_a
end
end

class SortedReadyQueue < ReadyQueue
abstract class SortedReadyQueue < ReadyQueue
def push(sp : SegmentPosition) : Int32
insert(sp)
end
Expand All @@ -227,16 +231,18 @@ module AvalancheMQ
# Insert SPs sorted, the array should ideally be sorted too
def insert(sps : Enumerable(SegmentPosition))
@lock.synchronize do
sps.reverse_each do |sp|
sps.each do |sp|
insert_sorted(sp)
end
@ready.size
end
end
end

class ExpirationReadyQueue < SortedReadyQueue
private def insert_sorted(sp)
idx = @ready.bsearch_index do |rsp|
rsp.expiration_ts >= sp.expiration_ts || rsp >= sp
rsp.expiration_ts > sp.expiration_ts
end
idx ? @ready.insert(idx, sp) : @ready.push(sp)
end
Expand All @@ -245,7 +251,7 @@ module AvalancheMQ
class PriorityReadyQueue < SortedReadyQueue
private def insert_sorted(sp)
idx = @ready.bsearch_index do |rsp|
sp.priority > rsp.priority
rsp.priority < sp.priority
end
idx ? @ready.insert(idx, sp) : @ready.push(sp)
end
Expand Down
34 changes: 34 additions & 0 deletions src/avalanchemq/vhost/spqueue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ module AvalancheMQ
SPUnackQueue.new(unack)
end

def self.new(ready : Queue::SortedReadyQueue)
SPUnsortedReadyQueue.new(ready)
end

def <=>(other : self)
peek <=> other.peek
end
Expand Down Expand Up @@ -52,6 +56,36 @@ module AvalancheMQ
end
end

class SPUnsortedReadyQueue < SPQueue
@list : Array(SegmentPosition)

# FIXME: This could be a performance issue on long queues
# that is using Delayed messages or Priority
# We need to sort the ready queue by SP for the GC as a
# SortedReadyQueue could be sorted on any attribute on a SP
def initialize(ready : Queue::SortedReadyQueue)
@list = ready.to_a.sort! { |a, b| b <=> a }
end

def peek : SegmentPosition
@list.last
end

def shift : SegmentPosition
@list.pop
end

def empty? : Bool
@list.empty?
end

def lock : Nil
end

def unlock : Nil
end
end

class SPUnackQueue < SPQueue
def initialize(@unack : Queue::UnackQueue)
end
Expand Down

0 comments on commit d2cfe37

Please sign in to comment.