https://www.teamdatascience.com/post/how-to-quickly-set-up-airflow-to-invoke-lambda
를 참조해서 하려고 했는데... 안된다!
2020년 글이니 당연하겠지...만 인터넷에 있는 example이 존재하는 여러 수단중 유일하게 dag parsing이 되어 이걸 기반으로 수행하였다.
[2022-11-25, 17:15:20 KST] {standard_task_runner.py:97} ERROR - Failed to execute job 11182 for task lambda1 (__init__() got multiple values for argument 'aws_conn_id'; 17)
아마 내용이 다를텐데 이런 에러가 뜬다.
그래서 막 원인을 찾다가 airflow 코드를 보니...
AwsLambdaHook의 조상인 AwsGenericHook에서 첫번째 인자는 aws_conn_id 이다.
그러니 저 글대로 선언하면 안된다.
def lambda_fail(ds,**kwargs):
hook = AwsLambdaHook('myAirflowTest',
region_name='ap-northeast-2',
log_type='None',
qualifier='$LATEST',
invocation_type='RequestResponse',
config=None,
aws_conn_id='my_lambda')
response_1 = hook.invoke_lambda(payload='null')
print ('Response--->' , response_1)
def lambda_success(ds, **kwargs):
hook = AwsLambdaHook(
aws_conn_id='my_lambda',
region_name='ap-northeast-2',
)
response_1 = hook.invoke_lambda(
function_name='myAirflowTest',
payload='null'
)
print('Response--->', response_1)
function_name 은 AwsLambdaHook 의 생성자에서 쓰이지 않고 hook.invoke_lambda 함수에서 쓰인다.
이제 남은 관문은 aws_conn_id에 맞는 airflow connection을 만드는 것이다.
web ui 버전마다 다른데, connections에 들어가서
connection id는 dag에서 사용하는 이름으로,
connection Type은 amazone web services 로
extra에 {"aws_access_key_id": "", "aws_secret_access_key": ""}
를 입력하면 된다.
해당 키는 IAM에서 AWSLambda_FullAccess 를 가지고 있는 사용자의 키를 사용해야 한다.
이걸 하려고 오만 문서를 보고 했는데 안된다.
AwsLambdaInvokeFunctionOperator 는 dag parse에 성공한 적이 없다.
invoke_lambda_function = AwsLambdaInvokeFunctionOperator(
task_id='setup__invoke_lambda_function',
function_name=LAMBDA_FUNCTION_NAME,
payload=SAMPLE_EVENT,
)
공식문서에 있는 위 내용으로 dag를 만들었는데 안 되었을때의 기분이란.
2022년 7월 19일에 시작해서 10월 24일에 추가되었던데, 예제를 찾을 수 없다.
사용한 버전이 뭔지 모르겠는데 helm chart에서는
NAME NAMESPACE REVISION UPDATED STATUS CHART APP VERSION
airflow airflow 4 2022-11-23 01:16:32.069266856 +0000 UTC deployed airflow-1.6.0 2.3.0
이렇게 뜬다.
이거 1.6이야 2.3.0이야..?
근데 최신이 6.0.0 ..?