Skip to content

Commit

Permalink
[AUTO] [ RTI-14085 ] Update Dependencies (#79)
Browse files Browse the repository at this point in the history
* Update dependencies

* [update-deps] Thank you, Elvis!

---------

Co-authored-by: Brujo Benavides Rodriguez <[email protected]>
  • Loading branch information
adroll-rtb-ci and elbrujohalcon authored Mar 6, 2023
1 parent 8d9326c commit 30ddad5
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 17 deletions.
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,6 @@
{project_plugins,
[{rebar3_hex, "~> 7.0.4"},
{rebar3_format, "~> 1.2.1"},
{rebar3_lint, "~> 2.0.1"},
{rebar3_lint, "~> 3.0.0"},
{rebar3_hank, "~> 1.3.0"},
{rebar3_sheldon, "~> 0.4.2"}]}.
2 changes: 1 addition & 1 deletion src/kinetic_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ handle_call(stop, _From, State) ->
handle_cast(_Arg, State) ->
{noreply, State}.

terminate(_Reason, _State = #kinetic_config{tref = TRef}) ->
terminate(_Reason, #kinetic_config{tref = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
true = ets:delete(?KINETIC_DATA),
true = ets:delete(?KINETIC_STREAM),
Expand Down
30 changes: 15 additions & 15 deletions src/kinetic_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@ init([StreamName,
% buffer + Data is not bigger than ?KINESIS_MAX_PUT_SIZE
handle_call({put_record, Data, DataSize},
_From,
State = #kinetic_stream{buffer_size = BSize})
#kinetic_stream{buffer_size = BSize} = State)
when BSize + DataSize > ?KINESIS_MAX_PUT_SIZE ->
NewState = internal_flush(State),
{reply, ok, reset_timer(NewState#kinetic_stream{buffer_size = DataSize, buffer = Data})};
handle_call({put_record, Data, DataSize},
_From,
State = #kinetic_stream{buffer = Buffer, buffer_size = BSize}) ->
#kinetic_stream{buffer = Buffer, buffer_size = BSize} = State) ->
{reply,
ok,
reset_timer(State#kinetic_stream{buffer = <<Buffer/binary, Data/binary>>,
Expand All @@ -100,9 +100,9 @@ handle_call(stop, _From, State) ->
handle_cast(_Arg, State) ->
{noreply, State}.

terminate(_Reason, #kinetic_stream{stream_name = StreamName, flush_tref = Tref}) ->
terminate(_Reason, #kinetic_stream{stream_name = StreamName, flush_tref = TRef}) ->
ets:delete(?MODULE, StreamName),
timer:cancel(Tref),
timer:cancel(TRef),
ok.

code_change(_OldVsn, State, _Extra) ->
Expand Down Expand Up @@ -139,32 +139,32 @@ get_stream(StreamName, Config) ->
end
end.

internal_flush(State = #kinetic_stream{buffer = <<"">>}) ->
internal_flush(#kinetic_stream{buffer = <<"">>} = State) ->
State;
internal_flush(State =
#kinetic_stream{stream_name = StreamName,
buffer = Buffer,
timeout = Timeout,
retries = Retries}) ->
internal_flush(#kinetic_stream{stream_name = StreamName,
buffer = Buffer,
timeout = Timeout,
retries = Retries} =
State) ->
PartitionKey = partition_key(State),
spawn_link(fun() ->
send_to_kinesis(StreamName, Buffer, PartitionKey, Timeout, Retries + 1)
end),
increment_partition_num(State#kinetic_stream{buffer = <<"">>, buffer_size = 0}).

increment_partition_num(State =
#kinetic_stream{current_partition_num = Number,
partitions_number = Number}) ->
increment_partition_num(#kinetic_stream{current_partition_num = Number,
partitions_number = Number} =
State) ->
State#kinetic_stream{current_partition_num = 0};
increment_partition_num(State = #kinetic_stream{current_partition_num = Number}) ->
increment_partition_num(#kinetic_stream{current_partition_num = Number} = State) ->
State#kinetic_stream{current_partition_num = Number + 1}.

partition_key(#kinetic_stream{current_partition_num = Number,
base_partition_name = BasePartitionName}) ->
BinNumber = integer_to_binary(Number),
<<BasePartitionName/binary, "-", BinNumber/binary>>.

reset_timer(State = #kinetic_stream{flush_interval = FlushInterval, flush_tref = TRef}) ->
reset_timer(#kinetic_stream{flush_interval = FlushInterval, flush_tref = TRef} = State) ->
timer:cancel(TRef),
{ok, NewTRef} = timer:send_after(FlushInterval, self(), flush),
State#kinetic_stream{flush_tref = NewTRef}.
Expand Down

0 comments on commit 30ddad5

Please sign in to comment.