django orm으로 lichess 분석 따라잡기

김형수·2023년 3월 9일
0

lichess database에서 추출한 74만개의 체스 기보 데이터를 가지고 lichess나 chess.com에서 제공하는 analze기능을 따라잡아보자

체스는 전략과 계획이 필요한 대표적인 보드 게임. 체스에서 성공하기 위해서는 다양한 기술과 전략을 학습하고 연습해야 한다. 실력을 향상시키기 위해서는 자신의 게임을 분석하고, 상대방의 전략을 이해하는 것이 매우 중요하다.

lichess나 chess.com과 같은 사이트들에서 여러가지 퍼즐이나 여러가지 상황에 따른 다음 수들을 실제 행해졌던 게임들에서 수집하여 보여주는 기능들이 있고, 여러가지 체스 오프닝들이나, 전략을 공부할 수 있는 퍼즐과 같은 기능들을 제공한다.

평소 체스에 관심이 많았던 사람으로써, 개인 프로젝트로 lichess나 chess.com사이트의 기능들을 클론하고, 내가 학습하기 용이한 기능들을 추가하여, 개인적인 체스 분석, 연습 사이트를 만드는 프로젝트를 진행하고 있다.

나만의 체스 분석 프로그램을 만들어보자 프로젝트

이번 포스팅은 lichess 분석기능에 대해서 정리한다.

기존 방법

기존의 방법은 lichess database에서 제공하는 pgn파일들을 가져와 데이터를 수집하고 정제하여 데이터베이스에 적재하는 파이프라인을 구축하였다.

파이프라인 코드 예시


class Processor:

    def __init__(self, fpath, num, checkpoint, is_start=False) -> None:
        self.df = pd.DataFrame()
        self.num = num
        self.msg = 'done'
        self.fpath = fpath
        self.checkpoint = checkpoint
        self.status = False
        self.next_checkpoint = checkpoint + num
        self.is_start = False

        b, m = self.parse_data_to_csv()
        if b:
            self.msg = self.store_data()
        else:
            self.msg = m

    def parse_data_to_csv(self):
        pgn = open(self.fpath, 'r')
        if self.is_start:
            for _ in range(1):
                game = chess.pgn.read_game(pgn)

        for i in range(self.checkpoint):
            game = chess.pgn.read_game(pgn)

        game = chess.pgn.read_game(pgn)
        num = 0
        if game:
            while game != None:
                if num == self.num:
                    break
                if not self.df.empty:
                    df = pd.DataFrame(
                        list(game.headers.values()) + [self.san_to_uci(game.mainline_moves())]).T
                    df.columns = list(game.headers.keys()) + ['mainline']
                    self.df = pd.concat([self.df, df], ignore_index=True)
                else:
                    self.df = pd.DataFrame(
                        list(game.headers.values()) + [self.san_to_uci(game.mainline_moves())]).T
                    self.df.columns = list(game.headers.keys()) + ['mainline']

                num += 1
                game = chess.pgn.read_game(pgn)

            if game == None:
                self.status = True
                self.df.drop(self.df.index[-1])

            return True, 'done'
        else:
            return False, f'{self.fpath} is already done.'
            
            .......

lichess database에서 제공하는 파일들은 기본적으로 크기가 매우 크기 때문에 파일을 분산하여 처리하는 api를 만들어 데이터베이스에 저장했다.

여기서 저장한 데이터를 활용하여, 체스의 id라고 할 수 있는 fen을 활용하여 각각의 fen마다 사람들이 둔 수들을 하나씩 세서 데이터베이스에 저장했다.

@api.get('/mainline')
def parse_mainline(request, num: int):
    pool = Pool(2)
    first_pk = ChessNotation.objects.first()
    try:
        checkpoint = ChessMainline.objects.all()[0]
        if checkpoint.checkpoint < first_pk.pk:
            checkpoint.checkpoint = first_pk.pk
    except:
        checkpoint = ChessMainline.objects.create(checkpoint=first_pk.pk)
    data = ChessNotation.objects.filter(
        pk__gte=checkpoint.checkpoint, pk__lt=checkpoint.checkpoint + num)

    checkpoint.checkpoint += num

    checkpoint.save()
    if data:
        # MainlineProcessor(data)

        pool.map(MainlineProcessor, data)

    return
class MainlineProcessor:
    def __init__(self, data: ChessNotation) -> None:
        self.data = data
        self.df = pd.DataFrame()
        self.current = None

        self.parse()

    def process(self, next_move, result, fen):
        try:
            start = ChessProcess.objects.get(fen=fen)
        except:
            try:
                start = ChessProcess.objects.create(fen=fen)
            except:
                return
        try:
            created_move = start.next_moves.get(next_move=next_move)
        except:
            created_move = ChessFenNextMoves.objects.create(
                fen=start, white=0, draw=0, black=0, next_move=next_move, cnt=0)

        created_move.cnt += 1
        if result == 'white':
            created_move.white += 1
        elif result == 'black':
            created_move.black += 1
        else:
            created_move.draw += 1

        created_move.save()

    def parse(self):
        i = self.data
        # for i in self.data:
        board = chess.Board()
        moves = i.mainline.split(',')

        self.process(moves[0], i.result, board.fen())

        for cnt in range(len(moves) - 1):
            board.push_uci(moves[cnt])
            fen = board.fen()

            self.process(moves[cnt+1], i.result, fen)

python의 내장 라이브러리인 multiprocessing을 활용했음에도 불구하고 100개의 데이터를 정제하는데 대략 30초의 시간이 소요됐고, 각각의 fen을 따로 데이터베이스에 저장함에 따라 100000개의 기보를 정제하는데 3000000만개의 fen 데이터가 생성되는 아주 비효율적인 방법을 사용하였다.

수정된 방법

수정된 방법은 아주 간단하고 빠르다. django orm기능을 적극 활용한다.

import chess

board = chess.Board()
legal_moves = list(board.legal_moves)

for move in legal_moves:
	print(<database model name>.objects.filter(mainline__startswith = move).count(), move)
countmove
141g1h3
21899g1f3
3799b1c3
60b1a3
610h2h3
13416g2g3
907f2f3
19884e2e3
6709d2d3
1927c2c3
10138b2b3
884a2a3
798h2h4
2894g2g4
7503f2f4
427490e2e4
181418d2d4
23802c2c4
4474b2b4
737a2a4

정말 간단하다..
머리가 나쁘면 몸이 고생한다

1개의 댓글

comment-user-thumbnail
2023년 10월 22일

좋은 포스팅 감사합니다~

답글 달기