Skip to content

Commit

Permalink
Merge pull request #75 from Nordix/add_config_manipulation
Browse files Browse the repository at this point in the history
Add describe_configs and alter_configs to API
  • Loading branch information
mikpe authored Apr 22, 2020
2 parents fd7c4a1 + 1a789e0 commit ec235a3
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 3 deletions.
5 changes: 4 additions & 1 deletion changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
* 2.4.0
- Add Describe and Alter Configs APIs, part of KIP-133

* 2.3.6
- Upgrade snappyer and crc32cer to fix build in windows
- Upgrade snappyer and crc32cer to fix build in windows

* 2.3.5
- Improve produce request encoding performance by 35%
Expand Down
28 changes: 27 additions & 1 deletion src/kpro_req_lib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@
, delete_topics/3
]).

-export([ describe_configs/3
, alter_configs/3
]).

-export([ encode/3
, make/3
]).
Expand Down Expand Up @@ -285,7 +289,7 @@ add_offsets_to_txn(TxnCtx, CgId) ->
%% @doc Make `create_topics' request.
%% if 0 is given as `timeout' option the request will trigger a creation
%% but return immediately.
%% `validate_only' option is only relavent when the API version is
%% `validate_only' option is only relevant when the API version is
%% greater than 0.
-spec create_topics(vsn(), [Topics :: kpro:struct()],
#{timeout => kpro:int32(),
Expand Down Expand Up @@ -321,6 +325,28 @@ delete_topics(Vsn, Topics, Opts) ->
},
make(delete_topics, Vsn, Body).

%% @doc Make a `describe_configs' request.
%% `include_synonyms' option is only relevant when the API version is
%% greater than 0.
-spec describe_configs(vsn(), [Resources :: kpro:struct()],
#{include_synonyms => boolean()}) -> req().
describe_configs(Vsn, Resources, Opts) ->
IncludeSynonyms = maps:get(include_synonyms, Opts, false),
Body = #{ resources => Resources
, include_synonyms => IncludeSynonyms
},
make(describe_configs, Vsn, Body).

%% @doc Make an `alter_configs' request.
-spec alter_configs(vsn(), [Resources :: kpro:struct()],
#{validate_only => boolean()}) -> req().
alter_configs(Vsn, Resources, Opts) ->
ValidateOnly = maps:get(validate_only, Opts, false),
Body = #{ resources => Resources
, validate_only => ValidateOnly
},
make(alter_configs, Vsn, Body).

%% @doc Help function to make a request body.
-spec make(api(), vsn(), struct()) -> req().
make(API, Vsn, Fields) ->
Expand Down
10 changes: 10 additions & 0 deletions test/kpro_test_lib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,16 @@ parse_rsp(#kpro_rsp{ api = create_partitions
, msg = Msg
}) ->
error_if_any(kpro:find(topic_errors, Msg));
parse_rsp(#kpro_rsp{ api = describe_configs
, msg = Msg
}) ->
Resources = kpro:find(resources, Msg),
ok = error_if_any(Resources),
Resources;
parse_rsp(#kpro_rsp{ api = alter_configs
, msg = Msg
}) ->
error_if_any(kpro:find(resources, Msg));
parse_rsp(#kpro_rsp{msg = Msg}) ->
Msg.

Expand Down
73 changes: 72 additions & 1 deletion test/kpro_topic_mngr_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@
%% create_topics
%% delete_topics
%% delete_records
%% describe_configs
%% alter_configs
-module(kpro_topic_mngr_tests).

-include_lib("eunit/include/eunit.hrl").
-include("kpro_private.hrl").

-define(RESOURCE_TYPE_TOPIC, 2).

%% Create a random-name partition with 1 partition 1 replica
%% Increase partition number to 2
create_topic_partition_test() ->
Expand Down Expand Up @@ -68,6 +72,59 @@ test_create_topic_partition(CreateTopicsVsn, CreatePartitionsVsn) ->
end
end).

%% Get all configurations for a topic.
describe_configs_test() ->
Vsn = get_max_api_vsn(describe_configs),
test_describe_configs(Vsn).

test_describe_configs(false) ->
io:format(user, " skipped ", []);
test_describe_configs(Vsn) ->
{ok, [Topic | _]} = get_test_topics(),
DescribeConfigArgs =
#{ resource_type => ?RESOURCE_TYPE_TOPIC
, resource_name => Topic
, config_names => ?null %% Get all configs
},
Opts = #{include_synonyms => false},
Req = kpro_req_lib:describe_configs(Vsn, [DescribeConfigArgs], Opts),
kpro_test_lib:with_connection(
fun(Endpoints, Config) -> kpro:connect_controller(Endpoints, Config) end,
fun(Conn) ->
{ok, Rsp} = kpro:request_sync(Conn, Req, infinity),
Resources = kpro_test_lib:parse_rsp(Rsp),
?assertMatch([#{ resource_name := Topic }], Resources)
end).

%% Alter the configuration for a topic.
alter_configs_test() ->
Vsn = get_max_api_vsn(alter_configs),
test_alter_configs(Vsn).

test_alter_configs(false) ->
io:format(user, " skipped ", []);
test_alter_configs(Vsn) ->
{ok, [Topic | _]} = get_test_topics(),
AlterConfigsArgs =
#{ resource_type => ?RESOURCE_TYPE_TOPIC
, resource_name => Topic
, config_entries => [
[ {config_name, "cleanup.policy"}
, {config_value, <<"compact">>}]
]
},
Opts = #{validate_only => false},
Req = kpro_req_lib:alter_configs(Vsn, [AlterConfigsArgs], Opts),
DescribeVsn = get_max_api_vsn(describe_configs),
kpro_test_lib:with_connection(
fun(Endpoints, Config) -> kpro:connect_controller(Endpoints, Config) end,
fun(Conn) ->
validate_topic_config(DescribeVsn, Conn, Topic, "cleanup.policy", <<"delete">>),
{ok, Rsp} = kpro:request_sync(Conn, Req, infinity),
ok = kpro_test_lib:parse_rsp(Rsp),
validate_topic_config(DescribeVsn, Conn, Topic, "cleanup.policy", <<"compact">>)
end).

%% Delete all topics created in this test module.
delete_topics_test() ->
Timeout = case is_integer(get_max_api_vsn(create_partitions)) of
Expand Down Expand Up @@ -119,7 +176,7 @@ get_test_topics(Connection) ->
ErrorCode = ?no_error, %% assert
Name = kpro:find(topic, Topic),
case lists:prefix(atom_to_list(?MODULE),
binary_to_list(Name)) of
binary_to_list(Name)) of
true -> [Name | Acc];
false -> Acc
end
Expand All @@ -141,6 +198,20 @@ get_max_api_vsn(API) ->

rand() -> rand:uniform(1000000).

validate_topic_config(false, _, _, _, _) ->
not_available;
validate_topic_config(Vsn, Conn, Topic, ConfigName, ConfigValue) ->
DescribeConfigArgs =
#{ resource_type => ?RESOURCE_TYPE_TOPIC
, resource_name => Topic
, config_names => [ConfigName]
},
Req = kpro_req_lib:describe_configs(Vsn, [DescribeConfigArgs], #{}),
{ok, Rsp} = kpro:request_sync(Conn, Req, infinity),
[Resource] = kpro_test_lib:parse_rsp(Rsp),
[Entry] = kpro:find(config_entries, Resource),
?assertEqual(ConfigValue, kpro:find(config_value, Entry)).

%%%_* Emacs ====================================================================
%%% Local Variables:
%%% allout-layout: t
Expand Down

0 comments on commit ec235a3

Please sign in to comment.