streaming: Fix concurrency issues
jpsamaroo committed May 2, 2024
1 parent 8423fb8 commit aa7b000
Showing 2 changed files with 62 additions and 50 deletions.
44 changes: 25 additions & 19 deletions src/stream-fetchers.jl
@@ -1,25 +1,31 @@
 struct RemoteFetcher end
 function stream_fetch_values!(::Type{RemoteFetcher}, T, store_ref::Chunk{Store_remote}, buffer::Blocal, id::UInt) where {Store_remote, Blocal}
-    if store_ref.handle.owner == myid()
-        store = fetch(store_ref)::Store_remote
-        while !isfull(buffer)
-            value = take!(store, id)::T
-            put!(buffer, value)
+    thunk_id = STREAM_THUNK_ID[]
+    @dagdebug thunk_id :stream "fetching values"
+    @label fetch_values
+    # FIXME: Pass buffer free space
+    # TODO: It would be ideal if we could wait on store.lock, but get unlocked during migration
+    values = MemPool.access_ref(store_ref.handle, id, T, Store_remote, thunk_id) do store, id, T, Store_remote, thunk_id
+        if !isopen(store)
+            throw(InvalidStateException("Stream is closed", :closed))
         end
-    else
-        thunk_id = STREAM_THUNK_ID[]
-        values = remotecall_fetch(store_ref.handle.owner, store_ref.handle, id, T, Store_remote) do store_ref, id, T, Store_remote
-            STREAM_THUNK_ID[] = thunk_id
-            store = MemPool.poolget(store_ref)::Store_remote
-            values = T[]
-            while !isempty(store, id)
-                value = take!(store, id)::T
-                push!(values, value)
-            end
-            return values
-        end::Vector{T}
-        for value in values
-            put!(buffer, value)
+        @dagdebug thunk_id :stream "trying to fetch values at $(myid())"
+        store::Store_remote
+        in_store = store
+        STREAM_THUNK_ID[] = thunk_id
+        values = T[]
+        while !isempty(store, id)
+            value = take!(store, id)::T
+            push!(values, value)
         end
+        return values
+    end::Vector{T}
+    if isempty(values)
+        @goto fetch_values
     end
+
+    @dagdebug thunk_id :stream "fetched $(length(values)) values"
+    for value in values
+        put!(buffer, value)
+    end
 end
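
For context on the pattern above: MemPool.access_ref runs the supplied do-block wherever the referenced value currently lives and forwards the trailing arguments to it, which is what lets the new fetcher drain the store without racing against a concurrent migration. A minimal sketch of that usage, assuming the same calling convention as in the hunk above (vec_ref and its vector payload are hypothetical and not part of this commit):

using MemPool

# Hypothetical sketch of the access_ref pattern used above: the do-block
# executes where the referenced data lives, and its return value is sent back.
vec_ref = MemPool.poolset(Int[])              # DRef wrapping an empty vector
values = MemPool.access_ref(vec_ref, 3) do vec, n
    append!(vec, 1:n)                         # mutate the value in place, at its owner
    return copy(vec)                          # only the copy travels back to the caller
end
@assert values == [1, 2, 3]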
68 changes: 37 additions & 31 deletions src/stream.jl
@@ -19,16 +19,15 @@ function tid_to_uid(thunk_id)
 end
 function Base.put!(store::StreamStore{T,B}, value) where {T,B}
     thunk_id = STREAM_THUNK_ID[]
-    uid = tid_to_uid(thunk_id)
     @lock store.lock begin
         if !isopen(store)
-            @dagdebug thunk_id :stream "[$uid] closed!"
+            @dagdebug thunk_id :stream "closed!"
             throw(InvalidStateException("Stream is closed", :closed))
         end
-        @dagdebug thunk_id :stream "[$uid] adding $value"
+        @dagdebug thunk_id :stream "adding $value"
         for buffer in values(store.buffers)
             while isfull(buffer)
-                @dagdebug thunk_id :stream "[$uid] buffer full, waiting"
+                @dagdebug thunk_id :stream "buffer full, waiting"
                 wait(store.lock)
             end
             put!(buffer, value)
@@ -38,16 +37,15 @@ function Base.put!(store::StreamStore{T,B}, value) where {T,B}
 end
 function Base.take!(store::StreamStore, id::UInt)
     thunk_id = STREAM_THUNK_ID[]
-    uid = tid_to_uid(thunk_id)
     @lock store.lock begin
         buffer = store.buffers[id]
         while isempty(buffer) && isopen(store, id)
-            @dagdebug thunk_id :stream "[$uid] no elements, not taking"
+            @dagdebug thunk_id :stream "no elements, not taking"
             wait(store.lock)
         end
-        @dagdebug thunk_id :stream "[$uid] wait finished"
+        @dagdebug thunk_id :stream "wait finished"
         if !isopen(store, id)
-            @dagdebug thunk_id :stream "[$uid] closed!"
+            @dagdebug thunk_id :stream "closed!"
             throw(InvalidStateException("Stream is closed", :closed))
         end
         unlock(store.lock)
@@ -56,7 +54,7 @@ function Base.take!(store::StreamStore, id::UInt)
         finally
             lock(store.lock)
         end
-        @dagdebug thunk_id :stream "[$uid] value accepted"
+        @dagdebug thunk_id :stream "value accepted"
         notify(store.lock)
         return value
     end
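
For background on the wait(store.lock)/notify(store.lock) calls above: put! blocks while any output buffer is full and take! blocks while its buffer is empty, each re-checking its condition in a loop after being woken. A self-contained sketch of that idiom with a plain Threads.Condition and a hypothetical fixed-capacity buffer (none of these names come from the commit):

const CAPACITY = 4
cond = Threads.Condition()
buf = Int[]

function producer_put!(x::Int)
    lock(cond)
    try
        while length(buf) >= CAPACITY   # buffer full: wait until a consumer takes
            wait(cond)
        end
        push!(buf, x)
        notify(cond)                    # wake anyone blocked in consumer_take!
    finally
        unlock(cond)
    end
end

function consumer_take!()
    lock(cond)
    try
        while isempty(buf)              # no elements: wait until a producer puts
            wait(cond)
        end
        x = popfirst!(buf)
        notify(cond)                    # wake anyone blocked in producer_put!
        return x
    finally
        unlock(cond)
    end
end

producer_put!(1)
@assert consumer_take!() == 1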
@@ -129,46 +127,53 @@ function Base.take!(stream::Stream{T,B}, id::UInt) where {T,B}
     return take!(stream.input_buffer)
 end
 function Base.isopen(stream::Stream, id::UInt)::Bool
-    return remotecall_fetch(stream.store_ref.handle.owner, stream.store_ref.handle) do ref
-        return isopen(MemPool.poolget(ref)::StreamStore, id)
+    return MemPool.access_ref(stream.store_ref.handle, id) do store, id
+        return isopen(store::StreamStore, id)
     end
 end
 function Base.close(stream::Stream)
-    remotecall_wait(stream.store_ref.handle.owner, stream.store_ref.handle) do ref
-        close(MemPool.poolget(ref)::StreamStore)
+    MemPool.access_ref(stream.store_ref.handle) do store
+        close(store::StreamStore)
+        return
     end
+    return
 end
 function add_waiters!(stream::Stream, waiters::Vector{Int})
-    remotecall_wait(stream.store_ref.handle.owner, stream.store_ref.handle) do ref
-        add_waiters!(MemPool.poolget(ref)::StreamStore, waiters)
+    MemPool.access_ref(stream.store_ref.handle, waiters) do store, waiters
+        add_waiters!(store::StreamStore, waiters)
+        return
     end
+    return
 end
 add_waiters!(stream::Stream, waiter::Integer) =
     add_waiters!(stream::Stream, Int[waiter])
 function remove_waiters!(stream::Stream, waiters::Vector{Int})
-    remotecall_wait(stream.store_ref.handle.owner, stream.store_ref.handle) do ref
-        remove_waiters!(MemPool.poolget(ref)::StreamStore, waiters)
+    MemPool.access_ref(stream.store_ref.handle, waiters) do store, waiters
+        remove_waiters!(store::StreamStore, waiters)
+        return
     end
+    return
 end
 remove_waiters!(stream::Stream, waiter::Integer) =
     remove_waiters!(stream::Stream, Int[waiter])
 
 function migrate_stream!(stream::Stream, w::Integer=myid())
-    if !isdefined(MemPool, :migrate!)
-        @warn "MemPool migration support not enabled!\nPerformance may be degraded" maxlog=1
-        return
-    end
-
     # Perform migration of the StreamStore
     # MemPool will block access to the new ref until the migration completes
     # FIXME: Do this with MemPool.access_ref, in case stream was already migrated
     if stream.store_ref.handle.owner != w
-        # Take lock to prevent any further modifications
-        # N.B. Serialization automatically unlocks
-        remotecall_wait(stream.store_ref.handle.owner, stream.store_ref.handle) do ref
-            lock((MemPool.poolget(ref)::StreamStore).lock)
+        new_store_ref = MemPool.migrate!(stream.store_ref.handle, w; pre_migration=store->begin
+            # Lock store to prevent any further modifications
+            # N.B. Serialization automatically unlocks the migrated copy
+            lock((store::StreamStore).lock)
+        end, post_migration=store->begin
+            # Unlock the store
+            # FIXME: Indicate to all waiters that this store is dead
+            unlock((store::StreamStore).lock)
+        end)
+        if w == myid()
+            stream.store = MemPool.access_ref(identity, new_store_ref; local_only=true)
         end
-
-        MemPool.migrate!(stream.store_ref.handle, w)
     end
 end
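
The reworked migrate_stream! above hands its locking to the pre_migration/post_migration hooks: the store is locked before it is serialized to the new owner, and the stale local copy is unlocked afterwards. A rough, self-contained sketch of that bracketing with toy types (ToyStore and toy_migrate! are hypothetical, and nothing below is MemPool API):

mutable struct ToyStore
    lock::ReentrantLock
    data::Vector{Int}
end

# Freeze the store, hand a consistent snapshot to the new owner, then release.
function toy_migrate!(store::ToyStore, transfer::Function)
    lock(store.lock)                    # "pre-migration": block concurrent put!/take!
    try
        transfer(copy(store.data))      # serialize and ship a consistent snapshot
    finally
        unlock(store.lock)              # "post-migration": release the now-stale copy
    end
end

dest = Int[]
toy_migrate!(ToyStore(ReentrantLock(), [1, 2, 3]), snap -> append!(dest, snap))
@assert dest == [1, 2, 3]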

@@ -272,6 +277,7 @@ function (sf::StreamingFunction)(args...; kwargs...)
     # Migrate our output stream to this worker
     if sf.stream isa Stream
         migrate_stream!(sf.stream)
+        @dagdebug thunk_id :stream "Migration complete"
     end
 
     try
@@ -299,13 +305,13 @@ function (sf::StreamingFunction)(args...; kwargs...)
             end
         end
         for stream in streams
-            @dagdebug thunk_id :stream "[$uid] dropping waiter"
+            @dagdebug thunk_id :stream "dropping waiter"
             remove_waiters!(stream, uid)
-            @dagdebug thunk_id :stream "[$uid] dropped waiter"
+            @dagdebug thunk_id :stream "dropped waiter"
         end
 
         # Ensure downstream tasks also terminate
-        @dagdebug thunk_id :stream "[$uid] closed stream"
+        @dagdebug thunk_id :stream "closed stream"
         close(sf.stream)
     end
 end
