Reactor: Error 1 - Generating Errors

xellos·2022년 5월 21일
1

JAVA-Reactor

목록 보기
7/11

소개

탄력성은 반응형 시스템에서 매우 중요한 관점이다. 반응형 시스템은 실패하는 동안에도 응답 가능한 상태로 남아있어야 한다. 시스템은 에러를 잘 다루며, 사용자의 요청에 적시에 응답해야 한다. 이러한 요구사항은 효율적인 에러처리 메커니즘 없이는 달성하는것이 불가능하다.

이러한 요구사항에 대하여 Reactor는 에러 처리와 관련된 다양한 연산자를 제공한다. 이번 글에서는 이와 관련하여 Reactor 를 사용하여 에러를 발생하는 부분과, 발생한 에러를 처리하는 방법에 대하여 살펴보자.

에러 발생

예외는 아래와 같은 조건에서 발생할 수 있다.

  • 발행자가 값을 생성하는 도중에 예외를 발생시키는 경우
  • 구독자가 이벤트를 처리하는 과정에서 예외를 발생시키는 경우

1) 예외 발생 예제

발행자가 예외를 발생시키는 경우

@Test
void throwException() {
	Flux<Object> fibonacciGenerator = Flux.generate(
    	() -> Tuples.<Long, Long>of(0L, 1L),
        (state, sink) -> {
        	if(state.getT1() < 0) throw new RuntimeException("Value out of bounds");
            else sink.next(state.getT1());
            return Tuples.of(state.getT2(), state.getT1() + state.getT2());
        });
        
    fibonacciGenerator.subscribe(System.out::println);
}
  • 결과
    에러와 관련한 어떠한 설정도 되어있지 않기 때문에 구독자는 ErrorCallbackNoImplemented 예외를 발생시킨다.
4660046610375530309
7540113804746346429
[ERROR] (Test worker) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Value out of bounds
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Value out of bounds
Caused by: java.lang.RuntimeException: Value out of bounds

구독자가 예외를 발생시키는 경우

@Test
void throwSubscriberException() {
	Flux<Object> fibonacciGenerator = Flux.generate(
    	() -> Tuples.<Long, Long>(0L, 1L),
        (state, sink) -> {
        	if(state.getT1() < 0) throw new RuntimeExeption("Value out of bounds");
            else sink.next(state.getT1());
            return Tuples.of(state.getT2(), state.getT1() + state.getT2());
        });
        
    fibonacciGenerator.subscribe(x -> {
    	throw new RuntimeException("Subscriber threw error");
    });
}
  • 결과
    발행자에서 와 마찬가지로 에러와 관련한 어떠한 설정도 되어있지 않기 때문에 구독자는 ErrorCallbackNoImplemented 예외를 발생시킨다.
512, 2022 3:33:04 오후 org.junit.platform.launcher.core.EngineDiscoveryOrchestrator lambda$logTestDescriptorExclusionReasons$7
정보: 0 containers and 7 tests were Method or class mismatch
[ERROR] (Test worker) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Subscriber threw error
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Subscriber threw error
Caused by: java.lang.RuntimeException: Subscriber threw error

위의 두 경우에서 보듯, Reactor는 동일한 방식으로 에러를 다룬다. 구독자는 반드시 에러를 다루는 함수를 제공해서 Stream이 성공적으로 종료되도록 해야한다.


예외를 던지는 다른 방법: sink.error(e) 메서드

아래의 방법은 위의 방법과는 다른 방법으로 예외를 던진다.

  • 이번에는 예외에 반응하는 콜백을 추가하였다.
Flux<Long> fibonacciGenerator = Flux.generate(
	() -> Tuples.<Long, Long>of(0L, 1L),
    (state, sink) -> {
    	if(state.getT1() < 0) sink.error(new RuntimeException("Value out of bounds"));
        else sink.next(state.getT1());
        return Tuples.of(state.getT2(), state.getT1() + state.getT2());
    });
    
fibonacciGenerator.subscribe(
	System.out::println,
    System.out::println
);
  • 결과
2880067194370816120
4660046610375530309
7540113804746346429
java.lang.RuntimeException: Value out of bounds

Checked Exception

Checked 예외의 경우 발행자와 구독자 모두 발생시킬 수 없다. 이때 Exceptions 클래스가 제공하는 propagate 메서드를 사용하여 unchecked 예외로 변경하여 전달할 수 있다.

void raiseCheckedException() throws IOException {
	throw new IOException("Raising checkedException");
}

@Test
void checkedException {
	Flux<Long> fibonacciGenerator = Flux.generator(
    	() -> Tuples.<Long, Long>of(0L, 1L),
        (state, sink) -> {
        	try { raiseCheckedException(); }
            catch(IOException e) { Exceptions.propagate(e); }
            return Tuples.of(state.getT1(), state.getT1() + state.getT2());
        });
        
    fibonacciGenerator.subscribe(
    	System.out.println,
        Exceptions.unwrap
    );
}

2) Hook

Reactor는 모든 생명주기 마다 콜백을 설정할 수 있다. 여기에는 에러 콜백과 관련된 Hook도 설정할 수 있다.

doOnError Hook

doOnError Hook은 에러를 소비하고 의도한 동작을 수행할 수 있게 한다.만약 에러 콜백이 아래와 같이 설정되어 있다면 동시에 수행된다.

@Test
void doOnErrorHook() {
	fibonacciRuntimeExceptionGenerator
    	.doOnError(System.out::println)
        .subscribe(
        	System.out::println,
            e -> e.printStackTrace());
}
  • 결과
java.lang.RuntimeException: Value out of bounds
java.lang.RuntimeException: Value out of bounds
	at chapter9.GeneratingErrors.lambda$new$1(GeneratingErrors.java:16)

doOnTerminate Hook

onCompletion 또는 onError 이벤트 같이 스트림을 종료할 때 동작을 수행하는 Hook이다.

  • 어떤 종류의 input 을 제공하지 않고 그저 제공되즞 람다를 실행한다.
@Test
void doOnTerminatorHook() {
	fibonacciRuntimeExceptionGenerator
    	.doOnTerminate(() -> System.out.println("Terminated")
        .subscribe(
        	System.out::println,
            Throwable::printStackTrace
        );
}
  • 결과
    종료 콜백이 먼저 발생하고, 에러처리 콜백이 그 다음으로 발생한 것을 확인할 수 있다.
...
2880067194370816120
4660046610375530309
7540113804746346429
Terminated
java.lang.RuntimeException: Value out of bounds
...

doFinally Hook

이 Hook 은 스트림이 완전히 종료되고 나서 동작을 수행한다.

@Test
void doFinallyHook() {
	fibonacciRuntimeExeptionGenerator
    	.doFinally(x -> System.out.println("invoking finally"))
        .subscribe(
        	System.out::println,
            Throwable::printStackTrace
        );
}
  • 결과
...
2880067194370816120
4660046610375530309
7540113804746346429
java.lang.RuntimeException: Value out of bounds
...
invoking finally

Flux.using

doFinally Hook의 대체제로 Flux.using API를 사용할 수 있다. 이 메서드는 공급자를 위해 매핑된 리소스를 설정한다.
마찬가지로 람다식을 설정할 수 있으며, 이를 사용하여 스트림의 종료에 따라 공급자의 리소스를 설정하낟.

  • try-with-resource 의 개념으로 이해하면 된다.
@Test
void usingAPI() {
	Flux<Long> fibonacciGenerator = Flux.generate(
    	() -> Tuples.<Long, Long> of(0L, 1L),
        (state, sink) -> {
        	if(state.getT1() < 0) sink.complete();
            else sink.next(state.getT1());
            return Tuples.of(state.getT2(), state.getT1() + state.getT2());
        });
        
    Closeable closeable = () -> System.out.println("closing the stream);
    Flux.usting(
    	() -> closeable,
        x -> fibonacciGenerator,
        e -> {
        	try { e.close(); }
            catch(Exception e) { throw Exceptions.propagate(e); }
        }).subscribe(System.out::println);
}
  • 결과
...
2880067194370816120
4660046610375530309
7540113804746346429
closing the stream

0개의 댓글