airflow에서 aws lambda 실행하기

마가·2022년 11월 25일
0

trial-error

목록 보기
14/19

https://www.teamdatascience.com/post/how-to-quickly-set-up-airflow-to-invoke-lambda
를 참조해서 하려고 했는데... 안된다!
2020년 글이니 당연하겠지...만 인터넷에 있는 example이 존재하는 여러 수단중 유일하게 dag parsing이 되어 이걸 기반으로 수행하였다.

[2022-11-25, 17:15:20 KST] {standard_task_runner.py:97} ERROR - Failed to execute job 11182 for task lambda1 (__init__() got multiple values for argument 'aws_conn_id'; 17)

아마 내용이 다를텐데 이런 에러가 뜬다.

그래서 막 원인을 찾다가 airflow 코드를 보니...
AwsLambdaHook의 조상인 AwsGenericHook에서 첫번째 인자는 aws_conn_id 이다.
그러니 저 글대로 선언하면 안된다.

def lambda_fail(ds,**kwargs):       
        hook = AwsLambdaHook('myAirflowTest', 
                              region_name='ap-northeast-2',        
                              log_type='None',
                              qualifier='$LATEST',
                              invocation_type='RequestResponse',
                              config=None,
                              aws_conn_id='my_lambda')
        response_1 = hook.invoke_lambda(payload='null')
        print ('Response--->' , response_1)

def lambda_success(ds, **kwargs):
    hook = AwsLambdaHook(
        aws_conn_id='my_lambda',
        region_name='ap-northeast-2',
    )
    response_1 = hook.invoke_lambda(
        function_name='myAirflowTest',
        payload='null'
    )
    print('Response--->', response_1)

function_name 은 AwsLambdaHook 의 생성자에서 쓰이지 않고 hook.invoke_lambda 함수에서 쓰인다.

이제 남은 관문은 aws_conn_id에 맞는 airflow connection을 만드는 것이다.

web ui 버전마다 다른데, connections에 들어가서
connection id는 dag에서 사용하는 이름으로,
connection Type은 amazone web services 로
extra에 {"aws_access_key_id": "", "aws_secret_access_key": ""}
를 입력하면 된다.

해당 키는 IAM에서 AWSLambda_FullAccess 를 가지고 있는 사용자의 키를 사용해야 한다.

이걸 하려고 오만 문서를 보고 했는데 안된다.
AwsLambdaInvokeFunctionOperator 는 dag parse에 성공한 적이 없다.

invoke_lambda_function = AwsLambdaInvokeFunctionOperator(
    task_id='setup__invoke_lambda_function',
    function_name=LAMBDA_FUNCTION_NAME,
    payload=SAMPLE_EVENT,
)

공식문서에 있는 위 내용으로 dag를 만들었는데 안 되었을때의 기분이란.
2022년 7월 19일에 시작해서 10월 24일에 추가되었던데, 예제를 찾을 수 없다.

사용한 버전이 뭔지 모르겠는데 helm chart에서는

NAME   	NAMESPACE	REVISION	UPDATED                                	STATUS  	CHART        	APP VERSION
airflow	airflow  	4       	2022-11-23 01:16:32.069266856 +0000 UTC	deployed	airflow-1.6.0	2.3.0

이렇게 뜬다.
이거 1.6이야 2.3.0이야..?
근데 최신이 6.0.0 ..?

profile
마음 가는 길은 죽 곧은 길

0개의 댓글