Airflow Task 실패 시 Slack 알림

ohyujeong·2023년 6월 21일
0

Airflow

목록 보기
6/6

위와 같이 Airflow DAG 실행 중 Task가 실패했을 때 지정한 Slack 워크스페이스의 채널에 에러 정보를 올려주는 기능을 알아보자.

1. Slack app 생성하기

https://api.slack.com/ 로 접속하여 알림을 받을 앱을 생성한다. 이 앱은 특정 채널에 메시지를 보낼 수 있는 권한을 가진다.

2. Incoming Webhooks 활성화

표시한 링크로 들어가 Incoming Webhooks를 활성화해준다.

아래쪽으로 스크롤을 내려서 워크스페이스에 웹 훅을 추가한다. 알림을 하고자 할 채널을 설정해줘야 한다.
추가를 하고 나면 Webhook URL 을 복사하여 아래 curl request 명령어를 실행하여 테스트한다.

curl -X POST -H 'Content-type: application/json' --data '{"text":"Hello, World!"}' [Webhook URL]

이 URL은 고유하며, 이 URL로 HTTP POST 요청을 보내면 메시지가 해당 채널에 게시된다.

3. Airflow Variable 저장

Webhook URLhttps://hooks.slack.com/services/ 뒤에 있는 스트링을 복사하여 Airflow variable에 저장한다.
중간중간 / 가 들어가있는 형태의 스트링이다 (T25E230LAU/B05DCC7SJ75/J69PCCuXVRbfFLiYL1znh)

4. on_failure_callback 함수 작성

DAG를 설정할 때 default_argson_failure_callback 함수에 할당할 slack on_failure_callback 함수를 작성해야 한다. on_failure_callback 함수는 실패한 경우에 호출되며, 이 함수는 작업의 실행 context 를 인자로 자동으로 받게 된다.

context

context는 딕셔너리 형태로, 현재 실행중인 작업 인스턴스에 대한 많은 정보를 포함하고 있다.
예를 들어, task_instance, execution_date (-> logical_date로 표기가 권장됨), dag_run 등의 정보가 포함된다.

메세지에 보여질 text 가져오기

인자로 받은 context 딕셔너리에서 Task, 에러와 관련된 값들을 가져와 조합한다.

def on_failure_callback(context):

	# Task 인스턴스를 가져와 어떤 Task에서 에러가 났는지 확인할 수 있도록 한다.
	text = str(context['task_instance'])   
    
    # exception 정보가 있으면 가져온다.
    text += f"``` {str(context.get('exception'))} ```"
    
    # Slack에 메세지 보내는 함수 (아래에서 설명)
    send_message_to_a_slack_channel(message, ':cry:')

로그파일 내용 가져오기

이렇게 하면 간단하게 에러가 발생한 Task와 에러 유형을 알 수 있는데, 만약 좀 더 자세하게 알고 싶다면 로그파일에 접근하여 내용을 가져올 수 있다.

로그 파일의 경로는 위와 같이 context 인자에서 얻을 수 있는 dag_id, run_id, task_id, attempt 로 구성되어 있다. 각각 key로 값을 가져와 파일 경로 형식에 맞춰 로그 경로를 완성한다. 로그 전체를 가져오는 것은 화면을 너무 많이 차지하고 비효율적이므로 끝에서 10~20 줄 정도를 가져올 수 있도록 한다.

# 로그의 마지막 n줄을 가져온다
def tail(file_path, n=20):
    try:
        with open(file_path, 'r') as f:
            return deque(f, n)
    except FileNotFoundError:
        return ["Log file not found."]
 
# 로그 경로 string 만들기 
log_path = os.path.join(
        settings.AIRFLOW_HOME, 
        "logs", 
        f'dag_id={dag_id}', 
        f'run_id={run_id}', 
        f'task_id={task_id}',
        f'attempt={str(int(try_number) - 1)}.log'
    )

log_text = '\n'.join(tail(log_path, 10))

이렇게 생성된 로그 내용을 위의 text 에 append한다.

Slack에 메세지 보내기

Web hook url을 테스트할 때 사용했던 curl 명령어를 사용하여 메세지를 보낼 수 있다. request 라이브러리를 사용하여 url, header, data를 설정하고 Slack 채널로 에러내용이 포함된 메세지를 보낸다.
Airflow Variable에 저장했던 slack_url 을 여기에서 사용한다.

def send_message_to_a_slack_channel(message, emoji):
    url = f"https://hooks.slack.com/services/{Variable.get('slack_url')}"
    headers = {
        'content-type': 'application/json',
    }
    data = { "username": "oyj", "text": message, "icon_emoji": emoji }
    requests.post(url, json=data, headers=headers)

DAG 설정

작성한 on_failure_callback 함수를 DAG의 default_args 에 설정해준다.
앞서 작성한 코드는 dags/plugins 폴더에 slack.py 로 생성해주었기 때문에 import하여 사용한다.

├── dags
│   └── plugins
│   │   └── slack.py

위와 같은 디렉토리 구조를 갖는다.

from plugins import slack

with DAG(
    dag_id='build_summary_v2',
    ...
    default_args= {
        'on_failure_callback': slack.on_failure_callback, 
    }
) as dag:

어떻게 동작하는걸까?

일단 Slack에 메세지를 보내는 것까지는 성공했지만 이게 어떤 원리로 작동하는 건지 궁금했다.
Webhook 이라는 개념도 처음 들어보고 이게 어떤 역할을 하는 건지 자세하게 알아보려고 한다.

Webhook

Webhook 에서 hook 은 특정 이벤트가 발생했을 때 호출되는 함수나 루틴을 지칭하는 용어로, 일반적으로 시스템 또는 어플리케이션의 기본 동작에 추가적인 동작을 "걸어두는"(hook) 개념이다.

예를 들어, GitHub에서는 사용자가 특정 이벤트(커밋, 브랜치 생성 등)를 수행할 때마다 Webhook 을 설정하여 특정 URL로 HTTP POST 요청을 보내는 기능을 제공한다. 이런 식으로, Webhook하나의 시스템에서 다른 시스템으로 실시간 정보를 전송하는 트리거 역할을 한다.

동작 과정

1. Webhook 설정

Webhook을 사용하려는 서비스(Slack)에서 Webhook의 대상이 되는 URL(콜백 URL)을 설정한다. 이 URL은 이벤트(Task 실패)가 발생할 때 정보(에러 내용)가 전달될 대상 서버의 주소이다.

2. 이벤트 발생

설정한 이벤트(Task 실패)가 발생하면, 이벤트를 감지하는 서비스(Airflow)는 콜백 URL로 HTTP 요청을 보낸다. 이 요청은 주로 POST 방식을 사용하며, 이벤트에 관련된 데이터를 요청 본문에 담아 전송한다.

# 예)
curl -X POST -H 'Content-type: application/json' --data '{"text":"Hello, World!"}' https://hooks.slack.com/services/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

3. 요청 처리

콜백 URL의 서버는 HTTP 요청을 받아 이를 처리한다.(Slack 채널에 메세지 보내기) 이때, 요청 본문에 담긴 데이터를 사용하여 필요한 작업을 수행한다.

API vs. Webhook

출처: https://www.zoho.com/blog/assist/webhooks-what-are-they-and-how-are-they-used.html

API와 Webhook은 애플리케이션에서 다른 앱으로 정보를 전달하는 데 사용되고, 모두 HTTP를 통해 데이터를 전송하지만 데이터 수신 프로세스에 있어 차이점이 있다.

API (Application Programming Interface)

  • polling 프로세스를 사용하여 원하는 데이터가 준비되었는지 확인한다.
  • 요청과 응답의 모델을 따른다. 클라이언트(시스템 또는 사용자)가 특정 API 엔드포인트에 데이터를 요청하고, 서버는 해당 요청에 대한 응답을 반환한다.
  • 주로 동기식으로 작동하며, 요청이 있을 때만 데이터를 제공한다. 클라이언트가 API에 질문을 하면 서버가 답변을 제공하는 방식이다.
  • API를 사용하면 개발자들이 특정 어플리케이션의 기능을 사용하여 다른 어플리케이션을 만들 수 있다. 예를 들어, Facebook, Twitter, Google 등의 서비스들은 자신들의 API를 제공하여 개발자들이 이를 이용해 새로운 앱을 만들 수 있게 한다.

Webhook

  • 이벤트 기반의 모델을 따른다. 특정 이벤트가 발생하면, 서버가 클라이언트에게 데이터를 push한다. (pull 방식의 API와는 반대)
  • 비동기적으로 작동하며, 특정 이벤트가 발생할 때만 데이터를 전달한다.
  • 실시간 알림과 같은 기능을 구현하는 데 유용하다.

이러한 특성으로 인해 실시간 앱 업데이트가 필요한 경우 Webhook을 사용하고 서버 측 애플리케이션이 자주 변경되는 경우 API를 사용한다.

요약하자면, API는 주로 데이터를 요청하고 가져오는 데 사용되며, Webhook은 특정 이벤트에 대한 실시간 업데이트를 제공하는 데 사용된다.

profile
거친 돌이 다듬어져 조각이 되듯

0개의 댓글