Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improved the reconnection workflow #234

Merged
merged 1 commit into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions lib/mongo/mongo_db_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ defmodule Mongo.MongoDBConnection do
wire_version: 0,
auth_mechanism: opts[:auth_mechanism] || nil,
connection_type: Keyword.fetch!(opts, :connection_type),
server_pid: Keyword.get(opts, :server_pid),
topology_pid: Keyword.fetch!(opts, :topology_pid),
stable_api: Keyword.get(opts, :stable_api),
use_op_msg: Keyword.get(opts, :stable_api) != nil,
Expand All @@ -48,8 +49,24 @@ defmodule Mongo.MongoDBConnection do
end

@impl true
def disconnect(_error, %{connection: {mod, socket}, connection_type: type, topology_pid: pid, host: host}) do
GenServer.cast(pid, {:disconnect, type, host})
## the stream monitor disconnects, we change the mode of the parent monitor
def disconnect(_error, %{connection: {mod, socket}, connection_type: :stream_monitor, parent_pid: parent_pid}) do
## Logger.debug("MongoDB-Connection: disconnected stream monitor: #{inspect(error)}")
GenServer.cast(parent_pid, :stop_streaming_mode)
mod.close(socket)
:ok
end

def disconnect(_error, %{connection: {mod, socket}, connection_type: :monitor, topology_pid: topology_pid, host: host, server_pid: server_pid}) do
## Logger.debug("MongoDB-Connection: disconnected: #{inspect(error)}, #{inspect(server_pid)}, #{inspect(host)}, cast disconnect :monitor")
GenServer.cast(server_pid, :stop_streaming_mode)
GenServer.cast(topology_pid, {:disconnect, :monitor, host, server_pid})
mod.close(socket)
:ok
end

def disconnect(_error, %{connection: {mod, socket}}) do
## Logger.debug("MongoDB-Connection: disconnected: #{inspect error}, #{inspect type}, #{inspect host} #{inspect server_pid}")
mod.close(socket)
:ok
end
Expand Down
28 changes: 21 additions & 7 deletions lib/mongo/monitor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ defmodule Mongo.Monitor do
GenServer.cast(pid, :update)
end

def stop_streaming_mode(pid) do
GenServer.cast(pid, :stop_streaming_mode)
end

def set_heartbeat_frequency_ms(pid, heartbeat_frequency_ms) do
GenServer.cast(pid, {:update, heartbeat_frequency_ms})
end
Expand All @@ -59,7 +63,7 @@ defmodule Mongo.Monitor do
Initialize the monitor process
"""
def init([address, topology_pid, heartbeat_frequency_ms, connection_opts]) do
## debug info("Starting monitor process with pid #{inspect self()}, #{inspect address}")
## Logger.info("Starting monitor process with pid #{inspect(self())}, #{inspect(address)}")

# monitors don't authenticate and use the "admin" database
opts =
Expand All @@ -73,6 +77,7 @@ defmodule Mongo.Monitor do
|> Keyword.put(:topology_pid, topology_pid)
|> Keyword.put(:pool_size, 1)
|> Keyword.put(:idle_interval, 5_000)
|> Keyword.put(:server_pid, self())

with {:ok, pid} <- DBConnection.start_link(Mongo.MongoDBConnection, opts) do
{:ok,
Expand All @@ -97,18 +102,17 @@ defmodule Mongo.Monitor do
end

@doc """
In case of terminating we stop the our linked processes as well:
In case of terminating we stop our linked processes as well:
* connection
* streaming process
"""
def terminate(reason, %{connection_pid: connection_pid, streaming_pid: nil}) do
## debug info("Terminating monitor for reason #{inspect reason}")
## Logger.debug("Terminating monitor #{inspect(self())} for reason #{inspect(reason)}")
GenServer.stop(connection_pid, reason)
end

def terminate(reason, %{connection_pid: connection_pid, streaming_pid: streaming_pid}) do
## debug info("Terminating monitor for reason #{inspect reason}, #{inspect self()}, #{inspect streaming_pid}")

## Logger.debug("Terminating monitor #{inspect(self())} for reason #{inspect(reason)}, #{inspect(streaming_pid)}")
GenServer.stop(connection_pid, reason)
GenServer.stop(streaming_pid, reason)
end
Expand All @@ -117,6 +121,7 @@ defmodule Mongo.Monitor do
Report the connection event, so the topology process can now create the connection pool.
"""
def connected(_connection, me, topology_pid) do
## Logger.info("Monitor #{inspect(me)} connected to server! ")
Topology.monitor_connected(topology_pid, me)
GenServer.cast(me, :update)
end
Expand All @@ -125,6 +130,15 @@ defmodule Mongo.Monitor do
{:reply, Map.put(state, :pid, self()), state}
end

def handle_cast(:stop_streaming_mode, %{streaming_pid: streaming_pid} = state) when streaming_pid != nil do
spawn(fn -> GenServer.stop(streaming_pid) end)
{:noreply, %{state | mode: :polling_mode, streaming_pid: nil}}
end

def handle_cast(:stop_streaming_mode, state) do
{:noreply, %{state | mode: :polling_mode}}
end

##
# Update the server description or the rrt value
##
Expand Down Expand Up @@ -207,11 +221,11 @@ defmodule Mongo.Monitor do
# Starts the streaming mode
##
defp start_streaming_mode(%{address: address, topology_pid: topology_pid, opts: opts} = state, _server_description) do
args = [topology_pid, address, opts]
args = [self(), topology_pid, address, opts]

case StreamingHelloMonitor.start_link(args) do
{:ok, pid} ->
## debug info("Starting streaming mode: #{inspect self()}")
## Logger.debug("Starting streaming mode: #{inspect(pid)}")
%{state | mode: :streaming_mode, streaming_pid: pid, heartbeat_frequency_ms: 10_000}

error ->
Expand Down
2 changes: 1 addition & 1 deletion lib/mongo/password_safe.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ defmodule Mongo.PasswordSafe do

use GenServer

def new() do
def start_link() do
GenServer.start_link(@me, [])
end

Expand Down
4 changes: 2 additions & 2 deletions lib/mongo/repo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ defmodule Mongo.Repo do
@callback update(doc :: Mongo.Collection.t(), opts :: Keyword.t()) :: {:ok, Mongo.Collection.t()} | {:error, any()}

@doc """
Same as `c:update/1` but raises an error.
Same as `c:update/2` but raises an error.
"""
@callback update!(doc :: Mongo.Collection.t(), opts :: Keyword.t()) :: Mongo.Collection.t()

Expand All @@ -471,7 +471,7 @@ defmodule Mongo.Repo do
@callback insert_or_update(doc :: Mongo.Collection.t(), opts :: Keyword.t()) :: {:ok, Mongo.Collection.t()} | {:error, any()}

@doc """
Same as `c:insert_or_update/1` but raises an error.
Same as `c:insert_or_update/2` but raises an error.
"""
@callback insert_or_update!(doc :: Mongo.Collection.t(), opts :: Keyword.t()) :: Mongo.Collection.t()

Expand Down
8 changes: 5 additions & 3 deletions lib/mongo/streaming_hello_monitor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,16 @@ defmodule Mongo.StreamingHelloMonitor do
@doc """
Initialize the monitor process
"""
def init([topology_pid, address, opts]) do
def init([monitor_pid, topology_pid, address, opts]) do
heartbeat_frequency_ms = 10_000

opts =
opts
|> Keyword.drop([:after_connect])
|> Keyword.put(:after_connect, {StreamingHelloMonitor, :connected, [self()]})
|> Keyword.put(:connection_type, :stream_monitor)
|> Keyword.put(:server_pid, self())
|> Keyword.put(:monitor_pid, monitor_pid)

## debug info("Starting stream hello monitor with options #{inspect(opts, pretty: true)}")

Expand Down Expand Up @@ -65,7 +67,7 @@ defmodule Mongo.StreamingHelloMonitor do
In this case we stop the DBConnection.
"""
def terminate(reason, %{connection_pid: connection_pid}) do
## debug info("Terminating streaming hello monitor for reason #{inspect reason}")
## Logger.debug("Terminating streaming hello monitor for reason #{inspect(reason)}")
GenServer.stop(connection_pid, reason)
end

Expand All @@ -84,7 +86,7 @@ defmodule Mongo.StreamingHelloMonitor do
end

def handle_info({:EXIT, _pid, reason}, state) do
## debug Logger.warn("Stopped with reason #{inspect reason}")
Logger.warning("Stopped with reason #{inspect(reason)}")
{:stop, reason, state}
end

Expand Down
96 changes: 72 additions & 24 deletions lib/mongo/topology.ex
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ defmodule Mongo.Topology do
GenServer.call(pid, :get_state)
end

# 97
def select_server(pid, type, opts \\ []) do
timeout = Keyword.get(opts, :checkout_timeout, @default_checkout_timeout)
GenServer.call(pid, {:select_server, type, opts}, timeout)
Expand Down Expand Up @@ -109,7 +108,7 @@ defmodule Mongo.Topology do
end

def stop(pid) do
GenServer.stop(pid)
GenServer.stop(pid, :stop)
end

## GenServer Callbacks
Expand Down Expand Up @@ -163,14 +162,7 @@ defmodule Mongo.Topology do
end
end

def terminate(_reason, state) do
case state.opts[:pw_safe] do
nil -> nil
pid -> GenServer.stop(pid)
end

Enum.each(state.connection_pools, fn {_address, pid} -> GenServer.stop(pid) end)
Enum.each(state.monitors, fn {_address, pid} -> GenServer.stop(pid) end)
def terminate(_reason, _state) do
Mongo.Events.notify(%TopologyClosedEvent{topology_pid: self()})
end

Expand Down Expand Up @@ -204,18 +196,25 @@ defmodule Mongo.Topology do
##
# In case of :monitor or :stream_monitor we mark the server description of the address as unknown
##
def handle_cast({:disconnect, kind, address}, state) when kind in [:monitor, :stream_monitor] do
server_description = ServerDescription.parse_hello_response(address, "#{inspect(kind)} disconnected")
def handle_cast({:disconnect, :monitor, address, pid}, state) do
server_description = ServerDescription.parse_hello_response(address, "monitor disconnected")
## Logger.debug("Disconnect monitor with #{inspect(pid)}")

new_state =
address
|> remove_address(state)
|> close_connection_pool(pid, state)
|> maybe_reinit()

handle_cast({:server_description, server_description}, new_state)
end

def handle_cast({:disconnect, _kind, _host}, state) do
def handle_cast({:disconnect, :stream_monitor, _host, _pid}, state) do
## IO.inspect("ignored: kind stream_monitor with #{inspect pid}")
{:noreply, state}
end

def handle_cast({:disconnect, _kind, _host, _pid}, state) do
## IO.inspect("ignored: kind #{inspect kind}")
{:noreply, state}
end

Expand All @@ -233,6 +232,7 @@ defmodule Mongo.Topology do

{host, ^monitor_pid} ->
arbiters = fetch_arbiters(state)
Mongo.Events.notify(%ServerOpeningEvent{address: host, topology_pid: self()})

if host in arbiters do
state
Expand All @@ -243,8 +243,9 @@ defmodule Mongo.Topology do
|> Keyword.put(:topology_pid, self())
|> connect_opts_from_address(host)

## Logger.debug("Starting connection pool for #{inspect(host)}")
{:ok, pool} = DBConnection.start_link(Mongo.MongoDBConnection, conn_opts)
connection_pools = Map.put(state.connection_pools, host, pool)
connection_pools = replace_pool(state.connection_pools, host, pool)

Process.send_after(self(), {:new_connection, state.waiting_pids}, 10)

Expand Down Expand Up @@ -279,6 +280,49 @@ defmodule Mongo.Topology do
{:noreply, state}
end

## remove the address only if the pid is the same
defp close_connection_pool(address, pid, state) do
## Logger.debug("Closing connection pool by pid: #{inspect(state.monitors[address] == pid)}, #{inspect(pid)}, #{inspect(state.monitors[address])}")

case state.monitors[address] == pid do
true ->
Mongo.Events.notify(%ServerClosedEvent{address: address, topology_pid: self()})
## stopping the connection pool
case state.connection_pools[address] do
nil ->
:ok

pid ->
if Process.alive?(pid) do
## Logger.debug("Stopping the connection pool #{inspect(pid)} für #{inspect(address)}")
GenServer.stop(pid)
end
end

%{state | connection_pools: Map.delete(state.connection_pools, address)}

false ->
state
end
end

## replaces a pool for the host address
defp replace_pool(connection_pools, host, pool) do
## if we found an existing pool, we will stop it first
case Map.get(connection_pools, host) do
nil ->
:noop

pid ->
if Process.alive?(pid) do
## Logger.debug("Stopping the connection pool #{inspect(pid)}")
GenServer.stop(pid)
end
end

Map.put(connection_pools, host, pool)
end

##
# Update server description: in case of logical session the function creates a session pool for the `deployment`.
#
Expand Down Expand Up @@ -510,9 +554,6 @@ defmodule Mongo.Topology do
Enum.reduce(added, state, fn address, state ->
server_description = state.topology.servers[address]
connopts = connect_opts_from_address(state.opts, address)

Mongo.Events.notify(%ServerOpeningEvent{address: address, topology_pid: self()})

args = [server_description.address, self(), heartbeat_frequency_ms, Keyword.put(connopts, :pool, DBConnection.ConnectionPool)]
{:ok, pid} = Monitor.start_link(args)

Expand Down Expand Up @@ -549,16 +590,23 @@ defmodule Mongo.Topology do
end

defp remove_address(address, state) do
Mongo.Events.notify(%ServerClosedEvent{address: address, topology_pid: self()})

case state.monitors[address] do
nil -> :ok
pid -> GenServer.stop(pid)
nil ->
:ok

pid ->
Mongo.Events.notify(%ServerClosedEvent{address: address, topology_pid: self()})
## Logger.debug("Stopping: #{inspect(pid)} for #{inspect(address)}")
GenServer.stop(pid)
end

case state.connection_pools[address] do
nil -> :ok
pid -> GenServer.stop(pid)
nil ->
:ok

pid ->
## Logger.debug("Connection pool: #{inspect(address)}")
GenServer.stop(pid)
end

%{state | monitors: Map.delete(state.monitors, address), connection_pools: Map.delete(state.connection_pools, address)}
Expand Down
2 changes: 1 addition & 1 deletion lib/mongo/url_parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ defmodule Mongo.UrlParser do

value ->
## start GenServer and put id
with {:ok, pid} <- Mongo.PasswordSafe.new(),
with {:ok, pid} <- Mongo.PasswordSafe.start_link(),
:ok <- Mongo.PasswordSafe.set_password(pid, value) do
opts
|> Keyword.put(:password, "*****")
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ defmodule Mongodb.Mixfile do
{:patch, "~> 0.12.0", only: [:dev, :test]},
{:jason, "~> 1.3", only: [:dev, :test]},
{:credo, "~> 1.7.0", only: [:dev, :test], runtime: false},
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false}
{:ex_doc, "== 0.24.1", only: :dev, runtime: false}
]
end

Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"db_connection": {:hex, :db_connection, "2.6.0", "77d835c472b5b67fc4f29556dee74bf511bbafecdcaf98c27d27fa5918152086", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c2f992d15725e721ec7fbc1189d4ecdb8afef76648c746a8e1cad35e3b8a35f3"},
"decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"},
"earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"},
"ex_doc": {:hex, :ex_doc, "0.31.1", "8a2355ac42b1cc7b2379da9e40243f2670143721dd50748bf6c3b1184dae2089", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "3178c3a407c557d8343479e1ff117a96fd31bafe52a039079593fb0524ef61b0"},
"ex_doc": {:hex, :ex_doc, "0.24.1", "15673de99154f93ca7f05900e4e4155ced1ee0cd34e0caeee567900a616871a4", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "07972f17bdf7dc7b5bd76ec97b556b26178ed3f056e7ec9288eb7cea7f91cce2"},
"file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
"makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"},
Expand Down
2 changes: 1 addition & 1 deletion test/mongo/password_safe_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule Mongo.PasswordSafeTest do

test "encrypted password" do
pw = "my-secret-password"
{:ok, pid} = PasswordSafe.new()
{:ok, pid} = PasswordSafe.start_link()
PasswordSafe.set_password(pid, pw)
%{key: _key, pw: enc_pw} = :sys.get_state(pid)
assert enc_pw != pw
Expand Down
Loading