Skip to content

Commit

Permalink
[kit] add batcher to utils package
Browse files Browse the repository at this point in the history
  • Loading branch information
ehsannm committed Nov 15, 2023
1 parent ccb1873 commit df4f69c
Show file tree
Hide file tree
Showing 5 changed files with 402 additions and 0 deletions.
172 changes: 172 additions & 0 deletions kit/utils/batch/batcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package batch

import (
"sync/atomic"
"time"
_ "unsafe"

"github.com/clubpay/ronykit/kit/utils"
)

/*
Creation Time: 2022 - Jul - 22
Created by: (ehsan)
Maintainers:
1. Ehsan N. Moosa (E2)
Auditor: Ehsan N. Moosa (E2)
*/

type NA = struct{}

type Func[IN, OUT any] func(targetID string, entries []Entry[IN, OUT])

type MultiBatcher[IN, OUT any] struct {
cfg config
batcherFunc Func[IN, OUT]
poolMtx utils.SpinLock
pool map[string]*Batcher[IN, OUT]
}

// NewMulti creates a pool of Batcher funcs. By calling Enter or EnterAndWait you add
// the item into the Batcher which is identified by 'tagID'.
func NewMulti[IN, OUT any](f Func[IN, OUT], opt ...Option) *MultiBatcher[IN, OUT] {
cfg := defaultConfig
for _, o := range opt {
o(&cfg)
}

fp := &MultiBatcher[IN, OUT]{
cfg: cfg,
batcherFunc: f,
pool: make(map[string]*Batcher[IN, OUT], 16),
}

return fp
}

func (fp *MultiBatcher[IN, OUT]) getBatcher(tagID string) *Batcher[IN, OUT] {
fp.poolMtx.Lock()
f := fp.pool[tagID]
if f == nil {
f = newBatcher[IN, OUT](fp.batcherFunc, tagID, fp.cfg)
fp.pool[tagID] = f
}
fp.poolMtx.Unlock()

return f
}

func (fp *MultiBatcher[IN, OUT]) Enter(targetID string, entry Entry[IN, OUT]) {
fp.getBatcher(targetID).enter(entry)
}

func (fp *MultiBatcher[IN, OUT]) EnterAndWait(targetID string, entry Entry[IN, OUT]) {
fp.getBatcher(targetID).enterAndWait(entry)
}

type Batcher[IN, OUT any] struct {
utils.SpinLock

readyWorkers int32
batchSize int32
minWaitTime time.Duration
flusherFunc Func[IN, OUT]
entryChan chan Entry[IN, OUT]
tagID string
}

func NewBatcher[IN, OUT any](f Func[IN, OUT], tagID string, opt ...Option) *Batcher[IN, OUT] {
cfg := defaultConfig
for _, o := range opt {
o(&cfg)
}

return newBatcher[IN, OUT](f, tagID, cfg)
}

func newBatcher[IN, OUT any](f Func[IN, OUT], tagID string, cfg config) *Batcher[IN, OUT] {
return &Batcher[IN, OUT]{
readyWorkers: cfg.maxWorkers,
batchSize: cfg.batchSize,
minWaitTime: cfg.minWaitTime,
flusherFunc: f,
entryChan: make(chan Entry[IN, OUT], cfg.batchSize),
tagID: tagID,
}
}

func (f *Batcher[IN, OUT]) startWorker() {
f.Lock()
if atomic.AddInt32(&f.readyWorkers, -1) < 0 {
atomic.AddInt32(&f.readyWorkers, 1)
f.Unlock()

return
}
f.Unlock()

w := &worker[IN, OUT]{
f: f,
bs: int(f.batchSize),
}
go w.run()
}

func (f *Batcher[IN, OUT]) enter(entry Entry[IN, OUT]) {
f.entryChan <- entry
f.startWorker()
}

func (f *Batcher[IN, OUT]) enterAndWait(entry Entry[IN, OUT]) {
f.enter(entry)
entry.wait()
}

type worker[IN, OUT any] struct {
f *Batcher[IN, OUT]
bs int
}

func (w *worker[IN, OUT]) run() {
var (
el = make([]Entry[IN, OUT], 0, w.bs)
startTime = utils.NanoTime()
)
for {
for {
select {
case e := <-w.f.entryChan:
el = append(el, e)
if len(el) < w.bs {
continue
}
default:
}

break
}

if w.f.minWaitTime > 0 && len(el) < w.bs {
delta := w.f.minWaitTime - time.Duration(utils.NanoTime()-startTime)
if delta > 0 {
time.Sleep(delta)

continue
}
}
w.f.Lock()
if len(el) == 0 {
// clean up and shutdown the worker
atomic.AddInt32(&w.f.readyWorkers, 1)
w.f.Unlock()

break
}
w.f.Unlock()
w.f.flusherFunc(w.f.tagID, el)
for idx := range el {
el[idx].done()
}
el = el[:0]
}
}
137 changes: 137 additions & 0 deletions kit/utils/batch/batcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package batch_test

import (
"fmt"
"math/rand"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/clubpay/ronykit/kit/utils/batch"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestGateway(t *testing.T) {
RegisterFailHandler(Fail)

RunSpecs(t, "RonyKit/Kit/Utils Suite")
}

var _ = Describe("Flusher Without WaitTime", func() {
var out, in int64
f := batch.NewMulti[int, batch.NA](
func(targetID string, entries []batch.Entry[int, batch.NA]) {
time.Sleep(time.Millisecond * 100)
atomic.AddInt64(&out, int64(len(entries)))
},
batch.WithBatchSize(20),
batch.WithMaxWorkers(10),
)

wg := sync.WaitGroup{}
total := int64(10000)
for i := 0; i < int(total); i++ {
wg.Add(1)
go func() {
f.EnterAndWait(
fmt.Sprintf("T%d", rand.Intn(3)),
batch.NewEntry[int, batch.NA](rand.Intn(10), nil),
)
atomic.AddInt64(&in, 1)
wg.Done()
}()
}
wg.Wait()
It("should flush all entries", func() {
for _, q := range f.Pool() {
Expect(q.EntryChan()).To(BeEmpty())
}
Expect(in).To(Equal(total))
Expect(out).To(Equal(total))
})
})

var _ = Describe("Flusher With WaitTime", func() {
var out, in int64
f := batch.NewMulti[int, batch.NA](
func(targetID string, entries []batch.Entry[int, batch.NA]) {
time.Sleep(time.Millisecond * 100)
atomic.AddInt64(&out, int64(len(entries)))
for _, e := range entries {
e.Value()
}
},
batch.WithBatchSize(20),
batch.WithMaxWorkers(10),
batch.WithMinWaitTime(250*time.Millisecond),
)

wg := sync.WaitGroup{}
total := int64(10000)
for i := 0; i < int(total); i++ {
wg.Add(1)
go func() {
f.EnterAndWait(
fmt.Sprintf("T%d", rand.Intn(3)),
batch.NewEntry[int, batch.NA](rand.Intn(10), nil),
)
atomic.AddInt64(&in, 1)
wg.Done()
}()
}
wg.Wait()

It("should flush all entries", func() {
for _, q := range f.Pool() {
Expect(q.EntryChan()).To(BeEmpty())
}
Expect(in).To(Equal(total))
Expect(out).To(Equal(total))
})
})

var _ = Describe("Flusher With Callback", func() {
var out, in int64
f := batch.NewMulti[int, int](
func(targetID string, entries []batch.Entry[int, int]) {
time.Sleep(time.Millisecond * 100)
atomic.AddInt64(&out, int64(len(entries)))
for _, e := range entries {
e.Callback(e.Value())
}
},
batch.WithBatchSize(20),
batch.WithMaxWorkers(10),
batch.WithMinWaitTime(250*time.Millisecond),
)

wg := sync.WaitGroup{}
total := int64(10000)
var sum int64
for i := 0; i < int(total); i++ {
wg.Add(1)
go func(x int) {
f.EnterAndWait(
"sameID",
batch.NewEntry(
x,
func(out int) { atomic.AddInt64(&sum, int64(out)) },
),
)
atomic.AddInt64(&in, 1)
wg.Done()
}(i)
}
wg.Wait()

It("should flush all entries", func() {
for _, q := range f.Pool() {
Expect(q.EntryChan()).To(BeEmpty())
}
Expect(in).To(Equal(total))
Expect(out).To(Equal(total))
Expect(sum).To(Equal(total * (total - 1) / 2))
})
})
40 changes: 40 additions & 0 deletions kit/utils/batch/entry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package batch

type Entry[IN, OUT any] interface {
wait()
done()
Value() IN
Callback(out OUT)
}

type entry[IN, OUT any] struct {
v IN
ch chan struct{}
cb func(OUT)
}

func NewEntry[IN, OUT any](v IN, callbackFn func(out OUT)) Entry[IN, OUT] {
return &entry[IN, OUT]{
v: v,
cb: callbackFn,
ch: make(chan struct{}, 1),
}
}

func (e *entry[IN, OUT]) wait() {
<-e.ch
}

func (e *entry[IN, OUT]) done() {
e.ch <- struct{}{}
}

func (e *entry[IN, OUT]) Value() IN {
return e.v
}

func (e *entry[IN, OUT]) Callback(out OUT) {
if e.cb != nil {
e.cb(out)
}
}
9 changes: 9 additions & 0 deletions kit/utils/batch/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package batch

func (fp *MultiBatcher[IN, OUT]) Pool() map[string]*Batcher[IN, OUT] {
return fp.pool
}

func (f *Batcher[IN, OUT]) EntryChan() chan Entry[IN, OUT] {
return f.entryChan
}
Loading

0 comments on commit df4f69c

Please sign in to comment.