Skip to content

Commit

Permalink
fix: Add default config based on target object (#76)
Browse files Browse the repository at this point in the history
Only apply "enable.auto.commit", "enable.auto.offset.store",
"enable.partition.eof" and "allow.auto.create.topics" configurations if
the target object is a consumer.

That change fixes #73
  • Loading branch information
gabriel-araujjo authored Nov 5, 2024
1 parent 49cc053 commit bda8e99
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 14 deletions.
24 changes: 13 additions & 11 deletions c_src/erlkaf_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,21 @@ template <typename T> ERL_NIF_TERM parse_config(ErlNifEnv* env, ERL_NIF_TERM lis
return ATOMS.atomOk;
}

bool appy_kafka_default_config(rd_kafka_conf_t* config)
bool appy_kafka_default_config(rd_kafka_conf_t* config, TargetObject target)
{
if(rd_kafka_conf_set(config, "enable.auto.commit", "true", NULL, 0) != RD_KAFKA_CONF_OK)
return false;
if(target == TargetObject::Consumer) {
if(rd_kafka_conf_set(config, "enable.auto.commit", "true", NULL, 0) != RD_KAFKA_CONF_OK)
return false;

if(rd_kafka_conf_set(config, "enable.auto.offset.store", "false", NULL, 0) != RD_KAFKA_CONF_OK)
return false;
if(rd_kafka_conf_set(config, "enable.auto.offset.store", "false", NULL, 0) != RD_KAFKA_CONF_OK)
return false;

if(rd_kafka_conf_set(config, "enable.partition.eof", "false", NULL, 0) != RD_KAFKA_CONF_OK)
return false;
if(rd_kafka_conf_set(config, "enable.partition.eof", "false", NULL, 0) != RD_KAFKA_CONF_OK)
return false;

if(rd_kafka_conf_set(config, "allow.auto.create.topics", "true", NULL, 0) != RD_KAFKA_CONF_OK)
return false;
if(rd_kafka_conf_set(config, "allow.auto.create.topics", "true", NULL, 0) != RD_KAFKA_CONF_OK)
return false;
}

#ifdef SIGIO
// quick termination
Expand Down Expand Up @@ -99,9 +101,9 @@ ERL_NIF_TERM parse_topic_config(ErlNifEnv* env, ERL_NIF_TERM list, rd_kafka_topi
return parse_config(env, list, conf, kTopicConfFuns);
}

ERL_NIF_TERM parse_kafka_config(ErlNifEnv* env, ERL_NIF_TERM list, rd_kafka_conf_t* conf)
ERL_NIF_TERM parse_kafka_config(ErlNifEnv* env, ERL_NIF_TERM list, rd_kafka_conf_t* conf, TargetObject target)
{
if(!appy_kafka_default_config(conf))
if(!appy_kafka_default_config(conf, target))
return make_error(env, "failed to apply default kafka config");

return parse_config(env, list, conf, kKafkaConfFuns);
Expand Down
7 changes: 6 additions & 1 deletion c_src/erlkaf_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@
typedef struct rd_kafka_conf_s rd_kafka_conf_t;
typedef struct rd_kafka_topic_conf_s rd_kafka_topic_conf_t;

enum TargetObject {
Producer = 0,
Consumer,
};

ERL_NIF_TERM parse_topic_config(ErlNifEnv* env, ERL_NIF_TERM list, rd_kafka_topic_conf_t* conf);
ERL_NIF_TERM parse_kafka_config(ErlNifEnv* env, ERL_NIF_TERM list, rd_kafka_conf_t* conf);
ERL_NIF_TERM parse_kafka_config(ErlNifEnv* env, ERL_NIF_TERM list, rd_kafka_conf_t* conf, TargetObject target);

#endif // C_SRC_ERLKAF_CONFIG_H_

2 changes: 1 addition & 1 deletion c_src/erlkaf_consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ ERL_NIF_TERM enif_consumer_new(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv
scoped_ptr(client_conf, rd_kafka_conf_t, rd_kafka_conf_new(), rd_kafka_conf_destroy);
scoped_ptr(topic_conf, rd_kafka_topic_conf_t, rd_kafka_topic_conf_new(), rd_kafka_topic_conf_destroy);

ERL_NIF_TERM parse_result = parse_kafka_config(env, argv[2], client_conf.get());
ERL_NIF_TERM parse_result = parse_kafka_config(env, argv[2], client_conf.get(), TargetObject::Consumer);

if(parse_result != ATOMS.atomOk)
return parse_result;
Expand Down
2 changes: 1 addition & 1 deletion c_src/erlkaf_producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ ERL_NIF_TERM enif_producer_new(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv

scoped_ptr(config, rd_kafka_conf_t, rd_kafka_conf_new(), rd_kafka_conf_destroy);

ERL_NIF_TERM parse_result = parse_kafka_config(env, argv[1], config.get());
ERL_NIF_TERM parse_result = parse_kafka_config(env, argv[1], config.get(), TargetObject::Producer);

if(parse_result != ATOMS.atomOk)
return parse_result;
Expand Down

0 comments on commit bda8e99

Please sign in to comment.