Skip to content

Commit

Permalink
Fixes for Timer and TimePeriod usage
Browse files Browse the repository at this point in the history
  • Loading branch information
jakubwro committed Sep 30, 2024
1 parent 7672bb5 commit 2793282
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 7 deletions.
4 changes: 2 additions & 2 deletions src/connection/connection.jl
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,13 @@ function ping(nc; timeout::Union{Real, Period} = 1.0, measure = true)
last_pong_count = @atomic nc.pong_count
start_time = time()
send(nc, Ping())
result = timedwait(Second(timeout); pollint = Millisecond(1)) do
result = timedwait(timeout; pollint = 0.001) do
(@atomic nc.pong_count) != last_pong_count
end
result == :timed_out && error("No PONG received in specified timeout ($timeout seconds).")
result == :ok || error("Unexpected status symbol: :$result")
if measure && (@atomic nc.pong_received_at) > 0.0
@info "Measured ping time is $(1000 * (nc.pong_received_at - start_time)) milliseconds"
@info "Measured ping time is $(1000 * ((@atomic nc.pong_received_at) - start_time)) milliseconds"
end
Pong()
end
Expand Down
4 changes: 2 additions & 2 deletions src/connection/drain.jl
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
### Code:

# Actual drain logic, for thread safety executed in connection controller task.
function _do_drain(nc::Connection, is_connected::Bool; timeout::Union{Real, TimePeriod} = nc.drain_timeout)
timer = Timer(Second(timeout))
function _do_drain(nc::Connection, is_connected::Bool; timeout::Union{Real, Period} = nc.drain_timeout)
timer = Timer(timeout)
sids = @lock nc.lock copy(keys(nc.sub_data))
for sid in sids
send(nc, Unsub(sid, 0))
Expand Down
2 changes: 1 addition & 1 deletion src/pubsub/drain.jl
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Optional keyword arguments:
- `timeout`: error will be thrown if drain not finished until `timeout` expires. Default value is configurable per connection on `connect` with `drain_timeout`. Can be also set globally with `NATS_DRAIN_TIMEOUT_SECONDS` environment variable. If not set explicitly default drain timeout is `$DEFAULT_DRAIN_TIMEOUT_SECONDS` seconds.
"""
function drain(connection::Connection, sub::Sub; timeout::Union{Real, Period} = connection.drain_timeout)
timer = Timer(Second(timeout))
timer = Timer(timeout)
sub_stats = stats(connection, sub)
if isnothing(sub_stats)
return # Already drained.
Expand Down
5 changes: 3 additions & 2 deletions src/reqreply/request.jl
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,10 @@ function request(
sub = subscribe(connection, reply_to)
unsubscribe(connection, sub; max_msgs = nreplies)
publish(connection, subject, data; reply_to)
timer = Timer(Second(timeout).value) do _ # TODO: get rid of .value in 1.11
drain(connection, sub)
if timeout isa Period # TODO: get rid of if in 1.11
timeout = Nanosecond(timeout) / Nanosecond(Second(1))

Check warning on line 87 in src/reqreply/request.jl

View check run for this annotation

Codecov / codecov/patch

src/reqreply/request.jl#L87

Added line #L87 was not covered by tests
end
timer = Timer(timeout) do _; drain(connection, sub) end
result = Msg[]
for _ in 1:nreplies
msg = next(connection, sub; no_throw = true)
Expand Down

0 comments on commit 2793282

Please sign in to comment.