Skip to content

Commit

Permalink
gracefully cancel a request (JuliaLang#256)
Browse files Browse the repository at this point in the history
* gracefully cancel a request

Adds a way to gracefully cancel an ongoing request. The `request` method accepts an additional `interrupt` keyword which can be a `Base.Event`. When it is triggered, the [`curl_multi_remove_handle`](https://curl.se/libcurl/c/curl_multi_remove_handle.html) is invoked, which interrupts the easy handle gracefully. It closes the `output` and `progress` channels of the `Easy` handle to unblock the waiting request task, which then terminates with a `RequestError`.
  • Loading branch information
tanmaykm authored Sep 3, 2024
1 parent 1061ecc commit df33406
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 2 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ request(url;
[ debug = <none>, ]
[ throw = true, ]
[ downloader = <default>, ]
[ interrupt = <none>, ]
) -> Union{Response, RequestError}
```
- `url :: AbstractString`
Expand All @@ -110,6 +111,7 @@ request(url;
- `debug :: (type, message) --> Any`
- `throw :: Bool`
- `downloader :: Downloader`
- `interrupt :: Base.Event`

Make a request to the given url, returning a `Response` object capturing the
status, headers and other information about the response. The body of the
Expand All @@ -129,6 +131,11 @@ be downloaded (indicated by non-2xx status code), `request` returns a `Response`
object no matter what the status code of the response is. If there is an error
with getting a response at all, then a `RequestError` is thrown or returned.

If the `interrupt` keyword argument is provided, it must be a `Base.Event` object.
If the event is triggered while the request is in progress, the request will be
cancelled and an error will be thrown. This can be used to interrupt a long
running request, for example if the user wants to cancel a download.

### default_downloader!

```jl
Expand Down
4 changes: 4 additions & 0 deletions src/Curl/Multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ function socket_callback(
end
end
@isdefined(errormonitor) && errormonitor(task)
else
lock(multi.lock) do
check_multi_info(multi)
end
end
@isdefined(old_watcher) && close(old_watcher)
return 0
Expand Down
40 changes: 38 additions & 2 deletions src/Downloads.jl
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ end
[ debug = <none>, ]
[ throw = true, ]
[ downloader = <default>, ]
[ interrupt = <none>, ]
) -> Union{Response, RequestError}
url :: AbstractString
Expand All @@ -299,6 +300,7 @@ end
debug :: (type, message) --> Any
throw :: Bool
downloader :: Downloader
interrupt :: Base.Event
Make a request to the given url, returning a `Response` object capturing the
status, headers and other information about the response. The body of the
Expand All @@ -317,6 +319,11 @@ Note that unlike `download` which throws an error if the requested URL could not
be downloaded (indicated by non-2xx status code), `request` returns a `Response`
object no matter what the status code of the response is. If there is an error
with getting a response at all, then a `RequestError` is thrown or returned.
If the `interrupt` keyword argument is provided, it must be a `Base.Event` object.
If the event is triggered while the request is in progress, the request will be
cancelled and an error will be thrown. This can be used to interrupt a long
running request, for example if the user wants to cancel a download.
"""
function request(
url :: AbstractString;
Expand All @@ -330,6 +337,7 @@ function request(
debug :: Union{Function, Nothing} = nothing,
throw :: Bool = true,
downloader :: Union{Downloader, Nothing} = nothing,
interrupt :: Union{Nothing, Base.Event} = nothing,
) :: Union{Response, RequestError}
if downloader === nothing
lock(DOWNLOAD_LOCK) do
Expand Down Expand Up @@ -388,6 +396,20 @@ function request(

# do the request
add_handle(downloader.multi, easy)
interrupted = false
if interrupt !== nothing
interrupt_task = @async begin
# wait for the interrupt event
wait(interrupt)
# cancel the request
remove_handle(downloader.multi, easy)
close(easy.output)
close(easy.progress)
interrupted = true
end
else
interrupt_task = nothing
end
try # ensure handle is removed
@sync begin
@async for buf in easy.output
Expand All @@ -403,14 +425,28 @@ function request(
end
end
finally
remove_handle(downloader.multi, easy)
if !interrupted
if interrupt_task !== nothing
# trigger interrupt
notify(interrupt)
wait(interrupt_task)
else
remove_handle(downloader.multi, easy)
end
end
end

# return the response or throw an error
response = Response(get_response_info(easy)...)
easy.code == Curl.CURLE_OK && return response
message = get_curl_errstr(easy)
response = RequestError(url, easy.code, message, response)
if easy.code == typemax(Curl.CURLcode)
# uninitialized code, likely a protocol error
code = Int(0)
else
code = Int(easy.code)
end
response = RequestError(url, code, message, response)
throw && Base.throw(response)
end
end
Expand Down
12 changes: 12 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,18 @@ include("setup.jl")
end
end

@testset "interrupt" begin
url = "$server/delay/10"
interrupt = Base.Event()
download_task = @async request(url; interrupt=interrupt)
sleep(0.1)
@test !istaskdone(download_task)
notify(interrupt)
timedwait(()->istaskdone(download_task), 5.0)
@test istaskdone(download_task)
@test download_task.result isa RequestError
end

@testset "progress" begin
url = "$server/drip"
progress = []
Expand Down

0 comments on commit df33406

Please sign in to comment.