From 8a64e8db877ddf8a356839e7d9749737dd244d75 Mon Sep 17 00:00:00 2001 From: Kyle Maas Date: Mon, 16 Jan 2023 00:51:41 +0000 Subject: [PATCH] Add version of Pump() with status callbacks --- stream.go | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/stream.go b/stream.go index ae76210..d93884c 100644 --- a/stream.go +++ b/stream.go @@ -72,3 +72,35 @@ func Pump(ctx context.Context, dst Sink, src Source) error { panic("unreachable") } + +// Pump moves values from a source into a sink. +// PumpWithStatus lets you include callbacks so you know when it's processing vs. waiting. +// +// Currently this doesn't work atomically, so if a Sink errors in the +// Pour call, the value that was read from the source is lost. +func PumpWithStatus(ctx context.Context, dst Sink, src Source, startWaiting func(), doneWaiting func(), startProcessing func(), doneProcessing func()) error { + if psrc, ok := src.(PushSource); ok { + return psrc.Push(ctx, dst) + } + + for { + startWaiting() + v, err := src.Next(ctx) + doneWaiting() + if IsEOS(err) { + return nil + } else if err != nil { + return err + } + + startProcessing() + err = dst.Pour(ctx, v) + doneProcessing() + if err != nil { + return err + } + } + + panic("unreachable") +} +