TIL(사전캠프)

리액티브. (2024-06-28)

note994 2024. 6. 28. 17:25

1. 리액티브 스트림즈의 구현 규칙

리액티브 스트림즈 표준 사양에는 리액티브 스트림즈 컴포넌트를 어떻게 구현해야 되는지에 대한 규칙들이 컴포넌트별로 정의되어있다. 우리가 리액티브 스트림즈를 구현해서 직접 라이브러리를 제작하는 것이 아니기 때문에 모든 규칙들을 살펴보진 않는다.

 

다만 이미 구현된 리액티브 스트림즈 구현체를 올바르게 이해하고 제대로 사용하기 위해서 리액티브 스트림즈의 기본적인 구현 규칙은 알고 있는 것이 좋다.

 

번호 규칙
1 Publisher가 Subscriber에게 보내는 onNext signal의 총 개수는 항상 해당 Subscriber의 구독을 통해 요청된 데이터의 총 개수보다 더 작거나 같아야 한다.
2 Publisher는 요청된 것보다 적은 수의 onNext signal을 보내고 onComplete 또는 onError를 호출하여 구독을 종료할 수 있다.
3 Publisher의 데이터 처리가 실패하면 onError signal을 보내야 한다.
4 Publishr의 데이터 처리가 성공적으로 종료되면 onComplete signal을 보내야 한다.
5 Publisher가 Subscriber에게 onError 또는 onComplete signal을 보내는 경우 해당 Subscriber의 구독은 취소된 것으로 간주되어야 한다.
6 일단 종료 상태 signal을 받으면(onError, onComplete) 더 이상 signal이 발생되지 않아야 한다.
7 구독이 취소되면 Subscriber는 결국 signal을 받는 것을 중지해야 한다.

표 2-2 Publisher 구현을 위한 주요 기본 규칙

 

표 2-2 Publisher 구현을 위한 주요 기본 규칙 추가 설명

1. Subscriber가 요청한 것보다 더 많은 개수의 데이터를 Publisher가 보낼 수 없음을 의미한다. 즉, Subscriber의 요청 개수보다 더 적거나 같은 개수의 onNext signal을 통해 데이터를 보낼 수 있다.

 

2. 1번 규칙과 유사하지만 Subscriber의 요청을 마지막으로 처리하는 경우의 규칙이다. 예를 들어 Subscriber의 요청 개수가 5이고 Publisher가 emit할 데이터 개수가 3개밖에 남지 않았다면 남은 3개를 Subscriber에게 emit한 후, 정상적으로 처리가 끝났음을 알리는 onComplete signal을 보내거나 에러가 발생했다면 onError signal을 보낸다.

하지만 이 규칙의 예외가 있는데 Publisher가 처리할 데이터가 끊임없이 발생하는 무한 스트림의 경우, 처리 중 에러가 발생하기 전까지는 종료 자체가 없기 때문에 2번 규칙에 대한 예외적인 경우가 된다. IoT 디바이스 센서에서 발생하는 데이터처럼 끊임없이 발생하는 데이터를 생각하면 된다.

 

3. Publisher가 처리를 진행할 수 없는 상황이 되면 Subscriber에게 알려서 Publisher에서 발생한 실패를 Subscriber가 처리할 수 있는 기회를 가지도록 하는 데 주목적이 있다.

 

4. Publisher가 종료 상태가 되었음을 Subsciber에게 알림으로써 Subscriber가 리소스를 정리하는 등의 후처리를 할 수 있도록 하는 데 그 목적이 있다.

 

5. Publisher가 전송한 onError 또는 onComplete Signal은 구독 취소와 동일한 기능을 한다는 것을 의미한다.

 

6. onError 또는 onComplete signal을 통해 Publisher와 Subscriber 간의 상호작용이 끝났음을 알리는 것이다. 즉, 이미 상호작용이 끝났음을 알렸는데 signal이 발생하는 것은 논리적으로 맞지 않다는 의미이다.

 

7. 구독 취소를 위해 Subscription.cancel()이 호출되었을 때 Publisher가 구독 취소 요청을 준수해야 한다는 의미이다.


번호 규칙
1 Subscriber는 Publisher로부터 onNext signal을 수신하기 위해 Subscription.request(n)를 통해
Demand signal을 Publisher에게 보내야 한다.
2 Subscriber.onComplete() 및 Subscriber.onError(Throwable t)는 Subscription 또는 Publisher의 메서드를 호출해서는 안 된다.
3 Subscriber.onComplete 및 Subscriber.OnError(Throwable t)는 Signal을 수신한 후 구독이 취소된 것으로 간주해야 한다.
4 구독이 더 이상 필요하지 않은 경우 Subscriber는 Subscription.cancel()을 호출해야 한다.
5 Subscriber.onSubscribe()은 지정된 Subscriber에 대해 최대 한 번만 호출되어야 한다

표 2-3 Subscriber 구현을 위한 주요 기본 규칙

 

표 2-3 Subscriber 구현을 위한 주요 기본 규칙 추가 설명

1. 데이터를 언제, 얼마나 수신할 수 있는지를 결정하는 책임이 Subscriber에게 있다는 것을 확립하는 것이다. 리액티브 스트림즈에서는 한 번에 하나의 데이터를 요청하기보다는 Subscriber가 처리할 수 있는 적절한 상한선만큼의 데이터 개수 요청을 권장한다.

 

2. Subscriber가 완료 signal 또는 에러 signal을 처리하는 동안 Publisher/Subscription과 Subscriber 간의 순환 및 경쟁 조건(Race Condition)을 방지하기 위함이다

 

3. Subscriber가 Publisher의 종료 상태 signal을 인정하고 받아들이도록 하는 것이다. 즉, onComplete 또는 onError signal이 수신되면 구독이 더 이상 유효하지 않게 된다는 것을 의미한다. 즉, onComplete 또는 onError signal이 수신되면 구독이 더 이상 유효하지 않게 된다는 것을 의미한다.

 

4. Subscriber가 구독이 더 이상 필요하지 않을 때 명시적으로 구독을 취소함으로써 해당 구독이 유지하고 있는 리소스를 적절한 시기에 안전하게 해제할 수 있도록 하는 것이다.

 

5. 동일한 구독자가 최대 한 번만 구독할 수 있다는 의미와 같다.


번호 규칙
1 구독은 Subscriber가 onNext 또는 onSubscribe 내에서 동기적으로 Subscription.request를 호출하도록 허용해야 한다.
2 구독이 취소된 후 추가적으로 호출되는 Subscription.request(long n)는 효력이 없어야 한다.
3 구독이 취소된 후 추가적으로 호출되는 Subscription.cancel()은 효력이 없어야 한다.
4 구독이 취소되지 않은 동안 Subscription.request(long n)의 매개변수가 0보다 작거나 같으면 java.lang.IllegalArgumentException과 함께 onError signal을 보내야 한다.
5 구독이 취소되지 않은 동안 Subscription.cancel()은 Publisher가 Subscriber에게 보내는 signal을 결국 중지하도록 요청해야 한다.
6 구독이 취소되지 않은 동안 Subscription.cancel()은 Publisher에게 해당 구독자에 대한 참조를 결국 삭제하도록 요청해야 한다.
7 Subscription.cancel(), Subscription.request() 호출에 대한 응답으로 예외를 던지는 것을 허용하지 않는다.
8 구독은 무제한 수의 request 호출을 지원해야 하고 최대 2의63승-1개의 Demand를 지원해야 한다.

표 2-4 Subscription 구현을 위한 주요 기본 규칙

 

표 2-4 Subscription 구현을 위한 주요 기본 규칙 추가 설명

1. request와 onNext 사이의 상호 재귀로 인해 발생할 수 있는 스택 오버플로를 피하기 위해 request가 다시 호출된다는 것을 분명히 하는 것이다. onNext 또는 onSubscribe 내에서 동기적으로 Subscription.request를 호출한다는 의미는 request를 호출하는 스레드와 onNext signal을 보내는 스레드가 동일할 수 있다는 것이다.

 

2. 상식적으로 생각해도 당연한 규칙이다. 구독을 취소했다면 더이상의 요청은 의미가 없어져야 한다.

 

3. 마찬가지로 이미 구독을 취소했는데 또 취소할 수는 없다.

 

4. 잘못된 구현으로 인해 예외 발생 없이 계속 작업이 진행되는 것을 방지하는 것이다. 요청 개수가 조건에 부합하지 않을 경우 onError signal을 전달받아서 예외 처리를 할 수 있음을 알 수 있다.

 

5. Subscription.cancel()의 의미가 구독을 취소한다는 의미에서 그치는 것이 아니라 구독 취소를 하니 signal 보내는 것을 중지하라고 Publisher에게 요청한다는 의미까지 포함함을 알 수 있다.

 

6. 5번 규칙에 대한 더 구체적인 내용으로, Subscription.cancel()을 호출했을 때, Publisher가 signal 전송을 중지할 뿐만 아니라 구독 취소를 요청한 구독자에 대한 참조까지 삭제한다는 것을 알 수 있다. 이 규칙을 통해서 Garbage Collector가 더 이상 유효 하지 않은 구독자의 객체를 수집하여 메모리를 확보할 수 있도록 해준다.

 

7. Subscription의 메서드를 호출했을 때 메서드 내부로 예외가 던져지지 않도록 규정한다. Java에서 일반적으로 어떤 메서드를 호출하여 예외가 발생하면 메서드를 호출한 쪽으로 예외를 던지는데 리액티브 스트림즈에서는 예외가 발생하면 해당 예외를 onError signal과 함께 보내도록 규정한다. (리액티브 스트림즈에서는 메서드 호출 시, 유효한 값 이외에는 어떠한 예외도 던지지 않는다는 의미로 'Return normally'라는 용어를 사용한다.

 

8. 구독자는 한번 요청할 때 무한한 개수의 데이터를 요청할 수 있고, 그 요청을 끝없이 호출할 수 있음을 알 수 있다. 이렇게 무한히 발생하는 데이터의 흐름을 무한 스트림이라고 한다.