From df4f69c0a528c9ad3a22de5ef53dedf6a247bf40 Mon Sep 17 00:00:00 2001 From: Ehsan Noureddin Moosa Date: Wed, 15 Nov 2023 20:53:44 +0300 Subject: [PATCH] [kit] add batcher to utils package --- kit/utils/batch/batcher.go | 172 ++++++++++++++++++++++++++++++++ kit/utils/batch/batcher_test.go | 137 +++++++++++++++++++++++++ kit/utils/batch/entry.go | 40 ++++++++ kit/utils/batch/export_test.go | 9 ++ kit/utils/batch/option.go | 44 ++++++++ 5 files changed, 402 insertions(+) create mode 100644 kit/utils/batch/batcher.go create mode 100644 kit/utils/batch/batcher_test.go create mode 100644 kit/utils/batch/entry.go create mode 100644 kit/utils/batch/export_test.go create mode 100644 kit/utils/batch/option.go diff --git a/kit/utils/batch/batcher.go b/kit/utils/batch/batcher.go new file mode 100644 index 00000000..b1371020 --- /dev/null +++ b/kit/utils/batch/batcher.go @@ -0,0 +1,172 @@ +package batch + +import ( + "sync/atomic" + "time" + _ "unsafe" + + "github.com/clubpay/ronykit/kit/utils" +) + +/* + Creation Time: 2022 - Jul - 22 + Created by: (ehsan) + Maintainers: + 1. Ehsan N. Moosa (E2) + Auditor: Ehsan N. Moosa (E2) +*/ + +type NA = struct{} + +type Func[IN, OUT any] func(targetID string, entries []Entry[IN, OUT]) + +type MultiBatcher[IN, OUT any] struct { + cfg config + batcherFunc Func[IN, OUT] + poolMtx utils.SpinLock + pool map[string]*Batcher[IN, OUT] +} + +// NewMulti creates a pool of Batcher funcs. By calling Enter or EnterAndWait you add +// the item into the Batcher which is identified by 'tagID'. +func NewMulti[IN, OUT any](f Func[IN, OUT], opt ...Option) *MultiBatcher[IN, OUT] { + cfg := defaultConfig + for _, o := range opt { + o(&cfg) + } + + fp := &MultiBatcher[IN, OUT]{ + cfg: cfg, + batcherFunc: f, + pool: make(map[string]*Batcher[IN, OUT], 16), + } + + return fp +} + +func (fp *MultiBatcher[IN, OUT]) getBatcher(tagID string) *Batcher[IN, OUT] { + fp.poolMtx.Lock() + f := fp.pool[tagID] + if f == nil { + f = newBatcher[IN, OUT](fp.batcherFunc, tagID, fp.cfg) + fp.pool[tagID] = f + } + fp.poolMtx.Unlock() + + return f +} + +func (fp *MultiBatcher[IN, OUT]) Enter(targetID string, entry Entry[IN, OUT]) { + fp.getBatcher(targetID).enter(entry) +} + +func (fp *MultiBatcher[IN, OUT]) EnterAndWait(targetID string, entry Entry[IN, OUT]) { + fp.getBatcher(targetID).enterAndWait(entry) +} + +type Batcher[IN, OUT any] struct { + utils.SpinLock + + readyWorkers int32 + batchSize int32 + minWaitTime time.Duration + flusherFunc Func[IN, OUT] + entryChan chan Entry[IN, OUT] + tagID string +} + +func NewBatcher[IN, OUT any](f Func[IN, OUT], tagID string, opt ...Option) *Batcher[IN, OUT] { + cfg := defaultConfig + for _, o := range opt { + o(&cfg) + } + + return newBatcher[IN, OUT](f, tagID, cfg) +} + +func newBatcher[IN, OUT any](f Func[IN, OUT], tagID string, cfg config) *Batcher[IN, OUT] { + return &Batcher[IN, OUT]{ + readyWorkers: cfg.maxWorkers, + batchSize: cfg.batchSize, + minWaitTime: cfg.minWaitTime, + flusherFunc: f, + entryChan: make(chan Entry[IN, OUT], cfg.batchSize), + tagID: tagID, + } +} + +func (f *Batcher[IN, OUT]) startWorker() { + f.Lock() + if atomic.AddInt32(&f.readyWorkers, -1) < 0 { + atomic.AddInt32(&f.readyWorkers, 1) + f.Unlock() + + return + } + f.Unlock() + + w := &worker[IN, OUT]{ + f: f, + bs: int(f.batchSize), + } + go w.run() +} + +func (f *Batcher[IN, OUT]) enter(entry Entry[IN, OUT]) { + f.entryChan <- entry + f.startWorker() +} + +func (f *Batcher[IN, OUT]) enterAndWait(entry Entry[IN, OUT]) { + f.enter(entry) + entry.wait() +} + +type worker[IN, OUT any] struct { + f *Batcher[IN, OUT] + bs int +} + +func (w *worker[IN, OUT]) run() { + var ( + el = make([]Entry[IN, OUT], 0, w.bs) + startTime = utils.NanoTime() + ) + for { + for { + select { + case e := <-w.f.entryChan: + el = append(el, e) + if len(el) < w.bs { + continue + } + default: + } + + break + } + + if w.f.minWaitTime > 0 && len(el) < w.bs { + delta := w.f.minWaitTime - time.Duration(utils.NanoTime()-startTime) + if delta > 0 { + time.Sleep(delta) + + continue + } + } + w.f.Lock() + if len(el) == 0 { + // clean up and shutdown the worker + atomic.AddInt32(&w.f.readyWorkers, 1) + w.f.Unlock() + + break + } + w.f.Unlock() + w.f.flusherFunc(w.f.tagID, el) + for idx := range el { + el[idx].done() + } + el = el[:0] + } +} diff --git a/kit/utils/batch/batcher_test.go b/kit/utils/batch/batcher_test.go new file mode 100644 index 00000000..d9bd3015 --- /dev/null +++ b/kit/utils/batch/batcher_test.go @@ -0,0 +1,137 @@ +package batch_test + +import ( + "fmt" + "math/rand" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/clubpay/ronykit/kit/utils/batch" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestGateway(t *testing.T) { + RegisterFailHandler(Fail) + + RunSpecs(t, "RonyKit/Kit/Utils Suite") +} + +var _ = Describe("Flusher Without WaitTime", func() { + var out, in int64 + f := batch.NewMulti[int, batch.NA]( + func(targetID string, entries []batch.Entry[int, batch.NA]) { + time.Sleep(time.Millisecond * 100) + atomic.AddInt64(&out, int64(len(entries))) + }, + batch.WithBatchSize(20), + batch.WithMaxWorkers(10), + ) + + wg := sync.WaitGroup{} + total := int64(10000) + for i := 0; i < int(total); i++ { + wg.Add(1) + go func() { + f.EnterAndWait( + fmt.Sprintf("T%d", rand.Intn(3)), + batch.NewEntry[int, batch.NA](rand.Intn(10), nil), + ) + atomic.AddInt64(&in, 1) + wg.Done() + }() + } + wg.Wait() + It("should flush all entries", func() { + for _, q := range f.Pool() { + Expect(q.EntryChan()).To(BeEmpty()) + } + Expect(in).To(Equal(total)) + Expect(out).To(Equal(total)) + }) +}) + +var _ = Describe("Flusher With WaitTime", func() { + var out, in int64 + f := batch.NewMulti[int, batch.NA]( + func(targetID string, entries []batch.Entry[int, batch.NA]) { + time.Sleep(time.Millisecond * 100) + atomic.AddInt64(&out, int64(len(entries))) + for _, e := range entries { + e.Value() + } + }, + batch.WithBatchSize(20), + batch.WithMaxWorkers(10), + batch.WithMinWaitTime(250*time.Millisecond), + ) + + wg := sync.WaitGroup{} + total := int64(10000) + for i := 0; i < int(total); i++ { + wg.Add(1) + go func() { + f.EnterAndWait( + fmt.Sprintf("T%d", rand.Intn(3)), + batch.NewEntry[int, batch.NA](rand.Intn(10), nil), + ) + atomic.AddInt64(&in, 1) + wg.Done() + }() + } + wg.Wait() + + It("should flush all entries", func() { + for _, q := range f.Pool() { + Expect(q.EntryChan()).To(BeEmpty()) + } + Expect(in).To(Equal(total)) + Expect(out).To(Equal(total)) + }) +}) + +var _ = Describe("Flusher With Callback", func() { + var out, in int64 + f := batch.NewMulti[int, int]( + func(targetID string, entries []batch.Entry[int, int]) { + time.Sleep(time.Millisecond * 100) + atomic.AddInt64(&out, int64(len(entries))) + for _, e := range entries { + e.Callback(e.Value()) + } + }, + batch.WithBatchSize(20), + batch.WithMaxWorkers(10), + batch.WithMinWaitTime(250*time.Millisecond), + ) + + wg := sync.WaitGroup{} + total := int64(10000) + var sum int64 + for i := 0; i < int(total); i++ { + wg.Add(1) + go func(x int) { + f.EnterAndWait( + "sameID", + batch.NewEntry( + x, + func(out int) { atomic.AddInt64(&sum, int64(out)) }, + ), + ) + atomic.AddInt64(&in, 1) + wg.Done() + }(i) + } + wg.Wait() + + It("should flush all entries", func() { + for _, q := range f.Pool() { + Expect(q.EntryChan()).To(BeEmpty()) + } + Expect(in).To(Equal(total)) + Expect(out).To(Equal(total)) + Expect(sum).To(Equal(total * (total - 1) / 2)) + }) +}) diff --git a/kit/utils/batch/entry.go b/kit/utils/batch/entry.go new file mode 100644 index 00000000..55df6a1e --- /dev/null +++ b/kit/utils/batch/entry.go @@ -0,0 +1,40 @@ +package batch + +type Entry[IN, OUT any] interface { + wait() + done() + Value() IN + Callback(out OUT) +} + +type entry[IN, OUT any] struct { + v IN + ch chan struct{} + cb func(OUT) +} + +func NewEntry[IN, OUT any](v IN, callbackFn func(out OUT)) Entry[IN, OUT] { + return &entry[IN, OUT]{ + v: v, + cb: callbackFn, + ch: make(chan struct{}, 1), + } +} + +func (e *entry[IN, OUT]) wait() { + <-e.ch +} + +func (e *entry[IN, OUT]) done() { + e.ch <- struct{}{} +} + +func (e *entry[IN, OUT]) Value() IN { + return e.v +} + +func (e *entry[IN, OUT]) Callback(out OUT) { + if e.cb != nil { + e.cb(out) + } +} diff --git a/kit/utils/batch/export_test.go b/kit/utils/batch/export_test.go new file mode 100644 index 00000000..6b7e200f --- /dev/null +++ b/kit/utils/batch/export_test.go @@ -0,0 +1,9 @@ +package batch + +func (fp *MultiBatcher[IN, OUT]) Pool() map[string]*Batcher[IN, OUT] { + return fp.pool +} + +func (f *Batcher[IN, OUT]) EntryChan() chan Entry[IN, OUT] { + return f.entryChan +} diff --git a/kit/utils/batch/option.go b/kit/utils/batch/option.go new file mode 100644 index 00000000..f662ceab --- /dev/null +++ b/kit/utils/batch/option.go @@ -0,0 +1,44 @@ +package batch + +import ( + "runtime" + "time" +) + +type config struct { + maxWorkers int32 + batchSize int32 + minWaitTime time.Duration +} + +var defaultConfig = config{ + maxWorkers: int32(runtime.NumCPU() * 10), + batchSize: 100, + minWaitTime: 0, +} + +type Option func(*config) + +// WithMaxWorkers sets the maximum number of workers to use. +// Defaults to runtime.NumCPU() * 10. +func WithMaxWorkers(maxWorkers int32) Option { + return func(c *config) { + c.maxWorkers = maxWorkers + } +} + +// WithBatchSize sets the maximum number of entries to batch together. +// Defaults to 100. +func WithBatchSize(batchSize int32) Option { + return func(c *config) { + c.batchSize = batchSize + } +} + +// WithMinWaitTime sets the minimum amount of time to wait before flushing +// a batch. Defaults to 0. +func WithMinWaitTime(minWaitTime time.Duration) Option { + return func(c *config) { + c.minWaitTime = minWaitTime + } +}