[Quartz] 이기종 DB Transaction

S_H_H·2023년 6월 22일
0

Quartz And Batch

목록 보기
8/8
post-thumbnail

Oracle 에서 읽은 데이터를 Mssql로 Insert Job이 있다.

에러가 발생하지 않지 않는다면 문제가 없겠지만,
procedure에서 Exception이 발생한다면 그 결과를 사용자가 알 수 있도록 Update해야한다.

보통 에러가 발생했을 때 양쪽 DB 모두 Rollback을 하는 것이 일반적이다.
지금 발생한 케이스는 한쪽은 Rollback 처리를 하고, 다른 한쪽은 Commit를 해야 한다.

반대로 정상 처리되었을 때는 양쪽 다 Commit을 해야한다.
양쪽 DB간 에러가 발생 했을 때 고안했던 과정을 적어볼려고 합니다.

Step

한 Step안에서 read, process, writer가 처리 됨으로 같은 Step에서 읽고 쓸때 하나의 Transaction으로 관리되어야 한다.

Step에서 어떤점을 수정해야하는지 알아보자

transactionManager

ChainedTransactionManager는 분리된 DB에 대해 Transaction을 하나로 묶어주는 역할을 한다.

Commit 등록된 TransactionManager에 대해 역순으로 처리합니다.
만약 여러 트랜잭션에 대해 Commit 중 에러 발생 시 미 Commit에 대해서 Rollback을 한다

해당 메소드를 사용한다고 해도 완벽한 데이터 일괄성을 보여주기는 힘듭니다.
실제로 에러가 발생 시, 방어 로직이 필요합니다.

    PlatformTransactionManager chainTxManager() {
        ChainedTransactionManager txManager = new ChainedTransactionManager(oracleTransactionManager, mssqlTransactionManager);
        return txManager;
    }

writer

분리된 DB에 다른 query를 사용해야 하니 Delegates를 사용해서 하나의 Writer에서 다중 쿼리를 실행할 수 있도록 했습니다.

여기서 oracleWriter는 에러가 미 발생 시 사용하는 쿼리 입니다.

    public CompositeItemWriter<dtoModel> test_Writer() {
        CompositeItemWriter<dtoModel> compositeItemWriter = new CompositeItemWriter<>();
        compositeItemWriter.setDelegates(Arrays.asList(oracleWriter(), mssqlWriter()));

        return compositeItemWriter;
    }

faultTolerant

Exception이 발생 했을 때 Step이 종료되지 않고 이후 처리가 가능

skip

faultTolerant 활성화 필요

  • Step 처리 중 에러 종류에 따른 Skip 처리 여부
  • Skip 처리 횟수 제한

processorNonTransactional

processor non transaction 영어 뜻 그대로 process에 따로 기입된 트랜잭션이 없다

만약 Chunk 단위로 처리 중 writer 중 에러 발생 시, Chunk의 처음 데이터 부터 process가 실행하게 된다.

위 옵션을 통해 에러가 발생해도 해당 row만 제거된 상태로 writer가 실햄됨으로 process가 다시 시작하지 않는다.

listener

SkipListener 를 사용해서 Skip이 일어난 데이터에 대해 후 처리를 했습니다.
이 부분이 에러가 발생 시, 에러에 대한 내용을 Update하기 위해 만들어진 로직입니다.

SkipListener는 총 3개로 나눠져 있습니다.

  • onSkipInRead
  • onSkipInProcess
  • onSkipInWrite

Skip 발생 한 원인으로 부터 listener가 호출되게 됩니다.
write 중 에러가 발생함으로 후 처리 로직을 기입했습니다.

    public class skipListener implements SkipListener<dtoModel, dtoModel> {
        @Override
        public void onSkipInRead(Throwable t) {
            log.info("Skipped at Read due to: {}", t.getMessage());
        }

        @Override
        public void onSkipInWrite(dtoModel item, Throwable t) {
            log.info("Item {} was skipped at Writer due to: {}", item.getInfo().get(0).getTest(), t.getMessage());

            SQLException se = (SQLException) ((DataAccessException) t).getRootCause();

			while (se != null) {
				log.debug("SQLException SQLState is " + se.getSQLState());
				log.debug("SQLException SQLErrorCode is " + se.getErrorCode());
				log.debug("SQLException Message is " + se.getMessage());
				log.debug("SQLException StackTrace is " + se.getStackTrace());

				se = se.getNextException();
			}

            testService.updateFail(item, t.getMessage());
        }

        @Override
        public void onSkipInProcess(dtoModel item, Throwable t) {
            log.info("Item {} was skipped at Process due to: {}", item.getInfo().get(0).getTest(), t.getMessage());
        }
    }

Total

    public Step test_step() {
        return stepBuilderFactory
                .get("test_step")
                .transactionManager(chainTxManager())
                .<dtoModel, dtoModel>chunk(100)
                .reader(test_Reader())
                .processor(test_Processor())
                .writer(test_Writer())
                .faultTolerant()
                .skip(Exception.class)
                .skipLimit(9999)
                .listener(new skipListener())
                .processorNonTransactional()
                .build();
    }

이렇게 정상 처리되었을 때와 에러가 발생했을 때를 한쪽은 Rollback이지만 다른 한쪽은 Update를 할 수 있도록 처리했습니다.

profile
LEVEL UP

0개의 댓글