RxJava の subscribe の例外ハンドリングが適用されない例外への対応メモ

RxJava の subscribe メソッドの例外ハンドリングを記述していてもアプリが落ちるケースがあるのでとりあえずメモ。

source: git@github.com:beyondseeker/chrono3_rx_grobal_error_handling.git

subscribe() による例外ハンドリングをすり抜ける例

下記のようなコードを Android アプリの中で実行した場合、subscribe メソッドの例外ハンドリングでは対応できず、アプリが落ちる。

/**
 * subscribe() による例外ハンドリングをすり抜ける例
 */
@Test
fun example1() {
    val disposable = CompositeDisposable()
    disposable.add(
        Completable
            .fromCallable {
                // この sleep 中に disposable.dispose() が呼ばれ、このスレッドが interrupt され java.lang.InterruptedException
                // がスローされるが、既に dispose されているので subscribe() にてこの例外をハンドリングできない。
                Thread.sleep(200)
            }
            .subscribeOn(Schedulers.newThread())
            .subscribe({ onSuccess(disposable) }, ::onError)
    )

    Thread.sleep(100)

    disposable.dispose()

    Thread.sleep(300)
}

例外はこんな感じで io.reactivex.exceptions.UndeliverableException がスローされてます。

Exception in thread "RxNewThreadScheduler-1" io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.lang.InterruptedException: sleep interrupted
	at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)
	at io.reactivex.internal.operators.completable.CompletableFromCallable.subscribeActual(CompletableFromCallable.java:42)
	at io.reactivex.Completable.subscribe(Completable.java:2309)
	at io.reactivex.internal.operators.completable.CompletableSubscribeOn$SubscribeOnObserver.run(CompletableSubscribeOn.java:64)
	at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:578)
	at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
	at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at com.objectfanatics.chrono3.ExampleUnitTest$example1$1.call(ExampleUnitTest.kt:26)
	at com.objectfanatics.chrono3.ExampleUnitTest$example1$1.call(ExampleUnitTest.kt:14)
	at io.reactivex.internal.operators.completable.CompletableFromCallable.subscribeActual(CompletableFromCallable.java:36)
	... 11 more

subscribe() による例外ハンドリングをすり抜けた例外をキャッチする例

下記のコードは、例外を握りつぶしている。
Androidアプリの場合は RxJavaPlugins#setErrorHandler() を Application#onCreate() 辺りで実行しておけばいいんでしょうかね。

/**
 * subscribe() によるハンドリングをすり抜た例外を catch する例
 */
@Test
fun example2() {
    RxJavaPlugins.setErrorHandler { throwable -> println("RxJava Global Error: ${throwable.message}") }
    example1()
}

private fun onSuccess(disposable: Disposable) {
    println("onSuccess: isDisposed = ${disposable.isDisposed}")
}

private fun onError(error: Throwable) {
    println("onError: ${error.message}")
}

dispose された時点以降は onSuccess も onError も呼ばれる必要はないというのであれば、特に問題なさそうな気もする。