멀티스레드 Step 환경에서 ItemStreamReader 를 thread-safe 안전하게 감싸주며 여러 스레드가 동시에 read() 를 호출하더라도 동기화(synchronized) 처리되는 동작을 보장한다.
SynchronizedItemStreamReader 는 확장 모듈이 아니라 spring-batch-core 라이브러리에 포함되어 있는 클래스이다. starter-batch 를 사용하고 있다면 바로 사용하면 된다. (batch-core 가 자동으로 포함되어 있음)
JpaCursorItemReader, JdbcCursorItemReader, FlatFileItemReader 등
SynchronizedItemStreamReader 는 직접 데이터를 읽지 않고 delegate 에게 반드시 위임해야 한다. delegate 를 지정하지 않은 null 인 상태에서 read() 를 호출하면 NullPointerException 이 발생하게 된다.
SynchronizedItemStreamReader 는 void 반환값이 없으므로 객체 생성 -> setDelegate() -> return 순서로만 사용해줘야 된다.
@Bean
public Step sendNotificationStep() {
return new StepBuilder("sendNotificationStep", jobRepository)
.<NotificationEntity, NotificationEntity>chunk(CHUNK_SIZE, transactionManager)
.reader(sendNotificationItemReader()) // 여기서 syncReader 사용
.writer(sendNotificationItemWriter) // 알람 전송 로직
.taskExecutor(new SimpleAsyncTaskExecutor()) // 멀티스레드 실행
.build();
}
@Bean
public SynchronizedItemStreamReader<NotificationEntity> sendNotificationItemReader() {
JpaCursorItemReader<NotificationEntity> itemReader =
new JpaCursorItemReaderBuilder<NotificationEntity>()
.name("sendNotificationItemReader")
.entityManagerFactory(entityManagerFactory)
.queryString("select n from NotificationEntity n where n.event = :event and n.sent = :sent")
.parameterValues(Map.of("event", NotificationEvent.BEFORE_CLASS, "sent", false))
.build();
// Reader 가 읽어온 Chunk 단위의 NotificationEntity 리스트를 차례로 Writer 에 전달하기 위한 래퍼
SynchronizedItemStreamReader<NotificationEntity> syncReader = new SynchronizedItemStreamReader<>();
syncReader.setDelegate(itemReader); // 실제 DB 읽기 로직을 delegate 로 연결
return syncReader;
}
delegate 인 JpaCursorItemReader 가 DB 에서 NotificationEntity 를 순서대로 가져옴 -> Writer 로 데이터가 청크 단위로 전달됨
여러 스레드가 동시에 reader.read() 호출 가능
SynchronizedItemStreamReader 는 내부적으로 이렇게 되어 있어서
public synchronized T read() {
return delegate.read();
}
여러 스레드가 접근해도 하나의 스레드가 각각 delegate.read() 를 실행해서 DB 접근이 꼬이지 않음