Skip to content

Commit

Permalink
e2pg,abi2: add unique index on pg tables by default
Browse files Browse the repository at this point in the history
E2PG will look at each integration and figure out a default set of
columns to create a unique index on. Since the following columns are
required: intg_name, src_name, block_num, and tx_idx, E2PG will start
with that as a unique index. If your integration also accessed data from
a log it will include log_idx. If the log contains ABI encoded data it
will include abi_idx as well.

If you provide your own set of unique columns then E2PG will not supply
a unique index and instead it will create the one you supplied. For
example:

	{
		"integrations": [
			{
				"table": {
					"unique": [
						["my_unique_hash"]
					]
				}
			}
		]
	}

This behaviour can be disabled by the following config:

	{
		"integrations": [
			{
				"table": {
					"disable_unique": true
				}
			}
		]
	}

(Only showing the relevant config field path. Other required config is
omitted for brevity)
  • Loading branch information
ryandotsmith committed Nov 3, 2023
1 parent a85dc16 commit 725d899
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 46 deletions.
133 changes: 89 additions & 44 deletions abi2/abi2.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"log/slog"
"slices"
"strconv"
"strings"
"unicode"
Expand Down Expand Up @@ -599,45 +600,17 @@ func New(name string, ev Event, bd []BlockData, table Table) (Integration, error
resultCache: NewResult(ev.ABIType()),
sighash: ev.SignatureHash(),
}
ig.addRequiredFields()
if err := ig.validate(); err != nil {
return ig, fmt.Errorf("validating %s: %w", name, err)
}
for _, input := range ev.Selected() {
c, err := col(ig.Table, input.Column)
if err != nil {
return ig, err
}
ig.Columns = append(ig.Columns, c.Name)
ig.coldefs = append(ig.coldefs, coldef{
Input: input,
Column: c,
})
ig.numSelected++
}
for _, data := range ig.Block {
c, err := col(ig.Table, data.Column)
if err != nil {
return ig, err
}
ig.Columns = append(ig.Columns, c.Name)
ig.coldefs = append(ig.coldefs, coldef{
BlockData: data,
Column: c,
})
ig.numBDSelected++
if err := ig.validateSQL(); err != nil {
return ig, fmt.Errorf("validating sql input: %w", err)
}
return ig, nil
}

func (ig *Integration) validate() error {
ig.addRequiredFields()
if err := ig.validateCols(); err != nil {
return fmt.Errorf("validating columns: %w", err)
return ig, fmt.Errorf("validating columns: %w", err)
}
if err := ig.validateSQL(); err != nil {
return fmt.Errorf("validating sql input: %w", err)
}
return nil
ig.setCols()
ig.addUniqueIndex()
return ig, nil
}

func (ig *Integration) validateSQL() error {
Expand Down Expand Up @@ -716,6 +689,59 @@ func (ig *Integration) validateCols() error {
return nil
}

func (ig *Integration) setCols() {
getCol := func(name string) Column {
for _, c := range ig.Table.Cols {
if c.Name == name {
return c
}
}
return Column{}
}
for _, input := range ig.Event.Selected() {
c := getCol(input.Column)
ig.Columns = append(ig.Columns, c.Name)
ig.coldefs = append(ig.coldefs, coldef{
Input: input,
Column: c,
})
ig.numSelected++
}
for _, bd := range ig.Block {
c := getCol(bd.Column)
ig.Columns = append(ig.Columns, c.Name)
ig.coldefs = append(ig.coldefs, coldef{
BlockData: bd,
Column: c,
})
ig.numBDSelected++
}
}

// sets default unique columns unless already set by user
func (ig *Integration) addUniqueIndex() {
if len(ig.Table.Unique) > 0 {
return
}
possible := []string{
"intg_name",
"src_name",
"block_num",
"tx_idx",
"log_idx",
"abi_idx",
}
var uidx []string
for _, name := range possible {
if slices.Contains(ig.Columns, name) {
uidx = append(uidx, name)
}
}
if len(uidx) > 0 {
ig.Table.Unique = append(ig.Table.Unique, uidx)
}
}

func (ig *Integration) addRequiredFields() {
hasBD := func(name string) bool {
for _, bd := range ig.Block {
Expand Down Expand Up @@ -766,15 +792,6 @@ func validateString(s string) error {
return nil
}

func col(t Table, name string) (Column, error) {
for i := range t.Cols {
if t.Cols[i].Name == name {
return t.Cols[i], nil
}
}
return Column{}, fmt.Errorf("table %q doesn't contain column %q", t.Name, name)
}

func (ig Integration) Name() string { return ig.name }

func (ig Integration) Events(context.Context) [][]byte { return [][]byte{} }
Expand Down Expand Up @@ -1065,6 +1082,9 @@ type Column struct {
type Table struct {
Name string `json:"name"`
Cols []Column `json:"columns"`

DisableUnique bool `json:"disable_unique"`
Unique [][]string `json:"unique"`
}

func (t *Table) Create(ctx context.Context, pg wpg.Conn) error {
Expand All @@ -1082,6 +1102,31 @@ func (t *Table) Create(ctx context.Context, pg wpg.Conn) error {
return err
}

func (t *Table) CreateUIDX(ctx context.Context, pg wpg.Conn) error {
if t.DisableUnique {
slog.InfoContext(ctx, "disable unique index", "table", t.Name)
return nil
}
var s strings.Builder
for _, cols := range t.Unique {
s.Reset()
const q = "create unique index if not exists u_%s on %s ("
s.WriteString(fmt.Sprintf(q, t.Name, t.Name))
for i, cname := range cols {
s.WriteString(cname)
if i+1 == len(cols) {
s.WriteString(")")
break
}
s.WriteString(",")
}
if _, err := pg.Exec(ctx, s.String()); err != nil {
return fmt.Errorf("creating index %s: %w", t.Name, err)
}
}
return nil
}

func Indexes(ctx context.Context, pg wpg.Conn, table string) []map[string]any {
const q = `
select indexname, indexdef
Expand Down
14 changes: 12 additions & 2 deletions abi2/abi2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,16 @@ func TestValidate_MissingCols(t *testing.T) {
},
},
}
const want = "validating columns: missing column for b"
diff.Test(t, t.Errorf, want, ig.validate().Error())
const want = "missing column for b"
diff.Test(t, t.Errorf, want, ig.validateCols().Error())
}

func TestAddUniqueIndex(t *testing.T) {
ig := Integration{}
ig.addRequiredFields()
ig.setCols()
ig.addUniqueIndex()
want := []string{"intg_name", "src_name", "block_num", "tx_idx"}
diff.Test(t, t.Fatalf, 1, len(ig.Table.Unique))
diff.Test(t, t.Errorf, want, ig.Table.Unique[0])
}
3 changes: 3 additions & 0 deletions e2pg/e2pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,9 @@ func getDest(pgp *pgxpool.Pool, ig Integration) (Destination, error) {
if err := aig.Table.Create(context.Background(), pgp); err != nil {
return nil, fmt.Errorf("create intg table: %w", err)
}
if err := aig.Table.CreateUIDX(context.Background(), pgp); err != nil {
return nil, fmt.Errorf("create intg unique index: %w", err)
}
return aig, nil
}
}
Expand Down

0 comments on commit 725d899

Please sign in to comment.