From b80208b54b861a4b7ab43545b530c6e8583b5141 Mon Sep 17 00:00:00 2001 From: Ryan Smith Date: Wed, 27 Sep 2023 21:09:00 -0700 Subject: [PATCH] e2pg: move migrations,task into e2pg schema (#168) This commit requires a database reset. All of e2pg's internal tables will be located in the e2pg schema. Tables that are created from events will remain in the public schema. --- e2pg/e2pg.go | 10 +++++----- e2pg/migrations.go | 3 +++ e2pg/schema.sql | 30 +++++++++++++++++------------- integrations/testhelper/helper.go | 3 +-- pgmig/migrate.go | 20 ++++++++++++-------- pgmig/migrate_test.go | 3 +++ 6 files changed, 41 insertions(+), 28 deletions(-) diff --git a/e2pg/e2pg.go b/e2pg/e2pg.go index 7fae25fc..a1de5091 100644 --- a/e2pg/e2pg.go +++ b/e2pg/e2pg.go @@ -188,13 +188,13 @@ func (task *Task) Status() StatusSnapshot { } func (task *Task) Insert(n uint64, h []byte) error { - const q = `insert into task (id, number, hash) values ($1, $2, $3)` + const q = `insert into e2pg.task (id, number, hash) values ($1, $2, $3)` _, err := task.pgp.Exec(context.Background(), q, task.ID, n, h) return err } func (task *Task) Latest() (uint64, []byte, error) { - const q = `SELECT number, hash FROM task WHERE id = $1 ORDER BY number DESC LIMIT 1` + const q = `SELECT number, hash FROM e2pg.task WHERE id = $1 ORDER BY number DESC LIMIT 1` var n, h = uint64(0), []byte{} err := task.pgp.QueryRow(context.Background(), q, task.ID).Scan(&n, &h) if errors.Is(err, pgx.ErrNoRows) { @@ -289,7 +289,7 @@ func (task *Task) Converge(notx bool) error { } for reorgs := 0; reorgs <= 10; { localNum, localHash := uint64(0), []byte{} - const q = `SELECT number, hash FROM task WHERE id = $1 ORDER BY number DESC LIMIT 1` + const q = `SELECT number, hash FROM e2pg.task WHERE id = $1 ORDER BY number DESC LIMIT 1` err := pg.QueryRow(task.ctx, q, task.ID).Scan(&localNum, &localHash) if err != nil && !errors.Is(err, pgx.ErrNoRows) { return fmt.Errorf("getting latest from task: %w", err) @@ -323,7 +323,7 @@ func (task *Task) Converge(notx bool) error { case errors.Is(err, ErrReorg): reorgs++ slog.ErrorContext(task.ctx, "reorg", "n", localNum, "h", fmt.Sprintf("%.4x", localHash)) - const dq = "delete from task where id = $1 AND number >= $2" + const dq = "delete from e2pg.task where id = $1 AND number >= $2" _, err := pg.Exec(task.ctx, dq, task.ID, localNum) if err != nil { return fmt.Errorf("deleting block from task table: %w", err) @@ -419,7 +419,7 @@ func (task *Task) writeIndex(localHash []byte, pg PG, delta uint64) error { return fmt.Errorf("writing indexed data: %w", err) } var last = task.batch[delta-1] - const uq = "insert into task (id, number, hash) values ($1, $2, $3)" + const uq = "insert into e2pg.task (id, number, hash) values ($1, $2, $3)" _, err := pg.Exec(context.Background(), uq, task.ID, last.Num(), last.Hash()) if err != nil { return fmt.Errorf("updating task table: %w", err) diff --git a/e2pg/migrations.go b/e2pg/migrations.go index 8e4fd802..e1b45794 100644 --- a/e2pg/migrations.go +++ b/e2pg/migrations.go @@ -96,4 +96,7 @@ var Migrations = map[int]pgmig.Migration{ 5: pgmig.Migration{ SQL: "create index on task(id, number desc);", }, + 6: pgmig.Migration{ + SQL: `alter table task set schema e2pg`, + }, } diff --git a/e2pg/schema.sql b/e2pg/schema.sql index 67ae11fc..bdfc7594 100644 --- a/e2pg/schema.sql +++ b/e2pg/schema.sql @@ -11,12 +11,16 @@ SET xmloption = content; SET client_min_messages = warning; SET row_security = off; + +CREATE SCHEMA e2pg; + + SET default_tablespace = ''; SET default_table_access_method = heap; -CREATE TABLE public.e2pg_migrations ( +CREATE TABLE e2pg.migrations ( idx integer NOT NULL, hash bytea NOT NULL, inserted_at timestamp with time zone DEFAULT now() NOT NULL @@ -24,6 +28,15 @@ CREATE TABLE public.e2pg_migrations ( +CREATE TABLE e2pg.task ( + id smallint NOT NULL, + number bigint, + hash bytea, + insert_at timestamp with time zone DEFAULT now() +); + + + CREATE TABLE public.erc20_transfers ( contract bytea, f bytea, @@ -83,17 +96,8 @@ CREATE TABLE public.nft_transfers ( -CREATE TABLE public.task ( - id smallint NOT NULL, - number bigint, - hash bytea, - insert_at timestamp with time zone DEFAULT now() -); - - - -ALTER TABLE ONLY public.e2pg_migrations - ADD CONSTRAINT e2pg_migrations_pkey PRIMARY KEY (idx, hash); +ALTER TABLE ONLY e2pg.migrations + ADD CONSTRAINT migrations_pkey PRIMARY KEY (idx, hash); @@ -107,7 +111,7 @@ ALTER TABLE ONLY public.erc4337_userops -CREATE INDEX task_id_number_idx ON public.task USING btree (id, number DESC); +CREATE INDEX task_id_number_idx ON e2pg.task USING btree (id, number DESC); diff --git a/integrations/testhelper/helper.go b/integrations/testhelper/helper.go index 0ec0c57f..86a737bb 100644 --- a/integrations/testhelper/helper.go +++ b/integrations/testhelper/helper.go @@ -1,5 +1,4 @@ // Easily test e2pg integrations -// package testhelper import ( @@ -47,7 +46,7 @@ func New(tb testing.TB) *H { // Reset the task table. Call this in-between test cases func (th *H) Reset() { - _, err := th.PG.Exec(context.Background(), "truncate table task") + _, err := th.PG.Exec(context.Background(), "truncate table e2pg.task") check(th.tb, err) } diff --git a/pgmig/migrate.go b/pgmig/migrate.go index d9a9d711..4c8065d0 100644 --- a/pgmig/migrate.go +++ b/pgmig/migrate.go @@ -43,9 +43,9 @@ type Migrations map[int]Migration // // migs is a map because the keys represent the order of migrations // and so they should be unique. They keys are written to the idx -// columne of the e2pg_migrations table. +// columne of the e2pg.migrations table. // -// The e2pg_migrations table will be created if it doesn't +// The e2pg.migrations table will be created if it doesn't // already exist. // // Migrate will use pg_try_advisory_lock to ensure that only @@ -59,16 +59,20 @@ func Migrate(pgp *pgxpool.Pool, migs Migrations) error { if err != nil || !locked { return fmt.Errorf("locking db for migrations: %w", err) } - - const q1 = ` - create table if not exists e2pg_migrations ( + const q1 = `create schema if not exists e2pg` + _, err = pgp.Exec(ctx, q1) + if err != nil { + return fmt.Errorf("creating e2pg schema: %w", err) + } + const q2 = ` + create table if not exists e2pg.migrations ( idx int not null, hash bytea not null, inserted_at timestamptz default now() not null, primary key (idx, hash) ); ` - _, err = pgp.Exec(ctx, q1) + _, err = pgp.Exec(ctx, q2) if err != nil { return fmt.Errorf("creating migrations table: %w", err) } @@ -120,7 +124,7 @@ func migrate(ctx context.Context, db execer, i int, m Migration) error { if err != nil { return fmt.Errorf("migration %d %x exec error: %w", i, m.Hash(), err) } - const q = `insert into e2pg_migrations(idx, hash) values ($1, $2)` + const q = `insert into e2pg.migrations(idx, hash) values ($1, $2)` _, err = db.Exec(ctx, q, i, m.Hash()) if err != nil { return fmt.Errorf("migrations table %d %x insert error: %w", i, m.Hash(), err) @@ -129,7 +133,7 @@ func migrate(ctx context.Context, db execer, i int, m Migration) error { } func exists(ctx context.Context, db execer, i int, m Migration) (bool, error) { - const q = `select true from e2pg_migrations where idx = $1 and hash = $2` + const q = `select true from e2pg.migrations where idx = $1 and hash = $2` var found bool err := db.QueryRow(ctx, q, i, m.Hash()).Scan(&found) if errors.Is(err, pgx.ErrNoRows) { diff --git a/pgmig/migrate_test.go b/pgmig/migrate_test.go index ddc4c4d6..368d9b9c 100644 --- a/pgmig/migrate_test.go +++ b/pgmig/migrate_test.go @@ -44,6 +44,9 @@ func TestMigrate(t *testing.T) { diff.Test(t, t.Fatalf, err, nil) reset := func() { + if _, err := pg.Exec(ctx, "drop schema if exists e2pg cascade"); err != nil { + t.Fatalf("dropping e2pg schema: %s", err) + } if _, err := pg.Exec(ctx, "drop schema public cascade"); err != nil { t.Fatalf("dropping schema: %s", err) }