위와 같이 Airflow DAG 실행 중 Task가 실패했을 때 지정한 Slack 워크스페이스의 채널에 에러 정보를 올려주는 기능을 알아보자.
https://api.slack.com/ 로 접속하여 알림을 받을 앱을 생성한다. 이 앱은 특정 채널에 메시지를 보낼 수 있는 권한을 가진다.
표시한 링크로 들어가 Incoming Webhooks를 활성화해준다.
아래쪽으로 스크롤을 내려서 워크스페이스에 웹 훅을 추가한다. 알림을 하고자 할 채널을 설정해줘야 한다.
추가를 하고 나면 Webhook URL
을 복사하여 아래 curl request 명령어를 실행하여 테스트한다.
curl -X POST -H 'Content-type: application/json' --data '{"text":"Hello, World!"}' [Webhook URL]
이 URL은 고유하며, 이 URL로 HTTP POST 요청을 보내면 메시지가 해당 채널에 게시된다.
Webhook URL
의 https://hooks.slack.com/services/
뒤에 있는 스트링을 복사하여 Airflow variable에 저장한다.
중간중간 /
가 들어가있는 형태의 스트링이다 (T25E230LAU/B05DCC7SJ75/J69PCCuXVRbfFLiYL1znh)
DAG를 설정할 때 default_args
의 on_failure_callback
함수에 할당할 slack on_failure_callback 함수를 작성해야 한다. on_failure_callback
함수는 실패한 경우에 호출되며, 이 함수는 작업의 실행 context
를 인자로 자동으로 받게 된다.
context
context
는 딕셔너리 형태로, 현재 실행중인 작업 인스턴스에 대한 많은 정보를 포함하고 있다.
예를 들어, task_instance, execution_date (-> logical_date로 표기가 권장됨), dag_run 등의 정보가 포함된다.
인자로 받은 context
딕셔너리에서 Task, 에러와 관련된 값들을 가져와 조합한다.
def on_failure_callback(context):
# Task 인스턴스를 가져와 어떤 Task에서 에러가 났는지 확인할 수 있도록 한다.
text = str(context['task_instance'])
# exception 정보가 있으면 가져온다.
text += f"``` {str(context.get('exception'))} ```"
# Slack에 메세지 보내는 함수 (아래에서 설명)
send_message_to_a_slack_channel(message, ':cry:')
이렇게 하면 간단하게 에러가 발생한 Task와 에러 유형을 알 수 있는데, 만약 좀 더 자세하게 알고 싶다면 로그파일에 접근하여 내용을 가져올 수 있다.
로그 파일의 경로는 위와 같이 context
인자에서 얻을 수 있는 dag_id, run_id, task_id, attempt 로 구성되어 있다. 각각 key로 값을 가져와 파일 경로 형식에 맞춰 로그 경로를 완성한다. 로그 전체를 가져오는 것은 화면을 너무 많이 차지하고 비효율적이므로 끝에서 10~20 줄 정도를 가져올 수 있도록 한다.
# 로그의 마지막 n줄을 가져온다
def tail(file_path, n=20):
try:
with open(file_path, 'r') as f:
return deque(f, n)
except FileNotFoundError:
return ["Log file not found."]
# 로그 경로 string 만들기
log_path = os.path.join(
settings.AIRFLOW_HOME,
"logs",
f'dag_id={dag_id}',
f'run_id={run_id}',
f'task_id={task_id}',
f'attempt={str(int(try_number) - 1)}.log'
)
log_text = '\n'.join(tail(log_path, 10))
이렇게 생성된 로그 내용을 위의 text
에 append한다.
Web hook url을 테스트할 때 사용했던 curl 명령어를 사용하여 메세지를 보낼 수 있다. request
라이브러리를 사용하여 url, header, data를 설정하고 Slack 채널로 에러내용이 포함된 메세지를 보낸다.
Airflow Variable에 저장했던 slack_url
을 여기에서 사용한다.
def send_message_to_a_slack_channel(message, emoji):
url = f"https://hooks.slack.com/services/{Variable.get('slack_url')}"
headers = {
'content-type': 'application/json',
}
data = { "username": "oyj", "text": message, "icon_emoji": emoji }
requests.post(url, json=data, headers=headers)
작성한 on_failure_callback
함수를 DAG의 default_args
에 설정해준다.
앞서 작성한 코드는 dags/plugins
폴더에 slack.py
로 생성해주었기 때문에 import하여 사용한다.
├── dags
│ └── plugins
│ │ └── slack.py
위와 같은 디렉토리 구조를 갖는다.
from plugins import slack
with DAG(
dag_id='build_summary_v2',
...
default_args= {
'on_failure_callback': slack.on_failure_callback,
}
) as dag:
일단 Slack에 메세지를 보내는 것까지는 성공했지만 이게 어떤 원리로 작동하는 건지 궁금했다.
Webhook
이라는 개념도 처음 들어보고 이게 어떤 역할을 하는 건지 자세하게 알아보려고 한다.
Webhook
에서 hook
은 특정 이벤트가 발생했을 때 호출되는 함수나 루틴을 지칭하는 용어로, 일반적으로 시스템 또는 어플리케이션의 기본 동작에 추가적인 동작을 "걸어두는"(hook) 개념이다.
예를 들어, GitHub에서는 사용자가 특정 이벤트(커밋, 브랜치 생성 등)를 수행할 때마다 Webhook
을 설정하여 특정 URL로 HTTP POST 요청을 보내는 기능을 제공한다. 이런 식으로, Webhook
은 하나의 시스템에서 다른 시스템으로 실시간 정보를 전송하는 트리거 역할을 한다.
Webhook을 사용하려는 서비스(Slack)에서 Webhook의 대상이 되는 URL(콜백 URL)을 설정한다. 이 URL은 이벤트(Task 실패)가 발생할 때 정보(에러 내용)가 전달될 대상 서버의 주소이다.
설정한 이벤트(Task 실패)가 발생하면, 이벤트를 감지하는 서비스(Airflow)는 콜백 URL로 HTTP 요청을 보낸다. 이 요청은 주로 POST 방식을 사용하며, 이벤트에 관련된 데이터를 요청 본문에 담아 전송한다.
# 예)
curl -X POST -H 'Content-type: application/json' --data '{"text":"Hello, World!"}' https://hooks.slack.com/services/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
콜백 URL의 서버는 HTTP 요청을 받아 이를 처리한다.(Slack 채널에 메세지 보내기) 이때, 요청 본문에 담긴 데이터를 사용하여 필요한 작업을 수행한다.
출처: https://www.zoho.com/blog/assist/webhooks-what-are-they-and-how-are-they-used.html
API와 Webhook은 애플리케이션에서 다른 앱으로 정보를 전달하는 데 사용되고, 모두 HTTP를 통해 데이터를 전송하지만 데이터 수신 프로세스에 있어 차이점이 있다.
이러한 특성으로 인해 실시간 앱 업데이트가 필요한 경우 Webhook을 사용하고 서버 측 애플리케이션이 자주 변경되는 경우 API를 사용한다.
요약하자면, API는 주로 데이터를 요청하고 가져오는 데 사용되며, Webhook은 특정 이벤트에 대한 실시간 업데이트를 제공하는 데 사용된다.