[스프링 배치] 청크 단위 트랜잭션

조갱·2025년 5월 17일
0

이전 포스팅 에서 아래와 같은 내용이 있었다.

TransactionTemplate 에 의해 실행되기 때문에 트랜잭션이 적용되며,
이는 곧 청크 단위의 트랜잭션 (Commit, Rollback) 이 보장된다.

이 내용에 대해 좀더 깊게 살펴보자.
결론은 맨 아래에..

전체 아키텍쳐

아키텍처를 따라가보자.

트랜잭션이 시작되는 곳

TaskletStep#doExecute(StepExecution)

@Override
protected void doExecute(StepExecution stepExecution) throws Exception {
    ... 중략

    stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {
        @Override
        public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext) throws Exception {        	
            ... 중략
        }
    });
}

코드를 좀 더 한눈에 보기 위해 일부 코드를 중략했다.
위 코드가 chunk 단위로 트랜잭션이 적용됨 을 확인할 수 있는 시작점이다.
위 코드를 분석해보자.

  1. stepOperations는 StepContextRepeatCallback 타입을 받아 iterate를 돌린다.
  2. StepContextRepeatCallback 은 추상 클래스로서, 반환형이 RepeatStatus 인 doInChunkContext(RepeatContext, ChunkContext) 를 멤버로 가진다.
  3. 그래서 doInChunkContext(RepeatContext, ChunkContext)를 오버라이드 한다.

우선 여기까지 짚고 넢어가야 할 부분은,

  • stepOperations 는 RepeatOperations 타입이지만, 실제로는 RepeatOperations 을 상속받는 RepeatTemplate 타입이다. 그래서 위에 아키텍쳐의 시작부분에 TaskletStep 이 RepeatTemplate 를 실행한다는 그림이 된다.
  • 그리고, 일반적으로 Iterate 라면 Collection 타입의 객체 (List, Array, Set...)를 반복문 돌릴 것 같지만, 여기서는 Collection이 아닌 RepeatOperations 단일 객체에 대해 반복된다.
    • 물론 RepeatOperations 의 필드에 Collection 타입이 있는것도 아니다.
      단지 RepeatStatus 를 반환하는 iterate 메소드만을 멤버로 가진다.

doInChunkContext

@Override
public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext) throws Exception {
    ... 중략

    RepeatStatus result;
    try {
        result = new TransactionTemplate(transactionManager, transactionAttribute)
                	.execute(new ChunkTransactionCallback(chunkContext, semaphore));
    } catch (UncheckedTransactionException e) {
        throw (Exception) e.getCause();
    }
    ... 중략

    return result == null ? RepeatStatus.FINISHED : result;
    }
}

이번엔 로직이 실행되는 doInChunkContext 를 살펴보자.

  1. TransactionTemplate 을 통해 트랜잭션에 필요한 정보(transactionManager, transactionAttribute)를 설정한다.
  2. execute 메소드를 통해 로직을 실행한다. (여기서 트랜잭션이 적용된다.)

execute 메소드를 좀 더 깊게 보자.

@Nullable
public <T> T execute(TransactionCallback<T> action) throws TransactionException {
    Assert.state(this.transactionManager != null, "No PlatformTransactionManager set");
    if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager) {
        return ((CallbackPreferringPlatformTransactionManager)this.transactionManager).execute(this, action);
    } else {
        TransactionStatus status = this.transactionManager.getTransaction(this);

        Object result;
        try {
            result = action.doInTransaction(status);
        } catch (Error | RuntimeException var5) {
            this.rollbackOnException(status, var5);
            throw var5;
        } catch (Throwable var6) {
            this.rollbackOnException(status, var6);
            throw new UndeclaredThrowableException(var6, "TransactionCallback threw undeclared checked exception");
        }

        this.transactionManager.commit(status);
        return result;
    }
}

트랜잭션 내에서 ChunkTransactionCallback 이 실행되는 것을 확인할 수 있다.
그러면 다음으로, action.doInTransaction(TransactionStatus) 를 확인해보자.

@Override
public RepeatStatus doInTransaction(TransactionStatus status) {
    TransactionSynchronizationManager.registerSynchronization(this);

    RepeatStatus result = RepeatStatus.CONTINUABLE;

    ... 중략
    try {
        try {
            try {
                result = tasklet.execute(contribution, chunkContext);
                if (result == null) {
                    result = RepeatStatus.FINISHED;
                }
            }
            catch (Exception e) {
                if (transactionAttribute.rollbackOn(e)) {
                    chunkContext.setAttribute(ChunkListener.ROLLBACK_EXCEPTION_KEY, e);
                    throw e;
                }
            }
        }
        ... 중략
    }
    ... 중략
    return result;
}

tasklet (ChunkOrientedTasklet) 의 execute 를 통해
chunkProvider 에서 ItemRead,
chunkProcessor 에서 ItemProcess, ItemWrite 가 실행된다.
그리고 결과에 따라 RepeatStatus 를 반환한다.

어떻게 반복하지?

지금까지 확인한 코드를 통해 아래와 같은 동작방식을 확인했다.

Job 이 TaskletStep 을 실행한다.
TaskletStep 내의 doExecute 가 실행된다.
RepeatTemplate 의 iterate 를 통해 StepContextRepeatCallback 이 실행된다.
StepContextRepeatCallback 에서는 TransactionTemplate 의 execute 메소드를 통해 ChunkOrientedTasklet 가 실행된다.

ChunkOrientedTasklet 이 1회 실행되는건 알았는데, 그 이후에 더 이상 읽을 아이템이 없을 때까지 반복은 어떻게 되는걸까?

RepeatTemplate

클래스 명에서도 알 수 있듯, 반복 작업을 위한 기능을 제공한다.

private RepeatStatus executeInternal(final RepeatCallback callback) {
    ... 중략
    try {
        while (running) {
            for (int i = 0; i < listeners.length; i++) {
                RepeatListener interceptor = listeners[i];
                interceptor.before(context);
                running = running && !isMarkedComplete(context);
            }

            // Check that we are still running (should always be true) ...
            if (running) {
                try {
                    result = getNextResult(context, callback, state);
                    executeAfterInterceptors(context, result);
                }
                catch (Throwable throwable) {
                    doHandle(throwable, context, deferred);
                }
                // N.B. the order may be important here:
                if (isComplete(context, result) || isMarkedComplete(context) || !deferred.isEmpty()) {
                    running = false;
                }
            }
        }

        result = result.and(waitForResults(state));
        for (Throwable throwable : throwables) {
            doHandle(throwable, context, deferred);
        }
        // Explicitly drop any references to internal state...
        state = null;
    }
    ...
    return result;
}
protected RepeatStatus getNextResult(RepeatContext context, RepeatCallback callback, RepeatInternalState state) throws Throwable {
    update(context);
    if (logger.isDebugEnabled()) {
        logger.debug("Repeat operation about to start at count=" + context.getStartedCount());
    }
    return callback.doInIteration(context);

}

RepeatTemplate이 running 상태일 동안에, 계속하여 RepeatCallback의 doInIteration 을 호출한다.

StepContextRepeatCallback 의 doInIteration 을 살펴보면,

@Override
public RepeatStatus doInIteration(RepeatContext context) throws Exception {
	... 중략
    try {
        if (logger.isDebugEnabled()) {
            logger.debug("Chunk execution starting: queue size="+attributeQueue.size());
        }
        return doInChunkContext(context, chunkContext);
    }
    ... 중략
}

초반에 override 했던 doInChunkContext 를 호출함을 알 수 있다.

결론


(알고보니 나중에 RepeatTemplate 에 대한 내용을 알려주더라.. 조금 더 기다려보고 완강할껄 그랬나)

profile
A fast learner.

0개의 댓글