Hey, what's up everybody. Reactive programming has become almost a standard in Android app development, just as Kotlin has. Combining the reactive paradigm with Kotlin helps developers keep a clean architecture in their apps with little effort. In this repo I have sketched out some material that should give you a better grasp of the basics of RxKotlin and a handful of its important operators.
- Introduction
- General idea about Observables
- Upstream and Downstream
- Creating Observable
- Creating Subscription
- Using Create operator
- Subject and non ending sequences of streams
- Filtering Operators
- Schedulers
- Transforming Operators
- Combining Operators
So, what is Rx-Kotlin? Why would you want to use it in your project?
Rx-Kotlin is an asynchronous programming library based on observables. Observables are sequences of data or events that you can react to, such as data coming back from a web service or even taps by the user. You can refer to the marble diagrams for better insight into Rx's event-based programming. One thing to keep in mind: everything is a sequence in Rx.
As an Android developer, you are no stranger to writing asynchronous and concurrent code in order to keep your app snappy and responsive and your users happy. We are talking about reacting to things like:
- Button taps
- Keyboard animation
- Downloading data
- Processing images
- Writing to disk …and a lot more.
The Android SDK offers plenty of alternatives, such as Thread, Runnable, AsyncTask and WorkManager. This is really where Rx-Kotlin comes into play. It brings the following to your code:
- Declarative
- Functional and Reactive
- Consistent patterns and operators
- Handles mutable state ( Android developers will love this :) )
- Compositional
- Decoupled
The learning curve for Rx is quite steep, which is why developers often choose other alternatives. Rx-Kotlin is primarily based on two patterns, Observable and Observer (aka publisher/subscriber). We will be creating observable sequences and observing them in a reactive manner. The primary base type in Rx-Kotlin is Observable<*>, which emits events containing elements. Subscribers can react to each event emitted. In fact, an Observable doesn't emit anything unless it has at least one Subscriber.
Observables are typed: a single observable cannot emit multiple types. Every time an emitter publishes a new element, the Subscribers get an opportunity to do something with the emitted value or to react to the event in some other way, such as displaying errors (from an Android perspective). Each observable should terminate at some point, after which it stops emitting events. Completion is the normal way for an observable to terminate; the other way is termination by an unexpected error.
So, to recap what we have seen about observables:
- Next function () => emits a value.
- Complete function () => flags the observable as completed.
- Error function () => flags that an error has occurred.
Now you have seen a glimpse of what event-based programming is. Let's dive into the code.
Upstream and Downstream Ex1.kt
// Upstream
Observable.just("This is my first Rx App")
    // Downstream
    .subscribe {
        print("Observed Result: \"$it\"")
    }
This program consists of two parts:
Observable.just("This is my first Rx App")
This part emits one event, "This is my first Rx App", of type String.
subscribe {
    print("Observed Result: \"$it\"")
}
This part collects the event from upstream and prints it.
Next up, it's time to roll up your sleeves and get more hands-on with the bread and butter of RxKotlin.
Creating Observable Ex2.kt
To explain things more simply, let's take Game of Thrones as an example :).
- Creating an observable that generates a single event with .just()
val mostPopular: Observable<String> = Observable.just(Season1)
- Creating an observable that generates multiple events with .just()
val fanFav: Observable<String> = Observable.just(Season3, Season4, Season10)
- Creating an observable that generates a single event holding a whole List with .just()
val fanFavByList = Observable.just(listOf(Season3, Season4, Season10))
- Creating an observable that generates multiple events with .fromIterable
/*
 * Note: the type of fanFavByIterable will be Observable<String!>!,
 * one event per list element. Don't confuse this with the one above,
 * which emits the whole list as a single event.
 */
val fanFavByIterable = Observable.fromIterable(listOf(Season3, Season4, Season10))
You can also do this with the help of the extension functions that Rx gives us on lists.
val fanFavByIterableAlternative = listOf(Season3, Season4, Season10).toObservable()
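The difference between wrapping a list with .just() and expanding it with .fromIterable() is easy to miss, so here is a minimal sketch that counts the events each one emits. It assumes RxJava 3 package names (`io.reactivex.rxjava3`; RxJava 2 uses `io.reactivex`). The season strings and the `countEvents` helper are hypothetical stand-ins, not part of Ex2.kt.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Hypothetical stand-ins for the Season constants used in Ex2.kt.
const val Season3 = "Season 3"
const val Season4 = "Season 4"
const val Season10 = "Season 10"

/** Counts how many next events a finite observable delivers before completing. */
fun <T : Any> countEvents(source: Observable<T>): Long = source.count().blockingGet()

fun main() {
    val seasons = listOf(Season3, Season4, Season10)
    // just(list) wraps the whole list in a single event.
    println(countEvents(Observable.just(seasons)))         // 1
    // fromIterable(list) emits one event per element.
    println(countEvents(Observable.fromIterable(seasons))) // 3
}
```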
Creating Subscription Ex3.kt
A subscription is one of the core ideas behind observables: there may be certain actions that we need to perform whenever an event is generated.
A plain subscription looks like this:
val observable = Observable.just(Season1, Season2, Season3)
observable.subscribe {
    println(it)
}
A subscription doesn't always follow the happy path; there can be error states too. This is how we handle them:
val observable = Observable.just(Season1, Season2, Season3, Season6)
observable.subscribeBy(
    onNext = {
        println(it)
    },
    onError = {
        println(it)
    },
    onComplete = {
        println("completed")
    }
)
The onError handler receives a Throwable. There is one more handler, onComplete, which tells the subscriber that the Observable has finished emitting events and will emit no more values.
Now you have seen examples of observables that emit elements. Sometimes you will want an observable that emits no elements, just a completed event... That's absurd!
We can do that by using .empty(). You will have noticed that each observable comes with a type. Because .empty() has no elements, the compiler cannot infer the type, so in this case we can use Unit.
val observable = Observable.empty<Unit>()
observable.subscribeBy(
    onNext = {
        println(it)
    },
    onError = {
        println(it)
    },
    onComplete = {
        println("completed")
    }
)
As you can see from the above example, no next events were generated, but you still get a completed event. There is also a way to emit nothing at all, not even a terminal event: the never operator.
exampleOf("Never") {
    val observable = Observable.never<Any>()
    observable.subscribeBy(
        onNext = {
            println(it)
        },
        onError = {
            println(it)
        },
        onComplete = {
            println("completed")
        }
    )
}
Now, what does a subscription return? A Disposable.
If you want to cancel a subscription at some point, you can call its .dispose() function.
val observable = Observable.never<Any>()
val sub = observable.subscribeBy(
    onNext = {
        println(it)
    },
    onError = {
        println(it)
    },
    onComplete = {
        println("completed")
    }
)
sub.dispose()
The key idea behind disposing of subscriptions is to avoid memory leaks.
What if you have multiple disposables to dispose of? RxKotlin has a way to handle this: CompositeDisposable.
val compositeDisposable = CompositeDisposable()
val observable1 = Observable.never<Any>()
val observable2 = Observable.never<Any>()
val observable3 = Observable.never<Any>()
compositeDisposable.add(observable1.subscribeBy(
    onNext = {
        println(it)
    },
    onError = {
        println(it)
    },
    onComplete = {
        println("completed")
    }
))
compositeDisposable.add(observable2.subscribeBy(
    onNext = {
        println(it)
    },
    onError = {
        println(it)
    },
    onComplete = {
        println("completed")
    }
))
compositeDisposable.add(observable3.subscribeBy(
    onNext = {
        println(it)
    },
    onError = {
        println(it)
    },
    onComplete = {
        println("completed")
    }
))
// Disposes all three subscriptions in one call.
compositeDisposable.dispose()
Using Create operator Ex4.kt
We can use the .create() operator to build an observable by hand:
val observable = Observable.create<String> { emitter ->
    // onNext() can be called any number of times.
    emitter.onNext("Event 1")
    emitter.onNext("Event 2")
    emitter.onNext("Event 3")
    emitter.onComplete()
}
observable.subscribeBy(
    onNext = {
        print(it)
    },
    onError = {
    },
    onComplete = {
    }
)
// Error condition
val observable = Observable.create<String> { emitter ->
    emitter.onNext("Event 1")
    emitter.onNext("Event 2")
    // This will only be triggered once.
    emitter.onError(Error.MyError())
    // Ignored: the sequence has already terminated with an error.
    emitter.onNext("Event 3")
    emitter.onComplete()
}
observable.subscribeBy(
    onNext = {
        print(it)
    },
    onError = {
        print(it)
    },
    onComplete = {
    }
)
There are some reactive types other than Observable:
- Single: emits either one success value or one Error event (you can see Single used in combination with Retrofit).
- Completable: emits either one Completed event or one Error event, and never a value.
- Maybe: emits exactly one of a success value, a Completed event or an Error event.
val single = Single.create<String> { emitter ->
    emitter.onSuccess("Completed Successfully")
}
single.subscribeBy(
    onSuccess = {
        println(it)
    },
    onError = {
    }
)
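Only Single is shown above, so here is a rough sketch of how Completable and Maybe report their terminal events. It assumes RxJava 3 package names (`io.reactivex.rxjava3`; RxJava 2 uses `io.reactivex`) and uses the plain RxJava `subscribe` overloads. The `describe` helpers are hypothetical and only work for sources that emit synchronously.

```kotlin
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.core.Maybe

/** Reports which terminal event a (synchronous) Completable delivered. */
fun describe(completable: Completable): String {
    var result = "nothing"
    completable.subscribe(
        { result = "completed" },
        { error -> result = "error: ${error.message}" }
    )
    return result
}

/** Reports which of the three possible events a (synchronous) Maybe delivered. */
fun describe(maybe: Maybe<String>): String {
    var result = "nothing"
    maybe.subscribe(
        { value -> result = "success: $value" },
        { error -> result = "error: ${error.message}" },
        { result = "completed without a value" }
    )
    return result
}

fun main() {
    println(describe(Completable.complete()))                          // completed
    println(describe(Completable.error(IllegalStateException("boom")))) // error: boom
    println(describe(Maybe.just("Season1")))                            // success: Season1
    println(describe(Maybe.empty<String>()))                            // completed without a value
}
```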
Subject and non ending sequences of streams Ex5.kt
Subjects are made up of two parts:
1. Observable (just like the ones you have seen before)
2. Observer: this can receive new events, and upon receiving them it passes them on to its subscribers.
They are a kind of Yin and Yang combination of Observable and Observer. This gives us the ability to work with observables that are not finite sequences. New elements can be added to a Subject at runtime, and they will be emitted to its subscribers.
There are 3 types of subjects that we use:
PublishSubject starts as an empty sequence and emits only new next events to its subscribers. In other words, elements added to a PublishSubject before the subscription will not be received by the subscriber.
val compositeDisposable = CompositeDisposable()
val seasonBroadcast = PublishSubject.create<String>()
// Emitted before anyone subscribed, so subscriber 1 never sees these.
seasonBroadcast.onNext(Season1)
seasonBroadcast.onNext(Season2)
val subscriber1 = seasonBroadcast.subscribeBy {
    println("Subscription from subscriber 1 , data = $it")
}
compositeDisposable.add(subscriber1)
seasonBroadcast.onNext(Season3)
seasonBroadcast.onNext(Season4)
seasonBroadcast.onNext(Season5)
seasonBroadcast.onNext(Season6)
seasonBroadcast.onNext(Season7)
seasonBroadcast.onNext(Season8)
seasonBroadcast.onNext(Season9)
compositeDisposable.dispose()
// Not received: the subscription has been disposed.
seasonBroadcast.onNext(Season10)
Sometimes you want new subscribers to receive the most recent next event, even if they subscribe after that event was originally emitted. For this we can use BehaviorSubject. It starts with an initial value and replays the latest value to new subscribers. It is stateful (you can access the latest state at any time).
val compositeDisposable = CompositeDisposable()
val sendBroadcast = BehaviorSubject.createDefault(Season1)
/*
 * You can access the BehaviorSubject's value in the way shown below.
 * This can be particularly helpful in UI-related code.
 */
println("value of the Subject is :${sendBroadcast.value}")
val sub1 = sendBroadcast.subscribeBy(onNext = {
    println(it)
}, onComplete = {
    println("completed 1")
})
compositeDisposable.add(sub1)
compositeDisposable.clear()
sendBroadcast.onNext(Season2)
val sub2 = sendBroadcast.subscribeBy(onNext = {
    println(it)
}, onComplete = {
    println("completed 2")
})
compositeDisposable.add(sub2)
What if you want to replay more than just the latest value? ReplaySubject comes to the rescue. It starts empty but is initialized with a buffer size, and it replays up to bufferSize elements to new subscribers. Keep the buffer size small, because the buffered elements are held in memory for the life of the subject.
val compositeDisposable = CompositeDisposable()
// createWithSize() bounds the replay buffer; create(n) only sets an initial capacity hint for an unbounded buffer.
val subject = ReplaySubject.createWithSize<String>(2)
subject.onNext("event 1")
subject.onNext("event 2")
compositeDisposable.add(
    subject.subscribeBy {
        println(it)
    }
)
subject.onNext("event 3")
Operators Ex6.kt
Operators are the building blocks of Rx, which you can use to filter, transform, process and react to events emitted by observables. You can chain these operators to perform complex operations in a succinct way that is still understandable when you go back and review the code later.
We can start by looking into filtering operators, which allow you to process some events but ignore others. Then we will move on to transforming operators, which allow you to manipulate the events and data emitted by an observable in order to prepare them for subscribers.
Filtering Operator
In a nutshell, filtering operators apply conditional constraints to next events so that only the elements you want pass through to subscribers.
ignoreElements() ignores all next events, as shown in its marble diagram. However, it does let stop events through, in other words Completed or Error events. Letting stop events through is usually implied in marble diagrams; it is called out explicitly here because stop events are all that ignoreElements() lets through.
elementAt() filters out all next events except the one at the specified index. Its marble diagram depicts using elementAt() to return only the 3rd next element and ignore the rest.
RxKotlin also has a filter operator for observable sequences that works similarly to Kotlin's filter function for collections. It takes a predicate and applies it to each element to determine whether the element should be allowed through. In its marble diagram, only elements greater than 10 are allowed through.
skip() skips the number of elements you pass as its parameter and then lets all subsequent elements through.
skipWhile() applies a predicate and skips elements until the predicate fails, then lets all future elements through. In other words, it stops skipping once the predicate fails.
take() works the opposite way to skip(). It takes elements up to and including the count you provide as its parameter, then stops taking any additional elements.
takeWhile() only takes elements while a condition resolves to true and then stops taking any more elements.
Note: this is different from the filter operator. Once the condition is false, takeWhile() stops taking elements altogether.
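The static filtering operators described above (ignoreElements, elementAt, filter, skip, skipWhile, take, takeWhile) can be compared side by side on one small sequence. This is a minimal sketch, assuming RxJava 3 package names (`io.reactivex.rxjava3`; RxJava 2 uses `io.reactivex`). The `numbers` sequence and the `collect` helper are made up for illustration and are not taken from Ex6.kt.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Hypothetical sample data.
val numbers: Observable<Int> = Observable.just(4, 12, 8, 15, 3, 20)

/** Blocks until the finite source completes and returns everything it emitted. */
fun collect(source: Observable<Int>): List<Int> = source.toList().blockingGet()

fun main() {
    println(collect(numbers.filter { it > 10 }))    // [12, 15, 20]
    println(collect(numbers.skip(2)))               // [8, 15, 3, 20]
    println(collect(numbers.skipWhile { it < 10 })) // [12, 8, 15, 3, 20]
    println(collect(numbers.take(2)))               // [4, 12]
    println(collect(numbers.takeWhile { it < 10 })) // [4]
    println(numbers.elementAt(2).blockingGet())     // 8 (index is zero-based)
    // ignoreElements() drops every next event and only reports termination.
    numbers.ignoreElements().subscribe { println("completed") }
}
```

Note how skipWhile lets 8 and 3 through once 12 has failed the predicate, whereas filter would test every element.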
So far, filtering has been based on static conditions. There are also filtering operators that let you dynamically filter elements based on some other Observable.
skipUntil() keeps skipping elements until a second Observable emits, which triggers skipUntil() to stop skipping.
takeUntil() keeps taking elements until a second Observable triggers it to stop taking.
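To make the "second Observable as a trigger" idea concrete, here is a small sketch of skipUntil and takeUntil driven by subjects. It assumes RxJava 3 package names (`io.reactivex.rxjava3`; RxJava 2 uses `io.reactivex`), and the function names and event strings are hypothetical.

```kotlin
import io.reactivex.rxjava3.subjects.PublishSubject

fun skipUntilDemo(): List<String> {
    val received = mutableListOf<String>()
    val source = PublishSubject.create<String>()
    val trigger = PublishSubject.create<Unit>()
    source.skipUntil(trigger).subscribe { received += it }

    source.onNext("A")   // skipped: the trigger has not fired yet
    trigger.onNext(Unit) // from here on, elements pass through
    source.onNext("B")
    source.onNext("C")
    return received
}

fun takeUntilDemo(): List<String> {
    val received = mutableListOf<String>()
    val source = PublishSubject.create<String>()
    val trigger = PublishSubject.create<Unit>()
    source.takeUntil(trigger).subscribe { received += it }

    source.onNext("A")
    source.onNext("B")
    trigger.onNext(Unit) // the stream completes here
    source.onNext("C")   // never delivered
    return received
}

fun main() {
    println(skipUntilDemo()) // [B, C]
    println(takeUntilDemo()) // [A, B]
}
```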
distinctUntilChanged() prevents contiguous duplicates from getting through, so the second occurrence in its marble diagram gets through because the previous element was different.
distinct() prevents duplicate elements from passing through at all.
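The two duplicate-removing operators, distinctUntilChanged and distinct, differ only in how far back they look. A minimal sketch, assuming RxJava 3 package names (`io.reactivex.rxjava3`; RxJava 2 uses `io.reactivex`) and a made-up input sequence:

```kotlin
import io.reactivex.rxjava3.core.Observable

// Hypothetical input with both adjacent and non-adjacent repeats.
val repeats: Observable<Int> = Observable.just(1, 1, 2, 1, 3, 3)

fun withoutAdjacentDuplicates(): List<Int> =
    repeats.distinctUntilChanged().toList().blockingGet()

fun withoutAnyDuplicates(): List<Int> =
    repeats.distinct().toList().blockingGet()

fun main() {
    // Only compares each element with the one right before it, so the later 1 survives.
    println(withoutAdjacentDuplicates()) // [1, 2, 1, 3]
    // Remembers everything seen so far.
    println(withoutAnyDuplicates())      // [1, 2, 3]
}
```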
Debouncing is something we have all heard of when handling UI button clicks. Yes, this is the same thing!
debounce() has 2 parameters, timeout and time unit. The timeout is the quiet window that must elapse after an element before it is emitted; if another element arrives within that window, the earlier one is dropped, so only the last element of a burst gets through. In the marble diagram the debounce time is 10. The first element is passed through. Then 2, 3, 4 and 5 arrive in overlapping 10 ms windows, so 2, 3 and 4 are discarded and only 5 gets through.
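The debounce timing described above can be reproduced deterministically with a virtual clock. The sketch below assumes RxJava 3 package names (`io.reactivex.rxjava3`; RxJava 2 uses `io.reactivex`) and uses RxJava's TestScheduler instead of real time; the 3 ms gaps inside the burst are an arbitrary choice for illustration.

```kotlin
import io.reactivex.rxjava3.schedulers.TestScheduler
import io.reactivex.rxjava3.subjects.PublishSubject
import java.util.concurrent.TimeUnit

fun debounceDemo(): List<Int> {
    val scheduler = TestScheduler() // virtual time, advanced manually
    val received = mutableListOf<Int>()
    val clicks = PublishSubject.create<Int>()
    clicks.debounce(10, TimeUnit.MILLISECONDS, scheduler).subscribe { received += it }

    clicks.onNext(1)
    // More than 10 ms of silence: 1 is emitted.
    scheduler.advanceTimeBy(11, TimeUnit.MILLISECONDS)

    // A burst: each element arrives 3 ms after the previous one,
    // so every new element cancels the pending emission of the one before it.
    for (value in 2..5) {
        clicks.onNext(value)
        scheduler.advanceTimeBy(3, TimeUnit.MILLISECONDS)
    }
    // Silence again: only the last element of the burst is emitted.
    scheduler.advanceTimeBy(11, TimeUnit.MILLISECONDS)
    return received
}

fun main() {
    println(debounceDemo()) // [1, 5]
}
```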
share() lets multiple subscribers share a single subscription to an observable.
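One way to see what share() buys you is to count how often the upstream is actually subscribed to. This is a sketch under the assumption of RxJava 3 package names (`io.reactivex.rxjava3`; RxJava 2 uses `io.reactivex`); `countUpstreamSubscriptions` is a hypothetical helper.

```kotlin
import io.reactivex.rxjava3.subjects.PublishSubject

/** Subscribes twice and returns how many times the upstream was subscribed to. */
fun countUpstreamSubscriptions(useShare: Boolean): Int {
    var subscriptions = 0
    val upstream = PublishSubject.create<Int>()
        .doOnSubscribe { subscriptions += 1 }
    val stream = if (useShare) upstream.share() else upstream

    stream.subscribe()
    stream.subscribe()
    return subscriptions
}

fun main() {
    println(countUpstreamSubscriptions(useShare = false)) // 2: each subscriber gets its own upstream subscription
    println(countUpstreamSubscriptions(useShare = true))  // 1: both subscribers ride on one upstream subscription
}
```

On Android this matters when the upstream does real work per subscription, such as a network call.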
This covers the important operators for Android use cases. A few more self-explanatory operator implementations can be found in Ex6.kt
Schedulers
By default in Rx-Kotlin (Android), observables and operators work on the same thread where the subscription occurs, which is typically the main thread. Schedulers provide an abstraction over thread management for changing that default behaviour. A scheduler is a context in which a process takes place.
- Abstract thread management.
- Easy to use.
observeOn directs where events are received after it is called. In a chain of operators, whatever comes after a call to observeOn is performed on the specified Scheduler.
subscribeOn directs where the entire subscription is performed, regardless of where it is called in the chain.
To get a better understanding of schedulers, let's consider this marble diagram, which should help you visualize how they work in a real-life use case.
I'm starting out on the main thread, which is represented by the bottom green timeline. I call observeOn and specify purpleScheduler. Imagine that I am going to do some intensive work here, so I am specifying a background thread on which to receive the events for a map operation, in this case. Then I call subscribeOn and specify orangeScheduler. This specifies where I want subscriptions to be created, where their handlers are executed, and where they will be disposed of. Using the subscribeOn operator here directs subscriptions for the entire chain, which means I am no longer on the main thread but on the thread established by orangeScheduler. I am still observing on purpleScheduler; nothing has changed there. Finally, I use observeOn again to receive the transformed events from the map operation back on the main thread.
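The walkthrough above can be approximated on a plain JVM by recording which thread runs each stage. This is only a sketch: it assumes RxJava 3 package names (`io.reactivex.rxjava3`; RxJava 2 uses `io.reactivex`), and it substitutes Schedulers.computation() for purpleScheduler, Schedulers.io() for orangeScheduler and Schedulers.single() for the Android main thread (on Android you would typically use RxAndroid's AndroidSchedulers.mainThread() for the last hop).

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers

/** Returns the thread names that ran the source, the first map and the final map, in that order. */
fun traceThreads(): List<String> =
    Observable
        .fromCallable { listOf(Thread.currentThread().name) } // runs where subscribeOn points
        .observeOn(Schedulers.computation())                  // "purple": heavy work happens below
        .map { it + Thread.currentThread().name }
        .subscribeOn(Schedulers.io())                         // "orange": affects the source, wherever it is placed
        .observeOn(Schedulers.single())                       // stand-in for hopping back to the main thread
        .map { it + Thread.currentThread().name }
        .blockingFirst()

fun main() {
    println("caller: ${Thread.currentThread().name}")
    println(traceThreads()) // three different scheduler thread names, none of them the caller
}
```

Even though subscribeOn appears in the middle of the chain, it is the source at the top that moves to the io thread, while each observeOn only affects what comes after it.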
Transforming Operators Ex7.kt
You have already established the foundation of your RxKotlin skyscraper of skills and have added the first floor by learning about filtering operators. Let's keep building upward. You will use transforming operators all the time to prep data coming from an observable. There are parallels between transforming operators in RxKotlin and the Kotlin standard library, such as map and flatMap.
RxKotlin's map operator works just like Kotlin's standard map, except that it operates on an observable sequence instead of a normal collection.
In the marble diagram, map takes a lambda that multiplies each element by 10. See the coding example in Ex7.kt for more insight.
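As a quick illustration of map multiplying each element by 10, here is a minimal sketch. It assumes RxJava 3 package names (`io.reactivex.rxjava3`; RxJava 2 uses `io.reactivex`); `timesTen` is a hypothetical helper and not the code from Ex7.kt.

```kotlin
import io.reactivex.rxjava3.core.Observable

/** Emits each input value multiplied by 10 and collects the results. */
fun timesTen(values: List<Int>): List<Int> =
    Observable.fromIterable(values)
        .map { it * 10 }
        .toList()
        .blockingGet()

fun main() {
    println(timesTen(listOf(1, 2, 3)))          // [10, 20, 30]
    // The collection equivalent from the Kotlin standard library:
    println(listOf(1, 2, 3).map { it * 10 })    // [10, 20, 30]
}
```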
Now let's venture into something a bit more elusive: flatMap. Before I show the marble diagram, let me just say that some of these operators have elicited more than their fair share of questions, groans and moans from newcomers to RxKotlin. They may seem complex at first, but they are easy to understand once you see them in action. Let's dive in.
Let's walk through the marble diagram. The easiest way to follow what's happening is to take each path from the source observable, the top line, all the way to the target observable that delivers elements to the subscriber, the bottom line. The source observable emits objects of type "O", each of which has a value property that is itself an observable of type Int. Each value property's initial value is the number of the object, i.e. O1 -> 1, O2 -> 2 and O3 -> 3. Starting with O1, flatMap receives the object, reaches in to access its value property, and multiplies it by 10. It then projects the transformed elements from O1 onto a new Observable, the first one below flatMap, just for O1, and that Observable is flattened down to the target observable that delivers elements to the subscriber.
This is the general principle of flatMap:
Observable -> Transform -> Project -> Flattened to a target observable
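The flatMap walkthrough can be sketched in code roughly as follows. It assumes RxJava 3 package names (`io.reactivex.rxjava3`; RxJava 2 uses `io.reactivex`). The `Holder` class is a hypothetical stand-in for the "O" objects of the marble diagram, with a BehaviorSubject playing the role of the observable value property.

```kotlin
import io.reactivex.rxjava3.subjects.BehaviorSubject
import io.reactivex.rxjava3.subjects.PublishSubject

// Hypothetical "O" object: its value property is itself an observable of Int.
class Holder(initial: Int) {
    val value: BehaviorSubject<Int> = BehaviorSubject.createDefault(initial)
}

fun flatMapDemo(): List<Int> {
    val received = mutableListOf<Int>()
    val source = PublishSubject.create<Holder>()
    source
        .flatMap { holder -> holder.value.map { it * 10 } } // transform + project + flatten
        .subscribe { received += it }

    val o1 = Holder(1)
    val o2 = Holder(2)
    source.onNext(o1)  // 10
    source.onNext(o2)  // 20
    o1.value.onNext(4) // 40: flatMap keeps listening to every inner observable
    o2.value.onNext(5) // 50
    return received
}

fun main() {
    println(flatMapDemo()) // [10, 20, 40, 50]
}
```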
switchMap is similar to flatMap, but it only produces values from the most recent Observable sequence. switchMap is actually a combination of two operators, map and switch. Here is switchMap's marble diagram.
O1 is received by switchMap. It transforms its value to 10, projects it onto a new Observable for O1, and flattens it down to the target observable, just like before. But then switchMap receives O2 and does its thing, switching to O2's observable because it is now the latest, and so on. The result is that the target observable only receives elements from the latest observable.
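Here is the same kind of setup with switchMap, to show that updates from an older inner observable are dropped once a newer one arrives. It assumes RxJava 3 package names (`io.reactivex.rxjava3`; RxJava 2 uses `io.reactivex`), and `Holder` is again a hypothetical stand-in for the "O" objects of the marble diagram.

```kotlin
import io.reactivex.rxjava3.subjects.BehaviorSubject
import io.reactivex.rxjava3.subjects.PublishSubject

// Hypothetical "O" object: its value property is itself an observable of Int.
class Holder(initial: Int) {
    val value: BehaviorSubject<Int> = BehaviorSubject.createDefault(initial)
}

fun switchMapDemo(): List<Int> {
    val received = mutableListOf<Int>()
    val source = PublishSubject.create<Holder>()
    source
        .switchMap { holder -> holder.value.map { it * 10 } }
        .subscribe { received += it }

    val o1 = Holder(1)
    val o2 = Holder(2)
    source.onNext(o1)  // 10
    source.onNext(o2)  // 20: switchMap switches to O2 and drops its subscription to O1
    o1.value.onNext(4) // ignored: O1 is no longer the latest
    o2.value.onNext(5) // 50
    return received
}

fun main() {
    println(switchMapDemo()) // [10, 20, 50]
}
```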
Combining Operators Ex8.kt
RxKotlin is about working with asynchronous sequences, and it will often make order out of chaos. There is a lot you can accomplish by combining observables.
startWith() prepends a sequence of values onto an observable, and subscribers are guaranteed to receive those values before any other elements.
It turns out that startWith is actually just a simplified variant of the concat operator. concat joins two observables together and combines their elements in the order in which the observables are specified.
concatWith is the instance-method form of concat. Like concat, it waits for the first observable to complete before it starts emitting from the second one.
merge does as it says, interspersing elements from the combined observables as they are emitted. As with concat, there is a mergeWith instance method.
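A short sketch of the prepend, concat and merge behaviour described above. It assumes RxJava 3 package names (`io.reactivex.rxjava3`; RxJava 2 uses `io.reactivex`). It uses startWithArray, which exists in both versions, because the single-item startWith overload was renamed between RxJava 2 and 3. The helper names and values are hypothetical.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.subjects.PublishSubject

fun collect(source: Observable<Int>): List<Int> = source.toList().blockingGet()

/** merge forwards elements in the order in which the sources emit them. */
fun mergeDemo(): List<String> {
    val received = mutableListOf<String>()
    val left = PublishSubject.create<String>()
    val right = PublishSubject.create<String>()
    Observable.merge(left, right).subscribe { received += it }

    left.onNext("L1")
    right.onNext("R1")
    left.onNext("L2")
    return received
}

fun main() {
    val first = Observable.just(1, 2)
    val second = Observable.just(3, 4)
    println(collect(first.startWithArray(0)))          // [0, 1, 2]
    println(collect(Observable.concat(first, second))) // [1, 2, 3, 4]
    println(collect(first.concatWith(second)))         // [1, 2, 3, 4]
    println(mergeDemo())                               // [L1, R1, L2]
}
```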
combineLatest is a static method on Observable that takes the latest element from each of the source observables whenever any source observable emits, and passes those latest elements to a lambda in which you specify how to combine them.
It emits a combined element every time either source emits, so you might not get what you expected here. Another thing to be aware of is that combineLatest waits until all source observables have emitted an initial value.
** There are also variants of combineLatest that let you combine more than two source observables (RxJava provides overloads for up to 9). Now you might be wondering what the use case is... You can observe several text fields to combine their values every time text is emitted by one of them.
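The text-field use case for combineLatest might look roughly like this, with two subjects standing in for the fields. It assumes RxJava 3 package names (`io.reactivex.rxjava3`; RxJava 2 uses `io.reactivex`); the subject names and strings are made up for illustration.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.functions.BiFunction
import io.reactivex.rxjava3.subjects.PublishSubject

fun combineLatestDemo(): List<String> {
    val received = mutableListOf<String>()
    // Hypothetical stand-ins for two text fields.
    val name = PublishSubject.create<String>()
    val house = PublishSubject.create<String>()

    Observable
        .combineLatest(name, house, BiFunction<String, String, String> { n, h -> "$n of house $h" })
        .subscribe { received += it }

    name.onNext("Arya")      // nothing yet: house has not emitted an initial value
    house.onNext("Stark")    // "Arya of house Stark"
    name.onNext("Sansa")     // "Sansa of house Stark" (re-uses the latest house)
    return received
}

fun main() {
    println(combineLatestDemo()) // [Arya of house Stark, Sansa of house Stark]
}
```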
What if you only want to emit a new combined element when all source observables have emitted a new element? Zip it :)
zip waits until all the source observables have produced an element at a corresponding index before emitting the combined element.
** This is very useful when making parallel API calls.
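To show how zip pairs elements by index, here is a small sketch with two subjects standing in for two independent sources. It assumes RxJava 3 package names (`io.reactivex.rxjava3`; RxJava 2 uses `io.reactivex`); the names and values are hypothetical.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.functions.BiFunction
import io.reactivex.rxjava3.subjects.PublishSubject

fun zipDemo(): List<String> {
    val received = mutableListOf<String>()
    val titles = PublishSubject.create<String>()
    val ratings = PublishSubject.create<Int>()

    Observable
        .zip(titles, ratings, BiFunction<String, Int, String> { title, rating -> "$title: $rating" })
        .subscribe { received += it }

    titles.onNext("Season1") // waits: there is no rating at index 0 yet
    titles.onNext("Season2") // waits as well
    ratings.onNext(9)        // pairs with Season1
    ratings.onNext(8)        // pairs with Season2
    return received
}

fun main() {
    println(zipDemo()) // [Season1: 9, Season2: 8]
}
```

Unlike combineLatest, an element is never re-used: each title is paired with exactly one rating.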
amb lets you choose between observables.
It subscribes to both observables and waits until either one emits an element; it then stays subscribed to that one and unsubscribes from the other.
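The "first one to emit wins" behaviour described above matches RxJava's amb family, so here is a sketch using Observable.ambArray. It assumes RxJava 3 package names (`io.reactivex.rxjava3`; RxJava 2 uses `io.reactivex`); the subject names are hypothetical.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.subjects.PublishSubject

fun ambDemo(): List<String> {
    val received = mutableListOf<String>()
    val left = PublishSubject.create<String>()
    val right = PublishSubject.create<String>()

    Observable.ambArray(left, right).subscribe { received += it }

    right.onNext("R1") // right emits first, so it wins
    left.onNext("L1")  // ignored: left has been unsubscribed
    right.onNext("R2")
    return received
}

fun main() {
    println(ambDemo()) // [R1, R2]
}
```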
Have you ever used the reduce method on an iterable in Kotlin? If so, you are going to feel right at home with this combining operator. reduce in RxKotlin works very similarly.
It allows you to combine an observable with itself, using a lambda to determine how to do the combining. In the marble diagram, you pass a lambda that determines how the accumulation is performed.
scan works similarly to the reduce operator; the only difference is that it also passes through the intermediate results.
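The difference between reduce and scan is easiest to see on the same input. A minimal sketch, assuming RxJava 3 package names (`io.reactivex.rxjava3`; RxJava 2 uses `io.reactivex`); `total` and `runningTotals` are hypothetical helpers.

```kotlin
import io.reactivex.rxjava3.core.Observable

/** reduce emits only the final accumulated value (nothing for an empty source). */
fun total(values: List<Int>): Int? =
    Observable.fromIterable(values)
        .reduce { acc, x -> acc + x }
        .blockingGet()

/** scan emits every intermediate accumulation as well. */
fun runningTotals(values: List<Int>): List<Int> =
    Observable.fromIterable(values)
        .scan { acc, x -> acc + x }
        .toList()
        .blockingGet()

fun main() {
    println(total(listOf(1, 2, 3, 4)))         // 10
    println(runningTotals(listOf(1, 2, 3, 4))) // [1, 3, 6, 10]
}
```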
** You can also combine multiple combining operators (see the zip + scan example in Ex8.kt).