Skip to content

Commit

Permalink
Add optionalVia and unsafeOptionalVia
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich committed Aug 2, 2024
1 parent 5bf60da commit a260bbe
Show file tree
Hide file tree
Showing 16 changed files with 476 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/src/main/paradox/release-notes/releases-1.1.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ The Stream API has been updated to add some extra functions.
* add Sink.forall operator ([PR989](https://github.com/apache/pekko/pull/989))
* add Source.iterate operator ([PR1244](https://github.com/apache/pekko/pull/1244))
* added extra retry operators that allow users to provide a predicate to decide whether to retry based on the exception ([PR1269](https://github.com/apache/pekko/pull/1269))
* add optionalVia/unsafeOptionalDataVia operators ([PR1422](https://github.com/apache/pekko/pull/1422))

The Stream Testkit Java DSL has some extra functions.

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# optionalVia

For a stream containing optional elements, transforms each element by applying
the given `viaFlow` and passing the value downstream as an optional value.

## Description

For a stream containing optional elements, transforms each element by applying
the given `viaFlow` and passing the value downstream as an optional value.

Scala
: @@snip [Take.scala](/docs/src/test/scala/docs/stream/operators/sourceorflow/OptionalVia.scala) { #optionalVia }

Java
: @@snip [SourceOrFlow.java](/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #optionalVia }
21 changes: 21 additions & 0 deletions docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,27 @@ void foldAsyncExample() {
// #foldAsync
}

void optionalViaExample() {

// #optionalVia
Flow<String, String, NotUsed> flow =
Flow.create();

Source<Optional<String>, NotUsed> source =
Source.from(Arrays.asList(Optional.of("1"), Optional.empty(), Optional.empty(), Optional.of("4")));

Source.optionalVia(
source,
flow.map(Integer::parseInt),
Keep.none()
).runForeach(System.out::println, system);
// Optional[1]
// Optional.empty
// Optional.empty
// Optional[4]
// #optionalVia
}

void takeExample() {
// #take
Source.from(Arrays.asList(1, 2, 3, 4, 5)).take(3).runForeach(System.out::println, system);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package docs.stream.operators.sourceorflow

object OptionalVia {
def optionalViaExample(): Unit = {
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Flow, Keep, Source }

implicit val system: ActorSystem = ActorSystem()

// #optionalVia
Source.optionalVia(
Source(List(Some("1"), None, None, Some("4"))),
Flow[String].map(_.toInt)
)(Keep.none).runForeach(println)
// Some(1)
// None
// None
// Some(4)
// #optionalVia
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,22 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("pekko.actor.debug.r
"should be created from a function easily" in {
Source(0 to 9).via(Flow.fromFunction(_ + 1)).runWith(Sink.seq).futureValue should ===(1 to 10)
}

"Apply a viaFlow with optional elements using optionalVia" in {
val data = List(Some("1"), None, None, Some("4"))

val flow = Flow[Option[String]]

Source(data).via(
Flow.optionalVia(
flow,
Flow[String].map(_.toInt)
)(Keep.none)
).runWith(TestSink.probe[Option[Int]])
.request(4)
.expectNext(Some(1), None, None, Some(4))
.expectComplete()
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,25 @@ class FlowWithContextSpec extends StreamSpec {
.expectNext((1, 1), (2, 2), (3, 3), (4, 4))
.expectComplete()
}

"Apply a viaFlow with optional elements using unsafeOptionalVia" in {
val data = List((Some("1"), 1), (None, 2), (None, 3), (Some("4"), 4))

val flow = Flow[(Option[String], Int)]
.asFlowWithContext[Option[String], Int, Int](collapseContext = Tuple2.apply)(extractContext = _._2)
.map(_._1)

SourceWithContext
.fromTuples(Source(data)).via(
FlowWithContext.unsafeOptionalDataVia(
flow,
Flow[String].map(_.toInt)
)(Keep.none)
)
.runWith(TestSink.probe[(Option[Int], Int)])
.request(4)
.expectNext((Some(1), 1), (None, 2), (None, 3), (Some(4), 4))
.expectComplete()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,19 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
import Attributes._
val s: Source[Int, NotUsed] = Source.single(42).async.addAttributes(none).named("")
}

"Apply a viaFlow with optional elements using optionalVia" in {
val data = List(Some("1"), None, None, Some("4"))

Source.optionalVia(
Source(data),
Flow[String].map(_.toInt)
)(Keep.none)
.runWith(TestSink.probe[Option[Int]])
.request(4)
.expectNext(Some(1), None, None, Some(4))
.expectComplete()
}
}

"A Source.run" must {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,5 +157,20 @@ class SourceWithContextSpec extends StreamSpec {
.expectNext((1, 1), (2, 2), (3, 3), (4, 4))
.expectComplete()
}

"Apply a viaFlow with optional elements using unsafeOptionalVia" in {
val data = List((Some("1"), 1), (None, 2), (None, 3), (Some("4"), 4))

val source = SourceWithContext.fromTuples(Source(data))

SourceWithContext.unsafeOptionalDataVia(
source,
Flow[String].map(_.toInt)
)(Keep.none)
.runWith(TestSink.probe[(Option[Int], Int)])
.request(4)
.expectNext((Some(1), 1), (None, 2), (None, 3), (Some(4), 4))
.expectComplete()
}
}
}
26 changes: 26 additions & 0 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,32 @@ object Flow {
def fromFunction[I, O](f: function.Function[I, O]): javadsl.Flow[I, O, NotUsed] =
Flow.create[I]().map(f)

/**
* Creates a Flow from an existing base Flow outputting an optional element and
* applying an additional viaFlow only if the element in the stream is defined.
*
* '''Emits when''' the provided viaFlow is runs with defined elements
*
* '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures
*
* '''Completes when''' upstream completes or the first element is emitted
*
* '''Cancels when''' downstream cancels
*
* @param flow The base flow that outputs an optional element
* @param viaFlow The flow that gets used if the optional element in is defined.
* @param combine How to combine the materialized values of flow and viaFlow
* @return a Flow with the viaFlow applied onto defined elements of the flow. The output value
* is contained within an Option which indicates whether the original flow's element had viaFlow
* applied.
* @since 1.1.0
*/
def optionalVia[FIn, FOut, FViaOut, FMat, FViaMat, Mat](flow: Flow[FIn, Optional[FOut], FMat],
viaFlow: Flow[FOut, FViaOut, FViaMat],
combine: function.Function2[FMat, FViaMat, Mat]
): Flow[FIn, Optional[FViaOut], Mat] =
scaladsl.Flow.optionalVia(flow.map(_.toScala).asScala, viaFlow.asScala)(combinerToScala(combine)).map(_.toJava).asJava

/** Create a `Flow` which can process elements of type `T`. */
def of[T](@unused clazz: Class[T]): javadsl.Flow[T, T, NotUsed] = create[T]()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package org.apache.pekko.stream.javadsl

import java.util.Optional
import java.util.concurrent.CompletionStage

import scala.annotation.unchecked.uncheckedVariance
Expand All @@ -25,6 +26,7 @@ import pekko.stream._
import pekko.util.ConstantFun
import pekko.util.FutureConverters._
import pekko.util.JavaDurationConverters._
import pekko.util.OptionConverters._
import pekko.util.ccompat.JavaConverters._

object FlowWithContext {
Expand All @@ -39,6 +41,38 @@ object FlowWithContext {
under: Flow[Pair[In, CtxIn], Pair[Out, CtxOut], Mat]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
new FlowWithContext(under)

/**
* Creates a FlowWithContext from an existing base FlowWithContext outputting an optional element
* and applying an additional viaFlow only if the element in the stream is defined.
*
* '''Emits when''' the provided viaFlow is runs with defined elements
*
* '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures
*
* '''Completes when''' upstream completes or the first element is emitted
*
* '''Cancels when''' downstream cancels
*
* @param flow The base flow that outputs an optional element
* @param viaFlow The flow that gets used if the optional element in is defined. This flow only works
* on the data portion of flow and ignores the context so this flow *must* not re-order,
* drop or emit multiple elements for one incoming element
* @param combine How to combine the materialized values of flow and viaFlow
* @return a FlowWithContext with the viaFlow applied onto defined elements of the flow. The output value
* is contained within an Option which indicates whether the original flow's element had viaFlow
* applied.
* @since 1.1.0
*/
@ApiMayChange
def unsafeOptionalDataVia[FIn, FOut, FViaOut, Ctx, FMat, FViaMat, Mat](
flow: FlowWithContext[FIn, Ctx, Optional[FOut], Ctx, FMat],
viaFlow: Flow[FOut, FViaOut, FViaMat],
combine: function.Function2[FMat, FViaMat, Mat]
): FlowWithContext[FIn, Ctx, Optional[FViaOut], Ctx, Mat] =
scaladsl.FlowWithContext.unsafeOptionalDataVia(flow.map(_.toScala).asScala, viaFlow.asScala)(
combinerToScala(combine)).map(
_.toJava).asJava

}

/**
Expand Down
26 changes: 26 additions & 0 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,32 @@ object Source {
def cycle[O](f: function.Creator[java.util.Iterator[O]]): javadsl.Source[O, NotUsed] =
new Source(scaladsl.Source.cycle(() => f.create().asScala))

/**
* Creates a Source from an existing base Source outputting an optional element
* and applying an additional viaFlow only if the element in the stream is defined.
*
* '''Emits when''' the provided viaFlow is runs with defined elements
*
* '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures
*
* '''Completes when''' upstream completes or the first element is emitted
*
* '''Cancels when''' downstream cancels
*
* @param source The base source that outputs an optional element
* @param viaFlow The flow that gets used if the optional element in is defined.
* @param combine How to combine the materialized values of source and viaFlow
* @return a Source with the viaFlow applied onto defined elements of the flow. The output value
* is contained within an Option which indicates whether the original source's element had viaFlow
* applied.
* @since 1.1.0
*/
def optionalVia[SOut, FOut, SMat, FMat, Mat](source: Source[Optional[SOut], SMat],
viaFlow: Flow[SOut, FOut, FMat],
combine: function.Function2[SMat, FMat, Mat]
): Source[Optional[FOut], Mat] =
scaladsl.Source.optionalVia(source.map(_.toScala).asScala, viaFlow.asScala)(combinerToScala(combine)).map(_.toJava).asJava

/**
* Helper to create [[Source]] from `Iterable`.
* Example usage:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package org.apache.pekko.stream.javadsl

import java.util.Optional
import java.util.concurrent.CompletionStage

import scala.annotation.unchecked.uncheckedVariance
Expand All @@ -28,6 +29,7 @@ import pekko.stream._
import pekko.util.ConstantFun
import pekko.util.FutureConverters._
import pekko.util.JavaDurationConverters._
import pekko.util.OptionConverters._
import pekko.util.ccompat.JavaConverters._

object SourceWithContext {
Expand All @@ -38,6 +40,38 @@ object SourceWithContext {
def fromPairs[Out, CtxOut, Mat](under: Source[Pair[Out, CtxOut], Mat]): SourceWithContext[Out, CtxOut, Mat] = {
new SourceWithContext(scaladsl.SourceWithContext.fromTuples(under.asScala.map(_.toScala)))
}

/**
* Creates a SourceWithContext from an existing base SourceWithContext outputting an optional element
* and applying an additional viaFlow only if the element in the stream is defined.
*
* '''Emits when''' the provided viaFlow is runs with defined elements
*
* '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures
*
* '''Completes when''' upstream completes or the first element is emitted
*
* '''Cancels when''' downstream cancels
*
* @param source The base source that outputs an optional element
* @param viaFlow The flow that gets used if the optional element in is defined. This flow only works
* on the data portion of flow and ignores the context so this flow *must* not re-order,
* drop or emit multiple elements for one incoming element
* @param combine How to combine the materialized values of source and viaFlow
* @return a SourceWithContext with the viaFlow applied onto defined elements of the flow. The output value
* is contained within an Option which indicates whether the original source's element had viaFlow
* applied.
* @since 1.1.0
*/
@ApiMayChange
def unsafeOptionalDataVia[SOut, FOut, Ctx, SMat, FMat, Mat](source: SourceWithContext[Optional[SOut], Ctx, SMat],
viaFlow: Flow[SOut, FOut, FMat],
combine: function.Function2[SMat, FMat, Mat]
): SourceWithContext[Optional[FOut], Ctx, Mat] =
scaladsl.SourceWithContext.unsafeOptionalDataVia(source.map(_.toScala).asScala, viaFlow.asScala)(
combinerToScala(combine)).map(
_.toJava).asJava

}

/**
Expand Down
51 changes: 51 additions & 0 deletions stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,57 @@ object Flow {
*/
def fromFunction[A, B](f: A => B): Flow[A, B, NotUsed] = apply[A].map(f)

/**
* Creates a FlowW from an existing base Flow outputting an optional element and
* applying an additional viaFlow only if the element in the stream is defined.
*
* '''Emits when''' the provided viaFlow is runs with defined elements
*
* '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures
*
* '''Completes when''' upstream completes or the first element is emitted
*
* '''Cancels when''' downstream cancels
*
* @param flow The base flow that outputs an optional element
* @param viaFlow The flow that gets used if the optional element in is defined.
* @param combine How to combine the materialized values of flow and viaFlow
* @return a Flow with the viaFlow applied onto defined elements of the flow. The output value
* is contained within an Option which indicates whether the original flow's element had viaFlow
* applied.
* @since 1.1.0
*/
def optionalVia[FIn, FOut, FViaOut, FMat, FViaMat, Mat](flow: Flow[FIn, Option[FOut], FMat],
viaFlow: Flow[FOut, FViaOut, FViaMat])(
combine: (FMat, FViaMat) => Mat
): Flow[FIn, Option[FViaOut], Mat] =
Flow.fromGraph(GraphDSL.createGraph(flow, viaFlow)(combine) { implicit b => (s, viaF) =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[Option[FOut]](2))
val merge = b.add(Merge[Option[FViaOut]](2))

val filterAvailable = Flow[Option[FOut]].collect {
case Some(f) => f
}

val filterUnavailable = Flow[Option[FOut]].filter { opt =>
opt.isEmpty
}.map {
_ => Option.empty[FViaOut]
}

val mapIntoOption = Flow[FViaOut].map {
f => Some(f)
}

s ~> broadcast.in

broadcast.out(0) ~> filterAvailable ~> viaF ~> mapIntoOption ~> merge.in(0)
broadcast.out(1) ~> filterUnavailable ~> merge.in(1)

FlowShape(s.in, merge.out)
})

/**
* A graph with the shape of a flow logically is a flow, this method makes
* it so also in type.
Expand Down
Loading

0 comments on commit a260bbe

Please sign in to comment.