diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 47de59f..0000000 --- a/.travis.yml +++ /dev/null @@ -1,17 +0,0 @@ -language: java -jdk: - - openjdk8 - -before_install: - - chmod +x gradlew - - chmod +x gradle/wrapper/gradle-wrapper.jar - -# Code coverage -after_success: - - bash <(curl -s https://codecov.io/bash) - -# cache between builds -cache: - directories: - - $HOME/.m2 - - $HOME/.gradle \ No newline at end of file diff --git a/README.md b/README.md index 14f71ae..0c493dd 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ Table of contents - Sources - `range` - `timer` + - [`concatArrayEager`](#concatarrayeager) - Intermediate Flow operators (`FlowExtensions`) - `Flow.concatWith` - `Flow.groupBy` @@ -259,4 +260,24 @@ uws.take(5).collect { println(it) } // prints lines 11..15 uws.take(5).collect { println(it) } +``` + +## concatArrayEager + +Launches all at once and emits all items from a source before items of the next are emitted. + +For example, given two sources, if the first is slow, the items of the second won't be emitted until the first has +finished emitting its items. This operators allows all sources to generate items in parallel but then still emit those +items in the order their respective `Flow`s are listed. + +Note that each source is consumed in an unbounded manner and thus, depending on the speed of +the current source and the collector, the operator may retain items longer and may use more memory +during its execution. + +```kotlin +concatArrayEager( + range(1, 5).onStart { delay(200) }, + range(6, 5) +) +.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) ``` \ No newline at end of file diff --git a/gradle.properties b/gradle.properties index 29db2be..689c519 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,5 @@ GROUP=com.github.akarnokd -VERSION_NAME=0.0.8 +VERSION_NAME=0.0.9 POM_ARTIFACT_ID=kotlin-flow-extensions POM_NAME=Kotlin Flow Extensions diff --git a/src/main/kotlin/hu/akarnokd/kotlin/flow/FlowExtensions.kt b/src/main/kotlin/hu/akarnokd/kotlin/flow/FlowExtensions.kt index 526b9d2..e890392 100644 --- a/src/main/kotlin/hu/akarnokd/kotlin/flow/FlowExtensions.kt +++ b/src/main/kotlin/hu/akarnokd/kotlin/flow/FlowExtensions.kt @@ -196,7 +196,7 @@ fun Flow.toList() : Flow> { * Drops items from the upstream when the downstream is not ready to receive them. */ @FlowPreview -fun Flow.onBackpressurureDrop() : Flow = FlowOnBackpressureDrop(this) +fun Flow.onBackpressureDrop() : Flow = FlowOnBackpressureDrop(this) /** * Maps items from the upstream to [Flow] and relays its items while dropping upstream items @@ -211,12 +211,21 @@ fun Flow.flatMapDrop(mapper: suspend (T) -> Flow) : Flow = FlowF @FlowPreview fun mergeArray(vararg sources: Flow) : Flow = FlowMergeArray(sources) +/** + * Launches all [sources] at once and emits all items from a source before items of the next are emitted. + * Note that each source is consumed in an unbounded manner and thus, depending on the speed of + * the current source and the collector, the operator may retain items longer and may use more memory + * during its execution. + */ +@FlowPreview +fun concatArrayEager(vararg sources: Flow) : Flow = FlowConcatArrayEager(sources) + // ----------------------------------------------------------------------------------------- // Parallel Extensions // ----------------------------------------------------------------------------------------- /** - * Consumes the upstream and dispatches individual items to a parrallel rail + * Consumes the upstream and dispatches individual items to a parallel rail * of the parallel flow for further consumption. */ fun Flow.parallel(parallelism: Int, runOn: (Int) -> CoroutineDispatcher) : ParallelFlow = diff --git a/src/main/kotlin/hu/akarnokd/kotlin/flow/impl/FlowConcatArrayEager.kt b/src/main/kotlin/hu/akarnokd/kotlin/flow/impl/FlowConcatArrayEager.kt new file mode 100644 index 0000000..5e46ee4 --- /dev/null +++ b/src/main/kotlin/hu/akarnokd/kotlin/flow/impl/FlowConcatArrayEager.kt @@ -0,0 +1,78 @@ +/* + * Copyright 2019-2020 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.kotlin.flow.impl + +import hu.akarnokd.kotlin.flow.Resumable +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.AbstractFlow +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.FlowCollector +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.AtomicInteger +import kotlinx.coroutines.flow.collect +import java.util.concurrent.atomic.AtomicIntegerArray + +@FlowPreview +class FlowConcatArrayEager(private val sources: Array>) : AbstractFlow() { + + @InternalCoroutinesApi + override suspend fun collectSafely(collector: FlowCollector) { + val n = sources.size + val queues = Array(n) { ConcurrentLinkedQueue() } + val done = AtomicIntegerArray(n) + var index = 0; + val reader = Resumable() + + coroutineScope { + for (i in 0 until n) { + val f = sources[i] + val q = queues[i] + val j = i + launch { + try { + f.collect { + q.offer(it) + reader.resume() + } + } finally { + done.set(j, 1) + reader.resume() + } + } + } + + + while (isActive && index < n) { + val q = queues[index] + val d = done.get(index) != 0 + + if (d && q.isEmpty()) { + index++ + continue + } + + val v = q.poll() + if (v != null) { + collector.emit(v) + continue; + } + + reader.await() + } + } + } +} \ No newline at end of file diff --git a/src/test/kotlin/hu/akarnokd/kotlin/flow/impl/FlowConcatArrayEagerTest.kt b/src/test/kotlin/hu/akarnokd/kotlin/flow/impl/FlowConcatArrayEagerTest.kt new file mode 100644 index 0000000..12b056d --- /dev/null +++ b/src/test/kotlin/hu/akarnokd/kotlin/flow/impl/FlowConcatArrayEagerTest.kt @@ -0,0 +1,54 @@ +package hu.akarnokd.kotlin.flow.impl + +import hu.akarnokd.kotlin.flow.assertResult +import hu.akarnokd.kotlin.flow.concatArrayEager +import hu.akarnokd.kotlin.flow.range +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.onStart +import kotlinx.coroutines.flow.take +import kotlinx.coroutines.runBlocking +import org.junit.Test +import java.util.concurrent.atomic.AtomicInteger +import kotlin.test.assertEquals + +@FlowPreview +class FlowConcatArrayEagerTest { + @Test + fun basic() = runBlocking { + val state1 = AtomicInteger() + val state2 = AtomicInteger() + concatArrayEager( + range(1, 5).onStart { + delay(200) + state1.set(1) + }.onEach { println(it) }, + range(6, 5).onStart { + state2.set(state1.get()) + }.onEach { println(it) }, + ) + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + + assertEquals(0, state2.get()) + } + + @Test + fun basic1() = runBlocking { + concatArrayEager( + range(1, 5) + ) + .assertResult(1, 2, 3, 4, 5) + + } + + @Test + fun take() = runBlocking { + concatArrayEager( + range(1, 5).onStart { delay(100) }, + range(6, 5) + ) + .take(6) + .assertResult(1, 2, 3, 4, 5, 6) + } +} \ No newline at end of file diff --git a/src/test/kotlin/hu/akarnokd/kotlin/flow/impl/FlowOnBackpressureDropTest.kt b/src/test/kotlin/hu/akarnokd/kotlin/flow/impl/FlowOnBackpressureDropTest.kt index 6cad05c..5bed55f 100644 --- a/src/test/kotlin/hu/akarnokd/kotlin/flow/impl/FlowOnBackpressureDropTest.kt +++ b/src/test/kotlin/hu/akarnokd/kotlin/flow/impl/FlowOnBackpressureDropTest.kt @@ -17,8 +17,7 @@ package hu.akarnokd.kotlin.flow.impl import hu.akarnokd.kotlin.flow.assertResult -import hu.akarnokd.kotlin.flow.onBackpressurureDrop -import hu.akarnokd.kotlin.flow.startCollectOn +import hu.akarnokd.kotlin.flow.onBackpressureDrop import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import org.junit.Test @@ -35,7 +34,7 @@ class FlowOnBackpressureDropTest { delay(100) } } - .onBackpressurureDrop() + .onBackpressureDrop() .map { delay(130) it