ReactiveStream

Reactive Stream #


해당 프로젝트에서는 WebFlux를 사용하고있다. webFlux의 동작원리를 이해하기 위해서는 reactiveStream에 대해서 알아야하기 때문에 이번 포스팅은 리액티브스트림의 기본개념을 설명 해보려한다.

Reactive Stream이란 무엇인가? #

리액티브 스트림은 비동기 데이터스트림을 처리하기 위한 표준이라고 볼수있다. 내가 이해한바로는 옵저버 패턴과 비슷하게 구독자와 생산자를 구독하여 데이터를 받고있다. 하지만 옵저버와 다르게 더 정교하다.

그럼 옵저버와 뭐가 다를까?

Observer vs ReactiveStream #

둘의 공통점은 데이터를 전달하는 방식이다.
pushing방식으로 구독자에게 데이터를 전달하고있다.
토비의 webflux강의를 보면 리액티브스트림은 Interable의 쌍대성이라고 표현하는데 좀 더 풀어서 설명하자면

  • Interable : 소비자 데이터를 요청할때마다 생산자가 데이터를 제공하는 방식이다. 즉 pulling 동작방식이다. 코드를 보면 소비자가 next()를 호출하여 다음 데이터를 끌어오고있다.
public static void main(String[] args) throws IllegalAccessException {
        List<String> fruits = new ArrayList<>();
        fruits.add("사과");
        fruits.add("바나나");
        fruits.add("오렌지");
        fruits.add("포도");

        Iterator<String> iterator = fruits.iterator();

        while(iterator.hasNext()) {
            String fruit = iterator.next();
            System.out.println(fruit);
        }
    }
  • Observer : 생산자가 이벤트 또는 데이터가 준비되었을시에 바로 구독자에게 데이터를 제공하는 방식이다. pushing방식으로 동작한다. 코드를 보면 옵저버에 등록해놓으면 구독자가 원하지도 않는데 데이터를받고있다.
    public static void main(String[] args) throws IllegalAccessException {
        FruitStore store = new FruitStore();

        store.addObserver(new FruitBuyer("구매자1"));
        store.addObserver(new FruitBuyer("구매자2"));

        store.addFruit("사과");
        store.addFruit("바나나");
        store.addFruit("오렌지");
    }

//출력값
구매자1 새로운 과일을 받았습니당 사과
구매자2 새로운 과일을 받았습니당 사과
구매자1 새로운 과일을 받았습니당 바나나
구매자2 새로운 과일을 받았습니당 바나나
구매자1 새로운 과일을 받았습니당 오렌지
구매자2 새로운 과일을 받았습니당 오렌지

Interable 와 observer 의 차이는 pulling방식과 pushing방식이라고 볼수있는데 이 observer 패턴과 리액티브 스트림은 데이터를 전달방식이 같다고 볼수있다

그럼 왜 리액티브스트림은 push방식을 채택하였을까?
기본적으로 리액티브스트림은 비동기데이터 스트림을 효율적으로 다루기 위해서이다.
비동기 데이터 스트림에서는 데이터가 언제 준비될지 예측하기 어렵다. 데이터가 즉시 소비자에게 전달할수있다.
결론적으로 non-blocking으로 동작하기에 소비자는 데이터를 기다리는 동안 다른 작업을 수행할수있다.
Non-blocking 아키텍쳐의 이점을 살리고자 push방식을 채택했다고 볼 수 있다.

하지만 이 과정에서 생산자가 구독자의 상태나 생산자의 처리속도를 전혀 배려하지 않는다. 구독자가 처리할수있는것보다 더 많은 데이터가 들어오면 데이터손실, 또는 성능저하 문제가 발생할수있다. 그래서 바로 구독자를 배려하기 위해 리액티브 스트림 이 나왔다고 볼수있다.

즉 차이점은 리액티브 스트림은 백프레셔를 지원한다..
구독자는 자신이 처리할수있는 데이터를 요청하고 생산자는 해당 요청에 맞게 데이터를 구독자에게 전달하고 구독자가 데이터를 처리할수없는 상태(error)가 되었을때 생산자도 데이터의 발행이 중단된다.

여기서 토비가 말한 “Iterable과 reactiveStream은 쌍대성(Duality)이다.” 의 의미를 알수있다.
소비자가 생산자에게 요청하는 pull방식은 비슷한 구조를 가지고있지만, 데이터 소비방식에서는 push방식이기에 상반되어 이중성을 형성한다.

즉 데이터의 전달은 push방식, 백프레셔를 통해 pull방식을 혼합된것이다.

참고문서

Reactive Stream의 표준 인터페이스 #

표준 인터페이스는 여기서 자세하게 볼 수 있다.
https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#specification

리액티브 스트림 동작과정 #

8b90dbfc-04f1-48a7-bf31-3030a4516a22

프로세스 #

  1. Subscriber 는 Publisher에게 subscribe()를 호출한다 -> 나 너 구독할래
  2. Publisher는 Subscription 객체를 생성해서 onSubscribe() 로 Subscriber 에게 준다 -> ㅇㅋ 너 구독완료됐어
  3. Subscriber는 Subscription객체를 통해서 request(n) 을 보내 n개 만큼 Publisher에게 요청한다 -> n개 줘
  4. Publisher는 요청된 데이터가 준비되면 onNext() 메서드를 통해서 필요한 Subscriber에게 데이터를 준다 -> 난 그동안 생성했으니까 n개 줄게
  5. 3,4 번 반복하다가 Publisher의 더이상 줄 데이터가 없으면 onComplete()를 호출해서 스트림의 종료를 Subscriber에게 전달한다 -> 이제 데이터 생산된거 없어
    만약 3,4번 반복하다가 Subscriber나 Publisher에서 에러가 발생하면 Publisher가 이를 감지하고 onError()를 호출하여 구독자에게 알리고 스트림을 종료한다 -> 너/나 에러났던데 나 그럼 스트림 종료한다?