Skip to content

Commit

Permalink
fix: added support for read preference specified by the URL
Browse files Browse the repository at this point in the history
  • Loading branch information
zookzook committed Sep 12, 2023
1 parent fd1607f commit 9478c1d
Showing 3 changed files with 101 additions and 0 deletions.
14 changes: 14 additions & 0 deletions lib/mongo/topology.ex
Original file line number Diff line number Diff line change
@@ -373,6 +373,8 @@ defmodule Mongo.Topology do
# checkout a new session
#
def handle_call({:checkout_session, read_write_type, opts}, from, %{:topology => topology, :waiting_pids => waiting} = state) do
opts = merge_read_preferences(opts, state.opts)

case TopologyDescription.select_servers(topology, read_write_type, opts) do
:empty ->
Mongo.Events.notify(%ServerSelectionEmptyEvent{action: :checkout_session, cmd_type: read_write_type, topology: topology, opts: opts})
@@ -398,6 +400,8 @@ defmodule Mongo.Topology do
end

def handle_call({:select_server, read_write_type, opts}, from, %{:topology => topology, :waiting_pids => waiting} = state) do
opts = merge_read_preferences(opts, state.opts)

case TopologyDescription.select_servers(topology, read_write_type, opts) do
:empty ->
Mongo.Events.notify(%ServerSelectionEmptyEvent{action: :select_server, cmd_type: read_write_type, topology: topology, opts: opts})
@@ -579,4 +583,14 @@ defmodule Mongo.Topology do
defp fetch_arbiters(state) do
Enum.flat_map(state.topology.servers, fn {_, s} -> s.arbiters end)
end

defp merge_read_preferences(opts, url_opts) do
case Keyword.get(url_opts, :read_preference) do
nil ->
opts

read_preference ->
Keyword.put_new(opts, :read_preference, read_preference)
end
end
end
59 changes: 59 additions & 0 deletions lib/mongo/url_parser.ex
Original file line number Diff line number Diff line change
@@ -6,6 +6,8 @@ defmodule Mongo.UrlParser do
"""

require Logger

@mongo_url_regex ~r/^mongodb(?<srv>\+srv)?:\/\/((?<username>[^:]+):(?<password>[^@]+)@)?(?<seeds>[^\/]+)(\/(?<database>[^?]+))?(\?(?<options>.*))?$/

# https://docs.mongodb.com/manual/reference/connection-string/#connections-connection-options
@@ -181,6 +183,7 @@ defmodule Mongo.UrlParser do
frags <- resolve_srv_url(frags),
opts <- parse_seeds(opts, frags),
opts <- parse_query_options(opts, frags),
opts <- process_read_preferences(opts),
# Parse fixed parameters (database, username & password) & merge them with query options
opts <- Enum.reduce(frags, opts, &add_option/2) do
opts
@@ -191,4 +194,60 @@ defmodule Mongo.UrlParser do
end

def parse_url(opts), do: opts

defp process_read_preferences(opts) do
opts =
case Keyword.get(opts, :read_preference) do
nil ->
opts

mode ->
read_preference =
%{mode: mode}
|> extend_read_preference_tags(opts)
|> extend_max_staleness_ms(opts)

Keyword.put(opts, :read_preference, read_preference)
end

Keyword.drop(opts, [:read_preference_tags, :max_staleness_seconds])
end

defp extend_read_preference_tags(read_preference, opts) do
case Keyword.get(opts, :read_preference_tags, []) |> parse_tags() do
[] ->
read_preference

tags ->
Map.put(read_preference, :tags, Keyword.new(tags))
end
end

defp extend_max_staleness_ms(read_preference, opts) do
case Keyword.get(opts, :max_staleness_seconds) do
nil ->
read_preference

max_staleness_seconds ->
Map.put(read_preference, :max_staleness_ms, max_staleness_seconds * 1_000)
end
end

defp parse_tags(tags) do
tags
|> String.split(",")
|> Enum.map(fn key_value -> to_tuple(key_value) end)
|> Enum.reject(fn key_value -> key_value == nil end)
end

defp to_tuple(key_value) do
case String.split(key_value, ":") do
[key, value] ->
{String.to_atom(key), value}

_other ->
Logger.warning("Unable to parse the read preference tags #{inspect(key_value)}")
nil
end
end
end
28 changes: 28 additions & 0 deletions test/mongo/url_parser_test.exs
Original file line number Diff line number Diff line change
@@ -105,6 +105,34 @@ defmodule Mongo.UrlParserTest do
end
end

test "write read preferences" do
assert UrlParser.parse_url(url: "mongodb://seed1.domain.com:27017,seed2.domain.com:27017/db_name?readPreference=secondary&readPreferenceTags=dc:ny,rack:r&maxStalenessSeconds=30") == [
database: "db_name",
read_preference: %{mode: :secondary, tags: [dc: "ny", rack: "r"], max_staleness_ms: 30_000},
seeds: [
"seed1.domain.com:27017",
"seed2.domain.com:27017"
]
]

assert UrlParser.parse_url(url: "mongodb://seed1.domain.com:27017,seed2.domain.com:27017/db_name?readPreference=secondary&readPreferenceTags=dc::ny,rack:r&maxStalenessSeconds=30") == [
database: "db_name",
read_preference: %{mode: :secondary, tags: [rack: "r"], max_staleness_ms: 30_000},
seeds: [
"seed1.domain.com:27017",
"seed2.domain.com:27017"
]
]

assert UrlParser.parse_url(url: "mongodb://seed1.domain.com:27017,seed2.domain.com:27017/db_name?readPreference=weird&readPreferenceTags=dc:ny,rack:r&maxStalenessSeconds=30") == [
database: "db_name",
seeds: [
"seed1.domain.com:27017",
"seed2.domain.com:27017"
]
]
end

test "encoded user" do
real_username = "@:/skøl:@/"
real_password = "@æœ{}%e()}@"

0 comments on commit 9478c1d

Please sign in to comment.