iOSエンジニアのつぶやき

毎朝8:30に iOS 関連の技術について1つぶやいています。まれに釣りについてつぶやく可能性があります。

【Android】ReactiveX の恩恵を受けたスレッド管理を学ぶ

RxKotlin で実行する Observable のスレッドをどのように切り替えるのか調査したので、調べた内容を簡単にまとめておこうかと思います🦅

f:id:yum_fishing:20210201110324p:plain:w200

それではやっていく

まずは、Android プラットフォーム上のメインスレッドで処理を実行できるようにしてみます。Android 固有のスレッドを使用するため iOS における RxCocoa のように、RxAndroid というバイナリが必要になるので依存関係を追加します。

implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'

※ 執筆時点では 3.0.0 が最新でした。詳しくはこちらをご参照ください.

ということで、RxAndroid のインストールが完了したので validation という Observable の結果をメインスレッドで実行してみます。

        presenter.validation()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({
                println("success")
            }, {
                println(it.localizedMessage)
            }, {
                println("completed")
            })
            .addTo(compositeDisposable)

AndroidiOS と同じく、UI などを描画するためのスレッドであるメインスレッドをアプリ起動時に作成します。iOS に限らず Android でも実行結果をメインスレッドで実行するという処理は比較的多いユースケースかと思いますので、今回は ObservableExtension メソッドとしてメインスレッドでの実行を下記のように切り出しました。

fun <T> Observable<T>.observeOnMain(): Observable<T> {
    return observeOn(AndroidSchedulers.mainThread())
}

こうすることで、毎回スレッドを指定せずにメソッドを呼び出すだけでメインスレッドで実行できるようになります。

        presenter.validation()
            .observeOnMain()
            .subscribe({
                println("success")
            }, {
                println(it.localizedMessage)
            }, {
                println("completed")
            })
            .addTo(compositeDisposable)

非同期処理はどの Scheduler で実行する?

結論から述べるとネットワーク通信やデータベース操作など CPU を多く使用しない処理などには Schedulers.io() を使用します。前述の例で validation を非同期スケジューラで実行すると下記のようになります。

        presenter.validation()
            .subscribeOn(Schedulers.io())
            .observeOnMain()
            .subscribe({
                println("success")
            }, {
                println(it.localizedMessage)
            }, {
                println("completed")
            })
            .addTo(compositeDisposable)

ReactiveX における subscribeOnobserveOn の違いについては以前の記事でも紹介しているので、ぜひ参考にしてみてください。

yamato8010.hatenablog.com

ちなみに、こちらの記事もお勧めです。

medium.com

その他 Scheduler

上記で紹介したもの以外の Schedulers の簡単な紹介は下記のようになります。

newThread()

呼び出し毎に新しいスレッドを作成し、実行するスケジューラです。基本的に、他のスレッドでの処理を大幅に遅延させるような処理でない場合は使用する必要はないかと思われます🤔 また、ここで作成されたスレッドは再利用されることはありません。

computation()

計算作業用のスケジューラで実行します。これはループ処理やコールバックの処理などをするためのスケジューラのようです。RxSwift では、ConcurrentDispatchQueueScheduler(qos: .background) などでコールバック後の処理なども一緒くたに実行していましたが、RxJava では下記のように、計算作業の io スケジューラでの実行明確に禁止しているので computation() を使うように気をつけたいですね👀

Do not perform computational work on this scheduler. Use computation() instead.

Unhandled errors will be delivered to the scheduler Thread's Thread.UncaughtExceptionHandler.

参照: http://reactivex.io/RxJava/javadoc/rx/schedulers/Schedulers.html#io--

ちょっとサンプルをかくのが面倒だったので参照させていただきました!

Observable.create(/* */)
        .observeOn(Schedulers.computation())
        .map(/* */) // ここは計算用スレッドで実行される
        .observeOn(Schedulers.io())
        .subscribe(/* */); // ここはI/Oスレッドで実行される

参照: https://tech.mokelab.com/android/libs/RxJava/thread.html

immediate()

現在のスレッドで即座にタスクを実行するためのスケジューラです。

trampoline()

先入先出法で、タスクを実行していくためのスケジューラです。

今回紹介した以外にもいくつか Scheduler はあるので、詳しくは下記を参照してみてください。

reactivex.io

という感じで本日も以上になります🍺

参考

その他の記事

yamato8010.hatenablog.com

yamato8010.hatenablog.com

yamato8010.hatenablog.com