Airflow TaskVirtualEnv 정리

Minsang Yu·2023년 9월 2일
0

Airflow의 TaskVirtualEnv는 Apache Airflow에서 사용되는 특정 Python 가상 환경을 관리하는 도구입니다.

Airflow는 데이터 파이프라인 및 워크플로우를 정의하고 실행하는 데 사용되며, 이러한 작업은 일반적으로 Python 스크립트로 작성됩니다.

TaskVirtualEnv는 각 작업에 대한 독립적인 Python 가상 환경을 관리하여 작업 간에 필요한 종속성과 환경을 격리시킵니다.

TaskVirtualEnv 를 사용하면 다음과 같은 이점을 얻습니다

  • 의존성 관리: TaskVirtualEnv를 사용하면 작업마다 고유한 Python 가상 환경을 생성할 수 있으므로 각 작업에 필요한 라이브러리 및 패키지를 독립적으로 관리할 수 있습니다. 이로 인해 작업 간의 충돌이나 버전 관리 문제를 피할 수 있습니다.

  • 환경 격리: 작업이 독립된 가상 환경에서 실행되므로 다른 작업이나 시스템 전체에 영향을 미치지 않고 안전하게 작업을 수행할 수 있습니다.

  • 코드 일관성: TaskVirtualEnv를 사용하면 작업이 실행되는 환경이 항상 동일하게 유지됩니다. 이는 코드가 서로 다른 환경에서 실행될 때 발생하는 문제를 방지합니다.

virtualEnv의 특징

  • virtualEnv는 task실행시 자동적으로 생성되고 종료되면 소멸합니다.

  • def 에 의해서 정의되며 class 의 부분은 될 수 없습니다.

  • 모든 imports 들은 function 내에서만 동작하며 함수외의 변수들에 대해서는 접근할 수 없다.

  • global scope variabe인 virtualenv_string_args 로 정의된 변수에 대해서는 함수 외의 변수에 대해서도 접근이 가능하다.

  • op_args나 op_kwargs를 사용해서 인자를 넘기고 반환값을 받을 수 있다.

  • 만약 virtual 환경의 파이썬 과 local 환경의 파이썬의 버전이 다르다면 return value인 op_args, op_kwargs 를 사용할 수없다.

매개변수

python_callable (Callable) – A python function with no references to outside variables, defined with def, which will be run in a virtualenv

requirements (None | Iterable[str] | str) – Either a list of requirement strings, or a (templated) “requirements file” as specified by pip.

python_version (str | int | float | None) – The Python version to run the virtualenv with. Note that both 2 and 2.7 are acceptable forms.

use_dill (bool) – Whether to use dill to serialize the args and result (pickle is default). This allow more complex types but requires you to include dill in your requirements.

system_site_packages (bool) – Whether to include system_site_packages in your virtualenv. See virtualenv documentation for more information.

pip_install_options (list[str] | None) – a list of pip install options when installing requirements See ‘pip install -h’ for available options

op_args (Collection[Any] | None) – A list of positional arguments to pass to python_callable.

op_kwargs (Mapping[str, Any] | None) – A dict of keyword arguments to pass to python_callable.

string_args (Iterable[str] | None) – Strings that are present in the global var virtualenv_string_args, available to python_callable at runtime as a list[str]. Note that args are split by newline.

templates_dict (dict | None) – a dictionary where the values are templates that will get templated by the Airflow engine sometime between init and execute takes place and are made available in your callable’s context after the template has been applied

templates_exts (list[str] | None) – a list of file extensions to resolve while processing templated fields, for examples ['.sql', '.hql']

expect_airflow (bool) – expect Airflow to be installed in the target environment. If true, the operator will raise warning if Airflow is not installed, and it will attempt to load Airflow macros when starting.

skip_on_exit_code (int | collections.abc.Container[int] | None) – If python_callable exits with this exit code, leave the task in skipped state (default: None). If set to None, any non-zero exit code will be treated as a failure.

– 한글 번역

python_callable ( Callable ) – def로 정의된 외부 변수에 대한 참조가 없는 Python 함수로 virtualenv 에서 실행됩니다.

요구사항 ( None | Iterable [ str ] | str ) – 요구사항 문자열 목록 또는 pip에서 지정한 (템플릿화된) "요구사항 파일"입니다.

python_version ( str | int | float | None ) – virtualenv를 실행할 Python 버전입니다 . 2와 2.7 모두 허용되는 형식입니다.

use_dill ( bool ) – 인수 및 결과를 직렬화하기 위해 dill을 사용할지 여부입니다(pickle이 기본값입니다). 이를 통해 더 복잡한 유형이 허용되지만 요구 사항에 딜을 포함해야 합니다.

system_site_packages ( bool ) – virtualenv 에 system_site_packages를 포함할지 여부입니다 . 자세한 내용은 virtualenv 설명서를 참조하세요.

pip_install_options ( list [ str ] | None ) – 요구 사항 설치 시 pip 설치 옵션 목록 사용 가능한 옵션은 'pip install -h'를 참조하세요.

op_args ( Collection [ Any ] | None ) – python_callable에 전달할 위치 인수 목록입니다.

op_kwargs ( Mapping [ str , Any ] | None ) – python_callable에 전달할 키워드 인수의 사전입니다.

string_args ( Iterable [ str ] | None ) – 전역 var virtualenv _string_args에 존재하는 문자열로, 런타임에 python_callable에서 목록[str]으로 사용할 수 있습니다. 인수는 개행 문자로 분할됩니다.

template_dict ( dict | Noneinit ) – 값이 Airflow 엔진에 의해 템플릿화 되고 execute템플릿이 적용된 후 콜러블 컨텍스트에서 사용할 수 있는 템플릿인 사전입니다.

template_exts ( list [ str ] | None ) – 예를 들어 템플릿 필드를 처리하는 동안 확인할 파일 확장자 목록['.sql', '.hql']

Expect_airflow ( bool ) – Airflow가 대상 환경에 설치될 것으로 예상합니다. true인 경우 Airflow가 설치되지 않은 경우 운영자는 경고를 발생시키고 시작할 때 Airflow 매크로 로드를 시도합니다.

Skip_on_exit_code ( int | collections.abc.Container [ int ] | None ) – python_callable이 이 종료 코드로 종료되면 작업을 skipped상태(기본값: None)로 둡니다. 로 설정하면 None0이 아닌 종료 코드가 모두 실패로 처리됩니다.

예제

Taskflow API를 이용해 virtualenv를 사용하는 예제 코드

import bentoml

@task.virtualenv(
    task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False
)
def callable_virtualenv():
    """
    Example function that will be performed in a virtual environment.
    Importing at the module level ensures that it will not attempt to import the
    library before it is installed.
    """
    
    from time import sleep

    from colorama import Back, Fore, Style
  
    """
    Error Occor 
     VirtualEnv와 localEnv는 서로 격리된 의존성을 가지고 있다. 만약 local 에서는 가지고 있는 
     모듈을 import하면 에러가 발생한다.
    """ 
    import benotoml
    
    # 또한 경로를 다시 잡아줘야한다. 
    # 가상 환경은 local환경과 경로 시작이 다르므로 다시 모듈들의 경로를 잡는 코드가 필효하다.
    sys.path.append('/path/to/your/env')
    from proj.script.dag.module import func 
    
    
    print(Fore.RED + "some red text")
    print(Back.GREEN + "and with a green background")
    print(Style.DIM + "and in dim text")
    print(Style.RESET_ALL)
    for _ in range(4):
        print(Style.DIM + "Please wait...", flush=True)
        sleep(1)
    print("Finished")

virtualenv_task = callable_virtualenv()

VirtualEnv 사용시 경로문제가 발생할 수 있다. loccalEnv와 VirtualEnv의 경로가 다르므로 task.virtualEnv 함수 안에서 다시 경로를 잡아줘야 한다.

profile
Jr. DataEngineer

0개의 댓글