From bda8e994ac696a5c3e880142ba013ece723672bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gabriel=20Ara=C3=BAjo?= Date: Tue, 5 Nov 2024 16:57:19 -0300 Subject: [PATCH] fix: Add default config based on target object (#76) Only apply "enable.auto.commit", "enable.auto.offset.store", "enable.partition.eof" and "allow.auto.create.topics" configurations if the target object is a consumer. That change fixes #73 --- c_src/erlkaf_config.cc | 24 +++++++++++++----------- c_src/erlkaf_config.h | 7 ++++++- c_src/erlkaf_consumer.cc | 2 +- c_src/erlkaf_producer.cc | 2 +- 4 files changed, 21 insertions(+), 14 deletions(-) diff --git a/c_src/erlkaf_config.cc b/c_src/erlkaf_config.cc index c9297cd..bf747b2 100644 --- a/c_src/erlkaf_config.cc +++ b/c_src/erlkaf_config.cc @@ -54,19 +54,21 @@ template ERL_NIF_TERM parse_config(ErlNifEnv* env, ERL_NIF_TERM lis return ATOMS.atomOk; } -bool appy_kafka_default_config(rd_kafka_conf_t* config) +bool appy_kafka_default_config(rd_kafka_conf_t* config, TargetObject target) { - if(rd_kafka_conf_set(config, "enable.auto.commit", "true", NULL, 0) != RD_KAFKA_CONF_OK) - return false; + if(target == TargetObject::Consumer) { + if(rd_kafka_conf_set(config, "enable.auto.commit", "true", NULL, 0) != RD_KAFKA_CONF_OK) + return false; - if(rd_kafka_conf_set(config, "enable.auto.offset.store", "false", NULL, 0) != RD_KAFKA_CONF_OK) - return false; + if(rd_kafka_conf_set(config, "enable.auto.offset.store", "false", NULL, 0) != RD_KAFKA_CONF_OK) + return false; - if(rd_kafka_conf_set(config, "enable.partition.eof", "false", NULL, 0) != RD_KAFKA_CONF_OK) - return false; + if(rd_kafka_conf_set(config, "enable.partition.eof", "false", NULL, 0) != RD_KAFKA_CONF_OK) + return false; - if(rd_kafka_conf_set(config, "allow.auto.create.topics", "true", NULL, 0) != RD_KAFKA_CONF_OK) - return false; + if(rd_kafka_conf_set(config, "allow.auto.create.topics", "true", NULL, 0) != RD_KAFKA_CONF_OK) + return false; + } #ifdef SIGIO // quick termination @@ -99,9 +101,9 @@ ERL_NIF_TERM parse_topic_config(ErlNifEnv* env, ERL_NIF_TERM list, rd_kafka_topi return parse_config(env, list, conf, kTopicConfFuns); } -ERL_NIF_TERM parse_kafka_config(ErlNifEnv* env, ERL_NIF_TERM list, rd_kafka_conf_t* conf) +ERL_NIF_TERM parse_kafka_config(ErlNifEnv* env, ERL_NIF_TERM list, rd_kafka_conf_t* conf, TargetObject target) { - if(!appy_kafka_default_config(conf)) + if(!appy_kafka_default_config(conf, target)) return make_error(env, "failed to apply default kafka config"); return parse_config(env, list, conf, kKafkaConfFuns); diff --git a/c_src/erlkaf_config.h b/c_src/erlkaf_config.h index beaffc7..f933b35 100644 --- a/c_src/erlkaf_config.h +++ b/c_src/erlkaf_config.h @@ -6,8 +6,13 @@ typedef struct rd_kafka_conf_s rd_kafka_conf_t; typedef struct rd_kafka_topic_conf_s rd_kafka_topic_conf_t; +enum TargetObject { + Producer = 0, + Consumer, +}; + ERL_NIF_TERM parse_topic_config(ErlNifEnv* env, ERL_NIF_TERM list, rd_kafka_topic_conf_t* conf); -ERL_NIF_TERM parse_kafka_config(ErlNifEnv* env, ERL_NIF_TERM list, rd_kafka_conf_t* conf); +ERL_NIF_TERM parse_kafka_config(ErlNifEnv* env, ERL_NIF_TERM list, rd_kafka_conf_t* conf, TargetObject target); #endif // C_SRC_ERLKAF_CONFIG_H_ diff --git a/c_src/erlkaf_consumer.cc b/c_src/erlkaf_consumer.cc index c430457..c47673e 100644 --- a/c_src/erlkaf_consumer.cc +++ b/c_src/erlkaf_consumer.cc @@ -264,7 +264,7 @@ ERL_NIF_TERM enif_consumer_new(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv scoped_ptr(client_conf, rd_kafka_conf_t, rd_kafka_conf_new(), rd_kafka_conf_destroy); scoped_ptr(topic_conf, rd_kafka_topic_conf_t, rd_kafka_topic_conf_new(), rd_kafka_topic_conf_destroy); - ERL_NIF_TERM parse_result = parse_kafka_config(env, argv[2], client_conf.get()); + ERL_NIF_TERM parse_result = parse_kafka_config(env, argv[2], client_conf.get(), TargetObject::Consumer); if(parse_result != ATOMS.atomOk) return parse_result; diff --git a/c_src/erlkaf_producer.cc b/c_src/erlkaf_producer.cc index b937a24..d800cce 100644 --- a/c_src/erlkaf_producer.cc +++ b/c_src/erlkaf_producer.cc @@ -178,7 +178,7 @@ ERL_NIF_TERM enif_producer_new(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv scoped_ptr(config, rd_kafka_conf_t, rd_kafka_conf_new(), rd_kafka_conf_destroy); - ERL_NIF_TERM parse_result = parse_kafka_config(env, argv[1], config.get()); + ERL_NIF_TERM parse_result = parse_kafka_config(env, argv[1], config.get(), TargetObject::Producer); if(parse_result != ATOMS.atomOk) return parse_result;