Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup single fallible case. #4218

Merged
merged 8 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions src/core/builtins/builtins_request.ml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ let _ =
Some
"Limit in seconds to the duration of the request resolution. \
Defaults to `settings.request.timeout` when `null`." );
( "content_type",
Lang.nullable_t (Lang.source_t (Lang.univ_t ())),
Some Lang.null,
Some
"Check that the request can decode content suitable for the given \
source." );
("", Request.Value.t, None, None);
]
Lang.bool_t
Expand All @@ -121,8 +127,17 @@ let _ =
let timeout =
Lang.to_valued_option Lang.to_float (List.assoc "timeout" p)
in
let source =
Lang.to_valued_option Lang.to_source (List.assoc "content_type" p)
in
let r = Request.Value.of_value (List.assoc "" p) in
Lang.bool (try Request.resolve ?timeout r = `Resolved with _ -> false))
Lang.bool
(match (Request.resolve ?timeout r, source) with
| `Resolved, Some s -> (
try Request.get_decoder ~ctype:s#content_type r <> None
with _ -> false)
| `Resolved, None -> true
| _ | (exception _) -> false))

let _ =
Lang.add_builtin ~base:request "metadata" ~category:`Liquidsoap
Expand Down Expand Up @@ -310,8 +325,8 @@ class process ~name r =
~name ~priority:`Non_blocking
~retry_delay:(fun _ -> 0.1)
~available:(fun _ -> true)
~prefetch:1 ~timeout:None ~synchronous:true
(Lang.val_fun [] (fun _ -> Lang.null))
1 None

initializer
self#on_wake_up (fun () ->
Expand Down
72 changes: 49 additions & 23 deletions src/core/sources/request_dynamic.ml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type handler = {
close : unit -> unit;
}

type task = { notify : unit -> unit; stop : unit -> unit }

let log_failed_request (log : Log.t) request ans =
log#important "Could not resolve request %s: %s."
(Request.initial_uri request)
Expand All @@ -47,20 +49,14 @@ let log_failed_request (log : Log.t) request ans =
| `Timeout -> "timeout"
| `Resolved -> "file could not be decoded with the correct content")

let extract_queued_params p =
let l = Lang.to_valued_option Lang.to_int (List.assoc "prefetch" p) in
let l = Option.value ~default:conf_prefetch#get l in
let t = Lang.to_valued_option Lang.to_float (List.assoc "timeout" p) in
(l, t)

let should_fail = Atomic.make false

let () =
Lifecycle.before_core_shutdown ~name:"request.dynamic shutdown" (fun () ->
Atomic.set should_fail true)

class dynamic ?(name = "request.dynamic") ~priority ~retry_delay ~available
(f : Lang.value) prefetch timeout =
~prefetch ~synchronous ~timeout f =
let available () = (not (Atomic.get should_fail)) && available () in
object (self)
inherit source ~name ()
Expand Down Expand Up @@ -180,17 +176,16 @@ class dynamic ?(name = "request.dynamic") ~priority ~retry_delay ~available
method seek_source = (self :> Source.source)
method abort_track = Atomic.set should_skip true

method private is_request_ready =
self#current <> None || try self#fetch_request with _ -> false

method can_generate_frame =
let is_ready =
(fun () ->
self#current <> None || try self#fetch_request with _ -> false)
()
in
match is_ready with
match self#is_request_ready with
| true -> true
| false ->
if available () then self#notify_new_request;
false
(* Try one more time in case a new request was queued above. *)
self#is_request_ready

val retrieved : queue_item Queue.t = Queue.create ()
method private queue_size = Queue.length retrieved
Expand Down Expand Up @@ -219,11 +214,24 @@ class dynamic ?(name = "request.dynamic") ~priority ~retry_delay ~available

initializer
self#on_wake_up (fun () ->
let t = Duppy.Async.add Tutils.scheduler ~priority self#feed_queue in
Duppy.Async.wake_up t;
let task =
if synchronous then
{
notify = (fun () -> self#synchronous_feed_queue);
stop = (fun () -> ());
}
else (
let t =
Duppy.Async.add Tutils.scheduler ~priority self#feed_queue
in
{
notify = (fun () -> Duppy.Async.wake_up t);
stop = (fun () -> Duppy.Async.stop t);
})
in
assert (
Atomic.compare_and_set state `Sleeping
(`Started (Unix.gettimeofday (), t))))
(`Started (Unix.gettimeofday (), task))))

method private clear_retrieved =
let rec clear () =
Expand All @@ -238,8 +246,8 @@ class dynamic ?(name = "request.dynamic") ~priority ~retry_delay ~available
initializer
self#on_sleep (fun () ->
match Atomic.exchange state `Sleeping with
| `Started (_, t) ->
Duppy.Async.stop t;
| `Started (_, { stop }) ->
stop ();
(* No more feeding task, we can go to sleep. *)
self#end_request;
self#log#info "Cleaning up request queue...";
Expand All @@ -250,8 +258,7 @@ class dynamic ?(name = "request.dynamic") ~priority ~retry_delay ~available
opportunity to feed the queue, in case it is sleeping. *)
method private notify_new_request =
match Atomic.get state with
| `Started (d, t) when d <= Unix.gettimeofday () ->
Duppy.Async.wake_up t
| `Started (d, { notify }) when d <= Unix.gettimeofday () -> notify ()
| _ -> ()

(** The body of the feeding task *)
Expand All @@ -266,6 +273,11 @@ class dynamic ?(name = "request.dynamic") ~priority ~retry_delay ~available
d)
| _ -> -1.

method private synchronous_feed_queue =
match self#feed_queue () with
| 0. -> self#synchronous_feed_queue
| _ -> ()

method fetch =
try
let r =
Expand Down Expand Up @@ -350,6 +362,12 @@ let _ =
Some
"Whether some new requests are available (when set to false, it \
stops after current playing request)." );
( "synchronous",
Lang.bool_t,
Some (Lang.bool false),
Some
"If `true`, new requests are prepared as needed instead of using an \
asynchronous queue." );
( "prefetch",
Lang.nullable_t Lang.int_t,
Some Lang.null,
Expand Down Expand Up @@ -435,5 +453,13 @@ let _ =
| "non_blocking" -> `Non_blocking
| n -> `Named n
in
let l, t = extract_queued_params p in
new dynamic ~available ~priority ~retry_delay f l t)
let prefetch =
Lang.to_valued_option Lang.to_int (List.assoc "prefetch" p)
in
let prefetch = Option.value ~default:conf_prefetch#get prefetch in
let synchronous = Lang.to_bool (List.assoc "synchronous" p) in
let timeout =
Lang.to_valued_option Lang.to_float (List.assoc "timeout" p)
in
new dynamic
~available ~priority ~retry_delay ~prefetch ~timeout ~synchronous f)
1 change: 1 addition & 0 deletions src/libs/extra/native.liq
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ def native.request.dynamic(%argsof(request.dynamic), f) =
ignore(available)
ignore(timeout)
ignore(native)
ignore(synchronous)

def f() =
try
Expand Down
53 changes: 40 additions & 13 deletions src/libs/request.liq
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ def request.single(
) =
id = string.id.default(default="single", id)

fallible = fallible ?? getter.is_constant(r)
fallible = fallible ?? not getter.is_constant(r)

infallible =
if
Expand All @@ -302,9 +302,20 @@ def request.single(
false
end

if
not fallible and not infallible
then
log.severe(
label=id,
"Source was marked a infallible but its request is not a static file. The \
source is considered fallible for backward compatibility but this will \
fail in future versions!"
)
end

static_request = ref(null())

def on_wake_up() =
def on_wake_up(s) =
if
infallible
then
Expand All @@ -315,8 +326,9 @@ def request.single(
label=id,
"#{uri} is static, resolving once for all..."
)

if
not request.resolve(initial_request, timeout=timeout)
not request.resolve(initial_request, timeout=timeout, content_type=s)
then
request.destroy(initial_request)
error.raise(
Expand All @@ -339,20 +351,35 @@ def request.single(
end

def next() =
def next() =
static_request() ?? getter.get(r)
end
static_request() ?? getter.get(r)
end

s = request.dynamic(prefetch=prefetch, thread_queue=thread_queue, next)
if infallible then s.set_queue([next()]) end
s
def mk_source(id) =
request.dynamic(
id=id,
prefetch=prefetch,
thread_queue=thread_queue,
synchronous=infallible,
next
)
end

# We want to mark infallible source as such. `source.dynamic` is a nice
# way to do it as it will raise a user-friendly error in case the underlying
# source does not respect the conditions for being infallible.
s =
source.dynamic(
id=id, infallible=infallible, self_sync=false, track_sensitive=true, next
)
s.on_wake_up(on_wake_up)
if
infallible
then
s = mk_source("#{id}.actual")
source.dynamic(
id=id, infallible=infallible, self_sync=false, track_sensitive=true, {s}
)
else
mk_source(id)
end

s.on_wake_up(fun () -> on_wake_up(s))
s.on_shutdown(on_shutdown)
(s : source_methods)
end
Expand Down
Loading