Skip to content

Commit

Permalink
Merge pull request #610 from mtvch/docs-fixes
Browse files Browse the repository at this point in the history
Docs improvements
  • Loading branch information
zmstone authored Dec 8, 2024
2 parents 5172dbe + 831da74 commit f3deba6
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 16 deletions.
2 changes: 1 addition & 1 deletion guides/examples/elixir/Consumer.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ defmodule BrodSample.PartitionSubscriber do
@impl true
def init({topic, partition}) do
# start the consumer(s)
# if you have more than one partition, do it somewhere else once for all paritions
# if you have more than one partition, do it somewhere else once for all partitions
# (e.g. in the parent process)
:ok = :brod.start_consumer(:kafka_client, topic, begin_offset: :latest)

Expand Down
18 changes: 9 additions & 9 deletions src/brod_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ get_producer(Client, Topic, Partition) ->
Error
end.

%% @doc Get consumer of the given topic-parition.
%% @doc Get consumer of the given topic-partition.
-spec get_consumer(client(), topic(), partition()) ->
{ok, pid()} | {error, get_consumer_error()}.
get_consumer(Client, Topic, Partition) ->
Expand Down Expand Up @@ -255,7 +255,7 @@ get_metadata(Client, Topic) ->
-spec get_metadata_safe(client(), topic()) ->
{ok, kpro:struct()} | {error, any()}.
get_metadata_safe(Client, Topic) ->
safe_gen_call(Client, {get_metadata, {_FetchMetdataForTopic = all, Topic}}, infinity).
safe_gen_call(Client, {get_metadata, {_FetchMetadataForTopic = all, Topic}}, infinity).

%% @doc Get number of partitions for a given topic.
-spec get_partitions_count(client(), topic()) -> {ok, pos_integer()} | {error, any()}.
Expand Down Expand Up @@ -610,11 +610,11 @@ do_get_metadata({all, Topic}, State) ->
do_get_metadata(Topic, State) when not is_tuple(Topic) ->
do_get_metadata(Topic, Topic, State).

do_get_metadata(FetchMetdataFor, Topic,
do_get_metadata(FetchMetadataFor, Topic,
#state{ client_id = ClientId
, workers_tab = Ets
} = State0) ->
Topics = case FetchMetdataFor of
Topics = case FetchMetadataFor of
all -> all; %% in case no topic is given, get all
_ -> [Topic]
end,
Expand Down Expand Up @@ -865,21 +865,21 @@ do_get_partitions_count(Client, Ets, Topic, #{allow_topic_auto_creation := Allow
get_metadata_safe(Client, Topic)
end,
%% the metadata is returned, no need to look up the cache again
%% find the topic's parition count from the metadata directly
%% find the topic's partition count from the metadata directly
find_partition_count_in_metadata(MetadataResponse, Topic)
end.

find_partition_count_in_metadata({ok, Meta}, Topic) ->
TopicMetadataArrary = kf(topics, Meta),
find_partition_count_in_topic_metadata_array(TopicMetadataArrary, Topic);
TopicMetadataArray = kf(topics, Meta),
find_partition_count_in_topic_metadata_array(TopicMetadataArray, Topic);
find_partition_count_in_metadata({error, Reason}, _) ->
{error, Reason}.

find_partition_count_in_topic_metadata_array(TopicMetadataArrary, Topic) ->
find_partition_count_in_topic_metadata_array(TopicMetadataArray, Topic) ->
FilterF = fun(#{name := N}) when N =:= Topic -> true;
(_) -> false
end,
case lists:filter(FilterF, TopicMetadataArrary) of
case lists:filter(FilterF, TopicMetadataArray) of
[TopicMetadata] ->
get_partitions_count_in_metadata(TopicMetadata);
[] ->
Expand Down
2 changes: 1 addition & 1 deletion src/brod_group_subscriber_v2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@
%% brod_group_coordinator:start_link/6} Optional</li>
%%
%% <li>`consumer_config': For partition consumer,
%% {@link brod_topic_subscriber:start_link/6}. Optional
%% {@link brod_consumer:start_link/5}. Optional
%% </li>
%%
%% <li>`message_type': The type of message that is going to be handled
Expand Down
8 changes: 4 additions & 4 deletions src/brod_producer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@
-include("brod_int.hrl").

%% default number of messages in buffer before block callers
-define(DEFAULT_PARITION_BUFFER_LIMIT, 512).
-define(DEFAULT_PARTITION_BUFFER_LIMIT, 512).
%% default number of message sets sent on wire before block waiting for acks
-define(DEFAULT_PARITION_ONWIRE_LIMIT, 1).
-define(DEFAULT_PARTITION_ONWIRE_LIMIT, 1).
%% by default, send max 1 MB of data in one batch (message set)
-define(DEFAULT_MAX_BATCH_SIZE, 1048576).
%% by default, require acks from all ISR
Expand Down Expand Up @@ -288,8 +288,8 @@ stop(Pid) -> ok = gen_server:call(Pid, stop).
%% @private
init({ClientPid, Topic, Partition, Config}) ->
erlang:process_flag(trap_exit, true),
BufferLimit = ?config(partition_buffer_limit, ?DEFAULT_PARITION_BUFFER_LIMIT),
OnWireLimit = ?config(partition_onwire_limit, ?DEFAULT_PARITION_ONWIRE_LIMIT),
BufferLimit = ?config(partition_buffer_limit, ?DEFAULT_PARTITION_BUFFER_LIMIT),
OnWireLimit = ?config(partition_onwire_limit, ?DEFAULT_PARTITION_ONWIRE_LIMIT),
MaxBatchSize = ?config(max_batch_size, ?DEFAULT_MAX_BATCH_SIZE),
MaxRetries = ?config(max_retries, ?DEFAULT_MAX_RETRIES),
RetryBackoffMs = ?config(retry_backoff_ms, ?DEFAULT_RETRY_BACKOFF_MS),
Expand Down
2 changes: 1 addition & 1 deletion src/brod_topic_subscriber.erl
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ start_link(Client, Topic, Partitions, ConsumerConfig,
%% functions implemented for message processing. Mandatory</li>
%%
%% <li>`consumer_config': For partition consumer, {@link
%% brod_topic_subscriber:start_link/6}. Optional, defaults to `[]'
%% brod_consumer:start_link/5}. Optional, defaults to `[]'
%% </li>
%%
%% <li>`message_type': The type of message that is going to be handled
Expand Down

0 comments on commit f3deba6

Please sign in to comment.