Skip to content

Commit

Permalink
e2pg: refactor. remove Insert and Latest on Task
Browse files Browse the repository at this point in the history
I don't really want to support these methods as a part of the Task API.
I also like to see the precise query in the context of the code.
  • Loading branch information
ryandotsmith committed Oct 26, 2023
1 parent 7ab8ba3 commit 5608bbd
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 71 deletions.
86 changes: 42 additions & 44 deletions e2pg/e2pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,62 +163,60 @@ func (t *Task) dstatw(name string, n int64, d time.Duration) {
t.dstat[name] = s
}

func (task *Task) Insert(n uint64, h []byte) error {
const q = `
insert into e2pg.task (src_name, backfill, num, hash)
values ($1, $2, $3, $4)
`
_, err := task.pgp.Exec(task.ctx, q,
task.srcName,
task.backfill,
n,
h,
)
return err
}

func (task *Task) Latest() (uint64, []byte, error) {
const q = `
select num, hash
func (task *Task) Setup() error {
const q1 = `
select true
from e2pg.task
where src_name = $1
and backfill = $2
order by num desc
limit 1
`
var n, h = uint64(0), []byte{}
err := task.pgp.QueryRow(task.ctx, q, task.srcName, task.backfill).Scan(&n, &h)
if errors.Is(err, pgx.ErrNoRows) {
return n, nil, nil
}
return n, h, err
}

func (task *Task) Setup() error {
_, localHash, err := task.Latest()
if err != nil {
return err
}
if len(localHash) > 0 {
// already setup
return nil
var f bool
err := task.pgp.QueryRow(task.ctx, q1, task.srcName, task.backfill).Scan(&f)
switch {
case errors.Is(err, pgx.ErrNoRows):
break
case err != nil:
return fmt.Errorf("querying for latest task update: %w", err)
default:
return nil // already setup
}
if task.start > 0 {
const insertQuery = `
insert into e2pg.task(src_name, backfill, hash, num)
values ($1, $2, $3, $4)
`
switch {
case task.start > 0:
h, err := task.src.Hash(task.start - 1)
if err != nil {
return err
}
return task.Insert(task.start-1, h)
}
gethNum, _, err := task.src.Latest()
if err != nil {
_, err = task.pgp.Exec(task.ctx,
insertQuery,
task.srcName,
task.backfill,
h,
task.start-1,
)
return err
default:
gethNum, _, err := task.src.Latest()
if err != nil {
return err
}
h, err := task.src.Hash(gethNum - 1)
if err != nil {
return fmt.Errorf("getting hash for %d: %w", gethNum-1, err)
}
_, err = task.pgp.Exec(task.ctx,
insertQuery,
task.srcName,
task.backfill,
h,
gethNum-1,
)
return err
}
h, err := task.src.Hash(gethNum - 1)
if err != nil {
return fmt.Errorf("getting hash for %d: %w", gethNum-1, err)
}
return task.Insert(gethNum-1, h)
}

var (
Expand Down
50 changes: 26 additions & 24 deletions e2pg/e2pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func TestSetup(t *testing.T) {
tg = &testGeth{}
pg = testpg(t)
task = NewTask(
WithSource(0, "1", tg),
WithSource(0, "foo", tg),
WithPG(pg),
WithDestinations(newTestDestination()),
)
Expand All @@ -153,18 +153,21 @@ func TestSetup(t *testing.T) {
tg.add(2, hash(2), hash(1))
diff.Test(t, t.Errorf, task.Setup(), nil)

n, h, err := task.Latest()
diff.Test(t, t.Errorf, err, nil)
diff.Test(t, t.Errorf, n, uint64(1))
diff.Test(t, t.Errorf, h, hash(1))
checkQuery(t, pg, `
select true
from e2pg.task
where src_name = 'foo'
and hash = $1
and num = $2
`, hash(1), uint64(1))
}

func TestConverge_Zero(t *testing.T) {
var (
tg = &testGeth{}
pg = testpg(t)
task = NewTask(
WithSource(0, "1", tg),
WithSource(0, "foo", tg),
WithPG(pg),
WithDestinations(newTestDestination()),
)
Expand All @@ -178,15 +181,15 @@ func TestConverge_EmptyDestination(t *testing.T) {
tg = &testGeth{}
dest = newTestDestination()
task = NewTask(
WithSource(0, "1", tg),
WithSource(0, "foo", tg),
WithPG(pg),
WithDestinations(dest),
)
)
tg.add(0, hash(0), hash(0))
tg.add(1, hash(1), hash(0))
dest.add(0, hash(0), hash(0))
task.Insert(0, hash(0))
taskAdd(t, pg, "foo", 0, hash(0))
diff.Test(t, t.Fatalf, task.Converge(true), nil)
diff.Test(t, t.Errorf, dest.blocks(), tg.blocks)
}
Expand All @@ -197,7 +200,7 @@ func TestConverge_Reorg(t *testing.T) {
tg = &testGeth{}
dest = newTestDestination()
task = NewTask(
WithSource(0, "", tg),
WithSource(0, "foo", tg),
WithPG(pg),
WithDestinations(dest),
)
Expand All @@ -210,8 +213,8 @@ func TestConverge_Reorg(t *testing.T) {
dest.add(0, hash(0), hash(0))
dest.add(1, hash(2), hash(0))

taskAdd(t, pg, "", 0, hash(0), dest.Name())
taskAdd(t, pg, "", 1, hash(1), dest.Name())
taskAdd(t, pg, "foo", 0, hash(0), dest.Name())
taskAdd(t, pg, "foo", 1, hash(1), dest.Name())

diff.Test(t, t.Fatalf, task.Converge(false), nil)
diff.Test(t, t.Fatalf, task.Converge(false), nil)
Expand Down Expand Up @@ -257,7 +260,7 @@ func TestConverge_DeltaBatchSize(t *testing.T) {
tg = &testGeth{}
dest = newTestDestination()
task = NewTask(
WithSource(0, "1", tg),
WithSource(0, "foo", tg),
WithPG(pg),
WithConcurrency(workers, batchSize),
WithDestinations(dest),
Expand All @@ -266,7 +269,7 @@ func TestConverge_DeltaBatchSize(t *testing.T) {

tg.add(0, hash(0), hash(0))
dest.add(0, hash(0), hash(0))
task.Insert(0, hash(0))
taskAdd(t, pg, "foo", 0, hash(0))

for i := uint64(1); i <= batchSize+1; i++ {
tg.add(i, hash(byte(i)), hash(byte(i-1)))
Expand All @@ -286,14 +289,13 @@ func TestConverge_MultipleTasks(t *testing.T) {
dest1 = newTestDestination()
dest2 = newTestDestination()
task1 = NewTask(
WithSource(0, "1", tg),
WithSource(0, "a", tg),
WithPG(pg),
WithConcurrency(1, 3),
WithDestinations(dest1),
)
task2 = NewTask(
WithSource(0, "1", tg),
WithBackfill(true),
WithSource(0, "b", tg),
WithPG(pg),
WithConcurrency(1, 3),
WithDestinations(dest2),
Expand All @@ -302,8 +304,8 @@ func TestConverge_MultipleTasks(t *testing.T) {
tg.add(1, hash(1), hash(0))
tg.add(2, hash(2), hash(1))

task1.Insert(0, hash(0))
task2.Insert(0, hash(0))
taskAdd(t, pg, "a", 0, hash(0))
taskAdd(t, pg, "b", 0, hash(0))

diff.Test(t, t.Errorf, task1.Converge(true), nil)
diff.Test(t, t.Errorf, dest1.blocks(), tg.blocks)
Expand All @@ -318,24 +320,24 @@ func TestConverge_LocalAhead(t *testing.T) {
pg = testpg(t)
dest = newTestDestination()
task = NewTask(
WithSource(0, "1", tg),
WithSource(0, "foo", tg),
WithPG(pg),
WithConcurrency(1, 3),
WithDestinations(dest),
)
)
tg.add(1, hash(1), hash(0))

task.Insert(0, hash(0))
task.Insert(1, hash(1))
task.Insert(2, hash(2))
taskAdd(t, pg, "foo", 0, hash(0))
taskAdd(t, pg, "foo", 1, hash(1))
taskAdd(t, pg, "foo", 2, hash(2))

diff.Test(t, t.Errorf, task.Converge(true), ErrAhead)
}

func checkQuery(tb testing.TB, pg wpg.Conn, query string) {
func checkQuery(tb testing.TB, pg wpg.Conn, query string, args ...any) {
var found bool
err := pg.QueryRow(context.Background(), query).Scan(&found)
err := pg.QueryRow(context.Background(), query, args...).Scan(&found)
diff.Test(tb, tb.Fatalf, err, nil)
if !found {
tb.Errorf("query\n%s\nreturned false", query)
Expand Down
6 changes: 3 additions & 3 deletions e2pg/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@ func (th *Helper) Process(dest Destination, n uint64) {
WithSource(0, "", geth),
WithPG(th.PG),
WithDestinations(dest),
WithRange(n, n+1),
)
)
cur, err := geth.Hash(n)
check(th.tb, err)
prev, err := geth.Hash(n - 1)
check(th.tb, err)
th.gt.SetLatest(n, cur)
check(th.tb, task.Insert(n-1, prev))

check(th.tb, task.Setup())
check(th.tb, task.Converge(true))
}

Expand Down
1 change: 1 addition & 0 deletions geth/testdata/1-bodies
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
���
1 change: 1 addition & 0 deletions geth/testdata/1-hashes
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
|��mE7����]T��%aӿ1�Z�sL��@l�
Binary file added geth/testdata/1-headers
Binary file not shown.
Binary file added geth/testdata/1-receipts
Binary file not shown.

0 comments on commit 5608bbd

Please sign in to comment.