Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'zookzook:master' into master
Browse files Browse the repository at this point in the history
JD-Robertson authored Nov 3, 2023
2 parents d3a9062 + 2823e26 commit a1afbe0
Showing 23 changed files with 413 additions and 169 deletions.
2 changes: 1 addition & 1 deletion .credo.exs
Original file line number Diff line number Diff line change
@@ -126,7 +126,7 @@
{Credo.Check.Refactor.MapJoin, []},
{Credo.Check.Refactor.NegatedConditionsInUnless, []},
{Credo.Check.Refactor.NegatedConditionsWithElse, []},
{Credo.Check.Refactor.Nesting, []},
{Credo.Check.Refactor.Nesting, [max_nesting: 4]},
{Credo.Check.Refactor.UnlessWithElse, []},
{Credo.Check.Refactor.WithClauses, []},
{Credo.Check.Refactor.FilterFilter, []},
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
@@ -719,7 +719,7 @@ You need roughly three additional configuration steps:
* Authenticate with an x.509 Certificate

To get the x.509 authentication working you need to prepare the ssl configuration accordingly:
* you need set the ssl option: `verify_peer`
* you need to set the ssl option: `verify_peer`
* you need to specify the `cacertfile` because Erlang BEAM don't provide any CA certificate store by default
* you maybe need to customize the hostname check to allow wildcard certificates
* you need to specify the `username` from the subject entry of the user certificate
@@ -802,15 +802,15 @@ a simple map, supporting the following keys:

* `:mode`, possible values: `:primary`, `:primary_preferred`, `:secondary`, `:secondary_preferred` and `:nearest`
* `:max_staleness_ms`, the maxStaleness value in milliseconds
* `:tag_sets`, the set of tags, for example: `[dc: "west", usage: "production"]`
* `:tags`, the set of tags, for example: `[dc: "west", usage: "production"]`

The driver selects the server using the read preference.

```elixr
prefs = %{
mode: :secondary,
max_staleness_ms: 120_000,
tag_sets: [dc: "west", usage: "production"]
tags: [dc: "west", usage: "production"]
}
Mongo.find_one(top, "dogs", %{name: "Oskar"}, read_preference: prefs)
@@ -907,7 +907,7 @@ result = Mongo.BulkWrite.write(:mongo, bulk, w: 1)
In the following example we import 1.000.000 integers into the MongoDB using the stream api:

We need to create an insert operation for each number. Then we call the `Mongo.UnorderedBulk.stream`
function to import it. This function returns a stream function which accumulate
function to import it. This function returns a stream function that accumulates
all inserts operations until the limit `1000` is reached. In this case the operation group is send to
MongoDB. So using the stream api you can reduce the memory using while
importing big volume of data.
@@ -1026,7 +1026,7 @@ That means, you can just generate a `raise :should_not_happen` exception as well

## Command Monitoring

You can watch all events that are triggered while the driver send requests and processes responses. You can use the
You can watch all events that are triggered while the driver sends requests and processes responses. You can use the
`Mongo.EventHandler` as a starting point. It logs the events from the topic `:commands` (by ignoring the `:isMaster` command)
to `Logger.info`:

@@ -1041,7 +1041,7 @@ iex> {:ok, conn} = Mongo.start_link(url: "mongodb://localhost:27017/test")

## Testing

Latest MongoDB is used while running the tests. Replica set of three nodes is created and runs all test except the socket and ssl test. If you want to
Latest MongoDB is used while running the tests. Replica set of three nodes is created and runs all tests, except the socket and ssl test. If you want to
run the test cases against other MongoDB deployments or older versions, you can use the [mtools](https://github.com/rueckstiess/mtools) for deployment and run the test cases locally:

```bash
2 changes: 1 addition & 1 deletion lib/mongo.ex
Original file line number Diff line number Diff line change
@@ -1502,7 +1502,7 @@ defmodule Mongo do
@spec exec_command_session(GenServer.server(), BSON.document(), Keyword.t()) ::
{:ok, BSON.document() | nil} | {:error, Mongo.Error.t()}
def exec_command_session(session, cmd, opts) do
with {:ok, conn, new_cmd} <- Session.bind_session(session, cmd),
with {:ok, conn, new_cmd, opts} <- Session.bind_session(session, cmd, opts),
{:ok, _cmd, response} <- DBConnection.execute(conn, %Query{action: {:command, new_cmd}}, [], opts),
:ok <- Session.update_session(session, response, opts),
{:ok, {_flags, doc}} <- check_for_error(response, cmd, opts) do
8 changes: 4 additions & 4 deletions lib/mongo/grid_fs/upload.ex
Original file line number Diff line number Diff line change
@@ -7,12 +7,12 @@ defmodule Mongo.GridFs.Upload do

@doc """
Opens a stream that the application can write the contents of the file to.
The driver generates the file id.
The driver generates the file id if not provided.
User data for the 'metadata' field of the files collection document.
"""
@spec open_upload_stream(Mongo.GridFs.Bucket.t(), String.t(), BSON.document() | nil) :: UploadStream.t()
def open_upload_stream(bucket, filename, meta \\ nil) do
UploadStream.new(bucket, filename, meta)
@spec open_upload_stream(Mongo.GridFs.Bucket.t(), String.t(), BSON.document() | nil, UploadStream.file_id() | nil) :: UploadStream.t()
def open_upload_stream(bucket, filename, meta \\ nil, file_id \\ nil) do
UploadStream.new(bucket, filename, meta, file_id)
end
end
9 changes: 5 additions & 4 deletions lib/mongo/grid_fs/upload_stream.ex
Original file line number Diff line number Diff line change
@@ -20,9 +20,10 @@ defmodule Mongo.GridFs.UploadStream do
alias Mongo.GridFs.Bucket
alias Mongo.GridFs.UploadStream

@type file_id :: BSON.ObjectId.t() | binary()
@type t :: %__MODULE__{
bucket: Bucket.t(),
id: BSON.ObjectId.t(),
id: file_id(),
filename: String.t(),
metadata: {BSON.document() | nil}
}
@@ -31,9 +32,9 @@ defmodule Mongo.GridFs.UploadStream do
@doc """
Creates a new upload stream to insert a file into the grid-fs.
"""
@spec new(Bucket.t(), String.t(), BSON.document() | nil) :: UploadStream.t()
def new(bucket, filename, metadata \\ nil) do
%UploadStream{bucket: bucket, filename: filename, id: Mongo.object_id(), metadata: metadata}
@spec new(Bucket.t(), String.t(), BSON.document() | nil, file_id() | nil) :: UploadStream.t()
def new(bucket, filename, metadata \\ nil, file_id \\ nil) do
%UploadStream{bucket: bucket, filename: filename, id: file_id || Mongo.object_id(), metadata: metadata}
end

defimpl Collectable, for: UploadStream do
14 changes: 2 additions & 12 deletions lib/mongo/monitor.ex
Original file line number Diff line number Diff line change
@@ -186,18 +186,8 @@ defmodule Mongo.Monitor do
##
# Get a new server description from the server and send it to the Topology process.
##
defp update_server_description(%{topology_pid: topology_pid, address: address, mode: :streaming_mode} = state) do
case get_server_description(state) do
%{round_trip_time: round_trip_time} ->
## debug info("Updating round_trip_time: #{inspect round_trip_time}")
Topology.update_rrt(topology_pid, address, round_trip_time)

%{state | round_trip_time: round_trip_time}

error ->
warning("Unable to round trip time because of #{inspect(error)}")
state
end
defp update_server_description(%{mode: :streaming_mode} = state) do
state
end

##
147 changes: 77 additions & 70 deletions lib/mongo/read_preference.ex
Original file line number Diff line number Diff line change
@@ -4,134 +4,141 @@ defmodule Mongo.ReadPreference do
@moduledoc ~S"""
Determines which servers are considered suitable for read operations
A read preference consists of a mode and optional `tag_sets`, max_staleness_ms, and `hedge`.
A read preference consists of a mode and optional `tags`, max_staleness_ms, and `hedge`.
The mode prioritizes between primaries and secondaries to produce either a single suitable server or a list of candidate servers.
If tag_sets and maxStalenessSeconds are set, they determine which candidate servers are eligible for selection.
If tags and maxStalenessSeconds are set, they determine which candidate servers are eligible for selection.
If hedge is set, it configures how server hedged reads are used.
The default mode is `:primary`.
The default tag_sets is a list with an empty tag set: [{}].
The default tags is a list with an empty tag set: [{}].
The default max_staleness_ms is unset.
The default hedge is unset.
## mode
* `:primary` Only an available primary is suitable.
* `:secondary` All secondaries (and only secondaries) are candidates, but only eligible candidates (i.e. after applying tag_sets and maxStalenessSeconds) are suitable.
* `:secondary` All secondaries (and only secondaries) are candidates, but only eligible candidates (i.e. after applying tags and maxStalenessSeconds) are suitable.
* `:primary_preferred` If a primary is available, only the primary is suitable. Otherwise, all secondaries are candidates,
but only eligible secondaries are suitable.
* `:secondary_preferred` All secondaries are candidates. If there is at least one eligible secondary, only eligible secondaries are suitable.
Otherwise, when there are no eligible secondaries, the primary is suitable.
* `:nearest` The primary and all secondaries are candidates, but only eligible candidates are suitable.
"""
@type t :: %{
mode:
:primary
| :secondary
| :primary_preferred
| :secondary_preferred
| :nearest,
tag_sets: [%{String.t() => String.t()}],
max_staleness_ms: non_neg_integer,
hedge: BSON.document()
}

@primary %{
mode: :primary,
tag_sets: [],
tags: [],
max_staleness_ms: 0
}

def primary(map \\ nil)
@doc """
Merge default values to the read preferences and converts deprecated tag_sets to tags
"""
def merge_defaults(%{tag_sets: tags} = map) do
map =
map
|> Map.delete(:tag_sets)
|> Map.put(:tags, tags)

Map.merge(@primary, map)
end

def primary(map) when is_map(map) do
def merge_defaults(map) when is_map(map) do
Map.merge(@primary, map)
end

def primary(_), do: @primary
def merge_defaults(_other) do
@primary
end

@doc """
Add read preference to the cmd
"""
def add_read_preference(cmd, opts) do
case Keyword.get(opts, :read_preference) do
nil -> cmd
pref -> cmd ++ ["$readPreference": pref]
nil ->
cmd

pref ->
cmd ++ ["$readPreference": pref]
end
end

@doc """
From the specs:
Use of slaveOk
There are two usages of slaveOK:
* A driver query parameter that predated read preference modes and tag set lists.
* A wire protocol flag on OP_QUERY operations
Converts the preference to the mongodb format for replica sets
"""
def slave_ok(%{:mode => :primary}) do
%{:mode => :primary}
def to_replica_set(%{:mode => :primary}) do
%{mode: :primary}
end

def slave_ok(config) do
def to_replica_set(config) do
mode =
case config[:mode] do
:primary_preferred -> :primaryPreferred
:secondary_preferred -> :secondaryPreferred
other -> other
end
:primary_preferred ->
:primaryPreferred

filter_nils(mode: mode, tag_sets: config[:tag_sets])
end
:secondary_preferred ->
:secondaryPreferred

##
# Therefore, when sending queries to a mongos, the following rules apply:
#
# For mode 'primary', drivers MUST NOT set the slaveOK wire protocol flag and MUST NOT use $readPreference
def mongos(%{mode: :primary}) do
nil
end
other ->
other
end

# For mode 'secondary', drivers MUST set the slaveOK wire protocol flag and MUST also use $readPreference
def mongos(%{mode: :secondary} = config) do
transform(config)
end
case config[:tags] do
[] ->
%{mode: mode}

# For mode 'primaryPreferred', drivers MUST set the slaveOK wire protocol flag and MUST also use $readPreference
def mongos(%{mode: :primary_preferred} = config) do
transform(config)
end
nil ->
%{mode: mode}

# For mode 'secondaryPreferred', drivers MUST set the slaveOK wire protocol flag. If the read preference contains a
# non-empty tag_sets parameter, maxStalenessSeconds is a positive integer, or the hedge parameter is non-empty,
# drivers MUST use $readPreference; otherwise, drivers MUST NOT use $readPreference
def mongos(%{mode: :secondary_preferred} = config) do
transform(config)
tags ->
%{mode: mode, tags: [tags]}
end
end

# For mode 'nearest', drivers MUST set the slaveOK wire protocol flag and MUST also use $readPreference
def mongos(%{mode: :nearest} = config) do
transform(config)
@doc """
Converts the preference to the mongodb format for mongos
"""
def to_mongos(%{mode: :primary}) do
nil
end

defp transform(config) do
# for the others we should use the read preferences
def to_mongos(config) do
mode =
case config[:mode] do
:primary_preferred -> :primaryPreferred
:secondary_preferred -> :secondaryPreferred
other -> other
:primary_preferred ->
:primaryPreferred

:secondary_preferred ->
:secondaryPreferred

other ->
other
end

max_staleness_seconds =
case config[:max_staleness_ms] do
i when is_integer(i) -> div(i, 1000)
nil -> nil
i when is_integer(i) ->
div(i, 1000)

nil ->
nil
end

read_preference =
case config[:tags] do
[] ->
%{mode: mode, maxStalenessSeconds: max_staleness_seconds, hedge: config[:hedge]}

nil ->
%{mode: mode, maxStalenessSeconds: max_staleness_seconds, hedge: config[:hedge]}

tags ->
%{mode: mode, tags: [tags], maxStalenessSeconds: max_staleness_seconds, hedge: config[:hedge]}
end

[mode: mode, tag_sets: config[:tag_sets], maxStalenessSeconds: max_staleness_seconds, hedge: config[:hedge]]
|> filter_nils()
filter_nils(read_preference)
end
end
Loading

0 comments on commit a1afbe0

Please sign in to comment.