목표 : 프로젝트 중 무기 데이터의 집계를 하기 위한 데이터를 주기적으로 받아 GCP 스토리지에 저장해야 하는 작업이 필요했다. Raw Data를 받아와 우리가 사용할 데이터로 전처리하고 파싱하는 dag를 짰고, 이것이 잘 수행 중인지에 대해 slack bot을 통해 알림을 받고자 한다.
![]() | ![]() |
---|
Slack API 소켓모드란?
실시간 메시징 및 이벤트 기능을 구현하는 데 사용되는 기술로, 기본적으로 Slack API의 웹 소켓 연결을 통해 실기산으로 메시지 및 이벤트를 주고 받을 수 있게 해주며 이를 통해 애플리케이션은 더 빠르게 사용자와 상호작용하고, 이벤트를 감지하며, 실시간 업데이트를 제공
소켓 모드를 사용하면 회사 방화벽 뒤에서 허용되지 않을 수 있는 공용 HTTP 끝점을 노출하지 않고 봇이 작업 공간에서 상호 작용이 가능
![]() | ![]() |
---|
워크스페이스에 앱 설치
처음 만들고 나면 Install to Workspace에 눌러 자신이 원하는 워크스페이스에 등록하면 된다.
권한 수정과 같이 수정하였을 때는 Reinstall을 이용하여 수정 해주면 된다.
채널에 봇 추가하기
@봇이름
을 하여 채널에 참여 시킨다.
airflow를 켠 후 admin - Variavles 설정
![]() | ![]() |
---|
key값에 본인이 쓸 token 변수명을 적고, val에 토큰값을 입력 (xoxb로 시작하는 토큰값)
여러 개의 dag에 쉽게 적용 시키기 위해서 slack_nofitications.py 작성
from slack_sdk import WebClient
from datetime import datetime
class SlackAlert:
# 클래스 인스턴스 초기화
# 채널정보와 slack 인증 토큰을 인자로 받음
def __init__(self, channel, token):
self.channel = channel
self.client = WebClient(token=token)
def success_msg(self, msg):
# 성공메시지를 작성하고 일자와 task id, dag id, log url을 슬랙 메세지로 출력
text = f"""
date : {datetime.today().strftime('%Y-%m-%d')}
alert :
Success!
task id : {msg.get('task_instance').task_id},
dag id : {msg.get('task_instance').dag_id},
log url : {msg.get('task_instance').log_url}
"""
self.client.chat_postMessage(channel=self.channel, text=text)
def fail_msg(self, msg):
# 실패메시지를 작성하고 일자와 task id, dag id, log url을 슬랙 메세지로 출력
text = f"""
date : {datetime.today().strftime('%Y-%m-%d')}
alert :
Fail!
task id : {msg.get('task_instance').task_id},
dag id : {msg.get('task_instance').dag_id},
log url : {msg.get('task_instance').log_url}
"""
self.client.chat_postMessage(channel=self.channel, text=text)
앞서 slack_notifications.py를 이용하여 SlackAlert 함수 사용
토큰 값에 노출을 막기 위해 Variable 지정
Variable.get을 통해 지정해두었던 slack_api_token을 불러옴
SlackAlert에 첫째 인자는 메시지를 보낼 채널명을 입력하고, 두번째 인자에는 slack token을 입력
dag에 on_success_callback과 on_failure_callback에
각각 slack_notifications.py에서 만든 함수인 sucess_msg, fail_msg를 넣어준다.
message 채널에 notice-bot이 성공 여부를 알리는 메시지를 보내준다.
Raw data를 분석하기 위해 데이터 파싱 및 전처리를 한 후 파케이 파일로 logs_weapon에 넣는 것이 목적
logs_weapon파일에 가보니 잘 들어온 것을 확인할 수 있었음
gcp 스토리지에도 파케이 파일로 잘 들어온 모습
파케이 파일을 꺼내서 본 모습
참고 blog
https://www.twilio.com/blog/how-to-build-a-slackbot-in-socket-mode-with-python
유익한 자료 감사합니다.