๋๋์ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๊ฑฐ๋ ๋ฐ๋ณต์ ์ธ ์์ ์ ์คํํ๋ ๋ฐ ์ฌ์ฉ๋๋ ์คํ๋ง ๋ฐฐ์น์ ๋ํด์ ์์๋ณด์.
์๋์ ์ฑ ์ ์ฐธ๊ณ ํ์๋ค.
๐ก ์คํ๋ง ๋ฐฐ์น ์๋ฒฝ ๊ฐ์ด๋ 2/e
Spring Batch๋ ๋๋์ ๋ฐ์ดํฐ ์ฒ๋ฆฌ๋ฅผ ์ํ ๊ฒฝ๋ํ๋ ํ๋ ์์ํฌ๋ก, ๋ฐ๋ณต์ ์ธ ์์ ์ ์ํํ๋ ์ผ๊ด ์ฒ๋ฆฌ(Batch Processing) ์์ ์ ํจ์จ์ ์ผ๋ก ์ฒ๋ฆฌํ ์ ์๋ ๊ธฐ๋ฅ์ ์ ๊ณตํ๋ค. ๋์ฉ๋ ๋ฐ์ดํฐ ์ฒ๋ฆฌ๋ ์ฃผ๊ธฐ์ ์ธ ์ ๋ฌด ์ฒ๋ฆฌ ๋ฑ์ ํจ์จ์ ์ผ๋ก ์ฒ๋ฆฌํ ์ ์๊ณ , ๋์ฉ๋ ๋ฐ์ดํฐ ์ฒ๋ฆฌ์ ์ ํฉํ ๋ถ์ฐ ๋ฐฉ์์ ์ฒ๋ฆฌ๋ฅผ ์ง์ํ๋ค.
Spring Batch๋ ๋ง์ ์์ ๋ฐ์ดํฐ๋ฅผ ํจ์จ์ ์ผ๋ก ์ฒ๋ฆฌํ ์ ์์ผ๋ฉฐ, ๋ค์ํ ๊ธฐ๋ฅ์ ์ ๊ณตํ๋ค. ๋์ฉ๋ ๋ฐ์ดํฐ ์ฒ๋ฆฌ, ํธ๋์ญ์ ๊ด๋ฆฌ, ์ฌ์๋ ๊ธฐ๋ฅ ๋ฑ์ด ์ด์ ํด๋น๋๋ค.
Spring Batch๋ ๋ฐฉ๋ํ ์์ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ ์ ์๋ค. ๋ฐ์ดํฐ ์ฒ๋ฆฌ ์์ ์ ๋ถ์ฐ ์ฒ๋ฆฌํ ์ ์์ด์, ๋์ฉ๋ ๋ฐ์ดํฐ ์ฒ๋ฆฌ์ ์ ํฉํ๋ค.
Spring Batch๋ ํธ๋์ญ์ ๊ด๋ฆฌ๋ฅผ ์ง์ํ๋ค. ๋ฐ์ดํฐ ์ฒ๋ฆฌ ์ค ์คํจํ ์์ ์ ๋กค๋ฐฑํ์ฌ ๋ฐ์ดํฐ ์ผ๊ด์ฑ์ ์ ์งํ ์ ์๋ค.
Spring Batch๋ ์์ ์ค ์คํจํ ๊ฒฝ์ฐ, ์์ ์ ์ฌ์๋ํ ์ ์๋ ๊ธฐ๋ฅ์ ์ ๊ณตํ๋ค. ๋ํ, ์ฌ์๋ ํ์๋ฅผ ์ค์ ํ ์ ์๋ค.
Spring Batch๋ ๊ทธ ์์ฒด๋ง์ผ๋ก Job์ ์คํํ ์ ์๋ค.
Configuration Bean์ ํตํด ํด๋น Job์ด ์คํ๋ ๊ฒฝ์ฐ์ ์ด๋ค ์ผ์ ์ฒ๋ฆฌํ ์ง ์ธํ ํ ๋ฟ ์ด๋ฅผ ์คํ ์์ผ์ฃผ๊ธฐ ์ํด์ ยBatch Scheduler
๊ฐ ํ์ํ๋ค.
JobRepository
๋ ๋ฐฐ์น๋ฅผ ์ํํ๋๋ฐ ํ์ํ ์ ๋ฐ์ ์ธ Object๋ฅผ ๋ชจ๋ ํฌํจํ๊ณ ์๋ค. (Job, JobLauncher, Step, ๋ฑ)
๊ทธ๋ฆฌ๊ณ ์ด๋ฅผ ํตํดย ๋ฐฐ์น ์ํ๊ณผ ๊ด๋ จ๋ ์์น ๋ฐ์ดํฐ์ Job์ Status๋ฅผ ์ ์ง/๊ด๋ฆฌ
ํ๋ฉฐย ๋ฐฐ์น์ ๊ด๋ จ๋ CRUD ์ฒ๋ฆฌ๋ฅผ ํ๋ ์ญํ
์ ์ํํ๋ค.
Job
์ย ๋ฐฐ์น ์ฒ๋ฆฌ ๊ณผ์ ์ ํ๋์ ๋จ์๋ก ๋ง๋ค์ด ๋์ ๊ฐ์ฒด์ด๋ค.
๋ฐฐ์น์ฒ๋ฆฌ ๊ณผ์ ์ ์์ด ์ต์๋จ Object๋ผ๊ณ ๋ณผ ์ ์๋ค.
JobLauncher
๋ ๋ง๊ทธ๋๋กย Job์ ์คํ์ํค๋ ์ญํ ์ ๋ด๋นํ๋ค.
Job & JobParameters๋ฅผ param์ผ๋ก ๋ฐ๊ณ ๋ฐฐ์น ์ํ ํ, JobExecution์ ๋ฐํํ๋ค.
Spring Batch์์๋ JobLauncherApplicationRunner ํด๋์ค๊ฐ ์๋์ผ๋ก JobLauncher๋ฅผ ์คํํ๊ธฐ ๋๋ฌธ์ย ์ฐ๋ฆฌ๊ฐ ์ง์ ์ปจํธ๋กคํ ์ผ์ ์๋ค๊ณ ๋ณผ ์ ์๋ค.
Step
์ Job์ ๊ตฌ์ฑํ๋ ๋
๋ฆฝ์ ์ธ ์์
๋จ์์ด๋ค. ์ค์ ๋ฐฐ์น๊ฐ ์คํ๋๋ ์ฒ๋ฆฌ๋ฅผ ์ ์ํ๊ณ ์ ์ดํ๋๋ฐ ํ์ํ ๋ชจ๋ ์ ๋ณด๋ฅผ ๊ฐ์ง๊ณ ์๋ Object๋ผ๊ณ ๋ณผ ์ ์๋ค.
Step์ย Tasklet
ย /ย Chunk
ย ๊ธฐ๋ฐ์ผ๋ก ์ํ๋๋ฉฐ ์ด๋ ์ ์ ๊ฐ ์ ํํด์ ์ฌ์ฉํ๊ฒ ๋๋ค.
Tasklet vs Chunk
Tasklet: ํ ๊ฐ์ง ์ด์์ CRUD๊ฐ ๋ฐ์(=๋น์ฆ๋์ค ๋ก์ง)ํ๋ task์ ๋ํด ์ผ๊ด์ ์ผ๋ก ์ฒ๋ฆฌํ๋ ๊ฒฝ์ฐ, ์ฑํํ๋ค. (๋ณต์กํ ๋ก์ง์ ์ํํด์ผ ํ๋ job์ผ ๊ฒฝ์ฐ, ์ฑํ)
Chunk: chunk ๋จ์๋ก ์ฒ๋ฆฌํ ๋ชจ๋ record๋ฅผ ์ญ ์ฝ์ด๋ค์ธ ํ, ๋ชจ๋ ์ฝ์ด๋ค์ด๋๋ฐ ์ฑ๊ณตํ๋ฉด ํ๋ฒ์ Writeํ๋ ๋ฐฉ์ (๋์ฉ๋ ๋ฐ์ดํฐ์ ๋ํด ๋จ์ ์ฒ๋ฆฌํ ๊ฒฝ์ฐ, ์ฑํ)
ItemReader
๋ ๋ง ๊ทธ๋๋ก ๋ฐฐ์น๋ฅผ ์ํํ๋๋ฐ ์์ด ๋์์ด ๋๋ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ด๋ค์ด๋๋ฐ ์ฌ์ฉ๋๋ Object์ด๋ค. ๋ฐ์ดํฐ๋ฅผ ์ฝ์ด๋ค์ด๋ ๋ฐฉ๋ฒ์๋ DB Connection์ ํตํด ๋ถ๋ฌ์ฌ ์๋ ์์ง๋ง File I/O๋ฅผ ํตํด ๋ถ๋ฌ์ค๊ธฐ๋ ํ๋ค.
ItemProcessor
๋ Read์ Write์ ์ค๊ฐ ๋จ๊ณ์์ย ๊ฐ๊ณต(์ฒ๋ฆฌ) ์ญํ ์ ์ํํ๋ค.
๊ทธ๋ฆฌ๊ณ ย ์ด ๋จ๊ณ๋ ํ์๊ฐ ์๋๊ธฐ๋ ํ๋ค.ย Read์ Write๋ง์ผ๋ก ์ํ๋ ๊ธฐ๋ฅ์ ์ํํ ์ ์๋ค๋ฉด ItemProcessor๋ ๊ณผ๊ฐํ Skipํด๋ ๋๋ค.
ItemWriter
๋ Batch์ ๋ง์ง๋ง ๋จ๊ณ์ด๋ค. ๋ง ๊ทธ๋๋ก ์ฐ๋ฆฌ๊ฐ ์ฒ๋ฆฌํ๊ณ ์ ํ๋ ๋ฐ์ดํฐ์ ๋ํด ์ต์ข
์ ์ผ๋ก ๊ฐ๊ณต๋ ๋ฐ์ดํฐ๋ฅผ ์ถ๋ ฅํ๋ ์ญํ ์ ์ํํ๋ค.
์ฐ๋ฆฌ๋ ItemWriter๋ฅผ ํตํด ๋ค์ย DB์ ํด๋น ๋ด์ฉ์ ์ ์ฅ
ํ ์๋ ์๊ณ ํด๋น ๋ฐ์ดํฐ๋ฅผย ๋ค๋ฅธ ํ๋ก์ ํธ๋ก API ํธ์ถ
์ํฌ ์๋,ย Kafka๋ฅผ ํตํด msg ๋ฐํ
์ํฌ ์๋ ์๋ค.
๊ทธ๋ผ ๊ฐ๋จํ Spring Batch์ ์ฉ์ด ์ ๋ฆฌ๋ ํด๋ดค๊ฒ ๋ค ์ด์ ์์ค์ฝ๋๋ฅผ ๋ง๋ค์ด๋ณด๊ธฐ๋ก ํ์.
์ฐ์ , ๋๋ย Spring Initializr
(https://start.spring.io)๋ฅผ ํตํด SpringBoot ํ๋ก์ ํธ๋ฅผ ํ๋ ๋ง๋ค์์ผ๋ฉฐ ๊ธฐ๋ณธ Spec์ ์๋์ ๊ฐ๋ค.
@SpringBootApplication
@EnableBatchProcessing
public class JobPracticeApplication {
public static void main(String[] args) {
SpringApplication.run(JobPracticeApplication.class, args);
}
}
ํ๋ก์ ํธ ์์ฑ ์, ์ต์ด ๋ฉ์ธ Application ํด๋์ค์ ์์ ๊ฐ์ดย @EnableBatchProcessing
ย ์ ๋
ธํ
์ด์
์ ์ถ๊ฐํด์ค๋ค.
@EnableBatchProcessing
: ๋ฐฐ์น ๊ธฐ๋ฅ ํ์ฑํ
ํน์ Job์ ์ํํ๊ธฐ ์ํ ํด๋์ค ํ์ผ์ ์์ฑํ๋๋ก ํ์.
@Slf4j
@Configuration
@RequiredArgsConstructor
public class MyFirstJobConfiguration {
...
}
Spring Batch 5.0 ๋ถํฐย JobBuilderFactory
๋ deprecated๋์๋ค.
https://docs.spring.io/spring-batch/docs/current/api/deprecated-list.html
์ด๋ก ์ธํด ๊ธฐ์กด์ ํ์ฉํ๋ ๋ฐฉ์์ ์ฌ์ฉํ๊ธฐ ์ด๋ ต๊ฒ ๋์๋ค.
๊ทธ๋์ย JobRepository
๋ฅผ ํ์ฉํ๋ ๋ฐฉ์์ผ๋ก ๋ฐ๊ฟ์ค ํ์๊ฐ ์๋ค.
(AS-IS)
...
@Bean
public Job myFirstJob(Step step) {
return this.jobBuilderFactory.get("myFirstJob")
.start(myFirstStep)
.build();
}
...
(TO-BE)
...
@Bean
public Job myFirstJob(JobRepository jobRepository){
return new JobBuilder("myFirstJob", jobRepository)
.start(myFirstStep(jobRepository))
.build();
}
...
์์ ์, job์ย ๋จ์ผ Step์ผ๋ก ๊ตฌ์ฑํ๋๋ก ํ๊ฒ ๋ค.
์ด์ Job์ ํฌํจ๋๋ Step์ ์์ฑํด์ผ ํ๋ค.
StepBuilderFactory
ย ์ญ์ Spring Batch 5.0์ด์๋ถํฐ deprecated๋์ด ๋ค๋ฅธ ๋ฐฉ์์ผ๋ก ์ฒ๋ฆฌํด์ผ ํ๋ค.
(AS-IS)
...
public Step myFirstStep() {
return stepBuilderFactory.get("myFirstStep")
.<String, String>chunk(1000)
.reader(itemReader())
.writer(itemWriter())
.build();
}
...
(TO-BE)
...
@Bean
public Step myFirstStep(JobRepository jobRepository){
return new StepBuilder("myFirstStep",jobRepository)
.<String, String>chunk(1000,transactionManager)
.reader(itemReader())
.writer(itemWriter())
.build();
}
...
Step์ chunk ๊ธฐ๋ฐ์ผ๋ก ๋์ํ๋๋ก ๊ตฌ์ฑํ์๊ณ ํ๋ฒ์ ๋ค์ด์ฌ ์ ์๋ chunk size๋ 1000์ผ๋ก ์ค์ ํ๋ค.
๋ค์์ผ๋ก Batch๋ฅผ ํตํด ์ฒ๋ฆฌํ๊ณ ์ ํ๋ ๋ฐ์ดํฐ๋ฅผ Readํ๋ ItemReader ๋ถ๋ถ์ ๋ง๋ค์ด๋ณด์.
...
@Bean
public ItemReader<String> itemReader(){
return new ItemReader<String>() {
@Override
public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
return "Read OK";
}
};
}
...
์์ ๊ฐ์ด ๊ตฌ์ฑํ ์ ์๊ณ ์ค์ง์ ์ผ๋กย DB ํต์
ย /ย File I/O
๋ฅผ ํตํด ์ฒ๋ฆฌํ ๊ฒฝ์ฐ, ์ด ๋ถ๋ถ์ ๋ก์ง์ ๊ตฌํํ๋ฉด ๋๋ค.
(์๋๋ MyBatis๋ฅผ ํ์ฉํ๋ ๊ฒฝ์ฐ, ์์์ด๋ค.)
...
public MyBatisCursorItemReader<Object> itemReader() {
String strQueryId = "์ฟผ๋ฆฌ ๊ฒฝ๋ก ๊ธฐ์ฌ";
return new MyBatisCursorItemReaderBuilder<Object>()
.sqlSessionFactory(sqlSessionFactory)
.queryId(strQueryId)
.parameterValues(parameterValues)
.saveState(false)
.build();
}
...
๋ง์ง๋ง์ผ๋ก ๋ฐฐ์น๋ฅผ ํตํด ์ฒ๋ฆฌ๋ ๋ฐ์ดํฐ์ ๋ํ ์ถ๋ ฅ์ ๋ด๋นํ๋ ItemWriter ๋ถ๋ถ์ด๋ค.
...
@Bean
public ItemWriter<String> itemWriter(){
return strList -> {
strList.forEach(
str -> log.info("str: {}", str)
);
};
}
...
ItemReader์์ ์ฒ๋ฆฌํ ๋ฐ์ดํฐ๊ฐ String Listํํ๋ก ๋ด๊ฒจ ๋ค์ด์ค๋ฉด ํด๋น str๋ฅผ ๋ก๊ทธ ์ถ๋ ฅํ๋ ํํ๋ก ๊ฐ๋จํ ๊ตฌ์ฑํด๋ณด์๋ค.
์ค์ง์ ์ผ๋ก ํ์ฉํ๊ฒ ๋๋ค๋ฉด Writer ๋ถ๋ถ์์ ํด๋น ๋ฐ์ดํฐ๊ฐ ํ์ํ ๊ณณ์ผ๋ก API ์ ์ก์ ํ ์๋ ์๊ณ DB์ ๊ฐ๊ณต๋ ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ ์๋ ์๊ฒ ๋ค.
๐กย JobInstance๋ ํ ๋ฒ ์ฑ๊ณต์ ์ผ๋ก ์๋ฃ๋๋ฉด ๋ค์ ์คํ์ํฌ ์ ์๋ค. JobInstance๋ ์ก ์ด๋ฆ๊ณผ ์ ๋ฌ๋ ์๋ณ ํ๋ผ๋ฏธํฐ๋ก ์๋ณ๋๋ฏ๋ก, ๋์ผํ ์๋ณ ํ๋ผ๋ฏธํฐ๋ฅผ ์ฌ์ฉํ๋ ์ก์ ํ ๋ฒ๋ง ์คํํ ์ ์๋ค.
๋ ๋ง์ ๋ด์ฉ์ ๊นํ๋ธ๋ฅผ ์ฐธ๊ณ ํ์.
github : https://github.com/Jungmin-Dev/SpringBatch
๐ก๊ธฐ๋ณธ์ ์ธ ์ฒญํฌ ๊ตฌ์ฑ@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job chunkBasedJob() {
return this.jobBuilderFactory.get("chunkBasedJob")
.start(chunkStep())
.build();
}
@Bean
public Step chunkStep() {
return this.stepBuilderFactory.get("chunkStep")
.<String, String>chunk(1000)
.reader(itemReader())
.writer(itemWriter())
.build();
}
@Bean
public ListItemReader<String> itemReader() {
List<String> items = new ArrayList<>(100000);
for (int i = 0; i < 100000; i++) {
items.add(UUID.randomUUID().toString());
}
return new ListItemReader<>(items);
}
@Bean
public ItemWriter<String> itemWriter() {
return items -> {
for (String item : items) {
System.out.println(">> current item = " + item);
}
};
}
public static void main(String[] args) {
SpringApplication.run(ChunkJob.class, args);
}
๐กTasklet, Chunk ๊ฐ๋จ ๋น๊ต
๐กย ๋ฐ์ดํฐ ์ฒ๋ฆฌ๊ณผ์ ์ด tasklet์์์ ํ๋ฒ์ ์ด๋ค์ง๋ค.
๋ฐฐ์น ์ฒ๋ฆฌ๊ณผ์ ์ด ์ฌ์ด ๊ฒฝ์ฐ ์ฝ๊ฒ ์ฌ์ฉ๋๋ฉฐ, ๋๋์ฒ๋ฆฌ ๊ฒฝ์ฐ ๋ ๋ณต์กํด์ง ์ ์๋ค.
๐กย chunksize ๋จ์๋ก ๋ฐ์ดํฐ๊ฐ ํ์ด์ง์ฒ๋ผ ์ฒ๋ฆฌ๋๋ค.
๋์ฉ๋ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ ๋ ์ฌ์ฉ๋๋ฉฐ, reader / processor / writer ๋ก ๊ตฌ๋ถ๋์ด ์ฒ๋ฆฌ๋๋ค.
(reader์ writer๋ ํ์์ด๋ฉฐ, processor๋ ์ฌ์ฉ์ํด๋ ๋๋ค.)
: (ํ์ผ/DB) ๋ฐ์ดํฐ(item)๋ฅผ ์ฝ์ด์ค๋ฉฐ, reader์์์๋ ํ์ด์ง์ฒ๋ฆฌ๊ฐ ๊ฐ๋ฅํ bean๋ค์ด ์๋ค.
ItemReader, MybatisPagingItemReader, JpaPagingItemReader, ...
: reader์์ ์ฝ์ด์จ ๋ฐ์ดํฐ(item)๋ฅผ ํํฐ/์ฒ๋ฆฌ ํ๊ณ write๋ก ๋ณด๋ด๋ ์ญํ .
item์ ํํฐ ๋์ค null๋ก ๋ฆฌํดํ๋ฉด, ๊ทธ item์ write๋ก ์ ๋ฌ๋์ง ๋ชปํ๋ค.
ex) 10๊ฐ read ํ processor์์ 4๊ฐ ํํฐ๋ง ํด์ 6๊ฐ๋ง ๋ฆฌํดํ๋ฉด write์์๋ 6๊ฐ๋ง ์ฒ๋ฆฌํ๋ค.
: processor์์ ์ฒ๋ฆฌ๋ ๋ฐ์ดํฐ๋ค(items : List)์ ํ์ผ์ด๋ DB์ ์ ์ฌํ๋ ์ญํ .
writer๋ ๊ธฐ๋ณธ์ผ๋ก List๋จ์๋ก ์ฒ๋ฆฌ๋๋ฉฐ, List๋ chunksize์ ์ํด ์ฒ๋ฆฌ๋๋ค.
private String JOBNAME = "testTaskChunkJob";
@Bean
public JobtestTaskChunkJob(){
return jobBuilderFactory.get(JOBNAME)
.incrementer(new RunIdIncrementer())
.start(this.taskStep1())
.build();
}
@Bean
public SteptaskStep1(){
return stepBuilderFactory.get("taskStep1")
.tasklet(tasklet())
.build();
}
private Tasklettasklet(){//tasklet์ผ๋ก ๋ชจ๋ ์ฒ๋ฆฌ
return (contribution, chunkContext) -> {
List<String> items = getItems();
log.info("items : "+ items.toString());
return RepeatStatus.FINISHED;
};
}
private List<String>getItems() {
List<String> items =new ArrayList<>();
for (int i = 0; i < 100; i++) {
items.add(i + " test!");
}
return items;
}
๋จ์ํ๊ฒ 100๊ฐ์ ์ซ์๋ฅผ ๊ฐ์ ธ์ log๋ฅผ ์ถ๋ ฅํ๋ tasklet์ด๋ค.
tasklet์ ํจ์์์์ ๋ชจ๋ ์ฒ๋ฆฌ๋์ 1~100๊น์ง ๋ชจ๋ ์ถ๋ ฅ๋๋ค.
๊ฒฐ๊ณผ :
items : [0 test!, 1 test!, 2 test!, 3 test!, ... 99 test!]
private String JOBNAME = "testTaskChunkJob";
@Bean
public JobtestTaskChunkJob(){
return jobBuilderFactory.get(JOBNAME)
.incrementer(new RunIdIncrementer())
.start(this.taskStep1())//tasklet ์ฒ๋ฆฌ
.next(this.chunkStep1())//chunk ์ฒ๋ฆฌ
.build();
}
// ...
@Bean
public StepchunkStep1() {//chunksize๋ก ์ฒ๋ฆฌ
return stepBuilderFactory.get("chunkStep1")
.<String,String>chunk(10)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
private ItemReader<String>itemReader() {
return new ListItemReader<>(getItems());
}
private ItemProcessor<String, String>itemProcessor() {
return item -> item + " now processor!!";
}
private ItemWriter<String>itemWriter() {
return items -> log.info("### writer : " + items.toString());
}
chunksize๋ก ์ฒ๋ฆฌ ์, Step์ .chunksize(ํฌ๊ธฐ) ๋ก ์ ์ํ๋ฉฐ ์์ ์ฒ๋ฆฌ๋ ํ์ ์ ์ ์ํ๋ค.
.<String,String>chunk(10)
<String,String>
์์ย String์ย reader์์ ์ฝ์ ๋ฐ์ดํฐ์ ํ์ ์ด๊ณ
๋ค์ย String์ย writer์์ ๋ฐ์ ๋ฐ์ดํฐ์ ํ์ ์ด๋ค.
๋ฐ๋ผ์ processor ์ฒ๋ฆฌ ํจ์์์๋ ์/๋ค ํ์ ์ ๋ชจ๋ ์ ์ธํด์ ํํฐ๋ ๋ฐ์ดํฐ ๋ณํ ์ฒ๋ฆฌ๊ฐ ๊ฐ๋ฅํ๋ค.
private ItemReader<String>itemReader() {
returnnew ListItemReader<>(getItems());
}
reader์์ ๋ฆฌํด์ผ๋ก ์ ์ธํ ListItemReader๋ spring batch์์ ์ ๊ณตํ๋ item์ฒ๋ฆฌ ํด๋์ค์ด๋ค.
getItems์์ ๊ฐ์ ธ์จ List์ LIstItemReader์์ ์์์ item๋ฐํํด์ค๋ค.
private ItemProcessor<String, String>itemProcessor() {
return item -> item + " now processor!!";
}
processor์์๋ ์์ ๋งํ๊ฒ๊ณผ ๊ฐ์ดย <reader์์ ์ฝ์ ํ์ , writer์ ๋ณด๋ผ ํ์ >ย ์ผ๋ก ์ ์ํด ์ฒ๋ฆฌํ๋ค.
์ง๊ธ์ ๊ทธ๋ฅ ์ฝ์ item์ ๋ฌธ์์ด๋ง ๋ํด์ ์ฒ๋ฆฌ.
private ItemWriter<String>itemWriter() {
return items -> log.info("### writer : " + items.toString());
}
writer์์ ์ด์ง๋ง,ย List๋จ์๋ก ์ฒ๋ฆฌ๋๋ฉฐ,
items๋ฅผ ๋ฐ์ log๋ฅผ ์ถ๋ ฅํ๋ฉด chunksize=10๊ฐ์ฉ ์ฝ์ String์ด List๋ก ์ถ๋ ฅ๋๋ค.
๊ฒฐ๊ณผ :
writer : [0 test! now processor!!, 1 test! now processor!!, ... 9 test! now processor!!]
writer : [10 test! now processor!!, 11 test! now processor!!, ... 19 test! now processor!!]
...
writer : [90 test! now processor!!, 91 test! now processor!!, ... 99 test! now processor!!]
log๋ฅผ ์ด 10๊ฐ ์ถ๋ ฅํ์ผ๋ฉฐ, ๊ฐ ๋ก๊ทธ๋ items๋ฅผ ์ถ๋ ฅํ๋ค.
reader์์ 10๊ฐ์ฉ ์ฝ๊ณ List ์ผ๋ก items ๊ฐ writer์์ chunksize=10๋งํผ ์ฒ๋ฆฌ๋๋ค.
@Bean
public JobtestTaskChunkJob(){
return jobBuilderFactory.get(JOBNAME)
.incrementer(new RunIdIncrementer())
.start(this.taskStep1())
.next(this.chunkStep1())
.next(this.taskPagingStep1())//tasklet์ผ๋ก ํ์ด์ง ์ฒ๋ฆฌ
.build();
}
//...
private TaskletpagingTasklet() {
List<String> items = getItems();
return (contribution, chunkContext) -> {
//stepexecution : ์ฝ์ item์ ์ ์ฅ
StepExecution stepExecution = contribution.getStepExecution();
int chunksize = 10;
int readCnt = stepExecution.getReadCount();
int idx = readCnt + chunksize;
if(idx > items.size()){
return RepeatStatus.FINISHED;
}
//sublist : list ์ค๊ฐ ๋ฐ์ดํฐ ์ฝ๊ธฐ
List<String> sublist = items.subList(readCnt,idx);
log.info("### sublist size : " + sublist.toString());
stepExecution.setReadCount(idx);//read count ์ฝ์๋งํผ ๋ค์ ์ ์ฅreturn RepeatStatus.CONTINUABLE;
};
}
tasklet์ contributution์์ StepExecution ์ผ๋ก step์ด ์ผ๋ง๋ ์ฒ๋ฆฌ๋๊ณ ์๋์ง์ ์ ๋ณด๋ฅผ ์ ์ ์๋ค.
(chunksize๋ฅผ 10์ผ๋ก ์ ์ํ๊ณ readCount๋ก ํ์ด์ง์ ์ฒ๋ฆฌ)
tasklet์ผ๋ก chunksize=10์ผ๋ก ์ ํด ์ฒ๋ฆฌํ ์ ์์ง๋ง, ์์ค๊ฐ ๊ธธ์ด์ง๊ณ ๋ณต์กํด์ง์ ๋ณผ ์ ์๋ค.
๊ฒฐ๊ณผ :
sublist size : [0 test!, 1 test!, 2 test!, ... 9 test!]
sublist size : [10 test!, 11 test!, 12 test!, ... 19 test!]
...
sublist size : [90 test!, 91 test!, 92 test!, ... 99 test!]
๐กSpring Batch + Job ์คํ ์์
REST ๋ฐฉ์์ผ๋ก ์ก ์คํ
Kotlin
@RestController
class RestJobController(
@Autowired
private val jobLauncher: JobLauncher,
@Autowired
private val jobExplorer: JobExplorer,
@Qualifier("job1") @Autowired
private val job: Job, // Job ๋น์ ์ง์ ์ฃผ์
) {
@PostMapping(path = ["/run"])
@Throws(Exception::class)
fun runJob(@RequestBody request: RestJobDto): ExitStatus {
// JobParametersBuilder ์ getNextJobParameters ๋ฉ์๋๋ ํ๋ผ๋ฏธํฐ๋ฅผ ์ฆ๊ฐ ์ํจ๋ค ex) run.id=1 ์ดํ run.id=2 ..
val jobParameters = JobParametersBuilder(
request.getJobParameters(),
jobExplorer
)
.getNextJobParameters(job)
.toJobParameters()
return jobLauncher.run(job, jobParameters).exitStatus
}
}
Java
@RestController
public class RestJobController {
private JobLauncher jobLauncher;
private JobExplorer jobExplorer;
private Job job;
@Autowired
public RestJobController(
JobLauncher jobLauncher,
JobExplorer jobExplorer,
@Qualifier("job1") Job job
) {
this.jobLauncher = jobLauncher;
this.jobExplorer = jobExplorer;
this.job = job;
}
@PostMapping(path = ["/run"])
public ExitStatus runJob(@RequestBody RestJobDto request) throws Exception {
JobParameters jobParameters = new JobParametersBuilder(
request.getJobParameters(),
jobExplorer
)
.getNextJobParameters(job)
.toJobParameters();
return jobLauncher.run(job, jobParameters).getExitStatus();
}
}
Kotlin
data class RestJobDto(
@JsonProperty("name")
var name: String? = null,
@JsonProperty("jobParameters")
var jobParamsProperties: Properties? = null
) {
fun getJobParameters(): JobParameters {
val jobParametersBuilder = JobParametersBuilder()
// jobParamsProperties๊ฐ null์ด ์๋ ๊ฒฝ์ฐ์๋ง ์ถ๊ฐ
if (jobParamsProperties != null) {
for ((key, value) in jobParamsProperties!!) {
jobParametersBuilder.addString(key.toString(), value.toString())
}
}
return jobParametersBuilder.toJobParameters()
}
}
Java
@Data
@NoArgsConstructor
public class RestJobDto {
@JsonProperty("name")
private String name;
@JsonProperty("jobParameters")
private Properties jobParamsProperties;
public JobParameters getJobParameters() {
JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
// jobParamsProperties๊ฐ null์ด ์๋ ๊ฒฝ์ฐ์๋ง ์ถ๊ฐ
if (jobParamsProperties != null) {
for (Entry<Object, Object> entry : jobParamsProperties.entrySet()) {
jobParametersBuilder.addString(entry.getKey().toString(), entry.getValue().toString());
}
}
return jobParametersBuilder.toJobParameters();
}
}
์ ์ก ๋ฐ์ดํฐ :
*{
"name" : "job",
"jobParameters" : {
"foo": "bar",
"baz": "quix",
"requestDate":"20231108"
}
}*
Kotlin
@Configuration
class RestJob1 {
private final val chunkSize = 5
@Bean(name = ["job1"])
fun textJob1_batchBuild(jobRepository: JobRepository, transactionManager: PlatformTransactionManager): Job {
return JobBuilder("job", jobRepository)
// "Job"์๋ ์ผ๋ฐ์ ์ผ๋ก job parameters๋ฅผ ์์ฑํ๋ incrementer๊ฐ ํ์ํฉ๋๋ค. ์ด incrementer๋ batch job์ ์คํ์ ์๋ณํ๋ ๋ฐ ์ฌ์ฉ๋ฉ๋๋ค.
.incrementer(RunIdIncrementer())
.start(textJob1_batchStep1(jobRepository, transactionManager))
.build()
}
@Bean
@Primary
fun textJob1_batchStep1(jobRepository: JobRepository, transactionManager: PlatformTransactionManager): Step {
return StepBuilder("textJob1", jobRepository)
.allowStartIfComplete(true) // ๋ฐ๋ณต ์์
์ ์ํ ์คํ
์ค์
.chunk<OneDto, OneDto>(chunkSize, transactionManager)
.reader(textJob1_FileReader(null))
.writer { oneDto ->
oneDto.forEach { item ->
println(item.one)
}
}
.build()
}
@Bean
@StepScope
fun textJob1_FileReader(@Value("#{jobParameters[requestDate]}") requestDate: String?): FlatFileItemReader<OneDto> {
val flatFileItemReader: FlatFileItemReader<OneDto> = FlatFileItemReader<OneDto>()
flatFileItemReader.setResource(ClassPathResource("sample/textJob1_input.txt"))
flatFileItemReader.setLineMapper { line, lineNumber -> OneDto("$lineNumber == $line ${requestDate} ") }
return flatFileItemReader
}
}
Java
@Configuration
public class RestJob1 extends DefaultBatchConfigurer {
private final int chunkSize = 5;
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean(name = "job1")
@Primary
public Job textJob1_batchBuild(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return jobBuilderFactory.get("job")
.incrementer(new RunIdIncrementer())
.start(textJob1_batchStep1(jobRepository, transactionManager))
.build();
}
@Bean
public Step textJob1_batchStep1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("textJob1")
.allowStartIfComplete(true)
.<OneDto, OneDto>chunk(chunkSize)
.reader(textJob1_FileReader(null))
.writer(oneDtoList -> oneDtoList.forEach(item -> System.out.println(item.getOne())))
.build();
}
@Bean
@StepScope
public FlatFileItemReader<OneDto> textJob1_FileReader(@Value("#{jobParameters[requestDate]}") String requestDate) {
FlatFileItemReader<OneDto> flatFileItemReader = new FlatFileItemReader<>();
flatFileItemReader.setResource(new ClassPathResource("sample/textJob1_input.txt"));
FixedLengthTokenizer tokenizer = new FixedLengthTokenizer();
tokenizer.setColumns(new Range[] { new Range(1, 4), new Range(5, 8) });
tokenizer.setNames("one", "two");
tokenizer.setStrict(false);
flatFileItemReader.setLineMapper((line, lineNumber) -> {
OneDto oneDto = new OneDto();
oneDto.setOne(line.substring(0, 4));
oneDto.setTwo(line.substring(4, 8));
oneDto.setRequestDate(requestDate);
return oneDto;
});
return flatFileItemReader;
}
}
@RestController
class RestJobController {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private JobExplorer jobExplorer;
@Qualifier("job1")
@Autowired
private Job job;
@PostMapping(path = "/run")
public ExitStatus runJob(@RequestBody RestJobDto request) throws Exception {
JobParameters jobParameters = new JobParametersBuilder(request.getJobParameters(), jobExplorer)
.getNextJobParameters(job)
.toJobParameters();
return jobLauncher.run(job, jobParameters).getExitStatus();
}
}
๐ชSQLSyntaxErrorException ์๋ฌ ํด๊ฒฐ
๊ธฐ๋ณธ์ ์ผ๋ก H2 DB๋ฅผ ์ฌ์ฉํ ๊ฒฝ์ฐ์ Boot๊ฐ ์คํ๋ ๋ ์๋์ผ๋ก ์์ฑํด์ฃผ์ง๋ง,ย MySQL์ด๋ Oracle๊ณผ ๊ฐ์ DB๋ฅผ ์ฌ์ฉํ ๋๋ ์๋์ผ๋ก ์์ฑ๋์ง ์๋๋ค.ย ๋๋ MySQL์ ์ฌ์ฉํ์๊ธฐ ๋๋ฌธ์ ๋ฐฐ์น๊ฐ ์คํจํ ๊ฒ์ด๋ค.
MySQL์ ์ฌ์ฉํ๋ ค๋ฉด ๋ฉํ ํ ์ด๋ธ์ ์ง์ ์์ฑํ๋ฉด ๋๋ค.
์๊น ์ถ๊ฐํ Spring Batch ๋ผ์ด๋ธ๋ฌ๋ฆฌ์ org.springframework.batch.core ํด๋์ ๊ฐ๋ณด๋ฉด schema-mysql.sql ํ์ผ์ด ์์ ๊ฒ์ด๋ค.
์ด ํ์ผ์ ์์ฑ๋ SQL์ ๊ทธ๋๋ก ์คํํ๋ฉด ๋ฉํ ํ
์ด๋ธ์ ์์ฑํ ์ ์๋ค.
์์ฑ ํ์ ๋ค์ ์ ํ๋ฆฌ์ผ์ด์
์ ์คํํด๋ณด๋ฉด!
์ถ์ฒ : https://velog.io/@cho876/Spring-Batch-job-%EC%83%9D%EC%84%B1 - [Spring Batch] ๊ฐ๋ ๋ฐ Job ์์ฑ
์ถ์ฒ : https://choisblog.tistory.com/80 - Spring Batch - Tasklet, Chunk ๊ฐ๋จ ๋น๊ต
์ถ์ฒ : https://velog.io/@clevekim/Spring-Batch%EB%9E%80-%EB%AC%B4%EC%97%87%EC%9D%B8%EA%B0%80 - Spring Batch๋ ๋ฌด์์ธ๊ฐ?
๋์์ด ๋์์ด์!๐