Skip to content

Commit

Permalink
feat: add zstd compression support
Browse files Browse the repository at this point in the history
  • Loading branch information
zmstone committed Jul 24, 2021
1 parent 9ee00f2 commit 531a6d3
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 10 deletions.
2 changes: 2 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* 4.1.0
- Add support for `zstd` compression codec
* 4.0.2
- Bug fix: empty bytes is encoded to `0`, but not `-1`
- Respect `connect` API's timeout parameter as an overall timeout, rather than a timeout for each internal step
Expand Down
6 changes: 6 additions & 0 deletions include/kpro_private.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
-define(gzip, gzip).
-define(snappy, snappy).
-define(lz4, lz4).
-define(zstd, zstd).

%% Compression attributes
-define(KPRO_COMPRESS_NONE, 0).
-define(KPRO_COMPRESS_GZIP, 1).
-define(KPRO_COMPRESS_SNAPPY, 2).
-define(KPRO_COMPRESS_LZ4, 3).
-define(KPRO_COMPRESS_ZSTD, 4).

-define(KPRO_COMPRESSION_MASK, 2#111).
-define(KPRO_IS_GZIP_ATTR(ATTR),
Expand All @@ -38,6 +40,9 @@
((?KPRO_COMPRESSION_MASK band ATTR) =:= ?KPRO_COMPRESS_SNAPPY)).
-define(KPRO_IS_LZ4_ATTR(ATTR),
((?KPRO_COMPRESSION_MASK band ATTR) =:= ?KPRO_COMPRESS_LZ4)).
-define(KPRO_IS_ZSTD_ATTR(ATTR),
((?KPRO_COMPRESSION_MASK band ATTR) =:= ?KPRO_COMPRESS_ZSTD)).


-define(KPRO_TS_TYPE_CREATE, 0).
-define(KPRO_TS_TYPE_APPEND, 2#1000).
Expand Down Expand Up @@ -95,6 +100,7 @@
-define(KAFKA_0_11, 11).
-define(KAFKA_1_0, 100).
-define(KAFKA_1_1, 110).
-define(KAFKA_2_1, 210).

-ifdef(OTP_RELEASE).
-define(BIND_STACKTRACE(Var), :Var).
Expand Down
3 changes: 2 additions & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
[ { test,
[ {deps,
[ {snappyer, "1.2.8"},
{lz4b, "0.0.8"}
{lz4b, "0.0.8"},
{zstd, {git, "https://github.com/zmstone/zstd-erlang.git", {tag, "v0.3.0-zm"}}}
]}
]
}
Expand Down
17 changes: 11 additions & 6 deletions src/kpro_compress.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

%% @doc Set snappy, lz4 or zstd compression modules.
%% This should override the default usage of `snappyer', `lz4b_frame' and `zstd'.
-spec provide([{snappy | lz4, module()}]) -> ok.
%% Each entry is a 2-tuple pairing a codec name with the module implementing it.
-spec provide([{snappy | lz4 | zstd, module()}]) -> ok.
provide(Libs) ->
lists:foreach(fun({Name, Module}) ->
persistent_term:put({?MODULE, Name}, Module)
Expand All @@ -37,12 +37,14 @@ provide(Libs) ->
%% Map batch attributes to the compression method atom by testing the
%% attributes with the ?KPRO_IS_*_ATTR guard macros.
codec_to_method(A) when ?KPRO_IS_GZIP_ATTR(A) -> ?gzip;
codec_to_method(A) when ?KPRO_IS_SNAPPY_ATTR(A) -> ?snappy;
codec_to_method(A) when ?KPRO_IS_LZ4_ATTR(A) -> ?lz4;
codec_to_method(A) when ?KPRO_IS_ZSTD_ATTR(A) -> ?zstd;
%% Fallback: anything the guards above do not recognize is reported as
%% ?no_compression.
codec_to_method(_) -> ?no_compression.

%% @doc Translate compression method to bits for kafka batch attributes.
%% Only the methods listed below are accepted; there is no catch-all clause,
%% so any other value fails with a `function_clause' error.
method_to_codec(?gzip) -> ?KPRO_COMPRESS_GZIP;
method_to_codec(?snappy) -> ?KPRO_COMPRESS_SNAPPY;
method_to_codec(?lz4) -> ?KPRO_COMPRESS_LZ4;
method_to_codec(?zstd) -> ?KPRO_COMPRESS_ZSTD;
method_to_codec(?no_compression) -> ?KPRO_COMPRESS_NONE.

%% @doc Compress encoded batch.
Expand All @@ -56,7 +58,8 @@ compress(Name, IoData) -> do_compress(Name, IoData).
decompress(?no_compression, Bin) -> Bin;
decompress(?gzip, Bin) -> zlib:gunzip(Bin);
decompress(?snappy, Bin) -> java_snappy_unpack(Bin);
decompress(?lz4, Bin) -> do_decompress(?lz4, Bin).
decompress(?lz4, Bin) -> do_decompress(?lz4, Bin);
decompress(?zstd, Bin) -> do_decompress(?zstd, Bin).

%%%_* Internals ================================================================

Expand Down Expand Up @@ -90,10 +93,12 @@ do_decompress(Name, Bin) ->
Module = get_module(Name),
iodata(Module:decompress(Bin)).

get_module(snappy) ->
get_module(snappy, snappyer);
get_module(lz4) ->
get_module(lz4, lz4b_frame).
get_module(?snappy) ->
get_module(?snappy, snappyer);
get_module(?lz4) ->
get_module(?lz4, lz4b_frame);
get_module(?zstd) ->
get_module(?zstd, zstd).

%% Look up the module stored under {?MODULE, Name} (the key written by
%% provide/1), falling back to Default when nothing has been stored.
get_module(Name, Default) ->
persistent_term:get({?MODULE, Name}, Default).
Expand Down
2 changes: 1 addition & 1 deletion test/kpro_batch_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ encode_decode_test_() ->
?assertMatch(<<"v">>, Value)
end,
MagicVersions = [0, 1, 2],
CompressionOpts = [no_compression, gzip, snappy, lz4],
CompressionOpts = [no_compression, gzip, snappy, lz4, zstd],
[{atom_to_list(CompressionOpt), " magic v" ++ integer_to_list(MagicV),
fun() -> F(MagicV, CompressionOpt) end} ||
CompressionOpt <- CompressionOpts,
Expand Down
13 changes: 11 additions & 2 deletions test/kpro_produce_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,29 @@ non_monotoic_ts_in_batch_test() ->
end.

%% batches can be encoded by caller before making a produce request
encode_batch_beforehand_test() ->
encode_batch_beforehand(Compression) ->
{_, Vsn} = get_api_vsn_range(),
Batch = [#{ts => kpro_lib:now_ts(),
value => make_value(?LINE),
headers => []}],
Magic = kpro_lib:produce_api_vsn_to_magic_vsn(Vsn),
Bin = kpro:encode_batch(Magic, Batch, no_compression),
Bin = kpro:encode_batch(Magic, Batch, Compression),
Req = kpro_req_lib:produce(Vsn, topic(), ?PARTI, Bin),
with_connection(
fun(Pid) ->
{ok, Rsp} = kpro:request_sync(Pid, Req, ?TIMEOUT),
?ASSERT_RESPONSE_NO_ERROR(Vsn, Rsp)
end).

%% Test generator: one pre-encoded-batch produce test per compression method.
%% zstd is only included when the Kafka version under test is at least
%% ?KAFKA_2_1.
encode_batch_beforehand_test_() ->
  Always = [?no_compression, ?gzip, ?snappy],
  %% Filter-only comprehension: yields [?zstd] when the condition holds,
  %% [] otherwise.
  Optional = [?zstd || kpro_test_lib:get_kafka_version() >= ?KAFKA_2_1],
  lists:map(
    fun(Method) ->
        {atom_to_list(Method), fun() -> encode_batch_beforehand(Method) end}
    end,
    Always ++ Optional).

%% async send test
async_send_test() ->
{_, Vsn} = get_api_vsn_range(),
Expand Down

0 comments on commit 531a6d3

Please sign in to comment.