Skip to content

Commit

Permalink
shovel: add AND based filtering via filter_agg
Browse files Browse the repository at this point in the history
  • Loading branch information
ryandotsmith committed Jul 10, 2024
1 parent 57fd7e5 commit d4be254
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 13 deletions.
37 changes: 26 additions & 11 deletions dig/dig.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ type Integration struct {
Block []BlockData
Table wpg.Table
Notification Notification
filterAGG string

Columns []string
coldefs []coldef
Expand All @@ -683,14 +684,15 @@ const (
indexLog
)

func New(name string, ev Event, bd []BlockData, table wpg.Table, notif Notification) (Integration, error) {
func New(name string, ev Event, bd []BlockData, table wpg.Table, notif Notification, filterAGG string) (Integration, error) {
ig := Integration{
name: name,
Event: ev,
Block: bd,
Table: table,
Notification: notif,

filterAGG: strings.ToLower(filterAGG),
numNotify: len(notif.Columns),
numIndexed: ev.numIndexed(),
resultCache: NewResult(ev.ABIType()),
Expand Down Expand Up @@ -958,30 +960,43 @@ func (lwc *logWithCtx) get(name string) any {
}

type filterResults struct {
set bool
val bool
kind string
used bool
val bool
}

func (fr *filterResults) add(b bool) {
fr.set = true
if !fr.val {
fr.val = b
fr.used = true
switch fr.kind {
case "and":
if !b {
fr.val = false
}
default:
if b {
fr.val = true
}
}
}

func (fr *filterResults) accept() bool {
if !fr.set {
if !fr.used {
return true
}
return fr.val
switch fr.kind {
case "and":
return !fr.val
default:
return fr.val
}
}

func (ig Integration) processTx(rows [][]any, lwc *logWithCtx, pgmut *sync.Mutex, pg wpg.Conn) ([][]any, bool, error) {
switch {
case ig.numSelected > 0:
return rows, false, nil
case ig.numBDSelected > 0:
frs := filterResults{}
frs := filterResults{kind: ig.filterAGG}
row := make([]any, len(ig.coldefs))
for i, def := range ig.coldefs {
switch {
Expand Down Expand Up @@ -1015,7 +1030,7 @@ func (ig Integration) processLog(rows [][]any, lwc *logWithCtx, pgmut *sync.Mute
}
for i := 0; i < ig.resultCache.Len(); i++ {
ictr, actr := 1, 0
frs := filterResults{}
frs := filterResults{kind: ig.filterAGG}
row := make([]any, len(ig.coldefs))
for j, def := range ig.coldefs {
switch {
Expand Down Expand Up @@ -1052,7 +1067,7 @@ func (ig Integration) processLog(rows [][]any, lwc *logWithCtx, pgmut *sync.Mute
}
}
default:
frs := filterResults{}
frs := filterResults{kind: ig.filterAGG}
row := make([]any, len(ig.coldefs))
for i, def := range ig.coldefs {
switch {
Expand Down
16 changes: 15 additions & 1 deletion indexsupply.com/shovel/docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ The following resources are automatically deployed on a main commit:

On main but not yet associated with a new version tag.

- add integrations[].filter_agg to allow AND xor OR based filtering
- add tx_max_fee_per_gas, tx_max_priority_fee_per_gas to tx indexing
- add tx_nonce, tx_gas_price to tx indexing
- fix multiple filters per block/event
Expand Down Expand Up @@ -1068,7 +1069,13 @@ A common BTREE index is automatically created for the reference table's column.

### Multiple Filters

An integration can have multiple filters. Evaluation order is unspecified. An event, or transaction, is saved if one of the filters evaluates to `true`. In other words, the filters are evaluated and the results are combined using an `OR` operation.
An integration can have multiple filters. The integration has an optional `filter_agg` field that can be one of: `"and"`, `"or"`.

When `filter_agg="or"` the Transaction/Event is saved when **any** of the filters evalutes true.

When `filter_agg="and"` the Transaction/Event is saved when **all** the filters evaluate true.



### Filter Examples

Expand Down Expand Up @@ -1333,6 +1340,7 @@ This JSON Config is the primary UI for Shovel. The Config object holds the datab
- [`integrations[]`](#config-integrations)
- [`name`](#config-integrations-name)
- [`enabled`](#config-integrations-enabled)
- [`filter_agg`](#config-integrations-filter-agg)
- [`sources`](#config-integrations-sources)
- [`name`](#config-integrations-sources-name)
- [`start`](#config-integrations-sources-start)
Expand Down Expand Up @@ -1484,6 +1492,12 @@ A good, concise description of the integration. This value should not be changed

Must be true or false. Shovel will only load integrations that are enabled. This is meant to be a quick way to disable an integration while keeping the config in the file.

### `integrations[].filter_agg` {#config-integrations-filter-agg .reference}

Must be one of: `"and"`, `"or"`. Default: `"or"`.

Determines outcome of multiple filters on an integration. Use `"or"` when at least one of the filters must be true in order for the Transaction/Event to be saved. Use `"and"` when all filters must be true in order for the Transaction/Event to be saved.

### `integrations[].sources` {#config-integrations-sources .reference}

An integration can reference many Sources by the Source's name. This is also the place to define the start/stop block numbers for the integration.
Expand Down
7 changes: 7 additions & 0 deletions shovel/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ func ValidateFix(conf *Root) error {
return fmt.Errorf("checking config for filter_refs: %w", err)
}
for i := range conf.Integrations {
if conf.Integrations[i].FilterAGG == "" {
conf.Integrations[i].FilterAGG = "or"
}
if !slices.Contains([]string{"and", "or", ""}, conf.Integrations[i].FilterAGG) {
return fmt.Errorf("filter_agg must be one of: and, or. got: %s", conf.Integrations[i].FilterAGG)
}
conf.Integrations[i].AddRequiredFields()
AddUniqueIndex(&conf.Integrations[i].Table)
if err := ValidateColRefs(conf.Integrations[i]); err != nil {
Expand Down Expand Up @@ -418,6 +424,7 @@ type Integration struct {
Enabled bool `json:"enabled"`
Sources []Source `json:"sources"`
Table wpg.Table `json:"table"`
FilterAGG string `json:"filter_agg"`
Notification dig.Notification `json:"notification"`
Compiled Compiled `json:"compiled"`
Block []dig.BlockData `json:"block"`
Expand Down
2 changes: 1 addition & 1 deletion shovel/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func NewDestination(ig config.Integration) (Destination, error) {
}
return dest, nil
default:
dest, err := dig.New(ig.Name, ig.Event, ig.Block, ig.Table, ig.Notification)
dest, err := dig.New(ig.Name, ig.Event, ig.Block, ig.Table, ig.Notification, ig.FilterAGG)
if err != nil {
return nil, fmt.Errorf("building abi integration: %w", err)
}
Expand Down

0 comments on commit d4be254

Please sign in to comment.