diff --git a/pkg/eventfilter/benchmarks/all_filter_benchmark_test.go b/pkg/eventfilter/benchmarks/all_filter_benchmark_test.go index 1c974b3098b..00ebfdfcebb 100644 --- a/pkg/eventfilter/benchmarks/all_filter_benchmark_test.go +++ b/pkg/eventfilter/benchmarks/all_filter_benchmark_test.go @@ -19,6 +19,7 @@ package benchmarks import ( "testing" + cloudevents "github.com/cloudevents/sdk-go/v2" cetest "github.com/cloudevents/sdk-go/v2/test" "knative.dev/eventing/pkg/eventfilter" "knative.dev/eventing/pkg/eventfilter/subscriptionsapi" @@ -56,59 +57,59 @@ func BenchmarkAllFilter(b *testing.B) { return subscriptionsapi.NewAllFilter(filters...) }, FilterBenchmark{ - name: "All filter with exact filter test", - arg: []eventfilter.Filter{filter}, - event: event, + name: "All filter with exact filter test", + arg: []eventfilter.Filter{filter}, + events: []cloudevents.Event{event}, }, FilterBenchmark{ - name: "All filter match all subfilters", - arg: []eventfilter.Filter{filter, prefixFilter, suffixFilter}, - event: event, + name: "All filter match all subfilters", + arg: []eventfilter.Filter{filter, prefixFilter, suffixFilter}, + events: []cloudevents.Event{event}, }, FilterBenchmark{ - name: "All filter no 1 match at end of array", - arg: []eventfilter.Filter{prefixFilterNoMatch, suffixFilterNoMatch, filter}, - event: event, + name: "All filter no 1 match at end of array", + arg: []eventfilter.Filter{prefixFilterNoMatch, suffixFilterNoMatch, filter}, + events: []cloudevents.Event{event}, }, FilterBenchmark{ - name: "All filter no 1 match at start of array", - arg: []eventfilter.Filter{filter, prefixFilterNoMatch, suffixFilterNoMatch}, - event: event, + name: "All filter no 1 match at start of array", + arg: []eventfilter.Filter{filter, prefixFilterNoMatch, suffixFilterNoMatch}, + events: []cloudevents.Event{event}, }, FilterBenchmark{ - name: "All filter with multiple exact filters that match", - arg: []eventfilter.Filter{filter, exactFilter2, exactFilter3}, - event: event, + name: "All filter with multiple exact filters that match", + arg: []eventfilter.Filter{filter, exactFilter2, exactFilter3}, + events: []cloudevents.Event{event}, }, FilterBenchmark{ - name: "All filter with one non-matching filter in the middle", - arg: []eventfilter.Filter{filter, prefixFilterNoMatch, exactFilter2}, - event: event, + name: "All filter with one non-matching filter in the middle", + arg: []eventfilter.Filter{filter, prefixFilterNoMatch, exactFilter2}, + events: []cloudevents.Event{event}, }, FilterBenchmark{ - name: "All filter with one non-matching filter at the end", - arg: []eventfilter.Filter{filter, exactFilter2, prefixFilterNoMatch}, - event: event, + name: "All filter with all non-matching filters", + arg: []eventfilter.Filter{prefixFilterNoMatch, suffixFilterNoMatch}, + events: []cloudevents.Event{event}, }, FilterBenchmark{ - name: "All filter with all non-matching filters", - arg: []eventfilter.Filter{prefixFilterNoMatch, suffixFilterNoMatch}, - event: event, + name: "All filter with one non-matching filter at the end", + arg: []eventfilter.Filter{filter, exactFilter2, prefixFilterNoMatch}, + events: []cloudevents.Event{event}, }, FilterBenchmark{ - name: "All filter with large number of sub-filters that match", - arg: largeMatchingFilters, - event: event, + name: "All filter with large number of sub-filters that match", + arg: largeMatchingFilters, + events: []cloudevents.Event{event}, }, FilterBenchmark{ - name: "All filter with large number of sub-filters that do not match", - arg: largeNonMatchingFilters, - event: event, + name: "All filter with large number of sub-filters that do not match", + arg: largeNonMatchingFilters, + events: []cloudevents.Event{event}, }, FilterBenchmark{ - name: "All filter with alternating matching and non-matching filters", - arg: alternatingFilters, - event: event, + name: "All filter with alternating matching and non-matching filters", + arg: alternatingFilters, + events: []cloudevents.Event{event}, }, ) diff --git a/pkg/eventfilter/benchmarks/any_filter_benchmark_test.go b/pkg/eventfilter/benchmarks/any_filter_benchmark_test.go index 8386eabceba..08319608354 100644 --- a/pkg/eventfilter/benchmarks/any_filter_benchmark_test.go +++ b/pkg/eventfilter/benchmarks/any_filter_benchmark_test.go @@ -19,6 +19,7 @@ package benchmarks import ( "testing" + cloudevents "github.com/cloudevents/sdk-go/v2" cetest "github.com/cloudevents/sdk-go/v2/test" "knative.dev/eventing/pkg/eventfilter" "knative.dev/eventing/pkg/eventfilter/subscriptionsapi" @@ -27,6 +28,8 @@ import ( func BenchmarkAnyFilter(b *testing.B) { // Full event with all possible fields filled event := cetest.FullEvent() + otherEvent := cetest.FullEvent() + otherEvent.SetType("qwertyuiop") filter, _ := subscriptionsapi.NewExactFilter(map[string]string{"id": event.ID()}) prefixFilter, _ := subscriptionsapi.NewPrefixFilter(map[string]string{"type": event.Type()[0:5]}) @@ -40,24 +43,34 @@ func BenchmarkAnyFilter(b *testing.B) { return subscriptionsapi.NewAnyFilter(filters...) }, FilterBenchmark{ - name: "Any filter with exact filter test", - arg: []eventfilter.Filter{filter}, - event: event, + name: "Any filter with exact filter test", + arg: []eventfilter.Filter{filter}, + events: []cloudevents.Event{event}, }, FilterBenchmark{ - name: "Any filter match all subfilters", - arg: []eventfilter.Filter{filter, prefixFilter, suffixFilter}, - event: event, + name: "Any filter match all subfilters", + arg: []eventfilter.Filter{filter, prefixFilter, suffixFilter}, + events: []cloudevents.Event{event}, }, FilterBenchmark{ - name: "Any filter no 1 match at end of array", - arg: []eventfilter.Filter{prefixFilterNoMatch, suffixFilterNoMatch, filter}, - event: event, + name: "Any filter no 1 match at end of array", + arg: []eventfilter.Filter{prefixFilterNoMatch, suffixFilterNoMatch, filter}, + events: []cloudevents.Event{event}, }, FilterBenchmark{ - name: "Any filter no 1 match at start of array", - arg: []eventfilter.Filter{filter, prefixFilterNoMatch, suffixFilterNoMatch}, - event: event, + name: "Any filter no 1 match at start of array", + arg: []eventfilter.Filter{filter, prefixFilterNoMatch, suffixFilterNoMatch}, + events: []cloudevents.Event{event}, + }, + FilterBenchmark{ + name: "Any filter 2 events match 2 different filters", + arg: []eventfilter.Filter{prefixFilter, prefixFilterNoMatch}, + events: []cloudevents.Event{event, otherEvent}, + }, + FilterBenchmark{ + name: "Any filter 2 events match 2 different filters, one filter in front which matches neither", + arg: []eventfilter.Filter{suffixFilterNoMatch, prefixFilter, prefixFilterNoMatch}, + events: []cloudevents.Event{event, otherEvent}, }, ) } diff --git a/pkg/eventfilter/benchmarks/attributes_benchmark_test.go b/pkg/eventfilter/benchmarks/attributes_benchmark_test.go index 7aface791ee..14b04f15dcf 100644 --- a/pkg/eventfilter/benchmarks/attributes_benchmark_test.go +++ b/pkg/eventfilter/benchmarks/attributes_benchmark_test.go @@ -19,6 +19,7 @@ package benchmarks import ( "testing" + cloudevents "github.com/cloudevents/sdk-go/v2" cetest "github.com/cloudevents/sdk-go/v2/test" "knative.dev/eventing/pkg/eventfilter" @@ -33,9 +34,9 @@ func BenchmarkAttributesFilter(b *testing.B) { return attributes.NewAttributesFilter(i.(map[string]string)) }, FilterBenchmark{ - name: "Pass with exact match of id", - arg: map[string]string{"id": event.ID()}, - event: event, + name: "Pass with exact match of id", + arg: map[string]string{"id": event.ID()}, + events: []cloudevents.Event{event}, }, FilterBenchmark{ // We don't test time because the exact match on the string apparently doesn't work well, @@ -49,7 +50,7 @@ func BenchmarkAttributesFilter(b *testing.B) { "datacontenttype": event.DataContentType(), "subject": event.Subject(), }, - event: event, + events: []cloudevents.Event{event}, }, FilterBenchmark{ name: "No pass with exact match of id and source", @@ -57,7 +58,7 @@ func BenchmarkAttributesFilter(b *testing.B) { "id": "qwertyuiopasdfghjklzxcvbnm", "source": "qwertyuiopasdfghjklzxcvbnm", }, - event: event, + events: []cloudevents.Event{event}, }, ) } diff --git a/pkg/eventfilter/benchmarks/common_benchmark_test.go b/pkg/eventfilter/benchmarks/common_benchmark_test.go index 2bca36b0583..5f360031be5 100644 --- a/pkg/eventfilter/benchmarks/common_benchmark_test.go +++ b/pkg/eventfilter/benchmarks/common_benchmark_test.go @@ -26,9 +26,9 @@ import ( ) type FilterBenchmark struct { - name string - arg interface{} - event cloudevents.Event + name string + arg interface{} + events []cloudevents.Event } // Avoid DCE @@ -40,7 +40,7 @@ var Result eventfilter.FilterResult // 2. "Run: ..." benchmark measures the time/mem to execute the filter, given a pre-built filter instance and the provided event func RunFilterBenchmarks(b *testing.B, filterCtor func(interface{}) eventfilter.Filter, filterBenchmarks ...FilterBenchmark) { for _, fb := range filterBenchmarks { - b.Run("Creation: "+fb.name, func(b *testing.B) { + b.Run("Creation and teardown: "+fb.name, func(b *testing.B) { for i := 0; i < b.N; i++ { Filter = filterCtor(fb.arg) Filter.Cleanup() @@ -48,9 +48,10 @@ func RunFilterBenchmarks(b *testing.B, filterCtor func(interface{}) eventfilter. }) // Filter to use for the run f := filterCtor(fb.arg) + n := len(fb.events) b.Run("Run: "+fb.name, func(b *testing.B) { for i := 0; i < b.N; i++ { - Result = f.Filter(context.TODO(), fb.event) + Result = f.Filter(context.TODO(), fb.events[i%n]) } }) f.Cleanup() diff --git a/pkg/eventfilter/benchmarks/exact_filter_benchmark_test.go b/pkg/eventfilter/benchmarks/exact_filter_benchmark_test.go index dfd8b3ee463..4e1b91a6e4d 100644 --- a/pkg/eventfilter/benchmarks/exact_filter_benchmark_test.go +++ b/pkg/eventfilter/benchmarks/exact_filter_benchmark_test.go @@ -19,6 +19,7 @@ package benchmarks import ( "testing" + cloudevents "github.com/cloudevents/sdk-go/v2" cetest "github.com/cloudevents/sdk-go/v2/test" "knative.dev/eventing/pkg/eventfilter" "knative.dev/eventing/pkg/eventfilter/subscriptionsapi" @@ -37,9 +38,9 @@ func BenchmarkExactFilter(b *testing.B) { return filter }, FilterBenchmark{ - name: "Pass with exact match of id", - arg: map[string]string{"id": event.ID()}, - event: event, + name: "Pass with exact match of id", + arg: map[string]string{"id": event.ID()}, + events: []cloudevents.Event{event}, }, FilterBenchmark{ // We don't test time because the exact match on the string apparently doesn't work well, @@ -53,7 +54,7 @@ func BenchmarkExactFilter(b *testing.B) { "datacontenttype": event.DataContentType(), "subject": event.Subject(), }, - event: event, + events: []cloudevents.Event{event}, }, FilterBenchmark{ name: "No pass with exact match of id and source", @@ -61,7 +62,7 @@ func BenchmarkExactFilter(b *testing.B) { "id": "qwertyuiopasdfghjklzxcvbnm", "source": "qwertyuiopasdfghjklzxcvbnm", }, - event: event, + events: []cloudevents.Event{event}, }, ) } diff --git a/pkg/eventfilter/benchmarks/not_filter_benchmark_test.go b/pkg/eventfilter/benchmarks/not_filter_benchmark_test.go index 08a65c51da4..6488dc229c8 100644 --- a/pkg/eventfilter/benchmarks/not_filter_benchmark_test.go +++ b/pkg/eventfilter/benchmarks/not_filter_benchmark_test.go @@ -19,6 +19,7 @@ package benchmarks import ( "testing" + cloudevents "github.com/cloudevents/sdk-go/v2" cetest "github.com/cloudevents/sdk-go/v2/test" "knative.dev/eventing/pkg/eventfilter" "knative.dev/eventing/pkg/eventfilter/subscriptionsapi" @@ -37,19 +38,19 @@ func BenchmarkNotFilter(b *testing.B) { return subscriptionsapi.NewNotFilter(i.(eventfilter.Filter)) }, FilterBenchmark{ - name: "Not filter with exact filter test", - arg: filter, - event: event, + name: "Not filter with exact filter test", + arg: filter, + events: []cloudevents.Event{event}, }, FilterBenchmark{ - name: "Not filter with prefix filter test", - arg: prefixFilter, - event: event, + name: "Not filter with prefix filter test", + arg: prefixFilter, + events: []cloudevents.Event{event}, }, FilterBenchmark{ - name: "Not filter with suffix filter test", - arg: suffixFilter, - event: event, + name: "Not filter with suffix filter test", + arg: suffixFilter, + events: []cloudevents.Event{event}, }, ) } diff --git a/pkg/eventfilter/benchmarks/prefix_filter_benchmark_test.go b/pkg/eventfilter/benchmarks/prefix_filter_benchmark_test.go index b36df0091a6..cfedac294c5 100644 --- a/pkg/eventfilter/benchmarks/prefix_filter_benchmark_test.go +++ b/pkg/eventfilter/benchmarks/prefix_filter_benchmark_test.go @@ -19,6 +19,7 @@ package benchmarks import ( "testing" + cloudevents "github.com/cloudevents/sdk-go/v2" cetest "github.com/cloudevents/sdk-go/v2/test" "knative.dev/eventing/pkg/eventfilter" "knative.dev/eventing/pkg/eventfilter/subscriptionsapi" @@ -37,9 +38,9 @@ func BenchmarkPrefixFilter(b *testing.B) { return filter }, FilterBenchmark{ - name: "Pass with prefix match of id", - arg: map[string]string{"id": event.ID()[0:5]}, - event: event, + name: "Pass with prefix match of id", + arg: map[string]string{"id": event.ID()[0:5]}, + events: []cloudevents.Event{event}, }, FilterBenchmark{ name: "Pass with prefix match of all context attributes", @@ -51,7 +52,7 @@ func BenchmarkPrefixFilter(b *testing.B) { "datacontenttype": event.DataContentType()[0:5], "subject": event.Subject()[0:5], }, - event: event, + events: []cloudevents.Event{event}, }, FilterBenchmark{ name: "Pass with prefix match of all context attributes", @@ -63,7 +64,7 @@ func BenchmarkPrefixFilter(b *testing.B) { "datacontenttype": event.DataContentType()[0:3], "subject": event.Subject()[0:3], }, - event: event, + events: []cloudevents.Event{event}, }, FilterBenchmark{ name: "No pass with prefix match of id and source", @@ -71,7 +72,7 @@ func BenchmarkPrefixFilter(b *testing.B) { "id": "qwertyuiopasdfghjklzxcvbnm", "source": "qwertyuiopasdfghjklzxcvbnm", }, - event: event, + events: []cloudevents.Event{event}, }, ) } diff --git a/pkg/eventfilter/benchmarks/suffix_filter_benchmark_test.go b/pkg/eventfilter/benchmarks/suffix_filter_benchmark_test.go index 867d9ffd532..1e688c9cdf0 100644 --- a/pkg/eventfilter/benchmarks/suffix_filter_benchmark_test.go +++ b/pkg/eventfilter/benchmarks/suffix_filter_benchmark_test.go @@ -19,6 +19,7 @@ package benchmarks import ( "testing" + cloudevents "github.com/cloudevents/sdk-go/v2" cetest "github.com/cloudevents/sdk-go/v2/test" "knative.dev/eventing/pkg/eventfilter" "knative.dev/eventing/pkg/eventfilter/subscriptionsapi" @@ -37,9 +38,9 @@ func BenchmarkSuffixFilter(b *testing.B) { return filter }, FilterBenchmark{ - name: "Pass with suffix match of id", - arg: map[string]string{"id": event.ID()[len(event.ID())-5:]}, - event: event, + name: "Pass with suffix match of id", + arg: map[string]string{"id": event.ID()[len(event.ID())-5:]}, + events: []cloudevents.Event{event}, }, FilterBenchmark{ name: "Pass with suffix match of all context attributes", @@ -51,7 +52,7 @@ func BenchmarkSuffixFilter(b *testing.B) { "datacontenttype": event.DataContentType()[len(event.DataContentType())-5:], "subject": event.Subject()[len(event.Subject())-5:], }, - event: event, + events: []cloudevents.Event{event}, }, FilterBenchmark{ name: "Pass with suffix match of all context attributes", @@ -63,7 +64,7 @@ func BenchmarkSuffixFilter(b *testing.B) { "datacontenttype": event.DataContentType()[len(event.DataContentType())-3:], "subject": event.Subject()[len(event.Subject())-3:], }, - event: event, + events: []cloudevents.Event{event}, }, FilterBenchmark{ name: "No pass with suffix match of id and source", @@ -71,7 +72,7 @@ func BenchmarkSuffixFilter(b *testing.B) { "id": "qwertyuiopasdfghjklzxcvbnm", "source": "qwertyuiopasdfghjklzxcvbnm", }, - event: event, + events: []cloudevents.Event{event}, }, ) } diff --git a/pkg/eventfilter/subscriptionsapi/any_filter.go b/pkg/eventfilter/subscriptionsapi/any_filter.go index c716f909bf2..cd95fcfc418 100644 --- a/pkg/eventfilter/subscriptionsapi/any_filter.go +++ b/pkg/eventfilter/subscriptionsapi/any_filter.go @@ -18,35 +18,87 @@ package subscriptionsapi import ( "context" + "sync" cloudevents "github.com/cloudevents/sdk-go/v2" + "go.uber.org/atomic" "go.uber.org/zap" "knative.dev/pkg/logging" "knative.dev/eventing/pkg/eventfilter" ) -// AnyFilter runs each filter and performs an Or -type anyFilter []eventfilter.Filter +type anyFilter struct { + filters []filterCount + rwMutex sync.RWMutex + indexChan chan int + doneChan chan bool +} // NewAnyFilter returns an event filter which passes if any of the contained filters passes. func NewAnyFilter(filters ...eventfilter.Filter) eventfilter.Filter { - return append(anyFilter{}, filters...) + filterCounts := make([]filterCount, len(filters)) + for i, filter := range filters { + filterCounts[i] = filterCount{ + count: *atomic.NewUint64(uint64(0)), + filter: filter, + } + } + f := &anyFilter{ + filters: filterCounts, + indexChan: make(chan int, 1), + doneChan: make(chan bool), + } + go f.optimzeLoop() // this goroutine is cleaned up when the Cleanup() method is called on the any filter + return f } -func (filter anyFilter) Filter(ctx context.Context, event cloudevents.Event) eventfilter.FilterResult { +func (filter *anyFilter) Filter(ctx context.Context, event cloudevents.Event) eventfilter.FilterResult { res := eventfilter.NoFilter logging.FromContext(ctx).Debugw("Performing an ANY match ", zap.Any("filters", filter), zap.Any("event", event)) - for _, f := range filter { - res = res.Or(f.Filter(ctx, event)) + filter.rwMutex.RLock() + defer filter.rwMutex.RUnlock() + for i, f := range filter.filters { + res = res.Or(f.filter.Filter(ctx, event)) // Short circuit to optimize it if res == eventfilter.PassFilter { + select { + // don't block if the channel is full + case filter.indexChan <- i: + default: + } return eventfilter.PassFilter } } return res } -func (filter anyFilter) Cleanup() {} +func (filter *anyFilter) Cleanup() { + close(filter.indexChan) + <-filter.doneChan + for _, f := range filter.filters { + f.filter.Cleanup() + } +} + +func (filter *anyFilter) optimzeLoop() { + for { + i, more := <-filter.indexChan + if !more { + filter.doneChan <- true + return + } + val := filter.filters[i].count.Inc() + if i != 0 && val > filter.filters[i-1].count.Load()*2 { + go filter.swapWithEarlierFilter(i) + } + } +} + +func (filter *anyFilter) swapWithEarlierFilter(swapIdx int) { + filter.rwMutex.Lock() + defer filter.rwMutex.Unlock() + filter.filters[swapIdx-1], filter.filters[swapIdx] = filter.filters[swapIdx], filter.filters[swapIdx-1] +} var _ eventfilter.Filter = &anyFilter{}