diff --git a/changelog.md b/changelog.md index 4d65668..b8da7fd 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,8 @@ +* 2.4.0 + - Add Describe and Alter Configs APIs, part of KIP-133 + * 2.3.6 - - Upgrade snappyer and crc32cer to fix build in windows + - Upgrade snappyer and crc32cer to fix build in windows * 2.3.5 - Improve produce request encoding performance by 35% diff --git a/src/kpro_req_lib.erl b/src/kpro_req_lib.erl index cf690c0..ea47d39 100644 --- a/src/kpro_req_lib.erl +++ b/src/kpro_req_lib.erl @@ -43,6 +43,10 @@ , delete_topics/3 ]). +-export([ describe_configs/3 + , alter_configs/3 + ]). + -export([ encode/3 , make/3 ]). @@ -285,7 +289,7 @@ add_offsets_to_txn(TxnCtx, CgId) -> %% @doc Make `create_topics' request. %% if 0 is given as `timeout' option the request will trigger a creation %% but return immediately. -%% `validate_only' option is only relavent when the API version is +%% `validate_only' option is only relevant when the API version is %% greater than 0. -spec create_topics(vsn(), [Topics :: kpro:struct()], #{timeout => kpro:int32(), @@ -321,6 +325,28 @@ delete_topics(Vsn, Topics, Opts) -> }, make(delete_topics, Vsn, Body). +%% @doc Make a `describe_configs' request. +%% `include_synonyms' option is only relevant when the API version is +%% greater than 0. +-spec describe_configs(vsn(), [Resources :: kpro:struct()], + #{include_synonyms => boolean()}) -> req(). +describe_configs(Vsn, Resources, Opts) -> + IncludeSynonyms = maps:get(include_synonyms, Opts, false), + Body = #{ resources => Resources + , include_synonyms => IncludeSynonyms + }, + make(describe_configs, Vsn, Body). + +%% @doc Make an `alter_configs' request. +-spec alter_configs(vsn(), [Resources :: kpro:struct()], + #{validate_only => boolean()}) -> req(). +alter_configs(Vsn, Resources, Opts) -> + ValidateOnly = maps:get(validate_only, Opts, false), + Body = #{ resources => Resources + , validate_only => ValidateOnly + }, + make(alter_configs, Vsn, Body). + %% @doc Help function to make a request body. -spec make(api(), vsn(), struct()) -> req(). make(API, Vsn, Fields) -> diff --git a/test/kpro_test_lib.erl b/test/kpro_test_lib.erl index 44a5527..0969375 100644 --- a/test/kpro_test_lib.erl +++ b/test/kpro_test_lib.erl @@ -180,6 +180,16 @@ parse_rsp(#kpro_rsp{ api = create_partitions , msg = Msg }) -> error_if_any(kpro:find(topic_errors, Msg)); +parse_rsp(#kpro_rsp{ api = describe_configs + , msg = Msg + }) -> + Resources = kpro:find(resources, Msg), + ok = error_if_any(Resources), + Resources; +parse_rsp(#kpro_rsp{ api = alter_configs + , msg = Msg + }) -> + error_if_any(kpro:find(resources, Msg)); parse_rsp(#kpro_rsp{msg = Msg}) -> Msg. diff --git a/test/kpro_topic_mngr_tests.erl b/test/kpro_topic_mngr_tests.erl index 4b8b68c..4b7b641 100644 --- a/test/kpro_topic_mngr_tests.erl +++ b/test/kpro_topic_mngr_tests.erl @@ -17,11 +17,15 @@ %% create_topics %% delete_topics %% delete_records +%% describe_configs +%% alter_configs -module(kpro_topic_mngr_tests). -include_lib("eunit/include/eunit.hrl"). -include("kpro_private.hrl"). +-define(RESOURCE_TYPE_TOPIC, 2). + %% Create a random-name partition with 1 partition 1 replica %% Increase partition number to 2 create_topic_partition_test() -> @@ -68,6 +72,59 @@ test_create_topic_partition(CreateTopicsVsn, CreatePartitionsVsn) -> end end). +%% Get all configurations for a topic. +describe_configs_test() -> + Vsn = get_max_api_vsn(describe_configs), + test_describe_configs(Vsn). + +test_describe_configs(false) -> + io:format(user, " skipped ", []); +test_describe_configs(Vsn) -> + {ok, [Topic | _]} = get_test_topics(), + DescribeConfigArgs = + #{ resource_type => ?RESOURCE_TYPE_TOPIC + , resource_name => Topic + , config_names => ?null %% Get all configs + }, + Opts = #{include_synonyms => false}, + Req = kpro_req_lib:describe_configs(Vsn, [DescribeConfigArgs], Opts), + kpro_test_lib:with_connection( + fun(Endpoints, Config) -> kpro:connect_controller(Endpoints, Config) end, + fun(Conn) -> + {ok, Rsp} = kpro:request_sync(Conn, Req, infinity), + Resources = kpro_test_lib:parse_rsp(Rsp), + ?assertMatch([#{ resource_name := Topic }], Resources) + end). + +%% Alter the configuration for a topic. +alter_configs_test() -> + Vsn = get_max_api_vsn(alter_configs), + test_alter_configs(Vsn). + +test_alter_configs(false) -> + io:format(user, " skipped ", []); +test_alter_configs(Vsn) -> + {ok, [Topic | _]} = get_test_topics(), + AlterConfigsArgs = + #{ resource_type => ?RESOURCE_TYPE_TOPIC + , resource_name => Topic + , config_entries => [ + [ {config_name, "cleanup.policy"} + , {config_value, <<"compact">>}] + ] + }, + Opts = #{validate_only => false}, + Req = kpro_req_lib:alter_configs(Vsn, [AlterConfigsArgs], Opts), + DescribeVsn = get_max_api_vsn(describe_configs), + kpro_test_lib:with_connection( + fun(Endpoints, Config) -> kpro:connect_controller(Endpoints, Config) end, + fun(Conn) -> + validate_topic_config(DescribeVsn, Conn, Topic, "cleanup.policy", <<"delete">>), + {ok, Rsp} = kpro:request_sync(Conn, Req, infinity), + ok = kpro_test_lib:parse_rsp(Rsp), + validate_topic_config(DescribeVsn, Conn, Topic, "cleanup.policy", <<"compact">>) + end). + %% Delete all topics created in this test module. delete_topics_test() -> Timeout = case is_integer(get_max_api_vsn(create_partitions)) of @@ -119,7 +176,7 @@ get_test_topics(Connection) -> ErrorCode = ?no_error, %% assert Name = kpro:find(topic, Topic), case lists:prefix(atom_to_list(?MODULE), - binary_to_list(Name)) of + binary_to_list(Name)) of true -> [Name | Acc]; false -> Acc end @@ -141,6 +198,20 @@ get_max_api_vsn(API) -> rand() -> rand:uniform(1000000). +validate_topic_config(false, _, _, _, _) -> + not_available; +validate_topic_config(Vsn, Conn, Topic, ConfigName, ConfigValue) -> + DescribeConfigArgs = + #{ resource_type => ?RESOURCE_TYPE_TOPIC + , resource_name => Topic + , config_names => [ConfigName] + }, + Req = kpro_req_lib:describe_configs(Vsn, [DescribeConfigArgs], #{}), + {ok, Rsp} = kpro:request_sync(Conn, Req, infinity), + [Resource] = kpro_test_lib:parse_rsp(Rsp), + [Entry] = kpro:find(config_entries, Resource), + ?assertEqual(ConfigValue, kpro:find(config_value, Entry)). + %%%_* Emacs ==================================================================== %%% Local Variables: %%% allout-layout: t