Skip to content

Commit

Permalink
Allow channel handler to control adjust_window message sending
Browse files Browse the repository at this point in the history
The channel handler callback module can implement the get_adjust/0 function
returning either 'immediate' or 'delayed' values.
In the latter case the channel handler module is responsible for invoking
ssh_connection:adjust_window/3 to send ssh_msg_adjust_window to the peer.
  • Loading branch information
yarisx committed Sep 13, 2024
1 parent ea35529 commit 9826e65
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 6 deletions.
36 changes: 30 additions & 6 deletions lib/ssh/src/ssh_client_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ The following message is taken care of by the `ssh_client_channel` behavior.
channel_cb,
channel_state,
channel_id,
channel_adjust_fun, % :: fun/2
close_sent = false
}).

Expand Down Expand Up @@ -351,6 +352,24 @@ The user is responsible for any initialization of the process and must call
enter_loop(State) ->
gen_server:enter_loop(?MODULE, [], State).

check_adjust_fun(Cb, ChState) ->
case catch Cb:get_adjust(ChState) of
Val when Val == immediate orelse Val == delayed ->
%% The existence of the get_adjust should not change in runtime
%% So it should be safe to use it here
fun(Msg, ChannelState) ->
Adjust = Cb:get_adjust(ChannelState),
if Adjust == immediate ->
adjust_window(Msg);
true -> % delayed
ok
end
end;
_ ->
%% If the channel handler is not aware that it can manage adjustments
%% then OTP SSH function is used
fun(Msg, _) -> adjust_window(Msg) end
end.
%%====================================================================
%% gen_server callbacks
%%====================================================================
Expand Down Expand Up @@ -398,17 +417,21 @@ init([Options]) ->
process_flag(trap_exit, true),
try Cb:init(channel_cb_init_args(Options)) of
{ok, ChannelState} ->
ChannelAdjustFun = check_adjust_fun(Cb, ChannelState),
State = #state{cm = ConnectionManager,
channel_cb = Cb,
channel_id = ChannelId,
channel_state = ChannelState},
channel_state = ChannelState,
channel_adjust_fun = ChannelAdjustFun},
self() ! {ssh_channel_up, ChannelId, ConnectionManager},
{ok, State};
{ok, ChannelState, Timeout} ->
ChannelAdjustFun = check_adjust_fun(Cb, ChannelState),
State = #state{cm = ConnectionManager,
channel_cb = Cb,
channel_id = ChannelId,
channel_state = ChannelState},
channel_state = ChannelState,
channel_adjust_fun = ChannelAdjustFun},
self() ! {ssh_channel_up, ChannelId, ConnectionManager},
{ok, State, Timeout};
{stop, Why} ->
Expand Down Expand Up @@ -496,14 +519,15 @@ handle_info({ssh_cm, ConnectionManager, {closed, ChannelId}},
(catch ssh_connection:close(ConnectionManager, ChannelId)),
{stop, normal, State#state{close_sent = true}};

handle_info({ssh_cm, _, _} = Msg, #state{channel_cb = Module,
channel_state = ChannelState0} = State) ->
handle_info({ssh_cm, _, _} = Msg, #state{channel_cb = Module,
channel_adjust_fun = AdjustFun,
channel_state = ChannelState0} = State) ->
try Module:handle_ssh_msg(Msg, ChannelState0) of
{ok, ChannelState} ->
adjust_window(Msg),
AdjustFun(Msg, ChannelState),
{noreply, State#state{channel_state = ChannelState}};
{ok, ChannelState, Timeout} ->
adjust_window(Msg),
AdjustFun(Msg, ChannelState),
{noreply, State#state{channel_state = ChannelState}, Timeout};
{stop, ChannelId, ChannelState} ->
do_the_close(Msg, ChannelId, State#state{channel_state = ChannelState})
Expand Down
15 changes: 15 additions & 0 deletions lib/ssh/src/ssh_connection_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@
-define(call_disconnectfun_and_log_cond(LogMsg, DetailedText, StateName, D),
call_disconnectfun_and_log_cond(LogMsg, DetailedText, ?MODULE, ?LINE, StateName, D)).

%% Minimum number of bytes reported by the "upper layer" that cause
%% #ssh_msg_channel_adjust_window to be sent to the SSH peer
-define(MIN_ADJUST, 64).

%%====================================================================
%% Start / stop
%%====================================================================
Expand Down Expand Up @@ -834,6 +838,17 @@ handle_event(cast, {adjust_window,ChannelId,Bytes}, StateName, D) when ?CONNECTE
Channel#channel{recv_window_pending = Pending + Bytes}),
keep_state_and_data;

#channel{recv_window_size = WinSize,
recv_window_pending = Pending,
recv_packet_size = _PktSize} = Channel
when ((Bytes + Pending) < ?MIN_ADJUST andalso (WinSize > 0)) ->
%% It does not make sense to send updates of e.g. 1 byte
%% if we are still able to receive something
ssh_client_channel:cache_update(cache(D),
Channel#channel{recv_window_pending =
Pending + Bytes}),
keep_state_and_data;

#channel{recv_window_size = WinSize,
recv_window_pending = Pending,
remote_id = Id} = Channel ->
Expand Down

0 comments on commit 9826e65

Please sign in to comment.