Skip to content

Commit

Permalink
feat: Add Sink.none operator
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Dec 21, 2024
1 parent 7184dad commit 99c8d0b
Show file tree
Hide file tree
Showing 8 changed files with 228 additions and 0 deletions.
47 changes: 47 additions & 0 deletions docs/src/main/paradox/stream/operators/Sink/none.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Sink.none

A `Sink` that will test the given predicate `p` for every received element and completes with the result.

@ref[Sink operators](../index.md#sink-operators)

## Signature

@apidoc[Sink.none](Sink$) { scala="#none[T](p:T=%3EBoolean):org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[Boolean]]" java="#none(org.apache.pekko.japi.function.Predicate)" }

## Description
none operator applies a predicate function to assert each element received, it returns false if any element satisfy the assertion, otherwise it returns true.

It materializes into a `Future` (in Scala) or a `CompletionStage` (in Java) that completes with the last state when the stream has finished.

Notes that if source is empty, it will return true

A `Sink` that will test the given predicate `p` for every received element and

- completes and returns @scala[`Future`] @java[`CompletionStage`] of `true` if the predicate is false for all elements;
- completes and returns @scala[`Future`] @java[`CompletionStage`] of `true` if the stream is empty (i.e. completes before signalling any elements);
- completes and returns @scala[`Future`] @java[`CompletionStage`] of `false` if the predicate is true for any element.

The materialized value @scala[`Future`] @java[`CompletionStage`] will be completed with the value `true` or `false`
when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream.

## Example

This example tests all elements in the stream is `<=` 100.

Scala
: @@snip [ForAll.scala](/docs/src/test/scala/docs/stream/operators/sink/NoneMatch.scala) { #none }

Java
: @@snip [ForAll.java](/docs/src/test/java/jdocs/stream/operators/sink/NoneMatch.java) { #none }

## Reactive Streams Semantics

@@@div { .callout }

***Completes*** when upstream completes or the predicate `p` returns `true`

**cancels** when predicate `p` returns `true`

**backpressures** when the invocation of predicate `p` has not yet completed

@@@
2 changes: 2 additions & 0 deletions docs/src/main/paradox/stream/operators/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl
|Sink|<a name="lazyinitasync"></a>@ref[lazyInitAsync](Sink/lazyInitAsync.md)|Deprecated by @ref[`Sink.lazyFutureSink`](Sink/lazyFutureSink.md).|
|Sink|<a name="lazysink"></a>@ref[lazySink](Sink/lazySink.md)|Defers creation and materialization of a `Sink` until there is a first element.|
|Sink|<a name="never"></a>@ref[never](Sink/never.md)|Always backpressure never cancel and never consume any elements from the stream.|
|Sink|<a name="none"></a>@ref[none](Sink/none.md)|A `Sink` that will test the given predicate `p` for every received element and completes with the result.|
|Sink|<a name="oncomplete"></a>@ref[onComplete](Sink/onComplete.md)|Invoke a callback when the stream has completed or failed.|
|Sink|<a name="prematerialize"></a>@ref[preMaterialize](Sink/preMaterialize.md)|Materializes this Sink, immediately returning (1) its materialized value, and (2) a new Sink that can be consume elements 'into' the pre-materialized one.|
|Sink|<a name="queue"></a>@ref[queue](Sink/queue.md)|Materialize a `SinkQueue` that can be pulled to trigger demand through the sink.|
Expand Down Expand Up @@ -555,6 +556,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [monitor](Source-or-Flow/monitor.md)
* [never](Source/never.md)
* [never](Sink/never.md)
* [none](Sink/none.md)
* [onComplete](Sink/onComplete.md)
* [onErrorComplete](Source-or-Flow/onErrorComplete.md)
* [onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)
Expand Down
41 changes: 41 additions & 0 deletions docs/src/test/java/jdocs/stream/operators/sink/NoneMatch.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package jdocs.stream.operators.sink;

import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;

import java.util.concurrent.TimeUnit;

public class NoneMatch {
private ActorSystem system = null;

public void noneUsage() throws Exception {
// #none
final boolean noneMatch =
Source.range(1, 100)
.runWith(Sink.none(elem -> elem > 100), system)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS);
System.out.println(noneMatch);
// Expect prints:
// true
// #none
}
}
40 changes: 40 additions & 0 deletions docs/src/test/scala/docs/stream/operators/sink/NoneMatch.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package docs.stream.operators.sink

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

import scala.concurrent.duration.DurationInt
import scala.concurrent.{ Await, ExecutionContextExecutor, Future }

object NoneMatch {
implicit val system: ActorSystem = ???
implicit val ec: ExecutionContextExecutor = system.dispatcher
def noneExample(): Unit = {
// #none
val result: Future[Boolean] =
Source(1 to 100)
.runWith(Sink.none(_ > 100))
val noneMatch = Await.result(result, 3.seconds)
println(noneMatch)
// Expect prints:
// true
// #none
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,15 @@ public void sinkMustBeAbleToUseForall()
assertTrue(allMatch);
}

@Test
public void sinkMustBeAbleToUseNoneMatch()
throws InterruptedException, ExecutionException, TimeoutException {
CompletionStage<Boolean> cs =
Source.from(Arrays.asList(1, 2, 3, 4)).runWith(Sink.none(param -> param < 0), system);
boolean noneMatch = cs.toCompletableFuture().get(100, TimeUnit.MILLISECONDS);
assertTrue(noneMatch);
}

@Test
public void sinkMustBeAbleToUseForExists()
throws InterruptedException, ExecutionException, TimeoutException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,46 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {

}

"The none sink" must {

"completes with `ture` when all elements not match" in {
Source(1 to 4)
.runWith(Sink.none(_ < 0))
.futureValue shouldBe true
}

"completes with `false` when any element match" in {
Source(1 to 4)
.runWith(Sink.none(_ > 2))
.futureValue shouldBe false
}

"completes with `true` if the stream is empty" in {
Source.empty[Int]
.runWith(Sink.none(_ > 2))
.futureValue shouldBe true
}

"completes with `Failure` if the stream failed" in {
Source.failed[Int](new RuntimeException("Oops"))
.runWith(Sink.none(_ > 2))
.failed.futureValue shouldBe a[RuntimeException]
}

"completes with `false` with restart strategy" in {
val sink = Sink.none[Int](elem => {
if (elem == 2) {
throw new RuntimeException("Oops")
}
elem > 1
}).withAttributes(supervisionStrategy(Supervision.restartingDecider))

Source(1 to 3)
.runWith(sink)
.futureValue shouldBe false
}
}

"The exists sink" must {

"completes with `false` when none element match" in {
Expand Down
25 changes: 25 additions & 0 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,31 @@ object Sink {
.mapMaterializedValue(_.map(Boolean.box)(ExecutionContexts.parasitic).asJava))
}

/**
* A `Sink` that will test the given predicate `p` for every received element and
* 1. completes and returns [[java.util.concurrent.CompletionStage]] of `true` if the predicate is false for all elements;
* 2. completes and returns [[java.util.concurrent.CompletionStage]] of `true` if the stream is empty (i.e. completes before signalling any elements);
* 3. completes and returns [[java.util.concurrent.CompletionStage]] of `false` if the predicate is true for any element.
*
* The materialized value [[java.util.concurrent.CompletionStage]] will be completed with the value `true` or `false`
* when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Completes when''' upstream completes or the predicate `p` returns `true`
*
* '''Backpressures when''' the invocation of predicate `p` has not yet completed
*
* '''Cancels when''' predicate `p` returns `true`
*
* @since 1.1.3
*/
def none[In](p: function.Predicate[In]): javadsl.Sink[In, CompletionStage[java.lang.Boolean]] = {
import pekko.util.FutureConverters._
new Sink(scaladsl.Sink.none[In](p.test)
.mapMaterializedValue(_.map(Boolean.box)(ExecutionContexts.parasitic).asJava))
}

/**
* A `Sink` that will test the given predicate `p` for every received element and
* 1. completes and returns [[java.util.concurrent.CompletionStage]] of `true` if the predicate is true for any element;
Expand Down
24 changes: 24 additions & 0 deletions stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,30 @@ object Sink {
.toMat(Sink.head)(Keep.right)
.named("forallSink")

/**
* A `Sink` that will test the given predicate `p` for every received element and
* 1. completes and returns [[scala.concurrent.Future]] of `true` if the predicate is false for all elements;
* 2. completes and returns [[scala.concurrent.Future]] of `true` if the stream is empty (i.e. completes before signalling any elements);
* 3. completes and returns [[scala.concurrent.Future]] of `false` if the predicate is true for any element.
*
* The materialized value [[scala.concurrent.Future]] will be completed with the value `true` or `false`
* when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Completes when''' upstream completes or the predicate `p` returns `true`
*
* '''Backpressures when''' the invocation of predicate `p` has not yet completed
*
* '''Cancels when''' predicate `p` returns `true`
*
* @since 1.1.3
*/
def none[T](p: T => Boolean): Sink[T, Future[Boolean]] =
Flow[T].foldWhile(true)(util.ConstantFun.scalaIdentityFunction)(_ && !p(_))
.toMat(Sink.head)(Keep.right)
.named("noneSink")

/**
* A `Sink` that will test the given predicate `p` for every received element and
* 1. completes and returns [[scala.concurrent.Future]] of `true` if the predicate is true for any element;
Expand Down

0 comments on commit 99c8d0b

Please sign in to comment.