diff --git a/c_src/erlkaf_nif.cc b/c_src/erlkaf_nif.cc index 41b0b01..0e7d309 100755 --- a/c_src/erlkaf_nif.cc +++ b/c_src/erlkaf_nif.cc @@ -119,6 +119,7 @@ static ErlNifFunc nif_funcs[] = {"producer_new", 2, enif_producer_new}, {"producer_set_owner", 2, enif_producer_set_owner}, {"producer_topic_new", 3, enif_producer_topic_new}, + {"producer_topic_delete", 3, enif_producer_topic_delete}, {"producer_cleanup", 1, enif_producer_cleanup}, {"produce", 7, enif_produce}, {"get_metadata", 1, enif_get_metadata, ERL_NIF_DIRTY_JOB_IO_BOUND}, diff --git a/c_src/erlkaf_producer.cc b/c_src/erlkaf_producer.cc index 749585c..03253ce 100644 --- a/c_src/erlkaf_producer.cc +++ b/c_src/erlkaf_producer.cc @@ -146,6 +146,38 @@ ERL_NIF_TERM enif_producer_topic_new(ErlNifEnv* env, int argc, const ERL_NIF_TER return ATOMS.atomOk; } +ERL_NIF_TERM enif_producer_topic_delete(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + UNUSED(argc); + + std::string topic_name; + enif_producer* producer; + + erlkaf_data* data = static_cast(enif_priv_data(env)); + + if(!enif_get_resource(env, argv[0], data->res_producer, reinterpret_cast(&producer))) + return make_badarg(env); + + if(!get_string(env, argv[1], &topic_name)) + return make_badarg(env); + + rd_kafka_DeleteTopic_t **del_topics; + del_topics = reinterpret_cast(malloc(sizeof(*del_topics))); + del_topics[0] = rd_kafka_DeleteTopic_new(topic_name.data()); + + bool not_found; + + producer->topics->DeleteTopic(topic_name, *del_topics, ¬_found); + + if(not_found) + return make_error(env, "topic not found"); + + rd_kafka_DeleteTopic_destroy_array(del_topics, 1); + free(del_topics); + + return ATOMS.atomOk; +} + ERL_NIF_TERM enif_producer_new(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { UNUSED(argc); diff --git a/c_src/erlkaf_producer.h b/c_src/erlkaf_producer.h index a51ae93..8235dc1 100644 --- a/c_src/erlkaf_producer.h +++ b/c_src/erlkaf_producer.h @@ -7,6 +7,7 @@ void enif_producer_free(ErlNifEnv* env, void* obj); ERL_NIF_TERM enif_producer_new(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); ERL_NIF_TERM enif_producer_topic_new(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +ERL_NIF_TERM enif_producer_topic_delete(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); ERL_NIF_TERM enif_producer_set_owner(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); ERL_NIF_TERM enif_producer_cleanup(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); ERL_NIF_TERM enif_produce(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); diff --git a/c_src/topicmanager.cc b/c_src/topicmanager.cc index 1e91998..d5e7b4b 100644 --- a/c_src/topicmanager.cc +++ b/c_src/topicmanager.cc @@ -30,6 +30,33 @@ rd_kafka_topic_t* TopicManager::AddTopic(const std::string& name, rd_kafka_topic return topic; } +void* TopicManager::DeleteTopic(const std::string& name, rd_kafka_DeleteTopic_t* del_topics, bool* not_found) +{ + CritScope ss(&crt_); + + auto it = topics_.find(name); + + if(it == topics_.end()) + { + *not_found = true; + return NULL; + } + + rd_kafka_AdminOptions_t *options; + options = rd_kafka_AdminOptions_new(rk_, RD_KAFKA_ADMIN_OP_DELETETOPICS); + + rd_kafka_queue_t *rkqu; + rkqu = rd_kafka_queue_new(rk_); + + *not_found = false; + rd_kafka_DeleteTopics(rk_, &del_topics, 1, options, rkqu); + + rd_kafka_AdminOptions_destroy(options); + rd_kafka_queue_destroy(rkqu); + + return NULL; +} + void TopicManager::Cleanup() { CritScope ss(&crt_); diff --git a/c_src/topicmanager.h b/c_src/topicmanager.h index 767cc6b..a00fe84 100644 --- a/c_src/topicmanager.h +++ b/c_src/topicmanager.h @@ -3,6 +3,7 @@ #include "macros.h" #include "critical_section.h" +#include "rdkafka.h" #include #include @@ -19,6 +20,7 @@ class TopicManager ~TopicManager(); rd_kafka_topic_t* AddTopic(const std::string& name, rd_kafka_topic_conf_t* conf, bool* already_exist); + void* DeleteTopic(const std::string& name, rd_kafka_DeleteTopic_t* del_topics, bool* not_found); rd_kafka_topic_t* GetOrCreateTopic(const std::string& name); private: diff --git a/src/erlkaf.erl b/src/erlkaf.erl index e78ff12..0da3ca4 100644 --- a/src/erlkaf.erl +++ b/src/erlkaf.erl @@ -16,6 +16,8 @@ create_topic/2, create_topic/3, + delete_topic/2, + produce/4, produce/5, produce/6, @@ -109,6 +111,16 @@ create_topic(ClientId, TopicName, TopicConfig) -> {error, ?ERR_UNDEFINED_CLIENT} end. +-spec delete_topic(client_id(), binary()) -> + ok | {error, reason()}. +delete_topic(ClientId, TopicName) -> + case erlkaf_cache_client:get(ClientId) of + {ok, ClientRef, _ClientPid} -> + erlkaf_manager:delete_topic(ClientRef, TopicName); + _ -> + {error, ?ERR_UNDEFINED_CLIENT} + end. + -spec get_metadata(client_id()) -> {ok, map()} | {error, reason()}. get_metadata(ClientId) -> diff --git a/src/erlkaf_manager.erl b/src/erlkaf_manager.erl index 2ef9b4c..d9f25db 100644 --- a/src/erlkaf_manager.erl +++ b/src/erlkaf_manager.erl @@ -13,6 +13,7 @@ start_consumer_group/5, stop_client/1, create_topic/3, + delete_topic/2, % gen_server @@ -43,6 +44,9 @@ stop_client(ClientId) -> create_topic(ClientRef, TopicName, TopicConfig) -> erlkaf_utils:safe_call(?MODULE, {create_topic, ClientRef, TopicName, TopicConfig}). +delete_topic(ClientRef, TopicName) -> + erlkaf_utils:safe_call(?MODULE, {delete_topic, ClientRef, TopicName}). + %gen server init([]) -> @@ -51,6 +55,9 @@ init([]) -> handle_call({create_topic, ClientRef, TopicName, TopicConfig}, _From, State) -> {reply, erlkaf_nif:producer_topic_new(ClientRef, TopicName, TopicConfig), State}; +handle_call({delete_topic, ClientRef, TopicName}, _From, State) -> + {reply, erlkaf_nif:producer_topic_new(ClientRef, TopicName), State}; + handle_call({start_producer, ClientId, ErlkafConfig, LibRdkafkaConfig}, _From, State) -> case internal_start_producer(ClientId, ErlkafConfig, LibRdkafkaConfig) of {ok, _Pid} -> @@ -149,4 +156,4 @@ valid_consumer_topics([H|T]) -> {error, {invalid_topic, H}} end; valid_consumer_topics([]) -> - ok. \ No newline at end of file + ok.