pip3 install pytest --trusted-host pypi.python.org --trusted-host files.pythonhosted.org --trusted-host pypi.org
bash_command
파라미터가 없으면 인스턴스화할 때 failtest_cycle()
: DAG 에 cycle 이 존재하는지 테스트import glob
import importlib.util
import os
import pytest
from airflow.models import DAG
# **: 디렉토리 반복 탐색 (recursive)
DAG_PATH = os.path.join(os.path.dirname(__file__), "..", "..", "dags/*.py")
DAG_FILES = glob.glob(DAG_PATH)
# dags 경로의 python 파일들에 대해 테스트
@pytest.mark.parametrize("dag_file", DAG_FILES)
def test_dag_integrity(dag_file):
# DAG module 로딩, 인스턴스화하면서 테스트
module_name, _ = os.path.splitext(dag_file)
module_path = os.path.join(DAG_PATH, dag_file)
mod_spec = importlib.util.spec_from_file_location(module_name, module_path)
module = importlib.util.module_from_spec(mod_spec)
mod_spec.loader.exec_module(module)
# 인스턴스화된 DAG 가 1개 이상인지 테스트
dag_objects = [var for var in vars(module).values() if isinstance(var, DAG)]
assert dag_objects
# 각 DAG 가 순환(cycle) 이 있는지 체크
for dag in dag_objects:
# Test cycles
dag.test_cycle()
pytest tests/
from airflow.operators.bash import BashOperator
# Bash Operator 는 context 불필요
def test_example():
task = BashOperator(task_id="test", bash_command="echo 'hello!'", xcom_push=True)
result = task.execute(context={})
assert result == "hello!"
pip3 install pytest-mock --trusted-host pypi.python.org --trusted-host files.pythonhosted.org --trusted-host pypi.org
mocker
파라미터를 테스트 함수에 전달monkey-patch
: 런타임시 메타스토어 쿼리 대신 지정된 객체 반환get_connection()
호출 횟수 가 1회인지, conn_id
가 operator 에서 제공한 값과 일치하는지 테스트from airflow.models import Connection
def test_movielenspopularityoperator(mocker):
# mock_get: 동작을 캡처할 목업 변수
mock_get = mocker.patch.object(
# patch 할 객체
MovielensHook,
# patch 할 함수
# get_connection() 호출은 monkey-patch -> DB 호출 없이 예상되는 connection 객체 반환
"get_connection",
# return 할 값
return_value=Connection(conn_id="test", login="airflow", password="airflow"),
)
task = MovielensPopularityOperator(
task_id="test_id",
conn_id="testconn",
start_date="2015-01-01",
end_date="2015-01-03",
top_n=5,
)
result = task.execute(context=None)
assert len(result) == 5
# get_connection() 호출 횟수가 1회인지 확인
assert mock_get.call_count == 1
# Airflow metastore에서 요청될 것으로 예상, conn_id 가 Operator 에서 넘긴 값과 일치하는지 확인
mock_get.assert_called_with("testconn")
tempfile
: python 에서 임시 저장소 작업하는 모듈tmp_dir, tmp_path
등JsonToCsvOperator
를 정의했다고 가정import csv
import json
from pathlib import Path
# tmp_path 는 고정
def test_json_to_csv_operator(tmp_path: Path):
print(tmp_path.as_posix())
# 테스트에 사용할 경로 지정
input_path = tmp_path / "input.json"
output_path = tmp_path / "output.csv"
# 테스트 input json 데이터 정의, write
input_data = [
{"name": "bob", "age": "41", "sex": "M"},
{"name": "alice", "age": "24", "sex": "F"},
{"name": "carol", "age": "60", "sex": "F"},
]
with open(input_path, "w") as f:
f.write(json.dumps(input_data))
# task 실행
operator = JsonToCsvOperator(
task_id="test", input_path=input_path, output_path=output_path
)
operator.execute(context={})
# task 실행 후 생성된 output
with open(output_path, "r") as f:
reader = csv.DictReader(f)
result = [dict(row) for row in reader]
# output data 로 테스트 시행
assert result == input_data
operator.execute(context={})
와 같이 테스트시에는 ds 변수 사용 불가
operator.run()
을 직접 호출task 실행하기 위해, Airflow database 가 필요
airflow db init
으로 db 구성 필요<AIRFLOW_HOME>/airflow.db
SQLite 파일이 db 역할 수행pytest_docker_tools
: 테스트를 위한 docker container 를 위한 helper 제공fetch
: docker pull 하여 docker image 반환container
: container 실행pip3 install pytest_docker_tools pytest_mock --trusted-host pypi.python.org --trusted-host files.pythonhosted.org --trusted-host pypi.org
conftest.py
: pytest 에서 fixture 기능으로 모든 테스트에서 공유, 적용# 모든 테스트에서 DAG 재사용 가능
@pytest.fixture
def test_dag():
return DAG(
"test_dag",
default_args={"owner": "airflow", "start_date": datetime.datetime(2015, 1, 1)},
schedule_interval="@daily",
)
container
함수도 fixture 이므로, test parameter로 전달@pytest.fixture(scope="module")
def postgres_credentials():
PostgresCredentials = namedtuple("PostgresCredentials", ["username", "password"])
return PostgresCredentials("testuser", "testpass")
# docker 이미지 가져오기
postgres_image = fetch(repository="postgres:11.1-alpine")
# docker container 실행
postgres = container(
# docker image id
image="{postgres_image.id}",
environment={
"POSTGRES_USER": "{postgres_credentials.username}",
"POSTGRES_PASSWORD": "{postgres_credentials.password}",
},
# ports key 는 container, value 는 host port (None 일 경우 임의의 포트)
ports={"5432/tcp": None},
# 실제 DB 스키마와 데이터로 초기화하고자 할 경우, postgres-init.sql 파일을 volume 마운트
volumes={
os.path.join(os.path.dirname(__file__), "postgres-init.sql"): {
"bind": "/docker-entrypoint-initdb.d/postgres-init.sql"
}
},
)
import datetime
import os
from collections import namedtuple
from pathlib import Path
import pytest
from airflow.models import DAG, Connection
from pytest_docker_tools import fetch, container
from pytest_mock import MockFixture
# test_dag, postgres container, credentials fixture 를 인자로 받아서 테스트
def test_movielens_to_postgres_operator(
mocker: MockFixture, test_dag: DAG, postgres, postgres_credentials
):
mocker.patch.object(
MovielensHook,
"get_connection",
return_value=Connection(conn_id="test", login="airflow", password="airflow"),
)
# postgres container, postgres_credentials fixture 정보를 이용해 postgres 목업 container 로 연결
mocker.patch.object(
PostgresHook,
"get_connection",
return_value=Connection(
conn_id="postgres",
conn_type="postgres",
host="localhost",
login=postgres_credentials.username,
password=postgres_credentials.password,
# container 설정에 지정된 host port
port=postgres.ports["5432/tcp"][0],
),
)
task = MovielensToPostgresOperator(
task_id="test",
movielens_conn_id="movielens_id",
start_date="{{ prev_ds }}",
end_date="{{ ds }}",
postgres_conn_id="postgres_id",
insert_query=(
"INSERT INTO movielens (movieId,rating,ratingTimestamp,userId,scrapeTime) "
"VALUES ({0}, '{{ macros.datetime.now() }}')"
),
dag=test_dag,
)
pg_hook = PostgresHook()
row_count = pg_hook.get_first("SELECT COUNT(*) FROM movielens")[0]
assert row_count == 0
pytest.helpers.run_airflow_task(task, test_dag)
row_count = pg_hook.get_first("SELECT COUNT(*) FROM movielens")[0]
assert row_count > 0