diff --git a/dig/dig.go b/dig/dig.go index 68eaffc..9975108 100644 --- a/dig/dig.go +++ b/dig/dig.go @@ -386,10 +386,11 @@ type Filter struct { Ref Ref `json:"filter_ref"` } -func (f Filter) Accept(ctx context.Context, pgmut *sync.Mutex, pg wpg.Conn, d any) (bool, error) { +func (f Filter) Accept(ctx context.Context, pgmut *sync.Mutex, pg wpg.Conn, d any, frs *filterResults) error { if len(f.Arg) == 0 && len(f.Ref.Integration) == 0 { - return true, nil + return nil } + switch v := d.(type) { case eth.Bytes: d = []byte(v) @@ -398,9 +399,9 @@ func (f Filter) Accept(ctx context.Context, pgmut *sync.Mutex, pg wpg.Conn, d an } switch v := d.(type) { case []byte: + var res bool switch { case strings.HasSuffix(f.Op, "contains"): - var res bool switch { case len(f.Ref.Table) > 0: q := fmt.Sprintf( @@ -416,7 +417,7 @@ func (f Filter) Accept(ctx context.Context, pgmut *sync.Mutex, pg wpg.Conn, d an res = false case err != nil: const tag = "filter using reference (%s %s): %w" - return false, fmt.Errorf(tag, f.Ref.Table, f.Ref.Column, err) + return fmt.Errorf(tag, f.Ref.Table, f.Ref.Column, err) } default: for i := range f.Arg { @@ -427,11 +428,10 @@ func (f Filter) Accept(ctx context.Context, pgmut *sync.Mutex, pg wpg.Conn, d an } } if strings.HasPrefix(f.Op, "!") { - return !res, nil + res = !res } - return res, nil + frs.add(res) case f.Op == "eq" || f.Op == "ne": - var res bool for i := range f.Arg { if bytes.Contains(v, eth.DecodeHex(f.Arg[i])) { res = true @@ -439,55 +439,55 @@ func (f Filter) Accept(ctx context.Context, pgmut *sync.Mutex, pg wpg.Conn, d an } } if f.Op == "ne" { - return !res, nil + res = !res } - return res, nil default: - return true, nil + res = true } + frs.add(res) case string: switch f.Op { case "contains": - return slices.Contains(f.Arg, v), nil + frs.add(slices.Contains(f.Arg, v)) case "!contains": - return !slices.Contains(f.Arg, v), nil + frs.add(!slices.Contains(f.Arg, v)) case "eq": - return v == f.Arg[0], nil + frs.add(v == f.Arg[0]) case "ne": - return v != f.Arg[0], nil + frs.add(v != f.Arg[0]) } case uint64: i, err := strconv.ParseUint(f.Arg[0], 10, 64) if err != nil { - return false, fmt.Errorf("unable to convert filter arg to int: %q", f.Arg[0]) + return fmt.Errorf("unable to convert filter arg to int: %q", f.Arg[0]) } switch f.Op { case "eq": - return v == i, nil + frs.add(v == i) case "ne": - return v != i, nil + frs.add(v != i) case "gt": - return v > i, nil + frs.add(v > i) case "lt": - return v < i, nil + frs.add(v < i) } case *uint256.Int: i := &uint256.Int{} if err := i.SetFromDecimal(f.Arg[0]); err != nil { - return false, fmt.Errorf("unable to convert filter arg dec to uint256: %q", f.Arg[0]) + return fmt.Errorf("unable to convert filter arg dec to uint256: %q", f.Arg[0]) } switch f.Op { case "eq": - return v.Cmp(i) == 0, nil + frs.add(v.Cmp(i) == 0) case "ne": - return v.Cmp(i) != 0, nil + frs.add(v.Cmp(i) != 0) case "gt": - return v.Cmp(i) == 1, nil + frs.add(v.Cmp(i) == 1) case "lt": - return v.Cmp(i) == -1, nil + frs.add(v.Cmp(i) == -1) } } - return true, nil + return nil } func parseArray(elm atype, s string) atype { @@ -949,29 +949,47 @@ func (lwc *logWithCtx) get(name string) any { } } +type filterResults struct { + set bool + val bool +} + +func (fr *filterResults) add(b bool) { + fr.set = true + if !fr.val { + fr.val = b + } +} + +func (fr *filterResults) accept() bool { + if !fr.set { + return true + } + return fr.val +} + func (ig Integration) processTx(rows [][]any, lwc *logWithCtx, pgmut *sync.Mutex, pg wpg.Conn) ([][]any, bool, error) { switch { case ig.numSelected > 0: return rows, false, nil case ig.numBDSelected > 0: + frs := filterResults{} row := make([]any, len(ig.coldefs)) for i, def := range ig.coldefs { switch { case !def.BlockData.Empty(): d := lwc.get(def.BlockData.Name) - accept, err := def.BlockData.Accept(lwc.ctx, pgmut, pg, d) - if err != nil { + if err := def.BlockData.Accept(lwc.ctx, pgmut, pg, d, &frs); err != nil { return nil, false, fmt.Errorf("checking filter: %w", err) } - if !accept { - return rows, true, nil - } row[i] = d default: return rows, false, fmt.Errorf("expected only blockdata coldef") } } - rows = append(rows, row) + if frs.accept() { + rows = append(rows, row) + } } return rows, true, nil } @@ -989,18 +1007,15 @@ func (ig Integration) processLog(rows [][]any, lwc *logWithCtx, pgmut *sync.Mute } for i := 0; i < ig.resultCache.Len(); i++ { ictr, actr := 1, 0 + frs := filterResults{} row := make([]any, len(ig.coldefs)) for j, def := range ig.coldefs { switch { case def.Input.Indexed: d := dbtype(def.Input.Type, lwc.l.Topics[ictr]) - accept, err := def.Input.Accept(lwc.ctx, pgmut, pg, d) - if err != nil { + if err := def.Input.Accept(lwc.ctx, pgmut, pg, d, &frs); err != nil { return nil, fmt.Errorf("checking filter: %w", err) } - if !accept { - return rows, nil - } row[j] = d ictr++ case !def.BlockData.Empty(): @@ -1010,59 +1025,48 @@ func (ig Integration) processLog(rows [][]any, lwc *logWithCtx, pgmut *sync.Mute d = i default: d = lwc.get(def.BlockData.Name) - accept, err := def.BlockData.Accept(lwc.ctx, pgmut, pg, d) - if err != nil { + if err := def.BlockData.Accept(lwc.ctx, pgmut, pg, d, &frs); err != nil { return nil, fmt.Errorf("checking filter: %w", err) } - if !accept { - return rows, nil - } } row[j] = d default: d := dbtype(def.Input.Type, ig.resultCache.At(i)[actr]) - accept, err := def.Input.Accept(lwc.ctx, pgmut, pg, d) - if err != nil { + if err := def.Input.Accept(lwc.ctx, pgmut, pg, d, &frs); err != nil { return nil, fmt.Errorf("checking filter: %w", err) } - if !accept { - return rows, nil - } row[j] = d actr++ } } - rows = append(rows, row) + if frs.accept() { + rows = append(rows, row) + } } default: + frs := filterResults{} row := make([]any, len(ig.coldefs)) for i, def := range ig.coldefs { switch { case def.Input.Indexed: d := dbtype(def.Input.Type, lwc.l.Topics[1+i]) - accept, err := def.Input.Accept(lwc.ctx, pgmut, pg, d) - if err != nil { + if err := def.Input.Accept(lwc.ctx, pgmut, pg, d, &frs); err != nil { return nil, fmt.Errorf("checking filter: %w", err) } - if !accept { - return rows, nil - } row[i] = d case !def.BlockData.Empty(): d := lwc.get(def.BlockData.Name) - accept, err := def.BlockData.Accept(lwc.ctx, pgmut, pg, d) - if err != nil { + if err := def.BlockData.Accept(lwc.ctx, pgmut, pg, d, &frs); err != nil { return nil, fmt.Errorf("checking filter: %w", err) } - if !accept { - return rows, nil - } row[i] = d default: return nil, fmt.Errorf("no rows for un-indexed data") } } - rows = append(rows, row) + if frs.accept() { + rows = append(rows, row) + } } return rows, nil } diff --git a/dig/dig_test.go b/dig/dig_test.go index e21f58a..570cdc5 100644 --- a/dig/dig_test.go +++ b/dig/dig_test.go @@ -457,8 +457,9 @@ func TestFilter(t *testing.T) { }, } for _, c := range cases { - got, err := c.f.Accept(context.Background(), mt, pg, c.d) + frs := filterResults{} + err := c.f.Accept(context.Background(), mt, pg, c.d, &frs) tc.NoErr(t, err) - tc.WantGot(t, c.want, got) + tc.WantGot(t, c.want, frs.accept()) } } diff --git a/indexsupply.com/shovel/docs/index.md b/indexsupply.com/shovel/docs/index.md index ee9e15e..985b505 100644 --- a/indexsupply.com/shovel/docs/index.md +++ b/indexsupply.com/shovel/docs/index.md @@ -104,6 +104,7 @@ The following resources are automatically deployed on a main commit: On main but not yet associated with a new version tag. +- fix multiple filters per block/event - fix filter operations on trace_action_value - empty decoded bytes are stored as an empty byte array instead of NULL - accept multiple URLs per source for redundancy