From 359548ffca32e9b01f55b9fc48f568f12077cde4 Mon Sep 17 00:00:00 2001 From: Gabriel Martinez Date: Sun, 5 Jan 2025 09:08:13 -0500 Subject: [PATCH] feat: added fanout and batch message delivery --- channel-sender/docs/swagger.yaml | 132 ++++++++++- .../lib/channel_sender_ex/core/channel.ex | 2 +- .../core/channel_registry.ex | 31 ++- .../core/channel_supervisor.ex | 8 +- .../core/pubsub/pub_sub_core.ex | 30 ++- .../transport/rest/rest_controller.ex | 185 ++++++++++++++- channel-sender/mix.exs | 2 +- .../core/channel_registry_test.exs | 32 +++ .../channel_sender_ex/core/channel_test.exs | 6 + .../core/pubsub/pub_sub_core_test.exs | 41 +++- .../transport/rest/rest_controller_test.exs | 220 ++++++++++++++++++ 11 files changed, 649 insertions(+), 40 deletions(-) create mode 100644 channel-sender/test/channel_sender_ex/core/channel_registry_test.exs diff --git a/channel-sender/docs/swagger.yaml b/channel-sender/docs/swagger.yaml index dbacbe0..f5030a2 100644 --- a/channel-sender/docs/swagger.yaml +++ b/channel-sender/docs/swagger.yaml @@ -38,40 +38,78 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/InvalidRequest' + $ref: '#/components/schemas/InvalidBodyResponse' /deliver_message: post: tags: - /ext/channel/ - summary: Deliver an event from a message - description: Deliver an event message to a previusly registered channel_ref + summary: Deliver an event message to a channel or group of channels + description: Deliver an event message to a previusly registered channel_ref, or deliver a message to all channels related to an specific app_ref or user_ref operationId: deliverMessage requestBody: description: "Triggers internal workflow to deliver message. The message may not be delivered immediately, or not at all. Depends if the channel_ref was previusly registered. The message_data schema is not enforced, but its recommeded to use CloudEvents." content: application/json: schema: - $ref: '#/components/schemas/Message' + oneOf: + - $ref: '#/components/schemas/Message' + - $ref: '#/components/schemas/AppMessage' + - $ref: '#/components/schemas/UserMessage' responses: "202": description: Ok content: application/json: schema: - type: object - properties: - result: - type: string - example: Ok + $ref: '#/components/schemas/SuccessResponse' "400": description: Bad request due to invalid body or missing required fields content: application/json: schema: - $ref: '#/components/schemas/InvalidRequest' + $ref: '#/components/schemas/InvalidBodyResponse' + + /deliver_batch: + post: + tags: + - /ext/channel/ + summary: Batch deliver up to 10 event messages + description: Deliver event messages to a group of channel_refs + operationId: deliverBatchMessages + requestBody: + description: "" + content: + application/json: + schema: + $ref: '#/components/schemas/Messages' + responses: + "202": + description: If all messages were accepted SuccessResponse is returned. If some messages were rejected PartialSuccessResponse is returned. + content: + application/json: + schema: + oneOf: + - $ref: '#/components/schemas/SuccessResponse' + - $ref: '#/components/schemas/PartialSuccessResponse' + "400": + description: Bad request due to invalid body + content: + application/json: + schema: + $ref: '#/components/schemas/InvalidBodyResponse' + components: schemas: + Messages: + type: object + required: + - messages + properties: + messages: + type: array + items: + $ref: '#/components/schemas/Message' Message: required: - channel_ref @@ -97,6 +135,56 @@ components: event_name: type: string example: event.productCreated + AppMessage: + required: + - app_ref + - event_name + - message_data + - message_id + type: object + properties: + app_ref: + type: string + example: app01 + message_id: + type: string + format: uuid + example: d290f1ee-6c54-4b01-90e6-d701748f0851 + correlation_id: + type: string + format: uuid + example: d290f1ee-6c54-4b01-90e6-d701748f0851 + message_data: + type: object + example: {"product_id": "1234", "product_name": "product name"} + event_name: + type: string + example: event.productCreated + UserMessage: + required: + - user_ref + - event_name + - message_data + - message_id + type: object + properties: + user_ref: + type: string + example: user.1 + message_id: + type: string + format: uuid + example: d290f1ee-6c54-4b01-90e6-d701748f0851 + correlation_id: + type: string + format: uuid + example: d290f1ee-6c54-4b01-90e6-d701748f0851 + message_data: + type: object + example: {"product_id": "1234", "product_name": "product name"} + event_name: + type: string + example: event.productCreated ChannelRequest: required: - application_ref @@ -121,7 +209,29 @@ components: channel_secret: type: string example: SFMyNTY.g2gDaANtAAAAQWJlZWM2MzQ1MDNjMjM4ZjViODRmNzM3Mjc1YmZkNGJhLjg1NWI4MTkzYmI2ZjQxOTM4MWVhYzZjYzA4N2FlYTNmbQAAAAZ4eHh4eHhtAAAAB3h4eHh4eHhuBgDbcXMIlAFiAAFRgA....... - InvalidRequest: + SuccessResponse: + type: object + properties: + result: + type: string + example: Ok + PartialSuccessResponse: + type: object + properties: + result: + type: string + example: partial-success + accepted_messages: + type: integer + example: 5 + rejected_messages: + type: integer + example: 2 + discarded: + type: array + items: + $ref: '#/components/schemas/Message' + InvalidBodyResponse: required: - error - request diff --git a/channel-sender/lib/channel_sender_ex/core/channel.ex b/channel-sender/lib/channel_sender_ex/core/channel.ex index b12f403..12dc72d 100644 --- a/channel-sender/lib/channel_sender_ex/core/channel.ex +++ b/channel-sender/lib/channel_sender_ex/core/channel.ex @@ -17,6 +17,7 @@ defmodule ChannelSenderEx.Core.Channel do @type output_message() :: {delivery_ref(), ProtocolMessage.t()} @type pending_ack() :: BoundedMap.t() @type pending_sending() :: BoundedMap.t() + @type deliver_response :: :accepted_waiting | :accepted_connected defmodule Data do @moduledoc """ @@ -81,7 +82,6 @@ defmodule ChannelSenderEx.Core.Channel do @doc """ operation to request a message delivery """ - @type deliver_response :: :accepted_waiting | :accepted_connected @spec deliver_message(:gen_statem.server_ref(), ProtocolMessage.t()) :: deliver_response() def deliver_message(server, message) do GenStateMachine.call(server, {:deliver_message, message}, diff --git a/channel-sender/lib/channel_sender_ex/core/channel_registry.ex b/channel-sender/lib/channel_sender_ex/core/channel_registry.ex index 6d42a59..578f072 100644 --- a/channel-sender/lib/channel_sender_ex/core/channel_registry.ex +++ b/channel-sender/lib/channel_sender_ex/core/channel_registry.ex @@ -5,6 +5,9 @@ defmodule ChannelSenderEx.Core.ChannelRegistry do use Horde.Registry require Logger + @type channel_ref :: String.t() + @type channel_addr :: pid() + def start_link(_) do Horde.Registry.start_link(__MODULE__, [keys: :unique], name: __MODULE__) end @@ -17,8 +20,6 @@ defmodule ChannelSenderEx.Core.ChannelRegistry do result end - @type channel_ref :: String.t() - @type channel_addr :: pid() @spec lookup_channel_addr(channel_ref()) :: :noproc | channel_addr() @compile {:inline, lookup_channel_addr: 1} def lookup_channel_addr(channel_ref) do @@ -28,6 +29,26 @@ defmodule ChannelSenderEx.Core.ChannelRegistry do end end + @spec query_by_app(String.t()) :: Enumerable.t() + def query_by_app(app) do + # select pattern is: $1 = channel_ref, $2 = pid, $3 = app_ref, $4 = user_ref + # guard condition is to match $3 with app ref + # return $2 which is the pid of the process + Stream.map(Horde.Registry.select(__MODULE__, [ + {{:"$1", :"$2", {:"$3", :"$4"}}, [{:==, :"$3", app}], [:"$2"]} + ]), fn pid -> pid end) + end + + @spec query_by_user(String.t()) :: Enumerable.t() + def query_by_user(user) do + # select pattern is: $1 = channel_ref, $2 = pid, $3 = app_ref, $4 = user_ref + # guard condition is to match $4 with user ref + # return $2 which is the pid of the process + Stream.map(Horde.Registry.select(__MODULE__, [ + {{:"$1", :"$2", {:"$3", :"$4"}}, [{:==, :"$4", user}], [:"$2"]} + ]), fn pid -> pid end) + end + @compile {:inline, via_tuple: 1} def via_tuple(channel_ref), do: {:via, Horde.Registry, {__MODULE__, channel_ref}} @@ -37,10 +58,4 @@ defmodule ChannelSenderEx.Core.ChannelRegistry do Enum.map([Node.self() | Node.list()], &{__MODULE__, &1}) end -# def lookup_channel_addr(channel_ref, registry) do -# case @registry_module.lookup(via_tuple(channel_ref, registry)) do -# [{pid, _}] -> pid -# [] -> :noproc -# end -# end end diff --git a/channel-sender/lib/channel_sender_ex/core/channel_supervisor.ex b/channel-sender/lib/channel_sender_ex/core/channel_supervisor.ex index a27af64..5747a4c 100644 --- a/channel-sender/lib/channel_sender_ex/core/channel_supervisor.ex +++ b/channel-sender/lib/channel_sender_ex/core/channel_supervisor.ex @@ -35,8 +35,8 @@ defmodule ChannelSenderEx.Core.ChannelSupervisor do @spec channel_child_spec(channel_init_args()) :: any() @compile {:inline, channel_child_spec: 1} - def channel_child_spec(channel_args = {channel_ref, _application, _user_ref}) do - channel_child_spec(channel_args, via_tuple(channel_ref)) + def channel_child_spec(channel_args = {channel_ref, application, user_ref}) do + channel_child_spec(channel_args, via_tuple(channel_ref, application, user_ref)) end @compile {:inline, channel_child_spec: 2} @@ -49,8 +49,8 @@ defmodule ChannelSenderEx.Core.ChannelSupervisor do } end - defp via_tuple(name) do - {:via, Horde.Registry, {ChannelSenderEx.Core.ChannelRegistry, name}} + defp via_tuple(ref, app, usr) do + {:via, Horde.Registry, {ChannelSenderEx.Core.ChannelRegistry, ref, {app, usr}}} end defp get_shutdown_tolerance do diff --git a/channel-sender/lib/channel_sender_ex/core/pubsub/pub_sub_core.ex b/channel-sender/lib/channel_sender_ex/core/pubsub/pub_sub_core.ex index d790920..948a2d7 100644 --- a/channel-sender/lib/channel_sender_ex/core/pubsub/pub_sub_core.ex +++ b/channel-sender/lib/channel_sender_ex/core/pubsub/pub_sub_core.ex @@ -11,12 +11,18 @@ defmodule ChannelSenderEx.Core.PubSub.PubSubCore do import ChannelSenderEx.Core.Retry.ExponentialBackoff, only: [execute: 5] @type channel_ref() :: String.t() + @type app_ref() :: String.t() + @type delivery_result() :: %{accepted_waiting: number(), accepted_connected: number()} @max_retries 10 @min_backoff 50 @max_backoff 2000 - @spec deliver_to_channel(channel_ref(), ProtocolMessage.t()) :: any() + @doc """ + Delivers a message to a single channel associated with the given channel reference. + If the channel is not found, the message is retried up to @max_retries times with exponential backoff. + """ + @spec deliver_to_channel(channel_ref(), ProtocolMessage.t()) :: Channel.deliver_response() def deliver_to_channel(channel_ref, message) do action_fn = fn _ -> do_deliver_to_channel(channel_ref, message) end execute(@min_backoff, @max_backoff, @max_retries, action_fn, fn -> @@ -29,6 +35,28 @@ defmodule ChannelSenderEx.Core.PubSub.PubSubCore do :error end + @doc """ + Delivers a message to all channels associated with the given application reference. The message is delivered to each channel in a separate process. + No retries are performed since the message is delivered to existing and queriyable channels at the given time. + """ + @spec deliver_to_app_channels(app_ref(), ProtocolMessage.t()) :: delivery_result() + def deliver_to_app_channels(app_ref, message) do + ChannelRegistry.query_by_app(app_ref) + |> Stream.map(fn pid -> Channel.deliver_message(pid, message) end) + |> Enum.frequencies() + end + + @doc """ + Delivers a message to all channels associated with the given user reference. The message is delivered to each channel in a separate process. + No retries are performed since the message is delivered to existing and queriyable channels at the given time. + """ + @spec deliver_to_user_channels(app_ref(), ProtocolMessage.t()) :: delivery_result() + def deliver_to_user_channels(user_ref, message) do + ChannelRegistry.query_by_user(user_ref) + |> Stream.map(fn pid -> Channel.deliver_message(pid, message) end) + |> Enum.frequencies() + end + defp do_deliver_to_channel(channel_ref, message) do case ChannelRegistry.lookup_channel_addr(channel_ref) do pid when is_pid(pid) -> Channel.deliver_message(pid, message) diff --git a/channel-sender/lib/channel_sender_ex/transport/rest/rest_controller.ex b/channel-sender/lib/channel_sender_ex/transport/rest/rest_controller.ex index c45b69c..a1da639 100644 --- a/channel-sender/lib/channel_sender_ex/transport/rest/rest_controller.ex +++ b/channel-sender/lib/channel_sender_ex/transport/rest/rest_controller.ex @@ -23,6 +23,7 @@ defmodule ChannelSenderEx.Transport.Rest.RestController do get("/health", do: send_resp(conn, 200, "UP")) post("/ext/channel/create", do: create_channel(conn)) post("/ext/channel/deliver_message", do: deliver_message(conn)) + post("/ext/channel/deliver_batch", do: deliver_message(conn)) match(_, do: send_resp(conn, 404, "Route not found.")) defp create_channel(conn) do @@ -60,6 +61,20 @@ defmodule ChannelSenderEx.Transport.Rest.RestController do route_deliver(conn.body_params, conn) end + defp route_deliver(_body = %{ + messages: messages + }, conn) do + + # takes N first messages and separates them into valid and invalid messages + {valid_messages, invalid_messages} = batch_separate_messages(messages) + + valid_messages + |> perform_delivery + + batch_build_response({valid_messages, invalid_messages}, messages, conn) + + end + defp route_deliver( message = %{ channel_ref: channel_ref, @@ -69,9 +84,52 @@ defmodule ChannelSenderEx.Transport.Rest.RestController do event_name: _event_name }, conn ) do + assert_deliver_request(message) + |> perform_delivery(%{"channel_ref" => channel_ref}) + |> build_and_send_response(conn) + end - # assert that minimum required fields are present - is_valid = message + defp route_deliver( + message = %{ + app_ref: app_ref, + message_id: _message_id, + correlation_id: _correlation_id, + message_data: _message_data, + event_name: _event_name + }, conn + ) do + + assert_deliver_request(message) + |> perform_delivery(%{"app_ref" => app_ref}) + |> build_and_send_response(conn) + + end + + defp route_deliver( + message = %{ + user_ref: user_ref, + message_id: _message_id, + correlation_id: _correlation_id, + message_data: _message_data, + event_name: _event_name + }, conn + ) do + + assert_deliver_request(message) + |> perform_delivery(%{"user_ref" => user_ref}) + |> build_and_send_response(conn) + + end + + defp route_deliver(_, conn), do: invalid_body(conn) + + #""" + # Asserts that the message is a valid delivery request + #""" + @spec assert_deliver_request(map()) :: {:ok, map()} | {:error, :invalid_message} + defp assert_deliver_request(message) do + # Check if minimal fields are present and not nil + result = message |> Enum.all?(fn {key, value} -> case key do :message_data -> @@ -83,21 +141,123 @@ defmodule ChannelSenderEx.Transport.Rest.RestController do end end) - case is_valid do + case result do true -> - Task.start(fn -> PubSubCore.deliver_to_channel(channel_ref, - Map.drop(message, [:channel_ref]) |> ProtocolMessage.to_protocol_message) - end) - conn - |> put_resp_header("content-type", "application/json") - |> send_resp(202, Jason.encode!(%{result: "Ok"})) - + {:ok, message} false -> - invalid_body(conn) + {:error, :invalid_message} end end - defp route_deliver(_, conn), do: invalid_body(conn) + defp perform_delivery(messages) when is_list(messages) do + Enum.map(messages, fn message -> + Task.start(fn -> + {channel_ref, new_msg} = Map.pop(message, :channel_ref) + PubSubCore.deliver_to_channel(channel_ref, ProtocolMessage.to_protocol_message(new_msg)) + end) + end) + end + + defp perform_delivery({:ok, message}, %{"channel_ref" => channel_ref}) do + Task.start(fn -> + new_msg = message + |> Map.drop([:channel_ref]) + |> ProtocolMessage.to_protocol_message + + PubSubCore.deliver_to_channel(channel_ref, new_msg) + end) + {202, %{result: "Ok"}} + end + + defp perform_delivery({:ok, message}, %{"app_ref" => app_ref}) do + Task.start(fn -> + new_msg = message + |> Map.drop([:app_ref]) + |> ProtocolMessage.to_protocol_message + PubSubCore.deliver_to_app_channels(app_ref, new_msg) + end) + + {202, %{result: "Ok"}} + end + + defp perform_delivery({:ok, message}, %{"user_ref" => user_ref}) do + Task.start(fn -> + new_msg = message + |> Map.drop([:user_ref]) + |> ProtocolMessage.to_protocol_message + PubSubCore.deliver_to_user_channels(user_ref, new_msg) + end) + + {202, %{result: "Ok"}} + end + + defp perform_delivery(e = {:error, :invalid_message}, _) do + {400, e} + end + + @spec batch_separate_messages([map()]) :: {[map()], [map()]} + defp batch_separate_messages(messages) do + {valid, invalid} = Enum.take(messages, 10) + |> Enum.map(fn message -> + case assert_deliver_request(message) do + {:ok, _} -> + {:ok, message} + {:error, _} -> + {:error, {message, :invalid_message}} + end + end) + |> Enum.split_with(fn {outcome, _detail} -> case outcome do + :ok -> true + :error -> false + end + end) + + { + Enum.map(valid, fn {:ok, message} -> message end), + Enum.map(invalid, fn {:error, {message, _}} -> message end) + } + end + + defp batch_build_response({valid, invalid}, messages, conn) do + original_size = length(messages) + l_valid = length(valid) + l_invalid = length(invalid) + case {l_valid, l_invalid} do + {0, 0} -> + build_and_send_response({400, nil}, conn) + {0, i} when i > 0 -> + build_and_send_response({400, %{result: "invalid-messages", + accepted_messages: 0, + discarded_messages: i, + discarded: invalid}}, conn) + {v, 0} -> + procesed = l_valid + l_invalid + discarded = original_size - procesed + msg = case discarded do + 0 -> %{result: "Ok"} + _ -> %{result: "partial-success", + accepted_messages: v, + discarded_messages: discarded, + discarded: Enum.drop(messages, 10)} + end + build_and_send_response({202, msg}, conn) + {v, i} -> + build_and_send_response({202, %{result: "partial-success", + accepted_messages: v, + discarded_messages: i, + discarded: invalid}}, conn) + end + end + + defp build_and_send_response({202, body}, conn) do + conn + |> put_resp_header("content-type", "application/json") + |> send_resp(202, Jason.encode!(body)) + end + + defp build_and_send_response({400, _}, conn) do + invalid_body(conn) + end @compile {:inline, invalid_body: 1} defp invalid_body(conn = %{body_params: invalid_body}) do @@ -105,4 +265,5 @@ defmodule ChannelSenderEx.Transport.Rest.RestController do |> put_resp_header("content-type", "application/json") |> send_resp(400, Jason.encode!(%{error: "Invalid request", request: invalid_body})) end + end diff --git a/channel-sender/mix.exs b/channel-sender/mix.exs index 9d9c84c..940e9f4 100644 --- a/channel-sender/mix.exs +++ b/channel-sender/mix.exs @@ -4,7 +4,7 @@ defmodule ChannelSenderEx.MixProject do def project do [ app: :channel_sender_ex, - version: "0.2.0", + version: "0.2.1", elixir: "~> 1.16", start_permanent: Mix.env() == :prod, deps: deps(), diff --git a/channel-sender/test/channel_sender_ex/core/channel_registry_test.exs b/channel-sender/test/channel_sender_ex/core/channel_registry_test.exs new file mode 100644 index 0000000..e2d3528 --- /dev/null +++ b/channel-sender/test/channel_sender_ex/core/channel_registry_test.exs @@ -0,0 +1,32 @@ +Code.compiler_options(ignore_module_conflict: true) + +defmodule ChannelSenderEx.Core.ChannelRegistryTest do + use ExUnit.Case, sync: true + import Mock + + alias ChannelSenderEx.Core.ChannelRegistry + alias Horde.Registry + + @moduletag :capture_log + + test "Should query by app" do + with_mocks([ + {Registry, [], [ + select: fn(_, _) -> [:c.pid(0, 255, 0), :c.pid(0, 254, 0)] end + ]} + ]) do + assert [_, _] = ChannelRegistry.query_by_app("app_ref") |> Enum.to_list() + end + end + + test "Should query by user" do + with_mocks([ + {Registry, [], [ + select: fn(_, _) -> [:c.pid(0, 255, 0), :c.pid(0, 254, 0)] end + ]} + ]) do + assert [_, _] = ChannelRegistry.query_by_user("user_ref") |> Enum.to_list() + end + end + +end diff --git a/channel-sender/test/channel_sender_ex/core/channel_test.exs b/channel-sender/test/channel_sender_ex/core/channel_test.exs index 029a868..7aeb8c5 100644 --- a/channel-sender/test/channel_sender_ex/core/channel_test.exs +++ b/channel-sender/test/channel_sender_ex/core/channel_test.exs @@ -5,6 +5,7 @@ defmodule ChannelSenderEx.Core.ChannelTest do import Mock alias ChannelSenderEx.Core.Channel + alias ChannelSenderEx.Core.Channel.Data alias ChannelSenderEx.Core.ChannelIDGenerator alias ChannelSenderEx.Core.ProtocolMessage alias ChannelSenderEx.Core.RulesProvider @@ -62,6 +63,11 @@ defmodule ChannelSenderEx.Core.ChannelTest do message_to_send = ProtocolMessage.to_protocol_message(message) :accepted_connected = Channel.deliver_message(pid, message_to_send) assert_receive {:deliver_msg, _from = {^pid, _ref}, ^message_to_send} + + {_, app, user} = init_args + data = %Data{application: app, user_ref: user} + assert data.application == app + Process.exit(pid, :kill) end diff --git a/channel-sender/test/channel_sender_ex/core/pubsub/pub_sub_core_test.exs b/channel-sender/test/channel_sender_ex/core/pubsub/pub_sub_core_test.exs index 5ee4456..7210af8 100644 --- a/channel-sender/test/channel_sender_ex/core/pubsub/pub_sub_core_test.exs +++ b/channel-sender/test/channel_sender_ex/core/pubsub/pub_sub_core_test.exs @@ -1,20 +1,57 @@ defmodule ChannelSenderEx.Core.PubSub.PubSubCoreTest do use ExUnit.Case + alias ChannelSenderEx.Core.Channel alias ChannelSenderEx.Core.ChannelRegistry alias ChannelSenderEx.Core.PubSub.PubSubCore + import Mock - test "should retry when channel not found" do + test "should deliver to channel" do + with_mocks([ + {ChannelRegistry, [], [ + lookup_channel_addr: fn(_) -> :c.pid(0, 255, 0) end + ]}, + {Channel, [], [deliver_message: fn(_, _) -> :accepted_connected end]} + ]) do + assert :accepted_connected == PubSubCore.deliver_to_channel("channel_ref", %{}) + assert_called_exactly ChannelRegistry.lookup_channel_addr("channel_ref"), 1 + end + end + test "should retry when channel not found" do with_mock( ChannelRegistry, [lookup_channel_addr: fn(_) -> :noproc end] ) do - assert :error == PubSubCore.deliver_to_channel("channel_ref", %{}) + assert_called_exactly ChannelRegistry.lookup_channel_addr("channel_ref"), 10 + end + end + test "should deliver to all channels associated with the given application reference" do + with_mocks([ + {ChannelRegistry, [], [ + query_by_app: fn(_) -> [:c.pid(0, 255, 0), :c.pid(0, 254, 0)] end + ]}, + {Channel, [], [deliver_message: fn(_, _) -> :accepted_connected end]} + ]) do + assert %{accepted_connected: 2} == PubSubCore.deliver_to_app_channels("app_ref", %{}) + assert_called_exactly ChannelRegistry.query_by_app("app_ref"), 1 + assert_called_exactly Channel.deliver_message(:_, :_), 2 end + end + test "should deliver to all channels associated with the given user reference" do + with_mocks([ + {ChannelRegistry, [], [ + query_by_user: fn(_) -> [:c.pid(0, 255, 0), :c.pid(0, 254, 0)] end + ]}, + {Channel, [], [deliver_message: fn(_, _) -> :accepted_connected end]} + ]) do + assert %{accepted_connected: 2} == PubSubCore.deliver_to_user_channels("user_ref", %{}) + assert_called_exactly ChannelRegistry.query_by_user("user_ref"), 1 + assert_called_exactly Channel.deliver_message(:_, :_), 2 + end end end diff --git a/channel-sender/test/channel_sender_ex/transport/rest/rest_controller_test.exs b/channel-sender/test/channel_sender_ex/transport/rest/rest_controller_test.exs index 8db9e83..ea2bdbd 100644 --- a/channel-sender/test/channel_sender_ex/transport/rest/rest_controller_test.exs +++ b/channel-sender/test/channel_sender_ex/transport/rest/rest_controller_test.exs @@ -93,6 +93,226 @@ defmodule ChannelSenderEx.Transport.Rest.RestControllerTest do end + test "Should send message to app channels" do + + body = + Jason.encode!(%{ + app_ref: "app01", + message_id: "message_id", + correlation_id: "correlation_id", + message_data: "message_data", + event_name: "event_name" + }) + + with_mock PubSubCore, [deliver_to_app_channels: fn(_app_ref, _msg) -> + %{accepted_waiting: 1, accepted_connected: 3} + end] do + conn = conn(:post, "/ext/channel/deliver_message", body) + |> put_req_header("content-type", "application/json") + + conn = RestController.call(conn, @options) + + assert conn.status == 202 + + assert %{"result" => "Ok"} = Jason.decode!(conn.resp_body) + + assert Enum.member?(conn.resp_headers, {"content-type", "application/json"}) + end + end + + test "Should send message to users channels" do + + body = + Jason.encode!(%{ + user_ref: "user01", + message_id: "message_id", + correlation_id: "correlation_id", + message_data: "message_data", + event_name: "event_name" + }) + + with_mock PubSubCore, [deliver_to_user_channels: fn(_user_ref, _msg) -> + %{accepted_waiting: 0, accepted_connected: 2} + end] do + conn = conn(:post, "/ext/channel/deliver_message", body) + |> put_req_header("content-type", "application/json") + + conn = RestController.call(conn, @options) + + assert conn.status == 202 + + assert %{"result" => "Ok"} = Jason.decode!(conn.resp_body) + + assert Enum.member?(conn.resp_headers, {"content-type", "application/json"}) + end + end + + test "Should send message batch of max messages allowed" do + + messages = Enum.map(1..10, fn i -> + %{ + channel_ref: "channel_ref_#{i}", + message_id: "message_#{i}", + correlation_id: "correlation_id", + message_data: "message_data", + event_name: "event_name" + } + end) |> Enum.to_list() + + body = + Jason.encode!(%{ + messages: messages + }) + + with_mock PubSubCore, [deliver_to_channel: fn(_channel_ref, _msg) -> + :accepted_connected + end] do + conn = conn(:post, "/ext/channel/deliver_batch", body) + |> put_req_header("content-type", "application/json") + + conn = RestController.call(conn, @options) + + assert conn.status == 202 + + assert %{"result" => "Ok"} = Jason.decode!(conn.resp_body) + + assert Enum.member?(conn.resp_headers, {"content-type", "application/json"}) + end + end + + test "Should send message batch of max allowed messages and discard the rest" do + + messages = Enum.map(1..15, fn i -> + %{ + channel_ref: "channel_ref_#{i}", + message_id: "message_#{i}", + correlation_id: "correlation_id", + message_data: "message_data", + event_name: "event_name" + } + end) |> Enum.to_list() + + body = + Jason.encode!(%{ + messages: messages + }) + + with_mock PubSubCore, [deliver_to_channel: fn(_channel_ref, _msg) -> + :accepted_connected + end] do + conn = conn(:post, "/ext/channel/deliver_batch", body) + |> put_req_header("content-type", "application/json") + + conn = RestController.call(conn, @options) + + assert conn.status == 202 + + result = Jason.decode!(conn.resp_body) + assert result["result"] == "partial-success" + assert result["accepted_messages"] == 10 + assert result["discarded_messages"] == 5 + assert length(result["discarded"]) == 5 + end + end + + test "Should handle invalid messages in a batch" do + messages = Enum.map(1..5, fn i -> + ref = if i == 5 do + "" + else + "ref00000_#{i}" + end + %{ + channel_ref: ref, + message_id: "message_#{i}", + correlation_id: "correlation_id", + message_data: "message_data", + event_name: "event_name" + } + end) |> Enum.to_list() + + body = + Jason.encode!(%{ + messages: messages + }) + + with_mock PubSubCore, [deliver_to_channel: fn(_channel_ref, _msg) -> + :accepted_connected + end] do + conn = conn(:post, "/ext/channel/deliver_batch", body) + |> put_req_header("content-type", "application/json") + + conn = RestController.call(conn, @options) + + assert conn.status == 202 + + result = Jason.decode!(conn.resp_body) + + assert result["result"] == "partial-success" + assert result["accepted_messages"] == 4 + assert result["discarded_messages"] == 1 + assert length(result["discarded"]) == 1 + end + end + + test "Should handle invalid request batch" do + body = + Jason.encode!(%{ + messages: [] + }) + + with_mock PubSubCore, [deliver_to_channel: fn(_channel_ref, _msg) -> + :accepted_connected + end] do + conn = conn(:post, "/ext/channel/deliver_batch", body) + |> put_req_header("content-type", "application/json") + + conn = RestController.call(conn, @options) + + assert conn.status == 400 + + result = Jason.decode!(conn.resp_body) + + assert result["error"] == "Invalid request" + assert result["request"] == %{"messages" => []} + + end + end + + test "Should handle batch with all messages invalid" do + body = + Jason.encode!(%{ + messages: [%{ + channel_ref: "", + message_id: "message_1", + correlation_id: "correlation_id", + message_data: "message_data", + event_name: "event_name" + }] + }) + + with_mock PubSubCore, [deliver_to_channel: fn(_channel_ref, _msg) -> + :accepted_connected + end] do + conn = conn(:post, "/ext/channel/deliver_batch", body) + |> put_req_header("content-type", "application/json") + + conn = RestController.call(conn, @options) + + assert conn.status == 400 + + result = Jason.decode!(conn.resp_body) + + assert result["error"] == "Invalid request" + assert result["request"] == %{"messages" => [%{"channel_ref" => "", + "correlation_id" => "correlation_id", + "event_name" => "event_name", + "message_data" => "message_data", + "message_id" => "message_1"}]} + + end + end + test "Should fail on invalid body" do body = Jason.encode!(%{