WSL에서
git clone https://github.com/jinman-kim/airflow-docker.git
cd airflow-docker && source airflow.sh
를 해서 들어가면
airflow.sh 안의 코드가
sudo apt update && apt-get update
sudo apt install docker
sudo apt install docker-compose
sudo docker-compose up -d
다음과 같아서 컨테이너도 백그라운드에서 켜진다.
현재 dags에 있는 binance_crawl.py의 코드로 에어플로우에서 일마다 크롤링을 해오는 코드가 작성되어 있다.
import airflow
import pendulum
import requests
import pandas as pd
from datetime import datetime, date, timedelta
import time
from binance.client import Client
import pandas as pd
from airflow import DAG
from airflow.exceptions import AirflowSkipException
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator
import os
from dotenv import load_dotenv
#load_dotenv()
ERP_CHANGE_DATE = airflow.utils.dates.days_ago(1)
#api_key, api_secret = os.getenv('API_KEY'), os.getenv('API_SECRET')
api_key = '비밀'
api_secret = '비밀'
def _binance_api():
client = Client(api_key, api_secret)
symbols = client.get_all_tickers()
# interval = ["5MINUTE", "1HOUR", "4HOUR", "1DAY", "1WEEK", "1MONTH"]
symbols_usdt = []
for i in symbols:
if 'USDT' in i['symbol']:
symbols_usdt.append(i['symbol'])
symbols = symbols_usdt
# interval = ["5MINUTE", "1HOUR", "4HOUR", "1DAY", "1WEEK", "1MONTH"]
intervals = ['1MONTH']
for symbol in symbols[3:6]:
# ETHBTC : 5m, 1h, 4h, 1d, 1w, 1M
for interval in intervals:
klines = client.get_historical_klines(symbol, getattr(Client, f"KLINE_INTERVAL_{interval}"), "2 Jul, 2022", "4 Jul, 2023")
df = pd.DataFrame(klines, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_asset_volume',
'number_of_trades', 'taker_buy_base_asset_volume', 'taker_buy_quote_asset_volume', 'ignored'])
df = df.iloc[:,:6]
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
# Save the DataFrame as a CSV file
file_name = f"{symbol.lower()}_{interval.lower()}.csv"
df.to_csv(f'/home/airflow/data/{file_name}', index=False)
with DAG(
dag_id='binance_crawl',
start_date=airflow.utils.dates.days_ago(1),
> schedule_interval='@daily',
) as dag:
start = DummyOperator(task_id='start')
binance_api = PythonOperator(task_id='binance_crawl', python_callable=_binance_api)
start >> binance_api
이런 코드인데 csv파일로 바이낸스 api를 통해 가져온 데이터를 저장하는 코드이다.
참고
위에서
#load_dotenv()
ERP_CHANGE_DATE = airflow.utils.dates.days_ago(1)
#api_key, api_secret = os.getenv('API_KEY'),
주석처리한 load_dotenv()랑 api_key, api_secret부분은
원래는 보안상의 이유로 .env 파일을 따로 만들어서 api_key와 api_secret을 관리하기에 있는 부분인데 바로 위의 코드에서는 편의를 위해 .env 파일을 따로 만들어두지 않아서 주석처리 된 것이다.
.env 파일로 api_key와 api_secret을 관리하는 방법은
1. .env 파일 생성: 프로젝트 루트 디렉토리에 .env 파일을 생성
2. .env 파일 형식: .env 파일은 "키=값" 형식으로 변수와 값을 저장
그 예시는
API_KEY=your_api_key
API_SECRET=your_api_secret
다음과 같고
.env 파일은 api_key와 api_secret을 숨기기 위해 만든건데 그대로
git에 올린다면 의미가 없는 것이기에
.env 파일을 프로젝트의 버전 관리 시스템(.gitignore)에 추가하여 실수로 공개되지 않도록 한다.
그럼 어떻게 load하는지는 파이썬이냐 자바스크립트냐에 따라 다른데
파이썬에서는
from dotenv import load_dotenv
import os
# .env 파일 로드
load_dotenv()
# API 키 및 비밀 가져오기
api_key = os.getenv("API_KEY")
api_secret = os.getenv("API_SECRET")
위의 방식으로 load하게 된다.
에어플로우의 binance_crawl.py 코드의 수정을 통해 단순히 크롤링 해오고 데이터만 저장하는 것이 아니라 gcp에서 만든 cloud storage에 데이터를 저장하는 것이 목표이다.
우선은 Google Cloud Console에서 서비스 계정 키를 생성해야 GCS에 접근할 수 있다(Binance API 쓸 때도 api_key랑 api_secret 필요한 것처럼).
탐색 메뉴에서 IAM 및 관리자 > 서비스계정 > 작업 > 키관리 로 들어간 뒤 json 파일로 키 받으면 된다.
처음엔 local에서 /home/user/airflow-docker/dags 경로에 json 파일을 넣어두고 연결했었다.
api_key = '비밀'
api_secret = '비밀'
key_path = '/home/user/airflow-docker/dags/key.json'
그런데 계속 에러가 떠서
docker logs airflow-docker-airflow-1 해봤지만 역시 에러가 잘 안 나와서
docker exec -it airflow-docker-airflow-1 /bin/bash 으로 컨테이너 안으로 들어가서 log를 보기로 하였다.
그렇게 확인한 결과 key_path의 경로를 로컬에서의 경로가 아닌 컨테이너 내부의 path로 설정해줘야 한다는 것을 알게 되었다.
그래서 위의 key_path를
key_path = '/opt/airflow/dags/key.json'로 바꿔줘야 에러가 뜨지 않는다.
사실 처음 에러가 뜰 때의 코드 맨 밑 부분을 보면
upload_to_gcp_task = PythonOperator(
task_id='upload_to_gcp',
python_callable=upload_to_gcp,
op_args=[f'/home/airflow/data/', 'jinman_team_project', ''],
trigger_rule='all_success'
)
이런 부분이 있는데 op_args부분을 보면 컨테이너 내부 경로인 것이 보인다. 따라서 위에서도 컨테이너 내부 경로로 써주는 것이 맞았다는 것이 된다.
그리고 최종 코드는 원래
start >> binance_crawl >> upload_to_gcs 이런 과정으로 만드려고 했지만
그냥 단축해서
start >> binance_crawl 과정으로 버킷에 넣는것까지 완료하는 식으로 구현했다.
import airflow
import pendulum
import requests
import pandas as pd
from datetime import datetime, date, timedelta
import time
from binance.client import Client
import pandas as pd
from airflow import DAG
from airflow.exceptions import AirflowSkipException
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator
from google.cloud import storage
ERP_CHANGE_DATE = airflow.utils.dates.days_ago(1)
api_key = '비밀'
api_secret = '비밀'
key_path = '/opt/airflow/dags/key.json'
def _binance_api():
client = Client(api_key, api_secret)
symbols = client.get_all_tickers()
symbols_usdt = []
for i in symbols:
if 'USDT' in i['symbol']:
symbols_usdt.append(i['symbol'])
symbols = symbols_usdt
intervals = ['1MONTH']
for symbol in symbols[3:6]:
for interval in intervals:
klines = client.get_historical_klines(symbol, getattr(Client, f"KLINE_INTERVAL_{interval}"), "2 Jul, 2022", "4 Jul, 2023")
df = pd.DataFrame(klines, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_asset_volume',
'number_of_trades', 'taker_buy_base_asset_volume', 'taker_buy_quote_asset_volume', 'ignored'])
df = df.iloc[:,:6]
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
now = datetime.now()
date_str = now.strftime('%Y%m%d')
file_name = f"{symbol.lower()}_{interval.lower()}_{date_str}.csv"
file_path = f'/home/airflow/data/{file_name}'
df.to_csv(file_path, index=False)
# Upload the file to GCP
bucket_name = 'jinman_team_project'
destination_blob_name = file_name
# upload_to_gcp(file_path, 'jinman_team_project', file_name)
storage_client = storage.Client.from_service_account_json(key_path)
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(file_path)
# def upload_to_gcp(file_path, bucket_name, destination_blob_name):
# """Uploads a file to Google Cloud Storage."""
# storage_client = storage.Client.from_service_account_json(key_path)
# bucket = storage_client.bucket(bucket_name)
# blob = bucket.blob(destination_blob_name)
# blob.upload_from_filename(file_path)
with DAG(
dag_id='gcs',
start_date=airflow.utils.dates.days_ago(1),
schedule_interval='@daily',
) as dag:
start = DummyOperator(task_id='start')
binance_api = PythonOperator(task_id='binance_crawl', python_callable=_binance_api)
# upload_to_gcp_task = PythonOperator(
# task_id='upload_to_gcp',
# python_callable=upload_to_gcp,
# op_args=[f'/home/airflow/data/', 'jinman_team_project', ''],
# trigger_rule='all_success'
# )
start >> binance_api
이런 식으로 버킷에 csv파일로 데이터가 저장되어 있는 것을 확인할 수 있다.
너무 좋은 글이네요. 공유해주셔서 감사합니다.