RxSwift で非同期処理を合成しよう
今回は同じ型の非同期処理を RxSwift でまとめる際に使用する concat
と merge
の使い方と挙動を簡単にまとめとこうと思います。
concat
concat()
は複数の非同期処理を渡された順番で順次処理を行っていきます。サンプルは次のようになります。
let ob1 = Observable<String>.create { observer -> Disposable in // 3秒後にイベントを流す DispatchQueue.main.asyncAfter(deadline: .now() + 3) { observer.onNext("ob1") observer.onCompleted() } return Disposables.create() } let ob2 = Observable<String>.create { observer -> Disposable in // 2秒後にイベントを流す DispatchQueue.main.asyncAfter(deadline: .now() + 2) { observer.onNext("ob2") observer.onCompleted() } return Disposables.create() } Observable.of(ob1, ob2) .concat() .subscribe(onNext: { str in print(str) }) .disposed(by: disposeBag) // 出力: // // ob1 // ob2 //
また、Observable
の合成は下記のように書くこともできます。
Observable.concat(ob1, ob2)
merge
merge()
は複数の非同期処理を並列に実行することができます。つまり、Observable の渡される順番などが関係なく処理が早く終わった順にストリームに流れます。下記がサンプルコードになります。
let ob1 = Observable<String>.create { observer -> Disposable in // 3秒後にイベントを流す DispatchQueue.main.asyncAfter(deadline: .now() + 3) { observer.onNext("ob1") observer.onCompleted() } return Disposables.create() } let ob2 = Observable<String>.create { observer -> Disposable in // 2秒後にイベントを流す DispatchQueue.main.asyncAfter(deadline: .now() + 2) { observer.onNext("ob2") observer.onCompleted() } return Disposables.create() } Observable.of(ob1, ob2) .merge() .subscribe(onNext: { str in print(str) }) .disposed(by: disposeBag) // 出力: // // ob2 // ob1 //
じゃあ異なる型の Observable はどうなるの?
基本的には、ストリームのイベントを逐次検知する必要がある場合は Observable の型を統一して、concat なり merge なりを使用する必要があります。並列で処理を実行して全ての処理が完了したタイミングで値を参照する場合は、zip
という関数が用意されていますが、直列で実行が完了した値を参照したい場合は flatMap
なり、concat
なりを使って実装する感じでしょうか🤔(こんな方法があるよってやつがあれば教えてください🥺)
直列で逐次イベントを検知
enum Container { case string(String) case int(Int) } let ob1 = Observable<String>.create { observer -> Disposable in DispatchQueue.main.asyncAfter(deadline: .now() + 3) { observer.onNext("ob1") observer.onCompleted() } return Disposables.create() }.map { Container.string($0) } let ob2 = Observable<Int>.create { observer -> Disposable in DispatchQueue.main.asyncAfter(deadline: .now() + 2) { observer.onNext(2) observer.onCompleted() } return Disposables.create() }.map { Container.int($0) } Observable.of(ob1, ob2) .concat() .subscribe(onNext: { c in switch c { case .string(let str): print("string: \(str)") case .int(let num): print("int: \(num)") } }) .disposed(by: disposeBag) // 出力: // // string: ob1 // int: 2 //
並列で逐次イベントを検知
enum Container { case string(String) case int(Int) } let ob1 = Observable<String>.create { observer -> Disposable in DispatchQueue.main.asyncAfter(deadline: .now() + 3) { observer.onNext("ob1") observer.onCompleted() } return Disposables.create() }.map { Container.string($0) } let ob2 = Observable<Int>.create { observer -> Disposable in DispatchQueue.main.asyncAfter(deadline: .now() + 2) { observer.onNext(2) observer.onCompleted() } return Disposables.create() }.map { Container.int($0) } Observable.of(ob1, ob2) .merge() .subscribe(onNext: { c in switch c { case .string(let str): print("string: \(str)") case .int(let num): print("int: \(num)") } }) .disposed(by: disposeBag) // 出力: // // int: 2 // string: ob1 //
並列で完了イベントを検知
zip
を使うと上記2つの方法とは違い型を統一する必要がないので、よりシンプルに実装することができます。
let ob1 = Observable<String>.create { observer -> Disposable in DispatchQueue.main.asyncAfter(deadline: .now() + 3) { observer.onNext("ob1") observer.onCompleted() } return Disposables.create() } let ob2 = Observable<Int>.create { observer -> Disposable in DispatchQueue.main.asyncAfter(deadline: .now() + 2) { observer.onNext(2) observer.onCompleted() } return Disposables.create() } Observable.zip(ob1, ob2) .subscribe(onNext: { str, num in print("string: \(str), int: \(num)") }) .disposed(by: disposeBag) // 出力: // // string: ob1, int: 2 //
参考
- https://rxmarbles.com/
- https://stackoverflow.com/questions/39050059/rxswift-merge-different-kind-of-observables
- https://qiita.com/Tueno@github/items/099d287217b38c314e1e
- https://tech-blog.sgr-ksmt.org/2016/04/26/rx_concat_merge/