Skip to content

Commit

Permalink
e2pg: move migrations,task into e2pg schema (#168)
Browse files Browse the repository at this point in the history
This commit requires a database reset.

All of e2pg's internal tables will be located in the e2pg schema. Tables
that are created from events will remain in the public schema.
  • Loading branch information
ryandotsmith authored Sep 28, 2023
1 parent dbc26c8 commit b80208b
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 28 deletions.
10 changes: 5 additions & 5 deletions e2pg/e2pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,13 @@ func (task *Task) Status() StatusSnapshot {
}

func (task *Task) Insert(n uint64, h []byte) error {
const q = `insert into task (id, number, hash) values ($1, $2, $3)`
const q = `insert into e2pg.task (id, number, hash) values ($1, $2, $3)`
_, err := task.pgp.Exec(context.Background(), q, task.ID, n, h)
return err
}

func (task *Task) Latest() (uint64, []byte, error) {
const q = `SELECT number, hash FROM task WHERE id = $1 ORDER BY number DESC LIMIT 1`
const q = `SELECT number, hash FROM e2pg.task WHERE id = $1 ORDER BY number DESC LIMIT 1`
var n, h = uint64(0), []byte{}
err := task.pgp.QueryRow(context.Background(), q, task.ID).Scan(&n, &h)
if errors.Is(err, pgx.ErrNoRows) {
Expand Down Expand Up @@ -289,7 +289,7 @@ func (task *Task) Converge(notx bool) error {
}
for reorgs := 0; reorgs <= 10; {
localNum, localHash := uint64(0), []byte{}
const q = `SELECT number, hash FROM task WHERE id = $1 ORDER BY number DESC LIMIT 1`
const q = `SELECT number, hash FROM e2pg.task WHERE id = $1 ORDER BY number DESC LIMIT 1`
err := pg.QueryRow(task.ctx, q, task.ID).Scan(&localNum, &localHash)
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
return fmt.Errorf("getting latest from task: %w", err)
Expand Down Expand Up @@ -323,7 +323,7 @@ func (task *Task) Converge(notx bool) error {
case errors.Is(err, ErrReorg):
reorgs++
slog.ErrorContext(task.ctx, "reorg", "n", localNum, "h", fmt.Sprintf("%.4x", localHash))
const dq = "delete from task where id = $1 AND number >= $2"
const dq = "delete from e2pg.task where id = $1 AND number >= $2"
_, err := pg.Exec(task.ctx, dq, task.ID, localNum)
if err != nil {
return fmt.Errorf("deleting block from task table: %w", err)
Expand Down Expand Up @@ -419,7 +419,7 @@ func (task *Task) writeIndex(localHash []byte, pg PG, delta uint64) error {
return fmt.Errorf("writing indexed data: %w", err)
}
var last = task.batch[delta-1]
const uq = "insert into task (id, number, hash) values ($1, $2, $3)"
const uq = "insert into e2pg.task (id, number, hash) values ($1, $2, $3)"
_, err := pg.Exec(context.Background(), uq, task.ID, last.Num(), last.Hash())
if err != nil {
return fmt.Errorf("updating task table: %w", err)
Expand Down
3 changes: 3 additions & 0 deletions e2pg/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,7 @@ var Migrations = map[int]pgmig.Migration{
5: pgmig.Migration{
SQL: "create index on task(id, number desc);",
},
6: pgmig.Migration{
SQL: `alter table task set schema e2pg`,
},
}
30 changes: 17 additions & 13 deletions e2pg/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,32 @@ SET xmloption = content;
SET client_min_messages = warning;
SET row_security = off;


CREATE SCHEMA e2pg;


SET default_tablespace = '';

SET default_table_access_method = heap;


CREATE TABLE public.e2pg_migrations (
CREATE TABLE e2pg.migrations (
idx integer NOT NULL,
hash bytea NOT NULL,
inserted_at timestamp with time zone DEFAULT now() NOT NULL
);



CREATE TABLE e2pg.task (
id smallint NOT NULL,
number bigint,
hash bytea,
insert_at timestamp with time zone DEFAULT now()
);



CREATE TABLE public.erc20_transfers (
contract bytea,
f bytea,
Expand Down Expand Up @@ -83,17 +96,8 @@ CREATE TABLE public.nft_transfers (



CREATE TABLE public.task (
id smallint NOT NULL,
number bigint,
hash bytea,
insert_at timestamp with time zone DEFAULT now()
);



ALTER TABLE ONLY public.e2pg_migrations
ADD CONSTRAINT e2pg_migrations_pkey PRIMARY KEY (idx, hash);
ALTER TABLE ONLY e2pg.migrations
ADD CONSTRAINT migrations_pkey PRIMARY KEY (idx, hash);



Expand All @@ -107,7 +111,7 @@ ALTER TABLE ONLY public.erc4337_userops



CREATE INDEX task_id_number_idx ON public.task USING btree (id, number DESC);
CREATE INDEX task_id_number_idx ON e2pg.task USING btree (id, number DESC);



Expand Down
3 changes: 1 addition & 2 deletions integrations/testhelper/helper.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
// Easily test e2pg integrations
//
package testhelper

import (
Expand Down Expand Up @@ -47,7 +46,7 @@ func New(tb testing.TB) *H {

// Reset the task table. Call this in-between test cases
func (th *H) Reset() {
_, err := th.PG.Exec(context.Background(), "truncate table task")
_, err := th.PG.Exec(context.Background(), "truncate table e2pg.task")
check(th.tb, err)
}

Expand Down
20 changes: 12 additions & 8 deletions pgmig/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ type Migrations map[int]Migration
//
// migs is a map because the keys represent the order of migrations
// and so they should be unique. They keys are written to the idx
// columne of the e2pg_migrations table.
// columne of the e2pg.migrations table.
//
// The e2pg_migrations table will be created if it doesn't
// The e2pg.migrations table will be created if it doesn't
// already exist.
//
// Migrate will use pg_try_advisory_lock to ensure that only
Expand All @@ -59,16 +59,20 @@ func Migrate(pgp *pgxpool.Pool, migs Migrations) error {
if err != nil || !locked {
return fmt.Errorf("locking db for migrations: %w", err)
}

const q1 = `
create table if not exists e2pg_migrations (
const q1 = `create schema if not exists e2pg`
_, err = pgp.Exec(ctx, q1)
if err != nil {
return fmt.Errorf("creating e2pg schema: %w", err)
}
const q2 = `
create table if not exists e2pg.migrations (
idx int not null,
hash bytea not null,
inserted_at timestamptz default now() not null,
primary key (idx, hash)
);
`
_, err = pgp.Exec(ctx, q1)
_, err = pgp.Exec(ctx, q2)
if err != nil {
return fmt.Errorf("creating migrations table: %w", err)
}
Expand Down Expand Up @@ -120,7 +124,7 @@ func migrate(ctx context.Context, db execer, i int, m Migration) error {
if err != nil {
return fmt.Errorf("migration %d %x exec error: %w", i, m.Hash(), err)
}
const q = `insert into e2pg_migrations(idx, hash) values ($1, $2)`
const q = `insert into e2pg.migrations(idx, hash) values ($1, $2)`
_, err = db.Exec(ctx, q, i, m.Hash())
if err != nil {
return fmt.Errorf("migrations table %d %x insert error: %w", i, m.Hash(), err)
Expand All @@ -129,7 +133,7 @@ func migrate(ctx context.Context, db execer, i int, m Migration) error {
}

func exists(ctx context.Context, db execer, i int, m Migration) (bool, error) {
const q = `select true from e2pg_migrations where idx = $1 and hash = $2`
const q = `select true from e2pg.migrations where idx = $1 and hash = $2`
var found bool
err := db.QueryRow(ctx, q, i, m.Hash()).Scan(&found)
if errors.Is(err, pgx.ErrNoRows) {
Expand Down
3 changes: 3 additions & 0 deletions pgmig/migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ func TestMigrate(t *testing.T) {
diff.Test(t, t.Fatalf, err, nil)

reset := func() {
if _, err := pg.Exec(ctx, "drop schema if exists e2pg cascade"); err != nil {
t.Fatalf("dropping e2pg schema: %s", err)
}
if _, err := pg.Exec(ctx, "drop schema public cascade"); err != nil {
t.Fatalf("dropping schema: %s", err)
}
Expand Down

0 comments on commit b80208b

Please sign in to comment.