Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make events schema table dynamic #17

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/fable/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule Fable.Config do
:repo,
:registry,
:router,
event_schema: Fable.Event,
event_schema: {"events", Fable.Event},
process_manager_schema: Fable.ProcessManager.State,
json_library: Jason
]
Expand Down
4 changes: 2 additions & 2 deletions lib/fable/event.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ defmodule Fable.Event do

use Ecto.Schema

schema "events" do
schema "abstract table: events" do
field(:prev_event_id, :integer)
field(:aggregate_id, Ecto.UUID, null: false)
field(:aggregate_table, :string, null: false)
Expand All @@ -28,7 +28,7 @@ defmodule Fable.Event do
queryable |> where(active: true)
end

def for_aggregate(schema \\ __MODULE__, %agg{id: id}) do
def for_aggregate(schema, %agg{id: id}) do
table = agg.__schema__(:source)

schema
Expand Down
8 changes: 5 additions & 3 deletions lib/fable/events.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ defmodule Fable.Events do
* `:registry` - The name of the registry used for the process managers. Defaults to `Fable.` + repo name
* `:router` - The module responsible for defining the routing of events. Defaults to `__MODULE__`
to their handlers
* `:event_schema` - The schema to be used for persisting events. Defaults to `Fable.Event`
* `:event_schema` - The schema to be used for persisting events. Defaults to `{"events", Fable.Event}`
* `:process_manager_schema` - The schema to be used for persisting process managers' state. Defaults to `Fable.ProcessManager.State`
* `:json_library` - The library used for json encoding. Defaults to `Jason`

Expand Down Expand Up @@ -336,8 +336,10 @@ defmodule Fable.Events do
meta: Keyword.get(opts, :meta, %{})
}

config.event_schema
|> struct()
case config.event_schema do
{table, schema} -> schema |> struct() |> Ecto.put_meta(source: table)
schema -> schema |> struct()
end
|> Ecto.Changeset.cast(attrs, Map.keys(attrs))
end

Expand Down
19 changes: 11 additions & 8 deletions lib/fable/migrations/create_events.exs → lib/fable/migrations.ex
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
defmodule Fable.Migrations.CreateEvents do
defmodule Fable.Migrations do
use Ecto.Migration

def change do
create table(:events, primary_key: false) do
def events_table(table, opts \\ []) do
aggregate_type = Keyword.get(opts, :aggregate_type, :uuid)

create table(table, primary_key: false) do
add(:id, :bigserial, primary_key: true)
add(:prev_event_id, :integer)
add(:aggregate_id, :uuid, null: false)
add(:aggregate_id, aggregate_type, null: false)
add(:aggregate_table, :string, null: false)
add(:type, :string, null: false)
add(:version, :integer, null: false)
Expand All @@ -14,7 +16,7 @@ defmodule Fable.Migrations.CreateEvents do
add(:inserted_at, :timestamp, null: false, default: fragment("statement_timestamp()"))
end

create(index(:events, [:aggregate_id, :aggregate_table]))
create index(table, [:aggregate_id, :aggregate_table])

execute(
"""
Expand All @@ -24,13 +26,14 @@ defmodule Fable.Migrations.CreateEvents do
as $$
DECLARE
rcount int;
result uuid;
result #{aggregate_type};
find_aggregate text := format(
'SELECT id FROM %I WHERE id = $1', NEW.aggregate_table
);
event_json text := json_build_object(
'aggregate_id', NEW.aggregate_id,
'aggregate_table', NEW.aggregate_table,
'events_table', TG_TABLE_NAME,
'id', NEW.id
);
update_aggregate text := format(
Expand Down Expand Up @@ -63,11 +66,11 @@ defmodule Fable.Migrations.CreateEvents do

execute(
"""
create trigger event_insert_update_last_event_id after insert on events
create trigger event_insert_update_last_event_id after insert on #{table}
for each row
execute procedure fn_trigger_last_event_update()
""",
"drop trigger event_insert_update_last_event_id ON events"
"drop trigger event_insert_update_last_event_id ON #{table}"
)
end
end
12 changes: 6 additions & 6 deletions lib/fable/process_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ defmodule Fable.ProcessManager do

def handle_continue(:acquire_lock, state) do
state
|> acquire_lock
|> process_events
|> acquire_lock()
|> process_events()
|> case do
{:ok, state} ->
{:noreply, state}
Expand All @@ -67,8 +67,8 @@ defmodule Fable.ProcessManager do
# to get the lock.
def handle_info(:acquire_lock, %__MODULE__{handler: nil} = state) do
state
|> acquire_lock
|> process_events
|> acquire_lock()
|> process_events()
|> case do
{:ok, state} ->
{:noreply, state}
Expand All @@ -85,7 +85,7 @@ defmodule Fable.ProcessManager do
# There's a new event we haven't already pulled it from the database
%{"id" => event_id} when event_id > last_event_id ->
state
|> process_events
|> process_events()
|> case do
{:ok, state} ->
{:noreply, state}
Expand All @@ -101,7 +101,7 @@ defmodule Fable.ProcessManager do

def handle_info(:retry, state) do
state
|> process_events
|> process_events()
|> case do
{:ok, state} ->
{:noreply, state}
Expand Down