Skip to content

Commit

Permalink
[#136] Fix the replicated adapter to ensure the local response while …
Browse files Browse the repository at this point in the history
…replicating
  • Loading branch information
cabol committed Nov 13, 2021
1 parent b8f01cf commit cd998cc
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 22 deletions.
26 changes: 17 additions & 9 deletions lib/nebulex/adapters/replicated.ex
Original file line number Diff line number Diff line change
Expand Up @@ -554,15 +554,22 @@ defmodule Nebulex.Adapters.Replicated do
end

defp multi_call(%{name: name, task_sup: task_sup} = meta, action, args, opts) do
task_sup
|> RPC.multi_call(
Cluster.get_nodes(name),
__MODULE__,
:with_dynamic_cache,
[meta, action, args],
opts
)
|> handle_rpc_multi_call(meta, action)
# Run the command locally first
local = with_dynamic_cache(meta, action, args)

# Run the command on the remote nodes
{ok_nodes, error_nodes} =
RPC.multi_call(
task_sup,
Cluster.get_nodes(name) -- [node()],
__MODULE__,
:with_dynamic_cache,
[meta, action, args],
opts
)

# Process the responses adding the local one as source of truth
handle_rpc_multi_call({[local | ok_nodes], error_nodes}, meta, action)
end

defp handle_rpc_multi_call({res, []}, _meta, _action), do: hd(res)
Expand All @@ -574,6 +581,7 @@ defmodule Nebulex.Adapters.Replicated do

defp handle_rpc_multi_call({responses, {:sanitized, {errors, rpc_errors}}}, meta, action) do
_ = dispatch_replication_error(meta, action, rpc_errors)

raise Nebulex.RPCMultiCallError, action: action, responses: responses, errors: errors
end

Expand Down
7 changes: 6 additions & 1 deletion lib/nebulex/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,12 @@ defmodule Nebulex.Cache do
one could define a module:
defmodule MyApp.Telemetry do
def handle_event([:my_app, :cache, :command, event], measurements, metadata, config) do
def handle_event(
[:my_app, :cache, :command, event],
measurements,
metadata,
config
) do
case event do
:start ->
# Handle start event ...
Expand Down
25 changes: 20 additions & 5 deletions lib/nebulex/exceptions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ defmodule Nebulex.RegistryLookupError do
@moduledoc """
Raised at runtime when the cache was not started or it does not exist.
"""

@type t :: %__MODULE__{message: binary, name: atom}

defexception [:message, :name]

@doc false
def exception(opts) do
name = Keyword.fetch!(opts, :name)

Expand All @@ -19,9 +23,12 @@ defmodule Nebulex.KeyAlreadyExistsError do
@moduledoc """
Raised at runtime when a key already exists in cache.
"""

@type t :: %__MODULE__{key: term, cache: atom}

defexception [:key, :cache]

@impl true
@doc false
def message(%{key: key, cache: cache}) do
"key #{inspect(key)} already exists in cache #{inspect(cache)}"
end
Expand All @@ -31,8 +38,12 @@ defmodule Nebulex.QueryError do
@moduledoc """
Raised at runtime when the query is invalid.
"""

@type t :: %__MODULE__{message: binary}

defexception [:message]

@doc false
def exception(opts) do
message = Keyword.fetch!(opts, :message)
query = Keyword.fetch!(opts, :query)
Expand All @@ -51,21 +62,25 @@ defmodule Nebulex.RPCMultiCallError do
@moduledoc """
Raised at runtime when a RPC multi_call error occurs.
"""

@type t :: %__MODULE__{message: binary}

defexception [:message]

@doc false
def exception(opts) do
action = Keyword.fetch!(opts, :action)
errors = Keyword.fetch!(opts, :errors)
responses = Keyword.fetch!(opts, :responses)

message = """
RPC error executing action: #{action}
RPC error while executing action #{inspect(action)}
Responses:
Successful responses:
#{inspect(responses, pretty: true)}
Errors:
Remote errors:
#{inspect(errors, pretty: true)}
"""
Expand All @@ -83,7 +98,7 @@ defmodule Nebulex.RPCError do

defexception [:reason, :node]

@impl true
@doc false
def message(%__MODULE__{reason: reason, node: node}) do
format_reason(reason, node)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/nebulex/rpc.ex
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ defmodule Nebulex.RPC do
{:ok, res}, _node_callback, {ok, err} ->
{[res | ok], err}

{class, _} = error, node_callback, {ok, err} when class in [:error, :exit, :throw] ->
{kind, _} = error, node_callback, {ok, err} when kind in [:error, :exit, :throw] ->
{ok, [{error, node_callback} | err]}
end
}
Expand Down
4 changes: 2 additions & 2 deletions test/nebulex/adapters/partitioned_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ defmodule Nebulex.Adapters.PartitionedTest do
assert Partitioned.put_all(for(x <- 1..100_000, do: {x, x}), timeout: 60_000) == :ok
assert Partitioned.get(1, timeout: 1000) == 1

msg = ~r"RPC error executing action: all\n\nResponses:"
msg = ~r"RPC error while executing action :all\n\nSuccessful responses:"

assert_raise Nebulex.RPCMultiCallError, msg, fn ->
Partitioned.all(nil, timeout: 1)
Expand All @@ -228,7 +228,7 @@ defmodule Nebulex.Adapters.PartitionedTest do
PartitionedMock.get(1)
end

msg = ~r"RPC error executing action: count_all\n\nResponses:"
msg = ~r"RPC error while executing action :count_all\n\nSuccessful responses:"

assert_raise Nebulex.RPCMultiCallError, msg, fn ->
PartitionedMock.count_all()
Expand Down
12 changes: 8 additions & 4 deletions test/nebulex/adapters/replicated_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ defmodule Nebulex.Adapters.ReplicatedTest do
test "incr/3 raises when the counter is not an integer" do
:ok = Replicated.put(:counter, "string")

assert_raise Nebulex.RPCMultiCallError, fn ->
assert_raise ArgumentError, fn ->
Replicated.incr(:counter, 10)
end
end
Expand Down Expand Up @@ -135,15 +135,19 @@ defmodule Nebulex.Adapters.ReplicatedTest do
end

test "error: rpc error" do
test_with_dynamic_cache(ReplicatedMock, [name: :replicated_mock], fn ->
node_pid_list = start_caches(cluster_nodes(), [{ReplicatedMock, []}])

try do
_ = Process.flag(:trap_exit, true)

msg = ~r"RPC error executing action: put_all\n\nResponses:"
msg = ~r"RPC error while executing action :put_all\n\nSuccessful responses:"

assert_raise Nebulex.RPCMultiCallError, msg, fn ->
ReplicatedMock.put_all(a: 1, b: 2)
end
end)
after
stop_caches(node_pid_list)
end
end

test "ok: start/stop cache nodes" do
Expand Down

0 comments on commit cd998cc

Please sign in to comment.