from mcp.server.fastmcp import FastMCP
from google.oauth2.service_account import Credentials
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseUpload, MediaIoBaseDownload
import io
import base64
from typing import Optional, List, Dict, Any
# 설정
SCOPES = ['https://www.googleapis.com/auth/drive']
SERVICE_ACCOUNT_FILE = "<<서비스계정 json 파일>>"
# MCP 서버 초기화
mcp = FastMCP("drive_project")
# Google Drive 서비스 객체 생성
def get_drive_service():
creds = Credentials.from_service_account_file(SERVICE_ACCOUNT_FILE, scopes=SCOPES)
return build('drive', 'v3', credentials=creds)
# Google Drive 파일 목록 조회
@mcp.tool(
name="list_drive_files",
description="Google Drive에서 최근 파일 목록을 조회합니다."
)
def list_drive_files() -> dict:
service = get_drive_service()
page_size = 10
results = service.files().list(
pageSize=page_size,
fields="files(id, name, mimeType, modifiedTime)"
).execute()
files = results.get("files", [])
return {"files": files}
# Google Drive에 파일 업로드 (공유 폴더 지원)
@mcp.tool(
name="upload_drive_file",
description="base64로 인코딩된 파일을 Google Drive에 업로드합니다. file_name과 mime_type을 입력하고, folder_id를 지정하면 공유 폴더에 업로드됩니다."
)
def upload_drive_file(
file_name: str,
base64_data: str,
mime_type: str = "application/octet-stream",
folder_id: str = None # 📂 공유 폴더 ID를 입력하면 해당 폴더에 업로드
) -> dict:
service = get_drive_service()
file_bytes = base64.b64decode(base64_data)
media_body = MediaIoBaseUpload(io.BytesIO(file_bytes), mimetype=mime_type)
file_metadata = {'name': file_name}
if folder_id:
file_metadata['parents'] = [folder_id]
uploaded_file = service.files().create(
body=file_metadata,
media_body=media_body,
fields='id'
).execute()
return {"uploaded_file_id": uploaded_file.get("id")}
# Google Drive 파일 다운로드
@mcp.tool(
name="download_drive_file",
description="file_id로 Google Drive의 파일을 다운로드하고 base64 인코딩된 내용으로 반환합니다."
)
def download_drive_file(file_id: str) -> dict:
service = get_drive_service()
request = service.files().get_media(fileId=file_id)
fh = io.BytesIO()
downloader = MediaIoBaseDownload(fh, request)
done = False
while not done:
_, done = downloader.next_chunk()
encoded = base64.b64encode(fh.getvalue()).decode("utf-8")
return {"base64_file_content": encoded}
# Google Drive 파일 삭제
@mcp.tool(
name="delete_drive_file",
description="file_id로 Google Drive의 파일을 삭제합니다."
)
def delete_drive_file(file_id: str) -> dict:
service = get_drive_service()
try:
service.files().delete(fileId=file_id).execute()
return {"status": "success", "message": f"File {file_id} has been deleted successfully"}
except Exception as e:
return {"status": "error", "message": str(e)}
# Google Drive 디렉토리 생성
@mcp.tool(
name="create_drive_folder",
description="Google Drive에 새로운 디렉토리를 생성합니다. parent_folder_id를 지정하면 해당 폴더 안에 생성됩니다."
)
def create_drive_folder(folder_name: str, parent_folder_id: str = None) -> dict:
service = get_drive_service()
file_metadata = {
'name': folder_name,
'mimeType': 'application/vnd.google-apps.folder'
}
if parent_folder_id:
file_metadata['parents'] = [parent_folder_id]
folder = service.files().create(
body=file_metadata,
fields='id'
).execute()
return {"folder_id": folder.get("id")}
# Google Drive 파일/폴더 이동
@mcp.tool(
name="move_drive_file",
description="Google Drive의 파일이나 폴더를 다른 폴더로 이동합니다."
)
def move_drive_file(file_id: str, new_parent_id: str) -> dict:
service = get_drive_service()
try:
# 현재 부모 폴더 조회
file = service.files().get(
fileId=file_id,
fields='parents'
).execute()
previous_parents = ",".join(file.get('parents', []))
# 파일 이동
file = service.files().update(
fileId=file_id,
addParents=new_parent_id,
removeParents=previous_parents,
fields='id, parents'
).execute()
return {
"status": "success",
"message": f"File/Folder has been moved successfully",
"new_parents": file.get('parents', [])
}
except Exception as e:
return {"status": "error", "message": str(e)}
# Google Drive 파일/폴더 복사
@mcp.tool(
name="copy_drive_file",
description="Google Drive의 파일이나 폴더를 복사합니다. 대상 폴더 ID를 지정하면 해당 폴더에 복사됩니다."
)
def copy_drive_file(file_id: str, new_name: str = None, parent_id: str = None) -> dict:
service = get_drive_service()
try:
# 원본 파일 정보 조회
file = service.files().get(
fileId=file_id,
fields='name, mimeType'
).execute()
# 복사할 파일의 메타데이터 설정
body = {}
if new_name:
body['name'] = new_name
else:
body['name'] = f"Copy of {file.get('name')}"
if parent_id:
body['parents'] = [parent_id]
# 파일이 폴더인 경우
if file.get('mimeType') == 'application/vnd.google-apps.folder':
# 새 폴더 생성
new_folder = service.files().create(
body={
'name': body['name'],
'mimeType': 'application/vnd.google-apps.folder',
'parents': body.get('parents', [])
},
fields='id'
).execute()
# 원본 폴더 내의 파일들 조회
results = service.files().list(
q=f"'{file_id}' in parents",
fields="files(id, name, mimeType)"
).execute()
# 하위 파일들 재귀적으로 복사
for item in results.get('files', []):
copy_drive_file(item['id'], item['name'], new_folder['id'])
return {
"status": "success",
"message": "Folder has been copied successfully",
"new_file_id": new_folder['id']
}
else:
# 일반 파일 복사
copied_file = service.files().copy(
fileId=file_id,
body=body,
fields='id'
).execute()
return {
"status": "success",
"message": "File has been copied successfully",
"new_file_id": copied_file['id']
}
except Exception as e:
return {"status": "error", "message": str(e)}
@mcp.tool(
name="base64_to_file",
description="Google Drive 에서 받은 base64 데이터를 파일로 저장합니다."
)
def base64_to_file(base64_data: str, output_file: str) -> str:
"""
Google Drive 에서 받은 base64 데이터를 파일로 저장합니다.
Args:
base64_data (str): Base64 인코딩 문자열
output_file (str): 출력 파일 경로
Returns:
str: 성공 또는 오류 메시지
"""
try:
# 잠재적인 데이터 URI 스킴 헤더 제거
if ',' in base64_data:
base64_data = base64_data.split(',')[1]
# 공백 또는 줄바꿈 제거
base64_data = base64_data.strip()
# base64 데이터 디코딩
file_data = base64.b64decode(base64_data)
# 파일 쓰기
with open(output_file, 'wb') as f:
f.write(file_data)
return f"Successfully created file: {output_file}"
except Exception as e:
raise Exception(f"Error creating file: {str(e)}")
@mcp.tool(
name="file_to_base64",
description="파일을 base64 문자열로 변환합니다. 기본적으로 base64 문자열을 반환하거나, 출력 파일 경로를 지정하면 파일로 저장합니다."
)
def file_to_base64(input_file: str, output_file: Optional[str] = None) -> str:
"""
파일을 base64 문자열로 변환합니다. 기본적으로 base64 문자열을 반환하거나, 출력 파일 경로를 지정하면 파일로 저장합니다.
Args:
input_file (str): 입력 파일 경로
output_file (str, optional): 출력 파일 경로. 지정하지 않으면 base64 문자열을 반환합니다.
Returns:
str: Base64 인코딩 문자열 또는 출력 파일 경로를 지정한 경우 성공 메시지
"""
try:
# 이진 모드로 파일 읽기
with open(input_file, 'rb') as f:
file_data = f.read()
# base64 인코딩
base64_data = base64.b64encode(file_data).decode('utf-8')
# 파일 쓰기 또는 문자열 반환
if output_file:
with open(output_file, 'w') as f:
f.write(base64_data)
return f"Successfully wrote base64 data to: {output_file}"
else:
return base64_data
except Exception as e:
raise Exception(f"Error processing file: {str(e)}")
# MCP 서버 실행
if __name__ == "__main__":
mcp.run(transport="stdio")
"drive_project": {
"command": "uv",
"args": [
"run",
"D:/google-mcp/google_drive_tool.py"
]
}
파일을 올릴 때 base64 인코딩이 되어 있어야 해서 툴로 놓았다.
파일을 올릴 때에 경로를 물어봐주는 친절함...
파일이 이동되어도 id 값은 그대로임.
파일을 그냥 복사하면 "Copy of" 가 prefix 가 된다.
일단 여기다가 STT 를 붙여서 음성으로 명령을 내리고 싶다. 작동 자체는 너무 잘되는 것을 확인하였고 어차피 서비스 계정의 드라이브이니 테스트용으로는 손색이 없다.
그리고 cursor ai 의 premium 500 개가 끝나서 gemini-2.5-pro-exp 를 사용하는데 왜 잘되는 걸까?