TIL(사전캠프)

리액티브 스트림즈(2024-06-26)

note994 2024. 6. 26. 18:01

1. 코드로 보는 리액티브 스트림즈 컴포넌트

 

1.1 Publisher

코드2-1 Publisher 인터페이스

코드 2-1은 Publisher 인터페이스 코드인데, 코드 형태는 subscribe 메서드 하나만 구현하면 되는 수준으로 매우 단순하다.

 

subscribe 메서드는 파라미터로 전달받은 Subscriber를 등록하는 역할을 한다.

 

그런데 Publisher 인터페이스 코드를 보면서 다음과 같은 의문이 들 수 있다.

 

'Publisher는 데이터를 생성하고 통지하는 역할을 하고, Subscriber는 Publisher가 통지하는 데이터를 전달받기 위해 구독을 한다고 이해했는데, 왜 Subscriber가 아닌 Publisher에 subscribe 메서드가 정의되어 있는가?'

 

Kafka같은 메시지 기반 시스템에서 Pub/Sub 모델을 이해하고있다면 의문이 들 수 있다.

 

리액티브 스트림즈에서의 Publisher/Subscriber와 Kafka에서의 Publisher/Subscriber는 의미가 조금 다르다.

 

Kafka의 경우 Publisher와 Subscriber 중간에 메시지 브로커가 있고 이 브로커 내에 여러 개의 토픽이 존재하는데, Publisher와 Subscriber는 브로커에 있는 특정 토픽을 바라보는 구조로 이루어져 있다. 그래서 Kafka에서의 Publisher와 Subscriber는 각각 브로커 내의 특정 토픽만 바라보면 되기 때문에 Publisher는 특정 토픽으로 메시지 데이터를 전송하기만 하면 되고, Subsciber는 특정 토픽을 구독하고 해당 토픽에 전다로디는 메시지 데이터를 전달받기만 하면 된다. 이는 Publisher와 Subscriber의 느슨한 결합 구조라고 볼 수 있다.

 

리액티브 스트림즈의 경우 Publisher와 Subscriber는 개념상으로는 Subscriber가 구독하는 것이 맞는데 실제 코드상에서는 코드2-1처럼 Publisher가 subscribe 메서드의 파라미터인 Subscriber를 등록하는 형태로 구독이 이루어진다.


1.2 Subscriber

코드2-2 Subscriber 인터페이스

코드 2-2는 Subscriber 인터페이스의 코드이다. Subscriber 인터페이스는 총 네 개의 메서드를 구현해야 한다.

 

- onSubscribe : 구독 시작 시점에 어떤 처리를 하는 역할을 한다. 여기서의 처리는 Publisher에게 요청할 데이터의 개수를 지정하거나 구독을 해지하는 것을 의미하는데, 이것은 onSubscribe 메서드의 파라미터로 전달되는 Subscripion 객체를 통해서 이루어진다.

 

- onNext : Publisher가 통지한 데이터를 처리하는 역할을 한다.

 

- onError : Publisher가 데이터 통지를 위한 처리 과정에서 에러가 발생했을 때 해당 에러를 처리하는 역할을 한다.

 

- onComplete : Publisher가 데이터 통지를 완료했음을 알릴 때 호출되는 메서드이다. 데이터 통지가 정상적으로 완료될 경우에 어떤 후처리를 해야 한다면 onComplete 메서드에서 처리 코드를 작성하면 된다.


 

1.3 Subscription

코드2-3 Subscription 인터페이스

 

Subscription 인터페이스는 Subscriber가 구독한 데이터의 개수를 요청하거나 또는 데이터 요청의 취소, 즉 구독을 해지하는 역할을 한다.

 

Subscripton 인터페이스는 두 개의 메서드를 구현해야 한다. request 메서드를 통해서 Publisher에게 데이터의 개수를 요청할 수 있고 cancel 메서드를 통해서 구독을 해지할 수 있다.

 

예전에 설명한 Publisher와 Subscriber의 동작 과정을 리액티브 스트림즈의 컴포넌트 코드 관점에서 간략하게 다시 설명하면 다음과 같다.

 

1. Publisher가 Subscriber 인터페이스 구현 객체를 subscribe 메서드의 파라미터로 전달한다.

 

2. Publisher 내부에서는 전달받은 Subscriber 인터페이스 구현 객체의 onSubscribe 메서드를 호출하면서 Subscriber의 구독을 의미하는 Subscription 인터페이스 구현 객체를 Subscriber에게 전달한다.

 

3. 호출된 Subscriber 인터페이스 구현 객체의 onSubscribe 메서드에서 전달받은 Subscription 객체를 통해 전달받을 데이터의 개수를 Publisher에게 요청한다.

 

4. Publisher는 Subscriber로부터 전달받은 요청 개수만큼의 데이터를 onNext 메서드를 호출해서 Subscriber에게 전달한다.

 

5. Publisher는 통지할 데이터가 더 이상 없을 경우 onComplete 메서드를 호출해서 Subscriber에게 데이터 처리 종료를 알린다.


1.4 Processor

코드2-4 Processor 인터페이스

Processor는 별도로 구현해야 하는 메서드가 없다. 다른 인터페이스들과 다른 점은 Subscriber 인터페이스와 Publisher 인터페이스를 상속한다는 것이다. 이는 2장, 표 2-1 리액티브 스트림즈 컴포넌트에서 설명한 대로 Processor가 Publisher와 Subscriber의 기능을 모두 가지고 있기 때문이다.