Skip to content

Commit

Permalink
Add concatArrayEager operator
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Sep 2, 2021
1 parent 55793ec commit 7b44635
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 23 deletions.
17 changes: 0 additions & 17 deletions .travis.yml

This file was deleted.

21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Table of contents
- Sources
- `range`
- `timer`
- [`concatArrayEager`](#concatarrayeager)
- Intermediate Flow operators (`FlowExtensions`)
- `Flow.concatWith`
- `Flow.groupBy`
Expand Down Expand Up @@ -259,4 +260,24 @@ uws.take(5).collect { println(it) }

// prints lines 11..15
uws.take(5).collect { println(it) }
```

## concatArrayEager

Launches all at once and emits all items from a source before items of the next are emitted.

For example, given two sources, if the first is slow, the items of the second won't be emitted until the first has
finished emitting its items. This operators allows all sources to generate items in parallel but then still emit those
items in the order their respective `Flow`s are listed.

Note that each source is consumed in an unbounded manner and thus, depending on the speed of
the current source and the collector, the operator may retain items longer and may use more memory
during its execution.

```kotlin
concatArrayEager(
range(1, 5).onStart { delay(200) },
range(6, 5)
)
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
```
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
GROUP=com.github.akarnokd
VERSION_NAME=0.0.8
VERSION_NAME=0.0.9

POM_ARTIFACT_ID=kotlin-flow-extensions
POM_NAME=Kotlin Flow Extensions
Expand Down
13 changes: 11 additions & 2 deletions src/main/kotlin/hu/akarnokd/kotlin/flow/FlowExtensions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ fun <T> Flow<T>.toList() : Flow<List<T>> {
* Drops items from the upstream when the downstream is not ready to receive them.
*/
@FlowPreview
fun <T> Flow<T>.onBackpressurureDrop() : Flow<T> = FlowOnBackpressureDrop(this)
fun <T> Flow<T>.onBackpressureDrop() : Flow<T> = FlowOnBackpressureDrop(this)

/**
* Maps items from the upstream to [Flow] and relays its items while dropping upstream items
Expand All @@ -211,12 +211,21 @@ fun <T, R> Flow<T>.flatMapDrop(mapper: suspend (T) -> Flow<R>) : Flow<R> = FlowF
@FlowPreview
fun <T> mergeArray(vararg sources: Flow<T>) : Flow<T> = FlowMergeArray(sources)

/**
* Launches all [sources] at once and emits all items from a source before items of the next are emitted.
* Note that each source is consumed in an unbounded manner and thus, depending on the speed of
* the current source and the collector, the operator may retain items longer and may use more memory
* during its execution.
*/
@FlowPreview
fun <T> concatArrayEager(vararg sources: Flow<T>) : Flow<T> = FlowConcatArrayEager(sources)

// -----------------------------------------------------------------------------------------
// Parallel Extensions
// -----------------------------------------------------------------------------------------

/**
* Consumes the upstream and dispatches individual items to a parrallel rail
* Consumes the upstream and dispatches individual items to a parallel rail
* of the parallel flow for further consumption.
*/
fun <T> Flow<T>.parallel(parallelism: Int, runOn: (Int) -> CoroutineDispatcher) : ParallelFlow<T> =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2019-2020 David Karnok
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package hu.akarnokd.kotlin.flow.impl

import hu.akarnokd.kotlin.flow.Resumable
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.AbstractFlow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.flow.collect
import java.util.concurrent.atomic.AtomicIntegerArray

@FlowPreview
class FlowConcatArrayEager<T>(private val sources: Array<out Flow<T>>) : AbstractFlow<T>() {

@InternalCoroutinesApi
override suspend fun collectSafely(collector: FlowCollector<T>) {
val n = sources.size
val queues = Array(n) { ConcurrentLinkedQueue<T>() }
val done = AtomicIntegerArray(n)
var index = 0;
val reader = Resumable()

coroutineScope {
for (i in 0 until n) {
val f = sources[i]
val q = queues[i]
val j = i
launch {
try {
f.collect {
q.offer(it)
reader.resume()
}
} finally {
done.set(j, 1)
reader.resume()
}
}
}


while (isActive && index < n) {
val q = queues[index]
val d = done.get(index) != 0

if (d && q.isEmpty()) {
index++
continue
}

val v = q.poll()
if (v != null) {
collector.emit(v)
continue;
}

reader.await()
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package hu.akarnokd.kotlin.flow.impl

import hu.akarnokd.kotlin.flow.assertResult
import hu.akarnokd.kotlin.flow.concatArrayEager
import hu.akarnokd.kotlin.flow.range
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.runBlocking
import org.junit.Test
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.assertEquals

@FlowPreview
class FlowConcatArrayEagerTest {
@Test
fun basic() = runBlocking {
val state1 = AtomicInteger()
val state2 = AtomicInteger()
concatArrayEager(
range(1, 5).onStart {
delay(200)
state1.set(1)
}.onEach { println(it) },
range(6, 5).onStart {
state2.set(state1.get())
}.onEach { println(it) },
)
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

assertEquals(0, state2.get())
}

@Test
fun basic1() = runBlocking {
concatArrayEager(
range(1, 5)
)
.assertResult(1, 2, 3, 4, 5)

}

@Test
fun take() = runBlocking {
concatArrayEager(
range(1, 5).onStart { delay(100) },
range(6, 5)
)
.take(6)
.assertResult(1, 2, 3, 4, 5, 6)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
package hu.akarnokd.kotlin.flow.impl

import hu.akarnokd.kotlin.flow.assertResult
import hu.akarnokd.kotlin.flow.onBackpressurureDrop
import hu.akarnokd.kotlin.flow.startCollectOn
import hu.akarnokd.kotlin.flow.onBackpressureDrop
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import org.junit.Test
Expand All @@ -35,7 +34,7 @@ class FlowOnBackpressureDropTest {
delay(100)
}
}
.onBackpressurureDrop()
.onBackpressureDrop()
.map {
delay(130)
it
Expand Down

0 comments on commit 7b44635

Please sign in to comment.