에러를 처리할때, 프로세스를 종료하지 않고 대체할 이벤트 또는 데이터를 발행하고 계속 진행하고 싶을 경우가 있을 수 있다. 여기서는 이러한 목적을 달성하는 방법을 알아보자.
Reactor는 예외가 발생했을 때, onERrorReturn 연산자로 fallback 값을 반환하도록 설정할 수 있다.
@Test
void onErrorReturn() {
Flux<Long> fibonacciGenerator = Flux.generate(
() -> Tuples.<Long, Long>of(0L, 1L),
(state, sink) -> {
if(state.getT1() < 0)
sink.error(new RuntimeException("Value out of bounds"));
else
sink.next(state.getT1());
return Tuples.of(state.getT2(), state.getT1() + state.getT2());
});
fibonacciGenerator
.onErrorReturn(0L)
.subscribe(System.out::println);
}
0
1
1
2
3
5
8
13
21
34
...
1779979416004714189
2880067194370816120
4660046610375530309
7540113804746346429
0
onErrorReturn
를 여러번 설정하고, 에러 클래스를 인자로 넘겨주면 에러 타입마다 다른 값을 반환할 수 있다. 이때, 하위타입의 에러도 잡아서 처리하기 때문에 순서가 중요하다.
@Test
void onErrorReturn2() {
Flux<Long> fibonacciGenerator = Flux.generate(
() -> Tuples.<Long, Long>of(0L, 1L),
(state, sink) -> {
if(state.getT1() < 0)
sink.error(new IllegalARgumentException("Value out of bounds"));
else
sink.next(state.getT1());
return Tuples.of(state.getT2(), state.getT1() + state.getT2());
});
fibonacciGenerator
.onErrorReturn(IllegalArgumentException.class, -1L)
.onErrorReturn(RuntimeExcepiton.class, 0L)
.subscribe(System.out::println);
}
Fallback 값 대신에 Stream을 받아서 로직을 수행한다.
@Test
void onErrorResume() {
fibonacciWithRuntimeExceptionGenerator
.onErrorResume(x -> Flux.just(0L, -1L, 2L)
.subscribe(System.out::println);
}
0
1
1
2
...
2880067194370816120
4660046610375530309
7540113804746346429
0
-1
-2
전파되는 예외의 타입을 변환할 수 있는 연산자다.
@Test
void onErrorMap() {
fibonacciWithRuntimeExceptionGenerator
.onErrorMap(x -> new IllegalArgumentException("Publisher threw error", x))
.subscribe(
System.out::println,
System.out::println
);
}
...
2880067194370816120
4660046610375530309
7540113804746346429
java.lang.IllegalArgumentException: Publisher threw error
외부 API를 호출하는 등의 작업을 수행할 경우 API 에서 에러가 발생하면 응답을 받지 못하고 스트림은 무한정 대기를 하게 됩니다. 이를 방지하기 위해서는 스트림이 대기하는 시간을 설정하고 그에 따른 알맞은 처리를 하는 로직을 작성해야 합니다.
이러한 작업을 위해서 Reactor는 timeout()
연산자를 제공합니다. 이 연산자는 대기중인 작업을 특정 시간이 지나면 자동으로 실패하도록 처리합니다. → 지정된 시간이 만료되면 자동으로 에러 콜백으로 넘어갑니다.
@Test
void timeout() InterrtuptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
fibonacciRuntimeExceptionGenerator
.delayElements(Duration.ofSeconds(1))
.timeout(Duration.ofMillis(500))
.subscribe(
System.out::println,
e -> {
System.out.println(e);
countDownLatch.countDown();
});
countDownLatch.await();
}
java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 500ms in 'concatMap' (and no fallback has been configured)
이 경우 예외를 던지지 않는다. 결과적으로 스트림이 종료되지 않고 계속 진행된다.
@Test
void timeout2() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
fibonacciRuntimeExceptionGenerator
.delayElements(Duration.ofSeconds(1))
.timeout(Duration.ofMillis(500), Flux.just(-1L))
.subscribe(e -> {
System.out.println(e);
countDownLatch.countDown();
});
countDownLatch.awailt();
}
-1
대기 상태에서 실패할 경우 재시도할 횟수를 설정한다. 이때, 실패한 지점부터가 아닌 처음부터 다시 모든 이벤트를 발행한다.
@Test
void retry() {
CountDownLatch countDownLatch = new CountDownLatch(1);
fibonacciRuntimeExceptionGenerator
.retry(1)
.subscribe(
System.out::println,
e -> {
System.out.println("received : " + e);
countDownLatch.countDown();
},
countDownLatch::countDown);
}
0
1
1
2
...
2880067194370816120
4660046610375530309
7540113804746346429
0
1
1
2
..
2880067194370816120
4660046610375530309
7540113804746346429
received : java.lang.RuntimeException: Value out of bounds