Error Handling in RxJava

As the saying goes, RxJava has an operator for almost everything. In this blog, we will learn how to handle errors properly using RxJava operators in your Android project.

But before that, let's understand

What is an Error?

An error can be understood as a mistake, or as an output that was generated but was not expected.

Now, let us look at a normal RxJava example:

Observable.just(1, 2, 3, 4)
    .subscribe(object : Observer<Int> {
        override fun onComplete() {
            Log.d("onComplete", "onComplete")
        }

        override fun onSubscribe(d: Disposable) {
        }

        override fun onNext(t: Int) {
            Log.d("onNext", t.toString())
        }

        override fun onError(e: Throwable) {
            Log.d("onError", e.localizedMessage)
        }
    })

This generates the following output in Logcat:

D/onNext: 1
D/onNext: 2
D/onNext: 3
D/onNext: 4
D/onComplete: onComplete

Now, let's see what happens if we get an exception, e.g. a NullPointerException.

Observable.just(1,2,3,4).subscribe(object : Observer<Int> {
    override fun onComplete() {
        Log.d("onComplete", "onComplete")
    }

    override fun onSubscribe(d: Disposable) {
    }

    override fun onNext(i: Int) {
        if (i == 3) {
            throw NullPointerException("Its a NPE")
        }
        Log.d("onNext", i.toString())
    }

    override fun onError(e: Throwable) {
        Log.d("onError", e.localizedMessage)
    }

})

This crashes the app with the error,

Its a NPE

This is because the exception was thrown inside the Observer's own onNext. RxJava does not route it to onError in this case, so it propagates to the caller and the sequence never completes.

What are exceptions in Android?

Exceptions are events that occur during the execution of a program and disrupt its normal flow. They are of two types: checked and unchecked.

  • Checked exception: an exception that must either be caught or declared in the signature of the method in which it is thrown. For example,
String transform(String input) throws IOException;
  • Unchecked exception: an exception whose handling is not verified at compile time. For example,
if (i == 3) {
    throw NullPointerException("Its a NPE")
}

So far, we have covered exceptions, their types, and how they show up in RxJava. But what if an exception is thrown inside an operator, before the value even reaches the subscriber? We will discuss various ways to handle such errors in this blog.

Example for map operator,

Observable.just(1, 2, 3, 4)
    .map { x ->
        if (x == 2) {
            throw NullPointerException("Its a NPE")
        }
        x * 10
    }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(object : Observer<Int> {
        override fun onComplete() {
            Log.d("onComplete", "onComplete")
        }
        override fun onSubscribe(d: Disposable) {
        }

        override fun onNext(i: Int) {
            Log.d("onNext", i.toString())
        }

        override fun onError(e: Throwable) {
            Log.d("onError", e.localizedMessage)
        }

    })

Here we will see the output

D/onError: Its a NPE

and we would see similar behaviour for flatMap and other operators as well.

In the above code, as soon as the map operator throws, the stream goes straight to onError and onComplete is never called. Here not even onNext is called: observeOn does not delay errors by default, so the error overtakes the value 10 that was already emitted. To handle errors in cases like this, we use dedicated operators so that the stream does not go to onError directly.
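To make this behaviour easy to check off-device, here is a minimal sketch that runs the same map pipeline synchronously on a plain JVM. It assumes RxJava 2.x (the io.reactivex package) is on the classpath; record and mapThatFails are hypothetical helper names, not part of RxJava. Because no scheduler is involved, the value emitted before the failure is still delivered, and then the stream terminates with onError.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: subscribes and records every signal in arrival order.
fun <T : Any> record(source: Observable<T>): List<String> {
    val events = mutableListOf<String>()
    source.subscribe(
        { value -> events.add("onNext: $value") },
        { error -> events.add("onError: ${error.message}") },
        { events.add("onComplete") }
    )
    return events
}

// Same shape as the map example: the second item makes the operator throw.
fun mapThatFails(): Observable<Int> =
    Observable.just(1, 2, 3, 4)
        .map { x ->
            if (x == 2) throw NullPointerException("Its a NPE")
            x * 10
        }

fun main() {
    // The stream ends with onError; 3 and 4 are never processed, onComplete never fires.
    record(mapThatFails()).forEach { println(it) }
}
```

Nothing after the failing item is emitted, which is why the operators that follow are needed.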

Let us understand them one by one.

onExceptionResumeNext

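The onExceptionResumeNext() operator takes a fallback ObservableSource. If the upstream throws an Exception before the value reaches the subscriber's onNext, onError or onComplete, for example in doOnNext, the stream does not call onError. It switches to the source passed to onExceptionResumeNext and runs that instead. Throwables that are not Exceptions (such as Errors) still go to onError.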

Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .doOnNext {
        if (it == 2) {
            throw (RuntimeException("Exception on 2"))
        }

    }.onExceptionResumeNext {
        Log.d("onExceptionResumeNext","1")
    }
    .subscribe(object : Observer<Int> {
        override fun onComplete() {
            Log.d("onComplete", "onComplete")
        }
        override fun onSubscribe(d: Disposable) {
        }

        override fun onNext(i: Int) {
            Log.d("onNext", i.toString())
        }

        override fun onError(e: Throwable) {
            Log.d("onError", e.localizedMessage)
        }
    })

and when we run this, we get the output

D/onNext: 1
D/onExceptionResumeNext: 1

Here, onNext first receives the value 1. When the value 2 arrives, doOnNext throws, onExceptionResumeNext takes over, and the code inside it runs. Note that the lambda here acts as the fallback ObservableSource; since it only logs and never emits or completes, the subscriber receives nothing further.
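In practice the fallback is usually a real Observable rather than a logging lambda. The following is a small sketch of that variant, assuming RxJava 2.x on the classpath (to my knowledge onExceptionResumeNext is not available in RxJava 3). The names record and resumeAfterException, and the fallback value -1, are made up for illustration.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: subscribes and records every signal in arrival order.
fun <T : Any> record(source: Observable<T>): List<String> {
    val events = mutableListOf<String>()
    source.subscribe(
        { value -> events.add("onNext: $value") },
        { error -> events.add("onError: ${error.message}") },
        { events.add("onComplete") }
    )
    return events
}

// When doOnNext throws, the stream switches to the fallback Observable.
fun resumeAfterException(): Observable<Int> =
    Observable.just(1, 2, 3, 4)
        .doOnNext { if (it == 2) throw RuntimeException("Exception on 2") }
        .onExceptionResumeNext(Observable.just(-1))

fun main() {
    // The fallback item is delivered and the stream completes normally.
    record(resumeAfterException()).forEach { println(it) }
}
```

Because the fallback emits and completes, the subscriber sees a normal termination instead of a stream that silently stops.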

onErrorResumeNext

Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .map { x ->
        if (x == 2) {
            throw NullPointerException("Its a NPE")
        }
        x * 10
    }.onErrorResumeNext { throwable: Throwable ->
        return@onErrorResumeNext ObservableSource {
            Log.d("onErrorResumeNext", throwable.localizedMessage)
        }
    }.subscribe(object : Observer<Int> {
        override fun onComplete() {
            Log.d("onComplete", "onComplete")
        }

        override fun onSubscribe(d: Disposable) {
        }

        override fun onNext(i: Int) {
            Log.d("onNext", i.toString())
        }

        override fun onError(e: Throwable) {
            Log.d("onError", e.localizedMessage)
        }
    })

The onErrorResumeNext() operator handles any Throwable raised in the stream. For the code above, the output will be,

D/onNext: 10
D/onErrorResumeNext: Its a NPE

onNext first receives 10, the first value multiplied by 10 in the map operator. When 2 comes up, map throws the NPE. Instead of going to onError, the error is handled in onErrorResumeNext, where we can perform the remaining actions.

In my case I am just logging, but in both of the above cases we could report the error to Crashlytics or some other service.

onErrorResumeNext replaces the current stream with an entirely new Observable.
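As a sketch of "replacing the stream", the function passed to onErrorResumeNext can inspect the Throwable and pick the Observable to continue with. This assumes RxJava 2.x; record, resumeWithFallback and the fallback values are hypothetical. The explicit Function type is only there to avoid Kotlin's overload ambiguity between the two onErrorResumeNext variants.

```kotlin
import io.reactivex.Observable
import io.reactivex.ObservableSource
import io.reactivex.functions.Function

// Hypothetical helper: subscribes and records every signal in arrival order.
fun <T : Any> record(source: Observable<T>): List<String> {
    val events = mutableListOf<String>()
    source.subscribe(
        { value -> events.add("onNext: $value") },
        { error -> events.add("onError: ${error.message}") },
        { events.add("onComplete") }
    )
    return events
}

fun resumeWithFallback(): Observable<Int> =
    Observable.just(1, 2, 3, 4)
        .map { x ->
            if (x == 2) throw NullPointerException("Its a NPE")
            x * 10
        }
        .onErrorResumeNext(Function<Throwable, ObservableSource<Int>> { error ->
            // Recover from the NPE with a fallback stream; pass anything else on.
            if (error is NullPointerException) Observable.just(-1, -2)
            else Observable.error<Int>(error)
        })

fun main() {
    record(resumeWithFallback()).forEach { println(it) }
}
```

The remaining source items (3 and 4) are not replayed; the fallback stream simply takes over from the point of failure.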

doOnError

doOnError is a side-effect operator: it lets us see the error just before it is delivered to the subscriber's onError.

Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .doOnNext {
        if (it == 2) {
            throw (RuntimeException("Exception on 2"))
        }
    }
    .doOnError {
        Log.d("doOnError", "Exception Occured")
    }.subscribe(object : Observer<Int> {
        override fun onComplete() {
            Log.d("onComplete", "onComplete")
        }
        override fun onSubscribe(d: Disposable) {
        }
        override fun onNext(i: Int) {
            Log.d("onNext", i.toString())
        }
        override fun onError(e: Throwable) {
            Log.d("onError", e.localizedMessage)
        }
    })

In the above code snippet, we are calling two different side-effect operators, doOnNext and doOnError.

In short, side-effect operators are the ones that get called just before the corresponding subscriber methods, such as onNext and onError.

So, here doOnNext gets called before onNext, and doOnError before onError. When we run the above code,

D/onNext: 1
D/doOnError: Exception Occured
D/onError: Exception on 2

We will see the above output: first 1 is printed from onNext, and then, when the value is 2, the exception occurs. Since we added a side-effect operator, doOnError gets called before the onError method.

Here you could show some sort of error message to the user, such as a Toast or a dialog.
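The ordering can be checked without Android by writing every callback into one list. This is a minimal sketch, assuming RxJava 2.x on the classpath; doOnErrorOrder is a hypothetical name. It shows that doOnError only observes the error and does not consume it.

```kotlin
import io.reactivex.Observable

// Records subscriber callbacks and the doOnError side effect in one list.
fun doOnErrorOrder(): List<String> {
    val events = mutableListOf<String>()
    Observable.just(1, 2, 3, 4)
        .doOnNext { if (it == 2) throw RuntimeException("Exception on 2") }
        .doOnError { error -> events.add("doOnError: ${error.message}") }
        .subscribe(
            { value -> events.add("onNext: $value") },
            { error -> events.add("onError: ${error.message}") },
            { events.add("onComplete") }
        )
    return events
}

fun main() {
    // doOnError is recorded first, then the same error reaches onError.
    doOnErrorOrder().forEach { println(it) }
}
```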

onErrorReturnItem

Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .doOnNext {
        if (it == 2) {
            throw (RuntimeException("Exception on 2"))
        }
    }
    .onErrorReturnItem(10)
    .subscribe(object : Observer<Int> {
        override fun onComplete() {
            Log.d("onComplete", "onComplete")
        }
        override fun onSubscribe(d: Disposable) {
        }
        override fun onNext(i: Int) {
            Log.d("onNext", i.toString())
        }
        override fun onError(e: Throwable) {
            Log.d("onError", e.localizedMessage)
        }
    })

The onErrorReturnItem operator emits a given item whenever an exception or error occurs, followed by onComplete. The output of the above code is,

D/onNext: 1
D/onNext: 10
D/onComplete: onComplete

Here, the first value (i.e. 1) gets printed from onNext, and we get the exception in doOnNext when the value is 2. But since we are using onErrorReturnItem, it emits the given item in place of the error. The item must be of the stream's type; here the stream is of integers, so onNext receives the integer 10, and then the stream completes.

We can use this operator to pass a default value of the same type when there is an exception.

Note: The position of onErrorReturnItem matters. It only handles errors raised above it in the chain, so if we call it above doOnNext, it will not catch the exception thrown there, and onError will be called directly.
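The effect of the operator's position can be sketched by building the same chain twice. This assumes RxJava 2.x on the classpath; record, source, failOnTwo, handlerBelow and handlerAbove are hypothetical names chosen for the example.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: subscribes and records every signal in arrival order.
fun <T : Any> record(source: Observable<T>): List<String> {
    val events = mutableListOf<String>()
    source.subscribe(
        { value -> events.add("onNext: $value") },
        { error -> events.add("onError: ${error.message}") },
        { events.add("onComplete") }
    )
    return events
}

fun source(): Observable<Int> = Observable.just(1, 2, 3, 4)

fun failOnTwo(value: Int) {
    if (value == 2) throw RuntimeException("Exception on 2")
}

// Handler placed after the failing step: the error is replaced by 10.
fun handlerBelow(): Observable<Int> =
    source().doOnNext { failOnTwo(it) }.onErrorReturnItem(10)

// Handler placed before the failing step: it never sees the error.
fun handlerAbove(): Observable<Int> =
    source().onErrorReturnItem(10).doOnNext { failOnTwo(it) }

fun main() {
    println(record(handlerBelow()))
    println(record(handlerAbove()))
}
```

In the second chain the exception is raised downstream of onErrorReturnItem, so it travels on to the subscriber's onError untouched.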

onErrorReturn

We sometimes need to produce a default item when an error or exception occurs. onErrorReturn takes a lambda that receives the Throwable and returns the item to emit.

Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .doOnNext {
        if (it == 2) {
            throw (RuntimeException("Exception on 2"))
        }
    }
    .onErrorReturn { t: Throwable ->
        Log.d("onComplete", t.localizedMessage)
        5
    }.subscribe(object : Observer<Int> {
        override fun onComplete() {
            Log.d("onComplete", "onComplete")
        }

        override fun onSubscribe(d: Disposable) {
        }

        override fun onNext(i: Int) {
            Log.d("onNext", i.toString())
        }

        override fun onError(e: Throwable) {
            Log.d("onError", e.localizedMessage)
        }
    })

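Here, onNext receives 5 after the error occurs.

D/onNext: 1
D/onComplete: Exception on 2
D/onNext: 5
D/onComplete: onComplete

This is the output after running the app. The value 1 is delivered as usual; the first onComplete line comes from the Log call inside onErrorReturn, which reuses the "onComplete" tag. The operator then emits 5 to onNext after catching the exception and completes the flow.

If the lambda does not end with an explicit value, Kotlin uses the value of its last expression. With only the Log.d call in the lambda, that is the Int returned by Log.d (27 in my case), not a default supplied by the operator.

onErrorReturn replaces onError with a single onNext(value) followed by onComplete().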
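Since onErrorReturn receives the Throwable, the default item can depend on what went wrong. Below is a minimal sketch of that idea, assuming RxJava 2.x on the classpath; record, fallbackFor, withFallback and the values 0 and -1 are hypothetical.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: subscribes and records every signal in arrival order.
fun <T : Any> record(source: Observable<T>): List<String> {
    val events = mutableListOf<String>()
    source.subscribe(
        { value -> events.add("onNext: $value") },
        { error -> events.add("onError: ${error.message}") },
        { events.add("onComplete") }
    )
    return events
}

// Picks a default value based on the type of the error (illustrative mapping).
fun fallbackFor(error: Throwable): Int = when (error) {
    is IllegalStateException -> 0
    else -> -1
}

// Throws the given failure on the second item, then maps it to a default.
fun withFallback(failure: RuntimeException): Observable<Int> =
    Observable.just(1, 2, 3, 4)
        .doOnNext { if (it == 2) throw failure }
        .onErrorReturn { error -> fallbackFor(error) }

fun main() {
    println(record(withFallback(IllegalStateException("bad state"))))
    println(record(withFallback(RuntimeException("something else"))))
}
```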

retry

With this operator, we re-subscribe to the observable and give it a chance to run again.

For example: you have an API call that fails with an error because of a fluctuating internet connection. We can retry the API call using the retry operator.

Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .doOnNext {
        if (it == 2) {
            throw (RuntimeException("Exception on 2"))
        }
    }.retry(4)
    .subscribe(object : Observer<Int> {
        override fun onComplete() {
            Log.d("onComplete", "onComplete")
        }

        override fun onSubscribe(d: Disposable) {
        }

        override fun onNext(i: Int) {
            Log.d("onNext", i.toString())
        }

        override fun onError(e: Throwable) {
            Log.d("onError", e.localizedMessage)
        }
    })

Here, we allow 4 retries. Every time the stream gets an error it re-subscribes, up to four times after the first attempt, and when the last retry also fails it finally delivers the exception to onError.

The output of the above is,

D/onNext: 1
D/onNext: 1
D/onNext: 1
D/onNext: 1
D/onNext: 1
D/onError: Exception on 2

Note: If we don't pass 4, it will keep retrying until it succeeds. In our case it would retry infinitely, as it can never succeed.
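To see how many attempts retry(n) actually makes, the sketch below counts subscriptions with doOnSubscribe. It assumes RxJava 2.x on the classpath; retryDemo is a hypothetical helper. The count includes the original subscription plus each re-subscription.

```kotlin
import io.reactivex.Observable
import java.util.concurrent.atomic.AtomicInteger

// Returns the number of subscriptions made and the signals the subscriber saw.
fun retryDemo(retries: Long): Pair<Int, List<String>> {
    val subscriptions = AtomicInteger()
    val events = mutableListOf<String>()
    Observable.just(1, 2, 3, 4)
        .doOnSubscribe { subscriptions.incrementAndGet() } // runs once per attempt
        .doOnNext { if (it == 2) throw RuntimeException("Exception on 2") }
        .retry(retries)
        .subscribe(
            { value -> events.add("onNext: $value") },
            { error -> events.add("onError: ${error.message}") },
            { events.add("onComplete") }
        )
    return Pair(subscriptions.get(), events)
}

fun main() {
    val (attempts, events) = retryDemo(4)
    println("attempts = $attempts")
    events.forEach { println(it) }
}
```

For a real network call you would normally bound the retries like this rather than use the unbounded retry().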

This is all about how to handle errors in RxJava.

Understand by Example

Now, we will discuss handling errors in RxJava while making a mock API call. In this example, we will use the zip operator to zip two observables.

These observables will be mock responses of two different APIs. They are,

val apiResponseOne = ArrayList<String>()
val apiResponseTwo = ArrayList<String>()

apiResponseOne.add("Response of API-1")
apiResponseTwo.add("Response of API-2")
val observableOne = Observable.just(apiResponseOne)
    .map {
        if (it.size == 1) {
            Log.d(TAG, "NullPointerException")
            throw NullPointerException("Its a NPE")
        }
        it
    }
    .onErrorReturn { ArrayList() }
val observableTwo = Observable.just(apiResponseTwo)
    .map { it }
    .onErrorReturn { ArrayList() }

Now, we have two different observables, each returning a dummy ArrayList as its API response.

In observableOne, map throws an exception because the size is one, and onErrorReturn turns that error into an empty list.

In observableTwo, we get the apiResponseTwo ArrayList as the emitted value.

Now, we will zip both the observables

Observable.zip(
    observableOne,
    observableTwo,
    BiFunction<ArrayList<String>, ArrayList<String>, ArrayList<String>> { outputOne, outputTwo ->
        Log.d(TAG, outputFunction(outputOne, outputTwo).size.toString())
        return@BiFunction outputFunction(outputOne, outputTwo)
    }
).subscribe(object : Observer<ArrayList<String>> {
    override fun onComplete() {

    }
    override fun onSubscribe(d: Disposable) {
    }

    override fun onNext(data: ArrayList<String>) {
        for (individualData in data) {
            Log.d(TAG, "data : $individualData")
        }
    }

    override fun onError(e: Throwable) {
    }
})

and outputFunction looks like,

private fun outputFunction(outputOne: ArrayList<String>, outputTwo: ArrayList<String>): ArrayList<String> {
    val output = ArrayList<String>()
    output.addAll(outputOne)
    output.addAll(outputTwo)
    return output
}

In the above code, we are zipping the output of the two observables and returning a single ArrayList that combines both lists.

Since observableOne throws an exception, it returns an empty ArrayList, while observableTwo returns a list of one string. We merge the two, and the result is a new list.

The output is,

D/MainActivity: NullPointerException
D/MainActivity: 1
D/MainActivity: data : Response of API-2

First, the log from the map operator of observableOne is printed, just before it throws and onErrorReturn substitutes an empty list. Then the log in the BiFunction is printed with the size of the merged output list, i.e. 1, because the list contains only the response of API-2, which we can verify from the log in onNext.

The purpose of this example is to show that even if one API call fails in an Rx operation, we can still complete the task without halting the operation or crashing the app.
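For contrast, here is a compact sketch of the same zip with and without the per-source fallback. It assumes RxJava 2.x on the classpath; record, failingApi, workingApi and zipBoth are hypothetical stand-ins for the mock APIs above, and plain Lists are used instead of ArrayLists to keep it short.

```kotlin
import io.reactivex.Observable
import io.reactivex.functions.BiFunction

// Hypothetical helper: subscribes and records every signal in arrival order.
fun <T : Any> record(source: Observable<T>): List<String> {
    val events = mutableListOf<String>()
    source.subscribe(
        { value -> events.add("onNext: $value") },
        { error -> events.add("onError: ${error.message}") },
        { events.add("onComplete") }
    )
    return events
}

// Mock API that always fails.
fun failingApi(): Observable<List<String>> =
    Observable.fromCallable<List<String>> { throw IllegalStateException("API-1 failed") }

// Mock API that succeeds with one response.
fun workingApi(): Observable<List<String>> =
    Observable.just(listOf("Response of API-2"))

// Zips both responses into one combined list.
fun zipBoth(
    first: Observable<List<String>>,
    second: Observable<List<String>>
): Observable<List<String>> =
    Observable.zip(
        first,
        second,
        BiFunction<List<String>, List<String>, List<String>> { a, b -> a + b }
    )

fun main() {
    // Without a fallback, one failing source fails the whole zip.
    println(record(zipBoth(failingApi(), workingApi())))
    // With a fallback on the failing source, the zip still produces a result.
    println(record(zipBoth(failingApi().onErrorReturn { emptyList<String>() }, workingApi())))
}
```

Placing the fallback on each source, rather than after zip, is what lets the other response survive.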


Important Note

  • In the map operator, we can also rethrow an error using Exceptions.propagate()
Observable.just("Hello MindOrks")
  .map(input -> {
    try {
      return someTask(input); // someTask is a placeholder for your own work
    } catch (Throwable t) {
      throw Exceptions.propagate(t);
    }
  })

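Here, Exceptions.propagate() wraps a checked Throwable in a RuntimeException so that it can be thrown from the lambda; an Error or RuntimeException is passed through unchanged.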

  • In the flatMap operator, we return an Observable. So, when we get an error, we can hand it to the stream using,
Observable.just("Hello MindOrks")
  .flatMap(input -> {
    try {
      return Observable.just(performWork(input)); // performWork is a placeholder for your own work
    } catch (Throwable t) {
      return Observable.error(t);
    }
  })
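Because flatMap works with inner Observables, an error can also be handled per item, inside the inner stream, so that one failing item does not end the whole sequence. The sketch below illustrates this under the assumption of RxJava 2.x on the classpath; record, lookup, unguarded and guarded are hypothetical names.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: subscribes and records every signal in arrival order.
fun <T : Any> record(source: Observable<T>): List<String> {
    val events = mutableListOf<String>()
    source.subscribe(
        { value -> events.add("onNext: $value") },
        { error -> events.add("onError: ${error.message}") },
        { events.add("onComplete") }
    )
    return events
}

// Mock per-item work: the lookup for id 2 fails with Observable.error.
fun lookup(id: Int): Observable<String> =
    if (id == 2) Observable.error<String>(IllegalStateException("lookup failed for 2"))
    else Observable.just("item-$id")

// Inner error reaches the outer stream and terminates it.
fun unguarded(): Observable<String> =
    Observable.just(1, 2, 3).flatMap { id -> lookup(id) }

// Inner error is replaced inside the inner stream; the outer stream continues.
fun guarded(): Observable<String> =
    Observable.just(1, 2, 3).flatMap { id -> lookup(id).onErrorReturnItem("fallback-$id") }

fun main() {
    println(record(unguarded()))
    println(record(guarded()))
}
```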

Happy learning

Team MindOrks :)