Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ecto 3 #2

Open
wants to merge 40 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
1d74dc8
Mostly working
aselder Sep 24, 2019
6519762
upgrade to ecto 3.6
scottmessinger Apr 28, 2021
a3f953b
Add number of passed tests per suite as comments
scottmessinger Apr 28, 2021
6cf53c4
Make normalized query tests pass
scottmessinger Apr 28, 2021
ff17661
Fix stream
scottmessinger Apr 28, 2021
48e9bcc
Update number of tests pass
scottmessinger Apr 29, 2021
241b94e
Fix converting read queries to appropriate format
scottmessinger Apr 29, 2021
d79ca3c
Handle dates with usecs
scottmessinger Apr 29, 2021
02e18c0
fix conversion of dates
scottmessinger Apr 29, 2021
61c8a4c
fix updating using a changeset.
scottmessinger Apr 29, 2021
d6002b2
remove IO.inspects
scottmessinger Apr 29, 2021
1bba967
Fix dumping of date
scottmessinger Apr 30, 2021
dec2ca9
Add support for time
scottmessinger Apr 30, 2021
bf607d5
Convert all keys on query to strings
scottmessinger Apr 30, 2021
3e13e07
Convert all keys to atoms
scottmessinger Apr 30, 2021
c179e9d
Copy over ecto tests so they can be modified/annotated
scottmessinger Apr 30, 2021
9f2c739
Update which repo_tests pass
scottmessinger Apr 30, 2021
b617d28
Fix reload test by fixing id in test
scottmessinger Apr 30, 2021
d20d8cb
fix reload and aggregate tests
scottmessinger Apr 30, 2021
56ec260
Fix delete_many
scottmessinger Apr 30, 2021
2215afb
Fix selecting :_id on schema schema
scottmessinger Apr 30, 2021
f085ae5
Update previous commit
scottmessinger Apr 30, 2021
79616db
Fix truncating the collection
scottmessinger Apr 30, 2021
d7fab81
Tagged remaining test and excluded many tags
scottmessinger Apr 30, 2021
a3678eb
Remove interval test
scottmessinger May 5, 2021
14cb29a
Add association test (currently fails)
scottmessinger May 5, 2021
9b01f7c
Note which assoc tests pass or fail
scottmessinger May 5, 2021
88afab1
Update mongodb dep and other related deps
scottmessinger May 5, 2021
bd11ad9
Don't convert string keys to atoms
scottmessinger May 7, 2021
29c0979
Add back the __before_compile__
scottmessinger May 7, 2021
d0b6c84
Use struct's default values when field isn't in document
scottmessinger May 7, 2021
04781e4
Fix elixir warnings
scottmessinger May 7, 2021
6790bcb
Stub out checkout callback
scottmessinger May 7, 2021
0e766b9
Implement storage status callback
scottmessinger May 7, 2021
557dc90
Add support for logging
scottmessinger May 12, 2021
92491dc
Tag association tests that fail.
scottmessinger May 12, 2021
c9e2837
Remove warnings
scottmessinger May 12, 2021
cf89876
fix: Point mongodb driver to elixir-ecto org
scottmessinger Aug 16, 2021
7cf5425
typo
scottmessinger Aug 16, 2021
a442188
Also change mix.lock
scottmessinger Aug 16, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
406 changes: 190 additions & 216 deletions lib/mongo_ecto.ex

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions lib/mongo_ecto/change.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ defmodule Mongo.Ecto.ChangeMap do
Change is not a value - it can't be loaded
"""
def load(_), do: :error

def embed_as(_), do: :dump

def equal?(a, b), do: a == b
end

defmodule Mongo.Ecto.ChangeArray do
Expand Down Expand Up @@ -72,4 +76,8 @@ defmodule Mongo.Ecto.ChangeArray do
Change is not a value - it can't be loaded
"""
def load(_), do: :error

def embed_as(_), do: :dump

def equal?(a, b), do: a == b
end
173 changes: 154 additions & 19 deletions lib/mongo_ecto/connection.ex
Original file line number Diff line number Diff line change
@@ -1,17 +1,34 @@
defmodule Mongo.Ecto.Connection do
@moduledoc false

require Logger

alias Mongo.Ecto.NormalizedQuery.ReadQuery
alias Mongo.Ecto.NormalizedQuery.WriteQuery
alias Mongo.Ecto.NormalizedQuery.CommandQuery
alias Mongo.Ecto.NormalizedQuery.CountQuery
alias Mongo.Ecto.NormalizedQuery.AggregateQuery
alias Mongo.Query

def child_spec(opts) do
# Rename the `:mongo_url` key so that the driver can parse it
opts =
Enum.map(opts, fn
{:mongo_url, value} -> {:url, value}
{key, value} -> {key, value}
end)

# opts = [name: pool_name] ++ Keyword.delete(opts, :pool) ++ pool_opts
Mongo.child_spec(opts)
end

## Worker

def init(_config) do
end

def storage_down(opts) do
opts = Keyword.put(opts, :pool, DBConnection.Connection)
# opts = Keyword.put(opts, :pool, DBConnection.Connection)

{:ok, _apps} = Application.ensure_all_started(:mongodb)
{:ok, conn} = Mongo.start_link(opts)
Expand All @@ -24,6 +41,16 @@ defmodule Mongo.Ecto.Connection do
end
end

def storage_status(opts) do
{:ok, _apps} = Application.ensure_all_started(:mongodb)
{:ok, conn} = Mongo.start_link(opts)

case Mongo.command(conn, %{ping: true}) do
{:ok, %{"ok" => 1.0}} -> :up
_ -> :down
end
end

## Callbacks for adapter

def read(repo, query, opts \\ [])
Expand Down Expand Up @@ -59,6 +86,7 @@ defmodule Mongo.Ecto.Connection do
query = query.query

%{deleted_count: n} = query(repo, :delete_many!, [coll, query], opts)

n
end

Expand Down Expand Up @@ -86,7 +114,7 @@ defmodule Mongo.Ecto.Connection do
query = query.query

case query(repo, :update_many, [coll, query, command], opts) do
{:ok, %Mongo.UpdateResult{modified_count: m}} ->
{:ok, %Mongo.UpdateResult{modified_count: m} = _result} ->
m

{:error, error} ->
Expand Down Expand Up @@ -144,45 +172,141 @@ defmodule Mongo.Ecto.Connection do
query(repo, :command!, [command], opts)
end

defp query(repo, operation, args, opts) do
{conn, default_opts} = repo.__pool__
args = [conn] ++ args ++ [with_log(repo, opts ++ default_opts)]
def query(adapter_meta, operation, args, opts) do
%{pid: pool, telemetry: telemetry, opts: default_opts} = adapter_meta

args = [pool] ++ args ++ [with_log(telemetry, args, opts ++ default_opts)]
apply(Mongo, operation, args)
end

defp with_log(repo, opts) do
case Keyword.pop(opts, :log, true) do
{true, opts} -> [log: &log(repo, &1, opts)] ++ opts
{false, opts} -> opts
end
defp with_log(telemetry, params, opts) do
[log: &log(telemetry, params, &1, opts)] ++ opts
end

defp log(repo, entry, opts) do
defp log({repo, log, event_name}, _params, entry, opts) do
%{
connection_time: query_time,
decode_time: decode_time,
pool_time: queue_time,
idle_time: idle_time,
result: result,
query: query,
params: params
} = entry

source = Keyword.get(opts, :source)

repo.__log__(%Ecto.LogEntry{
query_time: query_time,
decode_time: decode_time,
queue_time: queue_time,
params =
Enum.map(params, fn
%Ecto.Query.Tagged{value: value} -> value
value -> value
end)

acc = if idle_time, do: [idle_time: idle_time], else: []

measurements =
log_measurements(
[query_time: query_time, decode_time: decode_time, queue_time: queue_time],
0,
acc
)

metadata = %{
type: :ecto_sql_query,
repo: repo,
result: log_result(result),
params: [],
params: params,
query: format_query(query, params),
source: source
})
source: source,
options: Keyword.get(opts, :telemetry_options, [])
}

if event_name = Keyword.get(opts, :telemetry_event, event_name) do
:telemetry.execute(event_name, measurements, metadata)
end

case Keyword.get(opts, :log, log) do
true ->
Logger.log(
log,
fn -> log_iodata(measurements, metadata) end,
ansi_color: log_color(query)
)

false ->
:ok

level ->
Logger.log(
level,
fn -> log_iodata(measurements, metadata) end,
ansi_color: log_color(query)
)
end

:ok
end

defp log_measurements([{_, nil} | rest], total, acc),
do: log_measurements(rest, total, acc)

defp log_measurements([{key, value} | rest], total, acc),
do: log_measurements(rest, total + value, [{key, value} | acc])

defp log_measurements([], total, acc),
do: Map.new([total_time: total] ++ acc)

# Currently unused
defp log_result({:ok, _query, res}), do: {:ok, res}
defp log_result(other), do: other

defp log_iodata(measurements, metadata) do
%{
params: params,
query: query,
result: result,
source: source
} = metadata

[
"QUERY",
?\s,
log_ok_error(result),
log_ok_source(source),
log_time("db", measurements, :query_time, true),
log_time("decode", measurements, :decode_time, false),
log_time("queue", measurements, :queue_time, false),
log_time("idle", measurements, :idle_time, true),
?\n,
query,
?\s,
inspect(params, charlists: false)
]
end

defp log_ok_error({:ok, _res}), do: "OK"
defp log_ok_error({:error, _err}), do: "ERROR"

defp log_ok_source(nil), do: ""
defp log_ok_source(source), do: " source=#{inspect(source)}"

defp log_time(label, measurements, key, force) do
case measurements do
%{^key => time} ->
us = System.convert_time_unit(time, :native, :microsecond)
ms = div(us, 100) / 10

if force or ms > 0 do
[?\s, label, ?=, :io_lib_format.fwrite_g(ms), ?m, ?s]
else
[]
end

%{} ->
[]
end
end

defp check_constraint_errors(%Mongo.Error{code: 11000, message: msg}) do
{:invalid, [unique: extract_index(msg)]}
end
Expand All @@ -196,7 +320,7 @@ defmodule Mongo.Ecto.Connection do

case Enum.reverse(parts) do
[_, index | _] ->
String.strip(index)
String.trim(index)

_ ->
raise "failed to extract index from error message: #{inspect(msg)}"
Expand Down Expand Up @@ -299,4 +423,15 @@ defmodule Mongo.Ecto.Connection do
defp format_part(name, value) do
[" ", name, "=" | inspect(value)]
end

defp log_color(%Query{action: :command}), do: :white
defp log_color(%Query{action: :find}), do: :cyan
defp log_color(%Query{action: :insert_one}), do: :green
defp log_color(%Query{action: :insert_many}), do: :green
defp log_color(%Query{action: :update_one}), do: :yellow
defp log_color(%Query{action: :update_many}), do: :yellow
defp log_color(%Query{action: :delete_many}), do: :red
defp log_color(%Query{action: :replace_one}), do: :yellow
defp log_color(%Query{action: :get_more}), do: :cyan
defp log_color(%Query{action: _}), do: nil
end
7 changes: 5 additions & 2 deletions lib/mongo_ecto/conversions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ defmodule Mongo.Ecto.Conversions do
do: map(list, &inject_params(&1, params, pk))

def inject_params(
%Ecto.Query.Tagged{tag: tag, type: type, value: {:^, _, [idx]} = value},
%Ecto.Query.Tagged{tag: _tag, type: _type, value: {:^, _, [idx]} = _value},
params,
pk
) do
Expand Down Expand Up @@ -93,7 +93,10 @@ defmodule Mongo.Ecto.Conversions do
end

defp key(pk, pk), do: :_id
defp key(key, _), do: key

defp key(key, _) do
key
end

defp map(map, _fun) when is_map(map) and map_size(map) == 0 do
{:ok, %{}}
Expand Down
27 changes: 16 additions & 11 deletions lib/mongo_ecto/normalized_query.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ defmodule Mongo.Ecto.NormalizedQuery do
end

alias Mongo.Ecto.Conversions
alias Ecto.Query.Tagged
alias Ecto.Query

defmacrop is_op(op) do
Expand All @@ -49,7 +48,7 @@ defmodule Mongo.Ecto.NormalizedQuery do
end
end

def all(%Query{} = original, params) do
def all(original, params) do
check_query!(original, [:limit, :offset])

from = from(original)
Expand Down Expand Up @@ -125,7 +124,7 @@ defmodule Mongo.Ecto.NormalizedQuery do
%WriteQuery{coll: coll, query: query, command: command, database: original.prefix}
end

def update(%{source: {prefix, coll}, schema: schema}, fields, filter) do
def update(%{source: coll, prefix: prefix, schema: schema}, fields, filter) do
command = command(:update, fields, primary_key(schema))
query = query(filter, primary_key(schema))

Expand All @@ -143,13 +142,13 @@ defmodule Mongo.Ecto.NormalizedQuery do
%WriteQuery{coll: coll, query: query, database: original.prefix}
end

def delete(%{source: {prefix, coll}, schema: schema}, filter) do
def delete(%{source: coll, schema: schema, prefix: prefix}, filter) do
query = query(filter, primary_key(schema))

%WriteQuery{coll: coll, query: query, database: prefix}
end

def insert(%{source: {prefix, coll}, schema: schema}, document) do
def insert(%{source: coll, schema: schema, prefix: prefix}, document) do
command = command(:insert, document, primary_key(schema))

%WriteQuery{coll: coll, command: command, database: prefix}
Expand All @@ -159,11 +158,11 @@ defmodule Mongo.Ecto.NormalizedQuery do
%CommandQuery{command: command, database: Keyword.get(opts, :database, nil)}
end

defp from(%Query{from: {coll, model}}) do
defp from(%Query{from: %{source: {coll, model}}}) do
{coll, model, primary_key(model)}
end

defp from(%Query{from: %Ecto.SubQuery{}}) do
defp from(%Query{from: %{source: %Ecto.SubQuery{}}}) do
raise ArgumentError, "MongoDB does not support subqueries"
end

Expand All @@ -172,8 +171,13 @@ defmodule Mongo.Ecto.NormalizedQuery do

defp projection(%Query{select: nil}, _params, _from), do: {:find, %{}, []}

defp projection(%Query{select: %Query.SelectExpr{fields: fields}} = query, params, from),
do: projection(fields, params, from, query, %{}, [])
defp projection(
%Query{select: %Query.SelectExpr{fields: fields} = _select} = query,
params,
from
) do
projection(fields, params, from, query, %{}, [])
end

defp projection([], _params, _from, _query, pacc, facc), do: {:find, pacc, Enum.reverse(facc)}

Expand Down Expand Up @@ -340,8 +344,9 @@ defmodule Mongo.Ecto.NormalizedQuery do
["$set": values |> value(pk, "update command") |> map_unless_empty]
end

defp both_nil(nil, nil), do: true
defp both_nil(_, _), do: false
# Currently unused
# defp both_nil(nil, nil), do: true
# defp both_nil(_, _), do: false

defp offset_limit(nil, _params, _pk, _query, _where), do: nil

Expand Down
4 changes: 4 additions & 0 deletions lib/mongo_ecto/regex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,8 @@ defmodule Mongo.Ecto.Regex do
"""
def load(%BSON.Regex{} = js), do: {:ok, Map.put(js, :__struct__, __MODULE__)}
def load(_), do: :error

def embed_as(_), do: :dump

def equal?(a, b), do: a == b
end
Loading