Skip to content

Commit

Permalink
fix: resolve discover-then-connect timeout
Browse files Browse the repository at this point in the history
for below cases, when `timeout` is not provided or provided less
than `connect_timeout`, choose the greater value.
  • Loading branch information
zmstone committed Oct 29, 2024
1 parent dfea85f commit 1579495
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 3 deletions.
7 changes: 7 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
* 4.1.10
- Resolve timeout value for discover and connect
- partition leader
- consumer group coordinator
- cluster controller
Choose the greater value of connect timeout and request timeout.

* 4.1.9
- Upgrade crc32cer to 0.1.11 for build issue fix on OTP 27.

Expand Down
11 changes: 8 additions & 3 deletions src/kpro_brokers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ with_connection(Endpoints, Config, Fun) ->
topic(), partition(), #{timeout => timeout()}) ->
{ok, connection()} | {error, any()}.
connect_partition_leader(Bootstrap, Config, Topic, Partition, Opts) ->
Timeout = maps:get(timeout, Opts, ?DEFAULT_TIMEOUT),
Timeout = resolve_timeout(Config, Opts),
DiscoverFun =
fun(C) -> discover_partition_leader(C, Topic, Partition, Timeout) end,
discover_and_connect(DiscoverFun, Bootstrap, Config, Timeout).
Expand All @@ -96,7 +96,7 @@ connect_partition_leader(Bootstrap, Config, Topic, Partition, Opts) ->
connect_coordinator(Bootstrap, Config, #{ type := Type
, id := Id
} = Args) ->
Timeout = maps:get(timeout, Args, ?DEFAULT_TIMEOUT),
Timeout = resolve_timeout(Config, Args),
DiscoverFun = fun(Conn) -> discover_coordinator(Conn, Type, Id, Timeout) end,
discover_and_connect(DiscoverFun, Bootstrap, Config, Timeout).

Expand All @@ -105,7 +105,7 @@ connect_coordinator(Bootstrap, Config, #{ type := Type
#{timeout => timeout()}) ->
{ok, connection()} | {error, any()}.
connect_controller(Bootstrap, Config, Opts) ->
Timeout = maps:get(timeout, Opts, ?DEFAULT_TIMEOUT),
Timeout = resolve_timeout(Config, Opts),
DiscoverFun = fun(Conn) -> discover_controller(Conn, Timeout) end,
discover_and_connect(DiscoverFun, Bootstrap, Config, Timeout).

Expand Down Expand Up @@ -318,6 +318,11 @@ random_order(L) ->
RI = lists:sort(lists:zip(RandL, L)),
[I || {_R, I} <- RI].

resolve_timeout(ConnConfig, Opts) ->
ConnectTimeout = kpro_connection:get_connect_timeout(ConnConfig),
RequestTimeout = maps:get(timeout, Opts, ?DEFAULT_TIMEOUT),
max(ConnectTimeout, RequestTimeout).

-ifdef(TEST).

api_vsn_range_intersection_test() ->
Expand Down
1 change: 1 addition & 0 deletions src/kpro_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
, start/3
, stop/1
, debug/2
, get_connect_timeout/1
]).

%% system calls support for worker process
Expand Down

0 comments on commit 1579495

Please sign in to comment.