๋ฌธ์ ์ํฉ
๊ธฐ์กด RDS๋ง ์ด์ฉํ๋ ์ํคํ ์ฒ์์ S3->Glue->Athena๋ก ์ํคํ ์ฒ๋ฅผ ๋ฐ๊พธ์๋ค. ๋ฐ๋ผ์ ๊ธฐ์กด RDS์ ์๋ ๋ฐ์ดํฐ๋ค์ ๋ ์ง ํํฐ์ ๋ ํ์ฌ S3์ ๋ฃ๊ธฐ๋ก ํ์๋ค.
๊ทธ ์ ์, channel_history
ํ
์ด๋ธ์๋ ํฌ๋กค๋งํด์ DB์ insert ๋ ๋ ์ง๊ฐ ๊ธฐ๋ก๋์ด ์์์ง๋ง channel
ํ
์ด๋ธ์๋ ์์ด์ ๋์ค์ ์ปฌ๋ผ์ ์ถ๊ฐํด์ฃผ์๋ค(ํ
์ด๋ธ ์ค๊ณ๋ฅผ ์ ํ์ด์ผ ํ๋๋ฐ...). ๊ทธ๋ฌ๊ณ default๋ก current timestamp์ ํด์ฃผ์๋๋ ์ค์ ํฌ๋กค๋ง ๋ ๋ ์ง๊ฐ ์๋๋ผ ๊ทธ ์ปฌ๋ผ์ ๋ง๋ ๋ ์ง๋ก ์ปฌ๋ผ์ด ์ฑ์์ ธ์ ๋นํฉํ๋ค.
๊ทธ๋์ channel_history๋ channel ํ
์ด๋ธ์ ๊ฐ์ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ๋ ์์งํ๋ค. ๋ผ๋ ์ ์ ํ์ channel_history
์ ๋ ์ง์ปฌ๋ผ์ ๊ณ ๋๋ก ๋ณต์ฌํด์ channel
ํ
์ด๋ธ์ ๋ ์ง์ปฌ๋ผ์ ์์ ํ๊ธฐ๋ก ํ์๋ค.
ํ์ฌ ์ํฉ
SELECT c.channel_id, c.crawl_update_at as channel_date, ch.crawl_update_at as channel_history_date FROM channel c JOIN channel_history ch ON c.channel_id=ch.channel_id;
๊ฐ์ ๋ฐ์ดํฐ์ด๊ณ ๊ฐ์ ๋ ์ DB์ ์ ์ฅ๋์์์๋ ๋ ํ ์ด๋ธ์ ๋ ์ง๊ฐ ๋ค๋ฅธ ๊ฒ์ ํ์ธ ํ ์ ์์๋ค.
์ฒซ ๋ฒ์งธ ๋ฌธ์ ํด๊ฒฐ
์ด๋ฅผ ํด๊ฒฐ ํ๊ธฐ์ํด
channel_history
ํ ์ด๋ธ์ ๋ ์ง ์ปฌ๋ผ์ ๋ณต์ฌํด์channel
ํ ์ด๋ธ์ ๋ฃ์ด์ฃผ์๋ค.
UPDATE channel c
JOIN (
SELECT channel_id, crawl_update_at
FROM channel_history
) ch
ON c.channel_id = ch.channel_id
SET c.crawl_update_at = ch.crawl_update_at;
๊ทธ ๋ค, ๊ฒฐ๊ณผ๋ฅผ ํ์ธํด๋ณด๋
์จ์๐ ๋ฌด์ฌํ ์ ๋ฐ์ดํธ ๋ ๊ฒ์ ํ์ธํ ์ ์์๋ค.
๋ ๋ฒ์งธ ๋ฌธ์ ํด๊ฒฐ
์ด์ ์๋ ๋ชฉ์ ์ด์๋ MySQL ๋ฐ์ดํฐ๋ฅผ S3์ ์ ์ฅํ๊ธฐ ์ํ ์์ ์ ์ํํด๋ณด์.
์ฐ์ , Athena๋ฅผ ์ฌ์ฉํ ๊ฒ์ด๊ธฐ ๋๋ฌธ์ S3์ ๋ ์ง๋ณ๋ก ํํฐ์
๋ํด์ ๋ฐ์ดํฐ๋ฅผ ๋ฃ์ด๋์ด์ผ ํ๋ค. ๊ทธ๋ฅผ ์ํด channel
ํ
์ด๋ธ๊ณผ channel_history
ํ
์ด๋ธ์ ๋ ์ง๋ณ๋ก csv๋ก export ํด์ฃผ์ด์ผ ํ๊ณ , ๊ทธ ์ ์ S3์ ๋ ์ง๋ณ๋ก ํด๋๋ฅผ ๋ง๋ค์ด๋์ผ ํ๋ค.
์ฐ์ , ํ์ฌ ๋ช ๊ฐ์ ๋ ์ง๊ฐ ์กด์ฌํ๋์ง ํ์ธํ๊ธฐ ์ํด ์ฟผ๋ฆฌ๋ฅผ ๋ ๋ ค๋ณด์๋ค.
SELECT DATE(crawl_update_at) AS date, COUNT(*) AS count
FROM channel
GROUP BY DATE(crawl_update_at)
ORDER BY 1;
ํ์ธ ๊ฒฐ๊ณผ, ์ด 6์ผ๋์ 6500์ฌ๊ฐ์ ์ฑ๋ ๋ฐ์ดํฐ๋ฅผ ์์งํ ๊ฒ์ ์ ์ ์์๋ค.
๋ฐ๋ผ์ ์๋์ ๊ฐ์ ์ฟผ๋ฆฌ๋ฅผ ํตํด ๋ ์ง๋ณ๋ก S3์ ๋ฃ์ด์ฃผ๋ ค ์๋ํ๋ค.
SELECT *
FROM channel
WHERE date(crawl_update_at)='2023-03-12'
INTO OUTFILE 's3://dothis-crawling-data/channel/dt=2023-03-12/channel.csv'
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
LINES TERMINATED BY '\n';
ํ์ง๋ง...
ERROR 1227 (42000): Access denied; you need (at least one of) the FILE privilege(s) for this operation
๊ณ์ํด์ ์ด๋ฐ ์๋ฌ๋ฅผ ๋ฑ์ด๋ด๊ธธ๋ DB ๊ถํ์ ๊ฐ์ง๊ณ ์๋ ๋ฐฑ์๋์๊ฒ ๊ถํ์ ์์ฒญํ๋....ํ์ฌ ์ฐ๋ฆฌ๊ฐ ์ฌ์ฉํ๊ณ ์๋ RDS๋ ํ๋ฆฌํฐ์ด๋ผ MySQL 8.0์ ์ฐ๊ณ ์๋ค๊ณ ํ๋๋ฐ, ๊ทธ 8.0 ๋ฒ์ ์๋ OUTFILE ๊ธฐ๋ฅ์ด ์๋ค๊ณ ํ๋ค.....ใ aurora๋ฅผ ์จ์ผ ์๊ธด๋ค๊ณ ํ๋ ์ด์ฉ ์ ์์ด ๋ฐ์ดํฐ๋ฅผ ์๋์ผ๋ก ์ฎ๊ธฐ๊ธฐ๋ก ํ๋ค.
์ธ ๋ฒ์งธ ๋ฌธ์ ํด๊ฒฐ
๊ทธ๋ฐ๋ฐ, ์ฌ๊ธฐ์ ๊ฐ๊ณผํ๊ณ ๋์ด๊ฐ ๋ถ๋ถ์ด, ๋๋ถ๋ถ์ ๋ธ๋ก๊ทธ ๊ธ์์๋ csv๋ก S3์ ๋ฃ๊ธธ๋ ์ฐ๋ฆฌ๋ ๊ทธ๋ ๊ฒ ํ ๊น ํ๋๋, ์ฉ๋์ด ๋๋ฌด ์ปค์ง๊ธฐ๋ ํ๊ณ ๋น์ฉ๋ ๊ฐ๋น์ด ์ ๋ ๊ฒ ๊ฐ์๋ค. ๊ทธ๋์ ํ์ผ ํ์์ ๋ํ ๊ณ ๋ฏผ์ ์์ํ๋ค.
์ฐ์ , S3์ ๋ฃ์ ์ ์๋ ํ์ผ ํ์์ ์๋์ ๊ฐ๋ค.
๊ทธ๋ฆฌ๊ณ ์์ถ ๋ฐฉ์์ ์๋์ ๊ฐ๋ค.
์ฒ์ ๋ฐฑ์๋๋ถ์ ์ฃผ๋ก ์ฌ์ฉ๋๋ gzip-json ์์ถ-ํ์ผํ์ ๋ฐฉ์์ ์ ํํ์ ์ ์๋ ค์ฃผ์ จ๋ค. ํ์ง๋ง, ๋น ๋ฐ์ดํฐ๋ฅผ ์์ฃผ ์กฐํํ๊ณ ๋ค๋ฃจ๊ธฐ์๋ ๋ด ์๊ฐ์ snappy-parquet๋ ๋์์ง ์์ ๊ฒ ๊ฐ์๋ค. ๋ฌด์๋ณด๋ค ์์ธ ๋ฐ์ดํฐ๋ฅผ ์์ฃผ ์กฐํํ๊ฑฐ๋ ์์ผ๋ก ๋ถ์๋ฉด์์ ์ฌ๋ฌ๋ฒ ๊ฑด๋ค์ด๊ณ ์ถ์ ๋ด ์ ์ฅ์์ ์ ์๋ณด๋จ ํ์์ธ ์ชฝ์ด ์๋๋ฉด์์๋, ๋น์ฉ๋ฉด์์๋ ๋์ ๊ฒ ๊ฐ์ ๋น๊ต ๋ถ์ ํด๋ณด๊ธฐ๋ก ํ๋ค.
# pip install pyarrow python-snappy
import gzip, snappy, time, os, json
import pyarrow as pa
import pyarrow.parquet as pq
import json, pandas as pd
from pathlib import Path
# ํ์ผ ํฌ๊ธฐ ๋ณํ ํจ์
def convert_size(size_bytes):
import math
if size_bytes == 0:
return "0B"
size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
i = int(math.floor(math.log(size_bytes, 1024)))
p = math.pow(1024, i)
s = round(size_bytes / p, 2)
return "%s %s" % (s, size_name[i])
BASE_PATH = Path('์๋ณธํ์ผ๊ฒฝ๋ก')
logfile_path = BASE_PATH/'ํ์ผ๋ช
.json'
gzipfile = BASE_PATH/'ํ์ผ๋ช
.gz'
pqfile = BASE_PATH/'ํ์ผ๋ช
.parquet'
file_size = os.path.getsize(logfile_path)
print('์๋ณธjson ํ์ผ ํฌ๊ธฐ : ', convert_size(file_size), 'bytes')
# json ํ์ผ์ -> gzip ์ผ๋ก ์์ถ .gz ํ์ผ๋ก ๋ณํํด์ ์ ์ฅ
start = time.time()
with open(logfile_path,'rb') as f_in:
with gzip.open(gzipfile,'wb') as f_out:
f_out.writelines(f_in)
end = time.time()
print(f"gzip์ฒ๋ฆฌ ์๊ฐ : {end - start:.5f} ์ด")
print(f"gzip ์์ถ ํ ํ์ผ ํฌ๊ธฐ : ", convert_size(os.path.getsize(gzipfile)), 'bytes')
# json ํ์ผ์ -> snappy๋ก ์์ถ .parquet ํ์ผ๋ก ๋ณํํด์ ์ ์ฅ
start = time.time()
with open(logfile_path, 'r', encoding='utf-8') as f:
data = json.load(f)
df = pd.json_normalize(data)
pq.write_table(
pa.Table.from_pandas(df),
pqfile,
compression='snappy'
)
end = time.time()
print(f"snappy์ฒ๋ฆฌ ์๊ฐ : {end - start:.5f} ์ด")
print(f"snappy ์์ถ ํ ํ์ผ ํฌ๊ธฐ : ", convert_size(os.path.getsize(pqfile)), 'bytes')
๊ทธ ๊ฒฐ๊ณผ, ์๋์ ๊ฐ์ด ๋์๋ค.
๊ฒฐ๊ตญ ์ฐ๋ฆฌ๋ ์ฉ๋๋ณด๋ค๋ ์ฒ๋ฆฌ ์๋์ ๋ ๋น์ค์ ๋๊ธฐ๋ก ํ๊ณ snappy-parquet
ํ์ด๋ฅผ ์ ํํ๊ธฐ๋ก ํ๋ค.
๋ค์ ๋ ๋ฒ์งธ ๋ฌธ์ ํด๊ฒฐ
๋ฌธ์ ๋ฅผ ํด๊ฒฐํ๊ณ MySQL Workbench์์ ์ง์ ์ฟผ๋ฆฌ ๊ฒฐ๊ณผ๋ฅผ jsonํ์ผ๋ก exportํ๋ ๋ฐฉ๋ฒ์ ํํด, snappy๋ฅผ ์ด์ฉํด parquet ํ์ผ๋ก ๋ณํํด์ฃผ๊ณ ์๋์ผ๋ก S3์ ์ ๋ก๋ ํ์๋ค.
import pyarrow as pa
import pyarrow.parquet as pq
import json, pandas as pd
from pathlib import Path
'''
json ํ์ผ์ snappy๋ก ์์ถํ์ฌ parquet ํ์ผ๋ก ๋ณํํ๋ ์ฝ๋
'''
BASE_PATH = Path('json ํ์ผ์ด ๋ค์ด์๋ ํด๋')
pq_dir = BASE_PATH / 'parquet'
pq_dir.mkdir(exist_ok=True)
for logfile_path in BASE_PATH.glob('*.json'):
data = []
with open(logfile_path, 'r', encoding='utf-8') as f:
# ์์ง๋จ์์ ๊ฐํ๋ฌธ์๋ฅผ ์ ๊ฑฐํ์ง ์์๊ณ , db์์ ๊ฐ์ ๋นผ์ค๋ค๋ณด๋ null๊ฐ์ด ์์ด์ json.load๋ฅผ ์ธ ์ ์์๋ค.
decoder = json.JSONDecoder()
buffer = ""
for line in f:
buffer += line.strip()
try:
while buffer:
obj, idx = decoder.raw_decode(buffer)
data.append(obj)
buffer = buffer[idx:].lstrip()
except json.JSONDecodeError:
pass
df = pd.json_normalize(data)
pqfile = pq_dir / (logfile_path.stem + '.parquet')
pq.write_table(
pa.Table.from_pandas(df),
pqfile,
compression='snappy'
)