%% Patch section overview: first part of the change that adds the `zstd`
%% compression codec.
%%  - changelog.md: adds a 4.1.0 entry announcing `zstd` support.
%%  - include/kpro_private.hrl: adds the ?zstd method macro, the codec value
%%    ?KPRO_COMPRESS_ZSTD (4, which fits inside the 2#111 ?KPRO_COMPRESSION_MASK),
%%    the ?KPRO_IS_ZSTD_ATTR/1 test macro built like its gzip/snappy/lz4 siblings,
%%    and the ?KAFKA_2_1 (210) version constant.
%%  - rebar.config: adds the `zstd` git dependency next to snappyer and lz4b in
%%    the deps list of the `test` entry shown in the hunk.
%%  - src/kpro_compress.erl: only the diff header and the leading context lines
%%    of its first hunk are in this section; the changed lines follow it.
%% NOTE(review): the `@doc` context line for provide/1 at the end of this section
%% still reads "snappy or lz4"; consider mentioning zstd there as well.
diff --git a/changelog.md b/changelog.md index da0fff2..21fe601 100644 --- a/changelog.md +++ b/changelog.md @@ -1,3 +1,5 @@ +* 4.1.0 + - Add support for `zstd` compression codec * 4.0.2 - Bug fix: empty bytes is encoded to `0`, but not `-1` - Respect `connect` API's timeout parameter as an overall timeout, rather not the timeout for each internal step diff --git a/include/kpro_private.hrl b/include/kpro_private.hrl index d80e17f..4e4bbbe 100644 --- a/include/kpro_private.hrl +++ b/include/kpro_private.hrl @@ -24,12 +24,14 @@ -define(gzip, gzip). -define(snappy, snappy). -define(lz4, lz4). +-define(zstd, zstd). %% Compression attributes -define(KPRO_COMPRESS_NONE, 0). -define(KPRO_COMPRESS_GZIP, 1). -define(KPRO_COMPRESS_SNAPPY, 2). -define(KPRO_COMPRESS_LZ4, 3). +-define(KPRO_COMPRESS_ZSTD, 4). -define(KPRO_COMPRESSION_MASK, 2#111). -define(KPRO_IS_GZIP_ATTR(ATTR), @@ -38,6 +40,9 @@ ((?KPRO_COMPRESSION_MASK band ATTR) =:= ?KPRO_COMPRESS_SNAPPY)). -define(KPRO_IS_LZ4_ATTR(ATTR), ((?KPRO_COMPRESSION_MASK band ATTR) =:= ?KPRO_COMPRESS_LZ4)). +-define(KPRO_IS_ZSTD_ATTR(ATTR), + ((?KPRO_COMPRESSION_MASK band ATTR) =:= ?KPRO_COMPRESS_ZSTD)). + -define(KPRO_TS_TYPE_CREATE, 0). -define(KPRO_TS_TYPE_APPEND, 2#1000). @@ -95,6 +100,7 @@ -define(KAFKA_0_11, 11). -define(KAFKA_1_0, 100). -define(KAFKA_1_1, 110). +-define(KAFKA_2_1, 210). -ifdef(OTP_RELEASE). -define(BIND_STACKTRACE(Var), :Var). diff --git a/rebar.config b/rebar.config index 4b4901b..11abffa 100644 --- a/rebar.config +++ b/rebar.config @@ -4,7 +4,8 @@ [ { test, [ {deps, [ {snappyer, "1.2.8"}, - {lz4b, "0.0.8"} + {lz4b, "0.0.8"}, + {zstd, {git, "https://github.com/zmstone/zstd-erlang.git", {tag, "v0.3.0-zm"}}} ]} ] } diff --git a/src/kpro_compress.erl b/src/kpro_compress.erl index 1176a74..65551f9 100644 --- a/src/kpro_compress.erl +++ b/src/kpro_compress.erl @@ -26,7 +26,7 @@ %% @doc Set snappy or lz4 compression modules. %% This should override the default usage of `snappyer' and `lz4b_frame'. 
--spec provide([{snappy | lz4, module()}]) -> ok. +-spec provide([{snappy | lz4 | zstd, module()}]) -> ok. provide(Libs) -> lists:foreach(fun({Name, Module}) -> persistent_term:put({?MODULE, Name}, Module) @@ -37,12 +37,14 @@ provide(Libs) -> codec_to_method(A) when ?KPRO_IS_GZIP_ATTR(A) -> ?gzip; codec_to_method(A) when ?KPRO_IS_SNAPPY_ATTR(A) -> ?snappy; codec_to_method(A) when ?KPRO_IS_LZ4_ATTR(A) -> ?lz4; +codec_to_method(A) when ?KPRO_IS_ZSTD_ATTR(A) -> ?zstd; codec_to_method(_) -> ?no_compression. %% @doc Translate compression method to bits for kafka batch attributes. method_to_codec(?gzip) -> ?KPRO_COMPRESS_GZIP; method_to_codec(?snappy) -> ?KPRO_COMPRESS_SNAPPY; method_to_codec(?lz4) -> ?KPRO_COMPRESS_LZ4; +method_to_codec(?zstd) -> ?KPRO_COMPRESS_ZSTD; method_to_codec(?no_compression) -> ?KPRO_COMPRESS_NONE. %% @doc Compress encoded batch. @@ -56,7 +58,8 @@ compress(Name, IoData) -> do_compress(Name, IoData). decompress(?no_compression, Bin) -> Bin; decompress(?gzip, Bin) -> zlib:gunzip(Bin); decompress(?snappy, Bin) -> java_snappy_unpack(Bin); -decompress(?lz4, Bin) -> do_decompress(?lz4, Bin). +decompress(?lz4, Bin) -> do_decompress(?lz4, Bin); +decompress(?zstd, Bin) -> do_decompress(?zstd, Bin). %%%_* Internals ================================================================ @@ -90,10 +93,12 @@ do_decompress(Name, Bin) -> Module = get_module(Name), iodata(Module:decompress(Bin)). -get_module(snappy) -> - get_module(snappy, snappyer); -get_module(lz4) -> - get_module(lz4, lz4b_frame). +get_module(?snappy) -> + get_module(?snappy, snappyer); +get_module(?lz4) -> + get_module(?lz4, lz4b_frame); +get_module(?zstd) -> + get_module(?zstd, zstd). get_module(Name, Default) -> persistent_term:get({?MODULE, Name}, Default). 
%% kpro_compress.erl hunks above: provide/1 is specced as a list of
%% {Codec, Module} 2-tuples (Codec :: snappy | lz4 | zstd), matching the
%% fun({Name, Module}) clause that stores each pair with persistent_term:put/2.
%% codec_to_method/1, method_to_codec/1, decompress/2 and get_module/1 each gain
%% a zstd clause; get_module/1 falls back to the `zstd` module by default.
%% Test changes for the `zstd` codec:
%%  - test/kpro_batch_tests.erl: adds `zstd` to the CompressionOpts list used by
%%    encode_decode_test_/0.
%%  - test/kpro_produce_tests.erl: encode_batch_beforehand_test/0 becomes the
%%    helper encode_batch_beforehand/1, which passes its Compression argument to
%%    kpro:encode_batch/3 instead of the fixed `no_compression`. The new
%%    generator encode_batch_beforehand_test_/0 runs the helper for
%%    ?no_compression, ?gzip and ?snappy, and appends ?zstd only when
%%    kpro_test_lib:get_kafka_version() >= ?KAFKA_2_1 (lz4 is not in that list).
diff --git a/test/kpro_batch_tests.erl b/test/kpro_batch_tests.erl index 69a16f1..fa053e9 100644 --- a/test/kpro_batch_tests.erl +++ b/test/kpro_batch_tests.erl @@ -40,7 +40,7 @@ encode_decode_test_() -> ?assertMatch(<<"v">>, Value) end, MagicVersions = [0, 1, 2], - CompressionOpts = [no_compression, gzip, snappy, lz4], + CompressionOpts = [no_compression, gzip, snappy, lz4, zstd], [{atom_to_list(CompressionOpt), " magic v" ++ integer_to_list(MagicV), fun() -> F(MagicV, CompressionOpt) end} || CompressionOpt <- CompressionOpts, diff --git a/test/kpro_produce_tests.erl b/test/kpro_produce_tests.erl index 8d885b4..4499140 100644 --- a/test/kpro_produce_tests.erl +++ b/test/kpro_produce_tests.erl @@ -78,13 +78,13 @@ non_monotoic_ts_in_batch_test() -> end. %% batches can be encoded by caller before making a produce request -encode_batch_beforehand_test() -> +encode_batch_beforehand(Compression) -> {_, Vsn} = get_api_vsn_range(), Batch = [#{ts => kpro_lib:now_ts(), value => make_value(?LINE), headers => []}], Magic = kpro_lib:produce_api_vsn_to_magic_vsn(Vsn), - Bin = kpro:encode_batch(Magic, Batch, no_compression), + Bin = kpro:encode_batch(Magic, Batch, Compression), Req = kpro_req_lib:produce(Vsn, topic(), ?PARTI, Bin), with_connection( fun(Pid) -> @@ -92,6 +92,15 @@ encode_batch_beforehand_test() -> ?ASSERT_RESPONSE_NO_ERROR(Vsn, Rsp) end). +encode_batch_beforehand_test_() -> + Methods0 = [?no_compression, ?gzip, ?snappy], + Methods = case kpro_test_lib:get_kafka_version() >= ?KAFKA_2_1 of + true -> Methods0 ++ [?zstd]; + false -> Methods0 + end, + [{atom_to_list(Method), fun() -> encode_batch_beforehand(Method) end} + || Method <- Methods]. + %% async send test async_send_test() -> {_, Vsn} = get_api_vsn_range(),