From 7f4aaf949f82b141773d0e2fddc826cfa6e2113b Mon Sep 17 00:00:00 2001 From: Gabriel Martinez Date: Wed, 3 Apr 2024 16:36:46 -0500 Subject: [PATCH 1/2] feat: bridge core new features. --- .gitignore | 31 +++ channel-bridge/.gitignore | 6 +- .../apps/bridge_core/lib/bridge_core.ex | 39 ++- .../bridge_core/lib/bridge_core/app_client.ex | 18 +- .../bridge_core/boundary/channel_manager.ex | 179 ++++++------- .../bridge_core/lib/bridge_core/channel.ex | 118 +++++++- .../lib/bridge_core/cloud_event.ex | 37 +++ .../lib/bridge_core/cloud_event/mutator.ex | 11 +- .../cloud_event/mutator/default_mutator.ex | 8 +- .../cloud_event/mutator/webhook_mutator.ex | 158 +++++++++++ .../bridge_core/lib/bridge_core/reference.ex | 35 +++ .../lib/bridge_core/sender/connector.ex | 36 +++ .../boundary/channel_manager_test.exs | 251 ++++++++++-------- .../boundary/channel_supervisor_test.exs | 19 ++ .../boundary/node_observer_test.exs | 17 +- .../test/bridge_core/channel_test.exs | 105 +++++++- .../cloud_event/extractor_test.exs | 11 +- .../mutator/default_mutator_test.exs | 29 +- .../mutator/webhook_mutator_test.exs | 230 ++++++++++++++++ .../cloud_event/routing_error_test.exs | 16 ++ .../test/bridge_core/cloud_event_test.exs | 42 ++- .../bridge_core/sender/connector_test.exs | 26 ++ .../bridge_core/utils/json_search_test.exs | 87 +++--- .../bridge_core/test/bridge_core_test.exs | 179 ++++++++----- .../application_config.ex | 33 ++- .../test/test-config.yaml | 4 +- channel-bridge/apps/bridge_rabbitmq/mix.exs | 2 +- .../lib/bridge_api/rest/rest_helper.ex | 11 +- .../lib/bridge_api/rest/rest_router.ex | 8 +- .../bridge_api/rest/channel_request_test.exs | 6 +- .../apps/bridge_secretmanager/mix.exs | 2 +- channel-bridge/config-local.yaml | 35 ++- channel-bridge/mix.exs | 2 +- channel-bridge/mix.lock | 1 + 34 files changed, 1361 insertions(+), 431 deletions(-) create mode 100644 channel-bridge/apps/bridge_core/lib/bridge_core/cloud_event/mutator/webhook_mutator.ex create mode 100644 channel-bridge/apps/bridge_core/lib/bridge_core/reference.ex create mode 100644 channel-bridge/apps/bridge_core/lib/bridge_core/sender/connector.ex create mode 100644 channel-bridge/apps/bridge_core/test/bridge_core/cloud_event/mutator/webhook_mutator_test.exs create mode 100644 channel-bridge/apps/bridge_core/test/bridge_core/cloud_event/routing_error_test.exs create mode 100644 channel-bridge/apps/bridge_core/test/bridge_core/sender/connector_test.exs diff --git a/.gitignore b/.gitignore index e43b0f9..89c1ece 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,32 @@ .DS_Store + +# The directory Mix will write compiled artifacts to. +/_build/ + +# If you run "mix test --cover", coverage assets end up here. +/cover/ + +# The directory Mix downloads your dependencies sources to. +/deps/ + +# Ignore .fetch files in case you like to edit your project deps locally. +/.fetch + +# If the VM crashes, it generates a dump, let's ignore it too. +erl_crash.dump + +# Also ignore archive artifacts (built via "mix archive.build"). +*.ez + +# Temporary files, for example, from tests. +/tmp/ + +/.elixir_ls/ + +/**/.elixir_ls/ + +.vscode + +.idea + +*.iml diff --git a/channel-bridge/.gitignore b/channel-bridge/.gitignore index 3401a4e..4c955e9 100644 --- a/channel-bridge/.gitignore +++ b/channel-bridge/.gitignore @@ -26,4 +26,8 @@ erl_crash.dump /**/.elixir_ls/ -.vscode \ No newline at end of file +.vscode + +.idea + +*.iml \ No newline at end of file diff --git a/channel-bridge/apps/bridge_core/lib/bridge_core.ex b/channel-bridge/apps/bridge_core/lib/bridge_core.ex index a841a97..58462a1 100644 --- a/channel-bridge/apps/bridge_core/lib/bridge_core.ex +++ b/channel-bridge/apps/bridge_core/lib/bridge_core.ex @@ -12,8 +12,12 @@ defmodule BridgeCore do alias BridgeCore.Boundary.ChannelManager alias BridgeCore.Boundary.ChannelRegistry alias BridgeCore.Boundary.NodeObserver + alias BridgeCore.Sender.Connector - @default_mutator "Elixir.BridgeCore.CloudEvent.Mutator.DefaultMutator" + @default_mutator %{ + "mutator_module" => "Elixir.BridgeCore.CloudEvent.Mutator.DefaultMutator", + "config" => nil + } @doc false @impl Application @@ -23,6 +27,7 @@ defmodule BridgeCore do children = case (Application.get_env(:bridge_core, :env)) do e when e in [:test, :bench] -> + Logger.debug("Running in test mode") [ {Task.Supervisor, name: BridgeCore.TaskSupervisor}, AdfSenderConnector.spec([sender_url: sender_url_cfg]), @@ -30,6 +35,7 @@ defmodule BridgeCore do ] _ -> + Logger.debug("Running in production mode") [ {Task.Supervisor, name: BridgeCore.TaskSupervisor}, {Cluster.Supervisor, [topologies(), [name: BridgeCore.ClusterSupervisor]]}, @@ -56,21 +62,39 @@ defmodule BridgeCore do channel_registration = obtain_credentials(channel) case channel_registration do {:ok, registered_channel} -> - mutator = String.to_existing_atom( - BridgeHelperConfig.get([:bridge, "cloud_event_mutator"], @default_mutator) + + mutator_setup = BridgeHelperConfig.get([:bridge, "cloud_event_mutator"], @default_mutator) + ChannelSupervisor.start_channel_process( + registered_channel, + mutator_setup ) + + {:ok, {registered_channel, mutator_setup}} + + {:error, _reason} = err -> + err + end + + {:ok, pid} -> + # channel process already exists + Logger.debug("Channel already registered with alias : #{channel.channel_alias}") + + {:ok, {existing_channel, mutator}} = ChannelManager.get_channel_info(pid) + existing_channel_with_new_credentials = obtain_credentials(existing_channel) + case existing_channel_with_new_credentials do + {:ok, registered_channel} -> + ChannelSupervisor.start_channel_process( registered_channel, mutator ) + {:ok, {registered_channel, mutator}} {:error, _reason} = err -> err end - {:ok, pid} -> - # Logger.warning("Channel process already exists for alias #{inspect(channel.channel_alias)}") - ChannelManager.get_channel_info(pid) + end end @@ -96,7 +120,7 @@ defmodule BridgeCore do defp obtain_credentials(channel) do Task.Supervisor.async(BridgeCore.TaskSupervisor, fn -> - with {:ok, creds} <- AdfSenderConnector.channel_registration(channel.application_ref.id, channel.user_ref.id) + with {:ok, creds} <- Connector.channel_registration(channel.application_ref.id, channel.user_ref.id) do {:ok, Channel.update_credentials(channel, creds["channel_ref"], creds["channel_secret"]) } else @@ -131,7 +155,6 @@ defmodule BridgeCore do Logger.warning("No libcluster topology defined!!! -> Using Default [Gossip]") [ strategy: Cluster.Strategy.Gossip ] _ -> - IO.inspect(topology, label: "topology strategy") [ strategy: String.to_existing_atom(topology["strategy"]), config: parse_config_key(topology["config"]) diff --git a/channel-bridge/apps/bridge_core/lib/bridge_core/app_client.ex b/channel-bridge/apps/bridge_core/lib/bridge_core/app_client.ex index e4903ab..0f4d727 100644 --- a/channel-bridge/apps/bridge_core/lib/bridge_core/app_client.ex +++ b/channel-bridge/apps/bridge_core/lib/bridge_core/app_client.ex @@ -3,31 +3,33 @@ defmodule BridgeCore.AppClient do An application that uses ADF to route messages to front end """ - alias BridgeCore.CloudEvent - require Logger + @default_channel_inactivity_timeout 420 # in seconds = 7 minutes + @type id() :: String.t() @type name() :: String.t() - @type cloud_event() :: CloudEvent.t() @type t() :: %__MODULE__{ id: id(), - name: name() + name: name(), + channel_timeout: integer() } @derive Jason.Encoder defstruct id: nil, - name: nil + name: nil, + channel_timeout: 0 @doc """ creates a simple client application representation """ - @spec new(id(), name()) :: t() - def new(id, name) do + @spec new(id(), name(), integer()) :: t() + def new(id, name, ch_timeout \\ @default_channel_inactivity_timeout) do %__MODULE__{ id: id, - name: name + name: name, + channel_timeout: ch_timeout } end diff --git a/channel-bridge/apps/bridge_core/lib/bridge_core/boundary/channel_manager.ex b/channel-bridge/apps/bridge_core/lib/bridge_core/boundary/channel_manager.ex index 7336781..010bac1 100644 --- a/channel-bridge/apps/bridge_core/lib/bridge_core/boundary/channel_manager.ex +++ b/channel-bridge/apps/bridge_core/lib/bridge_core/boundary/channel_manager.ex @@ -4,10 +4,10 @@ defmodule BridgeCore.Boundary.ChannelManager do """ use GenStateMachine, callback_mode: [:state_functions, :state_enter] require Logger + import Bitwise - alias BridgeCore.CloudEvent - alias BridgeCore.Channel - alias AdfSenderConnector.Message + alias BridgeCore.Sender.Connector; + alias BridgeCore.{CloudEvent, Channel} @type channel_ref() :: String.t() @type channel_secret() :: String.t() @@ -23,8 +23,8 @@ defmodule BridgeCore.Boundary.ChannelManager do GenStateMachine.cast(server, {:deliver_message, message}) end - def update(server, message) do - GenStateMachine.call(server, {:update_channel, message}) + def update(server, new_channel_data) do + GenStateMachine.call(server, {:update_channel, new_channel_data}) end @spec close_channel(:gen_statem.server_ref()) :: :ok | {:error, reason :: term} @@ -46,8 +46,8 @@ defmodule BridgeCore.Boundary.ChannelManager do def init({channel, _mutator} = args) do Process.flag(:trap_exit, true) - Enum.map(channel.procs, fn {ch_ref, _} -> - AdfSenderConnector.start_router_process(ch_ref, []) + Enum.map(channel.procs, fn ref -> + Connector.start_router_process(ref.channel_ref, []) end) Logger.debug("new channel manager : #{inspect(args)} ") @@ -55,12 +55,14 @@ defmodule BridgeCore.Boundary.ChannelManager do {:ok, :open, args} end - ######################################### - ### OPEN STATE #### - ### open state callbacks definitions #### + ################################################################################ + ### OPEN STATE #### + ### open state callbacks definitions #### + ################################################################################ def open(:enter, _old_state, _data) do - :keep_state_and_data + # sets up process to validate channel state every 60 seconds + some drift + {:keep_state_and_data, [{:state_timeout, 60_000 + rand_increment(1_000), :validate_state}]} end def open( @@ -71,36 +73,58 @@ defmodule BridgeCore.Boundary.ChannelManager do {:keep_state_and_data, [{:reply, from, {:ok, data}}]} end - @doc """ - Delivers a cloud_event, performing any steps necesary prior to calling ADF Sender endpoint. - """ + # Delivers a cloud_event, performing any steps necesary prior to calling ADF Sender endpoint. def open( :cast, {:deliver_message, cloud_event}, {channel, mutator} = _data ) do - mutate_event(cloud_event, mutator) + result = CloudEvent.mutate(cloud_event, mutator) |> call_send(channel) - :keep_state_and_data + case result do + {:error, _} -> + :keep_state_and_data + + _ -> + # updates chanel with timestamp of last processed message + {:keep_state, {Channel.update_last_message(channel), mutator}} + end end def open( {:call, from}, - {:update_channel, channel_param}, + {:update_channel, new_channel_data}, {channel, mutator} = _data ) do - new_procs = Enum.dedup(channel_param.procs ++ channel.procs) - new_channel = %{ channel | procs: new_procs, updated_at: DateTime.utc_now() } - - Logger.debug("ChannelManager, new state: #{inspect(new_channel)}") + case Channel.get_procs(new_channel_data) do + {:error, :empty_refs} -> + {:keep_state_and_data, [{:reply, from, {:error, :empty_refs}}]} + + {:ok, procs} -> + {ch_ref, sec} = Enum.map(procs, fn ref -> {ref.channel_ref, ref.channel_secret} end) + |> List.first() + new_channel = Channel.update_credentials(channel, ch_ref, sec) + Connector.start_router_process(ch_ref, []) + Logger.debug("ChannelManager, new state: #{inspect(new_channel)}") + {:keep_state, {new_channel, mutator}, [{:reply, from, {:ok, new_channel}}]} + end + end - {ch_ref, _} = List.first(new_channel.procs) - AdfSenderConnector.start_router_process(ch_ref, []) + # Validates idling condition on an open channel. If channel is considered as being idle after a certain time frame + # then is forced to close. + def open( + :state_timeout, + :validate_state, + {channel, _} = data + ) do - {:keep_state, {new_channel, mutator}, [{:reply, from, {:ok, new_channel}}]} + case Channel.check_state_inactivity(channel) do + :noop -> {:keep_state_and_data, [{:state_timeout, 60_000 + rand_increment(1_000), :validate_state}]} + :timeout -> {:next_state, :closed, data, []} + end end @@ -113,32 +137,13 @@ defmodule BridgeCore.Boundary.ChannelManager do {channel, _mutator} = _data ) do - {:ok, new_channel} = Channel.close(channel) - - Logger.debug("Channel changing to status closed, #{inspect(new_channel)}") +# {:ok, new_channel} = Channel.close(channel) - {:next_state, :closed, {new_channel, nil}, [ + {:next_state, :closed, {channel, nil}, [ {:reply, from, :ok} ]} end - defp mutate_event(cloud_event, mutator) do - cloud_event - |> mutator.mutate - |> (fn result -> - case result do - {:ok, _mutated} -> - Logger.debug("Cloud event mutated!") - result - - {:error, reason} -> - Logger.error("Message mutation error. #{inspect(reason)}") - # raise "Error performing mutations on event..." - {:error, reason} - end - end).() - end - defp call_send({:error, _reason} = result, channel) do Logger.error("Message not routeable to #{inspect(channel.procs)} due to error: #{inspect(result)}") result @@ -149,73 +154,49 @@ defmodule BridgeCore.Boundary.ChannelManager do {:error, :invalid_status, nil} end - # defp call_send(_, %{status: :closed} = _channel) do - # Logger.error("Channel status is :closed, routing message is not posible.") - # {:error, :invalid_status, nil} - # end - defp call_send({:ok, cloud_event}, %{status: :ready} = channel) do + case Channel.prepare_messages(channel, cloud_event) do + {:error, _} = err -> + err + + {:ok, messages} -> + messages + |> Stream.map(fn msg -> - with {:ok, _procs} <- check_channel_procs(channel), - {:ok, _verified_event } <- check_cloud_event(cloud_event) do + send_result = Connector.route_message(msg.channel_ref, msg) - Stream.map(channel.procs, fn {channel_ref, _} -> - Message.new(channel_ref, cloud_event.id, cloud_event.id, Map.from_struct(cloud_event), cloud_event.type) - end) |> - Stream.map(fn msg -> - send_result = AdfSenderConnector.route_message(msg.channel_ref, msg.event_name, msg) case send_result do {:ok, _} -> Logger.debug("Message routed to #{inspect(msg.channel_ref)}") - {:ok, msg.channel_ref} + {msg.channel_ref, :ok} {:error, reason} -> Logger.error("Message not routed to #{msg.channel_ref}, reason: #{inspect(reason)}") - {:error, reason, msg.channel_ref} + {msg.channel_ref, :error, reason} end - end) |> - Enum.to_list() + end) - else - {:error, :empty_refs} = err-> - Logger.error("channel_ref is empty or unknown. Routing messages is not posible. #{inspect(cloud_event)}") - err + |> Enum.to_list() - {:error, :invalid_message} = err -> - Logger.error("Invalid or nil cloud_event. Routing is not posible. #{inspect(cloud_event)}") - err end end - defp check_channel_procs(channel) do - case channel.procs do - nil -> - {:error, :empty_refs} + ################################################################################ + ### CLOSED STATE #### + ### closed state callbacks definitions #### + ################################################################################ - [] -> - {:error, :empty_refs} - _ -> - {:ok, channel.procs} - end - end + def closed(:enter, _old_state, {channel, _} = _data) do - defp check_cloud_event(cloud_event) do - case cloud_event do - nil -> - {:error, :invalid_message} - _ -> - {:ok, cloud_event} - end - end + # close related routing processes + Enum.map(channel.procs, fn ref -> + Connector.stop_router_process(ref.channel_ref, []) + end) - ########################################### - ### CLOSED STATE #### - ### closed state callbacks definitions #### + {:ok, new_channel} = Channel.close(channel) - def closed(:enter, _old_state, data) do - # :keep_state_and_data closing_timeout = 10 * 1000 - {:keep_state, data, [{:state_timeout, closing_timeout, :closing_timeout}]} + {:keep_state, {new_channel, nil}, [{:state_timeout, closing_timeout, :closing_timeout}]} end def closed( @@ -247,14 +228,6 @@ defmodule BridgeCore.Boundary.ChannelManager do :keep_state_and_data end - def closed( - :info, - _old_state, - _data - ) do - :keep_state_and_data - end - @impl true def terminate(reason, state, {channel, _} = _data) do Logger.warning( @@ -262,4 +235,10 @@ defmodule BridgeCore.Boundary.ChannelManager do ) end + defp rand_increment(n) do + # New delay chosen from [N, 3N], i.e. [0.5 * 2N, 1.5 * 2N] + width = n <<< 1 + n + :rand.uniform(width + 1) - 1 + end + end diff --git a/channel-bridge/apps/bridge_core/lib/bridge_core/channel.ex b/channel-bridge/apps/bridge_core/lib/bridge_core/channel.ex index aaad68c..85181ce 100644 --- a/channel-bridge/apps/bridge_core/lib/bridge_core/channel.ex +++ b/channel-bridge/apps/bridge_core/lib/bridge_core/channel.ex @@ -3,8 +3,8 @@ defmodule BridgeCore.Channel do Abstraction for an async-dataflow-channel-sender's channel """ - alias BridgeCore.User - alias BridgeCore.AppClient + alias BridgeCore.{User, AppClient, CloudEvent} + alias AdfSenderConnector.Message require Logger @@ -26,6 +26,7 @@ defmodule BridgeCore.Channel do status: atom(), updated_at: datetime() | nil, user_ref: user_ref(), + last_message_at: datetime() | nil } defstruct application_ref: nil, @@ -35,7 +36,8 @@ defmodule BridgeCore.Channel do reason: nil, status: nil, updated_at: nil, - user_ref: nil + user_ref: nil, + last_message_at: nil @doc """ creates a simple channel representation @@ -51,32 +53,124 @@ defmodule BridgeCore.Channel do status: :new, updated_at: nil, user_ref: user_ref, + last_message_at: nil } end @doc """ - updates creds info provided by ADF Channel Sender + register credentials info provided by ADF Channel Sender """ @spec update_credentials(t(), binary(), binary()) :: t() def update_credentials(channel, channel_ref, channel_secret) do %__MODULE__{ channel - | procs: [{channel_ref, channel_secret} | channel.procs], - # internally this is considered 'channel redy' to route messages + | procs: [BridgeCore.Reference.new(channel_ref, channel_secret) | channel.procs], + # internally this is considered 'channel ready' to route messages status: :ready, updated_at: DateTime.utc_now() } end - @spec set_status(t(), term(), term()) :: t() - def set_status(channel, status, reason) do - %__MODULE__{channel | status: status, reason: reason, updated_at: DateTime.utc_now()} +# @doc """ +# Adds a list of refs to the channel ref list. A ref is a channel_ref id generated by ADF Channel Sender. +# """ +# @spec add_procs(t(), procs()) :: t() +# def add_procs(channel, procs) do +# new_procs = Enum.dedup(procs ++ channel.procs) +# %__MODULE__{channel | procs: new_procs, updated_at: DateTime.utc_now()} +# end + +# @doc """ +# Updates the status and reason of the channel. +# """ +# @spec set_status(t(), term(), term()) :: t() +# def set_status(channel, status, reason) do +# %__MODULE__{channel | status: status, reason: reason, updated_at: DateTime.utc_now()} +# end + + @doc """ + Updates the last message routed timestamp to the current time. + """ + @spec update_last_message(t()) :: t() + def update_last_message(channel) do + %__MODULE__{channel | last_message_at: DateTime.utc_now()} + end + + @doc """ + Obtains the ADF Channel Sender client Message struct to be routed. + """ + @spec prepare_messages(t(), CloudEvent.t()) :: {:ok, list()} | {:error, term} + def prepare_messages(channel, cloud_event) do + + with {:ok, ch_procs} <- get_procs(channel), + {:ok, _ } <- check_cloud_event(cloud_event) do + + {:ok, + Stream.map(ch_procs, fn proc -> + {proc.channel_ref, cloud_event.id, Map.from_struct(cloud_event), cloud_event.type} + end) |> + Stream.map(fn {ref, evt_id, data, name} -> + Message.new(ref, evt_id, evt_id, data, name) + end) + } + + else + {:error, :empty_refs} = err-> + Logger.error("channel_ref is empty or unknown. #{inspect(cloud_event)}") + err + + {:error, :invalid_message} = err -> + Logger.error("Invalid cloud_event. #{inspect(cloud_event)}") + err + end + end + + def get_procs(channel) do + case channel.procs do + nil -> + {:error, :empty_refs} + [] -> + {:error, :empty_refs} + _ -> + {:ok, channel.procs} + end + end + + defp check_cloud_event(cloud_event) do + case cloud_event do + nil -> + {:error, :invalid_message} + _ -> + {:ok, cloud_event} + end + end + + @doc """ + Checks if the state of the channel has reached the inactivity timeout. Only applies when state is :ready (open). + """ + @spec check_state_inactivity(t()) :: :noop | :timeout + def check_state_inactivity(channel) do + case channel.status do + :ready -> + last_msg = case channel.last_message_at do + nil -> channel.updated_at + _ -> channel.last_message_at + end + if DateTime.diff(DateTime.utc_now(), last_msg) > channel.application_ref.channel_timeout do + Logger.warning("Channel inactivity timeout reached: #{inspect(channel)}") + :timeout + else + :noop + end + _ -> + :noop + end end @doc """ - Closing the channel logically (just changing its status here), so no more operations can be done with it inside - ADF Bridge. ADF Channel Sender it is not notified of this action, and ADF Channel Sender handles closing - via timeouts or socket disconection. + Changes the channel current state to closed, no further operations can be done with this channel. + ADF Channel Sender it's not notified of this action. Note that ADF Channel Sender handles channel closing + via timeouts or socket disconnection. """ @spec close(t()) :: {:ok, t()} | {:error, reason :: term} def close(channel) do diff --git a/channel-bridge/apps/bridge_core/lib/bridge_core/cloud_event.ex b/channel-bridge/apps/bridge_core/lib/bridge_core/cloud_event.ex index c8f3dd9..01c4b12 100644 --- a/channel-bridge/apps/bridge_core/lib/bridge_core/cloud_event.ex +++ b/channel-bridge/apps/bridge_core/lib/bridge_core/cloud_event.ex @@ -30,6 +30,7 @@ defmodule BridgeCore.CloudEvent do data: data() } + @derive Jason.Encoder defstruct specVersion: nil, type: nil, source: nil, @@ -113,4 +114,40 @@ defmodule BridgeCore.CloudEvent do defdelegate extract(cloud_event, path), to: Extractor defdelegate extract_channel_alias(cloud_event), to: Extractor + + @spec mutate(t(), atom()) :: {:ok, t()} | {:error, any} + def mutate(cloud_event, mutator_setup) do + + mutator = mutator_setup["mutator_module"] + mutator_config = mutator_setup["config"] + + with true <- mutator.applies?(cloud_event, mutator_config) do + Logger.debug("Applying mutator #{inspect(mutator)} to cloud event...") + + cloud_event + |> mutator.mutate(mutator_config) + |> (fn result -> + case result do + {:ok, _mutated} -> + Logger.debug("Cloud event mutated.") + result + + {:noop, ce} -> + Logger.debug("Cloud event not mutated!") + {:ok, ce} + + {:error, reason} = err -> + Logger.error("Message mutation error. #{inspect(reason)}") + err + end + end).() + else + false -> + Logger.debug("Mutator not applied to cloud event due to 'Mutator.applies?/2' returned = false") + {:ok, cloud_event} + {:error, reason} = err -> + Logger.error("Message mutation decision logic error. #{inspect(reason)}") + err + end + end end diff --git a/channel-bridge/apps/bridge_core/lib/bridge_core/cloud_event/mutator.ex b/channel-bridge/apps/bridge_core/lib/bridge_core/cloud_event/mutator.ex index 7795710..fd4356d 100644 --- a/channel-bridge/apps/bridge_core/lib/bridge_core/cloud_event/mutator.ex +++ b/channel-bridge/apps/bridge_core/lib/bridge_core/cloud_event/mutator.ex @@ -7,9 +7,16 @@ defmodule BridgeCore.CloudEvent.Mutator do alias BridgeCore.CloudEvent @type cloud_event() :: CloudEvent.t() + @type config() :: Map.t() @doc """ - Transform a CloudEvent + Function that defines if the mutator should be applied to the cloud event """ - @callback mutate(cloud_event()) :: {:ok, cloud_event()} | {:error, any()} + @callback applies?(cloud_event(), config()) :: boolean() | {:error, any()} + + @doc """ + Apply the mutator logic to the CloudEvent, an :ok result means the CloudEvent was mutated, else a :noop result means + the CloudEvent was not mutated due an error invoking the related endpoint. + """ + @callback mutate(cloud_event(), config()) :: {:ok | :noop, cloud_event()} | {:error, any()} end diff --git a/channel-bridge/apps/bridge_core/lib/bridge_core/cloud_event/mutator/default_mutator.ex b/channel-bridge/apps/bridge_core/lib/bridge_core/cloud_event/mutator/default_mutator.ex index 07ba2e2..f72de3f 100644 --- a/channel-bridge/apps/bridge_core/lib/bridge_core/cloud_event/mutator/default_mutator.ex +++ b/channel-bridge/apps/bridge_core/lib/bridge_core/cloud_event/mutator/default_mutator.ex @@ -15,7 +15,13 @@ defmodule BridgeCore.CloudEvent.Mutator.DefaultMutator do @doc false @impl true - def mutate(cloud_event) do + def applies?(_cloud_event, _config \\ nil) do + true + end + + @doc false + @impl true + def mutate(cloud_event, _config \\ nil) do # No changes are made to the input cloud_event. You can implement mutation functionality here. {:ok, cloud_event} end diff --git a/channel-bridge/apps/bridge_core/lib/bridge_core/cloud_event/mutator/webhook_mutator.ex b/channel-bridge/apps/bridge_core/lib/bridge_core/cloud_event/mutator/webhook_mutator.ex new file mode 100644 index 0000000..0d90fc2 --- /dev/null +++ b/channel-bridge/apps/bridge_core/lib/bridge_core/cloud_event/mutator/webhook_mutator.ex @@ -0,0 +1,158 @@ +defmodule BridgeCore.CloudEvent.Mutator.WebhookMutator do + @moduledoc """ + A module that invokes a rest endpoint and pass the cloudevent, to be mutated, as a parameter. + The rest endpoint is expected to return a mutated cloudevent. + + an example of the configuration for this mutator is: + + bridge: + cloud_event_mutator: + mutator_module: Elixir.BridgeCore.CloudEvent.Mutator.WebhookMutator + config: + webhook_url: "http://localhost:8081" + webhook_method: "POST" + webhook_headers: + - "Content-Type: application/json" + applies_when: + - key: "$.data" + comparator: "eq" + value: "demo1" + - key: "$.subject" + comparator: "contains" + value: "foo" + operator: "and" + """ + @behaviour BridgeCore.CloudEvent.Mutator + + require Logger + alias BridgeCore.CloudEvent + alias BridgeCore.Utils.JsonSearch + + @type t() :: CloudEvent.t() + + @webhook_content_type ['application/json'] + @webhook_options [ {:timeout, 3_000}, {:connect_timeout, 3_000} ] + + @doc false + @impl true + def applies?(cloud_event, config) do + Stream.map(config["applies_when"], fn rule -> + key = rule["key"] + comparator = rule["comparator"] + value = rule["value"] + bool_op = rule["operator"] || "or" + part = get_part(cloud_event, key, comparator) + cond do + is_comparator?(comparator) -> + {bool_op, compare(comparator, value, part)} + is_regex?(comparator) -> + {bool_op, Regex.match?(~r/#{value}/, part)} + end + end) + |> Enum.to_list() + |> Enum.reduce(false, fn {op, value}, acc -> + case op do + "and" -> acc && value + "or" -> acc || value + end + end) + end + + defp get_part(cloud_event, key, _operator) do + JsonSearch.prepare(cloud_event) + |> JsonSearch.extract(key) + end + + defp is_comparator?(operator) do + operator in ["eq", "ne", "gt", "lt", "ge", "le", "contains", "not_contains"] + end + + defp is_regex?(operator) do + operator in ["regex"] + end + + defp compare(operator, value, part) do + case operator do + "eq" -> part == value + "ne" -> part != value + "gt" -> part > value + "lt" -> part < value + "ge" -> part >= value + "le" -> part <= value + "contains" -> String.contains?(part, value) + "not_contains" -> !String.contains?(part, value) + _ -> false + end + end + + @doc false + @impl true + def mutate(cloud_event, config) do + encode_cloud_event(cloud_event) + |> invoke_webhook(config) + |> process_response(cloud_event) + end + + defp encode_cloud_event(cloud_event) do + case Jason.encode(cloud_event) do + {:ok, _encoded_cloud_event} = res -> + res + {:error, reason} -> + Logger.error("Error encoding cloud event prior to invoking webhook: #{inspect(reason)}") + {:noop, reason} + end + end + + defp invoke_webhook({:ok, encoded_cloud_event}, config) do + :httpc.request(:post, + {config["webhook_url"], parse_headers(config), @webhook_content_type, encoded_cloud_event}, + @webhook_options, []) + end + + defp invoke_webhook({:noop, reason}, _) do + {:noop, reason} + end + + defp process_response({:ok, { {_, status, _}, _, response_body} }, cloud_event) do + if (status < 200 or status >= 300) do + Logger.error("Webhook result unsuccessful: #{inspect(status)}, body: #{inspect(response_body)}") + {:noop, cloud_event} + else + CloudEvent.from(to_string(response_body)) + |> (fn + {:error, reason, _} -> + Logger.error("Error parsing webhook response: #{inspect(reason)}") + {:noop, cloud_event} + {:ok, new_ce} -> + {:ok, %{cloud_event | data: new_ce.data}} + end).() + end + end + + defp process_response({:failed_connect, reason}, cloud_event) do + Logger.error("Failed to connect to webhook: #{inspect(reason)}") + {:noop, cloud_event} + end + + defp process_response({:error, reason}, cloud_event) do + Logger.error("Error invoking webhook: #{inspect(reason)}") + {:noop, cloud_event} + end + + defp process_response({:noop, _reason}, cloud_event) do + {:noop, cloud_event} + end + + defp parse_headers(config) do + if config["webhook_headers"] == nil do + [{'accept', 'application/json'}] + else + Stream.map(config["webhook_headers"], fn str_header -> + String.split(str_header, ":") + end) + |> Stream.map(fn [key, value] -> {to_charlist(key), to_charlist(String.trim(value))} end) + |> Enum.to_list() + end + end + +end diff --git a/channel-bridge/apps/bridge_core/lib/bridge_core/reference.ex b/channel-bridge/apps/bridge_core/lib/bridge_core/reference.ex new file mode 100644 index 0000000..fd8748f --- /dev/null +++ b/channel-bridge/apps/bridge_core/lib/bridge_core/reference.ex @@ -0,0 +1,35 @@ +defmodule BridgeCore.Reference do + @moduledoc false + + @type channel_ref() :: binary() + @type channel_secret() :: binary() + + @type t() :: %__MODULE__{ + channel_ref: channel_ref(), + channel_secret: channel_secret(), + created_at: DateTime.t(), + status: atom(), + updated_at: DateTime.t() | nil, + last_message_at: DateTime.t() | nil + } + + defstruct channel_ref: nil, + channel_secret: nil, + created_at: nil, + status: nil, + updated_at: nil, + last_message_at: nil + + @spec new(channel_ref(), channel_secret()) :: t() + def new(channel_ref, channel_secret) do + %__MODULE__{ + channel_ref: channel_ref, + channel_secret: channel_secret, + created_at: DateTime.utc_now(), + status: :new, + updated_at: nil, + last_message_at: nil + } + end + +end diff --git a/channel-bridge/apps/bridge_core/lib/bridge_core/sender/connector.ex b/channel-bridge/apps/bridge_core/lib/bridge_core/sender/connector.ex new file mode 100644 index 0000000..efd0f76 --- /dev/null +++ b/channel-bridge/apps/bridge_core/lib/bridge_core/sender/connector.ex @@ -0,0 +1,36 @@ +defmodule BridgeCore.Sender.Connector do + @moduledoc false + + require Logger + + alias AdfSenderConnector.Message + + @spec channel_registration(String.t, String.t) :: {:ok, map()} | {:error, any()} + def channel_registration(application_ref, user_ref) do + AdfSenderConnector.channel_registration(application_ref, user_ref) + end + + @spec start_router_process(String.t, Keyword.t) :: :ok | {:error, any()} + def start_router_process(channel_ref, options \\ []) do + AdfSenderConnector.start_router_process(channel_ref, options) + end + + @spec stop_router_process(String.t, Keyword.t) :: :ok | {:error, any()} + def stop_router_process(channel_ref, options \\ []) do + # TODO to be implemented + :ok + end + + @spec route_message(String.t, String.t, Message.t) :: {:ok, map()} | {:error, any()} + def route_message(channel_ref, event_name, protocol_msg) do + Logger.debug("Routing message to channel: #{channel_ref}") + AdfSenderConnector.route_message(channel_ref, event_name, protocol_msg) + end + + @spec route_message(String.t, Message.t) :: {:ok, map()} | {:error, any()} + def route_message(channel_ref, protocol_msg) do + Logger.debug("Routing message to channel: #{channel_ref}") + AdfSenderConnector.route_message(channel_ref, nil, protocol_msg) + end + +end diff --git a/channel-bridge/apps/bridge_core/test/bridge_core/boundary/channel_manager_test.exs b/channel-bridge/apps/bridge_core/test/bridge_core/boundary/channel_manager_test.exs index f902dfa..b301996 100644 --- a/channel-bridge/apps/bridge_core/test/bridge_core/boundary/channel_manager_test.exs +++ b/channel-bridge/apps/bridge_core/test/bridge_core/boundary/channel_manager_test.exs @@ -1,69 +1,49 @@ -Code.compiler_options(ignore_module_conflict: true) - defmodule BridgeCore.Boundary.ChannelManagerTest do use ExUnit.Case, async: false import Mock - import ExUnit.CaptureLog require Logger alias BridgeCore.Boundary.ChannelManager - alias BridgeCore.Channel - alias BridgeCore.AppClient - alias BridgeCore.User - alias BridgeCore.CloudEvent + alias BridgeCore.{Channel, AppClient, User, CloudEvent} alias BridgeCore.CloudEvent.Mutator.DefaultMutator + alias BridgeCore.Sender.Connector @app_ref AppClient.new("01", "app-01") @user_ref User.new("CC-123456") - - setup do - Application.put_env(:channel_bridge, :event_mutator, BridgeCore.CloudEvent.Mutator.DefaultMutator) - - on_exit(fn -> - Application.delete_env(:channel_bridge, :event_mutator) - end) - - :ok - end + @default_mutator_setup %{ + "mutator_module" => DefaultMutator, + "config" => nil + } setup_with_mocks([ - {AdfSenderConnector, [], + {Connector, [], [ - channel_registration: fn application_ref, _user_ref -> - case application_ref do - @app_ref -> - {:ok, - %{ - "channel_ref" => Base.encode64(:crypto.strong_rand_bytes(10)), - "channel_secret" => Base.encode64(:crypto.strong_rand_bytes(20)) - }} - - _ -> - {:error, %{}} - end + channel_registration: fn _application_ref, _user_ref -> + {:ok, %{ "channel_ref" => "ref", "channel_secret" => "secret"} } end, - start_router_process: fn _channel_ref, _options -> - :ok - end, - route_message: fn _chref, _event, protocol_msg -> + start_router_process: fn _channel_ref, _options -> :ok end, + stop_router_process: fn _channel_ref, _options -> :ok end, + route_message: fn _chref, protocol_msg -> case protocol_msg.event_name do - "some.event.to.fail.send1" -> {:error, :channel_sender_unknown_error} - "some.event.to.fail.send2" -> {:error, :channel_sender_econnrefused} - _ -> {:ok, %{}} + "some.event.to.fail.send1" -> + {:error, :channel_sender_econnrefused} + _ -> + {:ok, %{}} end end ]}, - {DefaultMutator, [], - [ - mutate: fn cloud_event -> - case cloud_event do - nil -> - {:ok, cloud_event} - _ -> - case cloud_event.type do - "some.event.to.fail.mutation" -> {:error, "some dummy reason"} - _ -> {:ok, cloud_event} - end + {DefaultMutator, [], [ + applies?: fn _cloud_event, _config -> true end, + mutate: fn event, _config -> + if event == nil do + {:error, :mutation_error} + else + case event.type do + "some.event.to.fail.mutation" -> + {:error, :mutation_error} + _ -> + {:ok, event} + end end end ]} @@ -75,76 +55,102 @@ defmodule BridgeCore.Boundary.ChannelManagerTest do channel = Channel.new("my-alias", @app_ref, @user_ref) |> Channel.update_credentials("ref", "secret") - assert [{"ref", "secret"}] == channel.procs + assert {:ok, refs} = Channel.get_procs(channel) + assert ["ref"] == Enum.map(refs, fn ref -> ref.channel_ref end) - {:ok, pid} = ChannelManager.start_link({channel, DefaultMutator}) + {:ok, pid} = ChannelManager.start_link({channel, @default_mutator_setup}) assert pid != nil assert Process.info(pid, :priority) == {:priority, :normal} - Process.exit(pid, :kill) + Process.exit(pid, :normal) end test "Should get info on channel" do channel = Channel.new("my-alias", @app_ref, @user_ref) |> Channel.update_credentials("ref", "secret") - {:ok, pid} = ChannelManager.start_link({channel, DefaultMutator}) + {:ok, pid} = ChannelManager.start_link({channel, @default_mutator_setup}) {:ok, {channel2, _mutator}} = ChannelManager.get_channel_info(pid) assert channel == channel2 - Process.exit(pid, :kill) + Process.exit(pid, :normal) + end + + test "Should get info on channel, with status closed" do + channel = Channel.new("my-alias", @app_ref, @user_ref) + |> Channel.update_credentials("ref", "secret") + + {:ok, pid} = ChannelManager.start_link({channel, @default_mutator_setup}) + + assert :ok == ChannelManager.close_channel(pid) + + {:ok, {channel2, _mutator}} = ChannelManager.get_channel_info(pid) + + assert channel.channel_alias == channel2.channel_alias + assert :closed == channel2.status + + Process.exit(pid, :normal) end test "Should update channel" do channel = Channel.new("my-alias", @app_ref, @user_ref) |> Channel.update_credentials("ref", "secret") - {:ok, pid} = ChannelManager.start_link({channel, DefaultMutator}) + {:ok, pid} = ChannelManager.start_link({channel, @default_mutator_setup}) {:ok, {alt_channel, _mutator}} = ChannelManager.get_channel_info(pid) assert channel == alt_channel - channel2 = Channel.update_credentials(channel, "ref2", "secret2") - assert [{"ref2", "secret2"}, {"ref", "secret"}] == channel2.procs + channel = %{alt_channel | procs: [BridgeCore.Reference.new("ref2", "secret2")]} - {:ok, alt_channel} = ChannelManager.update(pid, channel2) + {:ok, channel2} = ChannelManager.update(pid, channel) + {:ok, refs} = Channel.get_procs(channel2) + assert ["ref2", "ref"] == Enum.map(refs, fn ref -> ref.channel_ref end) - assert channel2.procs == alt_channel.procs + # now tries to update with empty procs + assert {:error, :empty_refs} = ChannelManager.update(pid, %{channel2 | procs: []}) - Process.exit(pid, :kill) + Process.exit(pid, :normal) end - test "Should close channel" do + channel = Channel.new("my-alias", @app_ref, @user_ref) |> Channel.update_credentials("ref", "secret") - {:ok, pid} = ChannelManager.start_link({channel, DefaultMutator}) + {:ok, pid} = ChannelManager.start_link({channel, @default_mutator_setup}) + + assert :ok == ChannelManager.close_channel(pid) + + Process.exit(pid, :normal) + end + + test "Should close channel just once" do + + channel = Channel.new("my-alias", @app_ref, @user_ref) + |> Channel.update_credentials("ref", "secret") + + {:ok, pid} = ChannelManager.start_link({channel, @default_mutator_setup}) assert :ok == ChannelManager.close_channel(pid) # trying to close it again assert {:error, :alreadyclosed} == ChannelManager.close_channel(pid) - Process.exit(pid, :kill) + Process.exit(pid, :normal) end test "Should route message" do - children = [ - {Task.Supervisor, name: ADFSender.TaskSupervisor} - ] - - {:ok, _} = Supervisor.start_link(children, strategy: :one_for_one) channel = Channel.new("my-alias", @app_ref, @user_ref) |> Channel.update_credentials("ref", "secret") - {:ok, pid} = ChannelManager.start_link({channel, DefaultMutator}) + {:ok, pid} = ChannelManager.start_link({channel, @default_mutator_setup}) {:ok, message} = CloudEvent.from("{ \"data\": { @@ -161,50 +167,38 @@ defmodule BridgeCore.Boundary.ChannelManagerTest do }") response = ChannelManager.deliver_message(pid, message) - assert :ok = response - :timer.sleep(200) + :timer.sleep(10) + assert_called Connector.start_router_process(:_ , :_) + assert_called Connector.route_message(:_, :_) - assert called(AdfSenderConnector.route_message(:_, :_, :_)) + Process.exit(pid, :normal) - Process.exit(pid, :kill) end test "Should not send nil message" do - children = [ - {Task.Supervisor, name: ADFSender.TaskSupervisor} - ] - - {:ok, _} = Supervisor.start_link(children, strategy: :one_for_one) channel = Channel.new("my-alias", @app_ref, @user_ref) |> Channel.update_credentials("ref", "secret") - {:ok, pid} = ChannelManager.start_link({channel, DefaultMutator}) + {:ok, pid} = ChannelManager.start_link({channel, @default_mutator_setup}) response = ChannelManager.deliver_message(pid, nil) - assert :ok = response - :timer.sleep(100) - - assert_not_called(AdfSenderConnector.route_message(:_, :_, :_)) + :timer.sleep(10) + assert_not_called Connector.route_message(:_, :_, :_) - Process.exit(pid, :kill) + Process.exit(pid, :normal) end test "Should not send message - handle mutator fail" do - children = [ - {Task.Supervisor, name: ADFSender.TaskSupervisor} - ] - - {:ok, _} = Supervisor.start_link(children, strategy: :one_for_one) channel = Channel.new("my-alias", @app_ref, @user_ref) |> Channel.update_credentials("ref", "secret") - {:ok, pid} = ChannelManager.start_link({channel, DefaultMutator}) + {:ok, pid} = ChannelManager.start_link({channel, @default_mutator_setup}) {:ok, message} = CloudEvent.from("{ \"data\": { @@ -221,28 +215,19 @@ defmodule BridgeCore.Boundary.ChannelManagerTest do }") response = ChannelManager.deliver_message(pid, message) - assert :ok = response - :timer.sleep(100) + :timer.sleep(10) + assert_not_called Connector.route_message(:_, :_) - assert_not_called(AdfSenderConnector.route_message(:_, :_, :_)) - - Process.exit(pid, :kill) + Process.exit(pid, :normal) end test "Should not send message - handle send fail" do - children = [ - {Task.Supervisor, name: ADFSender.TaskSupervisor} - ] - - {:ok, _} = Supervisor.start_link(children, strategy: :one_for_one) - channel = Channel.new("my-alias", @app_ref, @user_ref) |> Channel.update_credentials("ref", "secret") - {:ok, pid} = ChannelManager.start_link({channel, DefaultMutator}) - + {:ok, pid} = ChannelManager.start_link({channel, @default_mutator_setup}) {:ok, message} = CloudEvent.from("{ \"data\": { @@ -261,24 +246,23 @@ defmodule BridgeCore.Boundary.ChannelManagerTest do response = ChannelManager.deliver_message(pid, message) assert :ok = response - :timer.sleep(100) - assert called(AdfSenderConnector.route_message(:_, :_, :_)) - + :timer.sleep(10) + assert_called Connector.route_message(:_, :_) new_message = %{message | type: "some.event.to.fail.send2"} new_response = ChannelManager.deliver_message(pid, new_message) assert :ok = new_response - :timer.sleep(100) - assert called(AdfSenderConnector.route_message(:_, :_, :_)) + :timer.sleep(10) + assert_called Connector.route_message(:_, :_) - Process.exit(pid, :kill) + Process.exit(pid, :normal) end test "Should not send message - channel status is new" do channel = Channel.new("my-alias", @app_ref, User.new("CC-1989637140")) - {:ok, pid} = ChannelManager.start_link({channel, DefaultMutator}) + {:ok, pid} = ChannelManager.start_link({channel, @default_mutator_setup}) # then tries to deliver msg {:ok, message} = CloudEvent.from("{ @@ -297,24 +281,22 @@ defmodule BridgeCore.Boundary.ChannelManagerTest do assert :ok == ChannelManager.deliver_message(pid, message) - :timer.sleep(100) - - assert_not_called(AdfSenderConnector.route_message(:_, :_, :_)) + :timer.sleep(10) + assert_not_called BridgeCore.Sender.Connector.route_message(:_, :_, :_) Process.exit(pid, :normal) end test "Should not send message - channel status is closed" do + channel = Channel.new("my-alias", @app_ref, @user_ref) |> Channel.update_credentials("ref", "secret") - {:ok, pid} = ChannelManager.start_link({channel, DefaultMutator}) + {:ok, pid} = ChannelManager.start_link({channel, @default_mutator_setup}) # then closes channel ChannelManager.close_channel(pid) - :timer.sleep(100) - # then tries to deliver msg {:ok, message} = CloudEvent.from("{ \"data\": { @@ -332,9 +314,44 @@ defmodule BridgeCore.Boundary.ChannelManagerTest do assert :ok == ChannelManager.deliver_message(pid, message) - :timer.sleep(200) + :timer.sleep(10) + assert_not_called BridgeCore.Sender.Connector.route_message(:_, :_, :_) + + Process.exit(pid, :normal) + end + + test "Should stop related process on channel close" do + + channel = Channel.new("my-alias", @app_ref, @user_ref) + |> Channel.update_credentials("ref", "secret") + + {:ok, pid} = ChannelManager.start_link({channel, @default_mutator_setup}) + + {:ok, message} = CloudEvent.from("{ + \"data\": { + \"msg\": \"Hello World\" + }, + \"dataContentType\": \"application/json\", + \"id\": \"1\", + \"invoker\": \"invoker1\", + \"source\": \"source1\", + \"specVersion\": \"0.1\", + \"subject\": \"my-alias\", + \"time\": \"xxx\", + \"type\": \"some.event\" + }") + + # delivers a message + assert :ok == ChannelManager.deliver_message(pid, message) + + :timer.sleep(5) + assert_called BridgeCore.Sender.Connector.route_message(:_, :_) + + # then closes the channel + ChannelManager.close_channel(pid) - assert_not_called(AdfSenderConnector.route_message(:_, :_, :_)) + :timer.sleep(5) + assert_called BridgeCore.Sender.Connector.stop_router_process(:_, :_) Process.exit(pid, :normal) end diff --git a/channel-bridge/apps/bridge_core/test/bridge_core/boundary/channel_supervisor_test.exs b/channel-bridge/apps/bridge_core/test/bridge_core/boundary/channel_supervisor_test.exs index 6f508b0..ef9d00e 100644 --- a/channel-bridge/apps/bridge_core/test/bridge_core/boundary/channel_supervisor_test.exs +++ b/channel-bridge/apps/bridge_core/test/bridge_core/boundary/channel_supervisor_test.exs @@ -5,6 +5,7 @@ defmodule BridgeCore.Boundary.ChannelSupervisorTest do alias BridgeCore.Channel alias BridgeCore.Boundary.ChannelSupervisor + alias BridgeCore.Boundary.ChannelManager alias Horde.DynamicSupervisor test "Should start supervisor" do @@ -22,6 +23,24 @@ defmodule BridgeCore.Boundary.ChannelSupervisorTest do pid = ChannelSupervisor.start_channel_process(channel, BridgeCore.CloudEvent.Mutator.DefaultMutator) assert pid != nil + end + end + + test "Should handle starting channel more than once" do + with_mocks([ + {DynamicSupervisor, [], [ + start_child: fn _module, _child -> {:error, {:already_started, self()}} end + ]}, + {ChannelManager, [], [ + update: fn _pid, _channel -> :ok end + ]}, + + ]) do + + channel = Channel.new("my-alias0980978", "app01", "user1") + pid = ChannelSupervisor.start_channel_process(channel, + BridgeCore.CloudEvent.Mutator.DefaultMutator) + assert pid != nil end end diff --git a/channel-bridge/apps/bridge_core/test/bridge_core/boundary/node_observer_test.exs b/channel-bridge/apps/bridge_core/test/bridge_core/boundary/node_observer_test.exs index cfbb17a..92e1bbd 100644 --- a/channel-bridge/apps/bridge_core/test/bridge_core/boundary/node_observer_test.exs +++ b/channel-bridge/apps/bridge_core/test/bridge_core/boundary/node_observer_test.exs @@ -1,16 +1,21 @@ defmodule BridgeCore.Boundary.NodeObserverTest do use ExUnit.Case - import Mock + alias BridgeCore.Boundary.{NodeObserver, ChannelRegistry, ChannelSupervisor} - alias BridgeCore.Boundary.NodeObserver - alias BridgeCore.Boundary.ChannelRegistry - alias BridgeCore.Boundary.ChannelSupervisor - - test "Should start nodeobserver" do + setup do {:ok, rpid} = ChannelRegistry.start_link(nil) {:ok, spid} = ChannelSupervisor.start_link(nil) + on_exit(fn -> + Process.exit(rpid, :kill) + Process.exit(spid, :kill) + end) + + :ok + end + + test "Should start nodeobserver" do {:ok, pid} = NodeObserver.start_link(nil) assert is_pid(pid) diff --git a/channel-bridge/apps/bridge_core/test/bridge_core/channel_test.exs b/channel-bridge/apps/bridge_core/test/bridge_core/channel_test.exs index 36e0205..3bf8431 100644 --- a/channel-bridge/apps/bridge_core/test/bridge_core/channel_test.exs +++ b/channel-bridge/apps/bridge_core/test/bridge_core/channel_test.exs @@ -1,18 +1,17 @@ -Code.compiler_options(ignore_module_conflict: true) - defmodule BridgeCore.ChannelTest do - use ExUnit.Case + use ExUnit.Case, async: false import Mock alias BridgeCore.Channel alias BridgeCore.AppClient alias BridgeCore.User alias BridgeCore.CloudEvent + alias BridgeCore.Sender.Connector @moduletag :capture_log setup_with_mocks([ - {AdfSenderConnector, [], + {Connector, [], [ channel_registration: fn application_ref, _user_ref -> case application_ref do @@ -56,7 +55,9 @@ defmodule BridgeCore.ChannelTest do |> Channel.update_credentials("some_ref", "some_secret") assert channel.status == :ready - assert [{"some_ref", "some_secret"}] == channel.procs + {:ok, procs} = Channel.get_procs(channel) + + assert ["some_ref"] == Enum.map(procs, fn ref -> ref.channel_ref end) end test "Should mark channel as closed" do @@ -101,7 +102,8 @@ defmodule BridgeCore.ChannelTest do assert :ready == channel.status - new_channel = Channel.set_status(channel, :closed, :ok) + {:ok, new_channel} = Channel.close(channel) + assert :closed == new_channel.status end @@ -111,7 +113,96 @@ defmodule BridgeCore.ChannelTest do |> Channel.update_credentials("some_ref", "some_secret") |> Channel.update_credentials("some_ref2", "some_secret2") - assert [{"some_ref2", "some_secret2"}, {"some_ref", "some_secret"}] == channel.procs + {:ok, procs} = Channel.get_procs(channel) + + assert ["some_ref2", "some_ref"] == Enum.map(procs, fn ref -> ref.channel_ref end) + + end + + test "Should decide that channel is to be closed when running state inactivity check" do + # create a channel with an app configured for 1 second idling timeout + channel = + Channel.new("my-channel", AppClient.new("app01", "app nam", 1), User.new("user1")) + |> Channel.update_credentials("some_ref", "some_secret") + |> Channel.update_credentials("some_ref2", "some_secret2") + + assert :ready == channel.status + + # wait for 2 seconds + :timer.sleep(2000) + + # check_state_inactivity should return :timeout + assert :timeout == Channel.check_state_inactivity(channel) + + end + + test "Should decide that channel is not to be closed when running state inactivity check" do + # create a channel with an app configured for 1 second idling timeout + channel = + Channel.new("my-channel", AppClient.new("app01", "app nam", 60), User.new("user1")) + |> Channel.update_credentials("some_ref", "some_secret") + + assert :ready == channel.status + + # wait for 1 seconds + :timer.sleep(1000) + + new_channel = Channel.update_last_message(channel) + + # wait for 1 seconds + :timer.sleep(1000) + + # check_state_inactivity should return :noop + assert :noop == Channel.check_state_inactivity(new_channel) + + end + + test "Should not close channel when running state inactivity check, on status close" do + # create a channel with an app configured for 1 second idling timeout + channel = + Channel.new("my-channel", AppClient.new("app01", "app nam", 60), User.new("user1")) + |> Channel.update_credentials("some_ref", "some_secret") + + assert :ready == channel.status + + {:ok, new_channel} = Channel.close(channel) + assert :noop == Channel.check_state_inactivity(new_channel) + + + end + + test "Should test prepare message" do + # create a channel with an app configured for 1 second idling timeout + channel = + Channel.new("my-channel", AppClient.new("app01", "app nam", 60), User.new("user1")) + |> Channel.update_credentials("some_ref", "some_secret") + + cloud_event = CloudEvent.new("1", "1", "1", "1", "1", "1", "1", "1", "1") + + {:ok, messages} = Channel.prepare_messages(channel, cloud_event) + + messages_list = messages |> Enum.to_list() + + assert 1 == length(messages_list) + end + + test "Should fail test prepare message, channel with no procs" do + # create a channel with an app configured for 1 second idling timeout + channel = + Channel.new("my-channel", AppClient.new("app01", "app nam", 60), User.new("user1")) + + cloud_event = CloudEvent.new("1", "1", "1", "1", "1", "1", "1", "1", "1") + + assert {:error, :empty_refs} == Channel.prepare_messages(channel, cloud_event) + end + + test "Should fail test prepare message, invalid cloud event" do + # create a channel with an app configured for 1 second idling timeout + channel = + Channel.new("my-channel", AppClient.new("app01", "app nam", 60), User.new("user1")) + |> Channel.update_credentials("some_ref", "some_secret") + + assert {:error, :invalid_message} == Channel.prepare_messages(channel, nil) end end diff --git a/channel-bridge/apps/bridge_core/test/bridge_core/cloud_event/extractor_test.exs b/channel-bridge/apps/bridge_core/test/bridge_core/cloud_event/extractor_test.exs index 19226f7..6d87df6 100644 --- a/channel-bridge/apps/bridge_core/test/bridge_core/cloud_event/extractor_test.exs +++ b/channel-bridge/apps/bridge_core/test/bridge_core/cloud_event/extractor_test.exs @@ -1,5 +1,3 @@ -Code.compiler_options(ignore_module_conflict: true) - defmodule BridgeCore.CloudEvent.ExtractorTest do use ExUnit.Case @@ -11,7 +9,10 @@ defmodule BridgeCore.CloudEvent.ExtractorTest do setup do cloud_event = "{ \"data\": { - \"hello\": \"world\" + \"hello\": \"world\", + \"list\": [{ + \"somekey\": \"somevalue\" + }] }, \"dataContentType\": \"application/json\", \"id\": \"1\", @@ -67,7 +68,9 @@ defmodule BridgeCore.CloudEvent.ExtractorTest do test "Should extract random data from cloud event", %{demo_evt: demo_evt} do {:ok, cloud_event} = CloudEvent.from(demo_evt) - + assert Extractor.extract(cloud_event, "$.id") == {:ok, "1"} + assert Extractor.extract(cloud_event, "$.type") == {:ok, "type1"} + assert Extractor.extract(cloud_event, "$.unexistent") == {:error, :keynotfound} assert Extractor.extract(cloud_event, "$.data.hello") == {:ok, "world"} end diff --git a/channel-bridge/apps/bridge_core/test/bridge_core/cloud_event/mutator/default_mutator_test.exs b/channel-bridge/apps/bridge_core/test/bridge_core/cloud_event/mutator/default_mutator_test.exs index 8ae9536..68b8c5f 100644 --- a/channel-bridge/apps/bridge_core/test/bridge_core/cloud_event/mutator/default_mutator_test.exs +++ b/channel-bridge/apps/bridge_core/test/bridge_core/cloud_event/mutator/default_mutator_test.exs @@ -8,16 +8,27 @@ defmodule BridgeCore.CloudEvent.Mutator.DefaultMutatorTest do @moduletag :capture_log - setup_all do - # {:ok, _} = Application.ensure_all_started(:plug_crypto) - :ok - end + test "Should not perform mutations to cloud_event" do + cloud_event = "{ + \"data\": {\"hello\": \"World\"}, + \"dataContentType\": \"application/json\", + \"id\": \"1\", + \"invoker\": \"invoker1\", + \"source\": \"source1\", + \"specVersion\": \"0.1\", + \"subject\": \"foo\", + \"time\": \"xxx\", + \"type\": \"type1\" + }" - setup do - :ok + {:ok, parsed_cloud_event} = CloudEvent.from(cloud_event) + + {:ok, non_mutated_cloud_event} = DefaultMutator.mutate(parsed_cloud_event) + + assert non_mutated_cloud_event == parsed_cloud_event end - test "Should not perform mutations to cloud_event" do + test "Should check apply rule" do cloud_event = "{ \"data\": {\"hello\": \"World\"}, \"dataContentType\": \"application/json\", @@ -32,8 +43,8 @@ defmodule BridgeCore.CloudEvent.Mutator.DefaultMutatorTest do {:ok, parsed_cloud_event} = CloudEvent.from(cloud_event) - {:ok, unmutated_cloud_event} = DefaultMutator.mutate(parsed_cloud_event) + assert true == DefaultMutator.applies?(parsed_cloud_event, %{}) - assert unmutated_cloud_event == parsed_cloud_event end + end diff --git a/channel-bridge/apps/bridge_core/test/bridge_core/cloud_event/mutator/webhook_mutator_test.exs b/channel-bridge/apps/bridge_core/test/bridge_core/cloud_event/mutator/webhook_mutator_test.exs new file mode 100644 index 0000000..63618dc --- /dev/null +++ b/channel-bridge/apps/bridge_core/test/bridge_core/cloud_event/mutator/webhook_mutator_test.exs @@ -0,0 +1,230 @@ +Code.compiler_options(ignore_module_conflict: true) + +defmodule BridgeCore.CloudEvent.Mutator.WebhookMutatorTest do + use ExUnit.Case + import Mock + + alias BridgeCore.CloudEvent + alias BridgeCore.CloudEvent.Mutator.WebhookMutator + + @moduletag :capture_log + + setup do + cloud_event = "{ + \"data\": {\"hello\": \"World\"}, + \"dataContentType\": \"application/json\", + \"id\": \"1\", + \"invoker\": \"invoker foo ref\", + \"source\": \"source1\", + \"specVersion\": \"0.1\", + \"subject\": \"foo\", + \"time\": \"xxx\", + \"type\": \"type1\" + }" + + {:ok, parsed_cloud_event} = CloudEvent.from(cloud_event) + + %{raw_cloud_event: cloud_event, cloud_event: parsed_cloud_event} + end + + test "Should resolve to apply mutations to cloud_event", %{cloud_event: cloud_event} do + + mutator_config = %{ + "applies_when" => [ + %{"key" => "$.source", "comparator" => "eq", "value" => "sdfsdf"}, + %{"operator" => "or", "key" => "$.invoker", "comparator" => "contains", "value" => "foo"}, + ], + "webhook_headers" => ["Content-Type: application/json"], + "webhook_method" => "POST", + "webhook_url" => "http://localhost:3000/content/xyz" + } + + applies_result = WebhookMutator.applies?(cloud_event, mutator_config) + + assert applies_result == true + end + + test "Should resolve not to apply mutations to cloud_event", %{cloud_event: cloud_event} do + + mutator_config = %{ + "applies_when" => [ + %{"key" => "$.source", "comparator" => "eq", "value" => "sdfsdf"}, + %{"operator" => "or", "key" => "$.invoker", "comparator" => "not_contains", "value" => "foo"} # this rule will be applied + ], + "webhook_headers" => ["Content-Type: application/json"], + "webhook_method" => "POST", + "webhook_url" => "http://localhost:3000/content/xyz" + } + + applies_result = WebhookMutator.applies?(cloud_event, mutator_config) + + assert applies_result == false + end + + test "Should perform mutations to cloud_event", %{cloud_event: cloud_event} do + + mutator_config = %{ + "applies_when" => [ + %{"key" => "$.source", "comparator" => "eq", "value" => "sdfsdf"}, + %{"operator" => "or", "key" => "$.invoker", "comparator" => "contains", "value" => "foo"}, + ], + "webhook_headers" => ["Content-Type: application/json"], + "webhook_method" => "POST", + "webhook_url" => "http://localhost:3000/content/xyz" + } + + with_mocks([ + {:httpc, [], [request: fn _url, _params, _headers, _opts -> + + {:ok, + {{~c"HTTP/1.1", 200, ~c"OK"}, + [ + {~c"connection", ~c"keep-alive"}, + {~c"date", ~c"Fri, 22 Mar 2024 14:10:08 GMT"}, + {~c"content-length", ~c"224"}, + {~c"content-type", ~c"application/json; charset=utf-8"}, + {~c"keep-alive", ~c"timeout=5"} + ], + ~c"{\n \"data\": {\"foo\": \"bar\"},\n \"dataContentType\": \"application/json\",\n \"id\": \"1\",\n \"invoker\": \"invoker foo ref\",\n \"source\": \"source1\",\n \"specVersion\": \"0.1\",\n \"subject\": \"foo\",\n \"time\": \"xxx\",\n \"type\": \"type1\"\n}"}} + end]} + ]) do + + {:ok, mutated_cloud_event} = WebhookMutator.mutate(cloud_event, mutator_config) + + ## assert only body is mutated + assert mutated_cloud_event.data == %{"foo" => "bar"} + ## other fields should remain the same + assert mutated_cloud_event.dataContentType == cloud_event.dataContentType + assert mutated_cloud_event.id == cloud_event.id + assert mutated_cloud_event.invoker == cloud_event.invoker + assert mutated_cloud_event.source == cloud_event.source + assert mutated_cloud_event.specVersion == cloud_event.specVersion + assert mutated_cloud_event.subject == cloud_event.subject + assert mutated_cloud_event.time == cloud_event.time + assert mutated_cloud_event.type == cloud_event.type + + end + + end + + test "Should fail mutations due to invalid cloud event" do + + {:noop, <<255>>} = WebhookMutator.mutate("\xFF", %{}) + + end + + test "Should handle webhook fail and perform no mutations to cloud_event", %{cloud_event: cloud_event} do + + mutator_config = %{ + "applies_when" => [ + %{"key" => "$.source", "comparator" => "eq", "value" => "sdfsdf"}, + %{"operator" => "or", "key" => "$.invoker", "comparator" => "contains", "value" => "foo"}, + ], + "webhook_headers" => ["Content-Type: application/json"], + "webhook_method" => "POST", + "webhook_url" => "http://localhost:3000/content/xyz" + } + + with_mocks([ + {:httpc, [], [request: fn _url, _params, _headers, _opts -> + {:ok, + {{~c"HTTP/1.1", 401, ~c"unauthorized"}, + [ + {~c"connection", ~c"keep-alive"}, + {~c"date", ~c"Fri, 22 Mar 2024 14:10:08 GMT"}, + {~c"content-length", ~c"224"}, + {~c"content-type", ~c"application/json; charset=utf-8"}, + {~c"keep-alive", ~c"timeout=5"} + ], + ~c""}} + end]} + ]) do + + {:noop, mutated_cloud_event} = WebhookMutator.mutate(cloud_event, mutator_config) + + ## assert fields should remain the same + assert mutated_cloud_event.data == %{"hello" => "World"} + assert mutated_cloud_event.dataContentType == cloud_event.dataContentType + assert mutated_cloud_event.id == cloud_event.id + assert mutated_cloud_event.invoker == cloud_event.invoker + assert mutated_cloud_event.source == cloud_event.source + assert mutated_cloud_event.specVersion == cloud_event.specVersion + assert mutated_cloud_event.subject == cloud_event.subject + assert mutated_cloud_event.time == cloud_event.time + assert mutated_cloud_event.type == cloud_event.type + + end + + end + + test "Should handle webhook failed connection and perform no mutations to cloud_event", %{cloud_event: cloud_event} do + + mutator_config = %{ + "applies_when" => [ + %{"key" => "$.source", "comparator" => "eq", "value" => "sdfsdf"}, + %{"operator" => "or", "key" => "$.invoker", "comparator" => "contains", "value" => "foo"}, + ], + "webhook_headers" => ["Content-Type: application/json"], + "webhook_method" => "POST", + "webhook_url" => "http://localhost:3000/content/xyz" + } + + with_mocks([ + {:httpc, [], [request: fn _url, _params, _headers, _opts -> + {:failed_connect, :error} + end]} + ]) do + + {:noop, mutated_cloud_event} = WebhookMutator.mutate(cloud_event, mutator_config) + + ## assert fields should remain the same + assert mutated_cloud_event.data == %{"hello" => "World"} + assert mutated_cloud_event.dataContentType == cloud_event.dataContentType + assert mutated_cloud_event.id == cloud_event.id + assert mutated_cloud_event.invoker == cloud_event.invoker + assert mutated_cloud_event.source == cloud_event.source + assert mutated_cloud_event.specVersion == cloud_event.specVersion + assert mutated_cloud_event.subject == cloud_event.subject + assert mutated_cloud_event.time == cloud_event.time + assert mutated_cloud_event.type == cloud_event.type + + end + + end + + test "Should handle webhook error and perform no mutations to cloud_event", %{cloud_event: cloud_event} do + + mutator_config = %{ + "applies_when" => [ + %{"key" => "$.source", "comparator" => "eq", "value" => "sdfsdf"}, + %{"operator" => "or", "key" => "$.invoker", "comparator" => "contains", "value" => "foo"}, + ], + "webhook_headers" => ["Content-Type: application/json"], + "webhook_method" => "POST", + "webhook_url" => "http://localhost:3000/content/xyz" + } + + with_mocks([ + {:httpc, [], [request: fn _url, _params, _headers, _opts -> + {:error, "dummy reason"} + end]} + ]) do + + {:noop, mutated_cloud_event} = WebhookMutator.mutate(cloud_event, mutator_config) + + ## assert fields should remain the same + assert mutated_cloud_event.data == %{"hello" => "World"} + assert mutated_cloud_event.dataContentType == cloud_event.dataContentType + assert mutated_cloud_event.id == cloud_event.id + assert mutated_cloud_event.invoker == cloud_event.invoker + assert mutated_cloud_event.source == cloud_event.source + assert mutated_cloud_event.specVersion == cloud_event.specVersion + assert mutated_cloud_event.subject == cloud_event.subject + assert mutated_cloud_event.time == cloud_event.time + assert mutated_cloud_event.type == cloud_event.type + + end + + end + +end diff --git a/channel-bridge/apps/bridge_core/test/bridge_core/cloud_event/routing_error_test.exs b/channel-bridge/apps/bridge_core/test/bridge_core/cloud_event/routing_error_test.exs new file mode 100644 index 0000000..afd7417 --- /dev/null +++ b/channel-bridge/apps/bridge_core/test/bridge_core/cloud_event/routing_error_test.exs @@ -0,0 +1,16 @@ +defmodule BridgeCore.CloudEvent.RoutingErrorTest do + use ExUnit.Case + + alias BridgeCore.CloudEvent.RoutingError + + @moduletag :capture_log + + test "Should extract channel alias from data" do + + assert_raise RoutingError, fn -> + raise RoutingError, message: "dummy reason" + end + + end + +end diff --git a/channel-bridge/apps/bridge_core/test/bridge_core/cloud_event_test.exs b/channel-bridge/apps/bridge_core/test/bridge_core/cloud_event_test.exs index 533a989..8b17ca5 100644 --- a/channel-bridge/apps/bridge_core/test/bridge_core/cloud_event_test.exs +++ b/channel-bridge/apps/bridge_core/test/bridge_core/cloud_event_test.exs @@ -2,12 +2,31 @@ Code.compiler_options(ignore_module_conflict: true) defmodule BridgeCore.CloudEventTest do use ExUnit.Case + import Mock alias BridgeCore.CloudEvent + alias BridgeCore.CloudEvent.Mutator.DefaultMutator @moduletag :capture_log - setup do + @default_mutator_setup %{ + "mutator_module" => BridgeCore.CloudEvent.Mutator.DefaultMutator, + "config" => nil + } + + setup_with_mocks([ + {DefaultMutator, [], + [ + applies?: fn a, _b -> + case a.id do + "1" -> true + "2" -> false + "3" -> {:error, "dummy"} + end + end, + mutate: fn a, _b -> {:ok, a} end + ]} + ]) do demo_event = %{ data: %{ "request" => %{ @@ -123,7 +142,7 @@ defmodule BridgeCore.CloudEventTest do assert err.reason != nil end - test "Should validate cloudevent", %{demo_evt_json: demo_evt_json} do + test "Should validate cloud event", %{demo_evt_json: demo_evt_json} do {:ok, msg} = CloudEvent.from(demo_evt_json) assert msg != nil assert "invoker1" == msg.invoker @@ -138,4 +157,23 @@ defmodule BridgeCore.CloudEventTest do {:ok, msg} = CloudEvent.from(demo_evt_json) assert {:ok, "some-micro"} == CloudEvent.extract(msg, "$.data.request.headers.target") end + + test "should perform mutation", %{demo_evt_json: demo_evt_json} do + {:ok, msg} = CloudEvent.from(demo_evt_json) + assert {:ok, msg} == CloudEvent.mutate(msg, @default_mutator_setup) + end + + test "should not perform mutation", %{demo_evt_json: demo_evt_json} do + {:ok, msg} = CloudEvent.from(demo_evt_json) + new_msg = %{msg | id: "2"} + assert {:ok, new_msg} == CloudEvent.mutate(new_msg, @default_mutator_setup) + end + + test "should fail performing mutation", %{demo_evt_json: demo_evt_json} do + {:ok, msg} = CloudEvent.from(demo_evt_json) + new_msg = %{msg | id: "3"} + assert {:error, "dummy"} == CloudEvent.mutate(new_msg, @default_mutator_setup) + end + + end diff --git a/channel-bridge/apps/bridge_core/test/bridge_core/sender/connector_test.exs b/channel-bridge/apps/bridge_core/test/bridge_core/sender/connector_test.exs new file mode 100644 index 0000000..fc2daff --- /dev/null +++ b/channel-bridge/apps/bridge_core/test/bridge_core/sender/connector_test.exs @@ -0,0 +1,26 @@ +defmodule BridgeCore.Sender.ConnectorTest do + use ExUnit.Case, async: false + require Logger + + alias BridgeCore.Sender.Connector + + test "Should check channel registration operation" do + assert {:error, :channel_sender_econnrefused} == + Connector.channel_registration("app_ref", "user_ref") + end + + test "Should check starting router process operation" do + {:ok, pid} = Connector.start_router_process("app_ref") + assert is_pid(pid) + end + + test "Should check routing operation" do + assert {:error, :unknown_channel_reference} == + Connector.route_message("xxx", "yyy", AdfSenderConnector.Message.new("a", "hello", "evt")) + end + + test "Should check routing operation II" do + assert {:error, :unknown_channel_reference} == + Connector.route_message("www", AdfSenderConnector.Message.new("a", "hello", "evt")) + end +end diff --git a/channel-bridge/apps/bridge_core/test/bridge_core/utils/json_search_test.exs b/channel-bridge/apps/bridge_core/test/bridge_core/utils/json_search_test.exs index 4438ef2..975dcbf 100644 --- a/channel-bridge/apps/bridge_core/test/bridge_core/utils/json_search_test.exs +++ b/channel-bridge/apps/bridge_core/test/bridge_core/utils/json_search_test.exs @@ -2,54 +2,50 @@ Code.compiler_options(ignore_module_conflict: true) defmodule BridgeCore.Utils.JsonSearchTest do use ExUnit.Case - + import Mock alias BridgeCore.CloudEvent alias BridgeCore.Utils.JsonSearch - # @moduletag :capture_log - + setup do + cloud_event = "{ + \"data\": { + \"hello\": \"world\", + \"list\": [{ + \"somekey\": \"somevalue\" + }] + }, + \"dataContentType\": \"application/json\", + \"id\": \"1\", + \"invoker\": \"invoker1\", + \"source\": \"source1\", + \"specVersion\": \"0.1\", + \"subject\": \"foo\", + \"time\": \"xxx\", + \"type\": \"type1\" + }" - setup_all do - # {:ok, _} = Application.ensure_all_started(:plug_crypto) - :ok - end + on_exit(fn -> + Application.delete_env(:channel_bridge, :cloud_event_channel_identifier) + end) - setup do - :ok + %{demo_evt: cloud_event} end - test "test search for key(s)" do - cloud_event = %{ - "data" => %{"hello" => "World"}, - "dataContentType" => "application/json", - "id" => "1", - "invoker" => "invoker1", - "source" => "source1", - "specVersion" => "0.1", - "subject" => "foo", - "time" => "xxx", - "type" => "type1" - } + test "test search for key(s)", %{demo_evt: demo_evt} do + {:ok, cloud_event} = CloudEvent.from(demo_evt) + unstruct_cloud_event = JsonSearch.prepare(cloud_event) - assert "invoker1" == JsonSearch.extract(cloud_event, "$.invoker") - assert "invoker1-source1" == JsonSearch.extract(cloud_event, ["$.invoker", "$.source"]) + assert "invoker1" == JsonSearch.extract(unstruct_cloud_event, "$.invoker") + assert "invoker1-source1" == JsonSearch.extract(unstruct_cloud_event, ["$.invoker", "$.source"]) + assert "somevalue" == JsonSearch.extract(unstruct_cloud_event, "$.data.list[0].somekey") + assert [%{"somekey" => "somevalue"}] == JsonSearch.extract(unstruct_cloud_event, "$.data.list") end - test "test search for non-existent key(s)" do - cloud_event = %{ - "data" => %{"hello" => "World"}, - "dataContentType" => "application/json", - "id" => "1", - "invoker" => "invoker1", - "source" => "source1", - "specVersion" => "0.1", - "subject" => "foo", - "time" => "xxx", - "type" => "type1" - } - - assert nil == JsonSearch.extract(cloud_event, "$.foo") - assert "undefined-undefined" == JsonSearch.extract(cloud_event, ["$.foo", "$.bar"]) + test "test search for non-existent key(s)", %{demo_evt: demo_evt} do + {:ok, cloud_event} = CloudEvent.from(demo_evt) + unstruct_cloud_event = JsonSearch.prepare(cloud_event) + assert nil == JsonSearch.extract(unstruct_cloud_event, "$.foo") + assert "undefined-undefined" == JsonSearch.extract(unstruct_cloud_event, ["$.foo", "$.bar"]) end test "test prepare" do @@ -70,4 +66,19 @@ defmodule BridgeCore.Utils.JsonSearchTest do test "test unstruct" do assert %{} == JsonSearch.unstruct(%{}) end + + test "test handle error on extract", %{demo_evt: demo_evt} do + with_mocks([ + {ExJSONPath, [], [ + eval: fn _a, _b -> {:error, "dummy error"} end + ]} + ]) do + + {:ok, cloud_event} = CloudEvent.from(demo_evt) + unstruct_cloud_event = JsonSearch.prepare(cloud_event) + + assert nil == JsonSearch.extract(unstruct_cloud_event, "$.foo") + + end + end end diff --git a/channel-bridge/apps/bridge_core/test/bridge_core_test.exs b/channel-bridge/apps/bridge_core/test/bridge_core_test.exs index 3189e7a..3923858 100644 --- a/channel-bridge/apps/bridge_core/test/bridge_core_test.exs +++ b/channel-bridge/apps/bridge_core/test/bridge_core_test.exs @@ -1,124 +1,145 @@ -Code.compiler_options(ignore_module_conflict: true) - defmodule BridgeCoreTest do use ExUnit.Case, async: false @moduletag :capture_log import Mock alias BridgeCore.{Channel, AppClient, User, CloudEvent} - alias BridgeCore.Boundary.ChannelSupervisor - alias BridgeCore.Boundary.ChannelRegistry + alias BridgeCore.Boundary.{ChannelSupervisor, ChannelRegistry, ChannelManager} + alias BridgeCore.Sender.Connector - setup_all do - ChannelRegistry.start_link(nil) - ChannelSupervisor.start_link(nil) - :ok - end + test "should start session" do - test "Should not start app twice" do + with_mocks([ + {ChannelRegistry, [], [lookup_channel_addr: fn _x -> :noproc end]}, + {Connector, [], [channel_registration: fn _, _ -> + {:ok, %{"channel_ref" => "dummy.channel.ref0", "channel_secret" => "yyy0"}} + end]}, + {ChannelSupervisor, [], [start_channel_process: fn _x, _y -> :ok end]}, + ]) do - assert {:error, {:already_started, _}} = BridgeCore.start(:normal, []) + {:ok, {new_channel, _}} = BridgeCore.start_session(Channel.new("a", AppClient.new("b", nil), User.new("c"))) - end + assert "a" == new_channel.channel_alias - test "should start session" do + {:ok, refs} = Channel.get_procs(new_channel) - create_response = %HTTPoison.Response{ - status_code: 200, - body: "{ \"channel_ref\": \"dummy.channel.ref0\", \"channel_secret\": \"yyy0\"}" - } + assert [{"dummy.channel.ref0", "yyy0"}] == Enum.map(refs, fn ref -> + {ref.channel_ref, ref.channel_secret} + end) + + assert :ready == new_channel.status + end + end + + test "should re-start session" do with_mocks([ - {HTTPoison, [], [post: fn _url, _params, _headers, _opts -> {:ok, create_response} end]} + {ChannelRegistry, [], [lookup_channel_addr: [in_series(["a"], [:noproc, :c.pid(0,255,0)])] ]}, + {Connector, [], [channel_registration: fn _, _ -> + {:ok, %{"channel_ref" => "dummy.channel.ref0", "channel_secret" => "yyy0"}} + end]}, + {ChannelSupervisor, [], [start_channel_process: fn _x, _y -> :ok end]}, + {ChannelManager, [], [get_channel_info: fn _x -> + {:ok, {Channel.new("a", AppClient.new("b", nil), User.new("c")), nil}} + end]}, ]) do {:ok, {new_channel, _}} = BridgeCore.start_session(Channel.new("a", AppClient.new("b", nil), User.new("c"))) - :timer.sleep(100) - assert "a" == new_channel.channel_alias - assert [{"dummy.channel.ref0", "yyy0"}] == new_channel.procs - assert :ready == new_channel.status - :timer.sleep(100) + {:ok, refs} = Channel.get_procs(new_channel) + + assert [{"dummy.channel.ref0", "yyy0"}] == Enum.map(refs, fn ref -> + {ref.channel_ref, ref.channel_secret} + end) + + assert :ready == new_channel.status # try to reopen same channel - {:ok, {new_channel, _}} = BridgeCore.start_session(Channel.new("a", AppClient.new("b", nil), User.new("c"))) + {:ok, {other_channel, _}} = BridgeCore.start_session(Channel.new("a", AppClient.new("b", nil), User.new("c"))) # assert information stills the same - assert "a" == new_channel.channel_alias - assert [{"dummy.channel.ref0", "yyy0"}] == new_channel.procs - assert :ready == new_channel.status + assert new_channel.channel_alias == other_channel.channel_alias - BridgeCore.end_session("a") + {:ok, refs} = Channel.get_procs(other_channel) + + assert [{"dummy.channel.ref0", "yyy0"}] == Enum.map(refs, fn ref -> + {ref.channel_ref, ref.channel_secret} + end) + + assert :ready == other_channel.status end end test "should handle error starting session" do - create_response = %HTTPoison.Response{ - status_code: 500, - body: "{}" - } - with_mocks([ - {HTTPoison, [], [post: fn _url, _params, _headers, _opts -> {:ok, create_response} end]} + {Connector, [], [channel_registration: fn _a, _b -> {:error, :channel_sender_unknown_error} end]}, + {ChannelRegistry, [], [lookup_channel_addr: fn _x -> :noproc end]}, ]) do - err_response = BridgeCore.start_session(Channel.new("a2", AppClient.new("b", nil), User.new("c"))) - - assert {:error, :channel_sender_unknown_error} == err_response - + assert {:error, :channel_sender_unknown_error} == + BridgeCore.start_session(Channel.new("a2", AppClient.new("b2", nil), User.new("c2"))) end end test "should route message" do - - create_response = %HTTPoison.Response{ - status_code: 200, - body: "{ \"channel_ref\": \"dummy.channel.ref1\", \"channel_secret\": \"yyy1\"}" - } - - route_response = %HTTPoison.Response{ - status_code: 200, - body: "{ \"message\": \"ok\" }" - } - with_mocks([ - {HTTPoison, [], [post: fn url, _params, _headers, _opts -> - if String.ends_with?(url, "ext/channel/create") do - {:ok, create_response} - else - {:ok, route_response} - end - end]} + {ChannelRegistry, [], [lookup_channel_addr: [in_series(["x"], [:noproc, :c.pid(0,255,0)])] ]}, + {Connector, [], [channel_registration: fn _, _ -> + {:ok, %{"channel_ref" => "dummy.channel.ref1", "channel_secret" => "yyy1"}} + end]}, + {ChannelSupervisor, [], [start_channel_process: fn _x, _y -> :ok end]}, + {ChannelManager, [], [deliver_message: fn _x, _y -> :ok end]}, ]) do {:ok, {new_channel, _}} = BridgeCore.start_session(Channel.new("x", AppClient.new("y", nil), User.new("z"))) - :timer.sleep(100) - assert "x" == new_channel.channel_alias - assert [{"dummy.channel.ref1", "yyy1"}] == new_channel.procs - assert :ready == new_channel.status - route_result = BridgeCore.route_message("x", CloudEvent.new("a", "b", "c", "d", "e", "f", "g", "h", "i")) + {:ok, refs} = Channel.get_procs(new_channel) - :timer.sleep(100) + assert [{"yyy1", "dummy.channel.ref1"}] == Enum.map(refs, fn ref -> + {ref.channel_secret, ref.channel_ref} + end) - BridgeCore.end_session("x") + assert :ready == new_channel.status + + assert :ok == BridgeCore.route_message("x", CloudEvent.new("a", "b", "c", "d", "e", "f", "g", "h", "i")) end end test "should not route message to un-existent channel" do - route_result = BridgeCore.route_message("y", CloudEvent.new("a", "b", "c", "d", "e", "f", "g", "h", "i")) - assert {:error, :noproc} == route_result + with_mocks([ + {ChannelRegistry, [], [lookup_channel_addr: fn _x -> :noproc end] }, + ]) do + + route_result = BridgeCore.route_message("y", CloudEvent.new("a", "b", "c", "d", "e", "f", "g", "h", "i")) + assert {:error, :noproc} == route_result + end + end + + test "should close channel/session" do + with_mocks([ + {ChannelRegistry, [], [lookup_channel_addr: fn _x -> :c.pid(0, 255, 0) end] }, + {ChannelManager, [], [close_channel: fn _x -> :ok end] }, + ]) do + + close_result = BridgeCore.end_session("z") + assert :ok == close_result + end end test "should not close un-existent channel" do - close_result = BridgeCore.end_session("z") - assert {:error, :noproc} == close_result + with_mocks([ + {ChannelRegistry, [], [lookup_channel_addr: fn _x -> :noproc end] }, + ]) do + + close_result = BridgeCore.end_session("z") + assert {:error, :noproc} == close_result + end end test "should parse topology configuration default" do @@ -127,7 +148,7 @@ defmodule BridgeCoreTest do test "should parse topology configuration k8s" do - String.to_atom("Elixir.Cluster.Strategy.Gossip") + _ = String.to_atom("Elixir.Cluster.Strategy.Gossip") with_mocks([ {BridgeHelperConfig, [], [get: fn _, _ -> @@ -135,15 +156,33 @@ defmodule BridgeCoreTest do "strategy" => "Elixir.Cluster.Strategy.Gossip", "config" => %{ "mode" => ":hostname", - "kubernetes_service_name" => "bridge" + "kubernetes_service_name" => "bridge", + "some_other_key" => 10 } } end]} ]) do - assert [k8s: [{:strategy, Cluster.Strategy.Gossip}, {:config, [kubernetes_service_name: "bridge", mode: :hostname]}]] + assert [k8s: [{:strategy, Cluster.Strategy.Gossip}, + {:config, [kubernetes_service_name: "bridge", mode: :hostname, some_other_key: 10]}]] == BridgeCore.topologies() end end + test "should parse topology with nil configuration" do + + _ = String.to_atom("Elixir.Cluster.Strategy.Gossip") + + with_mocks([ + {BridgeHelperConfig, [], [get: fn _, _ -> + %{ + "strategy" => "Elixir.Cluster.Strategy.Gossip", + "config" => nil + } + end]} + ]) do + assert [k8s: [{:strategy, Cluster.Strategy.Gossip}, {:config, []}]] == BridgeCore.topologies() + end + + end end diff --git a/channel-bridge/apps/bridge_helper_config/lib/bridge_helper_config/application_config.ex b/channel-bridge/apps/bridge_helper_config/lib/bridge_helper_config/application_config.ex index 1a8b650..7648fd4 100644 --- a/channel-bridge/apps/bridge_helper_config/lib/bridge_helper_config/application_config.ex +++ b/channel-bridge/apps/bridge_helper_config/lib/bridge_helper_config/application_config.ex @@ -4,12 +4,10 @@ defmodule BridgeHelperConfig.ApplicationConfig do require Logger - @default_file "config-local.yaml" - # configuration elements to be loaed as atoms @atom_keys [ [:bridge, "channel_authenticator"], - [:bridge, "cloud_event_mutator"] + [:bridge, "cloud_event_mutator", "mutator_module"] ] def load(file_path \\ nil) do @@ -58,9 +56,20 @@ defmodule BridgeHelperConfig.ApplicationConfig do end defp load_atoms(config) do - @atom_keys - |> Enum.map(fn(k) -> {List.last(k), get_in(config, k)} end) + |> Enum.map(fn(k) -> + res = get_in(config, k) + |> String.to_atom + |> Code.ensure_compiled + + case res do + {:error, _} -> + Logger.warning("invalid configuration for key #{k}, not a valid atom detected. Errors may occur during runtime") + nil + {:module, m} -> + {k, m} + end + end) |> Enum.filter(fn({k,v}) -> case v do nil -> @@ -69,19 +78,9 @@ defmodule BridgeHelperConfig.ApplicationConfig do _ -> true end end) - |> Enum.map(fn({k,v}) -> - res = String.to_atom(v) - |> Code.ensure_compiled - case res do - {:error, _} -> - Logger.warning("invalid configuration for key #{k}, value #{v} is not a valid atom. Errors may occur during runtime") - nil - {:module, _} -> - v - end + |> Enum.reduce(config, fn({k,v}, acc) -> + put_in(acc, k, v) end) - - config end defp set_logging_config(config) do diff --git a/channel-bridge/apps/bridge_helper_config/test/test-config.yaml b/channel-bridge/apps/bridge_helper_config/test/test-config.yaml index cfb5136..af3d9c8 100644 --- a/channel-bridge/apps/bridge_helper_config/test/test-config.yaml +++ b/channel-bridge/apps/bridge_helper_config/test/test-config.yaml @@ -4,7 +4,9 @@ bridge: - $.subject request_channel_identifier: - "$.req_headers['sub']" - cloud_event_mutator: Elixir.BridgeCore.CloudEvent.Mutator.DefaultMutator + cloud_event_mutator: + mutator_module: Elixir.BridgeCore.CloudEvent.Mutator.DefaultMutator + channel_authenticator: Elixir.BridgeRestapiAuth.PassthroughProvider event_bus: rabbitmq: diff --git a/channel-bridge/apps/bridge_rabbitmq/mix.exs b/channel-bridge/apps/bridge_rabbitmq/mix.exs index dbe6f74..baf9e0d 100644 --- a/channel-bridge/apps/bridge_rabbitmq/mix.exs +++ b/channel-bridge/apps/bridge_rabbitmq/mix.exs @@ -36,7 +36,7 @@ defmodule BridgeRabbitmq.MixProject do {:sweet_xml, "~> 0.6"}, {:vapor, "~> 0.10.0"}, # testing dependencies - {:mock, "~> 0.3.0", only: :test}, + {:mock, "~> 0.3.8", only: :test}, {:credo, "~> 1.6", only: [:dev, :test], runtime: false}, {:credo_sonarqube, "~> 0.1.3", only: [:dev, :test]}, {:sobelow, "~> 0.8", only: :dev}, diff --git a/channel-bridge/apps/bridge_restapi/lib/bridge_api/rest/rest_helper.ex b/channel-bridge/apps/bridge_restapi/lib/bridge_api/rest/rest_helper.ex index bd585f8..f2bae1b 100644 --- a/channel-bridge/apps/bridge_restapi/lib/bridge_api/rest/rest_helper.ex +++ b/channel-bridge/apps/bridge_restapi/lib/bridge_api/rest/rest_helper.ex @@ -28,12 +28,12 @@ defmodule BridgeApi.Rest.RestHelper do with {:ok, channel} <- build_channel_info_from_request(request_data), {:ok, {new_channel, _mutator}} <- BridgeCore.start_session(channel) do - [{ref, key} | _tail] = new_channel.procs + [ref | _tail] = new_channel.procs ok_response(%{ "alias" => new_channel.channel_alias, - "channel_ref" => ref, - "channel_secret" => key + "channel_ref" => ref.channel_ref, + "channel_secret" => ref.channel_secret }) else @@ -113,11 +113,6 @@ defmodule BridgeApi.Rest.RestHelper do {%{"result" => "ok"}, 200} end - # defp handle_error_response({:error, :alreadyopen}) do - # {%{"errors" => [ErrorResponse.new("", "", "ADF00100", "channel already registered", "")]}, - # 400} - # end - defp handle_error_response({:error, :channel_sender_econnrefused}) do {%{"errors" => [ErrorResponse.new("", "", "ADF00105", "ADF Sender error", "")]}, 502} end diff --git a/channel-bridge/apps/bridge_restapi/lib/bridge_api/rest/rest_router.ex b/channel-bridge/apps/bridge_restapi/lib/bridge_api/rest/rest_router.ex index f6bc2b4..dcc4971 100644 --- a/channel-bridge/apps/bridge_restapi/lib/bridge_api/rest/rest_router.ex +++ b/channel-bridge/apps/bridge_restapi/lib/bridge_api/rest/rest_router.ex @@ -44,13 +44,13 @@ defmodule BridgeApi.Rest.RestRouter do match(_, do: send_resp(conn, 404, "Resource not found")) defp start_session(conn) do - build_data_map(conn) + build_request_data(conn) |> RestHelper.start_session() |> send_response(conn) end defp delete_channel(conn) do - build_data_map(conn) + build_request_data(conn) |> RestHelper.close_channel() |> send_response(conn) end @@ -62,8 +62,8 @@ defmodule BridgeApi.Rest.RestRouter do |> send_resp(status, Jason.encode!(data)) end - @spec build_data_map(conn()) :: ChannelRequest.t() - defp build_data_map(conn) do + @spec build_request_data(conn()) :: ChannelRequest.t() + defp build_request_data(conn) do {:ok, all_headers} = Header.all_headers(conn) ChannelRequest.new( diff --git a/channel-bridge/apps/bridge_restapi/test/bridge_api/rest/channel_request_test.exs b/channel-bridge/apps/bridge_restapi/test/bridge_api/rest/channel_request_test.exs index 3a7e4be..ddca6f6 100644 --- a/channel-bridge/apps/bridge_restapi/test/bridge_api/rest/channel_request_test.exs +++ b/channel-bridge/apps/bridge_restapi/test/bridge_api/rest/channel_request_test.exs @@ -38,7 +38,7 @@ defmodule BridgeApi.Rest.ChannelRequestTest do sample_headers = %{"appid" => "value1"} ch_req = ChannelRequest.new(sample_headers, %{}, %{}, %{}) - assert ChannelRequest.extract_application(ch_req) == {:ok, %BridgeCore.AppClient{id: "value1", name: ""}} + assert ChannelRequest.extract_application(ch_req) == {:ok, %BridgeCore.AppClient{channel_timeout: 420, id: "value1", name: ""}} end end @@ -55,7 +55,7 @@ defmodule BridgeApi.Rest.ChannelRequestTest do ]) do ch_req = ChannelRequest.new(%{}, %{}, %{}, %{}) - assert ChannelRequest.extract_application(ch_req) == {:ok, %BridgeCore.AppClient{id: "default_app", name: ""}} + assert ChannelRequest.extract_application(ch_req) == {:ok, %BridgeCore.AppClient{channel_timeout: 420, id: "default_app", name: ""}} end end @@ -72,7 +72,7 @@ defmodule BridgeApi.Rest.ChannelRequestTest do ]) do ch_req = ChannelRequest.new(%{}, %{}, %{}, %{}) - assert ChannelRequest.extract_application(ch_req) == {:ok, %BridgeCore.AppClient{id: "fooapp", name: ""}} + assert ChannelRequest.extract_application(ch_req) == {:ok, %BridgeCore.AppClient{channel_timeout: 420, id: "fooapp", name: ""}} end end diff --git a/channel-bridge/apps/bridge_secretmanager/mix.exs b/channel-bridge/apps/bridge_secretmanager/mix.exs index dd40f57..0d34329 100644 --- a/channel-bridge/apps/bridge_secretmanager/mix.exs +++ b/channel-bridge/apps/bridge_secretmanager/mix.exs @@ -31,7 +31,7 @@ defmodule BridgeSecretManager.MixProject do {:ex_aws_sts, "~> 2.2"}, {:ex_aws_secretsmanager, "~> 2.0"}, # test only dependencies - {:mock, "~> 0.3.0", only: :test} + {:mock, "~> 0.3.8", only: :test} ] end end diff --git a/channel-bridge/config-local.yaml b/channel-bridge/config-local.yaml index b6f8cb8..b80f8b4 100644 --- a/channel-bridge/config-local.yaml +++ b/channel-bridge/config-local.yaml @@ -4,7 +4,19 @@ bridge: - $.subject request_channel_identifier: - "$.req_headers['sub']" - cloud_event_mutator: Elixir.BridgeCore.CloudEvent.Mutator.DefaultMutator + cloud_event_mutator: + mutator_module: Elixir.BridgeCore.CloudEvent.Mutator.DefaultMutator +# mutator_module: Elixir.BridgeCore.CloudEvent.Mutator.WebhookMutator +# config: +# webhook_url: "http://localhost:3000/content/x" +# webhook_method: "POST" +# webhook_headers: +# - "Accept: application/json" +# applies_when: +# - key: "$.invoker" +# comparator: "contains" +# value: "acme" + channel_authenticator: Elixir.BridgeRestapiAuth.PassthroughProvider event_bus: rabbitmq: @@ -25,15 +37,15 @@ bridge: ## ## Optionally and for local dev environments Rabbitmq host and credentials can be configured directly - ## here: - # username: - # password: - # hostname: localhost - # port: 5672 - # virtualhost: / - # ssl: false + ## here: +# username: +# password: +# hostname: localhost +# port: 5672 +# virtualhost: / +# ssl: false - ## producer and processor concurrency + ## producer and processor concurrency producer_concurrency: 1 producer_prefetch: 2 processor_concurrency: 2 @@ -41,7 +53,7 @@ bridge: # sqs: # queue: sample-queue - # ## producer and processor concurrency + # ##producer and processor concurrency # producer_concurrency: 1 # producer_prefetch: 2 # processor_concurrency: 2 @@ -57,6 +69,9 @@ bridge: # kubernetes_selector: "cluster=beam" # namespace: "bridgenm" # polling_interval: 5000 +# apps: +# - name: demo1 +# timeout: 5000 sender: url: http://localhost:8081 diff --git a/channel-bridge/mix.exs b/channel-bridge/mix.exs index 89b2cc4..e163d65 100644 --- a/channel-bridge/mix.exs +++ b/channel-bridge/mix.exs @@ -4,7 +4,7 @@ defmodule ChannelBridge.MixProject do def project do [ apps_path: "apps", - version: "0.1.0", + version: "0.2.0", start_permanent: Mix.env() == :prod, deps: deps(), releases: [ diff --git a/channel-bridge/mix.lock b/channel-bridge/mix.lock index a467211..c361de5 100644 --- a/channel-bridge/mix.lock +++ b/channel-bridge/mix.lock @@ -25,6 +25,7 @@ "ex_unit_sonarqube": {:hex, :ex_unit_sonarqube, "0.1.3", "6277aaddb6caff32402f227cd8390eca8ffe00217b4bc7974a477346c84a0119", [:mix], [], "hexpm", "e41b03efe9a0019f63f2f6309b02c8e443b0a36eb4cf6c9bc9593eb5f5de4882"}, "excoveralls": {:hex, :excoveralls, "0.18.0", "b92497e69465dc51bc37a6422226ee690ab437e4c06877e836f1c18daeb35da9", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "1109bb911f3cb583401760be49c02cbbd16aed66ea9509fc5479335d284da60b"}, "exjsonpath": {:hex, :exjsonpath, "0.9.0", "87e593eb0deb53aa0688ca9f9edc9fb3456aca83c82245f83201ea04d696feba", [:mix], [], "hexpm", "8d7a8e9ba784e1f7a67c6f1074a3ac91a3a79a45969514ee5d95cea5bf749627"}, + "expreso": {:hex, :expreso, "0.1.1", "7165da175f14f55d1a88f474389b05e7697f215c0c65423beb09d02db9babb95", [:mix], [], "hexpm", "6b1c83909111a74f93dadb62429da3abf2f256fe35523e909126dd5b476ff3c7"}, "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, "gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"}, "gen_state_machine": {:hex, :gen_state_machine, "3.0.0", "1e57f86a494e5c6b14137ebef26a7eb342b3b0070c7135f2d6768ed3f6b6cdff", [:mix], [], "hexpm", "0a59652574bebceb7309f6b749d2a41b45fdeda8dbb4da0791e355dd19f0ed15"}, From 38f94c06f12956c8bfefcdfd93127373d0c0e746 Mon Sep 17 00:00:00 2001 From: Gabriel Martinez Date: Wed, 3 Apr 2024 16:38:46 -0500 Subject: [PATCH 2/2] feat: added method for stopping routing processes --- clients/backend-client-elixir/README.md | 2 +- .../lib/adf_sender_connector.ex | 8 +++++ clients/backend-client-elixir/mix.exs | 5 +-- .../test/adf_sender_connector_test.exs | 35 ++++++++++++++----- 4 files changed, 38 insertions(+), 12 deletions(-) diff --git a/clients/backend-client-elixir/README.md b/clients/backend-client-elixir/README.md index 0ae6dae..33f4d70 100644 --- a/clients/backend-client-elixir/README.md +++ b/clients/backend-client-elixir/README.md @@ -10,7 +10,7 @@ by adding `adf_sender_connector` to your list of dependencies in `mix.exs`: ```elixir def deps do [ - {:adf_sender_connector, "~> 0.1.0"} + {:adf_sender_connector, "~> 0.3.0"} ] end ``` diff --git a/clients/backend-client-elixir/lib/adf_sender_connector.ex b/clients/backend-client-elixir/lib/adf_sender_connector.ex index de4f1e6..d37b0fb 100644 --- a/clients/backend-client-elixir/lib/adf_sender_connector.ex +++ b/clients/backend-client-elixir/lib/adf_sender_connector.ex @@ -131,6 +131,14 @@ defmodule AdfSenderConnector do DynamicSupervisor.start_child(__MODULE__, Router.child_spec([name: channel_ref] ++ new_options)) end + @doc """ + Stops a routing process. + """ + @spec stop_router_process(channel_ref()) :: :ok | {:error, any()} + def stop_router_process(channel_ref) do + Logger.debug("Stopping routing process: #{inspect(channel_ref)}") + DynamicSupervisor.stop(__MODULE__, channel_ref) + end @spec route_message(channel_ref(), event_name(), message() | message_data()) :: {:ok, map()} | {:error, any()} @doc """ diff --git a/clients/backend-client-elixir/mix.exs b/clients/backend-client-elixir/mix.exs index 44b4e40..ab14064 100644 --- a/clients/backend-client-elixir/mix.exs +++ b/clients/backend-client-elixir/mix.exs @@ -4,7 +4,7 @@ defmodule AdfSenderConnector.MixProject do def project do [ app: :adf_sender_connector, - version: "0.2.2", + version: "0.3.0", elixir: "~> 1.13", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, @@ -53,7 +53,8 @@ defmodule AdfSenderConnector.MixProject do {:sobelow, "~> 0.8", only: :dev}, {:excoveralls, "~> 0.10", only: :test}, {:ex_unit_sonarqube, "~> 0.1", only: :test}, - {:benchee, "~> 1.1", only: [:dev, :benchee]} + {:benchee, "~> 1.1", only: [:dev, :benchee]}, + {:ssl_verify_fun, "~> 1.1.6", manager: :rebar3, override: true} ] end diff --git a/clients/backend-client-elixir/test/adf_sender_connector_test.exs b/clients/backend-client-elixir/test/adf_sender_connector_test.exs index 6771d3e..2a20233 100644 --- a/clients/backend-client-elixir/test/adf_sender_connector_test.exs +++ b/clients/backend-client-elixir/test/adf_sender_connector_test.exs @@ -40,15 +40,6 @@ defmodule AdfSenderConnectorTest do assert {:error, :channel_sender_econnrefused} == AdfSenderConnector.channel_registration("a1", "b1", options) end - # test "fail to create a process due to invalid options" do - # options = [name: :xxx, alpha: true] - - # assert_raise NimbleOptions.ValidationError, fn -> - # AdfSenderConnector.channel_registration("a2", "b2", options) - # end - - # end - test "deliver a message via channel" do ### first exchange credentials @@ -125,4 +116,30 @@ defmodule AdfSenderConnectorTest do end + test "should stop routing process" do + + options = [sender_url: @sender_url, http_opts: []] + + ### first exchange credentials + create_response = %HTTPoison.Response{ + status_code: 200, + body: "{ \"channel_ref\": \"dummy.channel.ref4\", \"channel_secret\": \"yyy4\"}" + } + + with_mocks([ + {HTTPoison, [], [post: fn _url, _params, _headers, _opts -> {:ok, create_response} end]} + ]) do + assert {:ok, %{"channel_ref" => "dummy.channel.ref4", "channel_secret" => "yyy4"}} + == AdfSenderConnector.channel_registration("a4", "b4", options) + end + + ### then create a process to map that name + {:ok, pid} = AdfSenderConnector.start_router_process("dummy.channel.ref4") + + ### and then stop the router process + assert :ok == AdfSenderConnector.stop_router_process("dummy.channel.ref4") + + + end + end