Spring Batch 삽질기 3탄

Miz·2021년 5월 29일
1

board-toy-project

목록 보기
6/6

대부분 스프링 배치의 내용은 '기억보다는 기록을' 블로그를 참고하여 공부하였습니다.
배치에 대해 공부하고자 하시는분은 이 글 보다 아래 블로그를 보는게 훨씬 도움이 됩니다
https://jojoldu.tistory.com/

1탄 https://velog.io/@miz/Spring-Batch-%EC%82%BD%EC%A7%88%EA%B8%B0-1%ED%83%84
2탄 https://velog.io/@miz/Spring-Batch-%EC%82%BD%EC%A7%88%EA%B8%B0-1%ED%83%84

코드는 GitHub에 있습니다.

1. 기존상황

  • 2탄에서 @Component로 지정한 DataShareBean을 통해 step간 데이터 공유를 쉽게 만들었습니다.
    - 추가적으로 처음 작성할때 ConcurrentHashMap을 잘못사용했던 점을 고쳤습니다.

2. 개선사항

  • 이제 Step간에 병렬로 처리할때 발생하는 필드변수의 문제점은 해결하였으니 진짜 병렬로 처리해보고 성능 개선을 해보겠습니다.
  • 테스트를 위해서 1탄에서 사용한 Pay데이터를 10개, 10만개, 100만개 순으로 늘려서 적용해보았습니다.
  • 다양한 병렬 처리기능을 SpringBatch에서 지원하지만, 저는 아래에서 jojoldu님의 글을 참조하여 파티셔닝으로 처리해보겠습니다.

    https://jojoldu.tistory.com/550?category=902551

3. 시작

  • 파티셔닝
    - 매니저(마스터)를 이용해 데이터를 쪼개서 나눈 다음 파티션에서 슬레이브가 독립적으로 동작한다.

    • 데이터를 더 작은 Chunk 단위로 쪼개서 적용한다.
    • 기존의 코드에 대한 변경 없이 추가적인 코드 작성으로 병렬처리가 가능하다.

    4. JobConfiguration Class

@Slf4j
@Configuration
@RequiredArgsConstructor
public class PayTotalJobPartitionConfiguration {

    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    public static final String JOB_NAME = "PayTotalJobPartitionJob";
    public static final String BEAN_PREFIX = JOB_NAME + "_";

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory entityManagerFactory;
    private final PayRepository payRepository;
    private final TotalPayRepository totalPayRepository;
    private final DataShareBean<Long> dataShareBean;

    private final static int chunkSize = 1000;
    private final static int poolSize = 20;

    @Bean(name = JOB_NAME+"_taskPool")
    public TaskExecutor executor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(poolSize);
        executor.setMaxPoolSize(poolSize);
        executor.setThreadNamePrefix("partition-thread");
        executor.setWaitForTasksToCompleteOnShutdown(Boolean.TRUE);
        executor.initialize();
        return executor;
    }

    @Bean(name = JOB_NAME+"_partitionHandler")
    public TaskExecutorPartitionHandler partitionHandler() {
        TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler(); // (1)
        partitionHandler.setStep(step1()); // (2)
        partitionHandler.setTaskExecutor(executor()); // (3)
        partitionHandler.setGridSize(poolSize); // (4)
        return partitionHandler;
    }

    @Bean(JOB_NAME)
    public Job job() {
        //listener beforeJob으로 변경
        //dataShareBean.putData("totalAmount", 0L);
        return jobBuilderFactory.get(JOB_NAME)
                .listener(listener())
                .start(step1Manager())
                .next(step2(null))
                //.preventRestart()
                .build();
    }

    @Bean(BEAN_PREFIX + "listener")
    public JobExecutionListener listener() {
        return new TotalJobListener(dataShareBean);
    }

    @Bean(name = JOB_NAME +"_step1Manager")
    public Step step1Manager() {
        return stepBuilderFactory.get("step1.manager") // (1)
                .partitioner("step1", partitioner(null)) // (2)
                .step(step1()) // (3)
                .partitionHandler(partitionHandler()) // (4)
                .build();
    }

    @Bean(name = JOB_NAME +"_partitioner")
    @StepScope
    public PayIdRangePartitioner partitioner(
            @Value("#{jobParameters[requestDate]}") String requestDate) {

        return new PayIdRangePartitioner(payRepository,requestDate);
    }

    @Bean(BEAN_PREFIX + "step")
    public Step step1() {
        return stepBuilderFactory.get(BEAN_PREFIX + "step")
                .<Pay, Pay>chunk(chunkSize)
                .reader(reader(null,null))
                .writer(writer())
                .build();
    }

    @Bean(BEAN_PREFIX + "reader")
    @StepScope
    public JpaPagingItemReader<Pay> reader(
        @Value("#{stepExecutionContext[minId]}") Long minId,
        @Value("#{stepExecutionContext[maxId]}") Long maxId
    ) {
        Map<String, Object> params = new HashMap<>();
        params.put("minId", minId);
        params.put("maxId", maxId);
        return new JpaPagingItemReaderBuilder<Pay>()
                .name(BEAN_PREFIX + "reader")
                .entityManagerFactory(entityManagerFactory)
                .pageSize(chunkSize)
                .queryString("select p from Pay p where p.id BETWEEN :minId AND :maxId")
                .parameterValues(params)
                .build();
    }

    @Bean(BEAN_PREFIX + "writer")
    public ItemWriter<Pay> writer() {
        return list -> {
            for(Pay pay : list) {
                dataShareBean.addData("totalAmount",pay.getAmount());
            }
        };
    }

    @Bean(BEAN_PREFIX + "step2")
    @JobScope
    public Step step2(@Value("#{jobParameters[requestDate]}") String requestDate) {
        return stepBuilderFactory.get(BEAN_PREFIX + "step2")
                .tasklet((contribution, chunkContext) -> {
                TotalPay(dataShareBean.getTotal(),requestDate);
                    TotalPay totalPay = new TotalPay(dataShareBean.getData("totalAmount"),requestDate);

                    totalPayRepository.save(totalPay);

                    return RepeatStatus.FINISHED;
                }).build();
    }


}

4.1 코드별 소개

  • PartionHandler
    @Bean(name = JOB_NAME+"_partitionHandler")
    public TaskExecutorPartitionHandler partitionHandler() {
        TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler(); // (1)
        partitionHandler.setStep(step1()); // (2)
        partitionHandler.setTaskExecutor(executor()); // (3)
        partitionHandler.setGridSize(poolSize); // (4)
        return partitionHandler;
    }
  1. TaskExecutorPartitionHandler 을 사용하여 멀티쓰레드로 활용할 수 있게 합니다.
  2. 사용할 step을 설정해줍니다.
  3. 멀티쓰레드를 실행하기 위해 executor를 설정합니다.
  4. gridSize와 멀티쓰레드를 같은 사이즈로 쓰기 위해 poolSize로 동일하게 사용합니다.

  • executor : TaskExcutor
    @Bean(name = JOB_NAME+"_taskPool")
    public TaskExecutor executor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(poolSize);
        executor.setMaxPoolSize(poolSize);
        executor.setThreadNamePrefix("partition-thread");
        executor.setWaitForTasksToCompleteOnShutdown(Boolean.TRUE);
        executor.initialize();
        return executor;
    }
  1. partition을 멀티쓰레드로 돌릴수있게 poolsize등을 설정합니다.

  • StepManager()
    @Bean(name = JOB_NAME +"_step1Manager")
    public Step step1Manager() {
        return stepBuilderFactory.get("step1.manager") // (1)
                .partitioner("step1", partitioner(null)) // (2)
                .step(step1()) // (3)
                .partitionHandler(partitionHandler()) // (4)
                .build();
    }
  1. manager의 이름을 정합니다. 지금상황은 step1의 매니저이기때문에 "step1.manager"로 지정합니다.
  2. step1에 사용될 Partitioner 구현체를 등록합니다.
  3. 파니셔닝 될 step을 등록한다
  4. 파티셔닝을 관리한 handler를 등록합니다.

  • partitioner
    @Bean(name = JOB_NAME +"_partitioner")
    @StepScope
    public PayIdRangePartitioner partitioner(
            @Value("#{jobParameters[requestDate]}") String requestDate) {

        return new PayIdRangePartitioner(payRepository,requestDate); //(1)
    }
  1. 저는 요청날짜를 기반으로 그 날의 데이터들을 총합하는 배치를 하기 때문에 요청날짜를 넣어줍니다.

  • PayIdRangePartitioner
@Slf4j
@RequiredArgsConstructor
public class PayIdRangePartitioner implements Partitioner {

    private final PayRepository payRepository;
    private final String requestDate;

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Long max = payRepository.findMaxId(requestDate);
        Long min = payRepository.findMinId(requestDate);

        Long targetSize = (max - min) / gridSize + 1;
        log.info("max,min = {} {}", max,min);
        Map<String, ExecutionContext> result = new HashMap<>();

        long number = 0;
        long start = min;
        long end = start + targetSize - 1;

        while (start <= max) {
            ExecutionContext value = new ExecutionContext();
            result.put("partition" + number, value);

            if (end >= max) {
                end = max;
            }

            value.putLong("minId", start);
            value.putLong("maxId", end);
            log.info("partion min max = {} {}",start,end);
            start += targetSize;
            end += targetSize;
            number++;
        }

        return result;
    }
}
  • 요청 날짜의 모든 Pay 의 max Id 와 Min Id를 가져 온뒤 gridSize(poolSize)에 맞게 쪼개어서 Map<String, ExecutionContext>을 반환합니다.
  • 생성된 ExecutionContext에 맞춰 Worker Step들이 생성되어 그들의 Step Executions이 됩니다.

  • itemReader
    @Bean(BEAN_PREFIX + "reader")
    @StepScope
    public JpaPagingItemReader<Pay> reader(
        @Value("#{stepExecutionContext[minId]}") Long minId,
        @Value("#{stepExecutionContext[maxId]}") Long maxId
    ) {
        Map<String, Object> params = new HashMap<>();
        params.put("minId", minId);
        params.put("maxId", maxId);
        return new JpaPagingItemReaderBuilder<Pay>()
                .name(BEAN_PREFIX + "reader")
                .entityManagerFactory(entityManagerFactory)
                .pageSize(chunkSize)
                .queryString("select p from Pay p where p.id BETWEEN :minId AND :maxId")
                .parameterValues(params)
                .build();
    }
  • itemReader에서는 위에서 생성해준 Step Executions에 있는 max id와 min id를 받아옵니다.

  • TotalJobListener
@RequiredArgsConstructor
@Slf4j
public class TotalJobListener extends JobExecutionListenerSupport {

    private final DataShareBean dataShareBean;

    @Override
    public void beforeJob(JobExecution jobExecution) {
        if(jobExecution.getStatus() == BatchStatus.STARTED) {
            log.info("start Job & initial dataShareBean.totalAmount");
            dataShareBean.putData("totalAmount",0L);
        }
    }
}
  • JobListener를 생성해 Job 시작전에 DataSourceBean의 map에 값을 초기화해줍니다.
  • 특정 배치잡에만 필요한 listener로 사용하기 위해서 따로 빈으로 등록하지 않고 configuration class에서 빈으로 등록합니다. 필요한 dataShareBean은 등록할때 DI 해줍니다.

5.결과

  • 테스트 한 결과입니다.
  • 데이터 10만개
    - single : 323053831084 ns (약 320초)
    • gridSize 5 : 8366703292 ns (약 8초)
  • 데이터 100만개
    - single : x ns (약 20분) -> 기록을 하지 않고 돌려서 약 20분 넘게 걸렸습니다.
    • gridSize 5 : 18277477625 ns (약 18초)
    • gridSize 20 : 12726939792 na (약 12초)
  • 데이터 백만개에서 엄청난 차이가 발생합니다.

6. 주의사항

  • gridSize, poolSize 를 10개 이상으로 가져갈때 주의사항입니다.
  • SpringBoot hikari pool의 max pool size는 default가 10입니다.
  • 설정 변경없이 10개이상으로 실행할시 sql 관련된 에러가 발생합니다.
  • 해결방법
    - properties 설정 변경
spring:
  datasource:
    hikari:
      maximumPoolSize : 20

7. 의문점

  1. step을 두개로 나누어서 TotalPay를 저장하는게 좋은 방법인가?
  2. 차라리 step을 한개로 두고 Listener의 afterJob에서 처리하는게 좋을지?
  • 좋은 방법이 생긴다면 수정하겠습니다.
profile
2년차 백엔드 개발자

0개의 댓글