해커톤 프로젝트 - Spring batch 관련 : PublicServiceReservationInfoInsertJobConfig

Chooooo·2023년 9월 1일
0

TIL

목록 보기
8/22
post-thumbnail

😎 PublicServiceReservationInfoInsertJobConfig

  • 전체 코드
@Slf4j
//@Configuration
@RequiredArgsConstructor
public class PublicServiceReservationInfoInsertJobConfig {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager platformTransactionManager;

    private final WebClientService webClientService;
    private final PublicServiceReservationRepository repository;

    @Bean
    public Job publicServiceReservationInfoInsertJob(Step fetchLastUpdated, Step insertPublicServiceReservationStep) {
        log.info("[PublicServiceReservationInfoInsertJobConfig] Job Launched");

        return new JobBuilder("publicServiceReservationInfoInsertJob", jobRepository)
                .incrementer(new RunIdIncrementer())
                .start(fetchLastUpdated).on("CONTINUABLE")
                .to(insertPublicServiceReservationStep)
                .next(fetchLastUpdated)
                .from(fetchLastUpdated).on("COMPLETED")
                .end()
                .end()
                .build();
    }

    @Bean
    @JobScope
    public Step fetchLastUpdated(Tasklet fetchLastUpdatedTasklet) {
        return new StepBuilder("fetchLastUpdated", jobRepository)
                .tasklet(fetchLastUpdatedTasklet, platformTransactionManager)
                .build();
    }

    @Bean
    @JobScope
    public Step insertPublicServiceReservationStep(ItemReader<WebClientDTO> publicServiceReservationReader,
                                                   ItemProcessor<WebClientDTO, List<PublicServiceReservation>> publicServiceReservationProcessor,
                                                   ItemWriter<List<PublicServiceReservation>> publicServiceReservationWriter) {
        log.info("[PublicServiceReservationInfoInsertJobConfig] Insert Info Step Launched");

        return new StepBuilder("insertPublicServiceReservationStep", jobRepository)
                .<WebClientDTO, List<PublicServiceReservation>>chunk(10, platformTransactionManager)
                .reader(publicServiceReservationReader)
                .processor(publicServiceReservationProcessor)
                .writer(publicServiceReservationWriter)
                .build();
    }

    @Bean
    @StepScope
    public ItemReader<WebClientDTO> publicServiceReservationReader() {
        return new PublicServiceReservationReader(webClientService);
    }

    @Bean
    @StepScope
    public ItemProcessor<WebClientDTO, List<PublicServiceReservation>> publicServiceReservationProcessor() {
        Set<String> serviceIds = repository.findAllDistinctServiceId();
        return item -> item.toDto().stream()
                .filter(i -> !serviceIds.contains(i.getServiceId()))
                .map(PublicServiceReservation::fromDto)
                .collect(Collectors.toList());
    }

    @Bean
    @StepScope
    public ItemWriter<List<PublicServiceReservation>> publicServiceReservationWriter() {
        return items -> items.forEach(i -> repository.saveAllAndFlush(i));
    }

}

😀 먼저 PublicServiceReservationReader

@Slf4j
@RequiredArgsConstructor
public class PublicServiceReservationReader implements ItemReader<WebClientDTO> {

    private final WebClientService webClientService;

    private static int size = 10;
    private static int startIndex = 1;
    private static int lastIndex = -1;
    private static boolean IS_END = false;

    @Override
    public WebClientDTO read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        if (IS_END) {
            return null;
        }

        WebClientDTO response = webClientService.getPublicServiceReservation(startIndex, startIndex + size - 1);
        if (lastIndex == -1) {
            lastIndex = response.fetchListTotalCount();
        }

        startIndex += size;
        if (startIndex > lastIndex) {
            IS_END = true;
        }

        return response;
    }
}

ItemReader<WebClientDTO> 인터페이스를 구현한다. 이 ItemReader는 WebClientService를 사용하여 공공 서비스 예약 정보를 읽어온다.

⚽ size, startIndex, lastIndex, IS_END는 읽어올 데이터의 크기와 현재 읽는 위치, 마지막 인덱스, 작업이 끝났는지 여부를 관리하기 위한 변수!

read() 메서드는 실제로 데이터를 읽어오는 메서드.

  • IS_END가 true인 경우, 작업이 이미 끝났으므로 null을 반환하여 읽기를 종료한다.

  • webClientService.getPublicServiceReservation()를 사용하여 공공 서비스 예약 정보를 가져온다. startIndex부터 startIndex + size - 1까지의 데이터를 가져오며, 처음 호출 시 lastIndex가 -1이라면 전체 데이터의 개수를 가져와 lastIndex에 저장
    startIndex을 업데이트하고, startIndex이 lastIndex를 넘어서면 IS_END를 true로 설정하여 데이터 읽기가 종료되었음을 표시한다.

😀 FetchLastUpdatedTasklet

@Slf4j
@Component
@RequiredArgsConstructor
public class FetchLastUpdatedTasklet implements Tasklet {
    private final PublicReservationLastUpdatedRepository repository;
    private boolean updatedToday = false;
    private LocalDate today = LocalDate.now();

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        PublicReservationLastUpdated publicReservationLastUpdated = repository.findFirstByPublicReservationTypeOrderByLastUpdatedDesc(PublicReservationType.GENERAL)
                .orElseGet(() -> PublicReservationLastUpdated.of(0L, PublicReservationType.GENERAL, LocalDate.MIN));
        LocalDate lastUpdated = publicReservationLastUpdated.getLastUpdated();

        if(lastUpdated.equals(today)){
            log.info("[FetchLastUpdatedTasklet] [lastUpdated == today] Already updated for today.");
            contribution.setExitStatus(ExitStatus.COMPLETED);
            return RepeatStatus.FINISHED;
        }
        if(updatedToday){
            log.info("[FetchLastUpdatedTasklet] [updatedToday == true] Already updated for today.");
            repository.saveAndFlush(PublicReservationLastUpdated.of(PublicReservationType.GENERAL, today));

            contribution.setExitStatus(ExitStatus.COMPLETED);
            return RepeatStatus.FINISHED;
        }

        updatedToday = true;
        contribution.setExitStatus(new ExitStatus("CONTINUABLE"));

        return RepeatStatus.FINISHED;
    }
}

⚽ Tasklet 인터페이스를 구현. 이 Tasklet은 공공 서비스 예약 정보의 최근 업데이트를 확인하고 처리한다.

👻 execute() 메서드는 실제 작업을 수행하는 메서드.

  • repository.findFirstByPublicReservationTypeOrderByLastUpdatedDesc(PublicReservationType.GENERAL)를 사용하여 최근 업데이트된 공공 서비스 예약 정보를 가져온다.

  • lastUpdated와 today를 비교하여 이미 오늘 업데이트가 된 경우 작업을 완료.
    updatedToday가 true인 경우에도 작업을 완료합니다.
    그 외의 경우 CONTINUABLE 상태로 표시하여 다음 작업을 진행할 수 있도록 한다.

⚽ 이렇게 코드는 배치 작업에서 필요한 데이터를 읽어오거나 업데이트하고, 작업의 상태를 관리하는 역할

😀 PublicServiceReservationInfoInsertJobConfig 전체 해석

  • 해당 배치는 공공 서비스 예약 정보를 처리하는 spring batch 작업을 설정.

publicServiceReservationInfoInsertJob 메서드: Job을 정의하는 메서드이다. JobBuilder를 사용하여 Job을 생성하고, 두 개의 Step(fetchLastUpdated와 insertPublicServiceReservationStep)를 순차적으로 실행하도록 설정. 또한 incrementer를 사용하여 Job 파라미터를 증가시키고, 실행이 가능한지 여부에 따라 다음 Step을 지정한다.

fetchLastUpdated 메서드: 첫 번째 Step인 fetchLastUpdated를 정의하는 메서드입니다. tasklet을 사용하여 작업을 수행하며, platformTransactionManager를 통해 트랜잭션 관리를 합니다.

insertPublicServiceReservationStep 메서드: 두 번째 Step인 insertPublicServiceReservationStep을 정의하는 메서드이다. 이 Step은 공공 서비스 예약 정보를 읽고 처리하는 역할을 합니다. chunk 크기는 10으로 설정되어 한 번에 10개의 아이템을 처리하며, ItemReader, ItemProcessor, ItemWriter를 설정합니다.

publicServiceReservationReader 메서드: 공공 서비스 예약 정보를 읽어오는 ItemReader를 정의하는 메서드입니다. PublicServiceReservationReader 클래스를 사용하여 데이터를 읽어옵니다.

publicServiceReservationProcessor 메서드 : 읽어온 데이터를 처리하는 ItemProcessor를 정의하는 메서드입니다. 중복을 제거하고 DTO 객체를 PublicServiceReservation 객체로 변환하여 반환합니다.

publicServiceReservationWriter 메서드: 처리된 공공 서비스 예약 정보를 저장하는 ItemWriter를 정의하는 메서드입니다. repository를 통해 데이터를 저장하고 즉시 플러시합니다.

profile
back-end, 지속 성장 가능한 개발자를 향하여

0개의 댓글