Reactive Programming For Android

Although it is a fairly new paradigm, reactive programming is rapidly gaining popularity. Libraries for writing reactive code have been implemented for many languages; some of the most popular are RxJava, RxKotlin, RxSwift, and RxJS.

Writing asynchronous code often comes with the following issues:

  • Complicated error handling. If errors occur in several async tasks, they all have to be handled somewhere. When the app logic does not allow a task to start until the previous one has completed, this becomes hard to implement with the standard approach.
  • Callback Hell. Developing a complex Android app with multiple network connections, user interactions, and animations means the application code might have a bunch of nested callbacks. For example, a callback is received in an async operation, and then another async operation is performed in it with another callback received within, and so on.
  • High risk of errors that are hard to trace.

Reactive code can simplify the process of writing async code.

It is hard to explain reactive programming in a nutshell. Well, at least I couldn’t find a short definition. You can say that reactive programming is the Observer pattern on steroids. Or rather, reactive programming is programming built around flows (streams). The main idea is to present events and data as flows that can be combined, filtered, transformed, and split. This sounds quite vague, but I hope the examples below will help make sense of it.

🔭  Observable

The basic building blocks of reactive code are Observables and Subscribers. The Observable class is the source of data and the Subscriber class is the consumer.

Java developers are probably familiar with the Iterator interface:

interface Iterator<E> {
    boolean hasNext();
    E next();
}

This interface provides the data in a so-called pull-based fashion, as we have to request the data from it ourselves.

One of the main classes in RxJava is Observable<T>. It notifies its consumers through three callbacks: onNext(), onComplete() (named onCompleted() in RxJava 1), and onError(). An Observable has two terminal states: successful completion and termination with an error. After either of them, the Observable stops emitting.

The concept of this class is fairly similar to that of Iterator, with one significant difference: the data is push-based. The Observable itself decides when to supply data, and we are supposed to react to what it supplies. This class provides the flow of data and events we talked about.

For example:

// Iterator
val iterator = IntStream.rangeClosed(1, 10).iterator() // 1..10, the same values as Observable.range(1, 10)
while (iterator.hasNext()) {
    println(iterator.next()) // requesting data in a loop
}

// Observable
Observable.range(1, 10).subscribe { println(it) } // reacting to incoming data

Note that an Observable does not start sending data until we subscribe to it with the subscribe method, which accepts a parameter of type Observer.

interface Observer<T> {
    void onNext(T t);
    void onError(Throwable t);
    void onCompleted();
}

In the example above with Observable.range(1, 10), we passed only a lambda expression that plays the role of onNext and supplied no handlers for onError and onCompleted.

An Observable can emit any number of items by calling onNext, followed by onCompleted when the transfer is finished or onError if something went wrong. The data flow may also be endless (for example, the Observable.interval(1, TimeUnit.SECONDS) timer). In this case, onCompleted is never called.
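The contract described above (any number of onNext calls, then at most one onCompleted or onError) can be sketched without any library. The snippet below is a minimal plain-JDK illustration, not RxJava's implementation; SimpleObserver, ContractSketch, range, and record are names made up for this example, and the failure is triggered from the consumer side purely to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical consumer interface mirroring the three callbacks discussed above.
interface SimpleObserver<T> {
    void onNext(T item);
    void onError(Throwable error);
    void onCompleted();
}

class ContractSketch {
    // Pushes `count` consecutive integers starting at `start`, then signals completion.
    // An exception thrown while delivering an item ends the flow via onError instead.
    static void range(int start, int count, SimpleObserver<Integer> observer) {
        try {
            for (int i = 0; i < count; i++) {
                observer.onNext(start + i);
            }
        } catch (RuntimeException e) {
            observer.onError(e); // terminal signal: no onCompleted follows an error
            return;
        }
        observer.onCompleted(); // terminal signal: nothing is emitted after this
    }

    // Records every signal as text so that the order of events can be inspected.
    // Pass a failAt value outside the range to get a flow that completes normally.
    static List<String> record(int start, int count, int failAt) {
        List<String> log = new ArrayList<>();
        range(start, count, new SimpleObserver<Integer>() {
            @Override public void onNext(Integer item) {
                if (item == failAt) throw new IllegalStateException("boom");
                log.add("next:" + item);
            }
            @Override public void onError(Throwable error) { log.add("error:" + error.getMessage()); }
            @Override public void onCompleted() { log.add("completed"); }
        });
        return log;
    }
}
```

Calling ContractSketch.record(1, 3, -1) logs three next signals followed by completed, while ContractSketch.record(1, 3, 2) logs one next signal followed by an error and never reaches completed.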

You can create Observable manually:

val usersObservable = Observable.create<List<User>> { observer ->
    getUsersFromServer { users ->
        try {
            observer.onNext(users)
            observer.onComplete()
        } catch (e: Throwable) {
            observer.onError(e)
        }
    }
}

I can’t stress this enough: creating an Observable alone won’t transfer any data. To make that happen, you need to subscribe to the Observable. Only then does it become active.

usersObservable.subscribe(
    { users -> println("Users: $users") },
    { error -> println("Error: $error") },
    { println("Completed.") })

Here’s a quick example of creating an Observable from an existing list. Suppose a server returned a list of Entity objects:

ArrayList<Entity> list = getEntityList();
Subscription entitySubscription = Observable.from(list)
.subscribe(...);

If you have legacy code, you can teach it to work with Observable. Simply put, you can connect the old code to the new code without rewriting the whole thing. For this, Observable.just() and Observable.from() should be enough:

private Object getOldObject() {
    Object object = ...; // obtained from the old code
    return object;
}

public Observable<Object> newObject() {
    return Observable.just(getOldObject());
}

This will work just fine if the legacy code executes fast. But what if it takes time to execute? Then the calling thread is blocked, because getOldObject() is executed first, when newObject() is called, and only then is its result passed to Observable.just(). This situation can be fixed with the help of Observable.defer():

public Observable<Object> newObject() {
    return Observable.defer(() -> Observable.just(getOldObject()));
}

The defer operator waits until an Observer subscribes and only then generates an Observable, using the supplied factory function. It does this again for every subscriber, so even though subscribers may think they are subscribing to the same Observable, each of them in fact receives its own individual chain.

In some cases, deferring the creation of the Observable until the moment of subscription guarantees that the Observable contains the latest data.
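The difference between the eager just() and the lazy defer() can be illustrated with plain JDK types. This is only a sketch of the idea, not how RxJava implements these operators: Source, DeferSketch, and the call counter are hypothetical, and getOldObject() here is a stand-in that simply counts its invocations.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

class DeferSketch {
    // Hypothetical one-item source: subscribe pushes a single value to the consumer.
    interface Source<T> {
        void subscribe(Consumer<T> consumer);
    }

    // Eager: the value already exists when the source is created (the idea of Observable.just).
    static <T> Source<T> just(T value) {
        return consumer -> consumer.accept(value);
    }

    // Lazy: the factory runs again on every subscribe call (the idea of Observable.defer).
    static <T> Source<T> defer(Supplier<Source<T>> factory) {
        return consumer -> factory.get().subscribe(consumer);
    }

    // Counts how many times the stand-in for the legacy code has been invoked.
    static final AtomicInteger calls = new AtomicInteger();

    // Stand-in for the legacy method from the text; returns the invocation number.
    static int getOldObject() {
        return calls.incrementAndGet();
    }
}
```

With this sketch, DeferSketch.just(DeferSketch.getOldObject()) invokes the legacy method immediately, before anyone subscribes, whereas DeferSketch.defer(() -> DeferSketch.just(DeferSketch.getOldObject())) invokes it once per subscribe call, so every subscriber sees a fresh value.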

🔍 Observer & 📮 Subscriber

So what do Observer and Subscriber have in common, and how do they differ? An Observer watches an Observable. A Subscriber is an Observer plus the unsubscribe() and isUnsubscribed() methods, i.e. Subscriber implements both the Observer and Subscription interfaces:

public abstract class Subscriber<T> implements Observer<T>, Subscription

Therefore, Subscriber can not only subscribe to Observable but also unsubscribe from it.

Remember to unsubscribe from asynchronous calls. Rx makes this convenient: the subscribe method returns a Subscription object that has an unsubscribe() method. The Subscription represents the whole chain of operators: when unsubscribe() is called, the operators unsubscribe from one another in sequence. This is how you can avoid memory leaks. The unsubscribe call can be placed in onDestroy():

Subscription entitySubscription = Observable.from(list)
    .subscribe(...);

@Override
public void onDestroy() {
    super.onDestroy();
    entitySubscription.unsubscribe();
}

All subscriptions can be stored in a CompositeSubscription. You can create a base Activity or Fragment class that holds a CompositeSubscription, add every subscription to it, and have it cleared automatically (for example, in onDestroy()).

CompositeSubscription compositeSubscription = new CompositeSubscription();
compositeSubscription.add(entitySubscription);
compositeSubscription.add(Observable.just(new Object()).subscribe());

compositeSubscription.clear();
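To show what such a container does, here is a minimal plain-JDK sketch of the idea. It is not RxJava's CompositeSubscription (which, among other things, is thread-safe); CompositeSketch and Handle are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class CompositeSketch {
    // Hypothetical handle for one active subscription.
    interface Handle {
        void unsubscribe();
    }

    private final List<Handle> handles = new ArrayList<>();

    // Remembers a subscription so it can be cancelled later together with the others.
    void add(Handle handle) {
        handles.add(handle);
    }

    // Cancels everything collected so far and forgets it; the container stays reusable.
    void clear() {
        for (Handle handle : handles) {
            handle.unsubscribe();
        }
        handles.clear();
    }

    // Number of subscriptions currently tracked.
    int size() {
        return handles.size();
    }
}
```

A base Activity or Fragment could own one such container, add every handle to it, and call clear() from onDestroy().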

🔬 Subject

Subject is an extension of Observable that also implements the Observer interface:

public abstract class Subject<T, R> extends Observable<R> implements Observer<T>

A Subject acts as both an Observer and an Observable: it can receive events (like an Observer) and notify its own subscribers (like an Observable). There are several implementations of Subject, for example PublishSubject, BehaviorSubject, ReplaySubject, and AsyncSubject.

🌡 Hot & Cold Observable

There are two types of Observable: hot and cold. With a cold Observable, every subscribed Observer receives all the data from the start, regardless of when it subscribed. A cold Observable can be compared to a CD: every listener can play the record from the start.

With a hot Observable, an Observer that subscribes late may miss part of the data. Such an Observable can be compared to a radio broadcast, where you hear only what is transmitted after you tuned in.

In RxJava, a hot Observable is represented by ConnectableObservable. It does not start emitting data until its connect method is called, regardless of whether any observers have subscribed to it:

Observable<String> observable = Observable.from(list);
ConnectableObservable<String> connectableObservable = observable.publish();
connectableObservable.connect();
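The CD versus radio distinction can also be shown with a small plain-JDK sketch. It is an illustration of the concept only, not of RxJava's ConnectableObservable; ColdSource, HotSource, and demo are names invented for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class HotColdSketch {
    // Cold: every subscriber triggers its own replay of the full data set (the CD).
    static class ColdSource {
        private final List<String> items;
        ColdSource(List<String> items) { this.items = items; }
        void subscribe(Consumer<String> listener) {
            items.forEach(listener);
        }
    }

    // Hot: items are broadcast to whoever is listening at that moment (the radio).
    static class HotSource {
        private final List<Consumer<String>> listeners = new ArrayList<>();
        void subscribe(Consumer<String> listener) { listeners.add(listener); }
        void emit(String item) {
            for (Consumer<String> listener : listeners) listener.accept(item);
        }
    }

    // Returns what an early and a late subscriber receive from each kind of source.
    static List<List<String>> demo() {
        List<String> coldEarly = new ArrayList<>();
        List<String> coldLate = new ArrayList<>();
        ColdSource cold = new ColdSource(Arrays.asList("a", "b", "c"));
        cold.subscribe(coldEarly::add);
        cold.subscribe(coldLate::add);   // subscribes later, still gets everything

        List<String> hotEarly = new ArrayList<>();
        List<String> hotLate = new ArrayList<>();
        HotSource hot = new HotSource();
        hot.subscribe(hotEarly::add);
        hot.emit("a");
        hot.subscribe(hotLate::add);     // tuned in after "a" was broadcast
        hot.emit("b");
        hot.emit("c");
        return Arrays.asList(coldEarly, coldLate, hotEarly, hotLate);
    }
}
```

In demo(), both cold subscribers end up with all three items, while the late hot subscriber receives only the items emitted after it subscribed.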

🌪 Data Stream Operators

As I said, the essence of reactive programming is working with flows. We have an Observable class that supplies the data flow. Now as an example, let’s take a look at some of the data flow operators.

flatMap

In the previous example, we implemented Observable with only one element in its flow – the list of users received from the server.

usersObservable.subscribe { users -> println("Users: $users") }

The result:

Users: [Bob, Rob, Ben, Stan]

Assume we have to break this list up and turn the flow of List<User> elements into a flow of User elements. The flatMap operator does exactly that.

usersObservable
    .flatMap { users -> Observable.from(users) }
    .subscribe { user -> println(user) }

The result:

Bob
Rob
Ben
Stan

filter

Assume we have to select the users whose names start with a specific letter. The filter operator can do that.

usersObservable
    .flatMap { users -> Observable.from(users) }
    .filter { it.name.startsWith("b", ignoreCase = true) }
    .subscribe { user -> println(user) }

The result:

Bob
Ben

take

We can select several initial elements with the help of the take operator. For example, the first one:

usersObservable
    .flatMap { users -> Observable.from(users) }
    .filter { it.name.startsWith("b", ignoreCase = true) }
    .take(1)
    .subscribe { user -> println(user) }

The result:

Bob

Or the last one:

usersObservable
...
.takeLast(1)
.subscribe { user -> println(user) }

The result:

Stan

For takeLast to return a value, the flow has to be finite, i.e. it has to call onComplete.

map

Assume we need to transform flow elements of one type to another. For example, we want to figure out the number of letters in the usernames:

usersObservable
    .flatMap { users -> Observable.from(users) }
    .map { it.name.length }
    .subscribe { println(it) }

The result:

3
3
3
4

distinct

Or remove duplicates from the flow:

usersObservable
    .flatMap { users -> Observable.from(users) }
    .map { it.name.length }
    .distinct()
    .subscribe { println(it) }

The result:

3
4

As you can tell, operators can be easily combined and adapted to various tasks. The list above is far from complete and serves only as an example. Besides, you can create your own operators.
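Several of these operators have counterparts with the same names in the JDK's java.util.stream API, which makes for a convenient library-free comparison. Keep in mind that this is only an analogy: a Stream is pull-based and synchronous, while an Observable pushes data. The class and method names below are made up for the example, and plain name strings stand in for User objects.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class OperatorSketch {
    // flatMap + map + distinct, applied to in-memory batches of user names.
    static List<Integer> distinctNameLengths(List<List<String>> batches) {
        return batches.stream()
                .flatMap(List::stream)      // List<String> elements become String elements
                .map(String::length)        // each name becomes its length
                .distinct()                 // drop repeated lengths, keeping first occurrences
                .collect(Collectors.toList());
    }

    // filter + take(1); in the Stream API the counterpart of take is called limit.
    static List<String> firstStartingWithB(List<String> names) {
        return names.stream()
                .filter(name -> name.toLowerCase().startsWith("b"))
                .limit(1)
                .collect(Collectors.toList());
    }
}
```

For the user names used above (Bob, Rob, Ben, Stan), distinctNameLengths yields 3 and 4, and firstStartingWithB yields Bob, matching the results of the Observable chains.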

🖖🏼  Multithreading

If you have a local database that caches the data from a server, this is how you can access the list of users:  

usersDao.loadAll()

You can easily turn the action of retrieving the user list into an Observable:

val usersObservable = Observable.fromCallable {
usersDao.loadAll()
}

After you subscribe to this Observable, you will receive the list of all the users just like we did before. If this happens on the UI thread and the number of users is large, the UI thread may be blocked for a long time, which is unlikely to delight the users. Observable helps deal with this issue:

usersObservable
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { println(it) }

Here, the subscribeOn call makes the data request run on a thread pool intended for I/O instead of the main thread, and the observeOn call delivers the result back to the main thread.

subscribeOn

With subscribeOn, we specify the (usually background) thread on which the data flow is generated and, unless observeOn is used further down the chain, processed.

subscribeOn accepts a Scheduler as a parameter. A Scheduler is an abstraction for thread pool management, much like ExecutorService in Java. RxJava has several off-the-shelf Scheduler implementations; here are some of them:

  • Schedulers.computation(). Contains several threads for computations. The number of threads depends on the number of processors available to Java on the device.
  • Schedulers.io(). Well suited to I/O work such as reading from or writing to a database, server requests, and reading from or writing to disk. In other words, operations that do not require heavy computation and spend most of their time waiting for data to be sent or received.
  • Schedulers.newThread(). This scheduler creates a new thread whenever someone subscribes to the Observable, and the thread is finished after the work is done.
  • Schedulers.immediate(). Starts the work right away on the current thread. This scheduler is the default for timeout(), timeInterval(), and timestamp().
  • Schedulers.trampoline(). Queues the task on the current thread instead of running it immediately. The scheduler processes its queue and runs the tasks one by one. It is the default scheduler for repeat() and retry().
  • Schedulers.from(executorService). Creates a Scheduler from an ExecutorService.

subscribeOn affects the whole chain of operations regardless of where in the chain it is called. If it is called more than once, only the call closest to the source takes effect; the other calls have no effect.

usersObservable
   .subscribeOn(Schedulers.io())
   .flatMap { users -> Observable.from(users) }
   .map { it.name.length }
   .distinct()
   .subscribe { println(it) }

Similarly,

usersObservable
   .flatMap { users -> Observable.from(users) }
   .map { it.name.length }
   .distinct()
   .subscribeOn(Schedulers.io())
   .subscribe { println(it) }

Similarly,

usersObservable
   .flatMap { users -> Observable.from(users) }
   .subscribeOn(Schedulers.io())
   .map { it.name.length }
   .distinct()
   .subscribeOn(Schedulers.computation()) // has no effect: the call closest to the source wins
   .subscribe { println(it) }

observeOn

observeOn switches the rest of the chain to another thread. As opposed to subscribeOn, the position of observeOn matters: it determines where the operations that follow it in the chain are executed. In addition, observeOn can be called multiple times.

usersObservable
    .subscribeOn(Schedulers.io())
    .flatMap { users -> Observable.from(users) }
    .observeOn(Schedulers.computation()) // operations below will be executed on the computation pool threads
    .map { it.name.length }
    .distinct()
    .observeOn(AndroidSchedulers.mainThread()) // the result will be printed in the main thread
    .subscribe { println(it) }

As you can see, any Observable can easily be used both synchronously and asynchronously, switching between threads when necessary.
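For readers who know java.util.concurrent better than Rx, the same thread hopping can be approximated with CompletableFuture and explicitly chosen executors. This is only an analogy under stated assumptions, not how RxJava schedulers work internally: the executors named "io", "computation", and "main" are hypothetical stand-ins (the last one for the Android main thread), and ThreadHopSketch is a made-up class.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadHopSketch {
    // Single-thread executor whose thread carries a recognizable name.
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    // Runs three stages on three executors and returns the thread name seen by each stage.
    static List<String> run() {
        ExecutorService io = named("io");
        ExecutorService computation = named("computation");
        ExecutorService main = named("main"); // stand-in for the Android main thread
        List<String> threads = Collections.synchronizedList(new ArrayList<>());
        try {
            CompletableFuture
                    .supplyAsync(() -> {                 // the source runs on "io" (like subscribeOn)
                        threads.add(Thread.currentThread().getName());
                        return "Bob";
                    }, io)
                    .thenApplyAsync(name -> {            // hop to "computation" (like observeOn)
                        threads.add(Thread.currentThread().getName());
                        return name.length();
                    }, computation)
                    .thenAcceptAsync(length ->           // hop to "main" (like a second observeOn)
                            threads.add(Thread.currentThread().getName()), main)
                    .join();                             // block only so the sketch can return a result
            return new ArrayList<>(threads);
        } finally {
            io.shutdown();
            computation.shutdown();
            main.shutdown();
        }
    }
}
```

ThreadHopSketch.run() returns the thread names in the order the stages ran: io, computation, main.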

🗂 Reactive Programming Use Cases

Given all of the above, I can say that a reactive approach is useful in various cases, for example:

  • User interaction (clicks, gestures, etc.), processing system events (GPS, gyroscope, etc.).
  • Processing asynchronous incoming data (server interaction).
  • And so on.

Case 1: Retrofit and RxJava

Retrofit is a very popular Android library for communicating with REST services, and it supports Observable as a return type.

Let’s see how RxJava helps us combine several asynchronous requests. For example, we have two API methods: the first one returns a photo, and the second one returns its metadata.

@GET("/user/{id}/photo")
Observable<Photo> getUserPhoto(@Path("id") int id);

@GET("/photo/{id}/metadata")
Observable<Metadata> getPhotoMetadata(@Path("id") int id);

These two requests can be easily combined:

Observable.zip(
   service.getUserPhoto(id),
   service.getPhotoMetadata(id),
   (photo, metadata) -> createPhotoWithData(photo, metadata))
   .subscribeOn(Schedulers.io()) // executed in the background thread
   .observeOn(AndroidSchedulers.mainThread()) // shows the result in the UI thread
   .subscribe(photoWithData -> showPhoto(photoWithData));

Case 2: Delayed Search

Say we have to implement search using a server API, and we want the user to see suggested results while typing the query. We don’t want to send every intermediate string, to avoid overloading the device with unnecessary server requests. The request should be sent only once the user has stopped typing for some time, for example 300 ms.

First, let’s create Observable from TextWatcher that tracks user input.

val searchObservable = Observable.create<String> { emitter ->
    val watcher = object : TextWatcher {
        override fun afterTextChanged(editable: Editable?) {
            // do nothing
        }

        override fun beforeTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) {
            // do nothing
        }

        override fun onTextChanged(s: CharSequence?, start: Int, before: Int, count: Int) {
            // RxJava 2 does not allow null items, so emit an empty string instead
            emitter.onNext(s?.toString().orEmpty())
        }
    }
    emailEditText.addTextChangedListener(watcher)
    // remove the listener when the subscriber disposes, so the view is not leaked
    emitter.setCancellable { emailEditText.removeTextChangedListener(watcher) }
}

This Observable will supply the flow of user data. Our task is to process this flow.

searchObservable
    .filter { it.isNotEmpty() }
    .debounce(300, TimeUnit.MILLISECONDS)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe {
        startSearch(it) // starting the search
    }

All the work here is done by the debounce operator. It filters out rapid input and emits the last value from the flow once the specified period of inactivity has passed.

If we used the RxBinding library, which provides Observable bindings for Android widgets, we wouldn’t have to create the Observable manually and the example would be much simpler.
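To make the debounce rule concrete, here is a small sketch that applies it to a recorded list of timestamped inputs instead of a live flow. It is a simplified plain-JDK model of the rule, not RxJava's implementation; Event, DebounceSketch, and the sample timestamps are invented for this example, and the events are assumed to be sorted by time.

```java
import java.util.ArrayList;
import java.util.List;

class DebounceSketch {
    // A typed value together with the time (in ms) at which it was typed.
    static class Event {
        final long timeMs;
        final String text;
        Event(long timeMs, String text) {
            this.timeMs = timeMs;
            this.text = text;
        }
    }

    // Returns the values that survive debouncing: an event is kept only if the next
    // event arrives at least quietMs later, or if it is the last event of a finished flow.
    static List<String> debounce(List<Event> events, long quietMs) {
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            boolean last = i == events.size() - 1;
            if (last || events.get(i + 1).timeMs - events.get(i).timeMs >= quietMs) {
                emitted.add(events.get(i).text);
            }
        }
        return emitted;
    }
}
```

For example, if the user types "k", "ko", "kot" at 0, 100, and 200 ms, pauses, and then types "kotl" and "kotlin" at 700 and 800 ms, a 300 ms quiet period lets only "kot" and "kotlin" through, so only two search requests would be sent.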

Case 3: Combining Asynchronous Tasks

In the article The True Colors Of Kotlin, we tried to solve this task with the help of coroutines. We have two unreliable functions, f1 and f2 (for example, they are susceptible to network connection errors). We need to get the sum of their results by calling each of them asynchronously with a timeout and with a limited number of attempts.

Now let’s solve the same problem with the help of the reactive approach:

fun f1(i: Int): Int {
    Thread.sleep(if (i != 2) 2000L else 200L)
    return 1
}

fun f2(i: Int): Int {
    Thread.sleep(if (i != 2) 2000L else 200L)
    return 2
}

fun reactiveExample() {
    val timeout = 500L
    val retryCount = 3
    var counterF1 = 0
    var counterF2 = 0

    val firstTask = Single.fromCallable { f1(counterF1++) }
        .timeout(timeout, TimeUnit.MILLISECONDS)
        .retry { retryNumber, error -> retryNumber < retryCount }

    val secondTask = Single.fromCallable { f2(counterF2++) }
        .timeout(timeout, TimeUnit.MILLISECONDS)
        .retry { retryNumber, error -> retryNumber < retryCount }

    Single.zip(firstTask, secondTask, BiFunction<Int, Int, Int> { a, b -> a + b })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe({ println(it) }, { it.printStackTrace() })
}

Here, Single is a kind of Observable with exactly one element in the flow. Single has a slightly different public API: instead of onNext() and onComplete(), it uses a single onSuccess() method that combines the two.

The reactiveExample() function will print 3.
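For comparison, the same timeout, retry, and combine logic can be sketched with plain java.util.concurrent. This is an analogy rather than a translation of the RxJava code: RetrySketch, flaky, callWithRetry, and sum are hypothetical names, and the delays are shortened (400 ms for a slow attempt, 10 ms for a fast one) so that the sketch finishes quickly.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.IntUnaryOperator;

class RetrySketch {
    // Stand-in for f1/f2: slow on the first two attempts, fast on the third.
    static int flaky(int attempt, int value) {
        try {
            Thread.sleep(attempt != 2 ? 400L : 10L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // the attempt was cancelled after a timeout
        }
        return value;
    }

    // Calls task(attempt) with a per-attempt timeout and gives up after maxAttempts.
    static int callWithRetry(IntUnaryOperator task, long timeoutMs, int maxAttempts) {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            for (int attempt = 0; attempt < maxAttempts; attempt++) {
                final int n = attempt;
                Callable<Integer> call = () -> task.applyAsInt(n);
                Future<Integer> future = pool.submit(call);
                try {
                    return future.get(timeoutMs, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    future.cancel(true); // interrupt the slow attempt and try again
                } catch (InterruptedException | ExecutionException e) {
                    throw new IllegalStateException(e);
                }
            }
            throw new IllegalStateException("no result after " + maxAttempts + " attempts");
        } finally {
            pool.shutdownNow();
        }
    }

    // Runs both tasks concurrently and sums their results, in the spirit of Single.zip.
    static int sum(long timeoutMs, int maxAttempts) {
        CompletableFuture<Integer> first = CompletableFuture.supplyAsync(
                () -> callWithRetry(n -> flaky(n, 1), timeoutMs, maxAttempts));
        CompletableFuture<Integer> second = CompletableFuture.supplyAsync(
                () -> callWithRetry(n -> flaky(n, 2), timeoutMs, maxAttempts));
        return first.thenCombine(second, Integer::sum).join();
    }
}
```

With a 150 ms timeout and three attempts, RetrySketch.sum(150, 3) lets the first two attempts of each task time out and succeeds on the third, giving 3. The reactive version expresses the same policy declaratively with timeout(), retry(), and zip().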

🙅 Conclusion

In my opinion, reactive programming, where data and events are treated as streams (flows), complements the object-oriented approach, where events are represented as changes in object state. This approach describes dynamic processes better and also simplifies writing asynchronous code.