Skip to content

Commit

Permalink
fix: resolve discover-then-connect timeout
Browse files Browse the repository at this point in the history
for below cases, when `timeout` is not provided or provided less
than `connect_timeout`, choose the greater value.
  • Loading branch information
zmstone committed Oct 29, 2024
1 parent dfea85f commit 67d2ed2
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 5 deletions.
5 changes: 3 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ edoc:
ex_doc:
@$(rebar_cmd) ex_doc

.PHONY: dialyze
dialyze: compile
.PHONY: dialyze dialyzer
dialyze: dialyzer
dialyzer: compile
@$(rebar_cmd) dialyzer

.PHONY: hex-publish
Expand Down
7 changes: 7 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
* 4.1.10
- Resolve timeout value for discover and connect
- partition leader
- consumer group coordinator
- cluster controller
Choose the greater value of connect timeout and request timeout.

* 4.1.9
- Upgrade crc32cer to 0.1.11 for build issue fix on OTP 27.

Expand Down
13 changes: 10 additions & 3 deletions src/kpro_brokers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ with_connection(Endpoints, Config, Fun) ->
topic(), partition(), #{timeout => timeout()}) ->
{ok, connection()} | {error, any()}.
connect_partition_leader(Bootstrap, Config, Topic, Partition, Opts) ->
Timeout = maps:get(timeout, Opts, ?DEFAULT_TIMEOUT),
Timeout = resolve_timeout(Config, Opts),
DiscoverFun =
fun(C) -> discover_partition_leader(C, Topic, Partition, Timeout) end,
discover_and_connect(DiscoverFun, Bootstrap, Config, Timeout).
Expand All @@ -96,7 +96,7 @@ connect_partition_leader(Bootstrap, Config, Topic, Partition, Opts) ->
connect_coordinator(Bootstrap, Config, #{ type := Type
, id := Id
} = Args) ->
Timeout = maps:get(timeout, Args, ?DEFAULT_TIMEOUT),
Timeout = resolve_timeout(Config, Args),
DiscoverFun = fun(Conn) -> discover_coordinator(Conn, Type, Id, Timeout) end,
discover_and_connect(DiscoverFun, Bootstrap, Config, Timeout).

Expand All @@ -105,7 +105,7 @@ connect_coordinator(Bootstrap, Config, #{ type := Type
#{timeout => timeout()}) ->
{ok, connection()} | {error, any()}.
connect_controller(Bootstrap, Config, Opts) ->
Timeout = maps:get(timeout, Opts, ?DEFAULT_TIMEOUT),
Timeout = resolve_timeout(Config, Opts),
DiscoverFun = fun(Conn) -> discover_controller(Conn, Timeout) end,
discover_and_connect(DiscoverFun, Bootstrap, Config, Timeout).

Expand Down Expand Up @@ -318,6 +318,13 @@ random_order(L) ->
RI = lists:sort(lists:zip(RandL, L)),
[I || {_R, I} <- RI].

resolve_timeout(ConnConfig, Opts) when is_list(ConnConfig) ->
resolve_timeout(maps:from_list(ConnConfig), Opts);
resolve_timeout(ConnConfig, Opts) ->
ConnectTimeout = kpro_connection:get_connect_timeout(ConnConfig),
RequestTimeout = maps:get(timeout, Opts, ?DEFAULT_TIMEOUT),
max(ConnectTimeout, RequestTimeout).

-ifdef(TEST).

api_vsn_range_intersection_test() ->
Expand Down
1 change: 1 addition & 0 deletions src/kpro_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
, start/3
, stop/1
, debug/2
, get_connect_timeout/1
]).

%% system calls support for worker process
Expand Down

0 comments on commit 67d2ed2

Please sign in to comment.