RxJava2のnullの扱いとOptional

RxJava 2.0 で Observable による null の emission が非サポートになったので、その点に関してのメモ。

Javaでnullをemitしてみる

Observable<String> source = Observable.create(emitter -> {
    try {
        emitter.onNext("Alpha");
        emitter.onNext("Beta"); // io.reactivex.exceptions.OnErrorNotImplementedException: The mapper function returned a null value.
        emitter.onNext("Gamma");
        emitter.onNext("Delta");
        emitter.onNext("Epsilon");
        emitter.onComplete();
    } catch (Throwable e) {
        emitter.onError(e);
    }
});
source.map(s -> s.length() >= 5 ? s.length() : null).subscribe(s -> System.out.println("RECEIVED: " + s));

emitter.onNext("Beta"); のところで例外がスローされる。

JavaでOptionalを使ってみる

        Observable<String> source = Observable.create(emitter -> {
            try {
                emitter.onNext("Alpha");
                emitter.onNext("Beta");
                emitter.onNext("Gamma");
                emitter.onNext("Delta");
                emitter.onNext("Epsilon");
                emitter.onComplete();
            } catch (Throwable e) {
                emitter.onError(e);
            }
        });
        source.map(s -> Optional.ofNullable(s.length() >= 5 ? s.length() : null)).subscribe(optional -> System.out.println("RECEIVED: " + (optional.orElse(null))));

Optional でラップすることにより null を emit していないので例外はスローされず、以下のように出力される。

RECEIVED: 5
RECEIVED: null
RECEIVED: 5
RECEIVED: 5
RECEIVED: 7

kotlinで普通にSequencesを使った場合

        val source = listOf("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        source.map { s -> if (s.length >= 5) s.length else null }.forEach { s -> println("RECEIVED: " + s) }

普通に null が扱えるので、例外はスローされず以下のように出力される。

RECEIVED: 5
RECEIVED: null
RECEIVED: 5
RECEIVED: 5
RECEIVED: 7