직접적으로 Step 생성을 담당하지 않는다.
public class StepBuilder extends StepBuilderHelper<StepBuilder> {
// 생성자로 전달하는 name 이 stepName 이 된다.
public StepBuilder(String name) {
super(name);
}
// Tasklet 을 사용하여 TaskletStepBuilder 를 반환한다.
// Tasklet : 작업을 수행하는 비즈니스 로직
public TaskletStepBuilder tasklet(Tasklet tasklet) {
return new TaskletStepBuilder(this).tasklet(tasklet);
}
// 몇개 단위로 잘라서 작업할건지를 의미하는 chunkSize 를 받아서 SimpleStepBuilder 를 반환한다.
// Generic - I : ItemReader 에서 읽어들이는 데이터의 타입
// Generic - O : ItemWriter 에서 쓰는 데이터의 타입.
// chunk 기반의 ItemReader, ItemProcessor, ItemWriter 에 대해서는 나중에 별도 소개
public <I, O> SimpleStepBuilder<I, O> chunk(int chunkSize) {
return new SimpleStepBuilder<I, O>(this).chunk(chunkSize);
}
// CompletionPolicy 인터페이스를 통해 chunk가 완료되는 시점을 커스텀하게 구현 가능하다.
// 고정된 chunkSize 뿐만 아니라 다양한 조건을 설정할 수 있다.
public <I, O> SimpleStepBuilder<I, O> chunk(CompletionPolicy completionPolicy) {
return new SimpleStepBuilder<I, O>(this).chunk(completionPolicy);
}
// Partitioner 를 통해 파티션을 나누어 멀티스레드로 수행되는 PartitionStepBuilder 를 생성한다.
public PartitionStepBuilder partitioner(String stepName, Partitioner partitioner) {
return new PartitionStepBuilder(this).partitioner(stepName, partitioner);
}
// 작업 분할이 필요 없거나, 병렬 처리 없이 단일 스레드에서 수행하는 경우 step 만 등록한다.
// PartitionStep 의 구조를 유지하거나, 추후 병렬처리로 확장가능성이 있을 때 사용
public PartitionStepBuilder partitioner(Step step) {
return new PartitionStepBuilder(this).step(step);
}
// 별도의 Job 을 실행시키는 Step인 JobStepBuilder 를 반환한다.
public JobStepBuilder job(Job job) {
return new JobStepBuilder(this).job(job);
}
// Flow 를 실행시키는 Step 인 FlowStepBuilder 를 반환한다.
public FlowStepBuilder flow(Flow flow) {
return new FlowStepBuilder(this).flow(flow);
}
}
가장 기본적으로 사용되며, Tasklet 타입의 구현체들을 제어하는 Step
@Bean
public Step taskletStep() {
return stepBuilderFactory.get(“batchStep") // StepBuilder 를 반환
.tasklet(Tasklet) // TaskletStepBuilder 를 반환
.startLimit(10) // 해당 Step 이 실패한 경우 재실행 가능한 최대 횟수 정의 (df: Int.MAX)
.allowStartIfComplete(true) // Step 의 성공/실패 여부와 상관 없이 재시작 가능하게 설정
.listener(StepExecutionListener) // Step의 라이프사이클 내에서 이벤트 리스너가 필요한 경우 등록
.build(); // TaskletStep 반환
}
@Bean
public Step taskletStep() {
return stepBuilderFactory.get(“batchStep")
.tasklet(Tasklet)
.build();
}
데이터를 Chunk 단위로 쪼개서 처리할 때 사용하는 Step. 대용량 처리에 적합
@Bean
public Step simpleStep() {
return this.stepBuilderFactory.get("batchStep") // StepBuilder 를 반환
.chunk<InType, OutType>(100) // SimpleStepBuilder 를 반환
.reader(ItemReader) // ItemReader 설정, ItemReader<InType> 을 반환하는 Reader
.processor(ItemProcessor) // ItemProcessor 설정, ItemProcessor<InType, OutType> 을 반환하는 Processor
.writer(ItemWriter) // ItemWriter 설정, ItemWriter<OutType> 을 반환하는 Writer
.stream(ItemStream()) // 재시작 데이터를 관리하는 콜백에 대한 스트림 등록
.readerIsTransactionalQueue() // Item 이 JMS, Message Queue Server 와 같은 트랜잭션 외부에서 읽혀지고 캐시할 것인지 여부, 기본값은 false
.listener(ChunkListener) // Chunk 프로세스가 진행되는 특정 시점에 콜백 제공받도록 ChunkListener 설정
.build(); // ChunkOrientedTasklet 를 가지는 TaskletStep 반환
@Bean
public Step simpleStep() {
return this.stepBuilderFactory.get("batchStep") // StepBuilder 를 반환
.chunk<InType, OutType>(CompletionPolicy) // SimpleStepBuilder 를 반환
... // 나머지는 위에 chunk(ChunSize) 와 동일하여 별도 작성 안함.
@Bean
public Step simpleStep() {
return this.stepBuilderFactory.get("batchStep") // StepBuilder 를 반환
.chunk<InType, OutType>(100)
.reader(ItemReader)
.writer(ItemWriter)
.build();
reader 와 writer 는 반드시 있어야 한다. 없으면 에러가 발생한다.
SimpleStepBuilder 에서 tasklet 을 만드는 createTasklet 메소드를 보면 아래와 같다.
@Override
protected Tasklet createTasklet() {
Assert.state(reader != null, "ItemReader must be provided");
Assert.state(writer != null, "ItemWriter must be provided");
...
}
Step의 동작을 멀티 스레드로 병렬 처리할 때 사용하는 Step.
PartitionStep 은 추후에 멀티 스레드 프로세싱으로 별도로 자세하게 다룬다.
StepExecution 의 이름으로 사용
된다.@Bean
public Step partitionStep() {
return this.stepBuilderFactory.get("masterStep") // PartionStepBuilder 를 반환
.partitioner("slaveStep", ColumnRangePartitioner()) // Partioner 지정
.step(slaveStep()) // 각 파티션이 실행할 Step. TaslketStep, SimpleStep 등 가능
.gridSize(3) // 몇 개의 파티션으로 나눌 것인지?
.taskExecutor(ThreadPoolTaskExecutor()) // 스레드 풀 실행자 지정
.build()
}
StepExecutionSplitter
와 TaskExecutorPartitionHandler
를 통해 자동 파티셔닝 된다.StepExecution
을 자동으로 생성하고 ExecutionContext
를 할당하여 병렬로 실행된다.@Bean
public Step partitionStep() {
return this.stepBuilderFactory.get("masterStep")
.partitioner(slaveStep()) // Partioner를 지정하며 PartionStepBuilder 를 반환
.gridSize(3) // 몇 개의 파티션으로 나눌 것인지?
.taskExecutor(ThreadPoolTaskExecutor()) // 스레드 풀 실행자 지정
.build()
}
Step 내에서 Job을 실행할 때 사용하는 Step
@Bean
public Step jobStep() {
return this.stepBuilderFactory.get("step")
.job(job()) // JobStepBuilder 반환
.launcher(jobLauncher) // Job 을 실행할 JobLauncher설정
.parametersExtractor(jobParametersExtractor()) // Step의 ExecutionContext를 Job이 실행되는 데 필요한 JobParameters로 변환
.build();
Step 내에서 Flow 를 실행할 때 사용하는 Step
@Bean
public Step flowStep() {
return this.stepBuilderFactory.get("step")
.flow(myFlow()) // FlowStepBuilder 반환
.build();