Spring Batch์—์„œ Job๊ณผ Step๐Ÿ› 

๋ฌธ์ง€์€ยท2022๋…„ 1์›” 6์ผ
0
post-thumbnail

Job

๋…๋ฆฝ์ ์œผ๋กœ ์‹คํ–‰ํ•  ์ˆ˜ ์žˆ๋Š” ๊ณ ์œ ํ•˜๋ฉฐ ์ˆœ์„œ๊ฐ€ ์ง€์ •๋œ ์Šคํ…์˜ ๋ชฉ๋ก

Job runner

์žก์˜ ์‹คํ–‰์€ Job runer์—์„œ ์‹œ์ž‘๋œ๋‹ค.

  • CommandLineJobRunner : ์Šคํฌ๋ฆฝํŠธ or ๋ช…๋ นํ–‰ ์ด์šฉ
  • JobRegistryBackgroundJobRunner : ์ž๋ฐ” ํ”„๋กœ์„ธ์Šค ๋‚ด์—์„œ ์ฟผ์ธ ๋‚˜ JMX ํ›„ํฌ ๊ฐ™์€ ์Šค์ผ€์ค„๋Ÿฌ ์ด์šฉ
  • JobLauncherCommandLineRunner : Spring Boot ์„œ๋ฒ„๊ฐ€ ์˜ฌ๋ผ๊ฐˆ ๋•Œ ๋ชจ๋“  Job ํƒ€์ž…์˜ ๋นˆ์„ ์‹คํ–‰

Job -> JobInstance -> JobExecution
JobInstanace๋Š” ์„ฑ๊ณต์ ์œผ๋กœ ์ˆ˜ํ–‰๋œ JobExecution์ด ์žˆ๋‹ค๋ฉด ์™„๋ฃŒ๋œ ๊ฒƒ์œผ๋กœ ๊ฐ„์ฃผ. ํ•œ๋ฒˆ ์™„๋ฃŒ๋˜๋ฉด ๋‹ค์‹œ ์‹คํ–‰ ๋ถˆ๊ฐ€

Job ๊ตฌ์„ฑํ•˜๊ธฐ

application.yml ํŒŒ์ผ

spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/batch?serverTimezone=UTC&characterEncoding=UTF-8
spring.datasource.username=root
spring.datasource.password=

# spring batch์™€ ๊ด€๋ จ๋œ table๋“ค์ด rdbms์— ์ž๋™์œผ๋กœ ์ƒ์„ฑ
spring.batch.jdbc.initialize-schema=always

java ํŒŒ์ผ

// spring batch์— ํ•„์š”ํ•œ bean๋“ค autowired ๊ฐ€๋Šฅ
@EnableBatchProcessing
@SpringBootApplication
public class SpringBatchApplication
{

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Step step1()
    {
    	// step์ด FINISHED ์ƒํƒœ๋กœ ์™„๋ฃŒ๋˜์–ด์•ผ ์„ฑ๊ณต์œผ๋กœ ์ธ์‹
        return this.stepBuilderFactory.get("step1")
            .tasklet(
            	// stepContribution : ์•„์ง ์ปค๋ฐ‹๋˜์ง€ ์•Š์€ ํ˜„์žฌ ํŠธ๋žœ์žญ์…˜์— ๋Œ€ํ•œ ์ •๋ณด
                // chunckContext : ์‹คํ–‰ ์‹œ์ ์˜ job ์ƒํƒœ ์ œ๊ณต
                ((stepContribution, chunkContext) -> {
                    System.out.println("Hello World!");
                    return RepeatStatus.FINISHED;
                })
            ).build();
    }

    // ์–ดํ”Œ๋ฆฌ์ผ€์ด์…˜ ๊ฐ€๋™ ์‹œ job์œผ๋กœ ์ธ์‹๋˜๋Š” bean๋“ค ์ž๋™ ์‹คํ–‰
    @Bean
    public Job job()
    {
    	// step1์„ ํฌํ•จํ•˜๋Š” job ์ƒ์„ฑ
        return this.jobBuilderFactory.get("job")
            .start(step1())
            .build();
    }

    public static void main(String[] args)
    {
        SpringApplication.run(SpringBatchApplication.class, args);
    }

}

job ์‹คํ–‰ -> step1 ์‹คํ–‰ -> hello world ์ถœ๋ ฅ -> job ์™„๋ฃŒ(status : COMPLETED)


COMPLETED๋กœ job์ด ์™„๋ฃŒ๋˜์—ˆ์œผ๋ฏ€๋กœ ๋˜‘๊ฐ™์€ job๊ณผ parameter๋กœ ์‹คํ–‰ํ•  ๊ฒฝ์šฐ ์—๋Ÿฌ๊ฐ€ ๋ฐœ์ƒํ•œ๋‹ค.


RDBMS์—๋Š” batch์™€ ๊ด€๋ จ๋œ ํ…Œ์ด๋ธ”๋“ค์ด ์ƒ์„ฑ
ํ…Œ์ด๋ธ”์˜ ๋‚ด์šฉ์—๋Š” ์‹คํ–‰ํ–ˆ๋˜ job๊ณผ step์— ๋Œ€ํ•œ ๋‚ด์šฉ๋“ค์ด ์ €์žฅ๋œ๋‹ค.

Job Parameter

Job์— parameter๋ฅผ ์ „๋‹ฌํ•˜๋Š” ๊ฒƒ์€ ๋‘๊ฐ€์ง€ ๋ฐฉ๋ฒ•์ด ์žˆ๋‹ค.

  • Chunck Context
    @Bean
    public Tasklet helloWorldTasklet()
    {
        return ((stepContribution, chunkContext) -> {

            // chunckContext๋ฅผ ํ†ตํ•ด ์‹คํ–‰ ์‹œ์ ์˜ job ์ƒํƒœ์—์„œ parameter ๊ฐ€์ ธ์˜ค๊ธฐ
            // map<String, Object>์ด๋ฏ€๋กœ type casting ํ•„์š”
            String name = (String) chunkContext.getStepContext()
                .getJobParameters()
                .get("name");

            System.out.printf("Hello World! %s%n", name);
            return RepeatStatus.FINISHED;
        });
    }
  • Late Binding
    @StepScope
    @Bean
    public Tasklet helloWorldTasklet(@Value("#{jobParameters['name']}") String name)
    {
        return ((stepContribution, chunkContext) -> {
            System.out.printf("Hello World! %s%n", name);
            return RepeatStatus.FINISHED;
        });
    }

Late Binding ๋ฐฉ์‹์—๋Š” Step์ด๋‚˜ Job Scope๋ฅผ ๊ฐ€์ ธ์•ผ ํ•œ๋‹ค.

StepScope, JobScope๋ž€?
bean ์ƒ์„ฑ ์‹œ์ ์„ Step, Job ์‹œ์ ์œผ๋กœ ๋Šฆ์ถ˜๋‹ค. ๋งŒ์•ฝ ์„œ๋ฒ„๋ฅผ ์˜ฌ๋ฆด ๋•Œ tasklet์„ ํ•œ๋ฒˆ์— ๋ชจ๋‘ ์ƒ์„ฑ์‹œํ‚ค๋ฉด ํ•œ tasklet์— ๋Œ€ํ•ด์„œ ๋™์‹œ์— ์—ฌ๋Ÿฌ step๋“ค์ด ์‹คํ–‰๋˜๋ฉด์„œ ์นจ๋ฒ”๋‹นํ•  ์ˆ˜ ์žˆ๋‹ค. ์ด๋ฅผ ๋ฐฉ์ง€ํ•˜๊ธฐ ์œ„ํ•ด์„œ ์‚ฌ์šฉํ•œ๋‹ค.

Job Parameter์˜ ์œ ํšจ์„ฑ ๊ฒ€์‚ฌ

    @Bean
    public CompositeJobParametersValidator validator()
    {
        CompositeJobParametersValidator validator = new CompositeJobParametersValidator();

        DefaultJobParametersValidator defaultJobParametersValidator = new DefaultJobParametersValidator(
            new String[]{"fileName"},
            new String[]{"name"}
        );

        defaultJobParametersValidator.afterPropertiesSet();

        // ์—ฌ๋Ÿฌ ์œ ํšจ์„ฑ ๊ฒ€์‚ฌ๋ฅผ ํ•˜๊ณ  ์‹ถ๋‹ค๋ฉด ์—ฌ๋Ÿฌ๊ฐœ์˜ validator๋ฅผ CompositeJobParametersValidator์— ์ถ”๊ฐ€
        validator.setValidators(
            Arrays.asList(
            	// ๋ฏธ๋ฆฌ ๋งŒ๋“ค์–ด๋‘” ParamerterValidator
                new ParameterValidator(),
                // ์œ„์—์„œ ๋งŒ๋“  defaultJobParametersValidator
                defaultJobParametersValidator
            )
        );

        return validator;
    }

parameter ์ด์šฉํ•ด์„œ job ์—ฌ๋Ÿฌ๋ฒˆ ์‹คํ–‰ํ•˜๊ธฐ

incrementer ์‚ฌ์šฉํ•˜๋Š” ๋ฐฉ๋ฒ•

    @Bean
    public Job job()
    {
        return this.jobBuilderFactory.get("job")
            .incrementer(new RunIdIncrementer())
            .start(step1())
            .build();
    }

job repository์—์„œ BATCH_JOB_EXECUTION_PARAMS์„ ์‚ดํŽด๋ณด๋ฉด run.id๊ฐ€ ์ฆ๊ฐ€ํ•˜๊ณ  ์žˆ๋Š” ๊ฑธ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค!

๊ทธ ์™ธ์—๋„ DailyJobTimestamper๋ฅผ ์ด์šฉํ•ด์„œ ํ˜„์žฌ ์‹œ๊ฐ„์„ ์ด์šฉํ•ด์„œ ์žก์„ ๋ฐ˜๋ณต ์‹คํ–‰ํ•  ์ˆ˜๋„ ์žˆ๋‹ค.

ํ˜„์žฌ ์šฐ๋ฆฌํŒ€ ์ฝ”๋“œ๋ฅผ ์‚ดํŽด๋ณด๋‹ˆ incrementer๋ฅผ ์‚ฌ์šฉํ•˜์ง„ ์•Š๊ณ  ํ˜„์žฌ ์‹œ๊ฐ„์œผ๋กœ ์žก ํŒŒ๋ผ๋ฏธํ„ฐ๋ฅผ ์ƒ์„ฑํ•ด์„œ ๋„˜๊ฒจ์ฃผ๊ณ  ์žˆ์—ˆ๋‹ค.

public void runJob(Job targetJob) throws JobParametersInvalidException,
        JobExecutionAlreadyRunningException,
        JobRestartException, JobInstanceAlreadyCompleteException
    {
        try
        {
        // ํ˜„์žฌ ์‹œ๊ฐ„์„ parameter์— ์ถ”๊ฐ€ํ•จ์œผ๋กœ์จ ์—ฌ๋Ÿฌ๋ฒˆ ์‹คํ–‰์ด ๊ฐ€๋Šฅํ•˜๋„๋ก ํ•จ
            JobParameters params = new JobParametersBuilder()
                .addString("JobID", String.valueOf(System.currentTimeMillis()))
                .toJobParameters();
            jobLauncher.run(targetJob, params);
        }
        catch (Exception e)
        {
            log.info("{} job ์Šค์ผ€์ฅด ๋“ฑ๋ก์— ์‹คํŒจํ–ˆ์Šต๋‹ˆ๋‹ค.", targetJob.getName());
            throw e;
        }
    }

Job Listener

Job Listener๋ฅผ ์ด์šฉํ•ด์„œ ์Šคํ”„๋ง ๋ฐฐ์น˜์˜ ์ƒ๋ช… ์ฃผ๊ธฐ์˜ ์—ฌ๋Ÿฌ ๋กœ์ง(์•Œ๋ฆผ, ์ดˆ๊ธฐํ™”, ์ •๋ฆฌ)์„ ์ถ”๊ฐ€ํ•  ์ˆ˜ ์žˆ๋‹ค. ์—ฌ๊ธฐ์—๋„ ๋‘๊ฐ€์ง€ ๋ฐฉ๋ฒ•์ด ์žˆ๋‹ค.

  • JobExecutionListener ์ธํ„ฐํŽ˜์ด์Šค ์ด์šฉ
public class JobLoggerListener implements JobExecutionListener
{
    private static String START_MESSAGE = "%s is beginning execution";
    private static String END_MESSAGE = "%s has completed with the status %s";

    @Override
    public void beforeJob(JobExecution jobExecution)
    {
        System.out.printf((START_MESSAGE) + "%n", jobExecution.getJobInstance().getJobName());
    }

    @Override
    public void afterJob(JobExecution jobExecution)
    {
        System.out.printf(
            (END_MESSAGE) + "%n",
            jobExecution.getJobInstance().getJobName(),
            jobExecution.getStatus()
        );
    }
}
  • @BeforeJob, @AfterJob ์–ด๋…ธํ…Œ์ด์…˜ ์ด์šฉ
public class JobLoggerListener
{
    private static String START_MESSAGE = "%s is beginning execution";
    private static String END_MESSAGE = "%s has completed with the status %s";

    @BeforeJob
    public void beforeJob(JobExecution jobExecution)
    {
        System.out.printf((START_MESSAGE) + "%n", jobExecution.getJobInstance().getJobName());
    }

    @AfterJob
    public void afterJob(JobExecution jobExecution)
    {
        System.out.printf(
            (END_MESSAGE) + "%n",
            jobExecution.getJobInstance().getJobName(),
            jobExecution.getStatus()
        );
    }
}

ExecutionContext

ExecutionContext๋Š” ์Šคํ”„๋ง ๋ฐฐ์น˜์—์„œ job๊ณผ step์— ๋Œ€ํ•œ ์ƒํƒœ๋ฅผ ์ €์žฅํ•˜๊ณ  ์žˆ๋‹ค. ๋˜ํ•œ ํ•˜๋‚˜์˜ job๊ณผ step์— ๋Œ€ํ•ด์„œ ๊ฐ๊ฐ์˜ ExecutionContext๋ฅผ ๊ฐ€์ง„๋‹ค.

ExecutionContext ์กฐ์ž‘ํ•˜๊ธฐ

public class HelloWorld implements Tasklet
{
    public static final String HELLO_WORLD = "Hello World, %s";

    @Override
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception
    {
        String name = (String) chunkContext.getStepContext()
            .getJobParameters()
            .get("name");

	// ExecutionContext ์ ‘๊ทผ
        ExecutionContext jobContext = chunkContext.getStepContext()
            .getStepExecution()
            .getJobExecution()
            .getExecutionContext();

	// ExecutionContext ์— user.name ์ถ”๊ฐ€ 
        jobContext.put("user.name", name);

        System.out.printf(HELLO_WORLD, name);
        return RepeatStatus.FINISHED;
    }
}
    // step์— ์ถ”๊ฐ€ํ•  listener
    @Bean
    public StepExecutionListener promotionListener()
    {
        ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
		
        // listener์—์„œ name ํ‚ค ์Šน๊ฒฉ
        listener.setKeys(new String[] {"name"});
        return listener;
    }

Step

step์€ job์˜ ๊ตฌ์„ฑ ์š”์†Œ๋กœ ์ž์ฒด์ ์ธ ์ž…๋ ฅ, ์ถœ๋ ฅ, ์ฒ˜๋ฆฌ๋ฅผ ๊ฐ€์ง„๋‹ค. state machine์œผ๋กœ ์ƒ๊ฐํ•˜๋ฉด ๋œ๋‹ค. ํŠธ๋žœ์žญ์…˜์€ step ๋‚ด์—์„œ ์ด๋ค„์ง„๋‹ค.

Tasklet ๊ธฐ๋ฐ˜ Step

Tasklet ๊ธฐ๋ฐ˜ Step์„ ๋งŒ๋“œ๋Š”๋ฐ๋Š” ๋‘๊ฐ€์ง€ ์œ ํ˜•์ด ์žˆ๋‹ค.

1. MethodInvokingTaskletAdapter
์‚ฌ์šฉ์ž๊ฐ€ ์ž‘์„ฑํ•œ ์ฝ”๋“œ๋ฅผ Tasklet Step ์ฒ˜๋Ÿผ ์‹คํ–‰ํ•˜๋Š” ๋ฐฉ์‹
์ผ๋ฐ˜ POJO๋ฅผ Step์œผ๋กœ ํ™œ์šฉ ๊ฐ€๋Šฅ

2. Tasklet ์ธํ„ฐํŽ˜์ด์Šค ๊ตฌํ˜„
์ง€๊ธˆ๊นŒ์ง€ ์œ„์—์„œ ์‚ฌ์šฉํ•œ ๋ฐฉ์‹์ด๋‹ค. ์ด ๋•Œ Tasklet ์ธํ„ฐํŽ˜์ด์Šค๋Š” ํ•จ์ˆ˜ํ˜• ์ธํ„ฐํŽ˜์ด์Šค์ด๋ฏ€๋กœ ๋žŒ๋‹ค์‹์œผ๋กœ ๊ตฌํ˜„ํ•  ์ˆ˜๋„ ์žˆ๋‹ค.

Tasklet ๊ตฌํ˜„์ฒด์˜ ์ฒ˜๋ฆฌ๊ฐ€ ์™„๋ฃŒ๋˜๋ฉด RepeatStatus ๊ฐ์ฒด๋ฅผ ๋ฆฌํ„ดํ•ด์•ผ ํ•œ๋‹ค. (์ด๊ฒŒ ๋ฐ”๋กœ state machine์œผ๋กœ ์ƒ๊ฐ๋˜๋Š” ์ง€์ !)

public enum RepeatStatus {
    CONTINUABLE(true),		// ์–ด๋–ค ์กฐ๊ฑด์ด ์ถฉ์กฑ๋  ๋•Œ๊นŒ์ง€ ๋ฐ˜๋ณต ์‹คํ–‰
    FINISHED(false);		// ์„ฑ๊ณต ์—ฌ๋ถ€ ๊ด€๊ณ„ ์—†์ด tasklet ์ฒ˜๋ฆฌ ์™„๋ฃŒ ํ›„ ๋‹ค์Œ ์ฒ˜๋ฆฌ
}

๊ทธ ์™ธ 3๊ฐ€์ง€ ๋‹ค๋ฅธ Tasklet ๊ตฌํ˜„์ฒด

1. CallableTaskletAdapter
Tasklet์ด Step์ด ์‹คํ–‰๋˜๋Š” ์Šค๋ ˆ๋“œ์™€ ๋ณ„๊ฐœ๋กœ ์ƒˆ๋กœ์šด ์Šค๋ ˆ๋“œ์—์„œ ์‹คํ–‰๋œ๋‹ค. ํ•˜์ง€๋งŒ ๋ณ‘๋ ฌ๋กœ ์‹คํ–‰๋˜์ง€๋Š” ์•Š๋Š”๋‹ค.
Callable ๊ฐ์ฒด๊ฐ€ RepeatStatus๋ฅผ ๋ฆฌํ„ดํ•˜๊ธฐ ์ „๊นŒ์ง€๋Š” ํ•ด๋‹น Step์ด ์™„๋ฃŒ๋œ ๊ฒƒ์œผ๋กœ ๊ฐ„์ฃผ๋˜์ง€ ์•Š๋Š”๋‹ค. ๊ทธ๋ž˜์„œ ๋‹ค์Œ Step์ด ์‹คํ–‰๋  ์ˆ˜ ์—†๋‹ค.

    @Bean
    public Callable<RepeatStatus> callableObject()
    {
        return () -> {
            System.out.println("This was executed in another thread");
            
	    // Callable ๊ฐ์ฒด๊ฐ€ RepeatStatus๋ฅผ ๋ฆฌํ„ดํ•ด์•ผ tasklet ์™„๋ฃŒ๋กœ ๊ฐ„์ฃผ
            return RepeatStatus.FINISHED;
        };
    }

    @Bean
    public CallableTaskletAdapter tasklet()
    {
        CallableTaskletAdapter callableTaskletAdapter = new CallableTaskletAdapter();

        callableTaskletAdapter.setCallable(callableObject());

        return callableTaskletAdapter;
    }

2. MethodInvokingTaskletAdapter
MethodInvokingTaskletAdapter๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ๊ธฐ์กด์˜ ๋‹ค๋ฅธ ํด๋ž˜์Šค์˜ ๋ฉ”์„œ๋“œ๋ฅผ Step ๋‚ด์˜ Tasklet์œผ๋กœ ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•˜๋‹ค.

    @StepScope
    @Bean
    public MethodInvokingTaskletAdapter methodInvokingTasklet(@Value("#{jobParameters['message']}") String message)
    {
        MethodInvokingTaskletAdapter methodInvokingTaskletAdapter = new MethodInvokingTaskletAdapter();

        methodInvokingTaskletAdapter.setTargetObject(service());
        methodInvokingTaskletAdapter.setTargetMethod("serviceMethod");
        methodInvokingTaskletAdapter.setArguments(new String[] {message});

        return methodInvokingTaskletAdapter;
    }

    // ๋‚ด๊ฐ€ ์ƒ์„ฑํ•œ ํด๋ž˜์Šค(์ผ๋ฐ˜ POJO)
    @Bean
    public CustomService service()
    {
    	// ์ด ๋•Œ CustomService ํด๋ž˜์Šค์˜ serviceMethod๊ฐ€ message ํŒŒ๋ผ๋ฏธํ„ฐ๋ฅผ ์ด์šฉํ•˜์ง€ ์•Š์œผ๋ฉด ์—๋Ÿฌ ๋ฐœ์ƒ
        return new CustomService();
    }

3. SystemCommandTasklet
์‹œ์Šคํ…œ ๋ช…๋ น์„ ์‚ฌ์šฉํ•  ๋•Œ ์‚ฌ์šฉํ•˜๋ฉฐ ํ•ด๋‹น ๋ช…๋ น์€ ๋น„๋™๊ธฐ๋กœ ์‹คํ–‰๋œ๋‹ค.

    @Bean
    public SystemCommandTasklet systemCommandTasklet()
    {
        SystemCommandTasklet systemCommandTasklet = new SystemCommandTasklet();

        systemCommandTasklet.setCommand("rm -rf /tmp.txt");
        systemCommandTasklet.setTimeout(5000);
        // ์‹œ์Šคํ…œ ๋ช…๋ น์ด ๋น„์ •์ƒ ์ข…๋ฃŒ๋ ๋•Œ ์Šค๋ ˆ๋“œ๋ฅผ ๊ฐ•์ œ ์ข…๋ฃŒํ• ์ง€ ์—ฌ๋ถ€ ์„ค์ •
        systemCommandTasklet.setInterruptOnCancel(true);

        return systemCommandTasklet;
    }

์ถ”๊ฐ€๋กœ ๋‹ค์–‘ํ•œ ๊ธฐ๋Šฅ์„ ์†Œ๊ฐœํ•œ๋‹ค.

    @Bean
    public SystemCommandTasklet systemCommandTasklet()
    {
        SystemCommandTasklet systemCommandTasklet = new SystemCommandTasklet();

        systemCommandTasklet.setCommand("rm -rf /tmp.txt");
        systemCommandTasklet.setTimeout(5000);
        systemCommandTasklet.setInterruptOnCancel(true);

	// working directory ์„ค์ •
        systemCommandTasklet.setWorkingDirectory("/Users/we/spring-batch");

	// ExitCode ์„ค์ •
        systemCommandTasklet.setSystemProcessExitCodeMapper(touchCodeMapper());
        systemCommandTasklet.setTerminationCheckInterval(5000);
        // Lock์ด ๊ฑธ๋ฆฌ์ง€ ์•Š๋„๋ก ๋น„๋™๊ธฐ executor ์„ค์ •
        systemCommandTasklet.setTaskExecutor(new SimpleAsyncTaskExecutor());
        // ํ™˜๊ฒฝ ๋ณ€์ˆ˜ ์„ค์ •
        systemCommandTasklet.setEnvironmentParams(new String[] {
            "JAVA_HOME=/java",
            "BATCH_HOME=/Users/batch"
        });

        return systemCommandTasklet;
    }

    @Bean
    public SimpleSystemProcessExitCodeMapper touchCodeMapper()
    {	
    	// ์ข…๋ฃŒ ์ƒํƒœ์— ๋”ฐ๋ผ ExitStatus.COMPLETED, ExitStatus.FAILED ๋ฆฌํ„ด
        return new SimpleSystemProcessExitCodeMapper();
    }

Chunk ๊ธฐ๋ฐ˜ Step

    @Bean
    public Step step1()
    {
        return this.stepBuilderFactory.get("step1")
            // chunk size ๋Š” 10
            // 10๊ฐœ์˜ ๋ ˆ์ฝ”๋“œ๋ฅผ ์ฝ๊ณ  ์ฒ˜๋ฆฌํ•  ๋•Œ๊นŒ์ง€ ์“ฐ๊ธฐ ์ž‘์—… ํ•˜์ง€ ์•Š์Œ
            .<String, String>chunk(10)
            .reader(itemReader(null))
            .writer(itemWriter(null))
            .build();
    }

    @Bean
    @StepScope
    public FlatFileItemReader<String> itemReader(@Value("#{jobParameters['inputFile']}") Resource inputFile)
    {
        return new FlatFileItemReaderBuilder<String>()
            .name("itemReader")
            .resource(inputFile)
            .lineMapper(new PassThroughLineMapper())
            .build();
    }

    @Bean
    @StepScope
    public FlatFileItemWriter<String> itemWriter(@Value("#{jobParameters['outputFile']}") Resource outputFile)
    {
        return new FlatFileItemWriterBuilder<String>()
            .name("itemWriter")
            .resource(outputFile)
            .lineAggregator(new PassThroughLineAggregator<>())
            .build();
    }

ํฌ๊ธฐ๊ฐ€ ๋™์ผํ•˜์ง€ ์•Š์€ Chunk๋ฅผ ์ฒ˜๋ฆฌํ•  ๋•Œ

์ด ๋•Œ๋Š” Chunk ํฌ๊ธฐ๋ฅผ ํ•˜๋“œ ์ฝ”๋”ฉ์„ ํ•  ์ˆ˜ ์—†๋‹ค. ์ด ๋•Œ ์‚ฌ์šฉํ•˜๋Š” ๊ฒŒ SimpleCompletionPolicy, TimeoutTerminationPolicy์ด๋‹ค.

    @Bean
    public Step chunkStep()
    {
        return this.stepBuilderFactory.get("chunkStep")
            .<String, String>chunk(completionPolicy())
            .reader(itemReader())
            .writer(itemWriter())
            .build();
    }

    @Bean
    public ListItemReader<String> itemReader()
    {
        List<String> items = new ArrayList<>(10000);

        for (int i = 0; i < 10000; i++)
        {
            items.add(UUID.randomUUID().toString());
        }

        return new ListItemReader<>(items);
    }

    @Bean
    public ItemWriter<String> itemWriter()
    {
        return items -> {
            for (String item: items)
            {
                System.out.println(">> current item = " + item);
            }
        };
    }

    @Bean
    public CompletionPolicy completionPolicy()
    {
        CompositeCompletionPolicy policy = new CompositeCompletionPolicy();

        policy.setPolicies(
            new CompletionPolicy[]
                {
                	// ์ฒญํฌ ์ฒ˜๋ฆฌ ์‹œ๊ฐ„์ด ๋„˜์„ ๊ฒฝ์šฐ ์•ˆ์ „ํ•˜๊ฒŒ ๋น ์ ธ๋‚˜์˜ค๊ฒŒ ํ•ด์คŒ
                    // timeout ๋ฐœ์ƒ์‹œ ํ•ด๋‹น ์ฒญํฌ ์™„๋ฃŒ๋œ ๊ฒƒ์œผ๋กœ ๊ฐ„์ฃผ
                    new TimeoutTerminationPolicy(3),
                    // 100๊ฐœ์˜ ๋ ˆ์ฝ”๋“œ์”ฉ ๋Š์–ด์„œ ์ž‘์—…
                    new SimpleCompletionPolicy(100)
                }
        );

        return policy;
    }

TimeoutTerminationPolicy๋กœ ์ธํ•ด ์ปค๋ฐ‹ ๊ฐœ์ˆ˜๊ฐ€ 10000/100 = 100 ๊ฐœ๊ฐ€ ์•„๋‹Œ 105๊ฐœ ์ธ ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

Step Listener

Job Listener์™€ ๋น„์Šทํ•˜๊ฒŒ AfterStep, BeforeStep, AfterChunk, BeforeChunk๋ฅผ ์ด์šฉํ•ด์„œ ์ด๋ฒคํŠธ ์ฒ˜๋ฆฌ๋ฅผ ํ•  ์ˆ˜ ์žˆ๋‹ค.

Step Flow

์กฐ๊ฑด ๋กœ์ง

on ๋ฉ”์„œ๋“œ๋ฅผ ์ด์šฉํ•ด์„œ step์˜ ExitStatus์— ๋”ฐ๋ผ ์–ด๋–ค ์ผ์„ ์ˆ˜ํ–‰ํ• ์ง€ ๊ฒฐ์ •ํ•  ์ˆ˜ ์žˆ๋‹ค.

@Bean
public Job job()
{
	return this.jobBuilderFactory.get("conditionalJob")
    	.start(firstStep())
        // ์‹คํŒจ์‹œ failureStep
        .on("FAILED").to(failureStep())
        // FAILED ์™ธ์˜ ๊ฒฝ์šฐ์—” successStep
        .from(firstStep()).on("*").to(sucessStep())
        .end()
        .build();
}

* : 0๊ฐœ ์ด์ƒ์˜ ๋ฌธ์ž์™€ ์ผ์น˜
? : 1๊ฐœ์˜ ๋ฌธ์ž์™€ ์ผ์น˜

decider๋ฅผ ๋”ฐ๋กœ ๋‘ฌ์„œ ์ด์— ๋”ฐ๋ผ ๋‹ค์Œ step์„ ๊ฒฐ์ •ํ•˜๊ฒŒ๋” ํ•  ์ˆ˜๋„ ์žˆ๋‹ค.

public class RandomDecider implements JobExecutionDecider
{
    private Random random = new Random();
    
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution)
    {
        if (random.nextBoolean())
            return new FlowExecutionStatus(FlowExecutionStatus.COMPLETED.getName());
        else
            return new FlowExecutionStatus(FlowExecutionStatus.FAILED.getName());
    }
}

์žก ์ข…๋ฃŒํ•˜๊ธฐ

  • Completed : ์„ฑ๊ณต์ ์œผ๋กœ ์ข…๋ฃŒ. ๋™์ผ ํŒŒ๋ผ๋ฏธํ„ฐ๋กœ ๋‹ค์‹œ ์‹คํ–‰ ๋ถˆ๊ฐ€
  • Failed : ์„ฑ๊ณต์ ์œผ๋กœ ์ข…๋ฃŒ๋˜์ง€ ์•Š์Œ. ๋™์ผ ํŒŒ๋ผ๋ฏธํ„ฐ๋กœ ์‹คํ–‰ ๊ฐ€๋Šฅ
  • Stopped : ๋‹ค์‹œ ์‹œ์ž‘ ๊ฐ€๋Šฅ. ์ค‘๋‹จ๋œ ์œ„์น˜๋ถ€ํ„ฐ ์žก์„ ๋‹ค์‹œ ์‹œ์ž‘ํ•  ์ˆ˜ ์žˆ์Œ
@Bean
public Job job()
{
	return this.jobBuilderFactory.get("conditionalJob")
    	.start(firstStep())
        
        .on("FAILED").end()		// ์Šคํ…์ด ๋ฆฌํ„ดํ•œ ์ƒํƒœ์— ์ƒ๊ด€์—†์ด COMPLETED ์ €์žฅ
        
        .on("FAILED").fail()		// ์‹คํŒจํ•˜๋ฉด FAILED. ๋‹ค์‹œ ์‹คํ–‰ ๊ฐ€๋Šฅ
        
        .on("FAILED").stopAndRestart(sucessStep())		// FAILED๋กœ ์ข…๋ฃŒ๋˜์ง€๋งŒ ์žฌ์‹คํ–‰ ์‹œ successStep๋ถ€ํ„ฐ ์‹คํ–‰
        
        .from(firstStep()).on("*").to(successStep())
        .build();
}

flow ์™ธ๋ถ€ํ™”ํ•˜๊ธฐ

Step ์˜ ์ •์˜๋ฅผ ์ถ”์ถœํ•ด์„œ ์žฌ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•œ ์ปดํฌ๋„ŒํŠธ๋กœ ๋งŒ๋“ค ์ˆ˜ ์žˆ๋‹ค. ์—ฌ๊ธฐ์—” ๋‘๊ฐ€์ง€ ๋ฐฉ๋ฒ•์ด ์žˆ๋‹ค.

1. ์Šคํ…์˜ ์‹œํ€€์Šค๋ฅผ ๋…์ž์ ์ธ flow๋กœ ๋งŒ๋“œ๋Š” ๋ฐฉ๋ฒ•

 @Bean
 Public Flow preprocessingFlow()
 {
 	return new FlowBuilder<Flow>("preProcessingFlow").start(loadFileStep())
    .next(loadCustomerStep())
    .next(updateStartStep())
    .build();
 }

flow builder๋ฅผ ์ด์šฉํ•ด์„œ flow๋ฅผ ์ƒ์„ฑํ•˜์—ฌ jobBuilder์—๊ฒŒ ๋„˜๊ธด๋‹ค.

2. flow step์„ ์‚ฌ์šฉํ•˜๋Š” ๋ฐฉ๋ฒ•
์œ„์™€ ๋น„์Šทํ•˜์ง€๋งŒ flow๋ฅผ step์„ wrappingํ•˜๊ณ  ํ•ด๋‹น step์„ job builder๋กœ ์ „๋‹ฌํ•œ๋‹ค.

@Bean
public Step initializeBatch()
{
	return this.stepBuilderFactory.get("initializeBatch")
    	.flow(preprocessingFlow())
        .build();
}

1๋ฒˆ ๋ฐฉ๋ฒ•๊ณผ ๋ฌด์Šจ ์ฐจ์ด๊ฐ€ ์žˆ์„๊นŒ? ๋ฐ”๋กœ JobRepository์—์„œ ์ฐจ์ด๊ฐ€ ๋‚œ๋‹ค. 1๋ฒˆ ๋ฐฉ๋ฒ•์„ ์‚ฌ์šฉํ•˜๋ฉด job์— step์„ ๊ตฌ์„ฑํ•˜๋Š” ๊ฒƒ๊ณผ ๊ฒฐ๊ณผ์ ์œผ๋กœ ๋™์ผํ•˜๋‹ค. ํ•˜์ง€๋งŒ 2๋ฒˆ ๋ฐฉ๋ฒ•์€ flow๊ฐ€ ๋‹ด๊ธด step์„ ํ•˜๋‚˜์˜ step์ฒ˜๋Ÿผ ๊ธฐ๋กํ•œ๋‹ค. ์ด๋ฅผ ํ†ตํ•ด์„œ ๊ฐœ๋ณ„ step์„ ์ง‘๊ณ„ํ•˜์ง€ ์•Š๊ณ ๋„ flow์˜ ์˜ํ–ฅ์„ ์ „์ฒด์ ์œผ๋กœ ๋ชจ๋‹ˆํ„ฐ๋ง ๊ฐ€๋Šฅํ•˜๋‹ค.

3. job ๋‚ด์—์„œ ๋‹ค๋ฅธ job์„ ํ˜ธ์ถœํ•˜๋Š” ๋ฐฉ๋ฒ•
job์„ step์œผ๋กœ wrappingํ•˜๊ณ  ํ•ด๋‹น step์€ ๋˜ ๋‹ค๋ฅธ job์—์„œ ํ˜ธ์ถœ๋œ๋‹ค.

@Bean
public Step initializeBatch()
{
	return this.stepBuilderFactory.get("initializeBatch")
    	.job(preprocessingJob())
        .parameterExtractor(new DefaultJobParametersExtractor())
        .build();
}

ํ•˜์ง€๋งŒ ์œ„์˜ ๋ฐฉ๋ฒ•์€ job๊ณผ์˜ ์˜์กด์„ฑ์„ ๋†’์—ฌ์„œ ๊ฐœ๋ณ„ job์˜ ๊ด€๋ฆฌ๊ฐ€ ์–ด๋ ค์›Œ์ง€๋Š” ๋ฌธ์ œ์ ์ด ์ƒ๊ธธ ์ˆ˜ ์žˆ์œผ๋ฏ€๋กœ ํ”ผํ•˜๋Š” ๊ฒƒ์ด ์ข‹๋‹ค.

profile
๋ฐฑ์—”๋“œ ๊ฐœ๋ฐœ์ž์ž…๋‹ˆ๋‹ค.

0๊ฐœ์˜ ๋Œ“๊ธ€