[Airflow] pytest 로 테스트하기

Woong·2024년 10월 22일
0

Apache Airflow

목록 보기
5/9
  • 개별 태스크는 단위 테스트로 테스트
  • 여러 태스크 동작은 통합 테스트로 테스트

pytest 의존성 설치

  • pytest 설치

pip3 install pytest --trusted-host pypi.python.org --trusted-host files.pythonhosted.org --trusted-host pypi.org

DAG integrity test (DAG 무결성 테스트)

  • 모든 DAG 가 정상적으로 구현되었는지 테스트
    • ex) task id 중복, cycle (순환) 포함 등
예제
  • 인스턴스화하며 검사 수행
    • ex) BashOperator 에서 bash_command 파라미터가 없으면 인스턴스화할 때 fail
  • test_cycle(): DAG 에 cycle 이 존재하는지 테스트
import glob
import importlib.util
import os

import pytest
from airflow.models import DAG

# **: 디렉토리 반복 탐색 (recursive)
DAG_PATH = os.path.join(os.path.dirname(__file__), "..", "..", "dags/*.py")
DAG_FILES = glob.glob(DAG_PATH)


# dags 경로의 python 파일들에 대해 테스트
@pytest.mark.parametrize("dag_file", DAG_FILES)
def test_dag_integrity(dag_file):
    # DAG module 로딩, 인스턴스화하면서 테스트
    module_name, _ = os.path.splitext(dag_file)
    module_path = os.path.join(DAG_PATH, dag_file)
    mod_spec = importlib.util.spec_from_file_location(module_name, module_path)
    module = importlib.util.module_from_spec(mod_spec)
    mod_spec.loader.exec_module(module)

    # 인스턴스화된 DAG 가 1개 이상인지 테스트
    dag_objects = [var for var in vars(module).values() if isinstance(var, DAG)]
    assert dag_objects

	# 각 DAG 가 순환(cycle) 이 있는지 체크
    for dag in dag_objects:
        # Test cycles
        dag.test_cycle()
  • 테스트 실행

pytest tests/

BashOperator 테스트 예제
  • BashOperator 는 context 불필요
    • 다른 operator 는 필요시 context key-value 전달
  • execute() 를 직접 호출하여 테스트
from airflow.operators.bash import BashOperator

# Bash Operator 는 context 불필요
def test_example():
    task = BashOperator(task_id="test", bash_command="echo 'hello!'", xcom_push=True)
    result = task.execute(context={})
    assert result == "hello!"

단위테스트에서 mocking 하기

  • DAG 를 독립적으로 테스트하기 위해, 테스트간 정보는 mocking 으로 대응
    • pip3 install pytest-mock --trusted-host pypi.python.org --trusted-host files.pythonhosted.org --trusted-host pypi.org
    • mocker 파라미터를 테스트 함수에 전달
      • monkey-patch : 런타임시 메타스토어 쿼리 대신 지정된 객체 반환
외부 시스템 호출을 mocker 로 대체하는 예제
  • get_connection() mocking
  • 목업 변수를 통해 동작을 캡처하고, 동작 검증
    • get_connection() 호출 횟수 가 1회인지,
    • conn_id 가 operator 에서 제공한 값과 일치하는지 테스트
  • 유의사항: mocker 정의는 정의부가 아닌 호출부에서 구성
from airflow.models import Connection

def test_movielenspopularityoperator(mocker):   
    # mock_get: 동작을 캡처할 목업 변수
    mock_get = mocker.patch.object(
    	# patch 할 객체
        MovielensHook,
        # patch 할 함수
        # get_connection() 호출은 monkey-patch -> DB 호출 없이 예상되는 connection 객체 반환
        "get_connection",
        # return 할 값
        return_value=Connection(conn_id="test", login="airflow", password="airflow"),
    )
    
    task = MovielensPopularityOperator(
        task_id="test_id",
        conn_id="testconn",
        start_date="2015-01-01",
        end_date="2015-01-03",
        top_n=5,
    )
    result = task.execute(context=None)
    assert len(result) == 5

    # get_connection() 호출 횟수가 1회인지 확인
    assert mock_get.call_count == 1
    # Airflow metastore에서 요청될 것으로 예상, conn_id 가 Operator 에서 넘긴 값과 일치하는지 확인
    mock_get.assert_called_with("testconn")

disk file 로 테스트

  • tempfile: python 에서 임시 저장소 작업하는 모듈
    • 사용 후 자동 삭제
    • tmp_dir, tmp_path
예제
  • json 파일 read 후 csv 로 write 하는 JsonToCsvOperator 를 정의했다고 가정
import csv
import json
from pathlib import Path

# tmp_path 는 고정
def test_json_to_csv_operator(tmp_path: Path):
    print(tmp_path.as_posix())

	# 테스트에 사용할 경로 지정
    input_path = tmp_path / "input.json"
    output_path = tmp_path / "output.csv"

    # 테스트 input json 데이터 정의, write
    input_data = [
        {"name": "bob", "age": "41", "sex": "M"},
        {"name": "alice", "age": "24", "sex": "F"},
        {"name": "carol", "age": "60", "sex": "F"},
    ]
    with open(input_path, "w") as f:
        f.write(json.dumps(input_data))

    # task 실행
    operator = JsonToCsvOperator(
        task_id="test", input_path=input_path, output_path=output_path
    )
    operator.execute(context={})

    # task 실행 후 생성된 output 
    with open(output_path, "r") as f:
        reader = csv.DictReader(f)
        result = [dict(row) for row in reader]

    # output data 로 테스트 시행
    assert result == input_data

테스트에서 DAG, task context 작업

  • operator.execute(context={}) 와 같이 테스트시에는 ds 변수 사용 불가

    • -> DAG 에 operator 를 할당하여 operator.run() 을 직접 호출
  • task 실행하기 위해, Airflow database 가 필요

    • 앞선 task instance 정보 등을 database 에서 쿼리하기 때문
    • airflow db init 으로 db 구성 필요
      • 기본 <AIRFLOW_HOME>/airflow.db SQLite 파일이 db 역할 수행
DAG 재사용, DB mocking 예제
  • 의존성 설치
    • pytest_docker_tools: 테스트를 위한 docker container 를 위한 helper 제공
      • fetch : docker pull 하여 docker image 반환
      • container : container 실행

pip3 install pytest_docker_tools pytest_mock --trusted-host pypi.python.org --trusted-host files.pythonhosted.org --trusted-host pypi.org

  • conftest.py 파일 정의
    • conftest.py : pytest 에서 fixture 기능으로 모든 테스트에서 공유, 적용
# 모든 테스트에서 DAG 재사용 가능
@pytest.fixture
def test_dag():
    return DAG(
        "test_dag",
        default_args={"owner": "airflow", "start_date": datetime.datetime(2015, 1, 1)},
        schedule_interval="@daily",
    )
  • DB 접속 mocking
    • PostgreSQL credential
    • container 함수도 fixture 이므로, test parameter로 전달
@pytest.fixture(scope="module")
def postgres_credentials():
    PostgresCredentials = namedtuple("PostgresCredentials", ["username", "password"])
    return PostgresCredentials("testuser", "testpass")

# docker 이미지 가져오기
postgres_image = fetch(repository="postgres:11.1-alpine")

# docker container 실행
postgres = container(
    # docker image id
    image="{postgres_image.id}",
    environment={
        "POSTGRES_USER": "{postgres_credentials.username}",
        "POSTGRES_PASSWORD": "{postgres_credentials.password}",
    },
    # ports key 는 container, value 는 host port (None 일 경우 임의의 포트)
    ports={"5432/tcp": None},
    # 실제 DB 스키마와 데이터로 초기화하고자 할 경우, postgres-init.sql 파일을 volume 마운트
    volumes={
        os.path.join(os.path.dirname(__file__), "postgres-init.sql"): {
            "bind": "/docker-entrypoint-initdb.d/postgres-init.sql"
        }
    },
)
import datetime
import os
from collections import namedtuple
from pathlib import Path

import pytest
from airflow.models import DAG, Connection
from pytest_docker_tools import fetch, container
from pytest_mock import MockFixture

# test_dag, postgres container, credentials fixture 를 인자로 받아서 테스트
def test_movielens_to_postgres_operator(
    mocker: MockFixture, test_dag: DAG, postgres, postgres_credentials
):
    mocker.patch.object(
        MovielensHook,
        "get_connection",
        return_value=Connection(conn_id="test", login="airflow", password="airflow"),
    )
    
    # postgres container, postgres_credentials fixture 정보를 이용해 postgres 목업 container 로 연결
    mocker.patch.object(
        PostgresHook,
        "get_connection",
        return_value=Connection(
            conn_id="postgres",
            conn_type="postgres",
            host="localhost",
            login=postgres_credentials.username,
            password=postgres_credentials.password,
            # container 설정에 지정된 host port
            port=postgres.ports["5432/tcp"][0],
        ),
    )

    task = MovielensToPostgresOperator(
        task_id="test",
        movielens_conn_id="movielens_id",
        start_date="{{ prev_ds }}",
        end_date="{{ ds }}",
        postgres_conn_id="postgres_id",
        insert_query=(
            "INSERT INTO movielens (movieId,rating,ratingTimestamp,userId,scrapeTime) "
            "VALUES ({0}, '{{ macros.datetime.now() }}')"
        ),
        dag=test_dag,
    )

    pg_hook = PostgresHook()

    row_count = pg_hook.get_first("SELECT COUNT(*) FROM movielens")[0]
    assert row_count == 0

    pytest.helpers.run_airflow_task(task, test_dag)

    row_count = pg_hook.get_first("SELECT COUNT(*) FROM movielens")[0]
    assert row_count > 0


reference

  • 서적 Apache Airflow 기반의 데이터 파이프라인

0개의 댓글