[Airflow] PostgresHook에서 cursor 변경하기

owlur·2021년 8월 3일
1

airflow

목록 보기
2/2

connections에서 cursor 지정하기

Airflow의 connections에서 conn_type을 Postgres로 추가하게되면 PostgresHook을 쓸 수 있게된다

connection을 추가할 때 extra에 {"cursor": "realdictcursor"}처럼 json타입으로 cursor를 지정해줄 수 있고 아래의 코드에서 보이듯이 3가지 타입을 지원한다

        if _cursor == 'dictcursor':
            return psycopg2.extras.DictCursor
        if _cursor == 'realdictcursor':
            return psycopg2.extras.RealDictCursor
        if _cursor == 'namedtuplecursor':
            return psycopg2.extras.NamedTupleCursor

다만 이렇게 추가하면 문제가 생기는데 SqlSensor에서 이 connection을 이용할 수 없게 된다

SqlSensor의 poke() 코드를 보면

def poke(self, context):
        hook = self._get_hook()

        self.log.info('Poking: %s (with parameters %s)', self.sql, self.parameters)
        records = hook.get_records(self.sql, self.parameters)
        if not records:
            if self.fail_on_empty:
                raise AirflowException("No rows returned, raising as per fail_on_empty flag")
            else:
                return False
        first_cell = records[0][0]
        if self.failure is not None:
            if callable(self.failure):
                if self.failure(first_cell):
                    raise AirflowException(f"Failure criteria met. self.failure({first_cell}) returned True")
            else:
                raise AirflowException(f"self.failure is present, but not callable -> {self.failure}")
        if self.success is not None:
            if callable(self.success):
                return self.success(first_cell)
            else:
                raise AirflowException(f"self.success is present, but not callable -> {self.success}")
        return bool(first_cell)

여기서 첫번째 셀을 가져오기 위해

first_cell = records[0][0]

number index를 이용하게 되는데 이 경우 위의 지정된 cursor들은 모두 이용할 수 없다.

이를 위해 connections에 sensor용과 postgrehook용 connection을 따로 만들 수도 있으나 이는 너무 비효율적이므로 connections의 extra는 비워두고 코드에서 cursor를 변경하는 방법을 알아보자

코드에서 cursor 수정하기

검색으로는 찾을수가 없어 Connection코드까지 뜯어 봤는데 덕분에 구조를 꽤나 자세히 알게 된거 같다!

PostgresHook에서는 이 커서를 변경하는 방법을 제공하지 않으므로 PostgresHook이 extra정보를 가져오는 Connection객체에서부터 수정해주면 된다

airflow_conn = PostgresHook.get_connection(conn_id)
airflow_conn.extra = '{"cursor": "realdictcursor"}'
postgre_hook = PostgresHook(connection=airflow_conn)

connection을 얻어온 후 extra를 수정해주고 이 connection을 PostgreHook을 만들 때 넣어주면 우리가 원하는 cursor로 수정할 수 있다

profile
개발자

0개의 댓글