Skip to content

Commit

Permalink
streaming: Get tests passing
Browse files Browse the repository at this point in the history
Switch from RemoteFetcher to RemoteChannelFetcher
Pass object rather than type to `stream_{push,pull}_values!`
ProcessRingBuffer: Don't exit on graceful interrupt when non-empty
  • Loading branch information
jpsamaroo authored and JamesWrigley committed Oct 12, 2024
1 parent 43bd51f commit 96bad48
Show file tree
Hide file tree
Showing 5 changed files with 259 additions and 240 deletions.
2 changes: 1 addition & 1 deletion src/Dagger.jl
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ include("sch/Sch.jl"); using .Sch
include("datadeps.jl")

# Streaming
include("stream.jl")
include("stream-buffers.jl")
include("stream-transfer.jl")
include("stream.jl")

# Array computations
include("array/darray.jl")
Expand Down
218 changes: 44 additions & 174 deletions src/stream-buffers.jl
Original file line number Diff line number Diff line change
@@ -1,50 +1,35 @@
"""
A buffer that drops all elements put into it. Only to be used as the output
buffer for a task - will throw if attached as an input.
A buffer that drops all elements put into it.
"""
struct DropBuffer{T} end
mutable struct DropBuffer{T}
open::Bool
DropBuffer{T}() where T = new{T}(true)
end
DropBuffer{T}(_) where T = DropBuffer{T}()
Base.isempty(::DropBuffer) = true
isfull(::DropBuffer) = false
Base.put!(::DropBuffer, _) = nothing
Base.take!(::DropBuffer) = error("Cannot `take!` from a DropBuffer")

"A process-local buffer backed by a `Channel{T}`."
struct ChannelBuffer{T}
channel::Channel{T}
len::Int
count::Threads.Atomic{Int}
ChannelBuffer{T}(len::Int=1024) where T =
new{T}(Channel{T}(len), len, Threads.Atomic{Int}(0))
end
Base.isempty(cb::ChannelBuffer) = isempty(cb.channel)
isfull(cb::ChannelBuffer) = cb.count[] == cb.len
function Base.put!(cb::ChannelBuffer{T}, x) where T
put!(cb.channel, convert(T, x))
Threads.atomic_add!(cb.count, 1)
end
function Base.take!(cb::ChannelBuffer)
take!(cb.channel)
Threads.atomic_sub!(cb.count, 1)
end

"A cross-worker buffer backed by a `RemoteChannel{T}`."
struct RemoteChannelBuffer{T}
channel::RemoteChannel{Channel{T}}
len::Int
count::Threads.Atomic{Int}
RemoteChannelBuffer{T}(len::Int=1024) where T =
new{T}(RemoteChannel(()->Channel{T}(len)), len, Threads.Atomic{Int}(0))
end
Base.isempty(cb::RemoteChannelBuffer) = isempty(cb.channel)
isfull(cb::RemoteChannelBuffer) = cb.count[] == cb.len
function Base.put!(cb::RemoteChannelBuffer{T}, x) where T
put!(cb.channel, convert(T, x))
Threads.atomic_add!(cb.count, 1)
end
function Base.take!(cb::RemoteChannelBuffer)
take!(cb.channel)
Threads.atomic_sub!(cb.count, 1)
capacity(::DropBuffer) = typemax(Int)
Base.length(::DropBuffer) = 0
Base.isopen(buf::DropBuffer) = buf.open
function Base.close(buf::DropBuffer)
buf.open = false
end
function Base.put!(buf::DropBuffer, _)
if !isopen(buf)
throw(InvalidStateException("DropBuffer is closed", :closed))
end
task_may_cancel!(; must_force=true)
yield()
return
end
function Base.take!(buf::DropBuffer)
while true
if !isopen(buf)
throw(InvalidStateException("DropBuffer is closed", :closed))
end
task_may_cancel!(; must_force=true)
yield()
end
end

"A process-local ring buffer."
Expand All @@ -53,40 +38,45 @@ mutable struct ProcessRingBuffer{T}
write_idx::Int
@atomic count::Int
buffer::Vector{T}
open::Bool
@atomic open::Bool
function ProcessRingBuffer{T}(len::Int=1024) where T
buffer = Vector{T}(undef, len)
return new{T}(1, 1, 0, buffer, true)
end
end
Base.isempty(rb::ProcessRingBuffer) = (@atomic rb.count) == 0
isfull(rb::ProcessRingBuffer) = (@atomic rb.count) == length(rb.buffer)
capacity(rb::ProcessRingBuffer) = length(rb.buffer)
Base.length(rb::ProcessRingBuffer) = @atomic rb.count
Base.isopen(rb::ProcessRingBuffer) = rb.open
Base.isopen(rb::ProcessRingBuffer) = @atomic rb.open
function Base.close(rb::ProcessRingBuffer)
rb.open = false
@atomic rb.open = false
end
function Base.put!(rb::ProcessRingBuffer{T}, x) where T
len = length(rb.buffer)
while (@atomic rb.count) == len
while isfull(rb)
yield()
if !isopen(rb)
throw(InvalidStateException("Stream is closed", :closed))
throw(InvalidStateException("ProcessRingBuffer is closed", :closed))
end
task_may_cancel!()
task_may_cancel!(; must_force=true)
end
to_write_idx = mod1(rb.write_idx, len)
to_write_idx = mod1(rb.write_idx, length(rb.buffer))
rb.buffer[to_write_idx] = convert(T, x)
rb.write_idx += 1
@atomic rb.count += 1
end
function Base.take!(rb::ProcessRingBuffer)
while (@atomic rb.count) == 0
while isempty(rb)
yield()
if !isopen(rb)
throw(InvalidStateException("Stream is closed", :closed))
if !isopen(rb) && isempty(rb)
throw(InvalidStateException("ProcessRingBuffer is closed", :closed))
end
task_may_cancel!()
if task_cancelled() && isempty(rb)
# We respect a graceful cancellation only if the buffer is empty.
# Otherwise, we may have values to continue communicating.
task_may_cancel!()
end
task_may_cancel!(; must_force=true)
end
to_read_idx = rb.read_idx
rb.read_idx += 1
Expand All @@ -106,123 +96,3 @@ function collect!(rb::ProcessRingBuffer{T}) where T

return output
end

#= TODO
"A server-local ring buffer backed by shared-memory."
mutable struct ServerRingBuffer{T}
read_idx::Int
write_idx::Int
@atomic count::Int
buffer::Vector{T}
function ServerRingBuffer{T}(len::Int=1024) where T
buffer = Vector{T}(undef, len)
return new{T}(1, 1, 0, buffer)
end
end
Base.isempty(rb::ServerRingBuffer) = (@atomic rb.count) == 0
function Base.put!(rb::ServerRingBuffer{T}, x) where T
len = length(rb.buffer)
while (@atomic rb.count) == len
yield()
end
to_write_idx = mod1(rb.write_idx, len)
rb.buffer[to_write_idx] = convert(T, x)
rb.write_idx += 1
@atomic rb.count += 1
end
function Base.take!(rb::ServerRingBuffer)
while (@atomic rb.count) == 0
yield()
end
to_read_idx = rb.read_idx
rb.read_idx += 1
@atomic rb.count -= 1
to_read_idx = mod1(to_read_idx, length(rb.buffer))
return rb.buffer[to_read_idx]
end
=#

#=
"A TCP-based ring buffer."
mutable struct TCPRingBuffer{T}
read_idx::Int
write_idx::Int
@atomic count::Int
buffer::Vector{T}
function TCPRingBuffer{T}(len::Int=1024) where T
buffer = Vector{T}(undef, len)
return new{T}(1, 1, 0, buffer)
end
end
Base.isempty(rb::TCPRingBuffer) = (@atomic rb.count) == 0
function Base.put!(rb::TCPRingBuffer{T}, x) where T
len = length(rb.buffer)
while (@atomic rb.count) == len
yield()
end
to_write_idx = mod1(rb.write_idx, len)
rb.buffer[to_write_idx] = convert(T, x)
rb.write_idx += 1
@atomic rb.count += 1
end
function Base.take!(rb::TCPRingBuffer)
while (@atomic rb.count) == 0
yield()
end
to_read_idx = rb.read_idx
rb.read_idx += 1
@atomic rb.count -= 1
to_read_idx = mod1(to_read_idx, length(rb.buffer))
return rb.buffer[to_read_idx]
end
=#

#=
"""
A flexible puller which switches to the most efficient buffer type based
on the sender and receiver locations.
"""
mutable struct UniBuffer{T}
buffer::Union{ProcessRingBuffer{T}, Nothing}
end
function initialize_stream_buffer!(::Type{UniBuffer{T}}, T, send_proc, recv_proc, buffer_amount) where T
if buffer_amount == 0
error("Return NullBuffer")
end
send_osproc = get_parent(send_proc)
recv_osproc = get_parent(recv_proc)
if send_osproc.pid == recv_osproc.pid
inner = RingBuffer{T}(buffer_amount)
elseif system_uuid(send_osproc.pid) == system_uuid(recv_osproc.pid)
inner = ProcessBuffer{T}(buffer_amount)
else
inner = RemoteBuffer{T}(buffer_amount)
end
return UniBuffer{T}(buffer_amount)
end
struct LocalPuller{T,B}
buffer::B{T}
id::UInt
function LocalPuller{T,B}(id::UInt, buffer_amount::Integer) where {T,B}
buffer = initialize_stream_buffer!(B, T, buffer_amount)
return new{T,B}(buffer, id)
end
end
function Base.take!(pull::LocalPuller{T,B}) where {T,B}
if pull.buffer === nothing
pull.buffer =
error("Return NullBuffer")
end
value = take!(pull.buffer)
end
function initialize_input_stream!(stream::Stream{T,B}, id::UInt, send_proc::Processor, recv_proc::Processor, buffer_amount::Integer) where {T,B}
local_buffer = remotecall_fetch(stream.ref.handle.owner, stream.ref.handle, id) do ref, id
local_buffer, remote_buffer = initialize_stream_buffer!(B, T, send_proc, recv_proc, buffer_amount)
ref.buffers[id] = remote_buffer
return local_buffer
end
stream.buffer = local_buffer
return stream
end
=#
Loading

0 comments on commit 96bad48

Please sign in to comment.