Skip to content

Commit

Permalink
Add support for dispatch_mode settings
Browse files Browse the repository at this point in the history
  • Loading branch information
Silviu Caragea committed Mar 5, 2019
1 parent 47c852d commit 247abb7
Show file tree
Hide file tree
Showing 14 changed files with 148 additions and 43 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
### Changelog:

##### v1.1.4

- Add support for dispatch_mode topic setting.
- Based on librdkafka v0.11.6

##### v1.1.3

- Add support for Travis CI
Expand Down
2 changes: 1 addition & 1 deletion CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,4 @@ partitioner | P | random,consistent,consistent_ra
offset_store_path | C | | . | Path to local file for storing offsets. If the path is a directory a filename will be automatically generated in that directory based on the topic and partition
offset_store_sync_interval_ms | C | -1 .. 86400000 | -1 | fsync() interval for the offset file, in milliseconds. Use -1 to disable syncing, and 0 for immediate sync after each write
offset_store_method | C | file, broker | broker | Offset commit store method: 'file' - local file store (offset.store.path, et.al), 'broker' - broker commit store (requires "group.id" to be configured and Apache Kafka 0.8.2 or later on the broker.)

dispatch_mode | C | one_by_one, {batch, integer()} | one_by_one | This field indicates how messages are dispatched to the message handler: one message per callback call (default) or multiple messages up to the specified size (batch). If the batch approach is used, the consumer will mark the last offset from the batch as processed.
2 changes: 1 addition & 1 deletion build_deps.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ fi

REPO=https://github.com/edenhill/librdkafka.git
BRANCH=master
REV=44242a464c43e09c685f47a7f3dca2963b10e2a9
REV=849c066b559950b02e37a69256f0cb7b04381d0e

function fail_check
{
Expand Down
1 change: 1 addition & 0 deletions c_src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ ifeq ($(UNAME_SYS), darwin)
-Wl,-U,_enif_make_list_from_array \
-Wl,-U,_enif_make_ulong \
-Wl,-U,_enif_get_ulong \
-Wl,-U,_enif_get_uint \
-Wl,-U,_enif_make_list_cell \
-Wl,-U,_enif_make_reverse_list
endif
Expand Down
14 changes: 10 additions & 4 deletions c_src/erlkaf_consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

static const char* kThreadOptsId = "librdkafka_consumer_thread_opts";
static const char* kPollThreadId = "librdkafka_consumer_poll_thread";
static const uint32_t kMaxConsumeBatchSize = 100;

#include <vector>
#include <memory>
Expand Down Expand Up @@ -344,14 +343,20 @@ ERL_NIF_TERM enif_consumer_queue_poll(ErlNifEnv* env, int argc, const ERL_NIF_TE
if(!enif_get_resource(env, argv[0], data->res_queue, (void**) &q))
return make_badarg(env);

uint32_t max_batch_size;

if(!enif_get_uint(env, argv[1], &max_batch_size))
return make_badarg(env);

std::vector<ERL_NIF_TERM> messages;
messages.reserve(kMaxConsumeBatchSize);
messages.reserve(max_batch_size);

ERL_NIF_TERM topic = 0;
ERL_NIF_TERM partition = 0;
bool first = true;
int64_t last_offset = -1;

while(messages.size() <= kMaxConsumeBatchSize)
while(messages.size() < max_batch_size)
{
rd_kafka_event_t* event = rd_kafka_queue_poll(q->queue, 0);

Expand Down Expand Up @@ -379,14 +384,15 @@ ERL_NIF_TERM enif_consumer_queue_poll(ErlNifEnv* env, int argc, const ERL_NIF_TE
ERL_NIF_TERM value = make_binary(env, reinterpret_cast<const char*>(msg->payload), msg->len);
ERL_NIF_TERM msg_term = enif_make_tuple6(env, ATOMS.atomMessage, topic, partition, offset, key, value);

last_offset = msg->offset;
messages.push_back(msg_term);
}

rd_kafka_event_destroy(event);
}

ERL_NIF_TERM list = enif_make_list_from_array(env, &messages[0], messages.size());
return make_ok_result(env, list);
return enif_make_tuple(env, 3, ATOMS.atomOk, list, enif_make_int64(env, last_offset));
}

ERL_NIF_TERM enif_consumer_offset_store(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
Expand Down
2 changes: 1 addition & 1 deletion c_src/erlkaf_nif.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ static ErlNifFunc nif_funcs[] =

{"consumer_new", 4, enif_consumer_new},
{"consumer_partition_revoke_completed", 1, enif_consumer_partition_revoke_completed},
{"consumer_queue_poll", 1, enif_consumer_queue_poll},
{"consumer_queue_poll", 2, enif_consumer_queue_poll},
{"consumer_queue_cleanup", 1, enif_consumer_queue_cleanup},
{"consumer_offset_store", 4, enif_consumer_offset_store},
{"consumer_cleanup", 1, enif_consumer_cleanup}
Expand Down
2 changes: 1 addition & 1 deletion src/erlkaf.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
{maintainers, ["Silviu Caragea"]},
{licenses, ["MIT"]},
{links,[{"Github","https://github.com/silviucpp/erlkaf"}]},
{vsn, "1.1.3"},
{vsn, "1.1.4"},
{registered, []},
{applications, [
kernel,
Expand Down
9 changes: 9 additions & 0 deletions src/erlkaf_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ filter_kafka_config([], ErlkafAcc, RdKafkaConf) ->

% topic related configs

%% Tells whether a topic option is handled by erlkaf itself.
%%
%% Returns `true' for a valid `dispatch_mode' value and `false' for any other
%% key. A `dispatch_mode' value is valid when it is `one_by_one' or
%% `{batch, Max}' with `Max' a strictly positive integer: a batch has to be
%% able to hold at least one message, so zero and negative sizes are rejected
%% here instead of being accepted as configuration.
%%
%% Throws `{error, {options, {dispatch_mode, Value}}}' for any other
%% `dispatch_mode' value.
is_erlkaf_topic_config(dispatch_mode, one_by_one) ->
    true;
is_erlkaf_topic_config(dispatch_mode, {batch, Max}) when is_integer(Max), Max > 0 ->
    true;
is_erlkaf_topic_config(dispatch_mode = K, V) ->
    throw({error, {options, {K, V}}});
is_erlkaf_topic_config(_, _) ->
    false.

Expand Down
99 changes: 80 additions & 19 deletions src/erlkaf_consumer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
-include("erlkaf_private.hrl").

-define(POLL_IDLE_MS, 1000).
-define(DEFAULT_BATCH_SIZE, 100).

-behaviour(gen_server).

-export([
start_link/7,
start_link/8,
stop/1,

% gen_server
Expand All @@ -28,11 +29,14 @@
queue_ref,
cb_module,
cb_state,
messages = []
poll_batch_size,
dispatch_mode,
messages = [],
last_offset = -1
}).

start_link(ClientRef, TopicName, Partition, Offset, QueueRef, CbModule, CbArgs) ->
gen_server:start_link(?MODULE, [ClientRef, TopicName, Partition, Offset, QueueRef, CbModule, CbArgs], []).
start_link(ClientRef, TopicName, DispatchMode, Partition, Offset, QueueRef, CbModule, CbArgs) ->
gen_server:start_link(?MODULE, [ClientRef, TopicName, DispatchMode, Partition, Offset, QueueRef, CbModule, CbArgs], []).

stop(Pid) ->
case erlang:is_process_alive(Pid) of
Expand All @@ -50,13 +54,25 @@ stop(Pid) ->
{error, not_alive}
end.

init([ClientRef, TopicName, Partition, Offset, QueueRef, CbModule, CbArgs]) ->
init([ClientRef, TopicName, DispatchMode, Partition, Offset, QueueRef, CbModule, CbArgs]) ->
?INFO_MSG("start consumer for: ~p partition: ~p offset: ~p", [TopicName, Partition, Offset]),

case catch CbModule:init(TopicName, Partition, Offset, CbArgs) of
{ok, CbState} ->
schedule_poll(0),
{ok, #state{client_ref = ClientRef, topic_name = TopicName, partition = Partition, queue_ref = QueueRef, cb_module = CbModule, cb_state = CbState}};

{DpMode, PollBatchSize} = dispatch_mode_parse(DispatchMode),

{ok, #state{
client_ref = ClientRef,
topic_name = TopicName,
partition = Partition,
queue_ref = QueueRef,
cb_module = CbModule,
cb_state = CbState,
poll_batch_size = PollBatchSize,
dispatch_mode = DpMode
}};
Error ->
{stop, Error}
end.
Expand All @@ -69,27 +85,33 @@ handle_cast(Request, State) ->
?ERROR_MSG("handle_cast unexpected message: ~p", [Request]),
{noreply, State}.

handle_info(poll_events, #state{queue_ref = Queue} = State) ->
case erlkaf_nif:consumer_queue_poll(Queue) of
{ok, Events} ->
handle_info(poll_events, #state{queue_ref = Queue, poll_batch_size = PollBatchSize} = State) ->
case erlkaf_nif:consumer_queue_poll(Queue, PollBatchSize) of
{ok, Events, LastOffset} ->
case Events of
[] ->
schedule_poll(?POLL_IDLE_MS),
{noreply, State};
_ ->
schedule_message_process(0),
{noreply, State#state{messages = Events}}
{noreply, State#state{messages = Events, last_offset = LastOffset}}
end;
Error ->
?INFO_MSG("~p poll events error: ~p", [?MODULE, Error]),
throw({error, Error})
end;

handle_info(process_messages, #state{messages = Msg, client_ref = ClientRef, cb_module = CbModule, cb_state = CbState} = State) ->
case process_events(Msg, ClientRef, CbModule, CbState) of
handle_info(process_messages, #state{
messages = Msgs,
dispatch_mode = DispatchMode,
client_ref = ClientRef,
cb_module = CbModule,
cb_state = CbState} = State) ->

case process_events(DispatchMode, Msgs, batch_offset(DispatchMode, State), ClientRef, CbModule, CbState) of
{ok, NewCbState} ->
schedule_poll(0),
{noreply, State#state{messages = [], cb_state = NewCbState}};
{noreply, State#state{messages = [], last_offset = -1, cb_state = NewCbState}};
{stop, From, Tag} ->
handle_stop(From, Tag, State),
{stop, normal, State};
Expand All @@ -114,6 +136,16 @@ code_change(_OldVsn, State, _Extra) ->

%internals

%% Returns the `{Topic, Partition, LastOffset}' triple taken from the consumer
%% state when the dispatch mode is `batch'; returns `null' in every other case.
batch_offset(batch, State) when is_record(State, state) ->
    {State#state.topic_name, State#state.partition, State#state.last_offset};
batch_offset(_DispatchMode, _State) ->
    null.

%% Converts the configured dispatch mode into `{Mode, PollBatchSize}'.
%% A batch mode already carries its own size; `one_by_one' is paired with
%% the default batch size.
dispatch_mode_parse({batch, _MaxBatchSize} = BatchMode) ->
    BatchMode;
dispatch_mode_parse(one_by_one) ->
    {one_by_one, ?DEFAULT_BATCH_SIZE}.

%% Arranges for a `poll_events' message to be sent to the calling process
%% after `Timeout' milliseconds. Returns the timer reference.
schedule_poll(Timeout) ->
    Consumer = self(),
    erlang:send_after(Timeout, Consumer, poll_events).

Expand All @@ -123,24 +155,53 @@ schedule_message_process(Timeout) ->
%% Stores the offset of the given message through the NIF, using the topic
%% and partition carried by the message record.
commit_offset(ClientRef, #erlkaf_msg{} = Msg) ->
    #erlkaf_msg{topic = Topic, partition = Partition, offset = Offset} = Msg,
    erlkaf_nif:consumer_offset_store(ClientRef, Topic, Partition, Offset).

process_events([H|T] = Msgs, ClientRef, CbModule, CbState) ->
%% Routes the polled messages to the processing routine matching the
%% dispatch mode. The batch offset is only used in `batch' mode.
process_events(batch, Batch, LastBatchOffset, ClientRef, CbModule, CbState) ->
    process_events_batch(Batch, LastBatchOffset, ClientRef, CbModule, CbState);
process_events(one_by_one, Messages, _LastBatchOffset, ClientRef, CbModule, CbState) ->
    process_events_one_by_one(Messages, ClientRef, CbModule, CbState).

%% Hands the whole batch to `CbModule:handle_message/2' in a single call.
%%
%% On `{ok, NewCbState}' the offset in `LastBatchOffset' (a
%% `{Topic, Partition, Offset}' triple) is stored through the NIF and
%% `{ok, NewCbState}' is returned. On any other result the failure is logged
%% and the same batch is delivered again, unless `recv_stop/0' reports a
%% pending stop message, in which case that message is returned instead.
process_events_batch(Batch, LastBatchOffset, ClientRef, CbModule, CbState) ->
    case catch CbModule:handle_message(Batch, CbState) of
        {ok, NextCbState} ->
            {Topic, Partition, Offset} = LastBatchOffset,
            ok = erlkaf_nif:consumer_offset_store(ClientRef, Topic, Partition, Offset),
            {ok, NextCbState};
        {error, Reason, NextCbState} ->
            % the callback reported an error but may have updated its state
            retry_batch_unless_stopped(Reason, Batch, LastBatchOffset, ClientRef, CbModule, NextCbState);
        Unexpected ->
            % crash or unknown return value: keep the previous callback state
            retry_batch_unless_stopped(Unexpected, Batch, LastBatchOffset, ClientRef, CbModule, CbState)
    end.

%% Logs a failed batch delivery, then either retries the same batch or
%% returns the pending stop message reported by `recv_stop/0'.
retry_batch_unless_stopped(Failure, Batch, LastBatchOffset, ClientRef, CbModule, CbState) ->
    ?ERROR_MSG("~p:handle_message for batch error: ~p", [CbModule, Failure]),
    case recv_stop() of
        false ->
            process_events_batch(Batch, LastBatchOffset, ClientRef, CbModule, CbState);
        StopMsg ->
            StopMsg
    end.

process_events_one_by_one([H|T] = Msgs, ClientRef, CbModule, CbState) ->
case recv_stop() of
false ->
case catch CbModule:handle_message(H, CbState) of
{ok, NewCbState} ->
ok = commit_offset(ClientRef, H),
process_events(T, ClientRef, CbModule, NewCbState);
process_events_one_by_one(T, ClientRef, CbModule, NewCbState);
{error, Reason, NewCbState} ->
?ERROR_MSG("~p:handle_message for: ~p error: ~p", [CbModule, H, Reason]),
process_events(Msgs, ClientRef, CbModule, NewCbState);
process_events_one_by_one(Msgs, ClientRef, CbModule, NewCbState);
Error ->
?ERROR_MSG("~p:handle_message for: ~p error: ~p", [CbModule, H, Error]),
process_events(Msgs, ClientRef, CbModule, CbState)
process_events_one_by_one(Msgs, ClientRef, CbModule, CbState)
end;
StopMsg ->
StopMsg
end;
process_events([], _ClientRef, _CbModule, CbState) ->
process_events_one_by_one([], _ClientRef, _CbModule, CbState) ->
{ok, CbState}.

recv_stop() ->
Expand All @@ -149,4 +210,4 @@ recv_stop() ->
handle_stop(From, Tag, #state{topic_name = TopicName, partition = Partition, queue_ref = Queue}) ->
?INFO_MSG("stop consumer for: ~p partition: ~p", [TopicName, Partition]),
ok = erlkaf_nif:consumer_queue_cleanup(Queue),
From ! {stopped, Tag}.
From ! {stopped, Tag}.
24 changes: 19 additions & 5 deletions src/erlkaf_consumer_group.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
client_ref,
cb_module,
cb_args,
dispatch_mode,
topics_map =#{},
stats_cb,
stats = []
Expand All @@ -30,14 +31,21 @@
start_link(ClientId, GroupId, Topics, EkClientConfig, RdkClientConfig, EkTopicConfig, RdkTopicConfig, CbModule, CbArgs) ->
gen_server:start_link(?MODULE, [ClientId, GroupId, Topics, EkClientConfig, RdkClientConfig, EkTopicConfig, RdkTopicConfig, CbModule, CbArgs], []).

init([ClientId, GroupId, Topics, EkClientConfig, RdkClientConfig, _EkTopicConfig, RdkTopicConfig, CbModule, CbArgs]) ->
init([ClientId, GroupId, Topics, EkClientConfig, RdkClientConfig, EkTopicConfig, RdkTopicConfig, CbModule, CbArgs]) ->
process_flag(trap_exit, true),

case erlkaf_nif:consumer_new(GroupId, Topics, RdkClientConfig, RdkTopicConfig) of
{ok, ClientRef} ->
ok = erlkaf_cache_client:set(ClientId, undefined, self()),
StatsCb = erlkaf_utils:lookup(stats_callback, EkClientConfig),
{ok, #state{client_id = ClientId, client_ref = ClientRef, cb_module = CbModule, cb_args = CbArgs, stats_cb = StatsCb}};

{ok, #state{
client_id = ClientId,
client_ref = ClientRef,
cb_module = CbModule,
cb_args = CbArgs,
dispatch_mode = erlkaf_utils:lookup(dispatch_mode, EkTopicConfig, one_by_one),
stats_cb = erlkaf_utils:lookup(stats_callback, EkClientConfig)
}};
Error ->
{stop, Error}
end.
Expand All @@ -62,11 +70,17 @@ handle_info({stats, Stats0}, #state{stats_cb = StatsCb, client_id = ClientId} =
end,
{noreply, State#state{stats = Stats}};

handle_info({assign_partitions, Partitions}, #state{client_ref = Crf, cb_module = CbModule, cb_args = CbState, topics_map = TopicsMap} = State) ->
handle_info({assign_partitions, Partitions}, #state{
client_ref = Crf,
cb_module = CbModule,
cb_args = CbState,
dispatch_mode = DispatchMode,
topics_map = TopicsMap} = State) ->

?INFO_MSG("assign partitions: ~p", [Partitions]),

PartFun = fun({TopicName, Partition, Offset, QueueRef}, Tmap) ->
{ok, Pid} = erlkaf_consumer:start_link(Crf, TopicName, Partition, Offset, QueueRef, CbModule, CbState),
{ok, Pid} = erlkaf_consumer:start_link(Crf, TopicName, DispatchMode, Partition, Offset, QueueRef, CbModule, CbState),
maps:put({TopicName, Partition}, Pid, Tmap)
end,

Expand Down
4 changes: 2 additions & 2 deletions src/erlkaf_nif.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

consumer_new/4,
consumer_partition_revoke_completed/1,
consumer_queue_poll/1,
consumer_queue_poll/2,
consumer_queue_cleanup/1,
consumer_offset_store/4,
consumer_cleanup/1
Expand Down Expand Up @@ -56,7 +56,7 @@ consumer_new(_GroupId, _Topics, _ClientConfig, _TopicsConfig) ->
consumer_partition_revoke_completed(_ClientRef) ->
?NOT_LOADED.

consumer_queue_poll(_Queue) ->
consumer_queue_poll(_Queue, _BatchSize) ->
?NOT_LOADED.

consumer_queue_cleanup(_Queue) ->
Expand Down
3 changes: 2 additions & 1 deletion test/sys.config
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@
{callback_args, []},
{topics, [<<"benchmark">>]},
{topic_options, [
{auto_offset_reset, smallest}
{auto_offset_reset, smallest},
{dispatch_mode, one_by_one}
]},

{client_options, [
Expand Down
Loading

0 comments on commit 247abb7

Please sign in to comment.