Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix uncurried Pekko Stream ops in javadsl #1406

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1254,7 +1254,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*/
def groupedWeighted(minWeight: Long)(
def groupedWeighted(minWeight: Long,
costFn: java.util.function.Function[Out, java.lang.Long]): javadsl.Flow[In, java.util.List[Out], Mat] =
new Flow(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) // TODO optimize to one step

Expand Down Expand Up @@ -1311,7 +1311,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
*/
def limitWeighted(n: Long)(costFn: function.Function[Out, java.lang.Long]): javadsl.Flow[In, Out, Mat] = {
def limitWeighted(n: Long, costFn: function.Function[Out, java.lang.Long]): javadsl.Flow[In, Out, Mat] = {
new Flow(delegate.limitWeighted(n)(costFn.apply))
}

Expand Down Expand Up @@ -1355,7 +1355,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*/
def scan[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] =
def scan[T](zero: T, f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.scan(zero)(f.apply))

/**
Expand Down Expand Up @@ -1386,7 +1386,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* See also [[#scan]]
*/
def scanAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Flow[In, T, Mat] =
def scanAsync[T](zero: T, f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.scanAsync(zero) { (out, in) =>
f(out, in).asScala
})
Expand All @@ -1412,7 +1412,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*/
def fold[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] =
def fold[T](zero: T, f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.fold(zero)(f.apply))

/**
Expand Down Expand Up @@ -1464,7 +1464,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*/
def foldAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Flow[In, T, Mat] =
def foldAsync[T](zero: T, f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.foldAsync(zero) { (out, in) =>
f(out, in).asScala
})
Expand Down Expand Up @@ -2636,7 +2636,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
@deprecated(
"Use .withAttributes(ActorAttributes.supervisionStrategy(equivalentDecider)) rather than a SubstreamCancelStrategy",
since = "1.1.0")
def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy)(p: function.Predicate[Out]): SubFlow[In, Out, Mat] =
def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy, p: function.Predicate[Out]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.splitWhen(substreamCancelStrategy)(p.test))

/**
Expand Down Expand Up @@ -2697,7 +2697,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
@deprecated(
"Use .withAttributes(ActorAttributes.supervisionStrategy(equivalentDecider)) rather than a SubstreamCancelStrategy",
since = "1.1.0")
def splitAfter(substreamCancelStrategy: SubstreamCancelStrategy)(p: function.Predicate[Out]): SubFlow[In, Out, Mat] =
def splitAfter(substreamCancelStrategy: SubstreamCancelStrategy, p: function.Predicate[Out]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.splitAfter(substreamCancelStrategy)(p.test))

/**
Expand Down Expand Up @@ -3497,7 +3497,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*/
def zipAllMat[U, Mat2, Mat3, A >: Out](that: Graph[SourceShape[U], Mat2], thisElem: A, thatElem: U)(
def zipAllMat[U, Mat2, Mat3, A >: Out](that: Graph[SourceShape[U], Mat2], thisElem: A, thatElem: U,
matF: (Mat, Mat2) => Mat3): Flow[In, Pair[A, U], Mat3] =
new Flow(delegate.zipAllMat(that, thisElem, thatElem)(matF).map { case (a, u) => Pair.create(a, u) })

Expand Down Expand Up @@ -4169,7 +4169,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* from downstream. It fails with the same error when received error message from
* downstream.
*/
def watchTermination[M]()(matF: function.Function2[Mat, CompletionStage[Done], M]): javadsl.Flow[In, Out, M] =
def watchTermination[M](matF: function.Function2[Mat, CompletionStage[Done], M]): javadsl.Flow[In, Out, M] =
new Flow(delegate.watchTermination()((left, right) => matF(left, right.asJava)))

/**
Expand All @@ -4180,7 +4180,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* The `combine` function is used to combine the `FlowMonitor` with this flow's materialized value.
*/
@deprecated("Use monitor() or monitorMat(combine) instead", "Akka 2.5.17")
def monitor[M]()(combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Flow[In, Out, M] =
def monitor[M](combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Flow[In, Out, M] =
new Flow(delegate.monitorMat(combinerToScala(combine)))

/**
Expand Down Expand Up @@ -4504,7 +4504,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
*/
@ApiMayChange
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg])(
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.Flow[In, Emit, Mat] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,7 @@ object GraphDSL extends GraphCreate {
new GenericGraph(s, gbuilder.delegate.result(s))
}

final class Builder[+Mat]()(private[stream] implicit val delegate: scaladsl.GraphDSL.Builder[Mat]) { self =>
final class Builder[+Mat](private[stream] implicit val delegate: scaladsl.GraphDSL.Builder[Mat]) { self =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do implicit args work when calling from Java? Is it possible that this constructor is not meant to be used directly by Java users, that is for use by internal Pekko code?

Copy link
Contributor Author

@mdedetrich mdedetrich Jul 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

implicit args also get flattened to flat parameter lists since under the hood they are also implemented via currying (with only Scala having the ability to pass those arguments implicitly).

Thats what I figured out, any type of currying (whether its done directly or via implicits) is just translated to a standard flattened parameter list for JVM bytecode.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But do you think that this constructor is to meant to be called by external Java users? It looks like an internal Pekko use only constructor.

Copy link
Contributor Author

@mdedetrich mdedetrich Jul 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the surface you seem to be right although it is referred to in multiple places in the docs.

In any case even if the constructor is only meant for internal Pekko usage, this change is harmless.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is fine, but it does break some binary compatibility, I'm using Javadsl only in Java, so I think that should also apply to others.

Copy link
Contributor Author

@mdedetrich mdedetrich Jul 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it breaks binary compatibility MiMa would detect it.

In Java its just called as a one arg method

import pekko.stream.scaladsl.GraphDSL.Implicits._

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ object Sink {
* normal end of the stream, or completed with `Failure` if there is a failure signaled in
* the stream.
*/
def foreachAsync[T](parallelism: Int)(
def foreachAsync[T](parallelism: Int,
f: function.Function[T, CompletionStage[Void]]): Sink[T, CompletionStage[Done]] =
new Sink(
scaladsl.Sink
Expand All @@ -225,7 +225,7 @@ object Sink {
@deprecated(
"Use `foreachAsync` instead, it allows you to choose how to run the procedure, by calling some other API returning a CompletionStage or using CompletableFuture.supplyAsync.",
since = "Akka 2.5.17")
def foreachParallel[T](parallel: Int)(f: function.Procedure[T])(
def foreachParallel[T](parallel: Int, f: function.Procedure[T],
ec: ExecutionContext): Sink[T, CompletionStage[Done]] =
new Sink(scaladsl.Sink.foreachParallel(parallel)(f.apply)(ec).toCompletionStage())

Expand Down
25 changes: 12 additions & 13 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1987,7 +1987,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*/
def zipAllMat[U, Mat2, Mat3, A >: Out](that: Graph[SourceShape[U], Mat2], thisElem: A, thatElem: U)(
def zipAllMat[U, Mat2, Mat3, A >: Out](that: Graph[SourceShape[U], Mat2], thisElem: A, thatElem: U,
matF: (Mat, Mat2) => Mat3): Source[Pair[A, U], Mat3] =
new Source(delegate.zipAllMat(that, thisElem, thatElem)(matF).map { case (a, u) => Pair.create(a, u) })

Expand Down Expand Up @@ -3002,7 +3002,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*/
def groupedWeighted(minWeight: Long)(costFn: java.util.function.Function[Out, java.lang.Long])
def groupedWeighted(minWeight: Long, costFn: java.util.function.Function[Out, java.lang.Long])
: javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] =
new Source(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava))

Expand Down Expand Up @@ -3055,9 +3055,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
*/
def limitWeighted(n: Long)(costFn: function.Function[Out, java.lang.Long]): javadsl.Source[Out, Mat] = {
def limitWeighted(n: Long, costFn: function.Function[Out, java.lang.Long]): javadsl.Source[Out, Mat] =
new Source(delegate.limitWeighted(n)(costFn.apply))
}

/**
* Apply a sliding window over the stream and return the windows as groups of elements, with the last group
Expand Down Expand Up @@ -3099,7 +3098,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*/
def scan[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] =
def scan[T](zero: T, f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] =
new Source(delegate.scan(zero)(f.apply))

/**
Expand Down Expand Up @@ -3130,7 +3129,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* See also [[FlowOps#scan]]
*/
def scanAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Source[T, Mat] =
def scanAsync[T](zero: T, f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Source[T, Mat] =
new Source(delegate.scanAsync(zero) { (out, in) =>
f(out, in).asScala
})
Expand All @@ -3156,7 +3155,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*/
def fold[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] =
def fold[T](zero: T, f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] =
new Source(delegate.fold(zero)(f.apply))

/**
Expand Down Expand Up @@ -3206,7 +3205,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*/
def foldAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Source[T, Mat] =
def foldAsync[T](zero: T, f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Source[T, Mat] =
new Source(delegate.foldAsync(zero) { (out, in) =>
f(out, in).asScala
})
Expand Down Expand Up @@ -4142,7 +4141,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
@deprecated(
"Use .withAttributes(ActorAttributes.supervisionStrategy(equivalentDecider)) rather than a SubstreamCancelStrategy",
since = "1.1.0")
def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy)(p: function.Predicate[Out]): SubSource[Out, Mat] =
def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy, p: function.Predicate[Out]): SubSource[Out, Mat] =
new SubSource(delegate.splitWhen(substreamCancelStrategy)(p.test))

/**
Expand Down Expand Up @@ -4202,7 +4201,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
@deprecated(
"Use .withAttributes(ActorAttributes.supervisionStrategy(equivalentDecider)) rather than a SubstreamCancelStrategy",
since = "1.1.0")
def splitAfter(substreamCancelStrategy: SubstreamCancelStrategy)(p: function.Predicate[Out]): SubSource[Out, Mat] =
def splitAfter(substreamCancelStrategy: SubstreamCancelStrategy, p: function.Predicate[Out]): SubSource[Out, Mat] =
new SubSource(delegate.splitAfter(substreamCancelStrategy)(p.test))

/**
Expand Down Expand Up @@ -4738,7 +4737,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* from downstream. It fails with the same error when received error message from
* downstream.
*/
def watchTermination[M]()(matF: function.Function2[Mat, CompletionStage[Done], M]): javadsl.Source[Out, M] =
def watchTermination[M](matF: function.Function2[Mat, CompletionStage[Done], M]): javadsl.Source[Out, M] =
new Source(delegate.watchTermination()((left, right) => matF(left, right.asJava)))

/**
Expand All @@ -4748,7 +4747,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* The `combine` function is used to combine the `FlowMonitor` with this flow's materialized value.
*/
@deprecated("Use monitor() or monitorMat(combine) instead", "Akka 2.5.17")
def monitor[M]()(combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Source[Out, M] =
def monitor[M](combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Source[Out, M] =
new Source(delegate.monitorMat(combinerToScala(combine)))

/**
Expand Down Expand Up @@ -5052,7 +5051,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
*/
@ApiMayChange
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg])(
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.Source[Emit, Mat] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ object StreamConverters {
* Note that a flow can be materialized multiple times, so the function producing the ``Collector`` must be able
* to handle multiple invocations.
*/
def javaCollectorParallelUnordered[T, R](parallelism: Int)(
def javaCollectorParallelUnordered[T, R](parallelism: Int,
collector: function.Creator[Collector[T, _ <: Any, R]]): Sink[T, CompletionStage[R]] =
new Sink(
scaladsl.StreamConverters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ class SubFlow[In, Out, Mat](
*
* '''Cancels when''' downstream cancels
*/
def groupedWeighted(minWeight: Long)(
def groupedWeighted(minWeight: Long,
costFn: function.Function[Out, java.lang.Long]): SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] =
new SubFlow(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) // TODO optimize to one step

Expand Down Expand Up @@ -691,7 +691,7 @@ class SubFlow[In, Out, Mat](
*
* See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
*/
def limitWeighted(n: Long)(costFn: function.Function[Out, java.lang.Long]): javadsl.SubFlow[In, Out, Mat] = {
def limitWeighted(n: Long, costFn: function.Function[Out, java.lang.Long]): javadsl.SubFlow[In, Out, Mat] = {
new SubFlow(delegate.limitWeighted(n)(costFn.apply))
}

Expand Down Expand Up @@ -735,7 +735,7 @@ class SubFlow[In, Out, Mat](
*
* '''Cancels when''' downstream cancels
*/
def scan[T](zero: T)(f: function.Function2[T, Out, T]): SubFlow[In, T, Mat] =
def scan[T](zero: T, f: function.Function2[T, Out, T]): SubFlow[In, T, Mat] =
new SubFlow(delegate.scan(zero)(f.apply))

/**
Expand Down Expand Up @@ -766,7 +766,7 @@ class SubFlow[In, Out, Mat](
*
* See also [[#scan]]
*/
def scanAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): SubFlow[In, T, Mat] =
def scanAsync[T](zero: T, f: function.Function2[T, Out, CompletionStage[T]]): SubFlow[In, T, Mat] =
new SubFlow(delegate.scanAsync(zero) { (out, in) =>
f(out, in).asScala
})
Expand All @@ -792,7 +792,7 @@ class SubFlow[In, Out, Mat](
*
* '''Cancels when''' downstream cancels
*/
def fold[T](zero: T)(f: function.Function2[T, Out, T]): SubFlow[In, T, Mat] =
def fold[T](zero: T, f: function.Function2[T, Out, T]): SubFlow[In, T, Mat] =
new SubFlow(delegate.fold(zero)(f.apply))

/**
Expand Down Expand Up @@ -844,7 +844,7 @@ class SubFlow[In, Out, Mat](
*
* '''Cancels when''' downstream cancels
*/
def foldAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): SubFlow[In, T, Mat] =
def foldAsync[T](zero: T, f: function.Function2[T, Out, CompletionStage[T]]): SubFlow[In, T, Mat] =
new SubFlow(delegate.foldAsync(zero) { (out, in) =>
f(out, in).asScala
})
Expand Down Expand Up @@ -3031,7 +3031,7 @@ class SubFlow[In, Out, Mat](
* @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
*/
@ApiMayChange
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg])(
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.SubFlow[In, Emit, Mat] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ class SubSource[Out, Mat](
*
* '''Cancels when''' downstream cancels
*/
def groupedWeighted(minWeight: Long)(
def groupedWeighted(minWeight: Long,
costFn: function.Function[Out, java.lang.Long]): SubSource[java.util.List[Out @uncheckedVariance], Mat] =
new SubSource(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) // TODO optimize to one step

Expand Down Expand Up @@ -697,7 +697,7 @@ class SubSource[Out, Mat](
*
* See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
*/
def limitWeighted(n: Long)(costFn: function.Function[Out, java.lang.Long]): javadsl.SubSource[Out, Mat] = {
def limitWeighted(n: Long, costFn: function.Function[Out, java.lang.Long]): javadsl.SubSource[Out, Mat] = {
new SubSource(delegate.limitWeighted(n)(costFn.apply))
}

Expand Down Expand Up @@ -726,7 +726,7 @@ class SubSource[Out, Mat](
*
* '''Cancels when''' downstream cancels
*/
def scan[T](zero: T)(f: function.Function2[T, Out, T]): SubSource[T, Mat] =
def scan[T](zero: T, f: function.Function2[T, Out, T]): SubSource[T, Mat] =
new SubSource(delegate.scan(zero)(f.apply))

/**
Expand Down Expand Up @@ -757,7 +757,7 @@ class SubSource[Out, Mat](
*
* See also [[#scan]]
*/
def scanAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): SubSource[T, Mat] =
def scanAsync[T](zero: T, f: function.Function2[T, Out, CompletionStage[T]]): SubSource[T, Mat] =
new SubSource(delegate.scanAsync(zero) { (out, in) =>
f(out, in).asScala
})
Expand All @@ -783,7 +783,7 @@ class SubSource[Out, Mat](
*
* '''Cancels when''' downstream cancels
*/
def fold[T](zero: T)(f: function.Function2[T, Out, T]): SubSource[T, Mat] =
def fold[T](zero: T, f: function.Function2[T, Out, T]): SubSource[T, Mat] =
new SubSource(delegate.fold(zero)(f.apply))

/**
Expand Down Expand Up @@ -831,7 +831,7 @@ class SubSource[Out, Mat](
*
* '''Cancels when''' downstream cancels
*/
def foldAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): SubSource[T, Mat] =
def foldAsync[T](zero: T, f: function.Function2[T, Out, CompletionStage[T]]): SubSource[T, Mat] =
new SubSource(delegate.foldAsync(zero) { (out, in) =>
f(out, in).asScala
})
Expand Down Expand Up @@ -3002,7 +3002,7 @@ class SubSource[Out, Mat](
* @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
*/
@ApiMayChange
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg])(
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.SubSource[Emit, Mat] =
Expand Down