Elasticsearch to Postgresql

Ja L·2023년 7월 19일
0

ElasticSearch

목록 보기
1/1

Elasticsearch를 사용하는 회사의 데이터를 분석할 일이 있다고해서 ES의 데이터를 PosgreSQL(AgensGraph)로 적재해달라는 요청에 사내 인원이 해당 코드를 작성했습니다. 흥미롭고 유용한 정보기에 공유합니다.



[테스트 환경]

분류하위 분류버전
ElasticSearch7.9.3(docker)
PostgreSQL13.9
python3.11.1
elasticsearch7.17.9
elasticsearch_dsl7.4.1
psycopg22.9.5


from elasticsearch import Elasticsearch, helpers
from elasticsearch_dsl import Search
import psycopg2.extras
import csv
import json

#from crawler import *

# Elasticsearch connection info
es_host = "192.xxx.xx.xx"
es_port = "9200"
es_dbname = "news" # index
es_user = "elastic"
es = Elasticsearch([es_host], port=es_port, timeout=30*1)

# Agensgraph connection info
ag_host = "192.xxx.xx.xx"
ag_port = "5432"
ag_dbname = "postgres"
ag_user = "postgres"
ag_pw = "1234"
conn = psycopg2.connect(host=ag_host, port=ag_port, dbname=ag_dbname, user=ag_user, password=ag_pw)

# INSERT_ES는 테스트 환경을 만들기 위해 ES에 데이터를 적재하는 함수
def INSERT_ES():
    file_path = 'C:\\Users\\lee\\Desktop\\file\\'
    print('tagclass')
    index='tagclass'
    if es.indices.exists(index=index):
        pass
    else:
        es.indices.create(index=index)
    docs = []
    with open(file_path+'tagclass_0_0.csv', newline='', encoding='UTF8') as csvfile:
        csv1 = csv.reader(csvfile, delimiter='|', quotechar='"')
        for row in csv1:
            docs.append({
                '_index': index,
                '_source': {
                    'id': row[0],
                    'name': row[1],
                    'url': row[2]
                }
            })
    helpers.bulk(es, docs)
    
    print('tag')
    index='tag'
    if es.indices.exists(index=index):
        pass
    else:
        es.indices.create(index=index)
    docs = []
    with open(file_path2+'tag_0_0.csv', newline='', encoding='UTF8') as csvfile:
        csv2 = csv.reader(csvfile, delimiter='|', quotechar='"')
        for row in csv2:
            docs.append({
                '_index': index,
                '_source': {
                    'id': row[0],
                    'name': row[1],
                    'url': row[2]
                }
            })
    helpers.bulk(es, docs)
    
    print('comment')
    index='comment'
    if es.indices.exists(index=index):
        pass
    else:
        es.indices.create(index=index)
    docs = []
    with open(file_path1+'comment_0_0.csv', newline='', encoding='UTF8') as csvfile:
        csv3 = csv.reader(csvfile, delimiter='|', quotechar='"')
        for row in csv3:
            docs.append({
                '_index': index,
                '_source': {
                    'id': row[0],
                    'creationDate': row[1],
                    'locationIP': row[2],
                    'browserUsed': row[3],
                    'content': row[4],
                    'length': row[5]
                }
            })
    helpers.bulk(es, docs)
    
    print('post')
    index='post'
    if es.indices.exists(index=index):
        pass
    else:
        es.indices.create(index=index)
    docs = []
    with open(file_path1+'post_0_0.csv', newline='', encoding='UTF8') as csvfile:
        csv4 = csv.reader(csvfile, delimiter='|', quotechar='"')
        for row in csv4:
            docs.append({
                '_index': index,
                '_source': {
                    'id': row[0],
                    'imageFile': row[1],
                    'creationDate': row[2],
                    'locationIP': row[3],
                    'browserUsed': row[4],
                    'language': row[5],
                    'content': row[6],
                    'length': row[7]
                }
            })
    helpers.bulk(es, docs)

# ES의 데이터를 AG에 적재하는 함수
def INSERT_AG():
    # index 키 출력
    index_list = list(es.indices.get('*').keys())
    
    # index별 상세데이터 출력
    for idx in index_list:
        index = str(idx)
        page = es.search(index = index, scroll = '20m', body={'from':0, 'size':10000})
        scroll_size = page['hits']['total']['value']
        sid = page['_scroll_id'] # ES 데이터 조회 후 필요시 변경
        
        for i in range(scroll_size):
            if scroll_size == 0:
                print('scroll_size == 0')
                break
            scroll_size = len(page['hits']['hits'])
            argslist = [(index, json.dumps(j['_source'])) for j in page['hits']['hits']]

            cur = conn.cursor()
            sql = "INSERT INTO public.tb_es (idx_name, log) VALUES(%s, %s)"
            psycopg2.extras.execute_batch(cur, sql, argslist)
            conn.commit()
            
            # 다음 scroll을 위해 scroll_id 업데이트
            page = es.scroll(scroll_id = sid, scroll = '20m')
            sid = page['_scroll_id']
    conn.close()

# 엘라스틱서치로의 샘플데이터 적재
INSERT_ES()

# ES의 데이터를 읽어서 AG로 적재
INSERT_AG()
profile
DB Engineer

1개의 댓글

comment-user-thumbnail
2023년 7월 19일

아주 유익한 내용이네요!

답글 달기