From d4be254f2cbd866f39db27b66870d292a1cf1d28 Mon Sep 17 00:00:00 2001 From: ryan smith Date: Tue, 9 Jul 2024 17:11:38 -0700 Subject: [PATCH] shovel: add AND based filtering via filter_agg --- dig/dig.go | 37 +++++++++++++++++++--------- indexsupply.com/shovel/docs/index.md | 16 +++++++++++- shovel/config/config.go | 7 ++++++ shovel/task.go | 2 +- 4 files changed, 49 insertions(+), 13 deletions(-) diff --git a/dig/dig.go b/dig/dig.go index 132a771..343144b 100644 --- a/dig/dig.go +++ b/dig/dig.go @@ -660,6 +660,7 @@ type Integration struct { Block []BlockData Table wpg.Table Notification Notification + filterAGG string Columns []string coldefs []coldef @@ -683,7 +684,7 @@ const ( indexLog ) -func New(name string, ev Event, bd []BlockData, table wpg.Table, notif Notification) (Integration, error) { +func New(name string, ev Event, bd []BlockData, table wpg.Table, notif Notification, filterAGG string) (Integration, error) { ig := Integration{ name: name, Event: ev, @@ -691,6 +692,7 @@ func New(name string, ev Event, bd []BlockData, table wpg.Table, notif Notificat Table: table, Notification: notif, + filterAGG: strings.ToLower(filterAGG), numNotify: len(notif.Columns), numIndexed: ev.numIndexed(), resultCache: NewResult(ev.ABIType()), @@ -958,22 +960,35 @@ func (lwc *logWithCtx) get(name string) any { } type filterResults struct { - set bool - val bool + kind string + used bool + val bool } func (fr *filterResults) add(b bool) { - fr.set = true - if !fr.val { - fr.val = b + fr.used = true + switch fr.kind { + case "and": + if !b { + fr.val = false + } + default: + if b { + fr.val = true + } } } func (fr *filterResults) accept() bool { - if !fr.set { + if !fr.used { return true } - return fr.val + switch fr.kind { + case "and": + return !fr.val + default: + return fr.val + } } func (ig Integration) processTx(rows [][]any, lwc *logWithCtx, pgmut *sync.Mutex, pg wpg.Conn) ([][]any, bool, error) { @@ -981,7 +996,7 @@ func (ig Integration) processTx(rows [][]any, lwc *logWithCtx, pgmut *sync.Mutex case ig.numSelected > 0: return rows, false, nil case ig.numBDSelected > 0: - frs := filterResults{} + frs := filterResults{kind: ig.filterAGG} row := make([]any, len(ig.coldefs)) for i, def := range ig.coldefs { switch { @@ -1015,7 +1030,7 @@ func (ig Integration) processLog(rows [][]any, lwc *logWithCtx, pgmut *sync.Mute } for i := 0; i < ig.resultCache.Len(); i++ { ictr, actr := 1, 0 - frs := filterResults{} + frs := filterResults{kind: ig.filterAGG} row := make([]any, len(ig.coldefs)) for j, def := range ig.coldefs { switch { @@ -1052,7 +1067,7 @@ func (ig Integration) processLog(rows [][]any, lwc *logWithCtx, pgmut *sync.Mute } } default: - frs := filterResults{} + frs := filterResults{kind: ig.filterAGG} row := make([]any, len(ig.coldefs)) for i, def := range ig.coldefs { switch { diff --git a/indexsupply.com/shovel/docs/index.md b/indexsupply.com/shovel/docs/index.md index b38f2d8..aecad13 100644 --- a/indexsupply.com/shovel/docs/index.md +++ b/indexsupply.com/shovel/docs/index.md @@ -104,6 +104,7 @@ The following resources are automatically deployed on a main commit: On main but not yet associated with a new version tag. +- add integrations[].filter_agg to allow AND xor OR based filtering - add tx_max_fee_per_gas, tx_max_priority_fee_per_gas to tx indexing - add tx_nonce, tx_gas_price to tx indexing - fix multiple filters per block/event @@ -1068,7 +1069,13 @@ A common BTREE index is automatically created for the reference table's column. ### Multiple Filters -An integration can have multiple filters. Evaluation order is unspecified. An event, or transaction, is saved if one of the filters evaluates to `true`. In other words, the filters are evaluated and the results are combined using an `OR` operation. +An integration can have multiple filters. The integration has an optional `filter_agg` field that can be one of: `"and"`, `"or"`. + +When `filter_agg="or"` the Transaction/Event is saved when **any** of the filters evalutes true. + +When `filter_agg="and"` the Transaction/Event is saved when **all** the filters evaluate true. + + ### Filter Examples @@ -1333,6 +1340,7 @@ This JSON Config is the primary UI for Shovel. The Config object holds the datab - [`integrations[]`](#config-integrations) - [`name`](#config-integrations-name) - [`enabled`](#config-integrations-enabled) + - [`filter_agg`](#config-integrations-filter-agg) - [`sources`](#config-integrations-sources) - [`name`](#config-integrations-sources-name) - [`start`](#config-integrations-sources-start) @@ -1484,6 +1492,12 @@ A good, concise description of the integration. This value should not be changed Must be true or false. Shovel will only load integrations that are enabled. This is meant to be a quick way to disable an integration while keeping the config in the file. +### `integrations[].filter_agg` {#config-integrations-filter-agg .reference} + +Must be one of: `"and"`, `"or"`. Default: `"or"`. + +Determines outcome of multiple filters on an integration. Use `"or"` when at least one of the filters must be true in order for the Transaction/Event to be saved. Use `"and"` when all filters must be true in order for the Transaction/Event to be saved. + ### `integrations[].sources` {#config-integrations-sources .reference} An integration can reference many Sources by the Source's name. This is also the place to define the start/stop block numbers for the integration. diff --git a/shovel/config/config.go b/shovel/config/config.go index 9ac6b87..bdac2d7 100644 --- a/shovel/config/config.go +++ b/shovel/config/config.go @@ -79,6 +79,12 @@ func ValidateFix(conf *Root) error { return fmt.Errorf("checking config for filter_refs: %w", err) } for i := range conf.Integrations { + if conf.Integrations[i].FilterAGG == "" { + conf.Integrations[i].FilterAGG = "or" + } + if !slices.Contains([]string{"and", "or", ""}, conf.Integrations[i].FilterAGG) { + return fmt.Errorf("filter_agg must be one of: and, or. got: %s", conf.Integrations[i].FilterAGG) + } conf.Integrations[i].AddRequiredFields() AddUniqueIndex(&conf.Integrations[i].Table) if err := ValidateColRefs(conf.Integrations[i]); err != nil { @@ -418,6 +424,7 @@ type Integration struct { Enabled bool `json:"enabled"` Sources []Source `json:"sources"` Table wpg.Table `json:"table"` + FilterAGG string `json:"filter_agg"` Notification dig.Notification `json:"notification"` Compiled Compiled `json:"compiled"` Block []dig.BlockData `json:"block"` diff --git a/shovel/task.go b/shovel/task.go index 55c0c5f..d856b46 100644 --- a/shovel/task.go +++ b/shovel/task.go @@ -122,7 +122,7 @@ func NewDestination(ig config.Integration) (Destination, error) { } return dest, nil default: - dest, err := dig.New(ig.Name, ig.Event, ig.Block, ig.Table, ig.Notification) + dest, err := dig.New(ig.Name, ig.Event, ig.Block, ig.Table, ig.Notification, ig.FilterAGG) if err != nil { return nil, fmt.Errorf("building abi integration: %w", err) }