From 820c8834c99d0b340836ed48c697042638900f1c Mon Sep 17 00:00:00 2001 From: Silviu Caragea Date: Thu, 11 Oct 2018 15:00:22 +0300 Subject: [PATCH] - Upgraded to librdkafka v0.11.5 - Add support for the new broker configs: ssl_curves_list, ssl_sigalgs_list, ssl_keystore_location, ssl_keystore_password, fetch_max_bytes - Add support for the new topic configs: queuing_strategy, compression_level, partitioner --- .gitignore | 1 + CHANGELOG.md | 15 +++++++++++++++ CONFIGURATION.md | 20 ++++++++++++++------ build_deps.sh | 2 +- include/erlkaf.hrl | 10 ++++++++++ include/erlkaf_private.hrl | 5 +++++ rebar.config | 5 +++-- rebar.lock | 34 ++++++++++++++++++++++++++++++++++ src/erlkaf.app.src | 5 ++++- src/erlkaf_config.erl | 16 ++++++++++++++++ test/sys.config | 10 +++++----- test/test_consumer.erl | 2 +- test/test_producer.erl | 2 +- 13 files changed, 110 insertions(+), 17 deletions(-) create mode 100644 CHANGELOG.md create mode 100644 rebar.lock diff --git a/.gitignore b/.gitignore index 21909b8..df9a261 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ /log/ /priv/ *.iml +/_build/ diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..816ea57 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,15 @@ +### Changelog: + +##### v1.1.0 + +- Based on librdkafka v0.11.5 +- Add support for the new broker configs: ssl_curves_list, ssl_sigalgs_list, ssl_keystore_location, ssl_keystore_password, fetch_max_bytes +- Add support for the new topic configs: queuing_strategy, compression_level, partitioner +- Fix build process on OSX High Sierra +- Upgrade deps to work on OTP 21 (thanks to Tomislav Trajakovic) + +##### v1.0 + +- Initial implementation (both producer and consumer) supporting most of the features available in librdkafka. +- Based on librdkafka v0.11.3 +- Tested on Mac OSX, Ubuntu 14.04 LTS, Ubuntu 16.04 LTS diff --git a/CONFIGURATION.md b/CONFIGURATION.md index 62e8ec6..c825514 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -4,16 +4,16 @@ C/P legend: C = Consumer, P = Producer, * = both Property | C/P | Range | Default | Description -----------------------------------------|-----|--------------------|--------------:|-------------------------- -debug | * | generic, broker, topic, metadata, queue, msg, protocol, cgrp, security, fetch, feature, interceptor, plugin, all | undefined | A comma-separated list of debug contexts to enable. Debugging the Producer: broker,topic,msg. Consumer: cgrp,topic,fetch +debug | * | generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, all | undefined | A comma-separated list of debug contexts to enable. Detailed Producer debugging: broker,topic,msg. Consumer: consumer,cgrp,topic,fetch client_id | * | | rdkafka | Client identifier bootstrap_servers | * | | | Initial list of brokers host:port separated by comma message_max_bytes | * | 1000 .. 1000000000 | 1000000 | Maximum transmit message size. message_copy_max_bytes | * | 0 .. 1000000000 | 65535 | Maximum size for message to be copied to buffer. Messages larger than this will be passed by reference (zero-copy) at the expense of larger iovecs -receive_message_max_bytes | * | 1000 .. 1000000000 | 100000000 | Maximum receive message size. This is a safety precaution to avoid memory exhaustion in case of protocol hickups. The value should be at least `fetch_message_max_bytes` * number of partitions consumed from + messaging overhead (e.g. 200000 bytes) +receive_message_max_bytes | * | 1000 .. 2147483647 | 100000000 | Maximum receive message size. This serves as a safety precaution to avoid memory exhaustion in case of protocol hickups. The value should be at least `fetch_message_max_bytes` * number of partitions consumed from + messaging overhead (e.g. 200000 bytes) max_in_flight | * | 1 .. 1000000 | 1000000 | Maximum number of in-flight requests the client will send. This setting applies per broker connection metadata_request_timeout_ms | * | 10 .. 900000 | 60000 | Non-topic request timeout in milliseconds. This is for metadata requests topic_metadata_refresh_interval_ms | * | -1 .. 3600000 | 300000 | Topic metadata refresh interval in milliseconds. The metadata is automatically refreshed on error and connect. Use -1 to disable the intervalled refresh -metadata_max_age_ms | * | 1 .. 86400000 | -1 | Metadata cache max age. Defaults to `metadata_refresh_interval_ms` * 3 +metadata_max_age_ms | * | 1 .. 86400000 | -1 | Metadata cache max age. Defaults to `topic_metadata_refresh_interval_ms` * 3 topic_metadata_refresh_fast_interval_ms | * | 1 .. 60000 | 250 | When a topic loses its leader a new metadata request will be enqueued with this initial interval, exponentially increasing until the topic metadata has been refreshed. This is used to recover quickly from transitioning leader brokers topic_metadata_refresh_sparse | * | true, false | true | Sparse metadata requests (consumes less network bandwidth) topic_blacklist | * | | | Topic blacklist, a comma-separated list of regular expressions for matching topic names that should be ignored in broker metadata information as if the topics did not exist @@ -21,8 +21,8 @@ socket_timeout_ms | * | 10 .. 300000 | 60 socket_send_buffer_bytes | * | 0 .. 100000000 | 0 | Broker socket send buffer size. System default is used if 0 socket_receive_buffer_bytes | * | 0 .. 100000000 | 0 | Broker socket receive buffer size. System default is used if 0 socket_keepalive_enable | * | true, false | false | Enable TCP keep-alives (SO_KEEPALIVE) on broker sockets -socket_nagle_disable | * | true, false | false | Disable the Nagle algorithm (TCP_NODELAY) -socket_max_fails | * | 0 .. 1000000 | 3 | Disconnect from broker when this number of send failures (e.g., timed out requests) is reached. Disable with 0. NOTE: The connection is automatically re-established +socket_nagle_disable | * | true, false | false | Disable the Nagle algorithm on broker sockets (TCP_NODELAY) +socket_max_fails | * | 0 .. 1000000 | 1 | Disconnect from broker when this number of send failures (e.g., timed out requests) is reached. Disable with 0. NOTE: The connection is automatically re-established broker_address_ttl | * | 0 .. 86400000 | 1000 | How long to cache the broker address resolving results (milliseconds) broker_address_family | * | any, v4, v6 | any | Allowed broker IP address families: any, v4, v6 reconnect_backoff_jitter_ms | * | 0 .. 3600000 | 500 | Throttle broker reconnection attempts by this value +-50% @@ -35,11 +35,15 @@ api_version_fallback_ms | * | 0 .. 604800000 | 1200 broker_version_fallback | * | | 0.9.0 | Older broker versions (<0.10.0) provides no way for a client to query for supported protocol features (ApiVersionRequest, see `api_version_request`) making it impossible for the client to know what features it may use. As a workaround a user may set this property to the expected broker version and the client will automatically adjust its feature set accordingly if the ApiVersionRequest fails (or is disabled). The fallback broker version will be used for `api_version_fallback_ms`. Valid values are: 0.9.0, 0.8.2, 0.8.1, 0.8.0. security.protocol | * | plaintext, ssl, sasl_plaintext, sasl_ssl | plaintext | Protocol used to communicate with brokers ssl_cipher_suites | * | | | A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. See manual page for `ciphers(1)` and `SSL_CTX_set_cipher_list(3) +ssl_curves_list | * | | | The supported-curves extension in the TLS ClientHello message specifies the curves (standard/named, or 'explicit' GF(2^k) or GF(p)) the client is willing to have the server use. See manual page for `SSL_CTX_set1_curves_list(3)`. OpenSSL >= 1.0.2 required. +ssl_sigalgs_list | * | | | The client uses the TLS ClientHello signature_algorithms extension to indicate to the server which signature/hash algorithm pairs may be used in digital signatures. See manual page for `SSL_CTX_set1_sigalgs_list(3)`. OpenSSL >= 1.0.2 required. ssl_key_location | * | | | Path to client's private key (PEM) used for authentication. ssl_key_password | * | | | Private key passphrase ssl_certificate_location | * | | | Path to client's public key (PEM) used for authentication. ssl_ca_location | * | | | File or directory path to CA certificate(s) for verifying the broker's key. -ssl_crl_location | * | | | Path to CRL for verifying broker's certificate validity. +ssl_crl_location | * | | | Path to CRL for verifying broker's certificate validity. +ssl_keystore_location | * | | | Path to client's keystore (PKCS#12) used for authentication.
*Type: string* +ssl_keystore_password | * | | | Client's keystore (PKCS#12) password.
*Type: string* sasl_mechanisms | * | | GSSAPI | SASL mechanism to use for authentication. Supported: GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512. **NOTE**: Despite the name only one mechanism must be configured. sasl_kerberos_service_name | * | | kafka | Kerberos principal name that Kafka runs as sasl_kerberos_principal | * | | kafkaclient | This client's Kerberos principal name. @@ -57,6 +61,7 @@ queued_min_messages | C | 1 .. 10000000 | 100000 queued_max_messages_kbytes | C | 1 .. 1000000000 | 1048576 | Maximum number of kilobytes per topic+partition in the local consumer queue. This value may be overshot by `fetch_message_max_bytes` fetch_wait_max_ms | C | 0 .. 300000 | 100 | Maximum time the broker may wait to fill the response with `fetch_min_bytes` fetch_message_max_bytes | C | 1 .. 1000000000 | 1048576 | Initial maximum number of bytes per topic+partition to request when fetching messages from the broker. If the client encounters a message larger than this value it will gradually try to increase it until the entire message can be fetched +fetch_max_bytes | C | 0 .. 2147483135 | 52428800 | Maximum amount of data the broker shall return for a Fetch request. Messages are fetched in batches by the consumer and if the first message batch in the first non-empty partition of the Fetch request is larger than this value, then the message batch will still be returned to ensure the consumer can make progress. The maximum message batch size accepted by the broker is defined via `message_max_bytes` (broker config) or `max_message_bytes` (broker topic config). `fetch_max_bytes` is automatically adjusted upwards to be at least `message_max_bytes` (consumer config). fetch_min_bytes | C | 1 .. 100000000 | 1 | Minimum number of bytes the broker responds with. If fetch.wait.max.ms expires the accumulated data will be sent to the client regardless of this setting fetch_error_backoff_ms | C | 0 .. 300000 | 500 | How long to postpone the next fetch request for a topic+partition in case of a fetch error offset_store_method | C | none, file, broker | broker | Offset commit store method: 'file' - local file store (offset.store.path, et.al), 'broker' - broker commit store (requires Apache Kafka 0.8.2 or later on the broker) @@ -81,9 +86,12 @@ Property | C/P | Range | Default request_required_acks | P | -1 .. 1000 | 1 | This field indicates how many acknowledgements the leader broker must receive from ISR brokers before responding to the request: *0*=Broker does not send any response/ack to client, *1*=Only the leader broker will need to ack the message, *-1* or *all*=broker will block until message is committed by all in sync replicas (ISRs) or broker's `in.sync.replicas` setting before sending response. request_timeout_ms | P | 1 .. 900000 | 5000 | The ack timeout of the producer request in milliseconds. This value is only enforced by the broker and relies on `request_required_acks` being != 0 message_timeout_ms | P | 0 .. 900000 | 300000 | Local message timeout. This value is only enforced locally and limits the time a produced message waits for successful delivery. A time of 0 is infinite +queuing_strategy | P | fifo, lifo | fifo | Producer queuing strategy. FIFO preserves produce ordering, while LIFO prioritizes new messages. WARNING: `lifo` is experimental and subject to change or removal. compression_codec | P | none, gzip, snappy, lz4, inherit | inherit | Compression codec to use for compressing message sets +compression_level | P | -1 .. 12 | -1 | Compression level parameter for algorithm selected by configuration property `compression_codec`. Higher values will result in better compression at the cost of more CPU usage. Usable range is algorithm-dependent: [0-9] for gzip; [0-12] for lz4; only 0 for snappy; -1 = codec-dependent default compression level. auto_commit_interval_ms | C | 10 .. 86400000 | 60000 | The frequency in milliseconds that the consumer offsets are committed (written) to offset storage. This setting is used by the low-level legacy consumer auto_offset_reset | C | smallest, earliest, beginning, largest, latest, end, error | largest | Action to take when there is no initial offset in offset store or the desired offset is out of range: 'smallest','earliest' - automatically reset the offset to the smallest offset, 'largest','latest' - automatically reset the offset to the largest offset, 'error' - trigger an error. +partitioner | P | random,consistent,consistent_random, murmur2, murmur2_random | consistent_random | Partitioner: `random` - random distribution, `consistent` - CRC32 hash of key (Empty and NULL keys are mapped to single partition), `consistent_random` - CRC32 hash of key (Empty and NULL keys are randomly partitioned), `murmur2` - Java Producer compatible Murmur2 hash of key (NULL keys are mapped to single partition), `murmur2_random` - Java Producer compatible Murmur2 hash of key (NULL keys are randomly partitioned. This is functionally equivalent to the default partitioner in the Java Producer.). offset_store_path | C | | . | Path to local file for storing offsets. If the path is a directory a filename will be automatically generated in that directory based on the topic and partition offset_store_sync_interval_ms | C | -1 .. 86400000 | -1 | fsync() interval for the offset file, in milliseconds. Use -1 to disable syncing, and 0 for immediate sync after each write offset_store_method | C | file, broker | broker | Offset commit store method: 'file' - local file store (offset.store.path, et.al), 'broker' - broker commit store (requires "group.id" to be configured and Apache Kafka 0.8.2 or later on the broker.) diff --git a/build_deps.sh b/build_deps.sh index 985b30c..106229b 100755 --- a/build_deps.sh +++ b/build_deps.sh @@ -10,7 +10,7 @@ fi REPO=https://github.com/edenhill/librdkafka.git BRANCH=master -REV=b581d0d9df282847f76e8b9e87337161959d39c9 +REV=44242a464c43e09c685f47a7f3dca2963b10e2a9 function fail_check { diff --git a/include/erlkaf.hrl b/include/erlkaf.hrl index de770c5..ff48e1c 100644 --- a/include/erlkaf.hrl +++ b/include/erlkaf.hrl @@ -21,14 +21,19 @@ -type ip_family() :: any| v4| v6. -type security_protocol() :: plaintext | ssl | sasl_plaintext | sasl_ssl. -type overflow_strategy() :: local_disk_queue | block_calling_process | drop_records. +-type queuing_strategy() :: fifo | lifo. +-type partitioner() :: random|consistent|consistent_random|murmur2|murmur2_random. -type topic_option() :: {request_required_acks, integer()} | {request_timeout_ms, integer()} | {message_timeout_ms, integer()} | + {queuing_strategy, queuing_strategy()} | {compression_codec, compression_codec()} | + {compression_level, integer()} | {auto_commit_interval_ms, integer()} | {auto_offset_reset, offset_reset()} | + {partitioner, partitioner()} | {offset_store_path, binary()} | {offset_store_sync_interval_ms, integer()} | {offset_store_method, offset_store_method()}. @@ -63,11 +68,15 @@ {broker_version_fallback, boolean()} | {security_protocol, security_protocol()} | {ssl_cipher_suites, binary()} | + {ssl_curves_list, binary()} | + {ssl_sigalgs_list, binary()} | {ssl_key_location, binary()} | {ssl_key_password, binary()} | {ssl_certificate_location, binary()} | {ssl_ca_location, binary()} | {ssl_crl_location, binary()} | + {ssl_keystore_location, binary()} | + {ssl_keystore_password, binary()} | {sasl_mechanisms, binary()} | {sasl_kerberos_service_name, binary()} | {sasl_kerberos_principal, binary()} | @@ -85,6 +94,7 @@ {queued_max_messages_kbytes, integer()} | {fetch_wait_max_ms, integer()} | {fetch_message_max_bytes, integer()} | + {fetch_max_bytes, integer()} | {fetch_min_bytes, integer()} | {fetch_error_backoff_ms, integer()} | {offset_store_method, offset_store_method()} | diff --git a/include/erlkaf_private.hrl b/include/erlkaf_private.hrl index 3250c63..9848830 100644 --- a/include/erlkaf_private.hrl +++ b/include/erlkaf_private.hrl @@ -69,6 +69,11 @@ -define(RD_KAFKA_RESP_ERR_VALUE_SERIALIZATION, -161). -define(RD_KAFKA_RESP_ERR_KEY_DESERIALIZATION, -160). -define(RD_KAFKA_RESP_ERR_VALUE_DESERIALIZATION, -159). +-define(RD_KAFKA_RESP_ERR_PARTIAL, -158). +-define(RD_KAFKA_RESP_ERR_READ_ONLY, -157). +-define(RD_KAFKA_RESP_ERR_NOENT, -156). +-define(RD_KAFKA_RESP_ERR_UNDERFLOW, -155). +-define(RD_KAFKA_RESP_ERR_INVALID_TYPE, -154). -define(RD_KAFKA_RESP_ERR_END, -100). %broker errors diff --git a/rebar.config b/rebar.config index da6bf90..e2a1966 100644 --- a/rebar.config +++ b/rebar.config @@ -1,11 +1,12 @@ {pre_hooks, [{"(linux|darwin)", compile, "make compile_nif"}]}. {post_hooks, [{"(linux|darwin)", clean, "make clean_nif"}]}. +{artifacts, ["priv/erlkaf_nif.so"]}. {deps, [ - {lager, ".*", {git, "https://github.com/erlang-lager/lager.git", {tag, "3.4.2"}}}, + {lager, ".*", {git, "https://github.com/erlang-lager/lager.git", {tag, "3.6.6"}}}, {jsone, ".*", {git, "https://github.com/sile/jsone.git", {tag, "1.4.7"}}}, {plists, ".*", {git, "https://github.com/silviucpp/plists.git", {tag, "1.1.2"}}}, - {esq, ".*", {git, "https://github.com/fogfish/esq.git", "d4be009cace3aa901966c60947ea9dfe0fe6aefd"}} + {esq, ".*", {git, "https://github.com/fogfish/esq.git", "d4c220695f2ca538e0e2b10ef33d93ddf8753ec7"}} ]}. {erl_opts, [ diff --git a/rebar.lock b/rebar.lock new file mode 100644 index 0000000..d8f313c --- /dev/null +++ b/rebar.lock @@ -0,0 +1,34 @@ +{"1.1.0", +[{<<"datum">>,{pkg,<<"datum">>,<<"4.3.6">>},1}, + {<<"esq">>, + {git,"https://github.com/fogfish/esq.git", + {ref,"d4c220695f2ca538e0e2b10ef33d93ddf8753ec7"}}, + 0}, + {<<"feta">>, + {git,"https://github.com/fogfish/feta", + {ref,"c5d251b3f995b96afd5e8ec7da61516842aa658c"}}, + 1}, + {<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.9">>},1}, + {<<"jsone">>, + {git,"https://github.com/sile/jsone.git", + {ref,"e273151504fbb1201f3a2d5effe350a4a27c3121"}}, + 0}, + {<<"lager">>, + {git,"https://github.com/erlang-lager/lager.git", + {ref,"37b7facc410daabf3ebd50bac8da7e0e0f3cc0e2"}}, + 0}, + {<<"pipe">>,{pkg,<<"pipes">>,<<"2.0.0">>},1}, + {<<"plists">>, + {git,"https://github.com/silviucpp/plists.git", + {ref,"13f6c97bd432bfa493d93b279e7b90d22ff6971a"}}, + 0}, + {<<"uid">>, + {git,"https://github.com/fogfish/uid", + {ref,"698ebcb27b273a2f879d8bd24be5f5ea54c98850"}}, + 1}]}. +[ +{pkg_hash,[ + {<<"datum">>, <<"07AA802EF8C68FC990164BB398890FBE03FF4FA30570C37BF6D836A4FDF27D54">>}, + {<<"goldrush">>, <<"F06E5D5F1277DA5C413E84D5A2924174182FB108DABB39D5EC548B27424CD106">>}, + {<<"pipe">>, <<"A10DF54F1EC80EAA491EE7BD689C794886E6E668A9F05436514695E9CF5370AE">>}]} +]. diff --git a/src/erlkaf.app.src b/src/erlkaf.app.src index 4b099be..f5845f0 100644 --- a/src/erlkaf.app.src +++ b/src/erlkaf.app.src @@ -1,6 +1,9 @@ {application, erlkaf, [ {description, "erlkaf - Erlang Kafka library based on librdkafka"}, - {vsn, "1.0"}, + {maintainers, ["Silviu Caragea"]}, + {licenses, ["MIT"]}, + {links,[{"Github","https://github.com/silviucpp/erlkaf"}]}, + {vsn, "1.1.0"}, {registered, []}, {applications, [kernel, stdlib, lager, jsone, esq]}, {mod, {erlkaf_app, []}}, diff --git a/src/erlkaf_config.erl b/src/erlkaf_config.erl index c8d451c..0145098 100644 --- a/src/erlkaf_config.erl +++ b/src/erlkaf_config.erl @@ -55,8 +55,14 @@ to_librdkafka_topic_config(request_timeout_ms, V) -> {<<"request.timeout.ms">>, erlkaf_utils:to_binary(V)}; to_librdkafka_topic_config(message_timeout_ms, V) -> {<<"message.timeout.ms">>, erlkaf_utils:to_binary(V)}; +to_librdkafka_topic_config(queuing_strategy, V) -> + {<<"queuing.strategy">>, erlkaf_utils:to_binary(V)}; +to_librdkafka_topic_config(partitioner, V) -> + {<<"partitioner">>, erlkaf_utils:to_binary(V)}; to_librdkafka_topic_config(compression_codec, V) -> {<<"compression.codec">>, erlkaf_utils:to_binary(V)}; +to_librdkafka_topic_config(compression_level, V) -> + {<<"compression.level">>, erlkaf_utils:to_binary(V)}; to_librdkafka_topic_config(auto_commit_interval_ms, V) -> {<<"auto.commit.interval.ms">>, erlkaf_utils:to_binary(V)}; to_librdkafka_topic_config(auto_offset_reset, V) -> @@ -150,6 +156,10 @@ to_librdkafka_config(security_protocol, V) -> {<<"security.protocol">>, erlkaf_utils:to_binary(V)}; to_librdkafka_config(ssl_cipher_suites, V) -> {<<"ssl.cipher.suites">>, erlkaf_utils:to_binary(V)}; +to_librdkafka_config(ssl_curves_list, V) -> + {<<"ssl.curves.list">>, erlkaf_utils:to_binary(V)}; +to_librdkafka_config(ssl_sigalgs_list, V) -> + {<<"ssl.sigalgs.list">>, erlkaf_utils:to_binary(V)}; to_librdkafka_config(ssl_key_location, V) -> {<<"ssl.key.location">>, erlkaf_utils:to_binary(V)}; to_librdkafka_config(ssl_key_password, V) -> @@ -160,6 +170,10 @@ to_librdkafka_config(ssl_ca_location, V) -> {<<"ssl.ca.location">>, erlkaf_utils:to_binary(V)}; to_librdkafka_config(ssl_crl_location, V) -> {<<"ssl.crl.location">>, erlkaf_utils:to_binary(V)}; +to_librdkafka_config(ssl_keystore_location, V) -> + {<<"ssl.keystore.location">>, erlkaf_utils:to_binary(V)}; +to_librdkafka_config(ssl_keystore_password, V) -> + {<<"ssl.keystore.password">>, erlkaf_utils:to_binary(V)}; to_librdkafka_config(sasl_mechanisms, V) -> {<<"sasl.mechanisms">>, erlkaf_utils:to_binary(V)}; to_librdkafka_config(sasl_kerberos_service_name, V) -> @@ -194,6 +208,8 @@ to_librdkafka_config(fetch_wait_max_ms, V) -> {<<"fetch.wait.max.ms">>, erlkaf_utils:to_binary(V)}; to_librdkafka_config(fetch_message_max_bytes, V) -> {<<"fetch.message.max.bytes">>, erlkaf_utils:to_binary(V)}; +to_librdkafka_config(fetch_max_bytes, V) -> + {<<"fetch.max.bytes">>, erlkaf_utils:to_binary(V)}; to_librdkafka_config(fetch_min_bytes, V) -> {<<"fetch.min.bytes">>, erlkaf_utils:to_binary(V)}; to_librdkafka_config(fetch_error_backoff_ms, V) -> diff --git a/test/sys.config b/test/sys.config index 6d5aa26..a54c074 100644 --- a/test/sys.config +++ b/test/sys.config @@ -15,9 +15,9 @@ {clients, [ {kafka_client, [ {endpoints, [ - {"172.17.33.123", 9092}, - {"172.17.33.124", 9092}, - {"172.17.33.125", 9092} + {"172.17.3.163", 9092}, + {"172.17.3.164", 9092}, + {"172.17.3.165", 9092} ]}, {auto_start_producers, true}, @@ -38,7 +38,7 @@ ]}, {client_options, [ - {bootstrap_servers, "172.17.33.123:9092,172.17.33.124:9092,172.17.33.125:9092"}, + {bootstrap_servers, "172.17.3.163:9092,172.17.3.164:9092,172.17.3.165:9092"}, {delivery_report_only_error, true}, {delivery_report_callback, benchmark_producer}, {queue_buffering_max_messages, 1000000} @@ -58,7 +58,7 @@ ]}, {client_options, [ - {bootstrap_servers, "172.17.33.123:9092"} + {bootstrap_servers, "172.17.3.163:9092"} ]} ]} ]} diff --git a/test/test_consumer.erl b/test/test_consumer.erl index e282d63..ba98e47 100644 --- a/test/test_consumer.erl +++ b/test/test_consumer.erl @@ -20,7 +20,7 @@ create_consumer() -> GroupId = <<"erlkaf_consumer">>, ClientConfig = [ - {bootstrap_servers, "172.17.33.123:9092"} + {bootstrap_servers, "172.17.3.163:9092"} ], TopicConf = [ diff --git a/test/test_producer.erl b/test/test_producer.erl index c1c9ef1..062f460 100644 --- a/test/test_producer.erl +++ b/test/test_producer.erl @@ -23,7 +23,7 @@ create_producer() -> erlkaf:start(), ProducerConfig = [ - {bootstrap_servers, "172.17.33.123:9092"}, + {bootstrap_servers, "172.17.3.163:9092"}, {delivery_report_only_error, true}, {delivery_report_callback, ?MODULE} ],