Schedule Executer Service

박근수·2024년 5월 27일
0

ssafy_PINN

목록 보기
4/4
  1. Game Start (5초)
  2. Stage 1 Start(30초)
  3. Stage 2 Start(20초)
  4. Round End(15초)
  5. Game End

1번,5번은 한 게임당 한 번만 발생하지만 2~4번의 이벤트는 사용자가 설정한 Round 반복 회수에 따라서 1회에서 5회까지 반복될 수 있었다.
서버는 시간에 따른 이벤트 발생을 처리하고 클라이언트들에게 메세지를 보내줘야 했다.
Thread.timesleep();
으로 동기 처리를 하는 것은 쓰레드가 멈추는 문제가 있으니 비동기로 이벤트를 발생시켜야 했다.
스프링의 batch@schedule을 활용하는 방법도 찾아 봤으나 문제는
1. 일정 시간 동안 이벤트가 발생하는 것은 좋은데 특정 반복 후에 멈추는 기능이 없다는 점.
2. 사용자의 게임 반복 회수, Stage 설정 시간에 따라서 유동적으로 적용하기 어렵다는 점.

이 2가지의 문제로 다른 비동기 Schedule Executer Service를 활용하기로 했다.

ScheduledExecutorService

Schedule Executer Service는 Executers 쓰레드를 생성해서 사용해야 한다.

   private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

Executors는 쓰레드 풀을 활용해도 괜찮지만
이는 스프링 쓰레드로 관리되지 않는 쓰레드라는 것과 Scheduling만 시킬것이기 때문에 SingleThread만 사용했다.

ScheduleFuture 또한 Future를 상속받은 인터페이스다. 따라서 비동기로 처리되는 것을 알 수 있다. 나는 schedule 메소드를 활용했는데 delay만큼의 시간이 지나면 Runnable 또는 Callable이 실행된다.
(Callable은 리턴이 있고 Runnable은 리턴 값이 없는 인터페이스다)


    //게임 시작 Future
    public CompletableFuture<Integer> startGame(int gameId, int round) throws BaseException {
        log.info("{} game start after 5 sec : {}", gameId, LocalDateTime.now());
        notifyRemainingTime(gameId, 5, 1,0, ServerEvent.NOTIFY_LEFT_TIME);

        sendingOperations.convertAndSend("/game/sse/" + gameId,
                new ServerSendEvent(ServerEvent.START, round)); // #1201 game start
        CompletableFuture<Integer> future = new CompletableFuture<>();
        executorService.schedule(() -> {
            future.complete(round);
        }, 5, TimeUnit.SECONDS);
        return future;
    }
    public CompletableFuture<Integer> scheduleFuture(int gameId, int delayTime) throws BaseException {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        executorService.schedule(() -> {
            future.complete(gameId);
        }, delayTime, TimeUnit.SECONDS);
        return future;

스케줄링이 연속적으로 이어져야 하는데 ScheduleFuture는 CompletableFuture처럼 조합할 수 없어서 return을 CompletableFuture로 반환하도록 만들었다.

executorService.schedule의 delayTime이 지나면 Runnable이 호출된다. 이 때 CompletableFuture의 future.complete가 호출되고 CompletableFuture에서 다음 Future를 실행한다.

한 라운드당 조합되는 ScheduleFuture

public CompletableFuture<Integer> roundScheduler(int gameId, GameStartRequestDTO gameStartRequestDTO, int currentRound) {
        CompletableFuture<Integer> future = new CompletableFuture<>();

        // Round 시작
        log.info("{} Round {} Start: {}", gameStartRequestDTO.getGameId(), currentRound, LocalDateTime.now());
        sendingOperations.convertAndSend("/game/sse/" + gameId,
                new ServerSendEvent(ServerEvent.ROUND_START, currentRound)); // Game Start # 12

        notifyRemainingTime(gameId, gameStartRequestDTO.getStage1Time(), 1, currentRound, ServerEvent.NOTIFY_LEFT_TIME);
        scheduleFuture(gameId, gameStartRequestDTO.getStage1Time())
                .thenCompose(r -> { // Round Start stage 1
                    log.info("{} game stage 1 End : {}", gameStartRequestDTO.getGameId(), LocalDateTime.now());
                    sendingOperations.convertAndSend("/game/sse/" + gameId,
                            new ServerSendEvent(ServerEvent.STAGE_1_END, currentRound)); // send Hint and Stage 1 End # 1203
                    notifyRemainingTime(gameId, gameStartRequestDTO.getStage2Time(), 2, currentRound, ServerEvent.NOTIFY_LEFT_TIME);
                    return scheduleFuture(gameId, gameStartRequestDTO.getStage2Time());  // Stage 2 기다리기
                }).thenCompose(r -> { // Stage 2
                    log.info("{} game stage 2 End : {}", gameStartRequestDTO.getGameId(), LocalDateTime.now());
                    sendingOperations.convertAndSend("/game/sse/" + gameId,
                            new ServerSendEvent(ServerEvent.STAGE_2_END, currentRound)); // Stage 2 End go To Score # 1204
                    // 2스테이지 종료 > 해당 라운드 결과 집계
                    RoundFinishRequestDTO finishRequestDTO = new RoundFinishRequestDTO(gameStartRequestDTO.getSenderNickname(), gameStartRequestDTO.getSenderGameId(), gameStartRequestDTO.getSenderTeamId(), currentRound);
                    gameService.finishRound(finishRequestDTO);

                    notifyRemainingTime(gameId, gameStartRequestDTO.getScorePageTime(), 3, currentRound, ServerEvent.NOTIFY_LEFT_TIME);
                    return scheduleFuture(gameId, gameStartRequestDTO.getScorePageTime());
                }).thenCompose(r -> {
                    log.info("{} game {} Round End  : {}", gameStartRequestDTO.getGameId(), currentRound, LocalDateTime.now());
                    sendingOperations.convertAndSend("/game/sse/" + gameId,
                            new ServerSendEvent(ServerEvent.ROUND_END, currentRound)); // Round End stage 1, 2 score # 1205

                    notifyRemainingTime(gameId, 1, 4, currentRound, ServerEvent.NOTIFY_LEFT_TIME);
                    return scheduleFuture(gameId, 1);
                }).thenRun(() -> {
                    future.complete(currentRound);
                });
        return future;
    }

한 라운드당 scheduleFuture가 여러개가 적용된다.

ScheduleFuture에 CompletableFuture를 결합해서 스케줄링이 연속적으로 진행될 수 있게 했다.

	@DisplayName("시작 시간 포함 라운드 시간 관리")
    @Test
    public void testGameProcessAsync() throws ExecutionException, InterruptedException {
        // given
        LocalDateTime startTime = LocalDateTime.now();
        AtomicReference<LocalDateTime> asyncEndTime = new AtomicReference<>();
        GameStartRequestDTO requestDTO = new GameStartRequestDTO("testUser", 1, 1, 1, 3, 2, 3, 4);
        int currentRound = 0;
        int roundCount = requestDTO.getRoundCount();
        AtomicBoolean isPass = new AtomicBoolean(false);
        AtomicInteger executedRounds = new AtomicInteger(0);

        // when
        scheduleProvider.startGame(requestDTO.getGameId(), currentRound)
                .thenCompose(v -> {
                    // 초기 체인 생성
                    CompletableFuture<Integer> roundChain = CompletableFuture.completedFuture(currentRound);

                    // 각 라운드에 대한 비동기 작업 체인 구축
                    for (int round = 1; round <= roundCount; round++) {
                        final int currentRoundInLoop = round;
                        roundChain = roundChain.thenCompose(ignored -> {
                            executedRounds.incrementAndGet(); // 라운드 실행 횟수 증가
                            return scheduleProvider.roundScheduler(requestDTO.getGameId(), requestDTO, currentRoundInLoop);
                        });
                        RoundFinishRequestDTO finishRequestDTO = new RoundFinishRequestDTO(requestDTO.getSenderNickname(), requestDTO.getSenderGameId(), requestDTO.getSenderTeamId(), currentRound);
                        gameService.finishRound(finishRequestDTO);
                    }
                    // 마지막 결과를 설정
                    return roundChain;
                }).get();

        log.info(startTime + " Async operation ended at: "+ LocalDateTime.now());
        asyncEndTime.set(LocalDateTime.now());
        long compare = ChronoUnit.SECONDS.between(startTime, asyncEndTime.get());
        long expectedTime = (2 + 3 + 4 + 1) * 3 + 5; // 각 스테이지 시간의 합 (초 단위로 계산) / stage 1, 2, score, wait

        // 비교를 위해 오차 범위 설정
        long tolerance = 1;
        long minTime = expectedTime - tolerance;
        long maxTime = expectedTime + tolerance;

        isPass.set(true);
        assertEquals(roundCount, executedRounds.get(), "roundScheduler 호출 횟수가 예상한 반복 횟수와 다릅니다.");
        assertTrue(compare >= minTime && compare <= maxTime, "scheduler가 예상 시간 범위에서 벗어났습니다. "+ compare +" "+ expectedTime);

    }

해당 Schedule의 기능들이 제대로 작동하는지 확인하기 위한 테스트 코드를 작성했다. 오차를 1초로 설정하긴 했으나 밀리세컨즈 단위의 오차가 있었다.

profile
개성이 확실한편

0개의 댓글