Skip to content

Commit

Permalink
All filter optimization (#7300)
Browse files Browse the repository at this point in the history
* Added basic all filter optimization

Signed-off-by: Calum Murray <[email protected]>

* Add another benchmark

Signed-off-by: Calum Murray <[email protected]>

* Used more descriptive names, made channel non blocking

Signed-off-by: Calum Murray <[email protected]>

---------

Signed-off-by: Calum Murray <[email protected]>
  • Loading branch information
Cali0707 authored Sep 28, 2023
1 parent 88fe195 commit 55092a0
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 7 deletions.
5 changes: 5 additions & 0 deletions pkg/eventfilter/benchmarks/all_filter_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ func BenchmarkAllFilter(b *testing.B) {
arg: []eventfilter.Filter{filter, prefixFilterNoMatch, exactFilter2},
event: event,
},
FilterBenchmark{
name: "All filter with one non-matching filter at the end",
arg: []eventfilter.Filter{filter, exactFilter2, prefixFilterNoMatch},
event: event,
},
FilterBenchmark{
name: "All filter with all non-matching filters",
arg: []eventfilter.Filter{prefixFilterNoMatch, suffixFilterNoMatch},
Expand Down
2 changes: 2 additions & 0 deletions pkg/eventfilter/benchmarks/common_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func RunFilterBenchmarks(b *testing.B, filterCtor func(interface{}) eventfilter.
b.Run("Creation: "+fb.name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
Filter = filterCtor(fb.arg)
Filter.Cleanup()
}
})
// Filter to use for the run
Expand All @@ -52,5 +53,6 @@ func RunFilterBenchmarks(b *testing.B, filterCtor func(interface{}) eventfilter.
Result = f.Filter(context.TODO(), fb.event)
}
})
f.Cleanup()
}
}
73 changes: 66 additions & 7 deletions pkg/eventfilter/subscriptionsapi/all_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,93 @@ package subscriptionsapi

import (
"context"
"sync"

cloudevents "github.com/cloudevents/sdk-go/v2"
"go.uber.org/atomic"
"go.uber.org/zap"
"knative.dev/pkg/logging"

"knative.dev/eventing/pkg/eventfilter"
)

type allFilter []eventfilter.Filter
type filterCount struct {
filter eventfilter.Filter
count atomic.Uint64
}

type allFilter struct {
filters []filterCount
rwMutex sync.RWMutex
indexChan chan int
doneChan chan bool
}

// NewAllFilter returns an event filter which passes if all the contained filters pass
func NewAllFilter(filters ...eventfilter.Filter) eventfilter.Filter {
return append(allFilter{}, filters...)
filterCounts := make([]filterCount, len(filters))
for i, filter := range filters {
filterCounts[i] = filterCount{
count: *atomic.NewUint64(uint64(0)),
filter: filter,
}
}

f := &allFilter{
filters: filterCounts,
indexChan: make(chan int, 1),
doneChan: make(chan bool),
}
go f.optimizeLoop()
return f
}

func (filter allFilter) Filter(ctx context.Context, event cloudevents.Event) eventfilter.FilterResult {
func (filter *allFilter) Filter(ctx context.Context, event cloudevents.Event) eventfilter.FilterResult {
res := eventfilter.NoFilter
logging.FromContext(ctx).Debugw("Performing an ALL match ", zap.Any("filters", filter), zap.Any("event", event))
for _, f := range filter {
res = res.And(f.Filter(ctx, event))
filter.rwMutex.RLock()
defer filter.rwMutex.RUnlock()
for i, f := range filter.filters {
res = res.And(f.filter.Filter(ctx, event))
// Short circuit to optimize it
if res == eventfilter.FailFilter {
select {
// don't block if the channel is full
case filter.indexChan <- i:
default:
}
return eventfilter.FailFilter
}
}
return res
}

func (filter allFilter) Cleanup() {}
func (filter *allFilter) optimizeLoop() {
for {
i, more := <-filter.indexChan
if !more {
filter.doneChan <- true
return
}
val := filter.filters[i].count.Inc()
if i != 0 && val > filter.filters[i-1].count.Load()*2 {
go filter.swapWithEarlierFilter(i)
}
}
}

func (filter *allFilter) swapWithEarlierFilter(swapIdx int) {
filter.rwMutex.Lock()
defer filter.rwMutex.Unlock()
filter.filters[swapIdx-1], filter.filters[swapIdx] = filter.filters[swapIdx], filter.filters[swapIdx-1]
}

func (filter *allFilter) Cleanup() {
close(filter.indexChan)
<-filter.doneChan
for _, f := range filter.filters {
f.filter.Cleanup()
}
}

var _ eventfilter.Filter = allFilter{}
var _ eventfilter.Filter = &allFilter{}

0 comments on commit 55092a0

Please sign in to comment.