Skip to content

Commit

Permalink
Merge pull request #8642 from juhlig/fix_erpc_call_local
Browse files Browse the repository at this point in the history
Introduce always_spawn option to erpc calls

OTP-19343
  • Loading branch information
rickard-green authored Nov 7, 2024
2 parents e57deca + e5eb252 commit c3585a8
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 83 deletions.
174 changes: 92 additions & 82 deletions lib/kernel/src/erpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,10 @@ is available on the involved nodes.
reqids_add/3,
reqids_to_list/1]).

-export_type([request_id/0, request_id_collection/0, timeout_time/0]).
-export_type([call_options/0,
request_id/0,
request_id_collection/0,
timeout_time/0]).

%% Internal exports (also used by the 'rpc' module)

Expand Down Expand Up @@ -121,45 +124,60 @@ The value can be:
a time further into the future than `4294967295` milliseconds. Identifying the
timeout using an absolute timeout value is especially handy when you have a
deadline for responses corresponding to a complete collection of requests
(`t:request_id_collection/0`) , since you do not have to recalculate the
(`t:request_id_collection/0`), since you do not have to recalculate the
relative time until the deadline over and over again.
""".
-type timeout_time() :: 0..?MAX_INT_TIMEOUT | 'infinity' | {abs, integer()}.

-doc """
Options to be used in [`call/3,5`](`call/5`) and
[`multicall/3,5`](`multicall/5`) functions.
- **`timeout`** - Upper time limit for call operations to complete, see
`t:timeout_time/0`. Default: `infinity`.
- **`always_spawn`** - If `true`, the `apply()` will _always_ be performed
in a freshly spawned process. If `false`, the calling process _may_ be
used instead, if possible. Default: `false`.
""".
-doc(#{since => <<"OTP 28.0">>}).
-type call_options() :: #{'timeout' => Timeout :: timeout_time(),
'always_spawn' => AlwaysSpawn :: boolean()}.

%%------------------------------------------------------------------------
%% Exported API
%%------------------------------------------------------------------------

-doc(#{equiv => call(Node, Fun, infinity)}).
-doc(#{equiv => call(Node, Fun, #{timeout => infinity})}).
-doc(#{since => <<"OTP 23.0">>}).
-spec call(Node, Fun) -> Result when
Node :: node(),
Fun :: function(),
Result :: term().

call(N, Fun) ->
call(N, Fun, infinity).
call(N, Fun, #{timeout => infinity}).

-doc """
Equivalent to
[`erpc:call(Node, erlang, apply, [Fun,[]], Timeout)`](`call/5`).
[`erpc:call(Node, erlang, apply, [Fun,[]], #{timeout => Timeout})`](`call/5`).
May raise all the same exceptions as [`call/5`](`call/5`) plus an `{erpc, badarg}`
`error` exception if `Fun` is not a fun of zero arity.
""".
-doc(#{since => <<"OTP 23.0">>}).
-spec call(Node, Fun, Timeout) -> Result when
-spec call(Node, Fun, TimeoutOrOptions) -> Result when
Node :: node(),
Fun :: function(),
Timeout :: timeout_time(),
TimeoutOrOptions :: timeout_time() | call_options(),
Result :: term().

call(N, Fun, Timeout) when is_function(Fun, 0) ->
call(N, erlang, apply, [Fun, []], Timeout);
call(N, Fun, TimeoutOrOptions) when is_function(Fun, 0) ->
call(N, erlang, apply, [Fun, []], TimeoutOrOptions);
call(_N, _Fun, _Timeout) ->
error({?MODULE, badarg}).

-doc(#{equiv => call(Node, Module, Function, Args, infinity)}).
-doc(#{equiv => call(Node, Module, Function, Args, #{timeout => infinity})}).
-doc(#{since => <<"OTP 23.0">>}).
-spec call(Node, Module, Function, Args) -> Result when
Node :: node(),
Expand All @@ -169,14 +187,16 @@ call(_N, _Fun, _Timeout) ->
Result :: term().

call(N, M, F, A) ->
call(N, M, F, A, infinity).
call(N, M, F, A, #{timeout => infinity}).

-dialyzer([{nowarn_function, call/5}, no_return]).

-doc """
Evaluates [`apply(Module, Function, Args)`](`apply/3`) on node `Node` and
returns the corresponding value `Result`. `Timeout` sets an upper time limit for
the `call` operation to complete.
returns the corresponding value `Result`.
`TimeoutOrOptions` can be either a [`timeout time`](`t:timeout_time/0`) or a
[`call options`](`t:call_options/0`) map (since OTP 28.0).
The `call()` function only returns if the applied function successfully returned
without raising any uncaught exceptions, the operation did not time out, and no
Expand Down Expand Up @@ -239,23 +259,24 @@ communication may, of course, reach the calling process.
> #### Note {: .info }
>
> You cannot make _any_ assumptions about the process that will perform the
> `apply()`. It may be the calling process itself, a server, or a freshly
> spawned process.
> If the `always_spawn` option is `false` (which is the default), you cannot make
> _any_ assumptions about the process that will perform the `apply()`. It may be
> the calling process itself, or a freshly spawned process.
""".
-doc(#{since => <<"OTP 23.0">>}).
-spec call(Node, Module, Function, Args, Timeout) -> Result when
-spec call(Node, Module, Function, Args, TimeoutOrOptions) -> Result when
Node :: node(),
Module :: atom(),
Function :: atom(),
Args :: [term()],
Timeout :: timeout_time(),
TimeoutOrOptions :: timeout_time() | call_options(),
Result :: term().

call(N, M, F, A, infinity) when node() =:= N, %% Optimize local call
is_atom(M),
is_atom(F),
is_list(A) ->
call(N, M, F, A, #{timeout := infinity,
always_spawn := false}) when node() =:= N, %% Optimize local call
is_atom(M),
is_atom(F),
is_list(A) ->
try
{return, Return} = execute_call(M,F,A),
Return
Expand All @@ -271,10 +292,12 @@ call(N, M, F, A, infinity) when node() =:= N, %% Optimize local call
error({exception, Reason, ErpcStack})
end
end;
call(N, M, F, A, T) when is_atom(N),
is_atom(M),
is_atom(F),
is_list(A) ->
call(N, M, F, A, #{timeout := T,
always_spawn := AlwaysSpawn}) when is_atom(N),
is_atom(M),
is_atom(F),
is_list(A),
is_boolean(AlwaysSpawn) ->
Timeout = timeout_value(T),
Res = make_ref(),
ReqId = spawn_request(N, ?MODULE, execute_call, [Res, M, F, A],
Expand All @@ -287,8 +310,15 @@ call(N, M, F, A, T) when is_atom(N),
after Timeout ->
result(timeout, ReqId, Res, undefined)
end;
call(_N, _M, _F, _A, _T) ->
error({?MODULE, badarg}).
call(_N, _M, _F, _A, #{timeout := _T,
always_spawn := _AlwaysSpawn} = _Opts) ->
error({?MODULE, badarg});
call(N, M, F, A, #{} = Opts) ->
call(N, M, F, A, maps:merge(#{timeout => infinity,
always_spawn => false}, Opts));
call(N, M, F, A, T) ->
call(N, M, F, A, #{timeout => T,
always_spawn => false}).

%% Asynchronous call

Expand All @@ -309,11 +339,6 @@ Fails with an `{erpc, badarg}` `error` exception if:
- `Node` is not an atom.
- `Fun` is not a fun of zero arity.
> #### Note {: .info }
>
> You cannot make _any_ assumptions about the process that will perform the
> `apply()`. It may be a server, or a freshly spawned process.
""".
-doc(#{since => <<"OTP 23.0">>}).
-spec send_request(Node, Fun) -> RequestId when
Expand Down Expand Up @@ -364,11 +389,6 @@ Fails with an `{erpc, badarg}` `error` exception if:
- `Args` is not a list. Note that the list is not verified to be a proper list
at the client side.
> #### Note {: .info }
>
> You cannot make _any_ assumptions about the process that will perform the
> `apply()`. It may be a server, or a freshly spawned process.
Equivalent to
[`erpc:send_request(Node, erlang, apply, [Fun,[]]), Label, RequestIdCollection)`](`send_request/6`).
Expand All @@ -377,11 +397,6 @@ Fails with an `{erpc, badarg}` `error` exception if:
- `Node` is not an atom.
- `Fun` is not a fun of zero arity.
- `RequestIdCollection` is detected not to be request identifier collection.
> #### Note {: .info }
>
> You cannot make _any_ assumptions about the process that will perform the
> `apply()`. It may be a server, or a freshly spawned process.
""".
-doc(#{since => <<"OTP 23.0">>}).
-spec send_request(Node, Module, Function, Args) -> RequestId when
Expand Down Expand Up @@ -433,11 +448,6 @@ Fails with an `{erpc, badarg}` `error` exception if:
- `Args` is not a list. Note that the list is not verified to be a proper list
at the client side.
- `RequestIdCollection` is detected not to be request identifier collection.
> #### Note {: .info }
>
> You cannot make _any_ assumptions about the process that will perform the
> `apply()`. It may be a server, or a freshly spawned process.
""".
-doc(#{since => <<"OTP 25.0">>}).
-spec send_request(Node, Module, Function, Args,
Expand Down Expand Up @@ -992,36 +1002,36 @@ reqids_to_list(_) ->
| {error, {?MODULE, Reason :: term()}}.


-doc(#{equiv => multicall(Nodes, Fun, infinity)}).
-doc(#{equiv => multicall(Nodes, Fun, #{timeout => infinity})}).
-doc(#{since => <<"OTP 23.0">>}).
-spec multicall(Nodes, Fun) -> Result when
Nodes :: [atom()],
Fun :: function(),
Result :: term().

multicall(Ns, Fun) ->
multicall(Ns, Fun, infinity).
multicall(Ns, Fun, #{timeout => infinity}).

-doc """
Equivalent to
[`erpc:multicall(Nodes, erlang, apply, [Fun,[]], Timeout)`](`multicall/5`).
[`erpc:multicall(Nodes, erlang, apply, [Fun,[]], #{timeout => Timeout})`](`multicall/5`).
May raise all the same exceptions as [`multicall/5`](`multicall/5`) plus an
`{erpc, badarg}` `error` exception if `Fun` is not a fun of zero arity.
""".
-doc(#{since => <<"OTP 23.0">>}).
-spec multicall(Nodes, Fun, Timeout) -> Result when
-spec multicall(Nodes, Fun, TimeoutOrOptions) -> Result when
Nodes :: [atom()],
Fun :: function(),
Timeout :: timeout_time(),
TimeoutOrOptions :: timeout_time() | call_options(),
Result :: term().

multicall(Ns, Fun, Timeout) when is_function(Fun, 0) ->
multicall(Ns, erlang, apply, [Fun, []], Timeout);
multicall(_Ns, _Fun, _Timeout) ->
multicall(Ns, Fun, TimeoutOrOptions) when is_function(Fun, 0) ->
multicall(Ns, erlang, apply, [Fun, []], TimeoutOrOptions);
multicall(_Ns, _Fun, _TimeoutOrOptions) ->
error({?MODULE, badarg}).

-doc(#{equiv => multicall(Nodes, Module, Function, Args, infinity)}).
-doc(#{equiv => multicall(Nodes, Module, Function, Args, #{timeout => infinity})}).
-doc(#{since => <<"OTP 23.0">>}).
-spec multicall(Nodes, Module, Function, Args) -> Result when
Nodes :: [atom()],
Expand All @@ -1031,14 +1041,18 @@ multicall(_Ns, _Fun, _Timeout) ->
Result :: [{ok, ReturnValue :: term()} | caught_call_exception()].

multicall(Ns, M, F, A) ->
multicall(Ns, M, F, A, infinity).
multicall(Ns, M, F, A, #{timeout => infinity}).

-doc """
Performs multiple `call` operations in parallel on multiple nodes.
That is, evaluates [`apply(Module, Function, Args)`](`apply/3`) on the nodes `Nodes` in
parallel. `Timeout` sets an upper time limit for all `call` operations to
complete. The result is returned as a list where the result from each node is
parallel.
`TimeoutOrOptions` can be either a [`timeout time`](`t:timeout_time/0`) or a
[`call options`](`t:call_options/0`) map (since OTP 28.0).
The result is returned as a list where the result from each node is
placed at the same position as the node name is placed in `Nodes`. Each item in
the resulting list is formatted as either:
Expand Down Expand Up @@ -1094,32 +1108,38 @@ calling process, such communication may, of course, reach the calling process.
> #### Note {: .info }
>
> You cannot make _any_ assumptions about the process that will perform the
> `apply()`. It may be the calling process itself, a server, or a freshly
> spawned process.
> If the `always_spawn` option is `false` (which is the default), you cannot make
> _any_ assumptions about the processes that will perform the `apply()`s. It may be
> the calling process itself, or freshly spawned processes, or a mix of both.
""".
-doc(#{since => <<"OTP 23.0">>}).
-spec multicall(Nodes, Module, Function, Args, Timeout) -> Result when
-spec multicall(Nodes, Module, Function, Args, TimeoutOrOptions) -> Result when
Nodes :: [atom()],
Module :: atom(),
Function :: atom(),
Args :: [term()],
Timeout :: timeout_time(),
TimeoutOrOptions :: timeout_time() | call_options(),
Result :: [{ok, ReturnValue :: term()} | caught_call_exception()].

multicall(Ns, M, F, A, T) ->
multicall(Ns, M, F, A, #{} = Opts) ->
try
true = is_atom(M),
true = is_atom(F),
true = is_list(A),
Tag = make_ref(),
Timeout = timeout_value(T),
SendState = mcall_send_requests(Tag, Ns, M, F, A, Timeout),
Timeout = timeout_value(maps:get(timeout, Opts, infinity)),
LocalCall = case maps:get(always_spawn, Opts, false) of
true -> always_spawn;
false -> allow_local_call
end,
SendState = mcall_send_requests(Tag, Ns, M, F, A, LocalCall, Timeout),
mcall_receive_replies(Tag, SendState)
catch
error:NotIErr when NotIErr /= internal_error ->
error({?MODULE, badarg})
end.
end;
multicall(Ns, M, F, A, T) ->
multicall(Ns, M, F, A, #{timeout => T}).

-doc """
Equivalent to
Expand Down Expand Up @@ -1155,11 +1175,6 @@ if:
- `Function` is not an atom.
- `Args` is not a list. Note that the list is not verified to be a proper list
at the client side.
> #### Note {: .info }
>
> You cannot make _any_ assumptions about the process that will perform the
> `apply()`. It may be a server, or a freshly spawned process.
""".
-doc(#{since => <<"OTP 23.0">>}).
-spec multicast(Nodes, Module, Function, Args) -> 'ok' when
Expand Down Expand Up @@ -1215,11 +1230,6 @@ ignored.
- `Function` is not an atom.
- `Args` is not a list. Note that the list is not verified to be a proper list
at the client side.
> #### Note {: .info }
>
> You cannot make _any_ assumptions about the process that will perform the
> `apply()`. It may be a server, or a freshly spawned process.
""".
-doc(#{since => <<"OTP 23.0">>}).
-spec cast(Node, Module, Function, Args) -> 'ok' when
Expand Down Expand Up @@ -1515,9 +1525,9 @@ mcall_send_request(T, N, M, F, A) when is_reference(T),
{reply_tag, T},
{monitor, [{tag, T}]}]).

mcall_send_requests(Tag, Ns, M, F, A, Tmo) ->
mcall_send_requests(Tag, Ns, M, F, A, LC, Tmo) ->
DL = deadline(Tmo),
mcall_send_requests(Tag, Ns, M, F, A, [], DL, undefined, 0).
mcall_send_requests(Tag, Ns, M, F, A, [], DL, LC, 0).

mcall_send_requests(_Tag, [], M, F, A, RIDs, DL, local_call, NRs) ->
%% Timeout infinity and call on local node wanted;
Expand All @@ -1527,7 +1537,7 @@ mcall_send_requests(_Tag, [], M, F, A, RIDs, DL, local_call, NRs) ->
mcall_send_requests(_Tag, [], _M, _F, _A, RIDs, DL, _LC, NRs) ->
{ok, RIDs, #{}, NRs, DL};
mcall_send_requests(Tag, [N|Ns], M, F, A, RIDs,
infinity, undefined, NRs) when N == node() ->
infinity, allow_local_call, NRs) when N == node() ->
mcall_send_requests(Tag, Ns, M, F, A, [local_call|RIDs],
infinity, local_call, NRs);
mcall_send_requests(Tag, [N|Ns], M, F, A, RIDs, DL, LC, NRs) ->
Expand Down
Loading

0 comments on commit c3585a8

Please sign in to comment.