From e4e5143994200322efd95d4947dd19a3845cb17c Mon Sep 17 00:00:00 2001 From: Matthew de Detrich Date: Thu, 1 Aug 2024 23:00:48 +0200 Subject: [PATCH] Add optionalVia and unsafeOptionalVia --- .../paradox/release-notes/releases-1.1.md | 1 + .../operators/Source-or-Flow/optionalVia.md | 15 +++++ .../jdocs/stream/operators/SourceOrFlow.java | 24 +++++++ .../operators/sourceorflow/OptionalVia.scala | 22 +++++++ .../pekko/stream/scaladsl/FlowSpec.scala | 16 +++++ .../stream/scaladsl/FlowWithContextSpec.scala | 20 ++++++ .../pekko/stream/scaladsl/SourceSpec.scala | 13 ++++ .../scaladsl/SourceWithContextSpec.scala | 15 +++++ .../apache/pekko/stream/javadsl/Flow.scala | 26 ++++++++ .../stream/javadsl/FlowWithContext.scala | 33 ++++++++++ .../apache/pekko/stream/javadsl/Source.scala | 26 ++++++++ .../stream/javadsl/SourceWithContext.scala | 34 ++++++++++ .../apache/pekko/stream/scaladsl/Flow.scala | 51 ++++++++++++++ .../stream/scaladsl/FlowWithContext.scala | 66 +++++++++++++++++++ .../apache/pekko/stream/scaladsl/Source.scala | 51 ++++++++++++++ .../stream/scaladsl/SourceWithContext.scala | 65 ++++++++++++++++++ 16 files changed, 478 insertions(+) create mode 100644 docs/src/main/paradox/stream/operators/Source-or-Flow/optionalVia.md create mode 100644 docs/src/test/scala/docs/stream/operators/sourceorflow/OptionalVia.scala diff --git a/docs/src/main/paradox/release-notes/releases-1.1.md b/docs/src/main/paradox/release-notes/releases-1.1.md index 4a0ae7611b9..42335ec4047 100644 --- a/docs/src/main/paradox/release-notes/releases-1.1.md +++ b/docs/src/main/paradox/release-notes/releases-1.1.md @@ -41,6 +41,7 @@ The Stream API has been updated to add some extra functions. * add Sink.forall operator ([PR989](https://github.com/apache/pekko/pull/989)) * add Source.iterate operator ([PR1244](https://github.com/apache/pekko/pull/1244)) * added extra retry operators that allow users to provide a predicate to decide whether to retry based on the exception ([PR1269](https://github.com/apache/pekko/pull/1269)) +* add optionalVia/unsafeOptionalVia operators ([PR1422](https://github.com/apache/pekko/pull/1422)) The Stream Testkit Java DSL has some extra functions. diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/optionalVia.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/optionalVia.md new file mode 100644 index 00000000000..a8e25cd196b --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/optionalVia.md @@ -0,0 +1,15 @@ +# optionalVia + +For a stream containing optional elements, transforms each element by applying +the given `viaFlow` and passing the value downstream as an optional value. + +## Description + +For a stream containing optional elements, transforms each element by applying +the given `viaFlow` and passing the value downstream as an optional value. + +Scala +: @@snip [Take.scala](/docs/src/test/scala/docs/stream/operators/sourceorflow/OptionalVia.scala) { #optionalVia } + +Java +: @@snip [SourceOrFlow.java](/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #optionalVia } diff --git a/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java b/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java index 4385dd2de1f..d5368fe9a00 100644 --- a/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java +++ b/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java @@ -532,6 +532,30 @@ void foldAsyncExample() { // #foldAsync } + void optionalViaExample() { + + // #optionalVia + Flow flow = + Flow.create(); + + Source, NotUsed> source = + Source.from(Arrays.asList(Optional.of("1"), Optional.empty(), Optional.empty(), Optional.of("4"))); + + Source.optionalVia( + source, + flow.map(Integer::parseInt), + Keep.none() + ).runForeach(System.out::println, system); + // Optional[1] + // Optional.empty + // Optional.empty + // Optional[4] + + // #optionalVia + + + } + void takeExample() { // #take Source.from(Arrays.asList(1, 2, 3, 4, 5)).take(3).runForeach(System.out::println, system); diff --git a/docs/src/test/scala/docs/stream/operators/sourceorflow/OptionalVia.scala b/docs/src/test/scala/docs/stream/operators/sourceorflow/OptionalVia.scala new file mode 100644 index 00000000000..4c0f03ec727 --- /dev/null +++ b/docs/src/test/scala/docs/stream/operators/sourceorflow/OptionalVia.scala @@ -0,0 +1,22 @@ +package docs.stream.operators.sourceorflow + +object OptionalVia { + def optionalViaExample(): Unit = { + import org.apache.pekko.actor.ActorSystem + import org.apache.pekko.stream.scaladsl.{ Flow, Keep, Source } + + implicit val system: ActorSystem = ActorSystem() + + // #optionalVia + Source.optionalVia( + Source(List(Some("1"), None, None, Some("4"))), + Flow[String].map(_.toInt) + )(Keep.none).runForeach(println) + // Some(1) + // None + // None + // Some(4) + // #optionalVia + } + +} diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala index 632f98a320e..dfe689ba397 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala @@ -548,6 +548,22 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("pekko.actor.debug.r "should be created from a function easily" in { Source(0 to 9).via(Flow.fromFunction(_ + 1)).runWith(Sink.seq).futureValue should ===(1 to 10) } + + "Apply a viaFlow with optional elements using optionalVia" in { + val data = List(Some("1"), None, None, Some("4")) + + val flow = Flow[Option[String]] + + Source(data).via( + Flow.optionalVia( + flow, + Flow[String].map(_.toInt) + )(Keep.none) + ).runWith(TestSink.probe[Option[Int]]) + .request(4) + .expectNext(Some(1), None, None, Some(4)) + .expectComplete() + } } /** diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala index 0036cf34a13..206e6c41760 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala @@ -93,5 +93,25 @@ class FlowWithContextSpec extends StreamSpec { .expectNext((1, 1), (2, 2), (3, 3), (4, 4)) .expectComplete() } + + "Apply a viaFlow with optional elements using unsafeOptionalVia" in { + val data = List((Some("1"), 1), (None, 2), (None, 3), (Some("4"), 4)) + + val flow = Flow[(Option[String], Int)] + .asFlowWithContext[Option[String], Int, Int](collapseContext = Tuple2.apply)(extractContext = _._2) + .map(_._1) + + SourceWithContext + .fromTuples(Source(data)).via( + FlowWithContext.unsafeOptionalVia( + flow, + Flow[String].map(_.toInt) + )(Keep.none) + ) + .runWith(TestSink.probe[(Option[Int], Int)]) + .request(4) + .expectNext((Some(1), 1), (None, 2), (None, 3), (Some(4), 4)) + .expectComplete() + } } } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala index 2444e724298..ba0dc9fcedb 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala @@ -444,6 +444,19 @@ class SourceSpec extends StreamSpec with DefaultTimeout { import Attributes._ val s: Source[Int, NotUsed] = Source.single(42).async.addAttributes(none).named("") } + + "Apply a viaFlow with optional elements using optionalVia" in { + val data = List(Some("1"), None, None, Some("4")) + + Source.optionalVia( + Source(data), + Flow[String].map(_.toInt) + )(Keep.none) + .runWith(TestSink.probe[Option[Int]]) + .request(4) + .expectNext(Some(1), None, None, Some(4)) + .expectComplete() + } } "A Source.run" must { diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala index 137b83ee888..40af0421438 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala @@ -157,5 +157,20 @@ class SourceWithContextSpec extends StreamSpec { .expectNext((1, 1), (2, 2), (3, 3), (4, 4)) .expectComplete() } + + "Apply a viaFlow with optional elements using unsafeOptionalVia" in { + val data = List((Some("1"), 1), (None, 2), (None, 3), (Some("4"), 4)) + + val source = SourceWithContext.fromTuples(Source(data)) + + SourceWithContext.unsafeOptionalVia( + source, + Flow[String].map(_.toInt) + )(Keep.none) + .runWith(TestSink.probe[(Option[Int], Int)]) + .request(4) + .expectNext((Some(1), 1), (None, 2), (None, 3), (Some(4), 4)) + .expectComplete() + } } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 1963a97f584..685cc8ab7d9 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -69,6 +69,32 @@ object Flow { def fromFunction[I, O](f: function.Function[I, O]): javadsl.Flow[I, O, NotUsed] = Flow.create[I]().map(f) + /** + * Creates a Flow from an existing base Flow outputting an optional element and + * applying an additional viaFlow only if the element in the stream is defined. + * + * '''Emits when''' the provided viaFlow is runs with defined elements + * + * '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures + * + * '''Completes when''' upstream completes or the first element is emitted + * + * '''Cancels when''' downstream cancels + * + * @param flow The base flow that outputs an optional element + * @param viaFlow The flow that gets used if the optional element in is defined. + * @param combine How to combine the materialized values of flow and viaFlow + * @return a Flow with the viaFlow applied onto defined elements of the flow. The output value + * is contained within an Option which indicates whether the original flow's element had viaFlow + * applied. + * @since 1.1.0 + */ + def optionalVia[FIn, FOut, FViaOut, FMat, FViaMat, Mat](flow: Flow[FIn, Optional[FOut], FMat], + viaFlow: Flow[FOut, FViaOut, FViaMat], + combine: function.Function2[FMat, FViaMat, Mat] + ): Flow[FIn, Optional[FViaOut], Mat] = + scaladsl.Flow.optionalVia(flow.map(_.toScala).asScala, viaFlow.asScala)(combinerToScala(combine)).map(_.toJava).asJava + /** Create a `Flow` which can process elements of type `T`. */ def of[T](@unused clazz: Class[T]): javadsl.Flow[T, T, NotUsed] = create[T]() diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala index cb76749dbd9..a3cd0183d24 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala @@ -13,6 +13,7 @@ package org.apache.pekko.stream.javadsl +import java.util.Optional import java.util.concurrent.CompletionStage import scala.annotation.unchecked.uncheckedVariance @@ -25,6 +26,7 @@ import pekko.stream._ import pekko.util.ConstantFun import pekko.util.FutureConverters._ import pekko.util.JavaDurationConverters._ +import pekko.util.OptionConverters._ import pekko.util.ccompat.JavaConverters._ object FlowWithContext { @@ -39,6 +41,37 @@ object FlowWithContext { under: Flow[Pair[In, CtxIn], Pair[Out, CtxOut], Mat]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = new FlowWithContext(under) + /** + * Creates a FlowWithContext from an existing base FlowWithContext outputting an optional element + * and applying an additional viaFlow only if the element in the stream is defined. + * + * '''Emits when''' the provided viaFlow is runs with defined elements + * + * '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures + * + * '''Completes when''' upstream completes or the first element is emitted + * + * '''Cancels when''' downstream cancels + * + * @param flow The base flow that outputs an optional element + * @param viaFlow The flow that gets used if the optional element in is defined. This flow only works + * on the data portion of flow and ignores the context so this flow *must* not re-order, + * drop or emit multiple elements for one incoming element + * @param combine How to combine the materialized values of flow and viaFlow + * @return a FlowWithContext with the viaFlow applied onto defined elements of the flow. The output value + * is contained within an Option which indicates whether the original flow's element had viaFlow + * applied. + * @since 1.1.0 + */ + @ApiMayChange + def unsafeOptionalVia[FIn, FOut, FViaOut, Ctx, FMat, FViaMat, Mat]( + flow: FlowWithContext[FIn, Ctx, Optional[FOut], Ctx, FMat], + viaFlow: Flow[FOut, FViaOut, FViaMat], + combine: function.Function2[FMat, FViaMat, Mat] + ): FlowWithContext[FIn, Ctx, Optional[FViaOut], Ctx, Mat] = + scaladsl.FlowWithContext.unsafeOptionalVia(flow.map(_.toScala).asScala, viaFlow.asScala)(combinerToScala(combine)).map( + _.toJava).asJava + } /** diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index bf384639d4f..a7d0d95906d 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -137,6 +137,32 @@ object Source { def cycle[O](f: function.Creator[java.util.Iterator[O]]): javadsl.Source[O, NotUsed] = new Source(scaladsl.Source.cycle(() => f.create().asScala)) + /** + * Creates a Source from an existing base Source outputting an optional element + * and applying an additional viaFlow only if the element in the stream is defined. + * + * '''Emits when''' the provided viaFlow is runs with defined elements + * + * '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures + * + * '''Completes when''' upstream completes or the first element is emitted + * + * '''Cancels when''' downstream cancels + * + * @param source The base source that outputs an optional element + * @param viaFlow The flow that gets used if the optional element in is defined. + * @param combine How to combine the materialized values of source and viaFlow + * @return a Source with the viaFlow applied onto defined elements of the flow. The output value + * is contained within an Option which indicates whether the original source's element had viaFlow + * applied. + * @since 1.1.0 + */ + def optionalVia[SOut, FOut, SMat, FMat, Mat](source: Source[Optional[SOut], SMat], + viaFlow: Flow[SOut, FOut, FMat], + combine: function.Function2[SMat, FMat, Mat] + ): Source[Optional[FOut], Mat] = + scaladsl.Source.optionalVia(source.map(_.toScala).asScala, viaFlow.asScala)(combinerToScala(combine)).map(_.toJava).asJava + /** * Helper to create [[Source]] from `Iterable`. * Example usage: diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala index 39e2c4e4d96..7f91d6391c0 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala @@ -13,6 +13,7 @@ package org.apache.pekko.stream.javadsl +import java.util.Optional import java.util.concurrent.CompletionStage import scala.annotation.unchecked.uncheckedVariance @@ -28,6 +29,7 @@ import pekko.stream._ import pekko.util.ConstantFun import pekko.util.FutureConverters._ import pekko.util.JavaDurationConverters._ +import pekko.util.OptionConverters._ import pekko.util.ccompat.JavaConverters._ object SourceWithContext { @@ -38,6 +40,38 @@ object SourceWithContext { def fromPairs[Out, CtxOut, Mat](under: Source[Pair[Out, CtxOut], Mat]): SourceWithContext[Out, CtxOut, Mat] = { new SourceWithContext(scaladsl.SourceWithContext.fromTuples(under.asScala.map(_.toScala))) } + + /** + * Creates a SourceWithContext from an existing base SourceWithContext outputting an optional element + * and applying an additional viaFlow only if the element in the stream is defined. + * + * '''Emits when''' the provided viaFlow is runs with defined elements + * + * '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures + * + * '''Completes when''' upstream completes or the first element is emitted + * + * '''Cancels when''' downstream cancels + * + * @param source The base source that outputs an optional element + * @param viaFlow The flow that gets used if the optional element in is defined. This flow only works + * on the data portion of flow and ignores the context so this flow *must* not re-order, + * drop or emit multiple elements for one incoming element + * @param combine How to combine the materialized values of source and viaFlow + * @return a SourceWithContext with the viaFlow applied onto defined elements of the flow. The output value + * is contained within an Option which indicates whether the original source's element had viaFlow + * applied. + * @since 1.1.0 + */ + @ApiMayChange + def unsafeOptionalVia[SOut, FOut, Ctx, SMat, FMat, Mat](source: SourceWithContext[Optional[SOut], Ctx, SMat], + viaFlow: Flow[SOut, FOut, FMat], + combine: function.Function2[SMat, FMat, Mat] + ): SourceWithContext[Optional[FOut], Ctx, Mat] = + scaladsl.SourceWithContext.unsafeOptionalVia(source.map(_.toScala).asScala, viaFlow.asScala)( + combinerToScala(combine)).map( + _.toJava).asJava + } /** diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 07ba28c9bfe..f2a967e4f92 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -428,6 +428,57 @@ object Flow { */ def fromFunction[A, B](f: A => B): Flow[A, B, NotUsed] = apply[A].map(f) + /** + * Creates a FlowW from an existing base Flow outputting an optional element and + * applying an additional viaFlow only if the element in the stream is defined. + * + * '''Emits when''' the provided viaFlow is runs with defined elements + * + * '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures + * + * '''Completes when''' upstream completes or the first element is emitted + * + * '''Cancels when''' downstream cancels + * + * @param flow The base flow that outputs an optional element + * @param viaFlow The flow that gets used if the optional element in is defined. + * @param combine How to combine the materialized values of flow and viaFlow + * @return a Flow with the viaFlow applied onto defined elements of the flow. The output value + * is contained within an Option which indicates whether the original flow's element had viaFlow + * applied. + * @since 1.1.0 + */ + def optionalVia[FIn, FOut, FViaOut, FMat, FViaMat, Mat](flow: Flow[FIn, Option[FOut], FMat], + viaFlow: Flow[FOut, FViaOut, FViaMat])( + combine: (FMat, FViaMat) => Mat + ): Flow[FIn, Option[FViaOut], Mat] = + Flow.fromGraph(GraphDSL.createGraph(flow, viaFlow)(combine) { implicit b => (s, viaF) => + import GraphDSL.Implicits._ + val broadcast = b.add(Broadcast[Option[FOut]](2)) + val merge = b.add(Merge[Option[FViaOut]](2)) + + val filterAvailable = Flow[Option[FOut]].collect { + case Some(f) => f + } + + val filterUnavailable = Flow[Option[FOut]].filter { opt => + opt.isEmpty + }.map { + _ => Option.empty[FViaOut] + } + + val mapIntoOption = Flow[FViaOut].map { + f => Some(f) + } + + s ~> broadcast.in + + broadcast.out(0) ~> filterAvailable ~> viaF ~> mapIntoOption ~> merge.in(0) + broadcast.out(1) ~> filterUnavailable ~> merge.in(1) + + FlowShape(s.in, merge.out) + }) + /** * A graph with the shape of a flow logically is a flow, this method makes * it so also in type. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala index b2e9bced042..027fb449912 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala @@ -16,6 +16,7 @@ package org.apache.pekko.stream.scaladsl import scala.annotation.unchecked.uncheckedVariance import org.apache.pekko +import pekko.annotation.ApiMayChange import pekko.NotUsed import pekko.japi.Pair import pekko.stream._ @@ -36,6 +37,71 @@ object FlowWithContext { def fromTuples[In, CtxIn, Out, CtxOut, Mat]( flow: Flow[(In, CtxIn), (Out, CtxOut), Mat]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = new FlowWithContext(flow) + + /** + * Creates a FlowWithContext from an existing base FlowWithContext outputting an optional element + * and applying an additional viaFlow only if the element in the stream is defined. + * + * '''Emits when''' the provided viaFlow is runs with defined elements + * + * '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures + * + * '''Completes when''' upstream completes or the first element is emitted + * + * '''Cancels when''' downstream cancels + * + * @param flow The base flow that outputs an optional element + * @param viaFlow The flow that gets used if the optional element in is defined. This flow only works + * on the data portion of flow and ignores the context so this flow *must* not re-order, + * drop or emit multiple elements for one incoming element + * @param combine How to combine the materialized values of flow and viaFlow + * @return a FlowWithContext with the viaFlow applied onto defined elements of the flow. The output value + * is contained within an Option which indicates whether the original flow's element had viaFlow + * applied. + * @since 1.1.0 + */ + @ApiMayChange + def unsafeOptionalVia[FIn, FOut, FViaOut, Ctx, FMat, FViaMat, Mat]( + flow: FlowWithContext[FIn, Ctx, Option[FOut], Ctx, FMat], + viaFlow: Flow[FOut, FViaOut, FViaMat])( + combine: (FMat, FViaMat) => Mat + ): FlowWithContext[FIn, Ctx, Option[FViaOut], Ctx, Mat] = + FlowWithContext.fromTuples(Flow.fromGraph(GraphDSL.createGraph(flow, viaFlow)(combine) { + implicit b => (f, viaF) => + import GraphDSL.Implicits._ + val broadcast = b.add(Broadcast[(Option[FOut], Ctx)](2)) + val merge = b.add(Merge[(Option[FViaOut], Ctx)](2)) + + val unzip = b.add(Unzip[FOut, Ctx]()) + val zipper = b.add(Zip[FViaOut, Ctx]()) + + val filterAvailable = Flow[(Option[FOut], Ctx)].collect { + case (Some(f), ctx) => (f, ctx) + } + + val filterUnavailable = Flow[(Option[FOut], Ctx)].filter { case (opt, _) => + opt.isEmpty + }.map { + case (_, ctx) => (Option.empty[FViaOut], ctx) + } + + val mapIntoOption = Flow[(FViaOut, Ctx)].map { + case (f, ctx) => (Some(f), ctx) + } + + f ~> broadcast.in + + broadcast.out(0) ~> filterAvailable ~> unzip.in + + unzip.out0 ~> viaF ~> zipper.in0 + unzip.out1 ~> zipper.in1 + + zipper.out ~> mapIntoOption ~> merge.in(0) + + broadcast.out(1) ~> filterUnavailable ~> merge.in(1) + + FlowShape(f.in, merge.out) + })) } /** diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index a39a15df59b..66bc362a251 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -308,6 +308,57 @@ object Source { fromIterator(() => iterator).withAttributes(DefaultAttributes.cycledSource) } + /** + * Creates a Source from an existing base Source outputting an optional element + * and applying an additional viaFlow only if the element in the stream is defined. + * + * '''Emits when''' the provided viaFlow is runs with defined elements + * + * '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures + * + * '''Completes when''' upstream completes or the first element is emitted + * + * '''Cancels when''' downstream cancels + * + * @param source The base source that outputs an optional element + * @param viaFlow The flow that gets used if the optional element in is defined. + * @param combine How to combine the materialized values of source and viaFlow + * @return a Source with the viaFlow applied onto defined elements of the flow. The output value + * is contained within an Option which indicates whether the original source's element had viaFlow + * applied. + * @since 1.1.0 + */ + def optionalVia[SOut, FOut, SMat, FMat, Mat](source: Source[Option[SOut], SMat], + viaFlow: Flow[SOut, FOut, FMat])( + combine: (SMat, FMat) => Mat + ): Source[Option[FOut], Mat] = + Source.fromGraph(GraphDSL.createGraph(source, viaFlow)(combine) { implicit b => (s, viaF) => + import GraphDSL.Implicits._ + val broadcast = b.add(Broadcast[Option[SOut]](2)) + val merge = b.add(Merge[Option[FOut]](2)) + + val filterAvailable = Flow[Option[SOut]].collect { + case Some(f) => f + } + + val filterUnavailable = Flow[Option[SOut]].filter { opt => + opt.isEmpty + }.map { + _ => Option.empty[FOut] + } + + val mapIntoOption = Flow[FOut].map { + f => Some(f) + } + + s ~> broadcast.in + + broadcast.out(0) ~> filterAvailable ~> viaF ~> mapIntoOption ~> merge.in(0) + broadcast.out(1) ~> filterUnavailable ~> merge.in(1) + + SourceShape(merge.out) + }) + /** * A graph with the shape of a source logically is a source, this method makes * it so also in type. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala index 55a6ce316c8..958c8facd51 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala @@ -16,6 +16,7 @@ package org.apache.pekko.stream.scaladsl import scala.annotation.unchecked.uncheckedVariance import org.apache.pekko +import pekko.annotation.ApiMayChange import pekko.stream._ object SourceWithContext { @@ -25,6 +26,70 @@ object SourceWithContext { */ def fromTuples[Out, CtxOut, Mat](source: Source[(Out, CtxOut), Mat]): SourceWithContext[Out, CtxOut, Mat] = new SourceWithContext(source) + + /** + * Creates a SourceWithContext from an existing base SourceWithContext outputting an optional element + * and applying an additional viaFlow only if the element in the stream is defined. + * + * '''Emits when''' the provided viaFlow is runs with defined elements + * + * '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures + * + * '''Completes when''' upstream completes or the first element is emitted + * + * '''Cancels when''' downstream cancels + * + * @param source The base source that outputs an optional element + * @param viaFlow The flow that gets used if the optional element in is defined. This flow only works + * on the data portion of flow and ignores the context so this flow *must* not re-order, + * drop or emit multiple elements for one incoming element + * @param combine How to combine the materialized values of source and viaFlow + * @return a SourceWithContext with the viaFlow applied onto defined elements of the flow. The output value + * is contained within an Option which indicates whether the original source's element had viaFlow + * applied. + * @since 1.1.0 + */ + @ApiMayChange + def unsafeOptionalVia[SOut, FOut, Ctx, SMat, FMat, Mat](source: SourceWithContext[Option[SOut], Ctx, SMat], + viaFlow: Flow[SOut, FOut, FMat])( + combine: (SMat, FMat) => Mat + ): SourceWithContext[Option[FOut], Ctx, Mat] = + SourceWithContext.fromTuples(Source.fromGraph(GraphDSL.createGraph(source, viaFlow)(combine) { + implicit b => (s, viaF) => + import GraphDSL.Implicits._ + val broadcast = b.add(Broadcast[(Option[SOut], Ctx)](2)) + val merge = b.add(Merge[(Option[FOut], Ctx)](2)) + + val unzip = b.add(Unzip[SOut, Ctx]()) + val zipper = b.add(Zip[FOut, Ctx]()) + + val filterAvailable = Flow[(Option[SOut], Ctx)].collect { + case (Some(f), ctx) => (f, ctx) + } + + val filterUnavailable = Flow[(Option[SOut], Ctx)].filter { case (opt, _) => + opt.isEmpty + }.map { + case (_, ctx) => (Option.empty[FOut], ctx) + } + + val mapIntoOption = Flow[(FOut, Ctx)].map { + case (f, ctx) => (Some(f), ctx) + } + + s ~> broadcast.in + + broadcast.out(0) ~> filterAvailable ~> unzip.in + + unzip.out0 ~> viaF ~> zipper.in0 + unzip.out1 ~> zipper.in1 + + zipper.out ~> mapIntoOption ~> merge.in(0) + + broadcast.out(1) ~> filterUnavailable ~> merge.in(1) + + SourceShape(merge.out) + })) } /**