From 5608bbdda702541a9482d84d097b4240c5526de6 Mon Sep 17 00:00:00 2001 From: Ryan Smith Date: Thu, 26 Oct 2023 14:22:00 -0700 Subject: [PATCH] e2pg: refactor. remove Insert and Latest on Task I don't really want to support these methods as a part of the Task API. I also like to see the precise query in the context of the code. --- e2pg/e2pg.go | 86 +++++++++++++++++++-------------------- e2pg/e2pg_test.go | 50 ++++++++++++----------- e2pg/integration_test.go | 6 +-- geth/testdata/1-bodies | 1 + geth/testdata/1-hashes | 1 + geth/testdata/1-headers | Bin 0 -> 291 bytes geth/testdata/1-receipts | Bin 0 -> 3 bytes 7 files changed, 73 insertions(+), 71 deletions(-) create mode 100644 geth/testdata/1-bodies create mode 100644 geth/testdata/1-hashes create mode 100644 geth/testdata/1-headers create mode 100644 geth/testdata/1-receipts diff --git a/e2pg/e2pg.go b/e2pg/e2pg.go index 7560e172..f7364194 100644 --- a/e2pg/e2pg.go +++ b/e2pg/e2pg.go @@ -163,62 +163,60 @@ func (t *Task) dstatw(name string, n int64, d time.Duration) { t.dstat[name] = s } -func (task *Task) Insert(n uint64, h []byte) error { - const q = ` - insert into e2pg.task (src_name, backfill, num, hash) - values ($1, $2, $3, $4) - ` - _, err := task.pgp.Exec(task.ctx, q, - task.srcName, - task.backfill, - n, - h, - ) - return err -} - -func (task *Task) Latest() (uint64, []byte, error) { - const q = ` - select num, hash +func (task *Task) Setup() error { + const q1 = ` + select true from e2pg.task where src_name = $1 and backfill = $2 - order by num desc limit 1 ` - var n, h = uint64(0), []byte{} - err := task.pgp.QueryRow(task.ctx, q, task.srcName, task.backfill).Scan(&n, &h) - if errors.Is(err, pgx.ErrNoRows) { - return n, nil, nil - } - return n, h, err -} - -func (task *Task) Setup() error { - _, localHash, err := task.Latest() - if err != nil { - return err - } - if len(localHash) > 0 { - // already setup - return nil + var f bool + err := task.pgp.QueryRow(task.ctx, q1, task.srcName, task.backfill).Scan(&f) + switch { + case errors.Is(err, pgx.ErrNoRows): + break + case err != nil: + return fmt.Errorf("querying for latest task update: %w", err) + default: + return nil // already setup } - if task.start > 0 { + const insertQuery = ` + insert into e2pg.task(src_name, backfill, hash, num) + values ($1, $2, $3, $4) + ` + switch { + case task.start > 0: h, err := task.src.Hash(task.start - 1) if err != nil { return err } - return task.Insert(task.start-1, h) - } - gethNum, _, err := task.src.Latest() - if err != nil { + _, err = task.pgp.Exec(task.ctx, + insertQuery, + task.srcName, + task.backfill, + h, + task.start-1, + ) + return err + default: + gethNum, _, err := task.src.Latest() + if err != nil { + return err + } + h, err := task.src.Hash(gethNum - 1) + if err != nil { + return fmt.Errorf("getting hash for %d: %w", gethNum-1, err) + } + _, err = task.pgp.Exec(task.ctx, + insertQuery, + task.srcName, + task.backfill, + h, + gethNum-1, + ) return err } - h, err := task.src.Hash(gethNum - 1) - if err != nil { - return fmt.Errorf("getting hash for %d: %w", gethNum-1, err) - } - return task.Insert(gethNum-1, h) } var ( diff --git a/e2pg/e2pg_test.go b/e2pg/e2pg_test.go index 509283e9..e5a8f79a 100644 --- a/e2pg/e2pg_test.go +++ b/e2pg/e2pg_test.go @@ -143,7 +143,7 @@ func TestSetup(t *testing.T) { tg = &testGeth{} pg = testpg(t) task = NewTask( - WithSource(0, "1", tg), + WithSource(0, "foo", tg), WithPG(pg), WithDestinations(newTestDestination()), ) @@ -153,10 +153,13 @@ func TestSetup(t *testing.T) { tg.add(2, hash(2), hash(1)) diff.Test(t, t.Errorf, task.Setup(), nil) - n, h, err := task.Latest() - diff.Test(t, t.Errorf, err, nil) - diff.Test(t, t.Errorf, n, uint64(1)) - diff.Test(t, t.Errorf, h, hash(1)) + checkQuery(t, pg, ` + select true + from e2pg.task + where src_name = 'foo' + and hash = $1 + and num = $2 + `, hash(1), uint64(1)) } func TestConverge_Zero(t *testing.T) { @@ -164,7 +167,7 @@ func TestConverge_Zero(t *testing.T) { tg = &testGeth{} pg = testpg(t) task = NewTask( - WithSource(0, "1", tg), + WithSource(0, "foo", tg), WithPG(pg), WithDestinations(newTestDestination()), ) @@ -178,7 +181,7 @@ func TestConverge_EmptyDestination(t *testing.T) { tg = &testGeth{} dest = newTestDestination() task = NewTask( - WithSource(0, "1", tg), + WithSource(0, "foo", tg), WithPG(pg), WithDestinations(dest), ) @@ -186,7 +189,7 @@ func TestConverge_EmptyDestination(t *testing.T) { tg.add(0, hash(0), hash(0)) tg.add(1, hash(1), hash(0)) dest.add(0, hash(0), hash(0)) - task.Insert(0, hash(0)) + taskAdd(t, pg, "foo", 0, hash(0)) diff.Test(t, t.Fatalf, task.Converge(true), nil) diff.Test(t, t.Errorf, dest.blocks(), tg.blocks) } @@ -197,7 +200,7 @@ func TestConverge_Reorg(t *testing.T) { tg = &testGeth{} dest = newTestDestination() task = NewTask( - WithSource(0, "", tg), + WithSource(0, "foo", tg), WithPG(pg), WithDestinations(dest), ) @@ -210,8 +213,8 @@ func TestConverge_Reorg(t *testing.T) { dest.add(0, hash(0), hash(0)) dest.add(1, hash(2), hash(0)) - taskAdd(t, pg, "", 0, hash(0), dest.Name()) - taskAdd(t, pg, "", 1, hash(1), dest.Name()) + taskAdd(t, pg, "foo", 0, hash(0), dest.Name()) + taskAdd(t, pg, "foo", 1, hash(1), dest.Name()) diff.Test(t, t.Fatalf, task.Converge(false), nil) diff.Test(t, t.Fatalf, task.Converge(false), nil) @@ -257,7 +260,7 @@ func TestConverge_DeltaBatchSize(t *testing.T) { tg = &testGeth{} dest = newTestDestination() task = NewTask( - WithSource(0, "1", tg), + WithSource(0, "foo", tg), WithPG(pg), WithConcurrency(workers, batchSize), WithDestinations(dest), @@ -266,7 +269,7 @@ func TestConverge_DeltaBatchSize(t *testing.T) { tg.add(0, hash(0), hash(0)) dest.add(0, hash(0), hash(0)) - task.Insert(0, hash(0)) + taskAdd(t, pg, "foo", 0, hash(0)) for i := uint64(1); i <= batchSize+1; i++ { tg.add(i, hash(byte(i)), hash(byte(i-1))) @@ -286,14 +289,13 @@ func TestConverge_MultipleTasks(t *testing.T) { dest1 = newTestDestination() dest2 = newTestDestination() task1 = NewTask( - WithSource(0, "1", tg), + WithSource(0, "a", tg), WithPG(pg), WithConcurrency(1, 3), WithDestinations(dest1), ) task2 = NewTask( - WithSource(0, "1", tg), - WithBackfill(true), + WithSource(0, "b", tg), WithPG(pg), WithConcurrency(1, 3), WithDestinations(dest2), @@ -302,8 +304,8 @@ func TestConverge_MultipleTasks(t *testing.T) { tg.add(1, hash(1), hash(0)) tg.add(2, hash(2), hash(1)) - task1.Insert(0, hash(0)) - task2.Insert(0, hash(0)) + taskAdd(t, pg, "a", 0, hash(0)) + taskAdd(t, pg, "b", 0, hash(0)) diff.Test(t, t.Errorf, task1.Converge(true), nil) diff.Test(t, t.Errorf, dest1.blocks(), tg.blocks) @@ -318,7 +320,7 @@ func TestConverge_LocalAhead(t *testing.T) { pg = testpg(t) dest = newTestDestination() task = NewTask( - WithSource(0, "1", tg), + WithSource(0, "foo", tg), WithPG(pg), WithConcurrency(1, 3), WithDestinations(dest), @@ -326,16 +328,16 @@ func TestConverge_LocalAhead(t *testing.T) { ) tg.add(1, hash(1), hash(0)) - task.Insert(0, hash(0)) - task.Insert(1, hash(1)) - task.Insert(2, hash(2)) + taskAdd(t, pg, "foo", 0, hash(0)) + taskAdd(t, pg, "foo", 1, hash(1)) + taskAdd(t, pg, "foo", 2, hash(2)) diff.Test(t, t.Errorf, task.Converge(true), ErrAhead) } -func checkQuery(tb testing.TB, pg wpg.Conn, query string) { +func checkQuery(tb testing.TB, pg wpg.Conn, query string, args ...any) { var found bool - err := pg.QueryRow(context.Background(), query).Scan(&found) + err := pg.QueryRow(context.Background(), query, args...).Scan(&found) diff.Test(tb, tb.Fatalf, err, nil) if !found { tb.Errorf("query\n%s\nreturned false", query) diff --git a/e2pg/integration_test.go b/e2pg/integration_test.go index 94f8e5aa..02f61161 100644 --- a/e2pg/integration_test.go +++ b/e2pg/integration_test.go @@ -69,14 +69,14 @@ func (th *Helper) Process(dest Destination, n uint64) { WithSource(0, "", geth), WithPG(th.PG), WithDestinations(dest), + WithRange(n, n+1), ) ) cur, err := geth.Hash(n) check(th.tb, err) - prev, err := geth.Hash(n - 1) - check(th.tb, err) th.gt.SetLatest(n, cur) - check(th.tb, task.Insert(n-1, prev)) + + check(th.tb, task.Setup()) check(th.tb, task.Converge(true)) } diff --git a/geth/testdata/1-bodies b/geth/testdata/1-bodies new file mode 100644 index 00000000..6ff518c8 --- /dev/null +++ b/geth/testdata/1-bodies @@ -0,0 +1 @@ +ÂÀÀ \ No newline at end of file diff --git a/geth/testdata/1-hashes b/geth/testdata/1-hashes new file mode 100644 index 00000000..63b32112 --- /dev/null +++ b/geth/testdata/1-hashes @@ -0,0 +1 @@ + |ˆémE7¾¤ÙÀ]T™³%aÓ¿1ôZ®sLÜŸ@l¶ \ No newline at end of file diff --git a/geth/testdata/1-headers b/geth/testdata/1-headers new file mode 100644 index 0000000000000000000000000000000000000000..7fb90a676e73203035b7fca7f6920ee599c6fea3 GIT binary patch literal 291 zcmbQj@?qOgCcy<)o~AqeC|mdAfWVF{hpS)HT^CARn2=@iY}T6|-i@dG7cY=Kty|N#ow*`)S;$p-N>_>ShlNgm9j;7vpTfE{PdDgmCW zj|b8-bJiYUOw!-_IC+aA9tE6=96K2q{vn|c!L7{y8yFZEn}jU? zGt@KCGtke;%qy+XPtP~hGto0zFm3h(-bdA0$Ci~lnZ;g<4_y$HzN;l_L^A<;w6 Q@*_KfXJ45<|GnH10OtaSCjbBd literal 0 HcmV?d00001 diff --git a/geth/testdata/1-receipts b/geth/testdata/1-receipts new file mode 100644 index 0000000000000000000000000000000000000000..304240271a69d3276c865b44fecd03988ccb7444 GIT binary patch literal 3 KcmZQ%H~;_u#sI