Spring Batch 의 기본

JoJo Green·2023년 2월 26일
0
post-thumbnail

개념


  • Spring 에서 제공하는 Batch 프레임워크
  • Batch 란
    • 데이터를 실시간으로 처리하는 것이 아닌 특정 시점에 일괄적으로 한번에 처리하는 작업
    • 특징
      • 대량의 데이터를 다룬다.
      • 특정 시점에 작업을 실행한다.
      • 일괄적으로 처리한다.
  • Spring Batch 의 특징
    • 최적화와 파티셔닝 기술을 통해 대용량 데이터에 대한 고성능 Batch 작업이 가능하다.
    • Chunk 단위로 작업할 수 있고 작업이 실패했을 때 처음부터가 아닌 실패한 Chunk 부터 재실행한다.
    • 중복 실행을 막기 위해 동일한 파라미터로 성공한 이력이 있는 Batch 는 실행 시 예외를 발생시킨다.
    • Spring Batch 에서는 Batch Job 들을 관리하지만 Job 실행시키는 기능을 지원하진 않는다.
      • Scheduler, Quartz, Jenkins 등을 이용해서 Batch Job 을 실행시켜야한다.

구조 및 용어


용어

Job

  • Batch 작업의 최상위 개념
  • Batch 작업의 전체 과정을 하나의 단위로 만들어 놓은 객체
  • 일반적인 Batch 작업이라고 하면 Job 을 의미한다.

JobInstance

  • Job 의 실행 단위
  • Job 을 실행시킬때 마다 JobInstance 가 생성된다.
  • JobParameters 로 JobInstance 를 구분할 수 있다.
    • 특정 Job 을 13시에 실행하면 파라미터에서 실행 시간이 13시인 JobInstance 가 생성되고 14시에 실행하면 실행 시간 파라미터가 14시인 JobInstance 가 생성된다.

JobParameters

  • JobInstance 를 구분하는 기준
  • Job 내부에서 JobParameters 의 값을 동적으로 사용할 수 있다.
    • 주로 실행 시점에 대한 정보를 JobParameters 로 부터 받아 Step 등에서 많이 사용한다.
  • String, Double, Long, Date 4가지 타입만 지원한다.

JobExecution

  • JobInstance 의 실행 시도에 대한 정보를 가진 객체
    • 실행 상태, 시작 시간, 종료시 간, 생성 시간 등
  • 특정 JobInstance 가 작업을 실패하여 재실행하는 경우 동일한 JobInstance 에 대한 작업이지만 JobExecution 은 실패에 대한 객체, 재시도에 대한 객체 각각 전부 생성된다.

Step

  • Job 의 작업을 정의하고 순차적인 단계를 캡슐화한 객체
  • 실질적인 Batch 작업에 대해 정의하고 제어한다.
  • Job 은 최소 1개 이상의 Step 으로 이루어져있다.

StepExecution

  • Step 의 실행 시도에 대한 정보를 가진 객체
    • read 수, write 수, commit 수, skip 수 등
  • JobExecution 과 동일하게 실제 시작이 될 때만 생성된다.

ExecutionContext

  • JobExecutionContext, StepExecutionContext 가 존재한다.
  • Job 내부에서 각 작업 단계들이 공유할 수 있는 데이터를 가진 객체이다.
    • Step 간 Data 공유가 가능
  • JobExecutionContext 는 commit 시점에 저장된다.
  • StepExecutionContext 는 Step 의 실행 사이사이에 저장된다.
  • Job 실패시 ExecutionContext 의 정보를 통해 마지막 실행 값을 재구성할 수 있다.

JobLauncher

  • Job 과 JobParameters 로 Job 을 실행시키는 객체

JobRepository

  • 모든 Batch 처리 관련 정보를 담고 있는 저장소이자 매커니즘
  • Job 이 실행되면 JobRepository 에 JobInstance, Excution, ExecutionContext 정보가 생성되고 필요한경우 조회해서 사용할 수 있다.

Tasklet Step 과 Chunk Step


  • Spring Batch 의 Step 은 Taklet 기반 Step 과 Chunk 기반 Step 두 종류가 있다.

Tasklet Step

  • Tasklet 로 구성된 Step
  • 하나의 트랜잭션으로 작업을 모두 처리한다.
  • 단순한 작업을 처리하는 Step 에서 사용한다.

Tasklet

  • 하나의 처리 메서드를 가진 함수형 인터페이스

Chunk Step

  • Chunk 단위로 작업을 처리하는 Step
  • 100개에 데이터에 대해서 Chunk Size 를 10으로 작업한다면 작업을 10번으로 나눠서 처리한다.
    • 10번의 Transaction 으로 작업을 처리

Chunk

  • Spring Batch 에서 한번의 Transaction 으로 처리되는 아이템의 수를 의미한다.
  • Batch 작업 중 실패가 발생했을 때 Chunk 단위로 롤백하고 해당 Chunk 에서 부터 재시작한다.

Chunk Step 의 구조

  • Chunk Step 은 3가지로 구성되어 있다.
  • ItemReader
    • Batch 작업에 사용할 Item 을 읽어오는 인터페이스
    • 상황에 맞는 다양한 구현체가 존재한다.
      • Chunk 단위로 작업하기 위해서는 기본적으로 페이징해서 Item 을 읽어와야 하기 때문에 JdbcPagingItemReader<T> 같은 ItemReader 를 사용한다.
  • ItemProcessor
    • Reader 에서 읽어온 Item 의 데이터로 작업을 처리하는 인터페이스
    • 대표적으로 ItemProcessor<I, O> 같은 인터페이스로 I 타입의 객체를 받아 O 타입의 객체로 반환 처리한다.
  • ItemWriter
    • Reader 로 조회했거나 Processor 로 처리한 데이터로 Write 작업을 하는 인터페이스
    • 처리 결과물에 따라 Database 에 CUD 를 할 수 도 있고 Message Queue 에 Message 를 전송할 수도 있다.
    • Reader 처럼 상황에 맞는 다양한 구현체가 존재한다.
      • Database 에 CUD 하기 위해서는 일반적으로 JdbcBatchItemWriterBuilder<T> 같은 ItemWriter 를 사용한다.
    • Chunk Size 단위로 작업이 처리된다.

Paging Size 와 Chunk Size

  • 대용량 Batch 작업에서 Chunk 단위로 작업하기 위해서는 Item 을 읽어올 때 Paging 처리해서 읽어와야한다.
  • 그래서 PagingItemReader 설정시 Page Size 와 Chunk Size 값을 설정해준다.
    • Page Size 를 설정하면 알아서 offset, limit 를 지정해서 순차적으로 조회한다.
  • 이때 Paging Size 가 10이고 Chunk Size 가 20일 경우 2번의 읽기 뒤에 Transaction 작업이 수행된다.
    • 한번의 Transaction 을 수행하기 위해서 2번의 읽기 쿼리가 발생되는 상황
  • 그래서 Paging Size 와 Chunk Size 는 일치시켜야 가장 좋은 Batch 성능을 낼 수 있다.
    • 특별한 이유가 없다면 동일하게 설정한다.

PagingItemReader 사용 시 주의사항

  • Paging Size 만큼 데이터를 조회해 오는데 순서가 보장되어 있어야 하기 때문에 반드시 Order By 로 데이터를 정렬해야 한다.

사용 방법


의존성

implementation("org.springframework.boot:spring-boot-starter-batch")

기본 설정

  • @EnableBatchProcessing 을 선언해야 한다.
    @EnableBatchProcessing
    @SpringBootApplication
    class BatchApplication
    
    fun main(args: Array<String>) {
        runApplication<BatchApplication>(*args)
    }

Tasklet Step

/**
 * Tasklet Step 으로 Job 구성하기
 * */
@Configuration
class TaskletStepJobConfig(
    val jobBuilderFactory: JobBuilderFactory,
    val stepBuilderFactory: StepBuilderFactory
) {

    @Bean
    fun job(): Job {
        return jobBuilderFactory["taskletStepJob"]
            .start(step1())
            .next(step2())
            .build()
    }

    @Bean
    fun step1(): Step {
        return stepBuilderFactory["step1"]
            .tasklet { contribution, chunkContext ->
                println(">> This is tasklet step1 <<")
                RepeatStatus.FINISHED
            }
            .build()
    }

    @Bean
    fun step2(): Step {
        return stepBuilderFactory["step2"]
            .tasklet { contribution, chunkContext ->
                println(">> This is tasklet step2 <<")
                RepeatStatus.FINISHED
            }
            .build()
    }
}

Chunk Step

/**
 * Chunk Step 으로 Job 구성하기
 * -> Person 의 age 가 20 미만이면 isStudent 가 True, 아니면 False 로 업데이트 해주는 Batch Job
 * */
@Configuration
class JdbcChunkStepJobConfig(
    val jobBuilderFactory: JobBuilderFactory,
    val stepBuilderFactory: StepBuilderFactory,
    val dataSource: DataSource
) {
    companion object {
        const val JOB_NAME = "JdbcChunkStepJob"
        const val CHUNK_SIZE = 10;
    }

    @Bean(JOB_NAME)
    fun job(): Job {
        return jobBuilderFactory[JOB_NAME]
            .start(step())
            .build()
    }

    @Bean(JOB_NAME.plus("_STEP"))
    fun step(): Step {
        return stepBuilderFactory[JOB_NAME.plus("_STEP")]
            .chunk<PersonAgeInfo, PersonIsStudentInfo>(CHUNK_SIZE)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .build()
    }

    @Bean(JOB_NAME.plus("_READER"))
    fun reader(): JdbcPagingItemReader<PersonAgeInfo> {
        println("BATCH-LOG >>> reader : ${LocalDateTime.now()} <<<")
        return JdbcPagingItemReaderBuilder<PersonAgeInfo>()
            .name(JOB_NAME.plus("_READER"))
            .pageSize(CHUNK_SIZE)
            .fetchSize(CHUNK_SIZE)
            .dataSource(dataSource)
            .queryProvider(query())
            .rowMapper(resultMapper())
            .build()
    }

    @Bean(JOB_NAME.plus("_PROCESSOR"))
    fun processor(): ItemProcessor<PersonAgeInfo, PersonIsStudentInfo> {

        println("BATCH-LOG >>> processor : ${LocalDateTime.now()} <<<")

        return ItemProcessor {
            if (it.age < 20) {
                val personIsStudentInfo = PersonIsStudentInfo(it.id, true)
                println("BATCH-LOG >>> processor - true : $it | $personIsStudentInfo <<<")
                return@ItemProcessor personIsStudentInfo
            } else {
                val personIsStudentInfo = PersonIsStudentInfo(it.id, false)
                println("BATCH-LOG >>> processor - false : $it | $personIsStudentInfo <<<")
                return@ItemProcessor personIsStudentInfo
            }
        }
    }

    @Bean(JOB_NAME.plus("_WRITER"))
    fun writer(): JdbcBatchItemWriter<PersonIsStudentInfo> {

        println("BATCH-LOG >>> writer : ${LocalDateTime.now()} <<<")

        return JdbcBatchItemWriterBuilder<PersonIsStudentInfo>()
            .dataSource(dataSource)
            .sql("update person set is_student = :studentFlag where person_no = :id")
            .beanMapped()
            .build()
    }

    private fun query(): PagingQueryProvider {
        val spqpfb = SqlPagingQueryProviderFactoryBean()
        spqpfb.setDataSource(dataSource)
        spqpfb.setSelectClause("select person_no, age ")
        spqpfb.setFromClause("from person ")
        spqpfb.setWhereClause("where is_student is null ")
        spqpfb.setSortKeys(mapOf("person_no" to Order.ASCENDING))
        return spqpfb.`object`
    }

    private fun resultMapper() = RowMapper { rs, _ ->
        PersonAgeInfo(
            id = rs.getLong("person_no"),
            age = rs.getInt("age")
        )
    }
}

data class PersonAgeInfo(val id: Long, val age: Int)

data class PersonIsStudentInfo(val id: Long, val studentFlag: Boolean)

@Entity
class Person(
    val name: String,
    val age: Int,
    var isStudent: Boolean?
) {
    @Id
    @Column(name = "person_no")
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    val id: Long = 0L
}

@JopScope, @StepScope

...
		@StepScope
    @Bean("${JOB_NAME}_reader")
    fun reader(
        @Value("#{jobParameters[requestAt]}") requestAt: String?
    ): JdbcPagingItemReader<MemberInfo> {
        println("requestAt >>> $requestAt")
        val format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
        val requestDateTime = LocalDateTime
            .parse(requestAt, format)
            .atZone(ZoneId.of("Asia/Seoul"))
            .withZoneSameInstant(ZoneId.systemDefault())
        val parameterValues = mapOf("requestAt" to Timestamp.from(requestDateTime.toInstant()))

        return JdbcPagingItemReaderBuilder<MemberInfo>()
            .name("${JOB_NAME}_reader")
            .pageSize(chunkSize)
            .fetchSize(chunkSize)
            .dataSource(dataSource)
            .queryProvider(query())
            .parameterValues(parameterValues)
            .rowMapper(resultMapper())
            .build()
    }
...
  • @JopScope 는 Step 빈을 선언하는 메서드에 선언 가능
  • @StepScope 는 Step 을 구성하는 빈들을 선언하는 메서드에 선언 가능
    • ItemReader, ItemProcessor, ItemWriter
  • @JopScope, @StepScope 를 선언한 빈은 어플리케이션 실행시점이 아닌 사용 시점에 빈을 구성하고 생성된다.
  • Batch 가 실행되는 시점에 생성되는 JobParameters 들을 빈에 LateBinding 할 수 있다.
    • Batch 실행 시간같은 값을 JobParameters 로 사용하려면 LateBinding 해줘야만 한다.
  • Batch 빈 생성에 JobParameters 를 사용하기 위해서는 반드시 선언해줘야하는 Annotation 이다.
  • JobParameters 를 동적으로 할당받아서 Job 에 사용하기 위한 방법이다.

CompositItemWriter

...
    @Bean(JOB_NAME.plus("_COMPOSITE_WRITER"))
    fun compositeWriter(): CompositeItemWriter<PersonIsStudentInfo> {
        return CompositeItemWriterBuilder<PersonIsStudentInfo>()
            .delegates(writer1(), writer2())
            .build()
    }

    @Bean(JOB_NAME.plus("_WRITER1"))
    fun writer1(): JdbcBatchItemWriter<PersonIsStudentInfo> {

        println("BATCH-LOG >>> writer1 : ${LocalDateTime.now()} <<<")

        return JdbcBatchItemWriterBuilder<PersonIsStudentInfo>()
            .dataSource(dataSource)
            .sql("update person set is_student = :studentFlag where person_no = :id")
            .beanMapped()
            .build()
    }

    @Bean(JOB_NAME.plus("_WRITER2"))
    fun writer2(): JdbcBatchItemWriter<PersonIsStudentInfo> {

        println("BATCH-LOG >>> writer2 : ${LocalDateTime.now()} <<<")

        return JdbcBatchItemWriterBuilder<PersonIsStudentInfo>()
            .dataSource(dataSource)
            .sql("update person set name = 'DONE!!!!' where person_no = :id")
            .beanMapped()
            .build()
    }
...
  • 하나의 Step 에서 여러개의 ItemWriter 들을 사용해야 할 때 사용하는 Writer
  • CompositeItemWriter<T> 빈을 등록하는 메서드를 만들고 구성할 때 .delegates() 메서드를 사용하여 하위 Writer 객체들을 등록하면 사용할 수 있다.

Batch Job 을 실행시키는 방법

  • 기본적으로 어플리케이션을 실행시키면 해당 프로젝트의 Batch Job 들이 실행된다.
    • 어플리케이션 실행 시 Batch 자동 실행 프로퍼티 옵션
      • spring.batch.job.enabled: true
  • 단순하게 어플리케이션 실행으로 Job 을 실행시키면 JobParameters 설정 없이 실행시킨것이기 때문에 한번만 실행되고 그 다음부터는 실행되지 않는다.
    • JobInstance 가 생성되지 않기 때문에
  • JobLauncher 를 사용하면 JobParameters 를 설정해주면서 Job 을 실행시킬 수 있다.
    @Configuration
    @Slf4j
    public class JobScheduler {
    
    	@Autowired
    	private JobLauncher jobLauncher;
    
    	@Autowired
    	private Job ExampleJob;
    
    	@Scheduled(cron = "1 * * * * *")
    	public void jobSchduled() throws JobParametersInvalidException, JobExecutionAlreadyRunningException,
    			JobRestartException, JobInstanceAlreadyCompleteException {
    
    		Map<String, JobParameter> jobParametersMap = new HashMap<>();
    		
    		SimpleDateFormat format1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
    		Date time = new Date();
    
    		String time1 = format1.format(time);
    
    		jobParametersMap.put("date",new JobParameter(time1));
    
    		JobParameters parameters = new JobParameters(jobParametersMap);
    
    		JobExecution jobExecution = jobLauncher.run(ExampleJob, parameters);
    
    		while (jobExecution.isRunning()) {
    			log.info("...");
    		}
    
    		log.info("Job Execution: " + jobExecution.getStatus());
    		log.info("Job getJobConfigurationName: " + jobExecution.getJobConfigurationName());
    		log.info("Job getJobId: " + jobExecution.getJobId());
    		log.info("Job getExitStatus: " + jobExecution.getExitStatus());
    		log.info("Job getJobInstance: " + jobExecution.getJobInstance());
    		log.info("Job getStepExecutions: " + jobExecution.getStepExecutions());
    		log.info("Job getLastUpdated: " + jobExecution.getLastUpdated());
    		log.info("Job getFailureExceptions: " + jobExecution.getFailureExceptions());
    		
    	}
    }
  • Batch Job 을 일정주기로 실행시켜야 한다면 Spring Scheduler, Quartz, Jenkins 등을 활용해야 한다.
  • 로컬에서는 실행 환경 설정으로 Program arguments 와 Environment variables 를 세팅해서 특정 Batch 를 특정 JobParamters 로 실행시킬 수 있다.

참고 자료


Spring Batch란? 이해하고 사용하기(예제소스 포함)

Spring Batch (1) 개요

[Kotlin] Spring Batch를 사용한 배치 애플리케이션 작성

5. Spring Batch 가이드 - Spring Batch Scope & Job Parameter

JobParameter 활용 방법 (feat. LocalDate 파라미터 사용하기)

https://velog.io/@gongmeda/CompositeItemWriter를-사용하여-하나의-Step에-여러개의-ItemWriter-등록하기

0개의 댓글