Skip to content

Commit

Permalink
use a waitgroup to wait for reserve holders of MFiles before unmapping
Browse files Browse the repository at this point in the history
  • Loading branch information
carlhoerberg committed Dec 11, 2024
1 parent 256e916 commit 1961dee
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 5 deletions.
6 changes: 6 additions & 0 deletions src/lavinmq/clustering/actions.cr
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ module LavinMQ

struct AddAction < Action
def initialize(@data_dir : String, @filename : String, @mfile : MFile? = nil)
@mfile.try &.reserve
end

def lag_size : Int64
Expand All @@ -44,6 +45,7 @@ module LavinMQ
size = mfile.size.to_i64
socket.write_bytes size, IO::ByteFormat::LittleEndian
mfile.copy_to(socket, size)
mfile.unreserve
size
else
File.open(File.join(@data_dir, @filename)) do |f|
Expand All @@ -58,6 +60,9 @@ module LavinMQ

struct AppendAction < Action
def initialize(@data_dir : String, @filename : String, @obj : Bytes | FileRange | UInt32 | Int32)
if range = @obj.as?(FileRange)
range.mfile.reserve
end
end

def lag_size : Int64
Expand Down Expand Up @@ -85,6 +90,7 @@ module LavinMQ
len = obj.len.to_i64
socket.write_bytes -len.to_i64, IO::ByteFormat::LittleEndian
socket.write obj.to_slice
obj.mfile.unreserve
in UInt32, Int32
len = 4i64
socket.write_bytes -len.to_i64, IO::ByteFormat::LittleEndian
Expand Down
30 changes: 25 additions & 5 deletions src/lavinmq/mfile.cr
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require "wait_group"

lib LibC
MS_ASYNC = 1
MREMAP_MAYMOVE = 1
Expand Down Expand Up @@ -31,6 +33,7 @@ class MFile < IO
getter fd : Int32
@buffer = Pointer(UInt8).null
@deleted = false
@wg = WaitGroup.new

# Map a file, if no capacity is given the file must exists and
# the file will be mapped as readonly
Expand Down Expand Up @@ -100,13 +103,30 @@ class MFile < IO
self
end

def reserve
@wg.add
end

def unreserve
@wg.done
end

# The file will be truncated to the current position unless readonly or deleted
def close(truncate_to_size = true)
# unmap occurs on finalize
if truncate_to_size && !@readonly && !@deleted && @fd > 0
code = LibC.ftruncate(@fd, @size)
raise File::Error.from_errno("Error truncating file", file: @path) if code < 0
end

# unmap if non has reserved the file, race condition prone?
if @wg.@counter.get(:acquire).zero?
unmap
else
spawn(name: "munmap #{@path}") do
@wg.wait
unmap
end
end
ensure
unless @fd == -1
code = LibC.close(@fd)
Expand Down Expand Up @@ -170,10 +190,10 @@ class MFile < IO
raise RuntimeError.from_errno("msync") if code < 0
end

def finalize
LibC.close(@fd) if @fd > -1
LibC.munmap(@buffer, @capacity) unless @buffer.null?
end
# def finalize
# LibC.close(@fd) if @fd > -1
# LibC.munmap(@buffer, @capacity) unless @buffer.null?
# end

def write(slice : Bytes) : Nil
size = @size
Expand Down

0 comments on commit 1961dee

Please sign in to comment.