Skip to content

Commit

Permalink
fix: applies the global timeout value to each query (#215)
Browse files Browse the repository at this point in the history
  • Loading branch information
zookzook authored Sep 29, 2023
1 parent f68b9c1 commit 64f8ea1
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 28 deletions.
2 changes: 1 addition & 1 deletion lib/mongo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1502,7 +1502,7 @@ defmodule Mongo do
@spec exec_command_session(GenServer.server(), BSON.document(), Keyword.t()) ::
{:ok, BSON.document() | nil} | {:error, Mongo.Error.t()}
def exec_command_session(session, cmd, opts) do
with {:ok, conn, new_cmd} <- Session.bind_session(session, cmd),
with {:ok, conn, new_cmd, opts} <- Session.bind_session(session, cmd, opts),
{:ok, _cmd, response} <- DBConnection.execute(conn, %Query{action: {:command, new_cmd}}, [], opts),
:ok <- Session.update_session(session, response, opts),
{:ok, {_flags, doc}} <- check_for_error(response, cmd, opts) do
Expand Down
14 changes: 2 additions & 12 deletions lib/mongo/monitor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -186,18 +186,8 @@ defmodule Mongo.Monitor do
##
# Get a new server description from the server and send it to the Topology process.
##
defp update_server_description(%{topology_pid: topology_pid, address: address, mode: :streaming_mode} = state) do
case get_server_description(state) do
%{round_trip_time: round_trip_time} ->
## debug info("Updating round_trip_time: #{inspect round_trip_time}")
Topology.update_rrt(topology_pid, address, round_trip_time)

%{state | round_trip_time: round_trip_time}

error ->
warning("Unable to round trip time because of #{inspect(error)}")
state
end
defp update_server_description(%{mode: :streaming_mode} = state) do
state
end

##
Expand Down
47 changes: 32 additions & 15 deletions lib/mongo/session.ex
Original file line number Diff line number Diff line change
Expand Up @@ -186,14 +186,15 @@ defmodule Mongo.Session do

@doc """
Merge the session / transaction data into the cmd. There is no need to call this function directly. It is called automatically.
The global session timeout is merged to the options as well.
"""
@spec bind_session(Session.t(), BSON.document()) :: {:ok, pid, BSON.document()} | {:error, term()}
def bind_session(nil, _cmd) do
@spec bind_session(Session.t(), BSON.document(), Keyword.t()) :: {:ok, pid, BSON.document(), Keyword.t()} | {:error, term()}
def bind_session(nil, _cmd, _opts) do
{:error, Mongo.Error.exception("No session")}
end

def bind_session(pid, cmd) do
call(pid, {:bind_session, cmd})
def bind_session(pid, cmd, opts) do
call(pid, {:bind_session, cmd, opts})
end

@doc """
Expand Down Expand Up @@ -462,13 +463,16 @@ defmodule Mongo.Session do
##
# bind session: only if wire_version >= 6, MongoDB 3.6.x and no transaction is running: only lsid and the transaction-id is added
#
def handle_call_event({:bind_session, cmd}, transaction, %Session{conn: conn, opts: opts, wire_version: wire_version, server_session: %ServerSession{session_id: id, txn_num: txn_num}} = data)
def handle_call_event({:bind_session, cmd, client_opts}, transaction, %Session{conn: conn, opts: opts, wire_version: wire_version, server_session: %ServerSession{session_id: id, txn_num: txn_num}} = data)
when wire_version >= 6 and transaction in [:no_transaction, :transaction_aborted, :transaction_committed] do
## only if retryable_writes are enabled!
options =
case opts[:retryable_writes] do
true -> [lsid: %{id: id}, txnNumber: %BSON.LongNumber{value: txn_num}, readConcern: read_concern(data, Keyword.get(cmd, :readConcern))]
_ -> [lsid: %{id: id}, readConcern: read_concern(data, Keyword.get(cmd, :readConcern))]
true ->
[lsid: %{id: id}, txnNumber: %BSON.LongNumber{value: txn_num}, readConcern: read_concern(data, Keyword.get(cmd, :readConcern))]

_ ->
[lsid: %{id: id}, readConcern: read_concern(data, Keyword.get(cmd, :readConcern))]
end

cmd =
Expand All @@ -477,11 +481,12 @@ defmodule Mongo.Session do
|> ReadPreference.add_read_preference(opts)
|> filter_nils()

{:keep_state_and_data, {:ok, conn, cmd}}
client_opts = merge_timeout(client_opts, opts)
{:keep_state_and_data, {:ok, conn, cmd, client_opts}}
end

def handle_call_event({:bind_session, cmd}, :starting_transaction, %Session{conn: conn, server_session: %ServerSession{session_id: id, txn_num: txn_num}, wire_version: wire_version} = data) when wire_version >= 6 do
result =
def handle_call_event({:bind_session, cmd, client_opts}, :starting_transaction, %Session{conn: conn, opts: opts, server_session: %ServerSession{session_id: id, txn_num: txn_num}, wire_version: wire_version} = data) when wire_version >= 6 do
cmd =
Keyword.merge(cmd,
readConcern: read_concern(data, Keyword.get(cmd, :readConcern)),
lsid: %{id: id},
Expand All @@ -492,10 +497,11 @@ defmodule Mongo.Session do
|> filter_nils()
|> Keyword.drop(~w(writeConcern)a)

{:next_state, :transaction_in_progress, {:ok, conn, result}}
client_opts = merge_timeout(client_opts, opts)
{:next_state, :transaction_in_progress, {:ok, conn, cmd, client_opts}}
end

def handle_call_event({:bind_session, cmd}, :transaction_in_progress, %Session{conn: conn, wire_version: wire_version, server_session: %ServerSession{session_id: id, txn_num: txn_num}}) when wire_version >= 6 do
def handle_call_event({:bind_session, cmd, client_opts}, :transaction_in_progress, %Session{conn: conn, opts: opts, wire_version: wire_version, server_session: %ServerSession{session_id: id, txn_num: txn_num}}) when wire_version >= 6 do
result =
Keyword.merge(cmd,
lsid: %{id: id},
Expand All @@ -504,12 +510,13 @@ defmodule Mongo.Session do
)
|> Keyword.drop(~w(writeConcern readConcern)a)

{:keep_state_and_data, {:ok, conn, result}}
client_opts = merge_timeout(client_opts, opts)
{:keep_state_and_data, {:ok, conn, result, client_opts}}
end

# In case of wire_version < 6 we do nothing
def handle_call_event({:bind_session, cmd}, _transaction, %Session{conn: conn}) do
{:keep_state_and_data, {:ok, conn, cmd}}
def handle_call_event({:bind_session, cmd, client_opts}, _transaction, %Session{conn: conn}) do
{:keep_state_and_data, {:ok, conn, cmd, client_opts}}
end

def handle_call_event({:commit_transaction, _start_time}, :starting_transaction, _data) do
Expand Down Expand Up @@ -710,4 +717,14 @@ defmodule Mongo.Session do
def in_session(session, _topology_pid, _read_write_type, fun, opts) do
fun.(session, opts)
end

defp merge_timeout(opts, default_ops) do
case Keyword.get(default_ops, :timeout) do
nil ->
opts

timeout ->
Keyword.put_new(opts, :timeout, timeout)
end
end
end
12 changes: 12 additions & 0 deletions lib/mongo/topology.ex
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,8 @@ defmodule Mongo.Topology do

## found
{:ok, {address, opts}} ->
opts = merge_timeout(opts, state.opts)

with {:ok, connection} <- get_connection(address, state),
wire_version <- wire_version(address, topology),
{server_session, new_state} <- checkout_server_session(state),
Expand Down Expand Up @@ -593,4 +595,14 @@ defmodule Mongo.Topology do
Keyword.put_new(opts, :read_preference, read_preference)
end
end

defp merge_timeout(opts, default_ops) do
case Keyword.get(default_ops, :timeout) do
nil ->
opts

timeout ->
Keyword.put_new(opts, :timeout, timeout)
end
end
end

0 comments on commit 64f8ea1

Please sign in to comment.