RxKotlin
で実行する Observable
のスレッドをどのように切り替えるのか調査したので、調べた内容を簡単にまとめておこうかと思います🦅
それではやっていく
まずは、Android
プラットフォーム上のメインスレッドで処理を実行できるようにしてみます。Android
固有のスレッドを使用するため iOS
における RxCocoa
のように、RxAndroid
というバイナリが必要になるので依存関係を追加します。
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
※ 執筆時点では 3.0.0
が最新でした。詳しくはこちらをご参照ください.
ということで、RxAndroid
のインストールが完了したので validation
という Observable
の結果をメインスレッドで実行してみます。
presenter.validation() .observeOn(AndroidSchedulers.mainThread()) .subscribe({ println("success") }, { println(it.localizedMessage) }, { println("completed") }) .addTo(compositeDisposable)
Android
も iOS
と同じく、UI
などを描画するためのスレッドであるメインスレッドをアプリ起動時に作成します。iOS
に限らず Android
でも実行結果をメインスレッドで実行するという処理は比較的多いユースケースかと思いますので、今回は Observable
の Extension
メソッドとしてメインスレッドでの実行を下記のように切り出しました。
fun <T> Observable<T>.observeOnMain(): Observable<T> { return observeOn(AndroidSchedulers.mainThread()) }
こうすることで、毎回スレッドを指定せずにメソッドを呼び出すだけでメインスレッドで実行できるようになります。
presenter.validation() .observeOnMain() .subscribe({ println("success") }, { println(it.localizedMessage) }, { println("completed") }) .addTo(compositeDisposable)
非同期処理はどの Scheduler で実行する?
結論から述べるとネットワーク通信やデータベース操作など CPU
を多く使用しない処理などには Schedulers.io()
を使用します。前述の例で validation
を非同期スケジューラで実行すると下記のようになります。
presenter.validation() .subscribeOn(Schedulers.io()) .observeOnMain() .subscribe({ println("success") }, { println(it.localizedMessage) }, { println("completed") }) .addTo(compositeDisposable)
ReactiveX
における subscribeOn
と observeOn
の違いについては以前の記事でも紹介しているので、ぜひ参考にしてみてください。
ちなみに、こちらの記事もお勧めです。
その他 Scheduler
上記で紹介したもの以外の Schedulers
の簡単な紹介は下記のようになります。
newThread()
呼び出し毎に新しいスレッドを作成し、実行するスケジューラです。基本的に、他のスレッドでの処理を大幅に遅延させるような処理でない場合は使用する必要はないかと思われます🤔 また、ここで作成されたスレッドは再利用されることはありません。
computation()
計算作業用のスケジューラで実行します。これはループ処理やコールバックの処理などをするためのスケジューラのようです。RxSwift
では、ConcurrentDispatchQueueScheduler(qos: .background)
などでコールバック後の処理なども一緒くたに実行していましたが、RxJava
では下記のように、計算作業の io
スケジューラでの実行明確に禁止しているので computation()
を使うように気をつけたいですね👀
Do not perform computational work on this scheduler. Use computation() instead.
Unhandled errors will be delivered to the scheduler Thread's Thread.UncaughtExceptionHandler.
参照: http://reactivex.io/RxJava/javadoc/rx/schedulers/Schedulers.html#io--
ちょっとサンプルをかくのが面倒だったので参照させていただきました!
Observable.create(/* */) .observeOn(Schedulers.computation()) .map(/* */) // ここは計算用スレッドで実行される .observeOn(Schedulers.io()) .subscribe(/* */); // ここはI/Oスレッドで実行される
参照: https://tech.mokelab.com/android/libs/RxJava/thread.html
immediate()
現在のスレッドで即座にタスクを実行するためのスケジューラです。
trampoline()
先入先出法で、タスクを実行していくためのスケジューラです。
今回紹介した以外にもいくつか Scheduler
はあるので、詳しくは下記を参照してみてください。
という感じで本日も以上になります🍺
参考
- http://reactivex.io/RxJava/javadoc/rx/schedulers/Schedulers.html#io%28%29
- https://stackoverflow.com/a/58006339/14219079