[Airflow] 8. Airflow plugin 생성하기

Denver·2022년 9월 11일
0

Airflow

목록 보기
10/11
post-thumbnail

airflow 2.0에서는 plugin system에 operator 를 추가해야한다.
존재하는 operator를 확장, 수정해서

Airflow 장점이 operator, views, hooks 등 모든 것을 커스텀할 수 있다는 점이다.

생성 방법
AirflowPlugin 클래스를상속하는 View, Operator,hook... 클래스를 생성한다
plugin 이름 등 속성을 설정한다
생성 후 Lazy Loaded 이기때문에 airflow 인스턴스를 재시작해야한다.

  1. web애서 elastic connection 생성
  2. plugins/hooks/elastic/elastic_hook.py 생성
from airflow.plugins_manager import AirflowPlugin
# 모든 hook은 BaseHook 상속해서 메서드, property 등
from airflow.hooks.base import BaseHook

from elasticsearch import Elasticsearch

class ElasticHook(BaseHook):

    def __init__(self, conn_id='elastic_default', *args, **kwargs):
        super().__init__(*args, **kwargs)
        conn = self.get_connection(conn_id)

        conn_config = {}
        hosts = []

        if conn.host:
            hosts = conn.host.split(',')
        if conn.port:
            conn_config['port'] = int(conn.port)
        if conn.login:
            conn_config['http_auth'] = (conn.login, conn.password)

        self.es = Elasticsearch(hosts, **conn_config)
        self.index = conn.schema

    def info(self):
        return self.es.info()

    def set_index(self, index):
        self.index = index

    def add_doc(self, index, doc_type, doc):
        self.set_index(index)
        res = self.es.index(index=index, doc_type=doc_type, doc=doc)
        return res


class AirflowElasticPlugin(AirflowPlugin):
    name = 'elastic'
    hooks = [ElasticHook]
profile
까먹었을 미래의 나를 위해

0개의 댓글