diff --git a/abi2/abi2.go b/abi2/abi2.go index f9da5d69..c4dbeb68 100644 --- a/abi2/abi2.go +++ b/abi2/abi2.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "log/slog" + "slices" "strconv" "strings" "unicode" @@ -599,45 +600,17 @@ func New(name string, ev Event, bd []BlockData, table Table) (Integration, error resultCache: NewResult(ev.ABIType()), sighash: ev.SignatureHash(), } - ig.addRequiredFields() - if err := ig.validate(); err != nil { - return ig, fmt.Errorf("validating %s: %w", name, err) - } - for _, input := range ev.Selected() { - c, err := col(ig.Table, input.Column) - if err != nil { - return ig, err - } - ig.Columns = append(ig.Columns, c.Name) - ig.coldefs = append(ig.coldefs, coldef{ - Input: input, - Column: c, - }) - ig.numSelected++ - } - for _, data := range ig.Block { - c, err := col(ig.Table, data.Column) - if err != nil { - return ig, err - } - ig.Columns = append(ig.Columns, c.Name) - ig.coldefs = append(ig.coldefs, coldef{ - BlockData: data, - Column: c, - }) - ig.numBDSelected++ + if err := ig.validateSQL(); err != nil { + return ig, fmt.Errorf("validating sql input: %w", err) } - return ig, nil -} -func (ig *Integration) validate() error { + ig.addRequiredFields() if err := ig.validateCols(); err != nil { - return fmt.Errorf("validating columns: %w", err) + return ig, fmt.Errorf("validating columns: %w", err) } - if err := ig.validateSQL(); err != nil { - return fmt.Errorf("validating sql input: %w", err) - } - return nil + ig.setCols() + ig.addUniqueIndex() + return ig, nil } func (ig *Integration) validateSQL() error { @@ -716,6 +689,59 @@ func (ig *Integration) validateCols() error { return nil } +func (ig *Integration) setCols() { + getCol := func(name string) Column { + for _, c := range ig.Table.Cols { + if c.Name == name { + return c + } + } + return Column{} + } + for _, input := range ig.Event.Selected() { + c := getCol(input.Column) + ig.Columns = append(ig.Columns, c.Name) + ig.coldefs = append(ig.coldefs, coldef{ + Input: input, + Column: c, + }) + ig.numSelected++ + } + for _, bd := range ig.Block { + c := getCol(bd.Column) + ig.Columns = append(ig.Columns, c.Name) + ig.coldefs = append(ig.coldefs, coldef{ + BlockData: bd, + Column: c, + }) + ig.numBDSelected++ + } +} + +// sets default unique columns unless already set by user +func (ig *Integration) addUniqueIndex() { + if len(ig.Table.Unique) > 0 { + return + } + possible := []string{ + "intg_name", + "src_name", + "block_num", + "tx_idx", + "log_idx", + "abi_idx", + } + var uidx []string + for _, name := range possible { + if slices.Contains(ig.Columns, name) { + uidx = append(uidx, name) + } + } + if len(uidx) > 0 { + ig.Table.Unique = append(ig.Table.Unique, uidx) + } +} + func (ig *Integration) addRequiredFields() { hasBD := func(name string) bool { for _, bd := range ig.Block { @@ -766,15 +792,6 @@ func validateString(s string) error { return nil } -func col(t Table, name string) (Column, error) { - for i := range t.Cols { - if t.Cols[i].Name == name { - return t.Cols[i], nil - } - } - return Column{}, fmt.Errorf("table %q doesn't contain column %q", t.Name, name) -} - func (ig Integration) Name() string { return ig.name } func (ig Integration) Events(context.Context) [][]byte { return [][]byte{} } @@ -1065,6 +1082,9 @@ type Column struct { type Table struct { Name string `json:"name"` Cols []Column `json:"columns"` + + DisableUnique bool `json:"disable_unique"` + Unique [][]string `json:"unique"` } func (t *Table) Create(ctx context.Context, pg wpg.Conn) error { @@ -1082,6 +1102,31 @@ func (t *Table) Create(ctx context.Context, pg wpg.Conn) error { return err } +func (t *Table) CreateUIDX(ctx context.Context, pg wpg.Conn) error { + if t.DisableUnique { + slog.InfoContext(ctx, "disable unique index", "table", t.Name) + return nil + } + var s strings.Builder + for _, cols := range t.Unique { + s.Reset() + const q = "create unique index if not exists u_%s on %s (" + s.WriteString(fmt.Sprintf(q, t.Name, t.Name)) + for i, cname := range cols { + s.WriteString(cname) + if i+1 == len(cols) { + s.WriteString(")") + break + } + s.WriteString(",") + } + if _, err := pg.Exec(ctx, s.String()); err != nil { + return fmt.Errorf("creating index %s: %w", t.Name, err) + } + } + return nil +} + func Indexes(ctx context.Context, pg wpg.Conn, table string) []map[string]any { const q = ` select indexname, indexdef diff --git a/abi2/abi2_test.go b/abi2/abi2_test.go index 8e8dc7fb..063419c7 100644 --- a/abi2/abi2_test.go +++ b/abi2/abi2_test.go @@ -399,6 +399,16 @@ func TestValidate_MissingCols(t *testing.T) { }, }, } - const want = "validating columns: missing column for b" - diff.Test(t, t.Errorf, want, ig.validate().Error()) + const want = "missing column for b" + diff.Test(t, t.Errorf, want, ig.validateCols().Error()) +} + +func TestAddUniqueIndex(t *testing.T) { + ig := Integration{} + ig.addRequiredFields() + ig.setCols() + ig.addUniqueIndex() + want := []string{"intg_name", "src_name", "block_num", "tx_idx"} + diff.Test(t, t.Fatalf, 1, len(ig.Table.Unique)) + diff.Test(t, t.Errorf, want, ig.Table.Unique[0]) } diff --git a/e2pg/e2pg.go b/e2pg/e2pg.go index 41e350b8..b1821996 100644 --- a/e2pg/e2pg.go +++ b/e2pg/e2pg.go @@ -1023,6 +1023,9 @@ func getDest(pgp *pgxpool.Pool, ig Integration) (Destination, error) { if err := aig.Table.Create(context.Background(), pgp); err != nil { return nil, fmt.Errorf("create intg table: %w", err) } + if err := aig.Table.CreateUIDX(context.Background(), pgp); err != nil { + return nil, fmt.Errorf("create intg unique index: %w", err) + } return aig, nil } }