diff --git a/.credo.exs b/.credo.exs index 8d227a22..aa56add9 100644 --- a/.credo.exs +++ b/.credo.exs @@ -126,7 +126,7 @@ {Credo.Check.Refactor.MapJoin, []}, {Credo.Check.Refactor.NegatedConditionsInUnless, []}, {Credo.Check.Refactor.NegatedConditionsWithElse, []}, - {Credo.Check.Refactor.Nesting, []}, + {Credo.Check.Refactor.Nesting, [max_nesting: 4]}, {Credo.Check.Refactor.UnlessWithElse, []}, {Credo.Check.Refactor.WithClauses, []}, {Credo.Check.Refactor.FilterFilter, []}, diff --git a/README.md b/README.md index f1611542..1c38e6f9 100644 --- a/README.md +++ b/README.md @@ -719,7 +719,7 @@ You need roughly three additional configuration steps: * Authenticate with an x.509 Certificate To get the x.509 authentication working you need to prepare the ssl configuration accordingly: -* you need set the ssl option: `verify_peer` +* you need to set the ssl option: `verify_peer` * you need to specify the `cacertfile` because Erlang BEAM don't provide any CA certificate store by default * you maybe need to customize the hostname check to allow wildcard certificates * you need to specify the `username` from the subject entry of the user certificate @@ -802,7 +802,7 @@ a simple map, supporting the following keys: * `:mode`, possible values: `:primary`, `:primary_preferred`, `:secondary`, `:secondary_preferred` and `:nearest` * `:max_staleness_ms`, the maxStaleness value in milliseconds -* `:tag_sets`, the set of tags, for example: `[dc: "west", usage: "production"]` +* `:tags`, the set of tags, for example: `[dc: "west", usage: "production"]` The driver selects the server using the read preference. @@ -810,7 +810,7 @@ The driver selects the server using the read preference. prefs = %{ mode: :secondary, max_staleness_ms: 120_000, - tag_sets: [dc: "west", usage: "production"] + tags: [dc: "west", usage: "production"] } Mongo.find_one(top, "dogs", %{name: "Oskar"}, read_preference: prefs) @@ -907,7 +907,7 @@ result = Mongo.BulkWrite.write(:mongo, bulk, w: 1) In the following example we import 1.000.000 integers into the MongoDB using the stream api: We need to create an insert operation for each number. Then we call the `Mongo.UnorderedBulk.stream` -function to import it. This function returns a stream function which accumulate +function to import it. This function returns a stream function that accumulates all inserts operations until the limit `1000` is reached. In this case the operation group is send to MongoDB. So using the stream api you can reduce the memory using while importing big volume of data. @@ -1026,7 +1026,7 @@ That means, you can just generate a `raise :should_not_happen` exception as well ## Command Monitoring -You can watch all events that are triggered while the driver send requests and processes responses. You can use the +You can watch all events that are triggered while the driver sends requests and processes responses. You can use the `Mongo.EventHandler` as a starting point. It logs the events from the topic `:commands` (by ignoring the `:isMaster` command) to `Logger.info`: @@ -1041,7 +1041,7 @@ iex> {:ok, conn} = Mongo.start_link(url: "mongodb://localhost:27017/test") ## Testing -Latest MongoDB is used while running the tests. Replica set of three nodes is created and runs all test except the socket and ssl test. If you want to +Latest MongoDB is used while running the tests. Replica set of three nodes is created and runs all tests, except the socket and ssl test. If you want to run the test cases against other MongoDB deployments or older versions, you can use the [mtools](https://github.com/rueckstiess/mtools) for deployment and run the test cases locally: ```bash diff --git a/lib/mongo.ex b/lib/mongo.ex index d759d6b3..4200f8ca 100644 --- a/lib/mongo.ex +++ b/lib/mongo.ex @@ -1502,7 +1502,7 @@ defmodule Mongo do @spec exec_command_session(GenServer.server(), BSON.document(), Keyword.t()) :: {:ok, BSON.document() | nil} | {:error, Mongo.Error.t()} def exec_command_session(session, cmd, opts) do - with {:ok, conn, new_cmd} <- Session.bind_session(session, cmd), + with {:ok, conn, new_cmd, opts} <- Session.bind_session(session, cmd, opts), {:ok, _cmd, response} <- DBConnection.execute(conn, %Query{action: {:command, new_cmd}}, [], opts), :ok <- Session.update_session(session, response, opts), {:ok, {_flags, doc}} <- check_for_error(response, cmd, opts) do diff --git a/lib/mongo/grid_fs/upload.ex b/lib/mongo/grid_fs/upload.ex index 35ecac18..2da46ba7 100644 --- a/lib/mongo/grid_fs/upload.ex +++ b/lib/mongo/grid_fs/upload.ex @@ -7,12 +7,12 @@ defmodule Mongo.GridFs.Upload do @doc """ Opens a stream that the application can write the contents of the file to. - The driver generates the file id. + The driver generates the file id if not provided. User data for the 'metadata' field of the files collection document. """ - @spec open_upload_stream(Mongo.GridFs.Bucket.t(), String.t(), BSON.document() | nil) :: UploadStream.t() - def open_upload_stream(bucket, filename, meta \\ nil) do - UploadStream.new(bucket, filename, meta) + @spec open_upload_stream(Mongo.GridFs.Bucket.t(), String.t(), BSON.document() | nil, UploadStream.file_id() | nil) :: UploadStream.t() + def open_upload_stream(bucket, filename, meta \\ nil, file_id \\ nil) do + UploadStream.new(bucket, filename, meta, file_id) end end diff --git a/lib/mongo/grid_fs/upload_stream.ex b/lib/mongo/grid_fs/upload_stream.ex index 83cb8bcc..00000cdb 100644 --- a/lib/mongo/grid_fs/upload_stream.ex +++ b/lib/mongo/grid_fs/upload_stream.ex @@ -20,9 +20,10 @@ defmodule Mongo.GridFs.UploadStream do alias Mongo.GridFs.Bucket alias Mongo.GridFs.UploadStream + @type file_id :: BSON.ObjectId.t() | binary() @type t :: %__MODULE__{ bucket: Bucket.t(), - id: BSON.ObjectId.t(), + id: file_id(), filename: String.t(), metadata: {BSON.document() | nil} } @@ -31,9 +32,9 @@ defmodule Mongo.GridFs.UploadStream do @doc """ Creates a new upload stream to insert a file into the grid-fs. """ - @spec new(Bucket.t(), String.t(), BSON.document() | nil) :: UploadStream.t() - def new(bucket, filename, metadata \\ nil) do - %UploadStream{bucket: bucket, filename: filename, id: Mongo.object_id(), metadata: metadata} + @spec new(Bucket.t(), String.t(), BSON.document() | nil, file_id() | nil) :: UploadStream.t() + def new(bucket, filename, metadata \\ nil, file_id \\ nil) do + %UploadStream{bucket: bucket, filename: filename, id: file_id || Mongo.object_id(), metadata: metadata} end defimpl Collectable, for: UploadStream do diff --git a/lib/mongo/monitor.ex b/lib/mongo/monitor.ex index e8b43ccd..d56fce6d 100644 --- a/lib/mongo/monitor.ex +++ b/lib/mongo/monitor.ex @@ -186,18 +186,8 @@ defmodule Mongo.Monitor do ## # Get a new server description from the server and send it to the Topology process. ## - defp update_server_description(%{topology_pid: topology_pid, address: address, mode: :streaming_mode} = state) do - case get_server_description(state) do - %{round_trip_time: round_trip_time} -> - ## debug info("Updating round_trip_time: #{inspect round_trip_time}") - Topology.update_rrt(topology_pid, address, round_trip_time) - - %{state | round_trip_time: round_trip_time} - - error -> - warning("Unable to round trip time because of #{inspect(error)}") - state - end + defp update_server_description(%{mode: :streaming_mode} = state) do + state end ## diff --git a/lib/mongo/read_preference.ex b/lib/mongo/read_preference.ex index 55bea01b..3671f5e3 100644 --- a/lib/mongo/read_preference.ex +++ b/lib/mongo/read_preference.ex @@ -4,20 +4,20 @@ defmodule Mongo.ReadPreference do @moduledoc ~S""" Determines which servers are considered suitable for read operations - A read preference consists of a mode and optional `tag_sets`, max_staleness_ms, and `hedge`. + A read preference consists of a mode and optional `tags`, max_staleness_ms, and `hedge`. The mode prioritizes between primaries and secondaries to produce either a single suitable server or a list of candidate servers. - If tag_sets and maxStalenessSeconds are set, they determine which candidate servers are eligible for selection. + If tags and maxStalenessSeconds are set, they determine which candidate servers are eligible for selection. If hedge is set, it configures how server hedged reads are used. The default mode is `:primary`. - The default tag_sets is a list with an empty tag set: [{}]. + The default tags is a list with an empty tag set: [{}]. The default max_staleness_ms is unset. The default hedge is unset. ## mode * `:primary` Only an available primary is suitable. - * `:secondary` All secondaries (and only secondaries) are candidates, but only eligible candidates (i.e. after applying tag_sets and maxStalenessSeconds) are suitable. + * `:secondary` All secondaries (and only secondaries) are candidates, but only eligible candidates (i.e. after applying tags and maxStalenessSeconds) are suitable. * `:primary_preferred` If a primary is available, only the primary is suitable. Otherwise, all secondaries are candidates, but only eligible secondaries are suitable. * `:secondary_preferred` All secondaries are candidates. If there is at least one eligible secondary, only eligible secondaries are suitable. @@ -25,113 +25,120 @@ defmodule Mongo.ReadPreference do * `:nearest` The primary and all secondaries are candidates, but only eligible candidates are suitable. """ - @type t :: %{ - mode: - :primary - | :secondary - | :primary_preferred - | :secondary_preferred - | :nearest, - tag_sets: [%{String.t() => String.t()}], - max_staleness_ms: non_neg_integer, - hedge: BSON.document() - } @primary %{ mode: :primary, - tag_sets: [], + tags: [], max_staleness_ms: 0 } - def primary(map \\ nil) + @doc """ + Merge default values to the read preferences and converts deprecated tag_sets to tags + """ + def merge_defaults(%{tag_sets: tags} = map) do + map = + map + |> Map.delete(:tag_sets) + |> Map.put(:tags, tags) + + Map.merge(@primary, map) + end - def primary(map) when is_map(map) do + def merge_defaults(map) when is_map(map) do Map.merge(@primary, map) end - def primary(_), do: @primary + def merge_defaults(_other) do + @primary + end @doc """ Add read preference to the cmd """ def add_read_preference(cmd, opts) do case Keyword.get(opts, :read_preference) do - nil -> cmd - pref -> cmd ++ ["$readPreference": pref] + nil -> + cmd + + pref -> + cmd ++ ["$readPreference": pref] end end @doc """ - From the specs: - - Use of slaveOk - - There are two usages of slaveOK: - - * A driver query parameter that predated read preference modes and tag set lists. - * A wire protocol flag on OP_QUERY operations - + Converts the preference to the mongodb format for replica sets """ - def slave_ok(%{:mode => :primary}) do - %{:mode => :primary} + def to_replica_set(%{:mode => :primary}) do + %{mode: :primary} end - def slave_ok(config) do + def to_replica_set(config) do mode = case config[:mode] do - :primary_preferred -> :primaryPreferred - :secondary_preferred -> :secondaryPreferred - other -> other - end + :primary_preferred -> + :primaryPreferred - filter_nils(mode: mode, tag_sets: config[:tag_sets]) - end + :secondary_preferred -> + :secondaryPreferred - ## - # Therefore, when sending queries to a mongos, the following rules apply: - # - # For mode 'primary', drivers MUST NOT set the slaveOK wire protocol flag and MUST NOT use $readPreference - def mongos(%{mode: :primary}) do - nil - end + other -> + other + end - # For mode 'secondary', drivers MUST set the slaveOK wire protocol flag and MUST also use $readPreference - def mongos(%{mode: :secondary} = config) do - transform(config) - end + case config[:tags] do + [] -> + %{mode: mode} - # For mode 'primaryPreferred', drivers MUST set the slaveOK wire protocol flag and MUST also use $readPreference - def mongos(%{mode: :primary_preferred} = config) do - transform(config) - end + nil -> + %{mode: mode} - # For mode 'secondaryPreferred', drivers MUST set the slaveOK wire protocol flag. If the read preference contains a - # non-empty tag_sets parameter, maxStalenessSeconds is a positive integer, or the hedge parameter is non-empty, - # drivers MUST use $readPreference; otherwise, drivers MUST NOT use $readPreference - def mongos(%{mode: :secondary_preferred} = config) do - transform(config) + tags -> + %{mode: mode, tags: [tags]} + end end - # For mode 'nearest', drivers MUST set the slaveOK wire protocol flag and MUST also use $readPreference - def mongos(%{mode: :nearest} = config) do - transform(config) + @doc """ + Converts the preference to the mongodb format for mongos + """ + def to_mongos(%{mode: :primary}) do + nil end - defp transform(config) do + # for the others we should use the read preferences + def to_mongos(config) do mode = case config[:mode] do - :primary_preferred -> :primaryPreferred - :secondary_preferred -> :secondaryPreferred - other -> other + :primary_preferred -> + :primaryPreferred + + :secondary_preferred -> + :secondaryPreferred + + other -> + other end max_staleness_seconds = case config[:max_staleness_ms] do - i when is_integer(i) -> div(i, 1000) - nil -> nil + i when is_integer(i) -> + div(i, 1000) + + nil -> + nil + end + + read_preference = + case config[:tags] do + [] -> + %{mode: mode, maxStalenessSeconds: max_staleness_seconds, hedge: config[:hedge]} + + nil -> + %{mode: mode, maxStalenessSeconds: max_staleness_seconds, hedge: config[:hedge]} + + tags -> + %{mode: mode, tags: [tags], maxStalenessSeconds: max_staleness_seconds, hedge: config[:hedge]} end - [mode: mode, tag_sets: config[:tag_sets], maxStalenessSeconds: max_staleness_seconds, hedge: config[:hedge]] - |> filter_nils() + filter_nils(read_preference) end end diff --git a/lib/mongo/session.ex b/lib/mongo/session.ex index 10c01c2d..72a7c627 100644 --- a/lib/mongo/session.ex +++ b/lib/mongo/session.ex @@ -186,14 +186,15 @@ defmodule Mongo.Session do @doc """ Merge the session / transaction data into the cmd. There is no need to call this function directly. It is called automatically. + The global session timeout is merged to the options as well. """ - @spec bind_session(Session.t(), BSON.document()) :: {:ok, pid, BSON.document()} | {:error, term()} - def bind_session(nil, _cmd) do + @spec bind_session(Session.t(), BSON.document(), Keyword.t()) :: {:ok, pid, BSON.document(), Keyword.t()} | {:error, term()} + def bind_session(nil, _cmd, _opts) do {:error, Mongo.Error.exception("No session")} end - def bind_session(pid, cmd) do - call(pid, {:bind_session, cmd}) + def bind_session(pid, cmd, opts) do + call(pid, {:bind_session, cmd, opts}) end @doc """ @@ -462,13 +463,16 @@ defmodule Mongo.Session do ## # bind session: only if wire_version >= 6, MongoDB 3.6.x and no transaction is running: only lsid and the transaction-id is added # - def handle_call_event({:bind_session, cmd}, transaction, %Session{conn: conn, opts: opts, wire_version: wire_version, server_session: %ServerSession{session_id: id, txn_num: txn_num}} = data) + def handle_call_event({:bind_session, cmd, client_opts}, transaction, %Session{conn: conn, opts: opts, wire_version: wire_version, server_session: %ServerSession{session_id: id, txn_num: txn_num}} = data) when wire_version >= 6 and transaction in [:no_transaction, :transaction_aborted, :transaction_committed] do ## only if retryable_writes are enabled! options = case opts[:retryable_writes] do - true -> [lsid: %{id: id}, txnNumber: %BSON.LongNumber{value: txn_num}, readConcern: read_concern(data, Keyword.get(cmd, :readConcern))] - _ -> [lsid: %{id: id}, readConcern: read_concern(data, Keyword.get(cmd, :readConcern))] + true -> + [lsid: %{id: id}, txnNumber: %BSON.LongNumber{value: txn_num}, readConcern: read_concern(data, Keyword.get(cmd, :readConcern))] + + _ -> + [lsid: %{id: id}, readConcern: read_concern(data, Keyword.get(cmd, :readConcern))] end cmd = @@ -477,11 +481,12 @@ defmodule Mongo.Session do |> ReadPreference.add_read_preference(opts) |> filter_nils() - {:keep_state_and_data, {:ok, conn, cmd}} + client_opts = merge_timeout(client_opts, opts) + {:keep_state_and_data, {:ok, conn, cmd, client_opts}} end - def handle_call_event({:bind_session, cmd}, :starting_transaction, %Session{conn: conn, server_session: %ServerSession{session_id: id, txn_num: txn_num}, wire_version: wire_version} = data) when wire_version >= 6 do - result = + def handle_call_event({:bind_session, cmd, client_opts}, :starting_transaction, %Session{conn: conn, opts: opts, server_session: %ServerSession{session_id: id, txn_num: txn_num}, wire_version: wire_version} = data) when wire_version >= 6 do + cmd = Keyword.merge(cmd, readConcern: read_concern(data, Keyword.get(cmd, :readConcern)), lsid: %{id: id}, @@ -492,10 +497,11 @@ defmodule Mongo.Session do |> filter_nils() |> Keyword.drop(~w(writeConcern)a) - {:next_state, :transaction_in_progress, {:ok, conn, result}} + client_opts = merge_timeout(client_opts, opts) + {:next_state, :transaction_in_progress, {:ok, conn, cmd, client_opts}} end - def handle_call_event({:bind_session, cmd}, :transaction_in_progress, %Session{conn: conn, wire_version: wire_version, server_session: %ServerSession{session_id: id, txn_num: txn_num}}) when wire_version >= 6 do + def handle_call_event({:bind_session, cmd, client_opts}, :transaction_in_progress, %Session{conn: conn, opts: opts, wire_version: wire_version, server_session: %ServerSession{session_id: id, txn_num: txn_num}}) when wire_version >= 6 do result = Keyword.merge(cmd, lsid: %{id: id}, @@ -504,12 +510,13 @@ defmodule Mongo.Session do ) |> Keyword.drop(~w(writeConcern readConcern)a) - {:keep_state_and_data, {:ok, conn, result}} + client_opts = merge_timeout(client_opts, opts) + {:keep_state_and_data, {:ok, conn, result, client_opts}} end # In case of wire_version < 6 we do nothing - def handle_call_event({:bind_session, cmd}, _transaction, %Session{conn: conn}) do - {:keep_state_and_data, {:ok, conn, cmd}} + def handle_call_event({:bind_session, cmd, client_opts}, _transaction, %Session{conn: conn}) do + {:keep_state_and_data, {:ok, conn, cmd, client_opts}} end def handle_call_event({:commit_transaction, _start_time}, :starting_transaction, _data) do @@ -710,4 +717,14 @@ defmodule Mongo.Session do def in_session(session, _topology_pid, _read_write_type, fun, opts) do fun.(session, opts) end + + defp merge_timeout(opts, default_ops) do + case Keyword.get(default_ops, :timeout) do + nil -> + opts + + timeout -> + Keyword.put_new(opts, :timeout, timeout) + end + end end diff --git a/lib/mongo/topology.ex b/lib/mongo/topology.ex index 68c7512f..d7f21d4e 100644 --- a/lib/mongo/topology.ex +++ b/lib/mongo/topology.ex @@ -373,6 +373,8 @@ defmodule Mongo.Topology do # checkout a new session # def handle_call({:checkout_session, read_write_type, opts}, from, %{:topology => topology, :waiting_pids => waiting} = state) do + opts = merge_read_preferences(opts, state.opts) + case TopologyDescription.select_servers(topology, read_write_type, opts) do :empty -> Mongo.Events.notify(%ServerSelectionEmptyEvent{action: :checkout_session, cmd_type: read_write_type, topology: topology, opts: opts}) @@ -381,6 +383,8 @@ defmodule Mongo.Topology do ## found {:ok, {address, opts}} -> + opts = merge_timeout(opts, state.opts) + with {:ok, connection} <- get_connection(address, state), wire_version <- wire_version(address, topology), {server_session, new_state} <- checkout_server_session(state), @@ -398,6 +402,8 @@ defmodule Mongo.Topology do end def handle_call({:select_server, read_write_type, opts}, from, %{:topology => topology, :waiting_pids => waiting} = state) do + opts = merge_read_preferences(opts, state.opts) + case TopologyDescription.select_servers(topology, read_write_type, opts) do :empty -> Mongo.Events.notify(%ServerSelectionEmptyEvent{action: :select_server, cmd_type: read_write_type, topology: topology, opts: opts}) @@ -579,4 +585,24 @@ defmodule Mongo.Topology do defp fetch_arbiters(state) do Enum.flat_map(state.topology.servers, fn {_, s} -> s.arbiters end) end + + defp merge_read_preferences(opts, url_opts) do + case Keyword.get(url_opts, :read_preference) do + nil -> + opts + + read_preference -> + Keyword.put_new(opts, :read_preference, read_preference) + end + end + + defp merge_timeout(opts, default_ops) do + case Keyword.get(default_ops, :timeout) do + nil -> + opts + + timeout -> + Keyword.put_new(opts, :timeout, timeout) + end + end end diff --git a/lib/mongo/topology_description.ex b/lib/mongo/topology_description.ex index ff7754c5..841472ec 100644 --- a/lib/mongo/topology_description.ex +++ b/lib/mongo/topology_description.ex @@ -88,10 +88,17 @@ defmodule Mongo.TopologyDescription do def select_servers(topology, :write, opts) do servers = case topology.type do - :single -> topology.servers - :sharded -> mongos_servers(topology) - :replica_set_with_primary -> primary_servers(topology) - _ -> [] + :single -> + topology.servers + + :sharded -> + mongos_servers(topology) + + :replica_set_with_primary -> + primary_servers(topology) + + _other -> + [] end addr = @@ -111,31 +118,44 @@ defmodule Mongo.TopologyDescription do read_preference = opts |> Keyword.get(:read_preference) - |> ReadPreference.primary() + |> ReadPreference.merge_defaults() {servers, read_prefs} = case topology.type do - :unknown -> {[], nil} - :single -> {topology.servers, nil} - :sharded -> {mongos_servers(topology), ReadPreference.mongos(read_preference)} - _ -> {select_replica_set_server(topology, read_preference.mode, read_preference), ReadPreference.slave_ok(read_preference)} + :unknown -> + {[], nil} + + :single -> + {topology.servers, nil} + + :sharded -> + {mongos_servers(topology), ReadPreference.to_mongos(read_preference)} + + _other -> + {select_replica_set_server(topology, read_preference.mode, read_preference), ReadPreference.to_replica_set(read_preference)} end opts = case read_prefs do - nil -> Keyword.delete(opts, :read_preference) - prefs -> Keyword.put(opts, :read_preference, prefs) + nil -> + Keyword.delete(opts, :read_preference) + + prefs -> + Keyword.put(opts, :read_preference, prefs) end addr = servers - |> Enum.map(fn {server, _} -> server end) |> Enum.take_random(1) + |> Enum.map(fn {server, _} -> server end) # check now three possible cases case addr do - [] -> :empty - [result] -> {:ok, {result, opts}} + [] -> + :empty + + [result] -> + {:ok, {result, opts}} end end @@ -153,7 +173,7 @@ defmodule Mongo.TopologyDescription do ## # - # Select the primary without without tag_sets or maxStalenessSeconds + # Select the primary without without tags or maxStalenessSeconds # defp select_replica_set_server(topology, :primary, _read_preference) do primary_servers(topology) @@ -161,13 +181,13 @@ defmodule Mongo.TopologyDescription do ## # - # Select the secondary with without tag_sets or maxStalenessSeconds + # Select the secondary with without tags or maxStalenessSeconds # defp select_replica_set_server(topology, :secondary, read_preference) do topology |> secondary_servers() |> filter_out_stale(topology, read_preference.max_staleness_ms) - |> select_tag_sets(read_preference.tag_sets) + |> select_tag_sets(read_preference.tags) |> filter_latency_window(topology.local_threshold_ms) end @@ -175,8 +195,8 @@ defmodule Mongo.TopologyDescription do # From the specs # # 'primaryPreferred' is equivalent to selecting a server with read preference mode 'primary' - # (without tag_sets or maxStalenessSeconds), or, if that fails, falling back to selecting with read preference mode - # 'secondary' (with tag_sets and maxStalenessSeconds, if provided). + # (without tags or maxStalenessSeconds), or, if that fails, falling back to selecting with read preference mode + # 'secondary' (with tags and maxStalenessSeconds, if provided). defp select_replica_set_server(topology, :primary_preferred, read_preference) do case primary_servers(topology) do [] -> select_replica_set_server(topology, :secondary, read_preference) @@ -186,8 +206,8 @@ defmodule Mongo.TopologyDescription do ## # From the specs - # 'secondaryPreferred' is the inverse: selecting with mode 'secondary' (with tag_sets and maxStalenessSeconds) and - # falling back to selecting with mode 'primary' (without tag_sets or maxStalenessSeconds). + # 'secondaryPreferred' is the inverse: selecting with mode 'secondary' (with tags and maxStalenessSeconds) and + # falling back to selecting with mode 'primary' (without tags or maxStalenessSeconds). # defp select_replica_set_server(topology, :secondary_preferred, read_preference) do case select_replica_set_server(topology, :secondary, read_preference) do @@ -202,11 +222,11 @@ defmodule Mongo.TopologyDescription do # The term 'nearest' is unfortunate, as it implies a choice based on geographic locality or absolute lowest latency, neither of which are true. # # Instead, and unlike the other read preference modes, 'nearest' does not favor either primaries or secondaries; - # instead all servers are candidates and are filtered by tag_sets and maxStalenessSeconds. + # instead all servers are candidates and are filtered by tags and maxStalenessSeconds. defp select_replica_set_server(%{:servers => servers} = topology, :nearest, read_preference) do servers |> filter_out_stale(topology, read_preference.max_staleness_ms) - |> select_tag_sets(read_preference.tag_sets) + |> select_tag_sets(read_preference.tags) |> filter_latency_window(topology.local_threshold_ms) end diff --git a/lib/mongo/url_parser.ex b/lib/mongo/url_parser.ex index c1bcb625..0fe9a5e6 100644 --- a/lib/mongo/url_parser.ex +++ b/lib/mongo/url_parser.ex @@ -6,7 +6,9 @@ defmodule Mongo.UrlParser do """ - @mongo_url_regex ~r/^mongodb(?\+srv)?:\/\/((?[^:]+):(?[^@]+)@)?(?[^\/]+)(\/(?[^?]+))?(\?(?.*))?$/ + require Logger + + @mongo_url_regex ~r/^mongodb(?\+srv)?:\/\/(?:(?[^:]+):(?[^@]+)@)?(?[^\/\?]+)(?:\/(?[^?]*)?(?:\?(?(?:[^\s=]+=[^\s&]*)+))?)?$/ # https://docs.mongodb.com/manual/reference/connection-string/#connections-connection-options @mongo_options %{ @@ -182,6 +184,7 @@ defmodule Mongo.UrlParser do frags <- resolve_srv_url(frags), opts <- parse_seeds(opts, frags), opts <- parse_query_options(opts, frags), + opts <- process_read_preferences(opts), # Parse fixed parameters (database, username & password) & merge them with query options opts <- Enum.reduce(frags, opts, &add_option/2) do opts @@ -192,4 +195,67 @@ defmodule Mongo.UrlParser do end def parse_url(opts), do: opts + + defp process_read_preferences(opts) do + opts = + case Keyword.get(opts, :read_preference) do + nil -> + opts + + mode when is_atom(mode) -> + read_preference = + %{mode: mode} + |> extend_read_preference_tags(opts) + |> extend_max_staleness_ms(opts) + + Keyword.put(opts, :read_preference, read_preference) + + _other -> + opts + end + + Keyword.drop(opts, [:read_preference_tags, :max_staleness_seconds]) + end + + defp extend_read_preference_tags(read_preference, opts) do + case Keyword.get(opts, :read_preference_tags, []) |> parse_tags() do + [] -> + read_preference + + tags -> + Map.put(read_preference, :tags, Keyword.new(tags)) + end + end + + defp extend_max_staleness_ms(read_preference, opts) do + case Keyword.get(opts, :max_staleness_seconds) do + nil -> + read_preference + + max_staleness_seconds -> + Map.put(read_preference, :max_staleness_ms, max_staleness_seconds * 1_000) + end + end + + defp parse_tags([]) do + [] + end + + defp parse_tags(tags) do + tags + |> String.split(",") + |> Enum.map(fn key_value -> to_tuple(key_value) end) + |> Enum.reject(fn key_value -> key_value == nil end) + end + + defp to_tuple(key_value) do + case String.split(key_value, ":") do + [key, value] -> + {String.to_atom(key), value} + + _other -> + Logger.warning("Unable to parse the read preference tags #{inspect(key_value)}") + nil + end + end end diff --git a/mix.exs b/mix.exs index d4f55aed..1148594b 100644 --- a/mix.exs +++ b/mix.exs @@ -36,7 +36,7 @@ defmodule Mongodb.Mixfile do {:decimal, "~> 2.1.1"}, {:patch, "~> 0.12.0", only: [:dev, :test]}, {:jason, "~> 1.3", only: [:dev, :test]}, - {:credo, "~> 1.6.1", only: [:dev, :test], runtime: false}, + {:credo, "~> 1.7.0", only: [:dev, :test], runtime: false}, {:ex_doc, ">= 0.0.0", only: :dev, runtime: false} ] end diff --git a/mix.lock b/mix.lock index 921223bc..e2298812 100644 --- a/mix.lock +++ b/mix.lock @@ -1,29 +1,29 @@ %{ "benchee": {:hex, :benchee, "1.0.1", "66b211f9bfd84bd97e6d1beaddf8fc2312aaabe192f776e8931cb0c16f53a521", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}], "hexpm", "3ad58ae787e9c7c94dd7ceda3b587ec2c64604563e049b2a0e8baafae832addb"}, - "bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"}, + "bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"}, "certifi": {:hex, :certifi, "2.5.1", "867ce347f7c7d78563450a18a6a28a8090331e77fa02380b4a21962a65d36ee5", [:rebar3], [{:parse_trans, "~>3.3", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm", "805abd97539caf89ec6d4732c91e62ba9da0cda51ac462380bbd28ee697a8c42"}, "connection": {:hex, :connection, "1.1.0", "ff2a49c4b75b6fb3e674bfc5536451607270aac754ffd1bdfe175abe4a6d7a68", [:mix], [], "hexpm", "722c1eb0a418fbe91ba7bd59a47e28008a189d47e37e0e7bb85585a016b2869c"}, - "credo": {:hex, :credo, "1.6.4", "ddd474afb6e8c240313f3a7b0d025cc3213f0d171879429bf8535d7021d9ad78", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "c28f910b61e1ff829bffa056ef7293a8db50e87f2c57a9b5c3f57eee124536b7"}, + "credo": {:hex, :credo, "1.7.0", "6119bee47272e85995598ee04f2ebbed3e947678dee048d10b5feca139435f75", [:mix], [{:bunt, "~> 0.2.1", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "6839fcf63d1f0d1c0f450abc8564a57c43d644077ab96f2934563e68b8a769d7"}, "db_connection": {:hex, :db_connection, "2.5.0", "bb6d4f30d35ded97b29fe80d8bd6f928a1912ca1ff110831edcd238a1973652c", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c92d5ba26cd69ead1ff7582dbb860adeedfff39774105a4f1c92cbb654b55aa2"}, "decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"}, "deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"}, "dialyxir": {:hex, :dialyxir, "0.5.1", "b331b091720fd93e878137add264bac4f644e1ddae07a70bf7062c7862c4b952", [:mix], [], "hexpm", "6c32a70ed5d452c6650916555b1f96c79af5fc4bf286997f8b15f213de786f73"}, "earmark": {:hex, :earmark, "1.4.15", "2c7f924bf495ec1f65bd144b355d0949a05a254d0ec561740308a54946a67888", [:mix], [{:earmark_parser, ">= 1.4.13", [hex: :earmark_parser, repo: "hexpm", optional: false]}], "hexpm", "3b1209b85bc9f3586f370f7c363f6533788fb4e51db23aa79565875e7f9999ee"}, - "earmark_parser": {:hex, :earmark_parser, "1.4.31", "a93921cdc6b9b869f519213d5bc79d9e218ba768d7270d46fdcf1c01bacff9e2", [:mix], [], "hexpm", "317d367ee0335ef037a87e46c91a2269fef6306413f731e8ec11fc45a7efd059"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.33", "3c3fd9673bb5dcc9edc28dd90f50c87ce506d1f71b70e3de69aa8154bc695d44", [:mix], [], "hexpm", "2d526833729b59b9fdb85785078697c72ac5e5066350663e5be6a1182da61b8f"}, "elixir_uuid": {:hex, :elixir_uuid, "1.2.0", "ff26e938f95830b1db152cb6e594d711c10c02c6391236900ddd070a6b01271d", [:mix], [], "hexpm"}, "erlex": {:hex, :erlex, "0.1.6", "c01c889363168d3fdd23f4211647d8a34c0f9a21ec726762312e08e083f3d47e", [:mix], [], "hexpm"}, - "ex_doc": {:hex, :ex_doc, "0.29.2", "dfa97532ba66910b2a3016a4bbd796f41a86fc71dd5227e96f4c8581fdf0fdf0", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "6b5d7139eda18a753e3250e27e4a929f8d2c880dd0d460cb9986305dea3e03af"}, + "ex_doc": {:hex, :ex_doc, "0.30.6", "5f8b54854b240a2b55c9734c4b1d0dd7bdd41f71a095d42a70445c03cf05a281", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "bd48f2ddacf4e482c727f9293d9498e0881597eae6ddc3d9562bd7923375109f"}, "excoveralls": {:hex, :excoveralls, "0.12.1", "a553c59f6850d0aff3770e4729515762ba7c8e41eedde03208182a8dc9d0ce07", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "5c1f717066a299b1b732249e736c5da96bb4120d1e55dc2e6f442d251e18a812"}, "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, "hackney": {:hex, :hackney, "1.15.2", "07e33c794f8f8964ee86cebec1a8ed88db5070e52e904b8f12209773c1036085", [:rebar3], [{:certifi, "2.5.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "6.0.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.5", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "e0100f8ef7d1124222c11ad362c857d3df7cb5f4204054f9f0f4a728666591fc"}, "idna": {:hex, :idna, "6.0.0", "689c46cbcdf3524c44d5f3dde8001f364cd7608a99556d8fbd8239a5798d4c10", [:rebar3], [{:unicode_util_compat, "0.4.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "4bdd305eb64e18b0273864920695cb18d7a2021f31a11b9c5fbcd9a253f936e2"}, - "jason": {:hex, :jason, "1.3.0", "fa6b82a934feb176263ad2df0dbd91bf633d4a46ebfdffea0c8ae82953714946", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "53fc1f51255390e0ec7e50f9cb41e751c260d065dcba2bf0d08dc51a4002c2ac"}, + "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, "makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"}, - "makeup_elixir": {:hex, :makeup_elixir, "0.16.0", "f8c570a0d33f8039513fbccaf7108c5d750f47d8defd44088371191b76492b0b", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "28b2cbdc13960a46ae9a8858c4bebdec3c9a6d7b4b9e7f4ed1502f8159f338e7"}, - "makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"}, + "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"}, + "makeup_erlang": {:hex, :makeup_erlang, "0.1.2", "ad87296a092a46e03b7e9b0be7631ddcf64c790fa68a9ef5323b6cbb36affc72", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f3f5a1ca93ce6e092d92b6d9c049bcda58a3b617a8d888f8e7231c85630e8108"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, - "nimble_parsec": {:hex, :nimble_parsec, "1.2.3", "244836e6e3f1200c7f30cb56733fd808744eca61fd182f731eac4af635cc6d0b", [:mix], [], "hexpm", "c8d789e39b9131acf7b99291e93dae60ab48ef14a7ee9d58c6964f59efb570b0"}, + "nimble_parsec": {:hex, :nimble_parsec, "1.3.1", "2c54013ecf170e249e9291ed0a62e5832f70a476c61da16f6aac6dca0189f2af", [:mix], [], "hexpm", "2682e3c0b2eb58d90c6375fc0cc30bc7be06f365bf72608804fb9cffa5e1b167"}, "parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm", "17ef63abde837ad30680ea7f857dd9e7ced9476cdd7b0394432af4bfc241b960"}, "patch": {:hex, :patch, "0.12.0", "2da8967d382bade20344a3e89d618bfba563b12d4ac93955468e830777f816b0", [:mix], [], "hexpm", "ffd0e9a7f2ad5054f37af84067ee88b1ad337308a1cb227e181e3967127b0235"}, "poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm"}, diff --git a/test/mongo/change_stream_test.exs b/test/mongo/change_stream_test.exs index b0192077..d4ed98f4 100644 --- a/test/mongo/change_stream_test.exs +++ b/test/mongo/change_stream_test.exs @@ -174,6 +174,7 @@ defmodule Mongo.ChangeStreamTest do end @tag :mongo_3_6 + @tag :rs_required test "change stream: watch and resume_after", c do top = c.pid me = self() diff --git a/test/mongo/grid_fs/bucket_test.exs b/test/mongo/grid_fs/bucket_test.exs index 38adb94e..5d91e3a9 100644 --- a/test/mongo/grid_fs/bucket_test.exs +++ b/test/mongo/grid_fs/bucket_test.exs @@ -122,6 +122,7 @@ defmodule Mongo.GridFs.BucketTest do end @tag :mongo_4_2 + @tag :rs_required test "explicit sessions", c do top = c.pid {:ok, session} = Session.start_session(top, :write, []) diff --git a/test/mongo/grid_fs/upload_test.exs b/test/mongo/grid_fs/upload_test.exs index 8554da76..9cd73060 100644 --- a/test/mongo/grid_fs/upload_test.exs +++ b/test/mongo/grid_fs/upload_test.exs @@ -88,7 +88,24 @@ defmodule Mongo.GridFs.UploadTest do assert x == chksum end + test "upload a text file with custom id, check download, length, meta-data and checksum", c do + src_filename = "./test/data/test.txt" + bucket = Bucket.new(c.pid, j: true, w: :majority) + chksum = calc_checksum(src_filename) + file_id = Mongo.object_id() + + upload_stream = Upload.open_upload_stream(bucket, "my-example-file.txt", %{tag: "checked", chk_sum: chksum}, file_id) + + File.stream!(src_filename, [], 512) |> Stream.into(upload_stream) |> Stream.run() + + assert file_id == upload_stream.id + + %{"metadata" => %{"tag" => "checked", "chk_sum" => x}} = Mongo.find_one(c.pid, Bucket.files_collection_name(bucket), %{_id: file_id}) + assert x == chksum + end + @tag :mongo_4_2 + @tag :rs_required test "upload a text file, check download, length, meta-data and checksum transaction", c do src_filename = "./test/data/test.txt" chksum = calc_checksum(src_filename) @@ -114,6 +131,7 @@ defmodule Mongo.GridFs.UploadTest do end @tag :mongo_4_2 + @tag :rs_required test "upload a text file, check download, length, meta-data and checksum abort transaction", c do src_filename = "./test/data/test.txt" chksum = calc_checksum(src_filename) diff --git a/test/mongo/read_preferences_test.exs b/test/mongo/read_preferences_test.exs index f9904a88..97efc434 100644 --- a/test/mongo/read_preferences_test.exs +++ b/test/mongo/read_preferences_test.exs @@ -70,7 +70,7 @@ defmodule Mongo.ReadPreferencesTest do prefs = %{ mode: :secondary, max_staleness_ms: 120_000, - tag_sets: [dc: "west", usage: "production"] + tags: [dc: "west", usage: "production"] } assert %{"name" => "Oskar"} == Mongo.find_one(top, coll, %{name: "Oskar"}, read_preference: prefs) |> Map.take(["name"]) @@ -78,7 +78,7 @@ defmodule Mongo.ReadPreferencesTest do prefs = %{ mode: :nearest, max_staleness_ms: 120_000, - tag_sets: [dc: "east", usage: "production"] + tags: [dc: "east", usage: "production"] } assert %{"name" => "Oskar"} == Mongo.find_one(top, coll, %{name: "Oskar"}, read_preference: prefs) |> Map.take(["name"]) @@ -86,7 +86,7 @@ defmodule Mongo.ReadPreferencesTest do prefs = %{ mode: :secondary, max_staleness_ms: 120_000, - tag_sets: [dc: "east", usage: "production"] + tags: [dc: "east", usage: "production"] } assert catch_exit(Mongo.find_one(top, coll, %{name: "Oskar"}, read_preference: prefs, checkout_timeout: 500)) diff --git a/test/mongo/repo_test.exs b/test/mongo/repo_test.exs index e26b17d4..de9f2995 100644 --- a/test/mongo/repo_test.exs +++ b/test/mongo/repo_test.exs @@ -32,6 +32,7 @@ defmodule Mongo.RepoTest do end end + @tag :rs_required describe "transaction/3" do test "returns a single document for the given bson id" do assert :error = diff --git a/test/mongo/session_test.exs b/test/mongo/session_test.exs index 6dfbe9ef..a4bf2f24 100644 --- a/test/mongo/session_test.exs +++ b/test/mongo/session_test.exs @@ -108,6 +108,7 @@ defmodule Mongo.SessionTest do end @tag :mongo_4_2 + @tag :rs_required test "commit_transaction", %{top: top} do coll = "dogs" @@ -137,6 +138,7 @@ defmodule Mongo.SessionTest do end @tag :mongo_4_2 + @tag :rs_required test "commit_transaction on multiple database", %{top: top} do coll = "dogs" @@ -180,6 +182,7 @@ defmodule Mongo.SessionTest do end @tag :mongo_4_2 + @tag :rs_required test "abort_transaction", %{top: top} do coll = "dogs" @@ -209,6 +212,7 @@ defmodule Mongo.SessionTest do end @tag :mongo_4_2 + @tag :rs_required test "transaction", %{top: top} do coll = "dogs_with_commit_transaction" @@ -233,6 +237,7 @@ defmodule Mongo.SessionTest do end @tag :mongo_4_2 + @tag :rs_required test "transaction_causal_consistency", %{top: top} do coll = "dogs_with_commit_transaction_causal_consistency" @@ -258,6 +263,7 @@ defmodule Mongo.SessionTest do end @tag :mongo_4_2 + @tag :rs_required test "transaction_abort", %{top: top} do coll = "dogs_with_about_transaction" @@ -283,6 +289,7 @@ defmodule Mongo.SessionTest do end @tag :mongo_4_2 + @tag :rs_required test "transaction_abort_exception", %{top: top} do coll = "dogs_with_transaction_abort_exception" @@ -309,6 +316,7 @@ defmodule Mongo.SessionTest do end @tag :mongo_4_2 + @tag :rs_required test "check unordered bulk with transaction", %{top: top} do coll = unique_collection() Mongo.insert_one(top, coll, %{name: "Wuff"}) @@ -381,6 +389,7 @@ defmodule Mongo.SessionTest do end @tag :mongo_4_2 + @tag :rs_required test "check streaming bulk with transaction", %{top: top} do coll = unique_collection() Mongo.insert_one(top, coll, %{name: "Wuff"}) @@ -406,6 +415,7 @@ defmodule Mongo.SessionTest do end @tag :mongo_4_2 + @tag :rs_required test "commit empty transaction", %{top: top} do assert :ok = Mongo.transaction( @@ -418,6 +428,7 @@ defmodule Mongo.SessionTest do end @tag :mongo_4_2 + @tag :rs_required test "abort empty transaction", %{top: top} do assert :error = Mongo.transaction( @@ -430,6 +441,7 @@ defmodule Mongo.SessionTest do end @tag :mongo_4_2 + @tag :rs_required test "check ordered bulk with transaction", %{top: top} do coll = unique_collection() Mongo.insert_one(top, coll, %{name: "Wuff"}) diff --git a/test/mongo/topology_description_test.exs b/test/mongo/topology_description_test.exs index 4dbe6da6..7dd70aeb 100644 --- a/test/mongo/topology_description_test.exs +++ b/test/mongo/topology_description_test.exs @@ -7,7 +7,7 @@ defmodule Mongo.TopologyDescriptionTest do single_server = "localhost:27017" opts = [ - read_preference: ReadPreference.primary(%{mode: :secondary}) + read_preference: ReadPreference.merge_defaults(%{mode: :secondary}) ] assert {:ok, {^single_server, _}} = TopologyDescription.select_servers(single(), :read, opts) @@ -15,7 +15,7 @@ defmodule Mongo.TopologyDescriptionTest do assert {:ok, {^single_server, _}} = TopologyDescription.select_servers(single(), :write) opts = [ - read_preference: ReadPreference.primary(%{mode: :nearest}) + read_preference: ReadPreference.merge_defaults(%{mode: :nearest}) ] assert {:ok, {^single_server, _}} = TopologyDescription.select_servers(single(), :read, opts) @@ -27,34 +27,34 @@ defmodule Mongo.TopologyDescriptionTest do assert {:ok, {^sharded_server, []}} = TopologyDescription.select_servers(sharded(), :write, []) opts = [ - read_preference: ReadPreference.primary(%{mode: :primary}) + read_preference: ReadPreference.merge_defaults(%{mode: :primary}) ] assert {:ok, {^sharded_server, []}} = TopologyDescription.select_servers(sharded(), :read, opts) opts = [ - read_preference: ReadPreference.primary(%{mode: :secondary}) + read_preference: ReadPreference.merge_defaults(%{mode: :secondary}) ] - assert {:ok, {^sharded_server, [{:read_preference, [mode: :secondary, tag_sets: [], maxStalenessSeconds: 0]}]}} = TopologyDescription.select_servers(sharded(), :read, opts) + assert {:ok, {^sharded_server, [{:read_preference, %{mode: :secondary, maxStalenessSeconds: 0}}]}} = TopologyDescription.select_servers(sharded(), :read, opts) opts = [ - read_preference: ReadPreference.primary(%{mode: :primary_preferred}) + read_preference: ReadPreference.merge_defaults(%{mode: :primary_preferred}) ] - assert {:ok, {^sharded_server, [{:read_preference, [mode: :primaryPreferred, tag_sets: [], maxStalenessSeconds: 0]}]}} = TopologyDescription.select_servers(sharded(), :read, opts) + assert {:ok, {^sharded_server, [{:read_preference, %{mode: :primaryPreferred, maxStalenessSeconds: 0}}]}} = TopologyDescription.select_servers(sharded(), :read, opts) opts = [ - read_preference: ReadPreference.primary(%{mode: :secondary_preferred}) + read_preference: ReadPreference.merge_defaults(%{mode: :secondary_preferred}) ] - assert {:ok, {^sharded_server, [{:read_preference, [mode: :secondaryPreferred, tag_sets: [], maxStalenessSeconds: 0]}]}} = TopologyDescription.select_servers(sharded(), :read, opts) + assert {:ok, {^sharded_server, [{:read_preference, %{mode: :secondaryPreferred, maxStalenessSeconds: 0}}]}} = TopologyDescription.select_servers(sharded(), :read, opts) opts = [ - read_preference: ReadPreference.primary(%{mode: :nearest}) + read_preference: ReadPreference.merge_defaults(%{mode: :nearest}) ] - assert {:ok, {^sharded_server, [{:read_preference, [mode: :nearest, tag_sets: [], maxStalenessSeconds: 0]}]}} = TopologyDescription.select_servers(sharded(), :read, opts) + assert {:ok, {^sharded_server, [{:read_preference, %{mode: :nearest, maxStalenessSeconds: 0}}]}} = TopologyDescription.select_servers(sharded(), :read, opts) end test "replica set server selection" do @@ -63,7 +63,7 @@ defmodule Mongo.TopologyDescriptionTest do seconardaries = List.delete(all_hosts, master) opts = [ - read_preference: ReadPreference.primary(%{mode: :secondary}) + read_preference: ReadPreference.merge_defaults(%{mode: :secondary}) ] {:ok, {server, _}} = TopologyDescription.select_servers(repl_set_with_master(), :read, opts) @@ -71,40 +71,40 @@ defmodule Mongo.TopologyDescriptionTest do assert Enum.any?(seconardaries, fn sec -> sec == server end) opts = [ - read_preference: ReadPreference.primary(%{mode: :primary}) + read_preference: ReadPreference.merge_defaults(%{mode: :primary}) ] assert {:ok, {_master, _}} = TopologyDescription.select_servers(repl_set_with_master(), :read, opts) opts = [ - read_preference: ReadPreference.primary(%{mode: :primary_preferred}) + read_preference: ReadPreference.merge_defaults(%{mode: :primary_preferred}) ] assert {:ok, {_master, _}} = TopologyDescription.select_servers(repl_set_with_master(), :read, opts) opts = [ - read_preference: ReadPreference.primary(%{mode: :primary_preferred}) + read_preference: ReadPreference.merge_defaults(%{mode: :primary_preferred}) ] {:ok, {server, _}} = TopologyDescription.select_servers(repl_set_no_master(), :read, opts) assert Enum.any?(seconardaries, fn sec -> sec == server end) opts = [ - read_preference: ReadPreference.primary(%{mode: :nearest}) + read_preference: ReadPreference.merge_defaults(%{mode: :nearest}) ] {:ok, {server, _}} = TopologyDescription.select_servers(repl_set_with_master(), :read, opts) assert Enum.any?(all_hosts, fn sec -> sec == server end) opts = [ - read_preference: ReadPreference.primary(%{mode: :secondary}) + read_preference: ReadPreference.merge_defaults(%{mode: :secondary}) ] {:ok, {server, _}} = TopologyDescription.select_servers(repl_set_no_master(), :read, opts) assert Enum.any?(seconardaries, fn sec -> sec == server end) opts = [ - read_preference: ReadPreference.primary(%{mode: :secondary_preferred}) + read_preference: ReadPreference.merge_defaults(%{mode: :secondary_preferred}) ] {:ok, {server, _}} = TopologyDescription.select_servers(repl_set_with_master(), :read, opts) @@ -116,7 +116,7 @@ defmodule Mongo.TopologyDescriptionTest do assert Enum.any?(seconardaries, fn sec -> sec == server end) opts = [ - read_preference: ReadPreference.primary(%{mode: :nearest}) + read_preference: ReadPreference.merge_defaults(%{mode: :nearest}) ] {:ok, {server, _}} = TopologyDescription.select_servers(repl_set_no_master(), :read, opts) diff --git a/test/mongo/topology_test.exs b/test/mongo/topology_test.exs index 29953fa3..43e7839a 100644 --- a/test/mongo/topology_test.exs +++ b/test/mongo/topology_test.exs @@ -14,7 +14,7 @@ defmodule Mongo.TopologyTest do for mode <- @modes do assert {:ok, %Mongo.InsertOneResult{inserted_id: new_id}} = Mongo.insert_one(mongo_pid, "test", %{topology_test: 1}, w: 3) - rp = Mongo.ReadPreference.primary(%{mode: mode}) + rp = Mongo.ReadPreference.merge_defaults(%{mode: mode}) assert [%{"_id" => ^new_id, "topology_test" => 1}] = mongo_pid diff --git a/test/mongo/url_parser_test.exs b/test/mongo/url_parser_test.exs index 4136bb5e..b6115285 100644 --- a/test/mongo/url_parser_test.exs +++ b/test/mongo/url_parser_test.exs @@ -9,6 +9,52 @@ defmodule Mongo.UrlParserTest do assert UrlParser.parse_url(url: "mongodb://localhost:27017") == [seeds: ["localhost:27017"]] end + test "basic url and trailing slash" do + assert UrlParser.parse_url(url: "mongodb://localhost:27017/") == [seeds: ["localhost:27017"]] + end + + test "basic url and trailing slash and options" do + assert UrlParser.parse_url(url: "mongodb://localhost:27017/?replicaSet=set-name&authSource=admin&maxPoolSize=5") == [ + pool_size: 5, + auth_source: "admin", + set_name: "set-name", + seeds: ["localhost:27017"] + ] + end + + test "basic url, trailing slash and options" do + assert UrlParser.parse_url(url: "mongodb://localhost:27017/") == [seeds: ["localhost:27017"]] + end + + test "Missing delimiting slash between hosts and options" do + assert UrlParser.parse_url(url: "mongodb://example.com?w=1") == [url: "mongodb://example.com?w=1"] + end + + test "Incomplete key value pair for option" do + assert UrlParser.parse_url(url: "mongodb://example.com/test?w") == [url: "mongodb://example.com/test?w"] + end + + test "User info for single IPv4 host without database" do + assert UrlParser.parse_url(url: "mongodb://alice:foo@127.0.0.1") |> Keyword.drop([:pw_safe]) == [password: "*****", username: "alice", seeds: ["127.0.0.1"]] + end + + test "User info for single IPv4 host with database" do + assert UrlParser.parse_url(url: "mongodb://alice:foo@127.0.0.1/test") |> Keyword.drop([:pw_safe]) == [ + password: "*****", + username: "alice", + database: "test", + seeds: ["127.0.0.1"] + ] + end + + test "User info for single hostname without database" do + assert UrlParser.parse_url(url: "mongodb://eve:baz@example.com") |> Keyword.drop([:pw_safe]) == [ + password: "*****", + username: "eve", + seeds: ["example.com"] + ] + end + test "cluster url with ssl" do url = "mongodb://user:password@seed1.domain.com:27017,seed2.domain.com:27017,seed3.domain.com:27017/db_name?ssl=true&replicaSet=set-name&authSource=admin&maxPoolSize=5" @@ -105,6 +151,43 @@ defmodule Mongo.UrlParserTest do end end + test "write read preferences" do + assert UrlParser.parse_url(url: "mongodb://seed1.domain.com:27017,seed2.domain.com:27017/db_name?readPreference=secondary&readPreferenceTags=dc:ny,rack:r&maxStalenessSeconds=30") == [ + database: "db_name", + read_preference: %{mode: :secondary, tags: [dc: "ny", rack: "r"], max_staleness_ms: 30_000}, + seeds: [ + "seed1.domain.com:27017", + "seed2.domain.com:27017" + ] + ] + + assert UrlParser.parse_url(url: "mongodb://seed1.domain.com:27017,seed2.domain.com:27017/db_name?readPreference=secondary&readPreferenceTags=dc::ny,rack:r&maxStalenessSeconds=30") == [ + database: "db_name", + read_preference: %{mode: :secondary, tags: [rack: "r"], max_staleness_ms: 30_000}, + seeds: [ + "seed1.domain.com:27017", + "seed2.domain.com:27017" + ] + ] + + assert UrlParser.parse_url(url: "mongodb://seed1.domain.com:27017,seed2.domain.com:27017/db_name?readPreference=secondary&maxStalenessSeconds=30") == [ + database: "db_name", + read_preference: %{mode: :secondary, max_staleness_ms: 30_000}, + seeds: [ + "seed1.domain.com:27017", + "seed2.domain.com:27017" + ] + ] + + assert UrlParser.parse_url(url: "mongodb://seed1.domain.com:27017,seed2.domain.com:27017/db_name?readPreference=weird&readPreferenceTags=dc:ny,rack:r&maxStalenessSeconds=30") == [ + database: "db_name", + seeds: [ + "seed1.domain.com:27017", + "seed2.domain.com:27017" + ] + ] + end + test "encoded user" do real_username = "@:/skøl:@/" real_password = "@æœ{}%e()}@" diff --git a/test/mongo_test.exs b/test/mongo_test.exs index 586093d8..fb91014a 100644 --- a/test/mongo_test.exs +++ b/test/mongo_test.exs @@ -614,6 +614,7 @@ defmodule Mongo.Test do end) end + @tag :rs_required test "nested transaction", %{pid: top} do coll = unique_collection() Mongo.drop_collection(top, coll, w: 3)