Step 3. Splunk에서 데이터를 추출해 그 결과를 외부 API와 결합한 결과 저장하기 / Splunk / Airflow

Munang·2024년 12월 22일
0

Airflow

목록 보기
3/4

오늘은 Splunk에서 IP 데이터를 추출해 그 결과를 whois 에 조회한 후 Splunk Index에 다시 Collect 하는 프로세스를 구축해볼 것 이다.

이는 DAG로 여러개의 Operation을 작성하면 되는데, 그냥 파이썬 코드를 작성하면 된다.

환경은 로컬 환경에서 했기 때문에 로컬에 Splunk와 Airflow가 설치되어있고 실행중이어야 한다.

Step 1~2를 따라왔다면, Splunk만 준비가 되어있으면 된다. 만약 준비가 안되어있다면 Step1을 통해 Airflow를 설치 후 이번 스텝을 따라오는게 좋다.

0. whois API Key 발급받기

whois 기능을 OpenAPI 기능으로서 사용하려면 아래의 공공데이터 포털에 가입한 후 진행해야 한다.

1) 공공데이터 포털 가입

https://www.data.go.kr/index.do

2) 가입 후 whois 검색 후 활용 신청

3) Key 확인

활용 신청 후 승인은 자동으로 된다. 이후에는 마이페이지에 접속한다
마이페이지 > 데이터 활용 > Open API > 인증키 발급현황 경로로 이동한다.

이후에는 발급된 API 키를 확인할 수 있다. 별도로 메모장에 복붙해준다.

1. DAG 작성

1) Chat gpt 님에게 작성해달라고 요청 후 복붙

cd airflow/ ## airflow가 있는 디렉토리로 이동한다. 
cd dags/
vi whois.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import requests
import splunklib.client as client
import splunklib.results as results
import json

# Splunk 연결 정보
SPLUNK_HOST = 'host.docker.internal' #실제 통신이 docker-> localhost로 진행됨으로, 도커에서 나의 PC를 호출할 때에는 host.docker.internal이라고 . 
# SPLUNK_HOST = 'localhost' # 테스트 할 때에는 localhost
SPLUNK_PORT = '8089'
SPLUNK_USERNAME = 'admin'
SPLUNK_PASSWORD = '' ## Splunk 저장 시 어느 Sourcetype에 저장할지
SPLUNK_INDEX = 'main' ## Splunk 저장 시 어느 인덱스에 저장할지

# KISA Whois API URL
WHOIS_API_URL = 'https://apis.data.go.kr/B551505/whois/ip_address'
WHOIS_API_KEY = '여기에 키 값 입력해주세요. 그대로 복붙하면 안됩니다.'	

# Splunk에서 데이터 읽기
def fetch_splunk_data():
    service = client.connect(
        host=SPLUNK_HOST,
        port=SPLUNK_PORT,
        username=SPLUNK_USERNAME,
        password=SPLUNK_PASSWORD
    )

    kwargs_oneshot = {"earliest_time": "-3mon@mon"}
    searchquery_oneshot = "search index=main |  stats c by dstip, index | dedup dstip|  head 10 | rename dstip as ip"
    oneshotsearch_results = service.jobs.oneshot(searchquery_oneshot, **kwargs_oneshot)
    reader = results.ResultsReader(oneshotsearch_results)

    ip_list = [result['ip'] for result in reader if 'ip' in result]
    return ip_list

# KISA Whois API 호출
def fetch_whois_info(**kwargs):
    ti = kwargs['ti']
    ip_list = ti.xcom_pull(task_ids='fetch_splunk_data')
    whois_results = []

    for ip in ip_list:
        params = {
            'serviceKey': WHOIS_API_KEY,
            'query': ip,
            'answer': 'json'
        }

        response = requests.get(WHOIS_API_URL, params=params)
        if response.status_code == 200:
            whois_results.append({"ip": ip, "whois_data": response.json()})
        else:
            whois_results.append({"ip": ip, "whois_data": {"error": f"Failed to fetch data for {ip}"}})

    ti.xcom_push(key='whois_results', value=whois_results)

# 결과를 Splunk에 저장
def send_to_splunk(**kwargs):
    ti = kwargs['ti']
    whois_results = ti.xcom_pull(task_ids='fetch_whois_info', key='whois_results')
    service = client.connect(
        host=SPLUNK_HOST,
        port=SPLUNK_PORT,
        username=SPLUNK_USERNAME,
        password=SPLUNK_PASSWORD
    )
    index = service.indexes[SPLUNK_INDEX]

    for result in whois_results:
        index.submit(json.dumps(result), sourcetype='whois_api_result')

# Airflow DAG 정의
with DAG(
    dag_id='splunk_to_kisa_whois',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2024, 1, 1),
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    schedule_interval='@daily',
    catchup=False
) as dag:

    fetch_data = PythonOperator(
        task_id='fetch_splunk_data',
        python_callable=fetch_splunk_data
    )

    fetch_whois = PythonOperator(
        task_id='fetch_whois_info',
        python_callable=fetch_whois_info,
        provide_context=True
    )

    send_results = PythonOperator(
        task_id='send_to_splunk',
        python_callable=send_to_splunk,
        provide_context=True
    )

    fetch_data >> fetch_whois >> send_results

이후 :wq!를 입력해 저장 후 나와준다.

  • splunklib에서 제공하는 API는 splunk의 무료버전이면 자격증명이 필요하지 않다. 그래서 ID:PWadmin:으로 입력해주면 된다.
  • 현재 테스트에서는 local에 splunk가 있고, docker에서 splunk를 호출한다. 따라서 hostname을 host.docker.internal라고 해줘야한다.

2-1) 일부 도커에 미설치된 splunk 모듈 설치하기 (일시적 해결 방안)

splunlib 라이브러리는 설치되지 않아서 별도의 설치작업이 필요했다.
도커 내부로 진입해 종속성 라이브러리를 설치해줘야 한다.

도커 진입

  • 현재 실행중인 이미지 확인
docker ps

  • airflow-webserver_1 이미지로 진입
docker exec -it CONTAINER ID /bin/sh
## 로컬 환경의 /bin/sh 쉘을 이용해 CONTAINER ID에 해당되는 이미지에 진입하겠다 라는 의미이다.


이렇게 하면 아래와 같이 airflow내부의 터미널로 진입했다.

라이브러리 설치

라이브러리 설치는 반드시 아래의 순서로 진행해준다.

pip install pycryptodome
pip install splunklib --no-deps ## 종속성 없이 splunklib 설치
pip install splunk-sdk

설치가 완료되면 airflow의 도커 진입 화면에서 import splunklib를 실행해본다.

지금의 경우 1회성 테스트 이기 때문에 이런식으로 해결해도 괜찮다. 하지만 실제 운영환경 에서는 도커를 올릴 때마다 이러한 의존성 패키지들이 자동으로 설치 되게끔 해줘야한다.

2-2) 일부 도커에 미설치된 splunk 모듈 설치하기 (영구적 해결 방안) - Dockerfile 작성 및 적용하기

Dockerfile 작성하기

cd airflow/ ## airflow가 있는 디렉토리로 이동한다. 
vi Dockerfile

Dockerfile을 생성해준다. 관련 내용은 이 블로그의 게시글을 참고했다.

파일의 내용은 아래로 설정해줬다.

FROM apache/airflow:2.5.1

RUN pip install pycryptodome && \
    pip install splunklib --no-deps && \
    pip install splunk-sdk

Run 이후를 직접 실행해야 하는 커맨드로 넣어 주고, 커맨드와 커맨드 사이는 &&을 이용해 연속하여 실행하게 끔 하였다.

FROM 이후 나와있는 내용은 도커 이미지인데, 이는 설치시 저장되었던 에어플로우 이미지와 동일한 네이밍이다. 이것은 docker-compose.yaml 파일에서 찾을 수 있다.

vi docker-compose.yaml
/image: ${AIRFLOW_IMAGE_NAME ## image: ${AIRFLOW_IMAGE_NAME로 시작하는 문자열 찾기

그러면 아래와 같이 네이밍 확인이 가능하다. 각자 개인에 맞는 네이밍으로 입력해주면 된다.

Dockerfile 적용해주기

vi docker-compose.yaml
/build:  ## build: 로 시작하는 문자열 찾기

도커를 빌드 할 때에 현재 디렉토리(.) 에서 도커파일을 참조해 빌드한다는 의미로 기본적으로는 주석 처리 되어있는 부분이다. 주석을 지워 활성화 하면 자동으로 현재 디렉토리에 있는 Docker file을 참조해 이미지를 빌드해준다.

build: . 부분의 주석을 지워준다.

이후에는 아래의 명령어를 입력해준다.

docker compose up -d

자동으로 패키지가 설치되는 것을 확인할 수 있다.

2-3) 일부 도커에 미설치된 splunk 모듈 설치하기 (영구적 해결 방안) - requirements.txt를 이용한 Dockerfile 작성 및 적용하기

2-2의 방법은 파이썬 패키지가 몇 가지 안되었을 경우에 활용하면 좋다. 구독자님이 requirements.txt를 활용한 방법을 추천해줘서 이 방안도 기록해두려고 한다.

requirements.txt 파일 생성하기

파이썬에서 제공하는 패키지 인스톨러인 pip를 이용해 txt 파일을 참조해서 한번에 설치할 수 있는 방법이다.

원초적인 방법은 기존에 설치된 환경에서 pip freeze, pip list 를 requirements.txt로 떨구지만, 현재 도커 환경에서 필요한 패키지는 splunklib만 필요하기 때문에 이 라이브러리에 필요한 의존성 라이브러리만 설치해주면 된다.

따라서 이 목록을 따로 만들어서 requirements.txt로 만들면 된다.

수동으로 만들어주기

의존성 없이 설치해야하는 패키지도 있기 때문에 2개를 만들어 줘야 한다.

vi requirements.txt

이후에는 아래 내용을 입력해준다.

pycryptodome==3.21.0
splunk-sdk==2.1.0

의존성 없이 설치하는 txt파일도 만들어준다.

vi requirements_nodep.txt

이후에는 아래 내용을 입력해준다.

splunklib==1.0.0

Dockerfile 작성하기

이후에는 Dockerfile을 수정해준다.

FROM apache/airflow:2.5.1

RUN pip install --no-deps -r requirements_nodeps.txt && \
    pip install -r requirements.txt

Dockerfile 적용해주기

vi docker-compose.yaml
/build:  ## build: 로 시작하는 문자열 찾기

도커를 빌드 할 때에 현재 디렉토리(.) 에서 도커파일을 참조해 빌드한다는 의미로 기본적으로는 주석 처리 되어있는 부분이다. 주석을 지워 활성화 하면 자동으로 현재 디렉토리에 있는 Docker file을 참조해 이미지를 빌드해준다.

build: . 부분의 주석을 지워준다.

이후에는 아래의 명령어를 입력해준다.

docker compose up -d

자동으로 패키지가 설치되는 것을 확인할 수 있다.

  • pip-tools로 만들어주기(지금은 안됨. 참고로 이런 방법이 있다는 것만 보세요.)

본래는 pip-tools로 바로 만들어주면 되는데, splunklib는 의존성 문제가 있어서 pycrypto가 아니라 다른 라이브러리를 설치해줘야 한다. 이걸 splunk에서 아직 업데이트를 안한건지 계속 자동으로 pycrypto를 끌고와서 결국 수동으로 바꿔줘야한다.

찾아보니 아직 업데이트 안한거같다. splunklib 설치 시 pip에서 계속 pycryptodome가 아닌 pycrypto를 끌고 오는 문제가 커뮤니티 게시글도 2024년에 올라와있다. pypi도 뒤져보니 목록이 업데이트가 안된듯 하다;;

아무튼 상황이 이렇기 때문에 일단 방법만 참고하면 아래와 같다!!

pip-tools를 설치하면, 특정 라이브러리에 필요한 모든 의존성 라이브러리를 찾아서 requirements.txt로 만들 수 있다.

pip install pip-tools ## 패키지 설치해주기 
echo "splunklib" > requirements.in 
pip-compile requirements.in ## requirements.in에 필요한 라이브러리 명만 기재해주면 이를 참조하여 자동으로 생성해준다. 

이후에 디렉토리를 보면 아래와 같이 requirements.txt가 생성되어있는 모습을 볼 수 있다.

내용은 아래와 같다.

이 requirements.txt를 이용해 위에서 봤던 Dockerfile에 작성&적용해주면 된다.

2. Airflow 제대로 실행되는지 확인하기

1) 에어플로우 스케줄러 실행

airflow scheduler

위의 명령어를 실행 후 에어플로우 사이트에 들어가 내가 작성한 DAG가 잘 업로그 되었는지 확인한다.

2) DAGs 실행하기

위에서 확인한 DAGs명을 클릭해서 들어간 후 아래의 플레이 버튼에서 Trigger DAG를 눌러준다.

3) 정상 실행 확인

일단 내가 작성한 DAG는 3개의 Flow로 되어있어 3개의 박스가 확인된다.
3개모두 초록색이 뜬 것을 보면 된다.

4) splunk 접속 후 적재된 데이터 확인

위에서 코드에 main 인덱서의 whois_api_result 소스코드로 적재한다고 지정했다. 따라서 아래와 같이 커맨드를 입력해주면

성공이다!

3. 사투의 흔적..


나의 마음을 아프게 한 붉은색 네모들.. 별거 아닐거라 생각했는데 정말 오래 걸렸다.

0개의 댓글