지나공 : 지식을 나누는 공간

[RxJava] 2. Flowable, Observable 본문

Tech/Reactive - Rxjava & Reactor

[RxJava] 2. Flowable, Observable

해리리_ 2022. 4. 27. 01:04

둘 다 데이터를 통지하는 생산자인 Flowable 과 Observable을 비교해보자.

Flowable Observable
Reactive Streams 인터페이스를 구현했음. Reactive Streams 인터페이스를 구현한 게 아니라 RxJava 1.x에서 독자적으로 제공하는 클래스임.
Subscriber에서 데이터를 처리한다. Subscriber는 reactive streams의 기본 스펙임. Observer에서 데이터를 처리한다. Observer도 RxJava1.x에서 독자적으로 제공하는 클래스임.
데이터 개수를 제어하는 배압 기능이 있음. 배압 기능이 없음.
배압 기능이 있기 때문에 subscription으로 전달 받는 데이터 개수를 제어할 수 있다. 배압 기능이 없기 대문에 데이터 개수를 제어할 수 없다.
subscription으로 구독을 해지한다. disposable로 구독을 해지한다.

 

1. 배압(Back Pressure)이란?

Flowable에서 데이터를 통지하는 속도가 Subscriber에서 통지된 데이터를 전달 받아 처리하는 속도보다 빠를 때 밸런스를 맞추기 위해 데이터 통지량을 제어하는 기능이다.

위 그림처럼 둘의 속도가 달라 처리하는 속도가 통지 속도를 따라가지 못할 때 에러가 발생한다. 데이터를 통지하고 데이터를 구독하고 구독한 데이터를 전달 받아서 처리한다. 

 

2. 두 속도가 달라지면 어떻게 되나?

통지는 1 밀리세컨드, 처리는 1초 즉 1000 밀리세컨드 걸리니까 약 1000배 정도 느리다.

데이터가 들어오는 속도를 처리하는 속도가 따라가지 못하게 subscribe 내부에 처리할 때 1초가 지연되도록 설정했다. 이러면 데이터가 통지되는 것에 비해 처리 속도가 느려서 에러가 발생하는데, 이런 에러 발생을 처리하기 위해서 RxJava에서는 배압 전략이라는 게 제공된다. (MissingBackpressureException이 발생한다.)

 

3. 배압전략 (Backpressure Strategy)

RxJava에서는 Backpressure를 통해 Flowable이 통지 대기 중인 데이터를 어떻게 다룰지에 대한 배압 전략을 제공한다. BackpressuerStratagy라는 Enum에 정의되어 있다.

  • MISSING 전략
    • 배압을 적용하지 않고 나중에 onBackpressureXXX()로 배압을 적용할 수 있다.
  • ERROR 전략
    • 통지된 데이터가 버퍼의 크기를 초과하면 MissingBackpressureExcecption 에러를 통지한다.
    • 소비자가 생산자의 통지 속도를 따라잡지 못할 때 발생한다.
  • BUFFER 전략 :
    • DROP_LATEST
    • DROP_OLDEST
    • DROP
    • LATEST

3-1. 배압전략 : BUFFER 전략 - DROP_LATEST

버퍼가 가득 찬 시점에 버퍼 내에서 가장 최근에 버퍼로 들어온 데이터를 drop하고, drop된 빈 자리에 대기 중이던 데이터를 버퍼 안으로 채운다. 꽉 채워진 이상태에서 또 다른 데이터가 들어와서 overflow가 난다면 방금 다른 애 빼내고 넣었던 그 데이터가 가장 최근 데이터이니 다시 걔를 뺀다.

로그를 총 4개 출력한다.

  • interval 함수에서 통지한 데이터가 어떤 게 있는 지 로그로 출력한다.
  • onBackpressuerBuffer 함수 내부에서 버퍼가 가득 찼을 때 오버플로우가 발생했다는 로그를 출력한다.
  • 그거 다음에 있는 doOnNext에서는 버퍼 내에서 데이터가 통지될 때 로그를 출력한다. (버퍼에서 데이터가 나갈 때)
  • 소비자 쪽에서 데이터를 전달 받아서 처리할 때도 로그로 출력한다.

그 외에

  • observeOn에서 적힌 buffer size는 데이터를 요청하는 개수다. 데이터를 1개씩 통지해달라고 생산자 쪽에 요청한다.

DROP_LATEAT 전략의 전체 플로우를 한줄한줄 해석해보자.

  1. start
  2. 처음에 0이 통지되고 버퍼에 0이 들어갔다.
  3. 버퍼에 있던 0이 통지됐다. (버퍼에서 나왔다)
  4. 1이 통지됐고 버퍼에 들어갔을 것이다.
  5. 2가 통지됐고 버퍼에 들어갔을 것이다. (버퍼사이즈 2만큼 이제 꽉 찼다.)
  6. 3이 통지됐다. 버퍼에 들어가려고 한다. 그런데 그 순간 버퍼에 들어가는 사이즈에 어긋나니까
  7. overflow라는 로그가 출력되고 전략에 따라 최근에 들어간 2가 버퍼에서 빠지고 3이 버퍼에 들어갔을 것이다. (버퍼 : 1,3)
  8. 소비자가 0을 소비했다.
  9. 버퍼에서 1이 나왔다.
  10. 4가 통지됐고 버퍼에 4가 들어갔다. (버퍼 : 3,4)
  11. 5가 통지됐고 버퍼에 5가 들어가려 한다. (버퍼: 3,4,5. overflow)
  12. overflow 라는 로그가 출력되고 전략에 따라 4가 나오고 그 자리에 5가 들어간다. (버퍼 : 3,5)
  13. 6이 통지됐고 버퍼에 6이 들어가려한다.
  14. overflow라는 로그가 출력되고 전략에 따라 5가 나오고 그 자리에 6이 들어간다. (버퍼 : 3, 6)
  15. 소비자가 아까 버퍼에서 내보낸 그 1을 소비했다.
  16. 버퍼에 있던 3이 나왔다. (버퍼 : 6)
  17. 7이 통지됐고 버퍼에 들어갔다. (버퍼: 6,7)
  18. 8이 통지됐고 버퍼에 들어가려 한다.
  19. overflow라는 로그가 출력되고 전략에 따라 7이 나오고 8이 그 자리에 들어간다. 

 

3-2. 배압전략 : BUFFER 전략 - DROP_OLDEST

버퍼가 가득 찬 시점에 버퍼 내에서 가장 오래 전에 버퍼로 들어온 데이터를 drop하고, 그 자리에 버퍼 밖에서 대기하던 데이터를 채운다.

DROP_OLDEST 전략의 전체 플로우를 한줄한줄 해석해보자.

  1. start
  2. 처음에 0이 통지되고 버퍼에 0이 들어갔다.
  3. 버퍼에 있던 0이 통지됐다. (버퍼에서 나왔다)
  4. 1이 통지됐고 버퍼에 들어갔을 것이다.
  5. 2가 통지됐고 버퍼에 들어갔을 것이다. (버퍼사이즈 2만큼 이제 꽉 찼다.)
  6. 3이 통지됐다. 버퍼에 들어가려고 한다. 그런데 그 순간 버퍼에 들어가는 사이즈에 어긋나니까
  7. overflow라는 로그가 출력되고 전략에 따라 제일 예전에 들어간 1이 버퍼에서 빠지고 3이 버퍼에 들어갔을 것이다. (버퍼 : 2,3)
  8. 소비자가 0을 소비했다.
  9. 버퍼에서 2가 통지됐다.
  10. 4가 통지됐고 버퍼에 4가 들어갔다. (버퍼 : 3,4)
  11. 5가 통지됐고 버퍼에 5가 들어가려 한다. (버퍼: 3,4,5. overflow)
  12. overflow 라는 로그가 출력되고 전략에 따라 3이 나오고 그 자리에 5가 들어간다. (버퍼 : 4,5)
  13. 6이 통지됐고 버퍼에 6이 들어가려한다.
  14. overflow라는 로그가 출력되고 전략에 따라 4가 나오고 그 자리에 6이 들어간다. (버퍼 : 5, 6)
  15. 소비자가 아까 버퍼에서 내보낸 그 2를 소비했다.
  16. 버퍼에 있던 5기 나왔다. (버퍼 : 6)
  17. 7이 통지됐고 버퍼에 들어갔다. (버퍼: 6,7)
  18. 8이 통지됐고 버퍼에 들어가려 한다.
  19. overflow라는 로그가 출력되고 전략에 따라 6이 나오고 8이 그 자리에 들어간다. (버퍼 7,8) 

 

3-3. 배압전략 : BUFFER 전략 - DROP

버퍼에 데이터가 모두 채워진 상태가 되면 이후에 생성되는 데이터를 버리고(DROP), 버퍼가 비워지는 시점에 DROP 되지 않은 데이터부터 다시 버퍼에 담는다.
위에서는 버퍼를 썼는데 아래 예제는 버퍼가 없기 때문에 통지된 데이터가 처리되지 않았다면 그 이후 통지된 값들은 모두 드롭. 현재 처리 대기 중인 데이터가 있으면 드롭, 없다면 받아들임.

DROP 전략의 전체 플로우를 한줄한줄 해석해보자.

  1. 처음에 0이 통지된다.
  2. 하지만 0에 대한 처리가 아직 되지 않았기 때문에 1은 드롭된다.
  3. 그 다음에 통지된 2조 아직 0이 처리되지 않아서 통지되자마자 드롭된다.
  4. 3도 드롭된다.
  5. 이제 드디어 0이 처리됐다.
  6. 4가 통지됐고 이 시점에는 기존에 처리 중이던 애가 처리가 끝났으니 4를 받아서 처리되길 기다릴 수 있다.
  7. 5가 통지됐는데 4가 처리되지 않았으니 드롭
  8. 6, 7 모두 드롭
  9. 4가 처리됐다.
  10. 이제 처리중인 건 없으니 8 받는다.

3-4. 배압전략 : BUFFER 전략 - LATEST

 
728x90
Comments