Skip to content

Commit

Permalink
(fix) allow fmap stream out data and produce errors effectively (#54)
Browse files Browse the repository at this point in the history
  • Loading branch information
fogfish authored Aug 13, 2024
1 parent ec85331 commit 8db50f9
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 39 deletions.
22 changes: 7 additions & 15 deletions pipe/fork/fork.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,33 +92,25 @@ func ForEach[A any](ctx context.Context, par int, in <-chan A, f func(A)) <-chan

// FMap applies function over channel messages, flatten the output channel and
// emits it result to new channel.
func FMap[A, B any](ctx context.Context, par int, in <-chan A, fmap func(A) (<-chan B, error)) (<-chan B, <-chan error) {
func FMap[A, B any](ctx context.Context, par int, in <-chan A, fmap func(context.Context, A, chan<- B) error) (<-chan B, <-chan error) {
var wg sync.WaitGroup
out := make(chan B, par)
exx := make(chan error, par)

pmap := func() {
defer wg.Done()

var (
a A
ch <-chan B
err error
)

var a A
for a = range in {
ch, err = fmap(a)
if err != nil {
if err := fmap(ctx, a, out); err != nil {
exx <- err
return
}

for x := range ch {
select {
case out <- x:
case <-ctx.Done():
return
}
select {
case <-ctx.Done():
return
default:
}
}
}
Expand Down
33 changes: 29 additions & 4 deletions pipe/fork/fork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ func TestFMap(t *testing.T) {
ctx, close := context.WithCancel(context.Background())
seq := fork.Seq(1, 2, 3, 4, 5)
out := fork.StdErr(fork.FMap(ctx, par, seq,
func(x int) (<-chan string, error) {
return fork.Seq(strconv.Itoa(x)), nil
func(ctx context.Context, x int, ch chan<- string) error {
ch <- strconv.Itoa(x)
return nil
}),
)

Expand All @@ -99,8 +100,8 @@ func TestFMap(t *testing.T) {
ctx, close := context.WithCancel(context.Background())
seq := fork.Seq(1, 2, 3, 4, 5)
_, exx := fork.FMap(ctx, par, seq,
func(x int) (<-chan string, error) {
return nil, fmt.Errorf("fail")
func(ctx context.Context, x int, ch chan<- string) error {
return fmt.Errorf("fail")
},
)

Expand All @@ -110,6 +111,30 @@ func TestFMap(t *testing.T) {

close()
})

t.Run("Cancel", func(t *testing.T) {
acc := 0
emit := func() (int, error) {
acc++
return acc, nil
}

ctx, close := context.WithCancel(context.Background())
seq := fork.StdErr(fork.Emit(ctx, 1000, 10*time.Microsecond, emit))
out := fork.StdErr(fork.FMap(ctx, par, seq,
func(ctx context.Context, x int, ch chan<- int) error {
ch <- x
return nil
}),
)

vals := fork.ToSeq(fork.Take(ctx, out, 10))
close()

it.Then(t).Should(
it.Seq(vals).Contain().AllOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
)
})
}

func TestFold(t *testing.T) {
Expand Down
22 changes: 7 additions & 15 deletions pipe/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,33 +99,25 @@ func ForEach[A any](ctx context.Context, in <-chan A, f func(A)) <-chan struct{}

// FMap applies function over channel messages, flatten the output channel and
// emits it result to new channel.
func FMap[A, B any](ctx context.Context, in <-chan A, fmap func(A) (<-chan B, error)) (<-chan B, <-chan error) {
func FMap[A, B any](ctx context.Context, in <-chan A, fmap func(context.Context, A, chan<- B) error) (<-chan B, <-chan error) {
out := make(chan B, cap(in))
exx := make(chan error, 1)

go func() {
defer close(out)
defer close(exx)

var (
a A
ch <-chan B
err error
)

var a A
for a = range in {
ch, err = fmap(a)
if err != nil {
if err := fmap(ctx, a, out); err != nil {
exx <- err
return
}

for x := range ch {
select {
case out <- x:
case <-ctx.Done():
return
}
select {
case <-ctx.Done():
return
default:
}
}
}()
Expand Down
33 changes: 29 additions & 4 deletions pipe/pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ func TestFMap(t *testing.T) {
ctx, close := context.WithCancel(context.Background())
seq := pipe.Seq(1, 2, 3, 4, 5)
out := pipe.StdErr(pipe.FMap(ctx, seq,
func(x int) (<-chan string, error) {
return pipe.Seq(strconv.Itoa(x)), nil
func(ctx context.Context, x int, ch chan<- string) error {
ch <- strconv.Itoa(x)
return nil
}),
)

Expand All @@ -104,8 +105,8 @@ func TestFMap(t *testing.T) {
ctx, close := context.WithCancel(context.Background())
seq := pipe.Seq(1, 2, 3, 4, 5)
_, exx := pipe.FMap(ctx, seq,
func(x int) (<-chan string, error) {
return nil, fmt.Errorf("fail")
func(ctx context.Context, x int, ch chan<- string) error {
return fmt.Errorf("fail")
},
)

Expand All @@ -115,6 +116,30 @@ func TestFMap(t *testing.T) {

close()
})

t.Run("Cancel", func(t *testing.T) {
acc := 0
emit := func() (int, error) {
acc++
return acc, nil
}

ctx, close := context.WithCancel(context.Background())
seq := pipe.StdErr(pipe.Emit(ctx, 1000, 10*time.Microsecond, emit))
out := pipe.StdErr(pipe.FMap(ctx, seq,
func(ctx context.Context, x int, ch chan<- int) error {
ch <- x
return nil
}),
)

vals := pipe.ToSeq(pipe.Take(ctx, out, 10))
close()

it.Then(t).Should(
it.Seq(vals).Contain().AllOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
)
})
}

func TestFold(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pipe/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@

package pipe

const Version = "pipe/v1.1.0"
const Version = "pipe/v1.1.1"

0 comments on commit 8db50f9

Please sign in to comment.