[airflow] airflow 설치서버에서 다른서버 shell 스크립트 실행하기

HI·2024년 2월 13일
0

airflow

목록 보기
1/2

1) 서버 준비, airflow 준비과정

https://velog.io/@pshyeok2/%EC%97%90%EC%96%B4%ED%94%8C%EB%A1%9C%EC%9A%B0%EA%B0%80-ssh-%EC%A0%91%EC%86%8D%EC%9D%B4-%EC%95%88%EB%90%9C%EB%8B%A4-%ED%82%A4-%ED%8C%8C%EC%9D%BC%EC%9D%98-%EC%9C%84%EC%B9%98%EB%A5%BC-%ED%99%95%EC%9D%B8%ED%95%98%EC%9E%90 참고

2) dag 예시

ssh_call.py

import pendulum
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.bash import BashOperator
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.models import Variable

ssh_hook = SSHHook(ssh_conn_id="serv1")

KST = pendulum.timezone("Asia/Seoul")

dag = DAG(
    dag_id="ssh_call", #dag이름(py파일명과 동일하지 않아도 됨)
    schedule="58 11 * * 2",
    start_date=datetime(2024, 2, 13, tzinfo=KST),
    #schedule_interval=None
)

t0 = BashOperator(
    task_id="echo_bash_task", #dag의 task이름, graph에 보이는 이름
    bash_command="echo OK~!",
    dag=dag,
)

t1= SSHOperator(
    task_id = "task1",
    command="/home/test/test.sh ",
    ssh_hook = ssh_hook,
     dag=dag,
)

#tasks = [t1]

t0 >> t1

SSHHook을 만드는 두가지 방법

1) airflowUI>Admin>Connections에 다른서버 정보를 미리 등록 후 ssh_conn_id를 명시하여 사용하는 방법

ssh_hook = SSHHook(ssh_conn_id="sev1")

2) Connections에 등록하지 않고 dag에 정보를 모두 입력해서 사용하는 방법

ssh_hook = SSHHook(ssh_conn_id=None, username="home",remote_host="11.1.1.1",key_file="/opt/airflow/dags/id_rsa")

그래도 id_rsa까지는 두가지 모두 해주어야 함!

  • 주의

실행중 SSH command timed out 이라는 에러 발생.
이 cmd_timeout 기본값은 10초라서 10초만 실행되고 로그를 보면 연결이 끊겨있다.
그러나 task를 실행한 서버에 들어가 보면 실행시킨 task는 계속 작업중이다. 단지 airflow와의 연결이 끊긴것! 이러면 airflow에는 fail로 나타나고 연결이 끊긴 상태로 다음 task는 실행이 제대로 되지 않을 것이다!
하고있던 실제 서버의 작업은 계속 실행중이겠지만... cmd_timeout를 수정하여 task가 끊기지 않도록 수정하자.

수정하기 위해서 처음엔

ssh_hook = SSHHook(ssh_conn_id="host1", cmd_timeout=None) 

이렇게 ssh_hook 부분을 수정했는데 계속해서 cmd_timeout 이라는 것을 못찾는 airflow...

결국 아래와 같이

t1= SSHOperator(
    task_id = "task1",
    command="/home/test.sh extract_task ",
    ssh_hook = ssh_hook,
    cmd_timeout=600,
     dag=dag,
)

테스크 안에 넣어줘야 잘 실행이되었다.
https://airflow.apache.org/docs/apache-airflow-providers-ssh/stable/_modules/airflow/providers/ssh/hooks/ssh.html#CMD_TIMEOUT

profile
https://github.com/gaeunban

0개의 댓글