Elasticsearch를 사용하는 회사의 데이터를 분석할 일이 있다고해서 ES의 데이터를 PosgreSQL(AgensGraph)로 적재해달라는 요청에 사내 인원이 해당 코드를 작성했습니다. 흥미롭고 유용한 정보기에 공유합니다.
[테스트 환경]
분류 | 하위 분류 | 버전 |
---|---|---|
ElasticSearch | 7.9.3(docker) | |
PostgreSQL | 13.9 | |
python | 3.11.1 | |
elasticsearch | 7.17.9 | |
elasticsearch_dsl | 7.4.1 | |
psycopg2 | 2.9.5 |
from elasticsearch import Elasticsearch, helpers
from elasticsearch_dsl import Search
import psycopg2.extras
import csv
import json
#from crawler import *
# Elasticsearch connection info
es_host = "192.xxx.xx.xx"
es_port = "9200"
es_dbname = "news" # index
es_user = "elastic"
es = Elasticsearch([es_host], port=es_port, timeout=30*1)
# Agensgraph connection info
ag_host = "192.xxx.xx.xx"
ag_port = "5432"
ag_dbname = "postgres"
ag_user = "postgres"
ag_pw = "1234"
conn = psycopg2.connect(host=ag_host, port=ag_port, dbname=ag_dbname, user=ag_user, password=ag_pw)
# INSERT_ES는 테스트 환경을 만들기 위해 ES에 데이터를 적재하는 함수
def INSERT_ES():
file_path = 'C:\\Users\\lee\\Desktop\\file\\'
print('tagclass')
index='tagclass'
if es.indices.exists(index=index):
pass
else:
es.indices.create(index=index)
docs = []
with open(file_path+'tagclass_0_0.csv', newline='', encoding='UTF8') as csvfile:
csv1 = csv.reader(csvfile, delimiter='|', quotechar='"')
for row in csv1:
docs.append({
'_index': index,
'_source': {
'id': row[0],
'name': row[1],
'url': row[2]
}
})
helpers.bulk(es, docs)
print('tag')
index='tag'
if es.indices.exists(index=index):
pass
else:
es.indices.create(index=index)
docs = []
with open(file_path2+'tag_0_0.csv', newline='', encoding='UTF8') as csvfile:
csv2 = csv.reader(csvfile, delimiter='|', quotechar='"')
for row in csv2:
docs.append({
'_index': index,
'_source': {
'id': row[0],
'name': row[1],
'url': row[2]
}
})
helpers.bulk(es, docs)
print('comment')
index='comment'
if es.indices.exists(index=index):
pass
else:
es.indices.create(index=index)
docs = []
with open(file_path1+'comment_0_0.csv', newline='', encoding='UTF8') as csvfile:
csv3 = csv.reader(csvfile, delimiter='|', quotechar='"')
for row in csv3:
docs.append({
'_index': index,
'_source': {
'id': row[0],
'creationDate': row[1],
'locationIP': row[2],
'browserUsed': row[3],
'content': row[4],
'length': row[5]
}
})
helpers.bulk(es, docs)
print('post')
index='post'
if es.indices.exists(index=index):
pass
else:
es.indices.create(index=index)
docs = []
with open(file_path1+'post_0_0.csv', newline='', encoding='UTF8') as csvfile:
csv4 = csv.reader(csvfile, delimiter='|', quotechar='"')
for row in csv4:
docs.append({
'_index': index,
'_source': {
'id': row[0],
'imageFile': row[1],
'creationDate': row[2],
'locationIP': row[3],
'browserUsed': row[4],
'language': row[5],
'content': row[6],
'length': row[7]
}
})
helpers.bulk(es, docs)
# ES의 데이터를 AG에 적재하는 함수
def INSERT_AG():
# index 키 출력
index_list = list(es.indices.get('*').keys())
# index별 상세데이터 출력
for idx in index_list:
index = str(idx)
page = es.search(index = index, scroll = '20m', body={'from':0, 'size':10000})
scroll_size = page['hits']['total']['value']
sid = page['_scroll_id'] # ES 데이터 조회 후 필요시 변경
for i in range(scroll_size):
if scroll_size == 0:
print('scroll_size == 0')
break
scroll_size = len(page['hits']['hits'])
argslist = [(index, json.dumps(j['_source'])) for j in page['hits']['hits']]
cur = conn.cursor()
sql = "INSERT INTO public.tb_es (idx_name, log) VALUES(%s, %s)"
psycopg2.extras.execute_batch(cur, sql, argslist)
conn.commit()
# 다음 scroll을 위해 scroll_id 업데이트
page = es.scroll(scroll_id = sid, scroll = '20m')
sid = page['_scroll_id']
conn.close()
# 엘라스틱서치로의 샘플데이터 적재
INSERT_ES()
# ES의 데이터를 읽어서 AG로 적재
INSERT_AG()
아주 유익한 내용이네요!