diff --git a/Makefile b/Makefile index 515b018..76adce2 100644 --- a/Makefile +++ b/Makefile @@ -56,8 +56,9 @@ edoc: ex_doc: @$(rebar_cmd) ex_doc -.PHONY: dialyze -dialyze: compile +.PHONY: dialyze dialyzer +dialyze: dialyzer +dialyzer: compile @$(rebar_cmd) dialyzer .PHONY: hex-publish diff --git a/changelog.md b/changelog.md index 6f6d363..88dad00 100644 --- a/changelog.md +++ b/changelog.md @@ -1,3 +1,10 @@ +* 4.1.10 + - Resolve timeout value for discover and connect + - partition leader + - consumer group coordinator + - cluster controller + Choose the greater value of connect timeout and request timeout. + * 4.1.9 - Upgrade crc32cer to 0.1.11 for build issue fix on OTP 27. diff --git a/src/kpro_brokers.erl b/src/kpro_brokers.erl index 964a6a4..4249516 100644 --- a/src/kpro_brokers.erl +++ b/src/kpro_brokers.erl @@ -79,7 +79,7 @@ with_connection(Endpoints, Config, Fun) -> topic(), partition(), #{timeout => timeout()}) -> {ok, connection()} | {error, any()}. connect_partition_leader(Bootstrap, Config, Topic, Partition, Opts) -> - Timeout = maps:get(timeout, Opts, ?DEFAULT_TIMEOUT), + Timeout = resolve_timeout(Config, Opts), DiscoverFun = fun(C) -> discover_partition_leader(C, Topic, Partition, Timeout) end, discover_and_connect(DiscoverFun, Bootstrap, Config, Timeout). @@ -96,7 +96,7 @@ connect_partition_leader(Bootstrap, Config, Topic, Partition, Opts) -> connect_coordinator(Bootstrap, Config, #{ type := Type , id := Id } = Args) -> - Timeout = maps:get(timeout, Args, ?DEFAULT_TIMEOUT), + Timeout = resolve_timeout(Config, Args), DiscoverFun = fun(Conn) -> discover_coordinator(Conn, Type, Id, Timeout) end, discover_and_connect(DiscoverFun, Bootstrap, Config, Timeout). @@ -105,7 +105,7 @@ connect_coordinator(Bootstrap, Config, #{ type := Type #{timeout => timeout()}) -> {ok, connection()} | {error, any()}. connect_controller(Bootstrap, Config, Opts) -> - Timeout = maps:get(timeout, Opts, ?DEFAULT_TIMEOUT), + Timeout = resolve_timeout(Config, Opts), DiscoverFun = fun(Conn) -> discover_controller(Conn, Timeout) end, discover_and_connect(DiscoverFun, Bootstrap, Config, Timeout). @@ -318,6 +318,13 @@ random_order(L) -> RI = lists:sort(lists:zip(RandL, L)), [I || {_R, I} <- RI]. +resolve_timeout(ConnConfig, Opts) when is_list(ConnConfig) -> + resolve_timeout(maps:from_list(ConnConfig), Opts); +resolve_timeout(ConnConfig, Opts) -> + ConnectTimeout = kpro_connection:get_connect_timeout(ConnConfig), + RequestTimeout = maps:get(timeout, Opts, ?DEFAULT_TIMEOUT), + max(ConnectTimeout, RequestTimeout). + -ifdef(TEST). api_vsn_range_intersection_test() -> diff --git a/src/kpro_connection.erl b/src/kpro_connection.erl index f489e98..50d4149 100644 --- a/src/kpro_connection.erl +++ b/src/kpro_connection.erl @@ -28,6 +28,7 @@ , start/3 , stop/1 , debug/2 + , get_connect_timeout/1 ]). %% system calls support for worker process