Skip to content

Commit

Permalink
throttling pipe with ops per time interval (rate limit) (#55)
Browse files Browse the repository at this point in the history
* throttling pipe with ops per time interval (rate limit)
* update static check GitHub action to v1.3.1
  • Loading branch information
fogfish authored Oct 14, 2024
1 parent 8db50f9 commit 25aadde
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:
flag-name: ${{ matrix.module }}
parallel: true

- uses: dominikh/[email protected].0
- uses: dominikh/[email protected].1
with:
install-go: false
working-directory: ${{ matrix.module }}
Expand Down
49 changes: 49 additions & 0 deletions pipe/examples/throttling/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
//
// Copyright (C) 2022 - 2024 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the MIT license. See the LICENSE file for details.
// https://github.com/fogfish/golem
//

package main

import (
"context"
"fmt"
"strconv"
"time"

"github.com/fogfish/golem/pipe"
)

const (
fastProducer = 10000
cap = 1
)

func main() {
ctx, close := context.WithCancel(context.Background())

// Generate sequence of integers
fast := pipe.StdErr(pipe.Unfold(ctx, fastProducer, 0,
func(x int) (int, error) { return x + 1, nil },
))

// Throttle the "fast" pipe
slow := pipe.Throttling(ctx, fast, 1, 100*time.Millisecond)

// Numbers to string
vals := pipe.StdErr(pipe.Map(ctx, slow,
func(x int) (string, error) { return strconv.Itoa(x), nil },
))

// Output strings
<-pipe.ForEach(ctx, vals,
func(x string) {
fmt.Printf("==> %s | %s\n", time.Now().Format(time.StampMilli), x)
},
)

close()
}
5 changes: 5 additions & 0 deletions pipe/fork/fork.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,11 @@ func TakeWhile[A any](ctx context.Context, in <-chan A, f func(A) bool) <-chan A
return pipe.TakeWhile(ctx, in, f)
}

// Throttling the channel to ops per time interval
func Throttling[A any](ctx context.Context, in <-chan A, ops int, interval time.Duration) <-chan A {
return pipe.Throttling(ctx, in, ops, interval)
}

// Lift sequence of values into channel
func Seq[T any](xs ...T) <-chan T {
return pipe.Seq(xs...)
Expand Down
22 changes: 22 additions & 0 deletions pipe/fork/fork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,3 +246,25 @@ func TestTakeWhile(t *testing.T) {

close()
}

func TestThrottling(t *testing.T) {
ctx, close := context.WithCancel(context.Background())
seq := fork.Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 0)
slowSeq := fork.Throttling(ctx, seq, 1, 100*time.Millisecond)
out := fork.StdErr(fork.Map(ctx, par, slowSeq,
func(_ int) (time.Time, error) {
return time.Now(), nil
},
))
wt := fork.ToSeq(out)
for i := 1; i < len(wt); i++ {
diff := wt[i].Sub(wt[i-1])

it.Then(t).Should(
it.Less(diff, 110*time.Millisecond),
it.Greater(diff, 99*time.Millisecond),
)
}

close()
}
46 changes: 46 additions & 0 deletions pipe/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,52 @@ func TakeWhile[A any](ctx context.Context, in <-chan A, f func(A) bool) <-chan A
return out
}

// Throttling the channel to ops per time interval.
func Throttling[A any](ctx context.Context, in <-chan A, ops int, interval time.Duration) <-chan A {
out := make(chan A, cap(in))
ctl := make(chan struct{}, ops)

go func() {
defer close(ctl)

for {
for i := 0; i < ops; i++ {
select {
case ctl <- struct{}{}:
case <-ctx.Done():
return
}
}
select {
case <-time.After(interval):
case <-ctx.Done():
return
}
}
}()

go func() {
defer close(out)

var a A
for a = range in {
select {
case <-ctl:
case <-ctx.Done():
return
}

select {
case out <- a:
case <-ctx.Done():
return
}
}
}()

return out
}

// Lift sequence of values into channel
func Seq[T any](xs ...T) <-chan T {
out := make(chan T, len(xs))
Expand Down
22 changes: 22 additions & 0 deletions pipe/pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,28 @@ func TestTakeWhile(t *testing.T) {
close()
}

func TestThrottling(t *testing.T) {
ctx, close := context.WithCancel(context.Background())
seq := pipe.Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 0)
slowSeq := pipe.Throttling(ctx, seq, 1, 100*time.Millisecond)
out := pipe.StdErr(pipe.Map(ctx, slowSeq,
func(_ int) (time.Time, error) {
return time.Now(), nil
},
))
wt := pipe.ToSeq(out)
for i := 1; i < len(wt); i++ {
diff := wt[i].Sub(wt[i-1])

it.Then(t).Should(
it.Less(diff, 110*time.Millisecond),
it.Greater(diff, 99*time.Millisecond),
)
}

close()
}

func BenchmarkPipe(b *testing.B) {
ctx, close := context.WithCancel(context.Background())
in, eg := pipe.New[int](ctx, 0)
Expand Down
2 changes: 1 addition & 1 deletion pipe/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@

package pipe

const Version = "pipe/v1.1.1"
const Version = "pipe/v1.2.0"

0 comments on commit 25aadde

Please sign in to comment.