[스프링 배치] Step 사용하기

조갱·2025년 3월 3일
0

스프링 배치

목록 보기
6/9

Step 아키텍쳐 톺아보기

사진으로 보는 아키텍처

StepBuilderFactory

  • StepBuilder 를 생성하기 위한 팩토리 클래스.
  • stepBuilderFactory.get(stepName) 을 통해 StepBuilder 를 생성한다.

StepBuilder

  • StepBuilder 는 직접적으로 Step 생성을 담당하지 않는다.
  • StepBuilder 는 다양한 메소드를 통해 하위 StepBuilder 를 생성하는 역할을 한다.
  • 실제 Step 생성은 하위 StepBuilder 에 위임한다.
  • TaskletStepBuilder (TaskletStep 생성)
    • 가장 기본적인 기능을 가진 Builder 클래스이다.
  • SimpleStepBuilder (TaskletStep 생성)
    • Tasklet 을 상속받는, chunk 기반의 작업을 수행하는 ChunkOrientedTasklet 을 생성한다.
  • PartitionStepBuilder (PartitionStep 생성)
    • 멀티스레드 방식으로 Job을 실행한다.
  • JobStepBuilder (JobStep 생성)
    • Step 안에서 새로운 Job 을 실행한다.
  • FlowStepBuilder (FlowStep 생성)
    • Step 안에서 Flow 를 실행한다.

StepBuilder 소스코드

public class StepBuilder extends StepBuilderHelper<StepBuilder> {
	// 생성자로 전달하는 name 이 stepName 이 된다.
	public StepBuilder(String name) {
		super(name);
	}

	// Tasklet 을 사용하여 TaskletStepBuilder 를 반환한다.
    // Tasklet : 작업을 수행하는 비즈니스 로직
	public TaskletStepBuilder tasklet(Tasklet tasklet) {
		return new TaskletStepBuilder(this).tasklet(tasklet);
	}

	// 몇개 단위로 잘라서 작업할건지를 의미하는 chunkSize 를 받아서 SimpleStepBuilder 를 반환한다.
    // Generic - I : ItemReader 에서 읽어들이는 데이터의 타입
    // Generic - O : ItemWriter 에서 쓰는 데이터의 타입.
    // chunk 기반의 ItemReader, ItemProcessor, ItemWriter 에 대해서는 나중에 별도 소개
	public <I, O> SimpleStepBuilder<I, O> chunk(int chunkSize) {
		return new SimpleStepBuilder<I, O>(this).chunk(chunkSize);
	}

	// CompletionPolicy 인터페이스를 통해 chunk가 완료되는 시점을 커스텀하게 구현 가능하다.
    // 고정된 chunkSize 뿐만 아니라 다양한 조건을 설정할 수 있다.
	public <I, O> SimpleStepBuilder<I, O> chunk(CompletionPolicy completionPolicy) {
		return new SimpleStepBuilder<I, O>(this).chunk(completionPolicy);
	}

	// Partitioner 를 통해 파티션을 나누어 멀티스레드로 수행되는 PartitionStepBuilder 를 생성한다.
	public PartitionStepBuilder partitioner(String stepName, Partitioner partitioner) {
		return new PartitionStepBuilder(this).partitioner(stepName, partitioner);
	}

	// 작업 분할이 필요 없거나, 병렬 처리 없이 단일 스레드에서 수행하는 경우 step 만 등록한다.
    // PartitionStep 의 구조를 유지하거나, 추후 병렬처리로 확장가능성이 있을 때 사용
	public PartitionStepBuilder partitioner(Step step) {
		return new PartitionStepBuilder(this).step(step);
	}

	// 별도의 Job 을 실행시키는 Step인 JobStepBuilder 를 반환한다.
	public JobStepBuilder job(Job job) {
		return new JobStepBuilder(this).job(job);
	}

	// Flow 를 실행시키는 Step 인 FlowStepBuilder 를 반환한다.
	public FlowStepBuilder flow(Flow flow) {
		return new FlowStepBuilder(this).flow(flow);
	}
}

TaskletStep 생성하기

가장 기본적으로 사용되며, Tasklet 타입의 구현체들을 제어하는 Step

tasklet(Tasklet) 을 사용하여 생성

@Bean
public Step taskletStep() {
	return stepBuilderFactory.get(“batchStep") // StepBuilder 를 반환
		.tasklet(Tasklet) // TaskletStepBuilder 를 반환
		.startLimit(10) // 해당 Step 이 실패한 경우 재실행 가능한 최대 횟수 정의 (df: Int.MAX)
		.allowStartIfComplete(true) // Step 의 성공/실패 여부와 상관 없이 재시작 가능하게 설정
		.listener(StepExecutionListener) // Step의 라이프사이클 내에서 이벤트 리스너가 필요한 경우 등록
		.build(); // TaskletStep 반환
}

가장 간단한 TaskletStep

@Bean
public Step taskletStep() {
	return stepBuilderFactory.get(“batchStep")
		.tasklet(Tasklet)
		.build();
}

SimpleStep 생성하기

데이터를 Chunk 단위로 쪼개서 처리할 때 사용하는 Step. 대용량 처리에 적합

chunk(ChunkSize) 를 사용하여 생성

@Bean
public Step simpleStep() {
	return this.stepBuilderFactory.get("batchStep") // StepBuilder 를 반환
		.chunk<InType, OutType>(100) // SimpleStepBuilder 를 반환
		.reader(ItemReader) // ItemReader 설정, ItemReader<InType> 을 반환하는 Reader
        .processor(ItemProcessor) // ItemProcessor 설정, ItemProcessor<InType, OutType> 을 반환하는 Processor
        .writer(ItemWriter) // ItemWriter 설정, ItemWriter<OutType> 을 반환하는 Writer
        .stream(ItemStream()) // 재시작 데이터를 관리하는 콜백에 대한 스트림 등록
		.readerIsTransactionalQueue() // Item 이 JMS, Message Queue Server 와 같은 트랜잭션 외부에서 읽혀지고 캐시할 것인지 여부, 기본값은 false
		.listener(ChunkListener) // Chunk 프로세스가 진행되는 특정 시점에 콜백 제공받도록 ChunkListener 설정
        .build(); // ChunkOrientedTasklet 를 가지는 TaskletStep 반환

chunk(CompletionPolicy) 를 사용하여 생성

  • 종료 정책에 따라 종료 여부를 결정할 수 있도록 CompletionPolicy 를 설정할 수 있다.
  • 실행 횟수, 완료 시기, 오류 발생시 수행 할 작업에 대한 반복여부 결정
@Bean
public Step simpleStep() {
	return this.stepBuilderFactory.get("batchStep") // StepBuilder 를 반환
		.chunk<InType, OutType>(CompletionPolicy) // SimpleStepBuilder 를 반환
		... // 나머지는 위에 chunk(ChunSize) 와 동일하여 별도 작성 안함.

가장 간단한 SimpleStep

@Bean
public Step simpleStep() {
	return this.stepBuilderFactory.get("batchStep") // StepBuilder 를 반환
		.chunk<InType, OutType>(100)
		.reader(ItemReader)
        .writer(ItemWriter)
        .build();

reader 와 writer 는 반드시 있어야 한다. 없으면 에러가 발생한다.
SimpleStepBuilder 에서 tasklet 을 만드는 createTasklet 메소드를 보면 아래와 같다.

@Override
protected Tasklet createTasklet() {
	Assert.state(reader != null, "ItemReader must be provided");
	Assert.state(writer != null, "ItemWriter must be provided");
	...
}

PartitionStep 생성하기

Step의 동작을 멀티 스레드로 병렬 처리할 때 사용하는 Step.
PartitionStep 은 추후에 멀티 스레드 프로세싱으로 별도로 자세하게 다룬다.

partitioner(stepName, Partitioner) 를 사용하여 생성

  • partioner 에서 stetName 을 명시했는데 step 을 별도로 받는 이유?
    • stepName 은 SpringBean 을 찾기 위한 파라미터가 아니라, 내부적으로 Partition 이 사용할 StepExecution 의 이름으로 사용된다.
    • 별도로 받은 Step은 각 파티션이 사용할 Step을 지정하기 위해 사용된다. (필수!)
@Bean
public Step partitionStep() {
    return this.stepBuilderFactory.get("masterStep") // PartionStepBuilder 를 반환
        .partitioner("slaveStep", ColumnRangePartitioner()) // Partioner 지정
        .step(slaveStep()) // 각 파티션이 실행할 Step. TaslketStep, SimpleStep 등 가능
        .gridSize(3) // 몇 개의 파티션으로 나눌 것인지?
        .taskExecutor(ThreadPoolTaskExecutor()) // 스레드 풀 실행자 지정
        .build()
}

partitioner(Step) 를 사용하여 생성

  • partioner 를 받지 않는데, 어떻게 동작될까?
    • Spring Batch 내부의 StepExecutionSplitterTaskExecutorPartitionHandler 를 통해 자동 파티셔닝 된다.
    • 지정한 gridSize 에 맞게 StepExecution 을 자동으로 생성하고 ExecutionContext 를 할당하여 병렬로 실행된다.
@Bean
public Step partitionStep() {
    return this.stepBuilderFactory.get("masterStep")
        .partitioner(slaveStep()) // Partioner를 지정하며 PartionStepBuilder 를 반환
        .gridSize(3) // 몇 개의 파티션으로 나눌 것인지?
        .taskExecutor(ThreadPoolTaskExecutor()) // 스레드 풀 실행자 지정
        .build()
}

JobStep 생성하기

Step 내에서 Job을 실행할 때 사용하는 Step

job(Job) 를 사용하여 생성

@Bean
public Step jobStep() {
	return this.stepBuilderFactory.get("step")
		.job(job()) // JobStepBuilder 반환
		.launcher(jobLauncher) // Job 을 실행할 JobLauncher설정
		.parametersExtractor(jobParametersExtractor()) // Step의 ExecutionContext를 Job이 실행되는 데 필요한 JobParameters로 변환
		.build();

FlowStep 생성하기

Step 내에서 Flow 를 실행할 때 사용하는 Step

flow(Flow) 를 사용하여 생성

@Bean
public Step flowStep() {
	return this.stepBuilderFactory.get("step")
		.flow(myFlow()) // FlowStepBuilder 반환
		.build();
profile
A fast learner.

0개의 댓글