Skip to content

Commit

Permalink
feat: added fanout and batch message delivery
Browse files Browse the repository at this point in the history
  • Loading branch information
gabheadz committed Jan 5, 2025
1 parent cebd61e commit 359548f
Show file tree
Hide file tree
Showing 11 changed files with 649 additions and 40 deletions.
132 changes: 121 additions & 11 deletions channel-sender/docs/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,40 +38,78 @@ paths:
content:
application/json:
schema:
$ref: '#/components/schemas/InvalidRequest'
$ref: '#/components/schemas/InvalidBodyResponse'

/deliver_message:
post:
tags:
- /ext/channel/
summary: Deliver an event from a message
description: Deliver an event message to a previusly registered channel_ref
summary: Deliver an event message to a channel or group of channels
description: Deliver an event message to a previusly registered channel_ref, or deliver a message to all channels related to an specific app_ref or user_ref
operationId: deliverMessage
requestBody:
description: "Triggers internal workflow to deliver message. The message may not be delivered immediately, or not at all. Depends if the channel_ref was previusly registered. The message_data schema is not enforced, but its recommeded to use CloudEvents."
content:
application/json:
schema:
$ref: '#/components/schemas/Message'
oneOf:
- $ref: '#/components/schemas/Message'
- $ref: '#/components/schemas/AppMessage'
- $ref: '#/components/schemas/UserMessage'
responses:
"202":
description: Ok
content:
application/json:
schema:
type: object
properties:
result:
type: string
example: Ok
$ref: '#/components/schemas/SuccessResponse'
"400":
description: Bad request due to invalid body or missing required fields
content:
application/json:
schema:
$ref: '#/components/schemas/InvalidRequest'
$ref: '#/components/schemas/InvalidBodyResponse'

/deliver_batch:
post:
tags:
- /ext/channel/
summary: Batch deliver up to 10 event messages
description: Deliver event messages to a group of channel_refs
operationId: deliverBatchMessages
requestBody:
description: ""
content:
application/json:
schema:
$ref: '#/components/schemas/Messages'
responses:
"202":
description: If all messages were accepted SuccessResponse is returned. If some messages were rejected PartialSuccessResponse is returned.
content:
application/json:
schema:
oneOf:
- $ref: '#/components/schemas/SuccessResponse'
- $ref: '#/components/schemas/PartialSuccessResponse'
"400":
description: Bad request due to invalid body
content:
application/json:
schema:
$ref: '#/components/schemas/InvalidBodyResponse'

components:
schemas:
Messages:
type: object
required:
- messages
properties:
messages:
type: array
items:
$ref: '#/components/schemas/Message'
Message:
required:
- channel_ref
Expand All @@ -97,6 +135,56 @@ components:
event_name:
type: string
example: event.productCreated
AppMessage:
required:
- app_ref
- event_name
- message_data
- message_id
type: object
properties:
app_ref:
type: string
example: app01
message_id:
type: string
format: uuid
example: d290f1ee-6c54-4b01-90e6-d701748f0851
correlation_id:
type: string
format: uuid
example: d290f1ee-6c54-4b01-90e6-d701748f0851
message_data:
type: object
example: {"product_id": "1234", "product_name": "product name"}
event_name:
type: string
example: event.productCreated
UserMessage:
required:
- user_ref
- event_name
- message_data
- message_id
type: object
properties:
user_ref:
type: string
example: user.1
message_id:
type: string
format: uuid
example: d290f1ee-6c54-4b01-90e6-d701748f0851
correlation_id:
type: string
format: uuid
example: d290f1ee-6c54-4b01-90e6-d701748f0851
message_data:
type: object
example: {"product_id": "1234", "product_name": "product name"}
event_name:
type: string
example: event.productCreated
ChannelRequest:
required:
- application_ref
Expand All @@ -121,7 +209,29 @@ components:
channel_secret:
type: string
example: SFMyNTY.g2gDaANtAAAAQWJlZWM2MzQ1MDNjMjM4ZjViODRmNzM3Mjc1YmZkNGJhLjg1NWI4MTkzYmI2ZjQxOTM4MWVhYzZjYzA4N2FlYTNmbQAAAAZ4eHh4eHhtAAAAB3h4eHh4eHhuBgDbcXMIlAFiAAFRgA.......
InvalidRequest:
SuccessResponse:
type: object
properties:
result:
type: string
example: Ok
PartialSuccessResponse:
type: object
properties:
result:
type: string
example: partial-success
accepted_messages:
type: integer
example: 5
rejected_messages:
type: integer
example: 2
discarded:
type: array
items:
$ref: '#/components/schemas/Message'
InvalidBodyResponse:
required:
- error
- request
Expand Down
2 changes: 1 addition & 1 deletion channel-sender/lib/channel_sender_ex/core/channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ defmodule ChannelSenderEx.Core.Channel do
@type output_message() :: {delivery_ref(), ProtocolMessage.t()}
@type pending_ack() :: BoundedMap.t()
@type pending_sending() :: BoundedMap.t()
@type deliver_response :: :accepted_waiting | :accepted_connected

defmodule Data do
@moduledoc """
Expand Down Expand Up @@ -81,7 +82,6 @@ defmodule ChannelSenderEx.Core.Channel do
@doc """
operation to request a message delivery
"""
@type deliver_response :: :accepted_waiting | :accepted_connected
@spec deliver_message(:gen_statem.server_ref(), ProtocolMessage.t()) :: deliver_response()
def deliver_message(server, message) do
GenStateMachine.call(server, {:deliver_message, message},
Expand Down
31 changes: 23 additions & 8 deletions channel-sender/lib/channel_sender_ex/core/channel_registry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ defmodule ChannelSenderEx.Core.ChannelRegistry do
use Horde.Registry
require Logger

@type channel_ref :: String.t()
@type channel_addr :: pid()

def start_link(_) do
Horde.Registry.start_link(__MODULE__, [keys: :unique], name: __MODULE__)
end
Expand All @@ -17,8 +20,6 @@ defmodule ChannelSenderEx.Core.ChannelRegistry do
result
end

@type channel_ref :: String.t()
@type channel_addr :: pid()
@spec lookup_channel_addr(channel_ref()) :: :noproc | channel_addr()
@compile {:inline, lookup_channel_addr: 1}
def lookup_channel_addr(channel_ref) do
Expand All @@ -28,6 +29,26 @@ defmodule ChannelSenderEx.Core.ChannelRegistry do
end
end

@spec query_by_app(String.t()) :: Enumerable.t()
def query_by_app(app) do
# select pattern is: $1 = channel_ref, $2 = pid, $3 = app_ref, $4 = user_ref
# guard condition is to match $3 with app ref
# return $2 which is the pid of the process
Stream.map(Horde.Registry.select(__MODULE__, [
{{:"$1", :"$2", {:"$3", :"$4"}}, [{:==, :"$3", app}], [:"$2"]}
]), fn pid -> pid end)
end

@spec query_by_user(String.t()) :: Enumerable.t()
def query_by_user(user) do
# select pattern is: $1 = channel_ref, $2 = pid, $3 = app_ref, $4 = user_ref
# guard condition is to match $4 with user ref
# return $2 which is the pid of the process
Stream.map(Horde.Registry.select(__MODULE__, [
{{:"$1", :"$2", {:"$3", :"$4"}}, [{:==, :"$4", user}], [:"$2"]}
]), fn pid -> pid end)
end

@compile {:inline, via_tuple: 1}
def via_tuple(channel_ref), do: {:via, Horde.Registry, {__MODULE__, channel_ref}}

Expand All @@ -37,10 +58,4 @@ defmodule ChannelSenderEx.Core.ChannelRegistry do
Enum.map([Node.self() | Node.list()], &{__MODULE__, &1})
end

# def lookup_channel_addr(channel_ref, registry) do
# case @registry_module.lookup(via_tuple(channel_ref, registry)) do
# [{pid, _}] -> pid
# [] -> :noproc
# end
# end
end
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ defmodule ChannelSenderEx.Core.ChannelSupervisor do

@spec channel_child_spec(channel_init_args()) :: any()
@compile {:inline, channel_child_spec: 1}
def channel_child_spec(channel_args = {channel_ref, _application, _user_ref}) do
channel_child_spec(channel_args, via_tuple(channel_ref))
def channel_child_spec(channel_args = {channel_ref, application, user_ref}) do
channel_child_spec(channel_args, via_tuple(channel_ref, application, user_ref))
end

@compile {:inline, channel_child_spec: 2}
Expand All @@ -49,8 +49,8 @@ defmodule ChannelSenderEx.Core.ChannelSupervisor do
}
end

defp via_tuple(name) do
{:via, Horde.Registry, {ChannelSenderEx.Core.ChannelRegistry, name}}
defp via_tuple(ref, app, usr) do
{:via, Horde.Registry, {ChannelSenderEx.Core.ChannelRegistry, ref, {app, usr}}}
end

defp get_shutdown_tolerance do
Expand Down
30 changes: 29 additions & 1 deletion channel-sender/lib/channel_sender_ex/core/pubsub/pub_sub_core.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,18 @@ defmodule ChannelSenderEx.Core.PubSub.PubSubCore do
import ChannelSenderEx.Core.Retry.ExponentialBackoff, only: [execute: 5]

@type channel_ref() :: String.t()
@type app_ref() :: String.t()
@type delivery_result() :: %{accepted_waiting: number(), accepted_connected: number()}

@max_retries 10
@min_backoff 50
@max_backoff 2000

@spec deliver_to_channel(channel_ref(), ProtocolMessage.t()) :: any()
@doc """
Delivers a message to a single channel associated with the given channel reference.
If the channel is not found, the message is retried up to @max_retries times with exponential backoff.
"""
@spec deliver_to_channel(channel_ref(), ProtocolMessage.t()) :: Channel.deliver_response()
def deliver_to_channel(channel_ref, message) do
action_fn = fn _ -> do_deliver_to_channel(channel_ref, message) end
execute(@min_backoff, @max_backoff, @max_retries, action_fn, fn ->
Expand All @@ -29,6 +35,28 @@ defmodule ChannelSenderEx.Core.PubSub.PubSubCore do
:error
end

@doc """
Delivers a message to all channels associated with the given application reference. The message is delivered to each channel in a separate process.
No retries are performed since the message is delivered to existing and queriyable channels at the given time.
"""
@spec deliver_to_app_channels(app_ref(), ProtocolMessage.t()) :: delivery_result()
def deliver_to_app_channels(app_ref, message) do
ChannelRegistry.query_by_app(app_ref)
|> Stream.map(fn pid -> Channel.deliver_message(pid, message) end)
|> Enum.frequencies()
end

@doc """
Delivers a message to all channels associated with the given user reference. The message is delivered to each channel in a separate process.
No retries are performed since the message is delivered to existing and queriyable channels at the given time.
"""
@spec deliver_to_user_channels(app_ref(), ProtocolMessage.t()) :: delivery_result()
def deliver_to_user_channels(user_ref, message) do
ChannelRegistry.query_by_user(user_ref)
|> Stream.map(fn pid -> Channel.deliver_message(pid, message) end)
|> Enum.frequencies()
end

defp do_deliver_to_channel(channel_ref, message) do
case ChannelRegistry.lookup_channel_addr(channel_ref) do
pid when is_pid(pid) -> Channel.deliver_message(pid, message)
Expand Down
Loading

0 comments on commit 359548f

Please sign in to comment.