diff --git a/lib/mongo/read_preference.ex b/lib/mongo/read_preference.ex index 55bea01b..0aa20516 100644 --- a/lib/mongo/read_preference.ex +++ b/lib/mongo/read_preference.ex @@ -4,20 +4,20 @@ defmodule Mongo.ReadPreference do @moduledoc ~S""" Determines which servers are considered suitable for read operations - A read preference consists of a mode and optional `tag_sets`, max_staleness_ms, and `hedge`. + A read preference consists of a mode and optional `tags`, max_staleness_ms, and `hedge`. The mode prioritizes between primaries and secondaries to produce either a single suitable server or a list of candidate servers. - If tag_sets and maxStalenessSeconds are set, they determine which candidate servers are eligible for selection. + If tags and maxStalenessSeconds are set, they determine which candidate servers are eligible for selection. If hedge is set, it configures how server hedged reads are used. The default mode is `:primary`. - The default tag_sets is a list with an empty tag set: [{}]. + The default tags is a list with an empty tag set: [{}]. The default max_staleness_ms is unset. The default hedge is unset. ## mode * `:primary` Only an available primary is suitable. - * `:secondary` All secondaries (and only secondaries) are candidates, but only eligible candidates (i.e. after applying tag_sets and maxStalenessSeconds) are suitable. + * `:secondary` All secondaries (and only secondaries) are candidates, but only eligible candidates (i.e. after applying tags and maxStalenessSeconds) are suitable. * `:primary_preferred` If a primary is available, only the primary is suitable. Otherwise, all secondaries are candidates, but only eligible secondaries are suitable. * `:secondary_preferred` All secondaries are candidates. If there is at least one eligible secondary, only eligible secondaries are suitable. @@ -32,14 +32,14 @@ defmodule Mongo.ReadPreference do | :primary_preferred | :secondary_preferred | :nearest, - tag_sets: [%{String.t() => String.t()}], + tags: [%{String.t() => String.t()}], max_staleness_ms: non_neg_integer, hedge: BSON.document() } @primary %{ mode: :primary, - tag_sets: [], + tags: [], max_staleness_ms: 0 } @@ -56,35 +56,35 @@ defmodule Mongo.ReadPreference do """ def add_read_preference(cmd, opts) do case Keyword.get(opts, :read_preference) do - nil -> cmd - pref -> cmd ++ ["$readPreference": pref] + nil -> + cmd + + pref -> + cmd ++ ["$readPreference": pref] end end @doc """ - From the specs: - - Use of slaveOk - - There are two usages of slaveOK: - - * A driver query parameter that predated read preference modes and tag set lists. - * A wire protocol flag on OP_QUERY operations - + Converts the preference to the mongodb format """ - def slave_ok(%{:mode => :primary}) do + def convert(%{:mode => :primary}) do %{:mode => :primary} end - def slave_ok(config) do + def convert(config) do mode = case config[:mode] do - :primary_preferred -> :primaryPreferred - :secondary_preferred -> :secondaryPreferred - other -> other + :primary_preferred -> + :primaryPreferred + + :secondary_preferred -> + :secondaryPreferred + + other -> + other end - filter_nils(mode: mode, tag_sets: config[:tag_sets]) + filter_nils(mode: mode, tags: [config[:tags]]) end ## @@ -106,7 +106,7 @@ defmodule Mongo.ReadPreference do end # For mode 'secondaryPreferred', drivers MUST set the slaveOK wire protocol flag. If the read preference contains a - # non-empty tag_sets parameter, maxStalenessSeconds is a positive integer, or the hedge parameter is non-empty, + # non-empty tags parameter, maxStalenessSeconds is a positive integer, or the hedge parameter is non-empty, # drivers MUST use $readPreference; otherwise, drivers MUST NOT use $readPreference def mongos(%{mode: :secondary_preferred} = config) do transform(config) @@ -127,11 +127,13 @@ defmodule Mongo.ReadPreference do max_staleness_seconds = case config[:max_staleness_ms] do - i when is_integer(i) -> div(i, 1000) - nil -> nil + i when is_integer(i) -> + div(i, 1000) + + nil -> + nil end - [mode: mode, tag_sets: config[:tag_sets], maxStalenessSeconds: max_staleness_seconds, hedge: config[:hedge]] - |> filter_nils() + filter_nils(mode: mode, tags: [config[:tags]], maxStalenessSeconds: max_staleness_seconds, hedge: config[:hedge]) end end diff --git a/lib/mongo/topology.ex b/lib/mongo/topology.ex index 68c7512f..cc5b3197 100644 --- a/lib/mongo/topology.ex +++ b/lib/mongo/topology.ex @@ -373,6 +373,8 @@ defmodule Mongo.Topology do # checkout a new session # def handle_call({:checkout_session, read_write_type, opts}, from, %{:topology => topology, :waiting_pids => waiting} = state) do + opts = merge_read_preferences(opts, state.opts) + case TopologyDescription.select_servers(topology, read_write_type, opts) do :empty -> Mongo.Events.notify(%ServerSelectionEmptyEvent{action: :checkout_session, cmd_type: read_write_type, topology: topology, opts: opts}) @@ -398,6 +400,8 @@ defmodule Mongo.Topology do end def handle_call({:select_server, read_write_type, opts}, from, %{:topology => topology, :waiting_pids => waiting} = state) do + opts = merge_read_preferences(opts, state.opts) + case TopologyDescription.select_servers(topology, read_write_type, opts) do :empty -> Mongo.Events.notify(%ServerSelectionEmptyEvent{action: :select_server, cmd_type: read_write_type, topology: topology, opts: opts}) @@ -579,4 +583,14 @@ defmodule Mongo.Topology do defp fetch_arbiters(state) do Enum.flat_map(state.topology.servers, fn {_, s} -> s.arbiters end) end + + defp merge_read_preferences(opts, url_opts) do + case Keyword.get(url_opts, :read_preference) do + nil -> + opts + + read_preference -> + Keyword.put_new(opts, :read_preference, read_preference) + end + end end diff --git a/lib/mongo/topology_description.ex b/lib/mongo/topology_description.ex index ff7754c5..17c52c1c 100644 --- a/lib/mongo/topology_description.ex +++ b/lib/mongo/topology_description.ex @@ -88,10 +88,17 @@ defmodule Mongo.TopologyDescription do def select_servers(topology, :write, opts) do servers = case topology.type do - :single -> topology.servers - :sharded -> mongos_servers(topology) - :replica_set_with_primary -> primary_servers(topology) - _ -> [] + :single -> + topology.servers + + :sharded -> + mongos_servers(topology) + + :replica_set_with_primary -> + primary_servers(topology) + + _other -> + [] end addr = @@ -115,27 +122,40 @@ defmodule Mongo.TopologyDescription do {servers, read_prefs} = case topology.type do - :unknown -> {[], nil} - :single -> {topology.servers, nil} - :sharded -> {mongos_servers(topology), ReadPreference.mongos(read_preference)} - _ -> {select_replica_set_server(topology, read_preference.mode, read_preference), ReadPreference.slave_ok(read_preference)} + :unknown -> + {[], nil} + + :single -> + {topology.servers, nil} + + :sharded -> + {mongos_servers(topology), ReadPreference.mongos(read_preference)} + + _other -> + {select_replica_set_server(topology, read_preference.mode, read_preference), ReadPreference.convert(read_preference)} end opts = case read_prefs do - nil -> Keyword.delete(opts, :read_preference) - prefs -> Keyword.put(opts, :read_preference, prefs) + nil -> + Keyword.delete(opts, :read_preference) + + prefs -> + Keyword.put(opts, :read_preference, prefs) end addr = servers - |> Enum.map(fn {server, _} -> server end) |> Enum.take_random(1) + |> Enum.map(fn {server, _} -> server end) # check now three possible cases case addr do - [] -> :empty - [result] -> {:ok, {result, opts}} + [] -> + :empty + + [result] -> + {:ok, {result, opts}} end end @@ -153,7 +173,7 @@ defmodule Mongo.TopologyDescription do ## # - # Select the primary without without tag_sets or maxStalenessSeconds + # Select the primary without without tags or maxStalenessSeconds # defp select_replica_set_server(topology, :primary, _read_preference) do primary_servers(topology) @@ -161,13 +181,13 @@ defmodule Mongo.TopologyDescription do ## # - # Select the secondary with without tag_sets or maxStalenessSeconds + # Select the secondary with without tags or maxStalenessSeconds # defp select_replica_set_server(topology, :secondary, read_preference) do topology |> secondary_servers() |> filter_out_stale(topology, read_preference.max_staleness_ms) - |> select_tag_sets(read_preference.tag_sets) + |> select_tag_sets(read_preference.tags) |> filter_latency_window(topology.local_threshold_ms) end @@ -175,8 +195,8 @@ defmodule Mongo.TopologyDescription do # From the specs # # 'primaryPreferred' is equivalent to selecting a server with read preference mode 'primary' - # (without tag_sets or maxStalenessSeconds), or, if that fails, falling back to selecting with read preference mode - # 'secondary' (with tag_sets and maxStalenessSeconds, if provided). + # (without tags or maxStalenessSeconds), or, if that fails, falling back to selecting with read preference mode + # 'secondary' (with tags and maxStalenessSeconds, if provided). defp select_replica_set_server(topology, :primary_preferred, read_preference) do case primary_servers(topology) do [] -> select_replica_set_server(topology, :secondary, read_preference) @@ -186,8 +206,8 @@ defmodule Mongo.TopologyDescription do ## # From the specs - # 'secondaryPreferred' is the inverse: selecting with mode 'secondary' (with tag_sets and maxStalenessSeconds) and - # falling back to selecting with mode 'primary' (without tag_sets or maxStalenessSeconds). + # 'secondaryPreferred' is the inverse: selecting with mode 'secondary' (with tags and maxStalenessSeconds) and + # falling back to selecting with mode 'primary' (without tags or maxStalenessSeconds). # defp select_replica_set_server(topology, :secondary_preferred, read_preference) do case select_replica_set_server(topology, :secondary, read_preference) do @@ -202,11 +222,11 @@ defmodule Mongo.TopologyDescription do # The term 'nearest' is unfortunate, as it implies a choice based on geographic locality or absolute lowest latency, neither of which are true. # # Instead, and unlike the other read preference modes, 'nearest' does not favor either primaries or secondaries; - # instead all servers are candidates and are filtered by tag_sets and maxStalenessSeconds. + # instead all servers are candidates and are filtered by tags and maxStalenessSeconds. defp select_replica_set_server(%{:servers => servers} = topology, :nearest, read_preference) do servers |> filter_out_stale(topology, read_preference.max_staleness_ms) - |> select_tag_sets(read_preference.tag_sets) + |> select_tag_sets(read_preference.tags) |> filter_latency_window(topology.local_threshold_ms) end diff --git a/lib/mongo/url_parser.ex b/lib/mongo/url_parser.ex index 0c3202f7..8cd89b9f 100644 --- a/lib/mongo/url_parser.ex +++ b/lib/mongo/url_parser.ex @@ -6,6 +6,8 @@ defmodule Mongo.UrlParser do """ + require Logger + @mongo_url_regex ~r/^mongodb(?\+srv)?:\/\/((?[^:]+):(?[^@]+)@)?(?[^\/]+)(\/(?[^?]+))?(\?(?.*))?$/ # https://docs.mongodb.com/manual/reference/connection-string/#connections-connection-options @@ -181,6 +183,7 @@ defmodule Mongo.UrlParser do frags <- resolve_srv_url(frags), opts <- parse_seeds(opts, frags), opts <- parse_query_options(opts, frags), + opts <- process_read_preferences(opts), # Parse fixed parameters (database, username & password) & merge them with query options opts <- Enum.reduce(frags, opts, &add_option/2) do opts @@ -191,4 +194,60 @@ defmodule Mongo.UrlParser do end def parse_url(opts), do: opts + + defp process_read_preferences(opts) do + opts = + case Keyword.get(opts, :read_preference) do + nil -> + opts + + mode -> + read_preference = + %{mode: mode} + |> extend_read_preference_tags(opts) + |> extend_max_staleness_ms(opts) + + Keyword.put(opts, :read_preference, read_preference) + end + + Keyword.drop(opts, [:read_preference_tags, :max_staleness_seconds]) + end + + defp extend_read_preference_tags(read_preference, opts) do + case Keyword.get(opts, :read_preference_tags, []) |> parse_tags() do + [] -> + read_preference + + tags -> + Map.put(read_preference, :tags, Keyword.new(tags)) + end + end + + defp extend_max_staleness_ms(read_preference, opts) do + case Keyword.get(opts, :max_staleness_seconds) do + nil -> + read_preference + + max_staleness_seconds -> + Map.put(read_preference, :max_staleness_ms, max_staleness_seconds * 1_000) + end + end + + defp parse_tags(tags) do + tags + |> String.split(",") + |> Enum.map(fn key_value -> to_tuple(key_value) end) + |> Enum.reject(fn key_value -> key_value == nil end) + end + + defp to_tuple(key_value) do + case String.split(key_value, ":") do + [key, value] -> + {String.to_atom(key), value} + + _other -> + Logger.warning("Unable to parse the read preference tags #{inspect(key_value)}") + nil + end + end end diff --git a/test/mongo/url_parser_test.exs b/test/mongo/url_parser_test.exs index 71d7fb0a..c65b52b5 100644 --- a/test/mongo/url_parser_test.exs +++ b/test/mongo/url_parser_test.exs @@ -105,6 +105,34 @@ defmodule Mongo.UrlParserTest do end end + test "write read preferences" do + assert UrlParser.parse_url(url: "mongodb://seed1.domain.com:27017,seed2.domain.com:27017/db_name?readPreference=secondary&readPreferenceTags=dc:ny,rack:r&maxStalenessSeconds=30") == [ + database: "db_name", + read_preference: %{mode: :secondary, tags: [dc: "ny", rack: "r"], max_staleness_ms: 30_000}, + seeds: [ + "seed1.domain.com:27017", + "seed2.domain.com:27017" + ] + ] + + assert UrlParser.parse_url(url: "mongodb://seed1.domain.com:27017,seed2.domain.com:27017/db_name?readPreference=secondary&readPreferenceTags=dc::ny,rack:r&maxStalenessSeconds=30") == [ + database: "db_name", + read_preference: %{mode: :secondary, tags: [rack: "r"], max_staleness_ms: 30_000}, + seeds: [ + "seed1.domain.com:27017", + "seed2.domain.com:27017" + ] + ] + + assert UrlParser.parse_url(url: "mongodb://seed1.domain.com:27017,seed2.domain.com:27017/db_name?readPreference=weird&readPreferenceTags=dc:ny,rack:r&maxStalenessSeconds=30") == [ + database: "db_name", + seeds: [ + "seed1.domain.com:27017", + "seed2.domain.com:27017" + ] + ] + end + test "encoded user" do real_username = "@:/skøl:@/" real_password = "@æœ{}%e()}@"