rxSwift와 combine의 비동기 흐름은 아래와 같다

 

Observable     →     이벤트 생성     →     전달     →     Observer

 

RxSwift의 경우

Observable     →     이벤트 생성     →     전달     →     Observer
                              subscribe(on:)        observe(on:)

 

이벤트가 생성되는 쪽의 실행 스레드를 지정해주는 것이 subscribe(on:)

생성된 이벤트를 전달받아 처리하는 쪽의 실행 스레드를 지정해주는 것이 observe(on:)이다.

Observable<Int>
     .create { observer in
         // Subscription 영역
     }
     .subscribe(on:) // Subscription 영역의 실행 스레드 지정 (업스트림)
     .observe(on:) // Observing 영역을 지정 (다운 스트림)
     .subscribe { _ in
         // Observing 영역
     }
     .disposed(by: disposeBag)

 

 

subscribe(on:)은 어느곳에서 호출되더라도 이벤트를 방출하는 Subscription영역의 실행 스레드를 변경시키고

observe(on:)은 Observing 영역의 실행스레드를 변경시키기 때문에 호출된 곳의 아래 있는 실행스레드들이 바뀌게 된다

 

 

subscribe(on:)은 여러번 호출되더라도 Subscription영역은 한군데이기 때문에 위치와 관계 없이 가장 처음 호출된것이 적용된다.

observe(on:)은 다른 observe(on:)을 만날때 까지 적용된다

Observable<Int>
    .create { observer in
        // Subscription 영역
        print("create 스레드: \(Thread.current)")
        observer.onNext(1)
        observer.onCompleted()
        return Disposables.create()
    }
    .subscribe(on: ConcurrentDispatchQueueScheduler(qos: .background))  // 이것만 적용    
    .observe(on: ConcurrentDispatchQueueScheduler(qos: .background)) // 아래 전부 적용
    .map {
        // Observing 영역
        print("Map 스레드: \(Thread.current)")
        return $0
    }
    .subscribe(on: MainScheduler.instance)
    .observe(on: MainScheduler.instance) // 아래 전부 적용
    .subscribe { _ in
        // Observing 영역
        print("subscribe 스레드: \(Thread.current)")
    }
    .disposed(by: disposeBag)
    
    
Observable<Int>
    .create { observer in
        // Subscription 영역
        print("create 스레드: \(Thread.current)")
        observer.onNext(1)
        observer.onCompleted()
        return Disposables.create()
    }
    .subscribe(on: ConcurrentDispatchQueueScheduler(qos: .background))  // 이것만 적용
    .subscribe(on: MainScheduler.instance)
    .observe(on: ConcurrentDispatchQueueScheduler(qos: .background)) // 아래 전부 적용
    .map {
        // Observing 영역
        print("Map 스레드: \(Thread.current)")
        return $0
    }
    .observe(on: MainScheduler.instance) // 아래 전부 적용
    .subscribe { _ in
        // Observing 영역
        print("subscribe 스레드: \(Thread.current)")
    }
    .disposed(by: disposeBag)
    

// 결과
create 스레드: <NSThread: 0x600001750c00>{number = 5, name = (null)}
Map 스레드: <NSThread: 0x600001750c00>{number = 5, name = (null)}
subscribe 스레드: <_NSMainThread: 0x60000170c000>{number = 1, name = main}

 

 

Combine의 경우

Observable     →     이벤트 생성     →     전달     →     Observer
                              subscribe(on:)       receive(on:)

 

combine도 rxSwift와 동일하다 다만 이름이 observe(on:) -> receive(on:)로 변경된것 뿐이다.

Just(1)
    .subscribe(on: DispatchQueue.global()) // 이것만 적용
    .subscribe(on: RunLoop.main)
    .handleEvents(receiveSubscription: { _ in
        print("subscription 스레드 \(Thread.current)")
    })
    .receive(on: RunLoop.main)
    .map { (value) -> Int in
        print("map 스레드 \(Thread.current)")
        return value
    }
    .receive(on: DispatchQueue.global())
    .sink { value in
        print("received 스레드 \(Thread.current)")
    }
    
Just(1)
    .handleEvents(receiveSubscription: { _ in
        print("subscription 스레드 \(Thread.current)")
    })
    .receive(on: RunLoop.main)
    .map { (value) -> Int in
        print("map 스레드 \(Thread.current)")
        return value
    }
	.subscribe(on: DispatchQueue.global()) // 이것만 적용
    .subscribe(on: RunLoop.main)
    .receive(on: DispatchQueue.global())
    .sink { value in
        print("received 스레드 \(Thread.current)")
    }
    
// 결과
subscription 스레드 <NSThread: 0x6000017bf700>{number = 9, name = (null)}
map 스레드 <_NSMainThread: 0x600001710000>{number = 1, name = main}
received 스레드 <NSThread: 0x600001718300>{number = 7, name = (null)}

 

 

 

요약

이벤트 생성 시점 스레드 지정 (가장 처음 호출된 것만 적용 / 업스트림)

  • subscribe(on:) 

이벤트 처리 시점 스레드 지정 (호출된곳에서부터 아래로 전부 영향을 받음 / 다운스트림)

  • RxSwift - observe(on:)
  • Combine - receive(on:)