[MySQL๐Ÿ‘‰S3] Data Migration

ํฌ๋™๋™ยท2023๋…„ 4์›” 17์ผ

[YouTube Project]

๋ชฉ๋ก ๋ณด๊ธฐ
6/13

๋ฌธ์ œ ์ƒํ™ฉ

We changed our architecture from one that relied solely on RDS to an S3 -> Glue -> Athena pipeline. So we decided to partition the data sitting in RDS by date and load it into S3.

๊ทธ ์ „์—, channel_history ํ…Œ์ด๋ธ”์—๋Š” ํฌ๋กค๋งํ•ด์„œ DB์— insert ๋œ ๋‚ ์งœ๊ฐ€ ๊ธฐ๋ก๋˜์–ด ์žˆ์—ˆ์ง€๋งŒ channel ํ…Œ์ด๋ธ”์—๋Š” ์—†์–ด์„œ ๋‚˜์ค‘์— ์ปฌ๋Ÿผ์„ ์ถ”๊ฐ€ํ•ด์ฃผ์—ˆ๋‹ค(ํ…Œ์ด๋ธ” ์„ค๊ณ„๋ฅผ ์ž˜ ํ–ˆ์–ด์•ผ ํ–ˆ๋Š”๋ฐ...). ๊ทธ๋Ÿฌ๊ณ  default๋กœ current timestamp์„ ํ•ด์ฃผ์—ˆ๋”๋‹ˆ ์‹ค์ œ ํฌ๋กค๋ง ๋œ ๋‚ ์งœ๊ฐ€ ์•„๋‹ˆ๋ผ ๊ทธ ์ปฌ๋Ÿผ์„ ๋งŒ๋“  ๋‚ ์งœ๋กœ ์ปฌ๋Ÿผ์ด ์ฑ„์›Œ์ ธ์„œ ๋‹นํ™ฉํ–ˆ๋‹ค.

So, on the premise that channel_history and channel collected the same data on the same day, I decided to fix the channel table's date column by copying the date column from channel_history verbatim.


ํ˜„์žฌ ์ƒํ™ฉ

SELECT c.channel_id, c.crawl_update_at as channel_date, ch.crawl_update_at as channel_history_date 
FROM channel c
JOIN channel_history ch
ON c.channel_id=ch.channel_id;

๊ฐ™์€ ๋ฐ์ดํ„ฐ์ด๊ณ  ๊ฐ™์€ ๋‚ ์— DB์— ์ €์žฅ๋˜์—ˆ์Œ์—๋„ ๋‘ ํ…Œ์ด๋ธ”์— ๋‚ ์งœ๊ฐ€ ๋‹ค๋ฅธ ๊ฒƒ์„ ํ™•์ธ ํ•  ์ˆ˜ ์žˆ์—ˆ๋‹ค.


Solving the First Problem

To fix this, I copied the date column from the channel_history table into the channel table. (Note that if channel_history holds several rows per channel_id, the join below picks an arbitrary matching one, so this leans on the premise above that both tables were collected on the same day.)

UPDATE channel c
JOIN (
  SELECT channel_id, crawl_update_at
  FROM channel_history
) ch
ON c.channel_id = ch.channel_id
SET c.crawl_update_at = ch.crawl_update_at;

Then, checking the result...

Ta-da ๐Ÿ˜Ž the update went through without a hitch.
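(For reference, one way to confirm the copy landed is to count the rows whose dates still disagree. A minimal sketch with pymysql; the connection parameters are placeholders:)

# pip install pymysql
import pymysql

# Hypothetical connection parameters -- replace with the real RDS endpoint
conn = pymysql.connect(host='RDS_ENDPOINT', user='USER',
                       password='PASSWORD', database='DB_NAME')
with conn.cursor() as cur:
    # Rows whose dates still differ between the two tables; expect 0
    cur.execute("""
        SELECT COUNT(*)
        FROM channel c
        JOIN channel_history ch ON c.channel_id = ch.channel_id
        WHERE DATE(c.crawl_update_at) <> DATE(ch.crawl_update_at)
    """)
    print("remaining mismatches:", cur.fetchone()[0])
conn.close()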


Solving the Second Problem

์ด์ œ ์›๋ž˜ ๋ชฉ์ ์ด์—ˆ๋˜ MySQL ๋ฐ์ดํ„ฐ๋ฅผ S3์— ์ €์žฅํ•˜๊ธฐ ์œ„ํ•œ ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•ด๋ณด์ž.

First, since we'll be querying with Athena, the data has to land in S3 partitioned by date. That means exporting the channel and channel_history tables to CSV per date, and before that, creating date-based folders (prefixes) in S3.
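(As a sketch of what that layout looks like: Hive-style dt=YYYY-MM-DD prefixes are what lets Athena prune partitions by date. A minimal example with boto3, assuming the bucket from the export query below and example dates:)

# pip install boto3
import boto3

s3 = boto3.client('s3')  # assumes AWS credentials are already configured

# S3 has no real folders, so an empty marker object is enough to make
# the dt= prefix show up as a folder in the console
for dt in ['2023-03-12', '2023-03-13']:  # example dates; the real ones are found below
    s3.put_object(Bucket='dothis-crawling-data', Key=f'channel/dt={dt}/')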

To check how many distinct dates we actually had, I ran a query:

SELECT DATE(crawl_update_at) AS date, COUNT(*) AS count
FROM channel
GROUP BY DATE(crawl_update_at)
ORDER BY 1;

ํ™•์ธ ๊ฒฐ๊ณผ, ์ด 6์ผ๋™์•ˆ 6500์—ฌ๊ฐœ์˜ ์ฑ„๋„ ๋ฐ์ดํ„ฐ๋ฅผ ์ˆ˜์ง‘ํ•œ ๊ฒƒ์„ ์•Œ ์ˆ˜ ์žˆ์—ˆ๋‹ค.
๋”ฐ๋ผ์„œ ์•„๋ž˜์™€ ๊ฐ™์€ ์ฟผ๋ฆฌ๋ฅผ ํ†ตํ•ด ๋‚ ์งœ๋ณ„๋กœ S3์— ๋„ฃ์–ด์ฃผ๋ ค ์‹œ๋„ํ–ˆ๋‹ค.

SELECT *
FROM channel
WHERE date(crawl_update_at)='2023-03-12'
INTO OUTFILE 's3://dothis-crawling-data/channel/dt=2023-03-12/channel.csv'
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
LINES TERMINATED BY '\n';

But...

ERROR 1227 (42000): Access denied; you need (at least one of) the FILE privilege(s) for this operation

๊ณ„์†ํ•ด์„œ ์ด๋Ÿฐ ์—๋Ÿฌ๋ฅผ ๋ฑ‰์–ด๋‚ด๊ธธ๋ž˜ DB ๊ถŒํ•œ์„ ๊ฐ€์ง€๊ณ  ์žˆ๋Š” ๋ฐฑ์—”๋“œ์—๊ฒŒ ๊ถŒํ•œ์„ ์š”์ฒญํ•˜๋‹ˆ....ํ˜„์žฌ ์šฐ๋ฆฌ๊ฐ€ ์‚ฌ์šฉํ•˜๊ณ  ์žˆ๋Š” RDS๋Š” ํ”„๋ฆฌํ‹ฐ์–ด๋ผ MySQL 8.0์„ ์“ฐ๊ณ  ์žˆ๋‹ค๊ณ  ํ•˜๋Š”๋ฐ, ๊ทธ 8.0 ๋ฒ„์ „์—๋Š” OUTFILE ๊ธฐ๋Šฅ์ด ์—†๋‹ค๊ณ  ํ•œ๋‹ค.....ใ…Ž aurora๋ฅผ ์จ์•ผ ์ƒ๊ธด๋‹ค๊ณ  ํ•˜๋‹ˆ ์–ด์ฉ” ์ˆ˜ ์—†์ด ๋ฐ์ดํ„ฐ๋ฅผ ์ˆ˜๋™์œผ๋กœ ์˜ฎ๊ธฐ๊ธฐ๋กœ ํ–ˆ๋‹ค.


Solving the Third Problem

๊ทธ๋Ÿฐ๋ฐ, ์—ฌ๊ธฐ์„œ ๊ฐ„๊ณผํ•˜๊ณ  ๋„˜์–ด๊ฐ„ ๋ถ€๋ถ„์ด, ๋Œ€๋ถ€๋ถ„์˜ ๋ธ”๋กœ๊ทธ ๊ธ€์—์„œ๋Š” csv๋กœ S3์— ๋„ฃ๊ธธ๋ž˜ ์šฐ๋ฆฌ๋„ ๊ทธ๋ ‡๊ฒŒ ํ• ๊นŒ ํ–ˆ๋”๋‹ˆ, ์šฉ๋Ÿ‰์ด ๋„ˆ๋ฌด ์ปค์ง€๊ธฐ๋„ ํ•˜๊ณ  ๋น„์šฉ๋„ ๊ฐ๋‹น์ด ์•ˆ ๋  ๊ฒƒ ๊ฐ™์•˜๋‹ค. ๊ทธ๋ž˜์„œ ํŒŒ์ผ ํ˜•์‹์— ๋Œ€ํ•œ ๊ณ ๋ฏผ์„ ์‹œ์ž‘ํ–ˆ๋‹ค.

First, the file formats Athena can query in S3 are as follows.

  • CSV
  • TSV
  • JSON (newline delimited JSON, JSON with arrays)
  • Apache Parquet
  • ORC
  • AVRO
  • Apache RCFile
  • SequenceFile
  • Text (plain text)

And the supported compression formats:

  • Gzip: one of the most widely used compression formats; it compresses data quickly and is typically applied to text files.
  • Snappy: compresses and decompresses quickly with low CPU usage, which makes it useful when processing large volumes of data.
  • LZO: low CPU usage during compression and decompression; effective for very large files.
  • Bzip2: high compression ratio, but slow to compress and decompress and CPU-heavy, so it is a poor fit for large-scale data.
  • SnappyCodec: Hadoop's codec class for Snappy; in practice it behaves the same as Snappy above.
  • Beyond these, other formats such as LZ4 and Zstd are also supported.

์ฒ˜์Œ ๋ฐฑ์—”๋“œ๋ถ„์€ ์ฃผ๋กœ ์‚ฌ์šฉ๋˜๋Š” gzip-json ์••์ถ•-ํŒŒ์ผํ˜•์‹ ๋ฐฉ์‹์„ ์„ ํƒํ•˜์…”์„œ ์•Œ๋ ค์ฃผ์…จ๋‹ค. ํ•˜์ง€๋งŒ, ๋น…๋ฐ์ดํ„ฐ๋ฅผ ์ž์ฃผ ์กฐํšŒํ•˜๊ณ  ๋‹ค๋ฃจ๊ธฐ์—๋Š” ๋‚ด ์ƒ๊ฐ์—” snappy-parquet๋„ ๋‚˜์˜์ง€ ์•Š์„ ๊ฒƒ ๊ฐ™์•˜๋‹ค. ๋ฌด์—‡๋ณด๋‹ค ์Œ“์ธ ๋ฐ์ดํ„ฐ๋ฅผ ์ž์ฃผ ์กฐํšŒํ•˜๊ฑฐ๋‚˜ ์•ž์œผ๋กœ ๋ถ„์„๋ฉด์—์„œ ์—ฌ๋Ÿฌ๋ฒˆ ๊ฑด๋“ค์ด๊ณ  ์‹ถ์€ ๋‚ด ์ž…์žฅ์—์„œ ์ „์ž๋ณด๋‹จ ํ›„์ž์ธ ์ชฝ์ด ์†๋„๋ฉด์—์„œ๋„, ๋น„์šฉ๋ฉด์—์„œ๋„ ๋‚˜์„ ๊ฒƒ ๊ฐ™์•„ ๋น„๊ต ๋ถ„์„ ํ•ด๋ณด๊ธฐ๋กœ ํ–ˆ๋‹ค.


# pip install pyarrow pandas

import gzip, json, math, os, time
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pathlib import Path

# Convert a byte count into a human-readable size string
def convert_size(size_bytes):
    if size_bytes == 0:
        return "0B"
    size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    i = int(math.floor(math.log(size_bytes, 1024)))
    p = math.pow(1024, i)
    s = round(size_bytes / p, 2)
    return "%s %s" % (s, size_name[i])

BASE_PATH = Path('path to the source file')
logfile_path = BASE_PATH / 'filename.json'
gzipfile = BASE_PATH / 'filename.gz'
pqfile = BASE_PATH / 'filename.parquet'
file_size = os.path.getsize(logfile_path)

print('original json file size :', convert_size(file_size))


# Compress the json file with gzip and save it as a .gz file
start = time.time()
with open(logfile_path, 'rb') as f_in:
    with gzip.open(gzipfile, 'wb') as f_out:
        f_out.writelines(f_in)
end = time.time()
print(f"gzip processing time : {end - start:.5f} s")
print("gzip compressed file size :", convert_size(os.path.getsize(gzipfile)))


# Convert the json file to a snappy-compressed .parquet file
start = time.time()
with open(logfile_path, 'r', encoding='utf-8') as f:
    data = json.load(f)
df = pd.json_normalize(data)
pq.write_table(
    pa.Table.from_pandas(df),
    pqfile,
    compression='snappy'
)
end = time.time()
print(f"snappy processing time : {end - start:.5f} s")
print("snappy compressed file size :", convert_size(os.path.getsize(pqfile)))

The results came out as follows.

In the end, we decided to weight processing speed over file size and went with the snappy + parquet pair.
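(As a sanity check, the Parquet output from the benchmark above can be read back to confirm the schema and row count survived the round trip. A short sketch reusing pqfile from the code above:)

import pyarrow.parquet as pq

# Read the converted file back and confirm nothing was lost
table = pq.read_table(pqfile)
print(table.schema)
print("rows:", table.num_rows)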


Solving the Second Problem, Again

With the format question settled, I went back to the original task: export the query results from MySQL Workbench as JSON files, convert them to snappy-compressed parquet files, and upload them to S3 by hand.

import json
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pathlib import Path

'''
Convert json files to snappy-compressed parquet files
'''

BASE_PATH = Path('folder containing the json files')
pq_dir = BASE_PATH / 'parquet'
pq_dir.mkdir(exist_ok=True)

for logfile_path in BASE_PATH.glob('*.json'):
    data = []
    with open(logfile_path, 'r', encoding='utf-8') as f:

        # The collector never stripped newline characters and the values pulled
        # from the DB contained nulls, so json.load couldn't parse the export;
        # instead, decode the concatenated JSON objects one at a time.
        decoder = json.JSONDecoder()
        buffer = ""
        for line in f:
            buffer += line.strip()
            try:
                while buffer:
                    obj, idx = decoder.raw_decode(buffer)
                    data.append(obj)
                    buffer = buffer[idx:].lstrip()
            except json.JSONDecodeError:
                # Incomplete object so far: keep reading lines until it parses
                pass

    df = pd.json_normalize(data)

    pqfile = pq_dir / (logfile_path.stem + '.parquet')
    pq.write_table(
        pa.Table.from_pandas(df),
        pqfile,
        compression='snappy'
    )
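(The manual upload step could equally be scripted with boto3. A sketch reusing pq_dir from above and assuming a hypothetical naming convention where each file stem ends with its date, e.g. channel_2023-03-12.parquet:)

# pip install boto3
import boto3

s3 = boto3.client('s3')  # assumes AWS credentials are already configured

# Upload each converted file into its dt= partition prefix
for pqfile in pq_dir.glob('*.parquet'):
    dt = pqfile.stem.split('_')[-1]  # hypothetical: date taken from the file name
    key = f'channel/dt={dt}/{pqfile.name}'
    s3.upload_file(str(pqfile), 'dothis-crawling-data', key)
    print('uploaded', key)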
