Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream Extensions #30

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@ package com.trendyol.transmission.components
import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import com.trendyol.transmission.Transmission
import com.trendyol.transmission.effect.RouterEffect
import com.trendyol.transmission.components.features.colorpicker.ColorPickerTransformer
import com.trendyol.transmission.components.features.input.InputTransformer
import com.trendyol.transmission.router.TransmissionRouter
import com.trendyol.transmission.router.onEach
import com.trendyol.transmission.router.toState
import com.trendyol.transmission.components.features.ColorPickerUiState
import com.trendyol.transmission.components.features.InputUiState
import com.trendyol.transmission.components.features.MultiOutputUiState
import com.trendyol.transmission.components.features.OutputUiState
import com.trendyol.transmission.components.features.colorpicker.ColorPickerTransformer
import com.trendyol.transmission.components.features.input.InputTransformer
import com.trendyol.transmission.effect.RouterEffect
import com.trendyol.transmission.router.TransmissionRouter
import com.trendyol.transmission.router.asState
import com.trendyol.transmission.router.streamData
import com.trendyol.transmission.router.streamDataAsState
import com.trendyol.transmission.router.streamEffect
import dagger.hilt.android.lifecycle.HiltViewModel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
Expand All @@ -27,30 +29,30 @@ class ComponentViewModel @Inject constructor(
private val router: TransmissionRouter
) : ViewModel() {

val inputUiState = router.dataStream
.onEach<InputUiState> { _transmissionList.value = listOf() }
.toState(viewModelScope, InputUiState())
val inputUiState = router
.streamData<InputUiState> { _transmissionList.value = listOf() }
.asState(viewModelScope, InputUiState())

val outputUiState = router.dataStream.toState(viewModelScope, OutputUiState())
val colorPickerUiState = router.dataStream.toState(viewModelScope, ColorPickerUiState())
val multiOutputUiState = router.dataStream.toState(viewModelScope, MultiOutputUiState())
val outputUiState = router.streamDataAsState(viewModelScope, OutputUiState())
val colorPickerUiState = router.streamDataAsState(viewModelScope, ColorPickerUiState())
val multiOutputUiState = router.streamDataAsState(viewModelScope, MultiOutputUiState())

private val _transmissionList = MutableStateFlow<List<String>>(emptyList())
val transmissionList = _transmissionList.asStateFlow()

init {
viewModelScope.launch {
launch {
router.dataStream.collect(::onData)
router.streamData().collect(::onData)
}
launch {
router.effectStream.collect(::onEffect)
router.streamEffect().collect(::onEffect)
}
}
}

fun processSignal(signal: Transmission.Signal) {
router.processSignal(signal)
router.process(signal)
_transmissionList.update { it.plus("Signal: $signal") }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import com.trendyol.transmission.Transmission
import com.trendyol.transmission.router.builder.TransmissionRouterBuilder
import com.trendyol.transmission.router.streamData
import dagger.hilt.android.lifecycle.HiltViewModel
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
Expand Down Expand Up @@ -34,12 +35,12 @@ class CounterViewModel @Inject constructor() : ViewModel() {

init {
viewModelScope.launch(Dispatchers.Default) {
router.dataStream.collect(::onData)
router.streamData().collect(::onData)
}
}

fun processSignal(signal: Transmission.Signal) {
router.processSignal(signal)
router.process(signal)
_areAllDistinct.tryEmit("Calculating")
checkTransmissions()
counter.addAndGet(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package com.trendyol.transmissiontest
import com.trendyol.transmission.Transmission
import com.trendyol.transmission.router.TransmissionRouter
import com.trendyol.transmission.router.builder.TransmissionRouterBuilder
import com.trendyol.transmission.router.streamData
import com.trendyol.transmission.router.streamEffect
import com.trendyol.transmission.transformer.Transformer
import com.trendyol.transmission.transformer.request.Contract
import com.trendyol.transmissiontest.computation.ComputationTransformer
Expand Down Expand Up @@ -72,10 +74,10 @@ class TestSuite {
val effectStream: MutableList<Transmission.Effect> = mutableListOf()
try {
backgroundScope.launch(UnconfinedTestDispatcher(testScheduler)) {
router.dataStream.toList(dataStream)
router.streamData().toList(dataStream)
}
backgroundScope.launch(UnconfinedTestDispatcher(testScheduler)) {
router.effectStream.toList(effectStream)
router.streamEffect().toList(effectStream)
}
val testScope = object : TransformerTestScope {
override val dataStream: List<Transmission.Data> = dataStream
Expand All @@ -84,15 +86,15 @@ class TestSuite {
orderedInitialProcessing.forEach {
when (it) {
is Transmission.Data -> throw IllegalArgumentException("Transmission.Data should not be sent for processing")
is Transmission.Effect -> router.processEffect(it)
is Transmission.Signal -> router.processSignal(it)
is Transmission.Effect -> router.process(it)
is Transmission.Signal -> router.process(it)
}
transformer?.waitProcessingToFinish()
}
if (transmission is Transmission.Signal) {
router.processSignal(transmission)
router.process(transmission)
} else if (transmission is Transmission.Effect) {
router.processEffect(transmission)
router.process(transmission)
}
transformer?.waitProcessingToFinish()
testScope.scope(this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,58 @@ package com.trendyol.transmission.router
import com.trendyol.transmission.Transmission
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.stateIn

inline fun <reified T : Transmission.Data?> SharedFlow<Transmission.Data?>.toState(
@JvmName("streamData")
fun TransmissionRouter.streamData(): Flow<Transmission.Data> {
return this.dataStream
}

@JvmName("streamDataWithType")
inline fun <reified T : Transmission.Data> TransmissionRouter.streamData(): Flow<T> {
return this.dataStream.filterIsInstance<T>()
}

@JvmName("streamDataWithAction")
inline fun <reified T : Transmission.Data> TransmissionRouter.streamData(
noinline action: suspend (T) -> Unit
): Flow<T> {
return this.dataStream.filterIsInstance<T>().onEach(action)
}

@JvmName("streamEffect")
inline fun <reified T : Transmission.Effect> TransmissionRouter.streamEffect(): Flow<T> {
return this.effectStream.filterIsInstance<T>()
}

@JvmName("streamEffectWithType")
fun TransmissionRouter.streamEffect(): Flow<Transmission.Effect> {
return this.effectStream
}

@JvmName("streamEffectWithAction")
inline fun <reified T : Transmission.Effect> TransmissionRouter.streamEffect(
noinline action: suspend (T) -> Unit
): Flow<T> {
return this.effectStream.filterIsInstance<T>().onEach(action)
}

inline fun <reified T : Transmission.Data> TransmissionRouter.streamDataAsState(
scope: CoroutineScope,
initialValue: T,
sharingStarted: SharingStarted = SharingStarted.WhileSubscribed(),
): StateFlow<T> {
return this.filterIsInstance<T>().stateIn(scope, sharingStarted, initialValue)
return this.dataStream.filterIsInstance<T>().stateIn(scope, sharingStarted, initialValue)
}

inline fun <reified T : Transmission.Data?> Flow<Transmission.Data?>.toState(
inline fun <reified T : Transmission.Data> Flow<T>.asState(
scope: CoroutineScope,
initialValue: T,
sharingStarted: SharingStarted = SharingStarted.WhileSubscribed(),
): StateFlow<T> {
return this.filterIsInstance<T>().stateIn(scope, sharingStarted, initialValue)
}

inline fun <reified T : Transmission.Data?> Flow<Transmission.Data?>.onEach(
noinline action: suspend (T) -> Unit
): Flow<T> {
return this.filterIsInstance<T>().onEach(action)
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,13 @@ class TransmissionRouter internal constructor(

private val checkpointTracker = CheckpointTracker()

val dataStream = dataBroadcast.output
val effectStream: SharedFlow<Transmission.Effect> = effectBroadcast.output.map { it.effect }
.shareIn(routerScope, SharingStarted.WhileSubscribed())
@PublishedApi
internal val dataStream = dataBroadcast.output

@PublishedApi
internal val effectStream: SharedFlow<Transmission.Effect> =
effectBroadcast.output.map { it.effect }
.shareIn(routerScope, SharingStarted.WhileSubscribed())

private val _requestDelegate = RequestDelegate(
queryScope = routerScope,
Expand Down Expand Up @@ -74,13 +78,13 @@ class TransmissionRouter internal constructor(
initializeInternal(loader)
}

fun processSignal(signal: Transmission.Signal) {
fun process(signal: Transmission.Signal) {
routerScope.launch {
signalBroadcast.producer.send(signal)
}
}

fun processEffect(effect: Transmission.Effect) {
fun process(effect: Transmission.Effect) {
routerScope.launch {
effectBroadcast.producer.send(EffectWrapper(effect))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class TransmissionRouterTest {
addDispatcher(testDispatcher)
}
// When
sut.processSignal(TestSignal)
sut.process(TestSignal)

// Then
assertEquals(transformer.signalList.last(), TestSignal)
Expand All @@ -90,7 +90,7 @@ class TransmissionRouterTest {
addDispatcher(testDispatcher)
}
// When
sut.processSignal(TestSignal)
sut.process(TestSignal)

// Then
assertEquals(transformer1.signalList.last(), TestSignal)
Expand All @@ -113,7 +113,7 @@ class TransmissionRouterTest {
addDispatcher(testDispatcher)
}
// When
sut.processSignal(TestSignal)
sut.process(TestSignal)

// Then
assertEquals(transformer1.effectList.last(), TestEffect)
Expand All @@ -135,7 +135,7 @@ class TransmissionRouterTest {
}
// When
val effects = sut.effectStream.testIn(backgroundScope)
sut.processSignal(TestSignal)
sut.process(TestSignal)
assertEquals(6, effects.cancelAndConsumeRemainingEvents().size)
// Then
}
Expand All @@ -154,7 +154,7 @@ class TransmissionRouterTest {
addDispatcher(testDispatcher)
}
// When
sut.processSignal(TestSignal)
sut.process(TestSignal)
sut.dataStream.test {
assertEquals(TestData("update with TestTransformer1"), awaitItem())
assertEquals(TestData("update with TestTransformer2"), awaitItem())
Expand All @@ -176,7 +176,7 @@ class TransmissionRouterTest {
addDispatcher(testDispatcher)
}
// When
sut.processSignal(TestSignal)
sut.process(TestSignal)

// Then
assertEquals(transformer1.effectList.contains(RouterEffect("")), false)
Expand Down