[AI] mcp 로 Google Drive 만져보자.

늘 공부하는 괴짜·2025년 4월 30일
0

AI : mcp

목록 보기
6/9

1. google_drive_tool.py

from mcp.server.fastmcp import FastMCP
from google.oauth2.service_account import Credentials
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseUpload, MediaIoBaseDownload
import io
import base64
from typing import Optional, List, Dict, Any

# 설정
SCOPES = ['https://www.googleapis.com/auth/drive']
SERVICE_ACCOUNT_FILE = "<<서비스계정 json 파일>>"

# MCP 서버 초기화
mcp = FastMCP("drive_project")

# Google Drive 서비스 객체 생성
def get_drive_service():
    creds = Credentials.from_service_account_file(SERVICE_ACCOUNT_FILE, scopes=SCOPES)
    return build('drive', 'v3', credentials=creds)

# Google Drive 파일 목록 조회
@mcp.tool(
    name="list_drive_files",
    description="Google Drive에서 최근 파일 목록을 조회합니다."
)
def list_drive_files() -> dict:
    service = get_drive_service()
    page_size = 10
    results = service.files().list(
        pageSize=page_size,
        fields="files(id, name, mimeType, modifiedTime)"
    ).execute()
    files = results.get("files", [])
    return {"files": files}

# Google Drive에 파일 업로드 (공유 폴더 지원)
@mcp.tool(
    name="upload_drive_file",
    description="base64로 인코딩된 파일을 Google Drive에 업로드합니다. file_name과 mime_type을 입력하고, folder_id를 지정하면 공유 폴더에 업로드됩니다."
)
def upload_drive_file(
    file_name: str,
    base64_data: str,
    mime_type: str = "application/octet-stream",
    folder_id: str = None  # 📂 공유 폴더 ID를 입력하면 해당 폴더에 업로드
) -> dict:
    service = get_drive_service()
    file_bytes = base64.b64decode(base64_data)
    media_body = MediaIoBaseUpload(io.BytesIO(file_bytes), mimetype=mime_type)

    file_metadata = {'name': file_name}
    if folder_id:
        file_metadata['parents'] = [folder_id]

    uploaded_file = service.files().create(
        body=file_metadata,
        media_body=media_body,
        fields='id'
    ).execute()

    return {"uploaded_file_id": uploaded_file.get("id")}

# Google Drive 파일 다운로드
@mcp.tool(
    name="download_drive_file",
    description="file_id로 Google Drive의 파일을 다운로드하고 base64 인코딩된 내용으로 반환합니다."
)
def download_drive_file(file_id: str) -> dict:
    service = get_drive_service()
    request = service.files().get_media(fileId=file_id)
    fh = io.BytesIO()
    downloader = MediaIoBaseDownload(fh, request)
    done = False
    while not done:
        _, done = downloader.next_chunk()

    encoded = base64.b64encode(fh.getvalue()).decode("utf-8")
    return {"base64_file_content": encoded}

# Google Drive 파일 삭제
@mcp.tool(
    name="delete_drive_file",
    description="file_id로 Google Drive의 파일을 삭제합니다."
)
def delete_drive_file(file_id: str) -> dict:
    service = get_drive_service()
    try:
        service.files().delete(fileId=file_id).execute()
        return {"status": "success", "message": f"File {file_id} has been deleted successfully"}
    except Exception as e:
        return {"status": "error", "message": str(e)}

# Google Drive 디렉토리 생성
@mcp.tool(
    name="create_drive_folder",
    description="Google Drive에 새로운 디렉토리를 생성합니다. parent_folder_id를 지정하면 해당 폴더 안에 생성됩니다."
)
def create_drive_folder(folder_name: str, parent_folder_id: str = None) -> dict:
    service = get_drive_service()
    file_metadata = {
        'name': folder_name,
        'mimeType': 'application/vnd.google-apps.folder'
    }
    if parent_folder_id:
        file_metadata['parents'] = [parent_folder_id]

    folder = service.files().create(
        body=file_metadata,
        fields='id'
    ).execute()
    
    return {"folder_id": folder.get("id")}

# Google Drive 파일/폴더 이동
@mcp.tool(
    name="move_drive_file",
    description="Google Drive의 파일이나 폴더를 다른 폴더로 이동합니다."
)
def move_drive_file(file_id: str, new_parent_id: str) -> dict:
    service = get_drive_service()
    try:
        # 현재 부모 폴더 조회
        file = service.files().get(
            fileId=file_id,
            fields='parents'
        ).execute()
        previous_parents = ",".join(file.get('parents', []))

        # 파일 이동
        file = service.files().update(
            fileId=file_id,
            addParents=new_parent_id,
            removeParents=previous_parents,
            fields='id, parents'
        ).execute()

        return {
            "status": "success",
            "message": f"File/Folder has been moved successfully",
            "new_parents": file.get('parents', [])
        }
    except Exception as e:
        return {"status": "error", "message": str(e)}

# Google Drive 파일/폴더 복사
@mcp.tool(
    name="copy_drive_file",
    description="Google Drive의 파일이나 폴더를 복사합니다. 대상 폴더 ID를 지정하면 해당 폴더에 복사됩니다."
)
def copy_drive_file(file_id: str, new_name: str = None, parent_id: str = None) -> dict:
    service = get_drive_service()
    try:
        # 원본 파일 정보 조회
        file = service.files().get(
            fileId=file_id,
            fields='name, mimeType'
        ).execute()

        # 복사할 파일의 메타데이터 설정
        body = {}
        if new_name:
            body['name'] = new_name
        else:
            body['name'] = f"Copy of {file.get('name')}"
        
        if parent_id:
            body['parents'] = [parent_id]

        # 파일이 폴더인 경우
        if file.get('mimeType') == 'application/vnd.google-apps.folder':
            # 새 폴더 생성
            new_folder = service.files().create(
                body={
                    'name': body['name'],
                    'mimeType': 'application/vnd.google-apps.folder',
                    'parents': body.get('parents', [])
                },
                fields='id'
            ).execute()
            
            # 원본 폴더 내의 파일들 조회
            results = service.files().list(
                q=f"'{file_id}' in parents",
                fields="files(id, name, mimeType)"
            ).execute()
            
            # 하위 파일들 재귀적으로 복사
            for item in results.get('files', []):
                copy_drive_file(item['id'], item['name'], new_folder['id'])
            
            return {
                "status": "success",
                "message": "Folder has been copied successfully",
                "new_file_id": new_folder['id']
            }
        else:
            # 일반 파일 복사
            copied_file = service.files().copy(
                fileId=file_id,
                body=body,
                fields='id'
            ).execute()
            
            return {
                "status": "success",
                "message": "File has been copied successfully",
                "new_file_id": copied_file['id']
            }
            
    except Exception as e:
        return {"status": "error", "message": str(e)}





@mcp.tool(
    name="base64_to_file",
    description="Google Drive 에서 받은 base64 데이터를 파일로 저장합니다."
)
def base64_to_file(base64_data: str, output_file: str) -> str:
    """
    Google Drive 에서 받은 base64 데이터를 파일로 저장합니다.
    
    Args:
        base64_data (str): Base64 인코딩 문자열
        output_file (str): 출력 파일 경로
    
    Returns:
        str: 성공 또는 오류 메시지
    """
    try:
        # 잠재적인 데이터 URI 스킴 헤더 제거
        if ',' in base64_data:
            base64_data = base64_data.split(',')[1]
            
        # 공백 또는 줄바꿈 제거
        base64_data = base64_data.strip()
        
        # base64 데이터 디코딩
        file_data = base64.b64decode(base64_data)
        
        # 파일 쓰기
        with open(output_file, 'wb') as f:
            f.write(file_data)
            
        return f"Successfully created file: {output_file}"
        
    except Exception as e:
        raise Exception(f"Error creating file: {str(e)}")

@mcp.tool(
    name="file_to_base64",
    description="파일을 base64 문자열로 변환합니다. 기본적으로 base64 문자열을 반환하거나, 출력 파일 경로를 지정하면 파일로 저장합니다."
)
def file_to_base64(input_file: str, output_file: Optional[str] = None) -> str:
    """
    파일을 base64 문자열로 변환합니다. 기본적으로 base64 문자열을 반환하거나, 출력 파일 경로를 지정하면 파일로 저장합니다.
    
    Args:
        input_file (str): 입력 파일 경로
        output_file (str, optional): 출력 파일 경로. 지정하지 않으면 base64 문자열을 반환합니다.
    
    Returns:
        str: Base64 인코딩 문자열 또는 출력 파일 경로를 지정한 경우 성공 메시지
    """
    try:
        # 이진 모드로 파일 읽기
        with open(input_file, 'rb') as f:
            file_data = f.read()
        
        # base64 인코딩
        base64_data = base64.b64encode(file_data).decode('utf-8')
        
        # 파일 쓰기 또는 문자열 반환
        if output_file:
            with open(output_file, 'w') as f:
                f.write(base64_data)
            return f"Successfully wrote base64 data to: {output_file}"
        else:
            return base64_data
            
    except Exception as e:
        raise Exception(f"Error processing file: {str(e)}") 



# MCP 서버 실행
if __name__ == "__main__":
    mcp.run(transport="stdio")

2. mcp 등록

    "drive_project": {
      "command": "uv",
      "args": [
        "run",
        "D:/google-mcp/google_drive_tool.py"
      ]
    }   

3. 질의

3-1. 드라이브 파일 목록

3-2. 파일 업로드

파일을 올릴 때 base64 인코딩이 되어 있어야 해서 툴로 놓았다.
파일을 올릴 때에 경로를 물어봐주는 친절함...

3-3. 파일 다운로드

3-4. 폴더 생성

3-5. 파일 이동

파일이 이동되어도 id 값은 그대로임.

3-6. 파일 복사

파일을 그냥 복사하면 "Copy of" 가 prefix 가 된다.

3-7. 파일 삭제

3-8. 파일이 들어있는 폴더 삭제

Finally

일단 여기다가 STT 를 붙여서 음성으로 명령을 내리고 싶다. 작동 자체는 너무 잘되는 것을 확인하였고 어차피 서비스 계정의 드라이브이니 테스트용으로는 손색이 없다.
그리고 cursor ai 의 premium 500 개가 끝나서 gemini-2.5-pro-exp 를 사용하는데 왜 잘되는 걸까?

profile
인공지능이라는 옷을 입었습니다. 뭔가 멋지면서도 잘 맞습니다.

0개의 댓글