From fa2888358d696fa515c7e265b551e85f087e794a Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 25 Oct 2024 12:12:08 -0300 Subject: [PATCH 1/3] fix: don't crash if stopping an unknown consumer ``` =ERROR REPORT==== 25-Oct-2024::12:11:26.244821 === ** Generic server t_double_stop_consumer terminating ** Last message in was {stop_consumer,<<"brod-client-SUITE-topic">>} ** When Server state == {state,t_double_stop_consumer, [{"localhost",9192}], <0.664.0>,[],<0.667.0>,<0.668.0>, [{get_metadata_timeout_seconds,10}], t_double_stop_consumer} ** Reason for termination == ** {{badmatch,{error,not_found}}, [{brod_client,handle_call,3, [{file,"/home/thales/dev/emqx/brod/src/brod_client.erl"}, {line,391}]}, {gen_server,try_handle_call,4,[{file,"gen_server.erl"},{line,1131}]}, {gen_server,handle_msg,6,[{file,"gen_server.erl"},{line,1160}]}, {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,241}]}]} ** Client <0.661.0> stacktrace ** [{gen,do_call,4,[{file,"gen.erl"},{line,240}]}, {gen_server,call,3,[{file,"gen_server.erl"},{line,415}]}, {brod_client,safe_gen_call,3, [{file,"/home/thales/dev/emqx/brod/src/brod_client.erl"}, {line,979}]}, {brod_client_SUITE,t_double_stop_consumer,1, [{file,"/home/thales/dev/emqx/brod/test/brod_client_SUITE.erl"}, {line,395}]}, {test_server,ts_tc,3,[{file,"test_server.erl"},{line,1793}]}, {test_server,run_test_case_eval1,6,[{file,"test_server.erl"},{line,1302}]}, {test_server,run_test_case_eval,9,[{file,"test_server.erl"},{line,1234}]}] =CRASH REPORT==== 25-Oct-2024::12:11:26.244863 === crasher: initial call: brod_client:init/1 pid: <0.663.0> registered_name: t_double_stop_consumer exception error: no match of right hand side value {error,not_found} in function brod_client:handle_call/3 (/home/thales/dev/emqx/brod/src/brod_client.erl, line 391) in call from gen_server:try_handle_call/4 (gen_server.erl, line 1131) in call from gen_server:handle_msg/6 (gen_server.erl, line 1160) ancestors: [brod_sup,<0.657.0>] message_queue_len: 3 messages: [{'EXIT',<0.667.0>,shutdown}, {'EXIT',<0.668.0>,shutdown}, {'EXIT',<0.664.0>,shutdown}] links: [<0.658.0>] dictionary: [{rand_seed,{#{type => exsss,next => #Fun, bits => 58,uniform => #Fun, uniform_n => #Fun, jump => #Fun}, [52087848521744273|168058237035269794]}}] trap_exit: true status: running heap_size: 1598 stack_size: 28 reductions: 2571 neighbours: =ERROR REPORT==== 25-Oct-2024::12:11:26.244935 === supervisor: {local,brod_sup} errorContext: child_terminated reason: {{badmatch,{error,not_found}}, [{brod_client,handle_call,3, [{file,"/home/thales/dev/emqx/brod/src/brod_client.erl"}, {line,391}]}, {gen_server,try_handle_call,4, [{file,"gen_server.erl"},{line,1131}]}, {gen_server,handle_msg,6,[{file,"gen_server.erl"},{line,1160}]}, {proc_lib,init_p_do_apply,3, [{file,"proc_lib.erl"},{line,241}]}]} offender: [{pid,<0.663.0>}, {id,t_double_stop_consumer}, {mfargs,{brod_client,start_link, [[{"localhost",9192}], t_double_stop_consumer, [{get_metadata_timeout_seconds,10}]]}}, {restart_type,{permanent,10}}, {shutdown,5000}, {child_type,worker}] ``` --- src/brod_client.erl | 4 ++-- test/brod_client_SUITE.erl | 22 +++++++++++++++++++--- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/src/brod_client.erl b/src/brod_client.erl index d52f1b38..b708578c 100644 --- a/src/brod_client.erl +++ b/src/brod_client.erl @@ -388,8 +388,8 @@ handle_call({stop_producer, Topic}, _From, State) -> ok = brod_producers_sup:stop_producer(State#state.producers_sup, Topic), {reply, ok, State}; handle_call({stop_consumer, Topic}, _From, State) -> - ok = brod_consumers_sup:stop_consumer(State#state.consumers_sup, Topic), - {reply, ok, State}; + Reply = brod_consumers_sup:stop_consumer(State#state.consumers_sup, Topic), + {reply, Reply, State}; handle_call({get_leader_connection, Topic, Partition}, _From, State) -> {Result, NewState} = do_get_leader_connection(State, Topic, Partition), {reply, Result, NewState}; diff --git a/test/brod_client_SUITE.erl b/test/brod_client_SUITE.erl index eff92257..6217a687 100644 --- a/test/brod_client_SUITE.erl +++ b/test/brod_client_SUITE.erl @@ -44,6 +44,7 @@ , t_sasl_callback/1 , t_magic_version/1 , t_get_partitions_count_safe/1 + , t_double_stop_consumer/1 ]). -include_lib("common_test/include/ct.hrl"). @@ -51,9 +52,9 @@ -include("brod_int.hrl"). -define(HOST, "localhost"). --define(HOSTS, [{?HOST, 9092}]). --define(HOSTS_SSL, [{?HOST, 9093}]). --define(HOSTS_SASL_SSL, [{?HOST, 9094}]). +-define(HOSTS, [{?HOST, 9192}]). +-define(HOSTS_SSL, [{?HOST, 9193}]). +-define(HOSTS_SASL_SSL, [{?HOST, 9194}]). -define(TOPIC, <<"brod-client-SUITE-topic">>). -define(WAIT(PATTERN, RESULT, TIMEOUT), @@ -379,6 +380,21 @@ t_magic_version(Config) when is_list(Config) -> ?assert(is_integer(Ts)) end. +t_double_stop_consumer({init, Config}) -> Config; +t_double_stop_consumer({'end', Config}) -> + brod:stop_client(?FUNCTION_NAME), + Config; +t_double_stop_consumer(Config) when is_list(Config) -> + Client = ?FUNCTION_NAME, + ClientConfig = [{get_metadata_timeout_seconds, 10}], + ok = start_client(?HOSTS, Client, ClientConfig), + ok = brod:start_consumer(Client, ?TOPIC, []), + ?assertMatch({ok, _}, brod_client:get_consumer(Client, ?TOPIC, 0)), + ?assertMatch(ok, brod_client:stop_consumer(Client, ?TOPIC)), + ?assertMatch({error, {consumer_not_found, _}}, brod_client:get_consumer(Client, ?TOPIC, 0)), + ?assertMatch({error, not_found}, brod_client:stop_consumer(Client, ?TOPIC)), + ok. + %%%_* Help functions =========================================================== %% mocked callback From 378f17236ada7d56378cbb5a43cde345270ce358 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 25 Oct 2024 13:48:10 -0300 Subject: [PATCH 2/3] fix(group subscriber v2): cleanup consumer processes on shutdown --- src/brod_group_subscriber_v2.erl | 17 +++++++++++++++-- test/brod_group_subscriber_SUITE.erl | 27 +++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/src/brod_group_subscriber_v2.erl b/src/brod_group_subscriber_v2.erl index c1e43fdd..47ece4b0 100644 --- a/src/brod_group_subscriber_v2.erl +++ b/src/brod_group_subscriber_v2.erl @@ -395,12 +395,15 @@ handle_info(_Info, State) -> %%-------------------------------------------------------------------- -spec terminate(Reason :: normal | shutdown | {shutdown, term()} | term(), State :: term()) -> any(). -terminate(_Reason, #state{workers = Workers, +terminate(_Reason, #state{config = Config, + workers = Workers, coordinator = Coordinator, group_id = GroupId }) -> ok = terminate_all_workers(Workers), - ok = flush_offset_commits(GroupId, Coordinator). + ok = flush_offset_commits(GroupId, Coordinator), + ok = stop_consumers(Config), + ok. %%%=================================================================== %%% Internal functions @@ -529,6 +532,16 @@ do_ack(Topic, Partition, Offset, #state{ workers = Workers {error, unknown_topic_or_partition} end. +stop_consumers(Config) -> + #{ client := Client + , topics := Topics + } = Config, + lists:foreach( + fun(Topic) -> + _ = brod_client:stop_consumer(Client, Topic) + end, + Topics). + %%%_* Emacs ==================================================================== %%% Local Variables: %%% allout-layout: t diff --git a/test/brod_group_subscriber_SUITE.erl b/test/brod_group_subscriber_SUITE.erl index 7ad69d14..a08cf569 100644 --- a/test/brod_group_subscriber_SUITE.erl +++ b/test/brod_group_subscriber_SUITE.erl @@ -52,6 +52,7 @@ , t_assign_partitions_handles_updating_state/1 , t_get_workers/1 , v2_coordinator_crash/1 + , v2_consumer_cleanup/1 , v2_subscriber_shutdown/1 , v2_subscriber_assignments_revoked/1 ]). @@ -97,6 +98,7 @@ groups() -> , t_assign_partitions_handles_updating_state , t_get_workers , v2_coordinator_crash + , v2_consumer_cleanup , v2_subscriber_shutdown , v2_subscriber_assignments_revoked ]} @@ -391,6 +393,31 @@ v2_coordinator_crash(Config) when is_list(Config) -> ok end). +%% Checks that we don't leave `brod_consumer' processes lingering after we stop a group +%% subscriber v2. +v2_consumer_cleanup(Config) when is_list(Config) -> + InitArgs = #{}, + Topic = ?topic, + Partition = 0, + Client = ?CLIENT_ID, + ?check_trace( + #{timetrap => 5_000}, + begin + {ok, SubscriberPid} = start_subscriber(?group_id, Config, [Topic], InitArgs), + %% Send a message to the topic and wait until it's received to make sure + %% the subscriber is stable: + produce({Topic, Partition}, <<0>>), + {ok, _} = ?wait_message(Topic, Partition, <<0>>, _), + ?assertMatch({ok, _}, brod_client:get_consumer(Client, Topic, Partition)), + ok = stop_subscriber(Config, SubscriberPid), + ?assertMatch({error, {consumer_not_found, _}}, + brod_client:get_consumer(Client, Topic, Partition)), + ok + end, + [] + ), + ok. + v2_subscriber_shutdown(Config) when is_list(Config) -> %% Test graceful shutdown of the group subscriber: InitArgs = #{async_ack => true}, From 55b9ed5f44eb9a2187496c87395c419d578be7d7 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 25 Oct 2024 13:51:00 -0300 Subject: [PATCH 3/3] docs: add changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e28a5215..69ece89b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +- 4.3.1 + - Fixed `brod_client:stop_consumer` so that it doesn't crash the client process if an unknown consumer is given as argument. + - Previously, `brod_group_subscriber_v2` could leave `brod_consumer` processes lingering even after its shutdown. Now, those processes are terminated. + - 4.3.0 - Split brod-cli out to a separate project [kafka4beam/brod-cli](https://github.com/kafka4beam/brod-cli)