탄력성은 반응형 시스템에서 매우 중요한 관점이다. 반응형 시스템은 실패하는 동안에도 응답 가능한 상태로 남아있어야 한다. 시스템은 에러를 잘 다루며, 사용자의 요청에 적시에 응답해야 한다. 이러한 요구사항은 효율적인 에러처리 메커니즘 없이는 달성하는것이 불가능하다.
이러한 요구사항에 대하여 Reactor는 에러 처리와 관련된 다양한 연산자를 제공한다. 이번 글에서는 이와 관련하여 Reactor 를 사용하여 에러를 발생하는 부분과, 발생한 에러를 처리하는 방법에 대하여 살펴보자.
예외는 아래와 같은 조건에서 발생할 수 있다.
@Test
void throwException() {
Flux<Object> fibonacciGenerator = Flux.generate(
() -> Tuples.<Long, Long>of(0L, 1L),
(state, sink) -> {
if(state.getT1() < 0) throw new RuntimeException("Value out of bounds");
else sink.next(state.getT1());
return Tuples.of(state.getT2(), state.getT1() + state.getT2());
});
fibonacciGenerator.subscribe(System.out::println);
}
ErrorCallbackNoImplemented
예외를 발생시킨다.4660046610375530309
7540113804746346429
[ERROR] (Test worker) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Value out of bounds
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Value out of bounds
Caused by: java.lang.RuntimeException: Value out of bounds
@Test
void throwSubscriberException() {
Flux<Object> fibonacciGenerator = Flux.generate(
() -> Tuples.<Long, Long>(0L, 1L),
(state, sink) -> {
if(state.getT1() < 0) throw new RuntimeExeption("Value out of bounds");
else sink.next(state.getT1());
return Tuples.of(state.getT2(), state.getT1() + state.getT2());
});
fibonacciGenerator.subscribe(x -> {
throw new RuntimeException("Subscriber threw error");
});
}
ErrorCallbackNoImplemented
예외를 발생시킨다.5월 12, 2022 3:33:04 오후 org.junit.platform.launcher.core.EngineDiscoveryOrchestrator lambda$logTestDescriptorExclusionReasons$7
정보: 0 containers and 7 tests were Method or class mismatch
[ERROR] (Test worker) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Subscriber threw error
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Subscriber threw error
Caused by: java.lang.RuntimeException: Subscriber threw error
위의 두 경우에서 보듯, Reactor는 동일한 방식으로 에러를 다룬다. 구독자는 반드시 에러를 다루는 함수를 제공해서 Stream이 성공적으로 종료되도록 해야한다.
아래의 방법은 위의 방법과는 다른 방법으로 예외를 던진다.
Flux<Long> fibonacciGenerator = Flux.generate(
() -> Tuples.<Long, Long>of(0L, 1L),
(state, sink) -> {
if(state.getT1() < 0) sink.error(new RuntimeException("Value out of bounds"));
else sink.next(state.getT1());
return Tuples.of(state.getT2(), state.getT1() + state.getT2());
});
fibonacciGenerator.subscribe(
System.out::println,
System.out::println
);
2880067194370816120
4660046610375530309
7540113804746346429
java.lang.RuntimeException: Value out of bounds
Checked 예외의 경우 발행자와 구독자 모두 발생시킬 수 없다. 이때 Exceptions 클래스가 제공하는 propagate 메서드를 사용하여 unchecked 예외로 변경하여 전달할 수 있다.
void raiseCheckedException() throws IOException {
throw new IOException("Raising checkedException");
}
@Test
void checkedException {
Flux<Long> fibonacciGenerator = Flux.generator(
() -> Tuples.<Long, Long>of(0L, 1L),
(state, sink) -> {
try { raiseCheckedException(); }
catch(IOException e) { Exceptions.propagate(e); }
return Tuples.of(state.getT1(), state.getT1() + state.getT2());
});
fibonacciGenerator.subscribe(
System.out.println,
Exceptions.unwrap
);
}
Reactor는 모든 생명주기 마다 콜백을 설정할 수 있다. 여기에는 에러 콜백과 관련된 Hook도 설정할 수 있다.
doOnError Hook은 에러를 소비하고 의도한 동작을 수행할 수 있게 한다.만약 에러 콜백이 아래와 같이 설정되어 있다면 동시에 수행된다.
@Test
void doOnErrorHook() {
fibonacciRuntimeExceptionGenerator
.doOnError(System.out::println)
.subscribe(
System.out::println,
e -> e.printStackTrace());
}
java.lang.RuntimeException: Value out of bounds
java.lang.RuntimeException: Value out of bounds
at chapter9.GeneratingErrors.lambda$new$1(GeneratingErrors.java:16)
onCompletion 또는 onError 이벤트 같이 스트림을 종료할 때 동작을 수행하는 Hook이다.
@Test
void doOnTerminatorHook() {
fibonacciRuntimeExceptionGenerator
.doOnTerminate(() -> System.out.println("Terminated")
.subscribe(
System.out::println,
Throwable::printStackTrace
);
}
...
2880067194370816120
4660046610375530309
7540113804746346429
Terminated
java.lang.RuntimeException: Value out of bounds
...
이 Hook 은 스트림이 완전히 종료되고 나서 동작을 수행한다.
@Test
void doFinallyHook() {
fibonacciRuntimeExeptionGenerator
.doFinally(x -> System.out.println("invoking finally"))
.subscribe(
System.out::println,
Throwable::printStackTrace
);
}
...
2880067194370816120
4660046610375530309
7540113804746346429
java.lang.RuntimeException: Value out of bounds
...
invoking finally
doFinally Hook의 대체제로 Flux.using API를 사용할 수 있다. 이 메서드는 공급자를 위해 매핑된 리소스를 설정한다.
마찬가지로 람다식을 설정할 수 있으며, 이를 사용하여 스트림의 종료에 따라 공급자의 리소스를 설정하낟.
@Test
void usingAPI() {
Flux<Long> fibonacciGenerator = Flux.generate(
() -> Tuples.<Long, Long> of(0L, 1L),
(state, sink) -> {
if(state.getT1() < 0) sink.complete();
else sink.next(state.getT1());
return Tuples.of(state.getT2(), state.getT1() + state.getT2());
});
Closeable closeable = () -> System.out.println("closing the stream);
Flux.usting(
() -> closeable,
x -> fibonacciGenerator,
e -> {
try { e.close(); }
catch(Exception e) { throw Exceptions.propagate(e); }
}).subscribe(System.out::println);
}
...
2880067194370816120
4660046610375530309
7540113804746346429
closing the stream