Skip to content

Commit

Permalink
Add optionalVia and unsafeOptionalDataVia
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich committed Aug 4, 2024
1 parent 5bf60da commit 517a8aa
Show file tree
Hide file tree
Showing 17 changed files with 502 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/src/main/paradox/release-notes/releases-1.1.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ The Stream API has been updated to add some extra functions.
* add Sink.forall operator ([PR989](https://github.com/apache/pekko/pull/989))
* add Source.iterate operator ([PR1244](https://github.com/apache/pekko/pull/1244))
* added extra retry operators that allow users to provide a predicate to decide whether to retry based on the exception ([PR1269](https://github.com/apache/pekko/pull/1269))
* add optionalVia/unsafeOptionalDataVia operators ([PR1422](https://github.com/apache/pekko/pull/1422))

The Stream Testkit Java DSL has some extra functions.

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# optionalVia

For a stream containing optional elements, transforms each element by applying the given `viaFlow` and passing the value downstream as an optional value.

@ref[Simple operators](../index.md#simple-operators)

## Signature

@apidoc[Source.optionalVia](Source$) { scala="#optionalVia%5BSOut,FOut,SMat,FMat,Mat](source:org.apache.pekko.stream.scaladsl.Source%5BOption%5BSOut],SMat],viaFlow:org.apache.pekko.stream.scaladsl.Flow%5BSOut,FOut,FMat])(combine:(SMat,FMat)=%3EMat):org.apache.pekko.stream.scaladsl.Source%5BOption%5BFOut],Mat]" java="#optionalVia(org.apache.pekko.stream.javadsl.Source,org.apache.pekko.stream.javadsl.Flow,org.apache.pekko.japi.function.Function2)" }
@apidoc[Flow.optionalVia](Flow$) { scala="#optionalVia%5BFIn,FOut,FViaOut,FMat,FViaMat,Mat](flow:org.apache.pekko.stream.scaladsl.Flow%5BFIn,Option%5BFOut],FMat],viaFlow:org.apache.pekko.stream.scaladsl.Flow%5BFOut,FViaOut,FViaMat])(combine:(FMat,FViaMat)=%3EMat):org.apache.pekko.stream.scaladsl.Flow%5BFIn,Option%5BFViaOut],Mat]" java="#optionalVia(org.apache.pekko.stream.javadsl.Flow,org.apache.pekko.stream.javadsl.Flow,org.apache.pekko.japi.function.Function2)" }

## Description

For a stream containing optio nal elements, transforms each element by applying
the given `viaFlow` and passing the value downstream as an optional value.

Scala
: @@snip [OptionalVia.scala](/docs/src/test/scala/docs/stream/operators/sourceorflow/OptionalVia.scala) { #optionalVia }

Java
: @@snip [SourceOrFlow.java](/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #optionalVia }

## Reactive Streams semantics

@@@div { .callout }

**emits** while the provided viaFlow is runs with defined elements

**backpressures** when the viaFlow runs for the defined elements and downstream backpressures

**completes** when the upstream completes

@@@

2 changes: 2 additions & 0 deletions docs/src/main/paradox/stream/operators/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ depending on being backpressured by downstream or not.
|Source/Flow|<a name="map"></a>@ref[map](Source-or-Flow/map.md)|Transform each element in the stream by calling a mapping function with it and passing the returned value downstream.|
|Source/Flow|<a name="mapconcat"></a>@ref[mapConcat](Source-or-Flow/mapConcat.md)|Transform each element into zero or more elements that are individually passed downstream.|
|Source/Flow|<a name="mapwithresource"></a>@ref[mapWithResource](Source-or-Flow/mapWithResource.md)|Map elements with the help of a resource that can be opened, transform each element (in a blocking way) and closed.|
|Source/Flow|<a name="optionalvia"></a>@ref[optionalVia](Source-or-Flow/optionalVia.md)|For a stream containing optional elements, transforms each element by applying the given `viaFlow` and passing the value downstream as an optional value.|
|Source/Flow|<a name="prematerialize"></a>@ref[preMaterialize](Source-or-Flow/preMaterialize.md)|Materializes this Graph, immediately returning (1) its materialized value, and (2) a new pre-materialized Graph.|
|Source/Flow|<a name="reduce"></a>@ref[reduce](Source-or-Flow/reduce.md)|Start with first element and then apply the current and next value to the given function, when upstream complete the current value is emitted downstream.|
|Source/Flow|<a name="scan"></a>@ref[scan](Source-or-Flow/scan.md)|Emit its current value, which starts at `zero`, and then apply the current and next value to the given function, emitting the next current value.|
Expand Down Expand Up @@ -558,6 +559,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [onErrorComplete](Source-or-Flow/onErrorComplete.md)
* [onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)
* [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)
* [optionalVia](Source-or-Flow/optionalVia.md)
* [orElse](Source-or-Flow/orElse.md)
* [Partition](Partition.md)
* [prefixAndTail](Source-or-Flow/prefixAndTail.md)
Expand Down
17 changes: 17 additions & 0 deletions docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,23 @@ void foldAsyncExample() {
// #foldAsync
}

void optionalViaExample() {

// #optionalVia
Flow<String, Integer, NotUsed> flow = Flow.fromFunction(Integer::parseInt);

Source<Optional<String>, NotUsed> source =
Source.from(
Arrays.asList(Optional.of("1"), Optional.empty(), Optional.empty(), Optional.of("4")));

Source.optionalVia(source, flow, Keep.none()).runForeach(System.out::println, system);
// Optional[1]
// Optional.empty
// Optional.empty
// Optional[4]
// #optionalVia
}

void takeExample() {
// #take
Source.from(Arrays.asList(1, 2, 3, 4, 5)).take(3).runForeach(System.out::println, system);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package docs.stream.operators.sourceorflow

object OptionalVia {
def optionalViaExample(): Unit = {
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Flow, Keep, Source }

implicit val system: ActorSystem = ActorSystem()

// #optionalVia
Source.optionalVia(
Source(List(Some("1"), None, None, Some("4"))),
Flow.fromFunction { (string: String) => string.toInt }
)(Keep.none).runForeach(println)
// Some(1)
// None
// None
// Some(4)
// #optionalVia
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,22 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("pekko.actor.debug.r
"should be created from a function easily" in {
Source(0 to 9).via(Flow.fromFunction(_ + 1)).runWith(Sink.seq).futureValue should ===(1 to 10)
}

"Apply a viaFlow with optional elements using optionalVia" in {
val data = List(Some("1"), None, None, Some("4"))

val flow = Flow[Option[String]]

Source(data).via(
Flow.optionalVia(
flow,
Flow.fromFunction { (string: String) => string.toInt }
)(Keep.none)
).runWith(TestSink.probe[Option[Int]])
.request(4)
.expectNext(Some(1), None, None, Some(4))
.expectComplete()
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,25 @@ class FlowWithContextSpec extends StreamSpec {
.expectNext((1, 1), (2, 2), (3, 3), (4, 4))
.expectComplete()
}

"Apply a viaFlow with optional elements using unsafeOptionalVia" in {
val data = List((Some("1"), 1), (None, 2), (None, 3), (Some("4"), 4))

val flow = Flow[(Option[String], Int)]
.asFlowWithContext[Option[String], Int, Int](collapseContext = Tuple2.apply)(extractContext = _._2)
.map(_._1)

SourceWithContext
.fromTuples(Source(data)).via(
FlowWithContext.unsafeOptionalDataVia(
flow,
Flow.fromFunction { (string: String) => string.toInt }
)(Keep.none)
)
.runWith(TestSink.probe[(Option[Int], Int)])
.request(4)
.expectNext((Some(1), 1), (None, 2), (None, 3), (Some(4), 4))
.expectComplete()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,19 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
import Attributes._
val s: Source[Int, NotUsed] = Source.single(42).async.addAttributes(none).named("")
}

"Apply a viaFlow with optional elements using optionalVia" in {
val data = List(Some("1"), None, None, Some("4"))

Source.optionalVia(
Source(data),
Flow.fromFunction { (string: String) => string.toInt }
)(Keep.none)
.runWith(TestSink.probe[Option[Int]])
.request(4)
.expectNext(Some(1), None, None, Some(4))
.expectComplete()
}
}

"A Source.run" must {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,5 +157,20 @@ class SourceWithContextSpec extends StreamSpec {
.expectNext((1, 1), (2, 2), (3, 3), (4, 4))
.expectComplete()
}

"Apply a viaFlow with optional elements using unsafeOptionalVia" in {
val data = List((Some("1"), 1), (None, 2), (None, 3), (Some("4"), 4))

val source = SourceWithContext.fromTuples(Source(data))

SourceWithContext.unsafeOptionalDataVia(
source,
Flow.fromFunction { (string: String) => string.toInt }
)(Keep.none)
.runWith(TestSink.probe[(Option[Int], Int)])
.request(4)
.expectNext((Some(1), 1), (None, 2), (None, 3), (Some(4), 4))
.expectComplete()
}
}
}
26 changes: 26 additions & 0 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,32 @@ object Flow {
def fromFunction[I, O](f: function.Function[I, O]): javadsl.Flow[I, O, NotUsed] =
Flow.create[I]().map(f)

/**
* Creates a Flow from an existing base Flow outputting an optional element and
* applying an additional viaFlow only if the element in the stream is defined.
*
* '''Emits when''' the provided viaFlow is runs with defined elements
*
* '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @param flow The base flow that outputs an optional element
* @param viaFlow The flow that gets used if the optional element in is defined.
* @param combine How to combine the materialized values of flow and viaFlow
* @return a Flow with the viaFlow applied onto defined elements of the flow. The output value
* is contained within an Optional which indicates whether the original flow's element had viaFlow
* applied.
* @since 1.1.0
*/
def optionalVia[FIn, FOut, FViaOut, FMat, FViaMat, Mat](flow: Flow[FIn, Optional[FOut], FMat],
viaFlow: Flow[FOut, FViaOut, FViaMat],
combine: function.Function2[FMat, FViaMat, Mat]
): Flow[FIn, Optional[FViaOut], Mat] =
scaladsl.Flow.optionalVia(flow.map(_.toScala).asScala, viaFlow.asScala)(combinerToScala(combine)).map(_.toJava).asJava

/** Create a `Flow` which can process elements of type `T`. */
def of[T](@unused clazz: Class[T]): javadsl.Flow[T, T, NotUsed] = create[T]()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package org.apache.pekko.stream.javadsl

import java.util.Optional
import java.util.concurrent.CompletionStage

import scala.annotation.unchecked.uncheckedVariance
Expand All @@ -25,6 +26,7 @@ import pekko.stream._
import pekko.util.ConstantFun
import pekko.util.FutureConverters._
import pekko.util.JavaDurationConverters._
import pekko.util.OptionConverters._
import pekko.util.ccompat.JavaConverters._

object FlowWithContext {
Expand All @@ -39,6 +41,38 @@ object FlowWithContext {
under: Flow[Pair[In, CtxIn], Pair[Out, CtxOut], Mat]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
new FlowWithContext(under)

/**
* Creates a FlowWithContext from an existing base FlowWithContext outputting an optional element
* and applying an additional viaFlow only if the element in the stream is defined.
*
* '''Emits when''' the provided viaFlow is runs with defined elements
*
* '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @param flow The base flow that outputs an optional element
* @param viaFlow The flow that gets used if the optional element in is defined. This flow only works
* on the data portion of flow and ignores the context so this flow *must* not re-order,
* drop or emit multiple elements for one incoming element
* @param combine How to combine the materialized values of flow and viaFlow
* @return a FlowWithContext with the viaFlow applied onto defined elements of the flow. The output value
* is contained within an Optional which indicates whether the original flow's element had viaFlow
* applied.
* @since 1.1.0
*/
@ApiMayChange
def unsafeOptionalDataVia[FIn, FOut, FViaOut, Ctx, FMat, FViaMat, Mat](
flow: FlowWithContext[FIn, Ctx, Optional[FOut], Ctx, FMat],
viaFlow: Flow[FOut, FViaOut, FViaMat],
combine: function.Function2[FMat, FViaMat, Mat]
): FlowWithContext[FIn, Ctx, Optional[FViaOut], Ctx, Mat] =
scaladsl.FlowWithContext.unsafeOptionalDataVia(flow.map(_.toScala).asScala, viaFlow.asScala)(
combinerToScala(combine)).map(
_.toJava).asJava

}

/**
Expand Down
26 changes: 26 additions & 0 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,32 @@ object Source {
def cycle[O](f: function.Creator[java.util.Iterator[O]]): javadsl.Source[O, NotUsed] =
new Source(scaladsl.Source.cycle(() => f.create().asScala))

/**
* Creates a Source from an existing base Source outputting an optional element
* and applying an additional viaFlow only if the element in the stream is defined.
*
* '''Emits when''' the provided viaFlow is runs with defined elements
*
* '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @param source The base source that outputs an optional element
* @param viaFlow The flow that gets used if the optional element in is defined.
* @param combine How to combine the materialized values of source and viaFlow
* @return a Source with the viaFlow applied onto defined elements of the flow. The output value
* is contained within an Optional which indicates whether the original source's element had viaFlow
* applied.
* @since 1.1.0
*/
def optionalVia[SOut, FOut, SMat, FMat, Mat](source: Source[Optional[SOut], SMat],
viaFlow: Flow[SOut, FOut, FMat],
combine: function.Function2[SMat, FMat, Mat]
): Source[Optional[FOut], Mat] =
scaladsl.Source.optionalVia(source.map(_.toScala).asScala, viaFlow.asScala)(combinerToScala(combine)).map(_.toJava).asJava

/**
* Helper to create [[Source]] from `Iterable`.
* Example usage:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package org.apache.pekko.stream.javadsl

import java.util.Optional
import java.util.concurrent.CompletionStage

import scala.annotation.unchecked.uncheckedVariance
Expand All @@ -28,6 +29,7 @@ import pekko.stream._
import pekko.util.ConstantFun
import pekko.util.FutureConverters._
import pekko.util.JavaDurationConverters._
import pekko.util.OptionConverters._
import pekko.util.ccompat.JavaConverters._

object SourceWithContext {
Expand All @@ -38,6 +40,38 @@ object SourceWithContext {
def fromPairs[Out, CtxOut, Mat](under: Source[Pair[Out, CtxOut], Mat]): SourceWithContext[Out, CtxOut, Mat] = {
new SourceWithContext(scaladsl.SourceWithContext.fromTuples(under.asScala.map(_.toScala)))
}

/**
* Creates a SourceWithContext from an existing base SourceWithContext outputting an optional element
* and applying an additional viaFlow only if the element in the stream is defined.
*
* '''Emits when''' the provided viaFlow is runs with defined elements
*
* '''Backpressures when''' the viaFlow runs for the defined elements and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @param source The base source that outputs an optional element
* @param viaFlow The flow that gets used if the optional element in is defined. This flow only works
* on the data portion of flow and ignores the context so this flow *must* not re-order,
* drop or emit multiple elements for one incoming element
* @param combine How to combine the materialized values of source and viaFlow
* @return a SourceWithContext with the viaFlow applied onto defined elements of the flow. The output value
* is contained within an Optional which indicates whether the original source's element had viaFlow
* applied.
* @since 1.1.0
*/
@ApiMayChange
def unsafeOptionalDataVia[SOut, FOut, Ctx, SMat, FMat, Mat](source: SourceWithContext[Optional[SOut], Ctx, SMat],
viaFlow: Flow[SOut, FOut, FMat],
combine: function.Function2[SMat, FMat, Mat]
): SourceWithContext[Optional[FOut], Ctx, Mat] =
scaladsl.SourceWithContext.unsafeOptionalDataVia(source.map(_.toScala).asScala, viaFlow.asScala)(
combinerToScala(combine)).map(
_.toJava).asJava

}

/**
Expand Down
Loading

0 comments on commit 517a8aa

Please sign in to comment.