Airflow task 간 데이터 공유
- airflow 의 task 는 독립적으로 실행
- 메모리에서 데이터 공유 불가
- task 간 데이터는 별도로 저장해야 읽을 수 있다
- XCom 으로 meta store 에서
picklable
직렬화 객체를 디스크에 저장하고 읽는 방식
- DB 나 파일 등 외부 저장소에 저장하는 방법
PostgresOperator
- 추가 의존성 설치
pip3 install apache-airflow-providers-postgres
- DB 접근하기 위해 credential 저장
- 등록 후 UI 에서
Admin > Connections
에서 확인 가능
airflow connections add \
--conn-type postgres \
--conn-host <host> \
--conn-login <username> \
--conn-password <password> \
<connection 식별자>
PostgresOperator
를 통해 SQL 실행
hook
을 통해 Postgres 연결 생성, 쿼리 실행, 연결 종료
- operator 자체는 요청을 hook 으로 전달하는 것만 수행
postgres_conn_id
로 자격증명 지정
template_searchpath
: 파일을 탐색할 경로를 추가로 지정
template_ext
로 지정된 확장자(.sql)가 일치하면 SQL template화
from urllib import request
import airflow.utils.dates
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
dag = DAG(
dag_id="listing_4_20",
start_date=airflow.utils.dates.days_ago(1),
schedule_interval="@hourly",
template_searchpath="/tmp",
max_active_runs=1,
)
def _get_data(year, month, day, hour, output_path):
url = (
"https://dumps.wikimedia.org/other/pageviews/"
f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
)
request.urlretrieve(url, output_path)
get_data = PythonOperator(
task_id="get_data",
python_callable=_get_data,
op_kwargs={
"year": "{{ execution_date.year }}",
"month": "{{ execution_date.month }}",
"day": "{{ execution_date.day }}",
"hour": "{{ execution_date.hour }}",
"output_path": "/tmp/wikipageviews.gz",
},
dag=dag,
)
extract_gz = BashOperator(
task_id="extract_gz", bash_command="gunzip --force /tmp/wikipageviews.gz", dag=dag
)
def _fetch_pageviews(pagenames, execution_date):
result = dict.fromkeys(pagenames, 0)
with open("/tmp/wikipageviews", "r") as f:
for line in f:
domain_code, page_title, view_counts, _ = line.split(" ")
if domain_code == "en" and page_title in pagenames:
result[page_title] = view_counts
with open("/tmp/postgres_query.sql", "w") as f:
for pagename, pageviewcount in result.items():
f.write(
"INSERT INTO pageview_counts VALUES ("
f"'{pagename}', {pageviewcount}, '{execution_date}'"
");\n"
)
fetch_pageviews = PythonOperator(
task_id="fetch_pageviews",
python_callable=_fetch_pageviews,
op_kwargs={"pagenames": {"Google", "Amazon", "Apple", "Microsoft", "Facebook"}},
dag=dag,
)
write_to_postgres = PostgresOperator(
task_id="write_to_postgres",
postgres_conn_id="my_postgres",
sql="postgres_query.sql",
dag=dag,
)
get_data >> extract_gz >> fetch_pageviews >> write_to_postgres
reference