지나공 : 지식을 나누는 공간
[RxJava] 2. Flowable, Observable 본문
둘 다 데이터를 통지하는 생산자인 Flowable 과 Observable을 비교해보자.
Flowable | Observable |
Reactive Streams 인터페이스를 구현했음. | Reactive Streams 인터페이스를 구현한 게 아니라 RxJava 1.x에서 독자적으로 제공하는 클래스임. |
Subscriber에서 데이터를 처리한다. Subscriber는 reactive streams의 기본 스펙임. | Observer에서 데이터를 처리한다. Observer도 RxJava1.x에서 독자적으로 제공하는 클래스임. |
데이터 개수를 제어하는 배압 기능이 있음. | 배압 기능이 없음. |
배압 기능이 있기 때문에 subscription으로 전달 받는 데이터 개수를 제어할 수 있다. | 배압 기능이 없기 대문에 데이터 개수를 제어할 수 없다. |
subscription으로 구독을 해지한다. | disposable로 구독을 해지한다. |
1. 배압(Back Pressure)이란?
Flowable에서 데이터를 통지하는 속도가 Subscriber에서 통지된 데이터를 전달 받아 처리하는 속도보다 빠를 때 밸런스를 맞추기 위해 데이터 통지량을 제어하는 기능이다.
위 그림처럼 둘의 속도가 달라 처리하는 속도가 통지 속도를 따라가지 못할 때 에러가 발생한다. 데이터를 통지하고 데이터를 구독하고 구독한 데이터를 전달 받아서 처리한다.
2. 두 속도가 달라지면 어떻게 되나?
데이터가 들어오는 속도를 처리하는 속도가 따라가지 못하게 subscribe 내부에 처리할 때 1초가 지연되도록 설정했다. 이러면 데이터가 통지되는 것에 비해 처리 속도가 느려서 에러가 발생하는데, 이런 에러 발생을 처리하기 위해서 RxJava에서는 배압 전략이라는 게 제공된다. (MissingBackpressureException이 발생한다.)
3. 배압전략 (Backpressure Strategy)
RxJava에서는 Backpressure를 통해 Flowable이 통지 대기 중인 데이터를 어떻게 다룰지에 대한 배압 전략을 제공한다. BackpressuerStratagy라는 Enum에 정의되어 있다.
- MISSING 전략
- 배압을 적용하지 않고 나중에 onBackpressureXXX()로 배압을 적용할 수 있다.
- ERROR 전략
- 통지된 데이터가 버퍼의 크기를 초과하면 MissingBackpressureExcecption 에러를 통지한다.
- 소비자가 생산자의 통지 속도를 따라잡지 못할 때 발생한다.
- BUFFER 전략 :
- DROP_LATEST
- DROP_OLDEST
- DROP
- LATEST
3-1. 배압전략 : BUFFER 전략 - DROP_LATEST
버퍼가 가득 찬 시점에 버퍼 내에서 가장 최근에 버퍼로 들어온 데이터를 drop하고, drop된 빈 자리에 대기 중이던 데이터를 버퍼 안으로 채운다. 꽉 채워진 이상태에서 또 다른 데이터가 들어와서 overflow가 난다면 방금 다른 애 빼내고 넣었던 그 데이터가 가장 최근 데이터이니 다시 걔를 뺀다.
로그를 총 4개 출력한다.
- interval 함수에서 통지한 데이터가 어떤 게 있는 지 로그로 출력한다.
- onBackpressuerBuffer 함수 내부에서 버퍼가 가득 찼을 때 오버플로우가 발생했다는 로그를 출력한다.
- 그거 다음에 있는 doOnNext에서는 버퍼 내에서 데이터가 통지될 때 로그를 출력한다. (버퍼에서 데이터가 나갈 때)
- 소비자 쪽에서 데이터를 전달 받아서 처리할 때도 로그로 출력한다.
그 외에
- observeOn에서 적힌 buffer size는 데이터를 요청하는 개수다. 데이터를 1개씩 통지해달라고 생산자 쪽에 요청한다.
DROP_LATEAT 전략의 전체 플로우를 한줄한줄 해석해보자.
- start
- 처음에 0이 통지되고 버퍼에 0이 들어갔다.
- 버퍼에 있던 0이 통지됐다. (버퍼에서 나왔다)
- 1이 통지됐고 버퍼에 들어갔을 것이다.
- 2가 통지됐고 버퍼에 들어갔을 것이다. (버퍼사이즈 2만큼 이제 꽉 찼다.)
- 3이 통지됐다. 버퍼에 들어가려고 한다. 그런데 그 순간 버퍼에 들어가는 사이즈에 어긋나니까
- overflow라는 로그가 출력되고 전략에 따라 최근에 들어간 2가 버퍼에서 빠지고 3이 버퍼에 들어갔을 것이다. (버퍼 : 1,3)
- 소비자가 0을 소비했다.
- 버퍼에서 1이 나왔다.
- 4가 통지됐고 버퍼에 4가 들어갔다. (버퍼 : 3,4)
- 5가 통지됐고 버퍼에 5가 들어가려 한다. (버퍼: 3,4,5. overflow)
- overflow 라는 로그가 출력되고 전략에 따라 4가 나오고 그 자리에 5가 들어간다. (버퍼 : 3,5)
- 6이 통지됐고 버퍼에 6이 들어가려한다.
- overflow라는 로그가 출력되고 전략에 따라 5가 나오고 그 자리에 6이 들어간다. (버퍼 : 3, 6)
- 소비자가 아까 버퍼에서 내보낸 그 1을 소비했다.
- 버퍼에 있던 3이 나왔다. (버퍼 : 6)
- 7이 통지됐고 버퍼에 들어갔다. (버퍼: 6,7)
- 8이 통지됐고 버퍼에 들어가려 한다.
- overflow라는 로그가 출력되고 전략에 따라 7이 나오고 8이 그 자리에 들어간다.
3-2. 배압전략 : BUFFER 전략 - DROP_OLDEST
버퍼가 가득 찬 시점에 버퍼 내에서 가장 오래 전에 버퍼로 들어온 데이터를 drop하고, 그 자리에 버퍼 밖에서 대기하던 데이터를 채운다.
DROP_OLDEST 전략의 전체 플로우를 한줄한줄 해석해보자.
- start
- 처음에 0이 통지되고 버퍼에 0이 들어갔다.
- 버퍼에 있던 0이 통지됐다. (버퍼에서 나왔다)
- 1이 통지됐고 버퍼에 들어갔을 것이다.
- 2가 통지됐고 버퍼에 들어갔을 것이다. (버퍼사이즈 2만큼 이제 꽉 찼다.)
- 3이 통지됐다. 버퍼에 들어가려고 한다. 그런데 그 순간 버퍼에 들어가는 사이즈에 어긋나니까
- overflow라는 로그가 출력되고 전략에 따라 제일 예전에 들어간 1이 버퍼에서 빠지고 3이 버퍼에 들어갔을 것이다. (버퍼 : 2,3)
- 소비자가 0을 소비했다.
- 버퍼에서 2가 통지됐다.
- 4가 통지됐고 버퍼에 4가 들어갔다. (버퍼 : 3,4)
- 5가 통지됐고 버퍼에 5가 들어가려 한다. (버퍼: 3,4,5. overflow)
- overflow 라는 로그가 출력되고 전략에 따라 3이 나오고 그 자리에 5가 들어간다. (버퍼 : 4,5)
- 6이 통지됐고 버퍼에 6이 들어가려한다.
- overflow라는 로그가 출력되고 전략에 따라 4가 나오고 그 자리에 6이 들어간다. (버퍼 : 5, 6)
- 소비자가 아까 버퍼에서 내보낸 그 2를 소비했다.
- 버퍼에 있던 5기 나왔다. (버퍼 : 6)
- 7이 통지됐고 버퍼에 들어갔다. (버퍼: 6,7)
- 8이 통지됐고 버퍼에 들어가려 한다.
- overflow라는 로그가 출력되고 전략에 따라 6이 나오고 8이 그 자리에 들어간다. (버퍼 7,8)
3-3. 배압전략 : BUFFER 전략 - DROP
DROP 전략의 전체 플로우를 한줄한줄 해석해보자.
- 처음에 0이 통지된다.
- 하지만 0에 대한 처리가 아직 되지 않았기 때문에 1은 드롭된다.
- 그 다음에 통지된 2조 아직 0이 처리되지 않아서 통지되자마자 드롭된다.
- 3도 드롭된다.
- 이제 드디어 0이 처리됐다.
- 4가 통지됐고 이 시점에는 기존에 처리 중이던 애가 처리가 끝났으니 4를 받아서 처리되길 기다릴 수 있다.
- 5가 통지됐는데 4가 처리되지 않았으니 드롭
- 6, 7 모두 드롭
- 4가 처리됐다.
- 이제 처리중인 건 없으니 8 받는다.
3-4. 배압전략 : BUFFER 전략 - LATEST
'Tech > Reactive - Rxjava & Reactor' 카테고리의 다른 글
Reactive Stream과 구현체 : Project Reactor, RxJava, WebFlux (2) | 2022.05.09 |
---|---|
[RxJava] 1. Reactive Streams (0) | 2022.04.07 |
[RxJava] 0. 리액티브 프로그래밍 + 마블다이어그램 (4) | 2022.04.07 |