sensor 는 에어플로우 task중 하나다. 주로 외부 이벤트를 기다렸다가 특정조건을 만족하면 task를 진행한다
내가 쓸 task 에서는 sql조회 결과에 따라 다음 task 를 실행하기 위해 sql sensor 를 사용했다
내가 만들 task는 table_A에 데이터를 조회해서 데이터가 있으면 그 다음 task를 실행하는거였다
먼저 sql sensor 를 import 해준다
from airflow.sensors.sql_sensor import SqlSensor
두개의 task 를 만들었는데 task1은 sql sensor 를 사용하여서 디비에 값이 있는지 조회를 하고 만약에 있다면 task2로 넘어간다
t1 = SqlSensor(
task_id='check_data',
conn_id='mysql_db',
sql="SELECT count(*) FROM table_A;"
mode="poke",
timeout=300,
dag=dag)
디비 정보는 airflow에 등록해서 conn_id에 넣으면 된다
그 다음에는 t1이 성공적으로 실행되면 실행할 task2를 만들어주고 dependency를 설정해줬다
# dependencies
t1 >> t2
이렇게 설정해줬을때 아무런 데이터도 확인되지 않으면 t1에서 에러가 나고 t2는 실행되지 않는다
SqlSensor는 쿼리의 결과가 참(True)인지 거짓(False)인지만을 판단하는데
내가 실제 사용한 t1에서 의도치 않게 쿼리를 잘못넣어 count(*) 가 0 이 되어 (데이터 있는데,,,,,) False로 판단이 되었고
poke mode로 300초 유지하다 False인 채로 timeout 이 되어
AirflowSensorTimeout 에러가 발생해서 t2는 자동으로 실행되지 않았다,,,,,
그래서 upstream failed라고 된걸 볼수있었는데 무튼 의도한건 아니다(당연히 success일줄...)
다시 제대로 작성을 해서 select count(*)가 0이 아니게 나와서 잘 실행 완료 !!!