🌊 Delta Lake Guide for Beginners - Hands-On (Part 1: Local Environment)

NewNewDaddy · March 24, 2025
0. INTRO

  • In the previous post, 🌊 Delta Lake Guide for Beginners - Theory, we covered the theory behind Delta Lake in detail. In this post, we will practice Delta Lake's features in a PySpark Docker container environment.
  • These exercises use PySpark as the data-handling tool, and the delta-format files are stored and managed in a local directory.
  • For the Docker container, I recommend using the hyunsoolee0506/pyspark-cloud:3.5.1 image I built separately, since it also covers the later exercises that integrate with a cloud environment. This local exercise, however, can just as well be run on Google Colab.
  • The exercises use the two CSV files contained in the delta_data.zip archive below.

    👉 delta_data.zip


1๏ธโƒฃ ์‹ค์Šต ํ™˜๊ฒฝ ์„ค์ •

▪ 1) Creating the Docker Container

  • Map a directory on your machine to the container's internal /workspace/spark directory as a volume.
docker run -d \
    --name pyspark \
    -p 8888:8888 \
    -p 4040:4040 \
    -v [host directory]:/workspace/spark \
    hyunsoolee0506/pyspark-cloud:3.5.1
  • After running the command above, connect to port 8888 to open the JupyterLab development environment.
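  • If JupyterLab asks for a token when you connect, it can usually be found in the container's startup logs via docker logs pyspark (the exact output depends on how the image launches Jupyter).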

โ–ช 2) ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ ์„ค์น˜

  • Install the libraries needed for the exercises. They come preinstalled in the hyunsoolee0506/pyspark-cloud:3.5.1 image, but on Colab you need to install them by running the command below.
pip install pyspark==3.5.1 delta-spark==3.2.0 pyarrow findspark

▪ 3) Configuring PySpark for Delta Lake

from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Register the Delta SQL extension and catalog implementation on the session builder
builder = SparkSession.builder.appName("DeltaLakeLocal") \
    .enableHiveSupport() \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

# configure_spark_with_delta_pip attaches the delta-spark package before starting the session
spark = configure_spark_with_delta_pip(builder).getOrCreate()
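  • As a quick sanity check (a minimal sketch; the printed values depend on your installation), you can confirm the session is up and the Delta extension is registered:

# Spark version and the Delta extension configured above
print(spark.version)
print(spark.conf.get("spark.sql.extensions"))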

2๏ธโƒฃ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ๋ฐ ํ…Œ์ด๋ธ” ์ƒ์„ฑ

โ–ช 1) ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์ƒ์„ฑ

  • Create a new database named deltalake_db.
spark.sql("CREATE DATABASE IF NOT EXISTS deltalake_db")

spark.sql("SHOW DATABASES").show()

---
+------------+
|   namespace|
+------------+
|     default|
|deltalake_db|
+------------+

▪ 2) Creating a CSV Table

  • Create a trainer table that will hold the data from the trainer_data.csv file.
- Table name : trainer
- Schema :
  - id → INT
  - name → STRING
  - age → INT
  - hometown → STRING
  - prefer_type → STRING
  - badge_count → INT
  - level → STRING
query = """
CREATE TABLE IF NOT EXISTS deltalake_db.trainer (
  id INT,
  name STRING,
  age INT,
  hometown STRING,
  prefer_type STRING,
  badge_count INT,
  level STRING
)
USING csv
OPTIONS (
  path '[path to trainer_data.csv]',
  header 'true',
  inferSchema 'true',
  delimiter ','
)
"""

spark.sql(query)

# ํ…Œ์ด๋ธ” ์ƒ์„ฑ ํ™•์ธ
spark.sql("SHOW TABLES FROM deltalake_db").show()

---
+------------+---------+-----------+
|   namespace|tableName|isTemporary|
+------------+---------+-----------+
|deltalake_db|  trainer|      false|
+------------+---------+-----------+
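  • To make sure the CSV data is actually readable through the catalog table, a quick preview query can be run (output omitted here):

# Preview a few rows from the csv-backed table
spark.sql("SELECT * FROM deltalake_db.trainer LIMIT 5").show()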

3๏ธโƒฃ delta ํƒ€์ž… ํ…Œ์ด๋ธ” ์ƒ์„ฑ

  • Data from an existing CSV file cannot be loaded into a delta table in the same statement that creates it, so the delta table is built in two steps:
    1. Create an empty delta-format table
    2. Insert the CSV table's data into the delta table

▪ 1) Creating the Table

  • Create a delta table named trainer_delta.
  • The table's data is configured to be stored under the path /workspace/spark/deltalake/delta_local/trainer_delta/.
query = """
CREATE TABLE IF NOT EXISTS deltalake_db.trainer_delta (
  id INT,
  name STRING,
  age INT,
  hometown STRING,
  prefer_type STRING,
  badge_count INT,
  level STRING
)
USING delta
LOCATION '/workspace/spark/deltalake/delta_local/trainer_delta/'
"""
spark.sql(query)

โ–ช 2) ๋ฐ์ดํ„ฐ ์‚ฝ์ž…

  • Insert the data from the trainer table created above into the trainer_delta table.
query = """
INSERT INTO deltalake_db.trainer_delta
SELECT * FROM deltalake_db.trainer;
"""
spark.sql(query)
  • ๋ฐ์ดํ„ฐ ์‚ฝ์ž…์ด ์™„๋ฃŒ๊ฐ€ ๋˜๋ฉด delta ํ…Œ์ด๋ธ” ๋””๋ ‰ํ† ๋ฆฌ์— _delta_log/ ํด๋”์™€ parquet ํŒŒ์ผ์ด ์ƒˆ๋กญ๊ฒŒ ์ƒ์„ฑ์ด ๋œ ๊ฒƒ์„ ํ™•์ธํ•ด๋ณผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

4๏ธโƒฃ delta ํƒ€์ž… ํ…Œ์ด๋ธ” ์ฝ๊ธฐ

  • PySpark offers a few slightly different ways to read a stored table.

▪ 1) Basic read with Spark

LOCAL_DELTA_PATH = '/workspace/spark/deltalake/delta_local/trainer_delta'

df = spark.read.format("delta").load(LOCAL_DELTA_PATH)

df.show(5)

---
+---+------+---+--------+-----------+-----------+------------+
| id|  name|age|hometown|prefer_type|badge_count|       level|
+---+------+---+--------+-----------+-----------+------------+
|  1| Brian| 28|   Seoul|   Electric|          8|      Master|
|  3| Susan| 18| Gwangju|       Rock|          7|      Expert|
|  6| Vicki| 17| Daejeon|        Ice|          4|Intermediate|
|  9|Olivia| 45| Incheon|    Psychic|          3|Intermediate|
| 10|  Mark| 16| Gangwon|       Fire|          4|Intermediate|
+---+------+---+--------+-----------+-----------+------------+
only showing top 5 rows

▪ 2) Reading with the delta.`<path>` syntax

query = f"SELECT * FROM delta.`{LOCAL_DELTA_PATH}`"

spark.sql(query)

▪ 3) Reading from the Hive catalog

spark.table('deltalake_db.trainer_delta')
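  • In addition to the methods above, the delta-spark Python API can load the same table as a DeltaTable object, which is also the entry point for the programmatic history/optimize/vacuum operations used later (a minimal sketch):

from delta.tables import DeltaTable

# Load the table as a DeltaTable and materialize it as a DataFrame
dt = DeltaTable.forPath(spark, LOCAL_DELTA_PATH)
dt.toDF().show(5)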

5๏ธโƒฃ ํ…Œ์ด๋ธ” ์ˆ˜์ • ํ›„ ์ €์žฅ

  • Next, we modify the table's contents and overwrite the directory where it was stored. This is groundwork for the upcoming exercises on viewing the table's change history and on Time Travel queries, a core Delta Lake feature.

▪ 1) Saving after removing 'Beginner'

# Create a dataframe without the 'Beginner' rows
df_1 = df.filter(F.col('level') != 'Beginner')

# Overwrite the existing path
df_1.write \
    .format('delta') \
    .mode('overwrite') \
    .save(LOCAL_DELTA_PATH)
    
# ๋ฐ์ดํ„ฐ ํ™•์ธ
df = spark.read.format("delta").load(LOCAL_DELTA_PATH)
df.select('level').distinct().show()

---
+------------+
|       level|
+------------+
|      Expert|
|    Advanced|
|      Master|
|Intermediate|
+------------+

▪ 2) Saving after removing 'Advanced'

# Create a dataframe without the 'Advanced' rows
df_2 = df_1.filter(F.col('level') != 'Advanced')

# Overwrite the existing path
df_2.write \
    .format('delta') \
    .mode('overwrite') \
    .save(LOCAL_DELTA_PATH)

# ๋ฐ์ดํ„ฐ ํ™•์ธ
df = spark.read.format("delta").load(LOCAL_DELTA_PATH)
df.select('level').distinct().show()

---
+------------+
|       level|
+------------+
|      Expert|
|      Master|
|Intermediate|
+------------+
  • ๋ฐ์ดํ„ฐ๊ฐ€ ๋ฎ์–ด์”Œ์›Œ์ง์— ๋”ฐ๋ผ parquet ํŒŒ์ผ์ด ์ถ”๊ฐ€๋˜๊ณ , _delta_log/ ํด๋” ๋‚ด์— ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ(.json ํŒŒ์ผ) ์—ญ์‹œ ์ถ”๊ฐ€๋˜๋Š” ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

6๏ธโƒฃ ๋ณ€๊ฒฝ ์ด๋ ฅ(history) ์กฐํšŒ ๋ฐ Time Travel ์ฟผ๋ฆฌ

▪ 1) Viewing History

  • View the change history recorded for the delta table.
  • So far the table has gone through its initial creation (CREATE TABLE) followed by three WRITEs, so four versions exist: 0, 1, 2, and 3.
    • VERSION 0 → trainer_delta table just created, no data
    • VERSION 1 → initial state after data was inserted from the trainer table
    • VERSION 2 → state saved after removing the 'Beginner' rows
    • VERSION 3 → state saved after removing the 'Advanced' rows
query = "DESCRIBE HISTORY deltalake_db.trainer_delta"

spark.sql(query).show(vertical=True, truncate=False)

---
-RECORD 0--------------------------------------------------------------------------------------------------------------
 version             | 3                                                                                               
 timestamp           | 2025-03-21 02:09:39.085                                                                         
 userId              | NULL                                                                                            
 userName            | NULL                                                                                            
 operation           | WRITE                                                                                           
 operationParameters | {mode -> Overwrite, partitionBy -> []}                                                          
 job                 | NULL                                                                                            
 notebook            | NULL                                                                                            
 clusterId           | NULL                                                                                            
 readVersion         | 2                                                                                               
 isolationLevel      | Serializable                                                                                    
 isBlindAppend       | false                                                                                           
 operationMetrics    | {numFiles -> 1, numOutputRows -> 42, numOutputBytes -> 3125}                                    
 userMetadata        | NULL                                                                                            
 engineInfo          | Apache-Spark/3.5.1 Delta-Lake/3.2.0                                                             
-RECORD 1--------------------------------------------------------------------------------------------------------------
 version             | 2                                                                                               
 timestamp           | 2025-03-21 02:08:32.646                                                                         
 userId              | NULL                                                                                            
 userName            | NULL                                                                                            
 operation           | WRITE                                                                                           
 operationParameters | {mode -> Overwrite, partitionBy -> []}                                                          
 job                 | NULL                                                                                            
 notebook            | NULL                                                                                            
 clusterId           | NULL                                                                                            
 readVersion         | 1                                                                                               
 isolationLevel      | Serializable                                                                                    
 isBlindAppend       | false                                                                                           
 operationMetrics    | {numFiles -> 1, numOutputRows -> 85, numOutputBytes -> 3868}                                    
 userMetadata        | NULL                                                                                            
 engineInfo          | Apache-Spark/3.5.1 Delta-Lake/3.2.0                                                             
-RECORD 2--------------------------------------------------------------------------------------------------------------
 version             | 1                                                                                               
 timestamp           | 2025-03-21 01:46:21.446                                                                         
 userId              | NULL                                                                                            
 userName            | NULL                                                                                            
 operation           | WRITE                                                                                           
 operationParameters | {mode -> Append, partitionBy -> []}                                                             
 job                 | NULL                                                                                            
 notebook            | NULL                                                                                            
 clusterId           | NULL                                                                                            
 readVersion         | 0                                                                                               
 isolationLevel      | Serializable                                                                                    
 isBlindAppend       | true                                                                                            
 operationMetrics    | {numFiles -> 1, numOutputRows -> 90, numOutputBytes -> 3980}                                    
 userMetadata        | NULL                                                                                            
 engineInfo          | Apache-Spark/3.5.1 Delta-Lake/3.2.0                                                             
-RECORD 3--------------------------------------------------------------------------------------------------------------
 version             | 0                                                                                               
 timestamp           | 2025-03-21 01:43:25.471                                                                         
 userId              | NULL                                                                                            
 userName            | NULL                                                                                            
 operation           | CREATE TABLE                                                                                    
 operationParameters | {partitionBy -> [], clusterBy -> [], description -> NULL, isManaged -> false, properties -> {}} 
 job                 | NULL                                                                                            
 notebook            | NULL                                                                                            
 clusterId           | NULL                                                                                            
 readVersion         | NULL                                                                                            
 isolationLevel      | Serializable                                                                                    
 isBlindAppend       | true                                                                                            
 operationMetrics    | {}                                                                                              
 userMetadata        | NULL                                                                                            
 engineInfo          | Apache-Spark/3.5.1 Delta-Lake/3.2.0                                                    
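  • The same history is available programmatically through the DeltaTable API (a minimal sketch):

from delta.tables import DeltaTable

# Equivalent of DESCRIBE HISTORY, returned as a DataFrame
DeltaTable.forName(spark, "deltalake_db.trainer_delta").history().show(vertical=True, truncate=False)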

▪ 2) Time Travel - Version

  • Using the version number assigned at each table change, load the table contents corresponding to a specific version.

👉 Loading the earliest version (version 0) of the table

df_pre = spark.read \
    .format("delta") \
    .option("versionAsOf", 0) \
    .load(LOCAL_DELTA_PATH)

df_pre.select('level').distinct().show()

---
+-----+
|level|
+-----+
+-----+

👉 Loading version 2 of the table

df_pre = spark.read \
    .format("delta") \
    .option("versionAsOf", 2) \
    .load(LOCAL_DELTA_PATH)

df_pre.select('level').distinct().show()

---
+------------+
|       level|
+------------+
|      Expert|
|    Advanced|
|      Master|
|Intermediate|
+------------+

👉 Time Travel queries in SQL

df_pre = spark.sql("SELECT * FROM deltalake_db.trainer_delta VERSION AS OF 3")

df_pre.select('level').distinct().show()
---
+------------+
|       level|
+------------+
|      Expert|
|      Master|
|Intermediate|
+------------+

▪ 3) Time Travel - Timestamp

  • This method queries by change time: given a specified time (TIMESTAMP), it returns the state (version) of the Delta table that existed at that point.

👉 Loading the table state at a specified time

TABLE_TIMESTAMP = "2025-03-21T02:09:00"

spark.read.format("delta") \
    .option("timestampAsOf", TABLE_TIMESTAMP) \
    .table("deltalake_db.trainer_delta")

👉 Loading the table at a specified time with SQL

TABLE_TIMESTAMP = "2025-03-21T02:09:00"

spark.sql(f"SELECT * FROM deltalake_db.trainer_delta TIMESTAMP AS OF '{TABLE_TIMESTAMP}'")
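  • For reference, Delta can also roll a table back in place with RESTORE; we don't run it here, since the restore itself would be recorded as a new version on top of the history above (a minimal sketch):

# Roll the table back to version 1 (the restore is logged as a new history entry)
spark.sql("RESTORE TABLE deltalake_db.trainer_delta TO VERSION AS OF 1")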

7๏ธโƒฃ ์Šคํ‚ค๋งˆ ๋ณ€๊ฒฝ ์ž‘์—…

  • Delta Lake uses strict schema enforcement: attempting to write data that does not match the existing table schema raises an error. So if the schema of the table you have been working with has changed, you must add a specific option before an overwrite is allowed.
  • The exercise goes as follows.
- Existing columns : ['id', 'name', 'age', 'hometown', 'prefer_type', 'badge_count', 'level']
- Changed table columns : ['id', 'name', 'age', 'hometown', 'prefer_type', 'badge_count', 'level', 'dummy_col']

👉 Overwriting with a table whose schema changed by the addition of a 'dummy_col' column

▪ 1) Standard write - fails

LOCAL_DELTA_PATH = '/workspace/spark/deltalake/delta_local/trainer_delta'

# Load the table
df = spark.table("deltalake_db.trainer_delta")

# Add a 'dummy_col' column
df_diff = df.withColumn('dummy_col', F.lit(1))

# Attempt to write with the changed schema
df_diff.write \
    .format('delta') \
    .mode('overwrite') \
    .save(LOCAL_DELTA_PATH)
    
# The schema differs from the table being overwritten, so the error below occurs
# 👇👇👇👇👇
---
AnalysisException: [_LEGACY_ERROR_TEMP_DELTA_0007] A schema mismatch detected when writing to the Delta table (Table ID: 31dbae5e-d042-467b-9454-e483fdad97bb).
To enable schema migration using DataFrameWriter or DataStreamWriter, please set:
'.option("mergeSchema", "true")'.
For other operations, set the session configuration
spark.databricks.delta.schema.autoMerge.enabled to "true". See the documentation
specific to the operation for details.

Table schema:
root
-- id: integer (nullable = true)
-- name: string (nullable = true)
-- age: integer (nullable = true)
-- hometown: string (nullable = true)
-- prefer_type: string (nullable = true)
-- badge_count: integer (nullable = true)
-- level: string (nullable = true)


Data schema:
root
-- id: integer (nullable = true)
-- name: string (nullable = true)
-- age: integer (nullable = true)
-- hometown: string (nullable = true)
-- prefer_type: string (nullable = true)
-- badge_count: integer (nullable = true)
-- level: string (nullable = true)
-- dummy_col: integer (nullable = true)

         
To overwrite your schema or change partitioning, please set:
'.option("overwriteSchema", "true")'.

Note that the schema can't be overwritten when using
'replaceWhere'.

▪ 2) Writing with the schema-merge option

  • To overwrite a table with a differing schema, you must add the option("mergeSchema", "true") option to the write.
df_diff.write \
    .format('delta') \
    .mode('overwrite') \
    .option("mergeSchema", "true") \
    .save(LOCAL_DELTA_PATH)
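  • Reading the table back should now show the added column; note that the error message above also mentions option("overwriteSchema", "true"), which replaces the schema outright instead of merging. A minimal check:

# Confirm that dummy_col is now part of the table schema
spark.read.format("delta").load(LOCAL_DELTA_PATH).printSchema()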

8๏ธโƒฃ ํŒŒ์ผ ์ƒํƒœ ์ตœ์ ํ™”

  • https://docs.databricks.com/aws/en/sql/language-manual/delta-optimize

  • Because a delta table keeps accumulating files by design, many small files pile up over time, which degrades query performance and adds read overhead. Running OPTIMIZE merges the data into larger files and improves performance.

    • Basic Optimize : merges small files to improve read performance
    • Z-Ordering : sorts data by frequently filtered columns → fewer files scanned, better query performance
    • Partition-scoped Optimize : selectively optimizes only frequently queried partitions (specific dates, regions, etc.)

▪ 1) Standard optimization

  • Apply the standard optimization that Delta Lake performs by default.
query = "OPTIMIZE deltalake_db.trainer_delta"

spark.sql(query)

▪ 2) Z-Ordering optimization

  • A feature that optimizes the physical storage order of the data based on specific columns.
  • Z-Ordering on frequently filtered columns reduces unnecessary file scans at query time.
query = """
OPTIMIZE deltalake_db.trainer_delta
ZORDER BY (hometown, level)
"""

spark.sql(query)

▪ 3) Partition-scoped optimization

  • Rather than optimizing the entire table, this merges only the data in a specific partitioned range (OPTIMIZE's WHERE clause can only filter on partition columns, so this form assumes a table partitioned by level).
query = """
OPTIMIZE deltalake_db.trainer_delta
WHERE level = 'Master' 
"""

spark.sql(query)
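  • The same optimizations are exposed through the DeltaTable Python API (a minimal sketch; executeCompaction and executeZOrderBy are the delta-spark equivalents of the SQL commands above):

from delta.tables import DeltaTable

dt = DeltaTable.forName(spark, "deltalake_db.trainer_delta")

# Plain file compaction, equivalent to OPTIMIZE
dt.optimize().executeCompaction()

# Z-Ordering, equivalent to OPTIMIZE ... ZORDER BY
dt.optimize().executeZOrderBy("hometown")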

9๏ธโƒฃ ๊ณผ๊ฑฐ ๋ฐ์ดํ„ฐ ์‚ญ์ œ(VACUUM)

  • https://docs.databricks.com/aws/en/sql/language-manual/delta-vacuum
  • Even when data is modified or deleted, Delta Lake keeps all of the past parquet files in place.
  • Since keeping files from old versions that are no longer used is wasteful, VACUUM can be used to delete data older than a specified period.
  • With delta data, old files must not be deleted directly from the directory; removing them through the VACUUM command preserves the table's integrity so that later operations keep working smoothly.
  • Once a VACUUM runs, Time Travel queries to versions older than the deleted files are no longer possible.
  • The default file retention period is 168 hours (7 days), and it can be adjusted through the Spark config.

▪ 1) Disabling the default retention check

  • To manage the retention period yourself, you first have to disable a check that Spark enables by default.
# Check the setting
spark.conf.get("spark.databricks.delta.retentionDurationCheck.enabled")
-> 'true'

# Disable the retention-duration check
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")

▪ 2) Running the VACUUM command

  • Run the VACUUM command to delete parquet files created before the period you specify.
  • The DRY RUN option makes the command report what would happen without actually doing it.
# Basic VACUUM command (deletes files older than 168 hours)
spark.sql("VACUUM deltalake_db.trainer_delta").show(truncate=False)

# Files older than the current version (DRY RUN: listed without being deleted)
spark.sql("VACUUM deltalake_db.trainer_delta RETAIN 0 HOURS DRY RUN").show(truncate=False)

# Files older than 2 days (DRY RUN: listed without being deleted)
spark.sql("VACUUM deltalake_db.trainer_delta RETAIN 2 DAYS DRY RUN").show(truncate=False)
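  • VACUUM is also available on DeltaTable objects (a minimal sketch; vacuum() takes the retention period in hours and, unlike DRY RUN above, actually deletes files):

from delta.tables import DeltaTable

# Delete files referenced only by older versions, keeping the last 48 hours
DeltaTable.forName(spark, "deltalake_db.trainer_delta").vacuum(48)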

▪ 3) Setting the retention period at table creation

  • Set the default retention period when creating a delta-format table.
query = """
CREATE TABLE IF NOT EXISTS deltalake_db.trainer_delta_2 (
  id INT,
  name STRING,
  age INT,
  hometown STRING,
  prefer_type STRING,
  badge_count INT,
  level STRING
)
USING delta
LOCATION '/workspace/spark/deltalake/delta_local/trainer_delta_2/'
TBLPROPERTIES ('delta.deletedFileRetentionDuration' = 'interval 2 days');
"""

spark.sql(query)

🔟 Parquet to Delta Conversion

โ–ช 1) ์ผ๋ฐ˜ parquet ๋ฐ์ดํ„ฐ ๋ณ€ํ™˜

👉 Save the fish_data.csv data as parquet.

# Read the csv file
fish = spark.read.option('header', 'true').csv('fish_data.csv')

# Save to a local directory + register in the catalog
fish.write \
    .mode('overwrite') \
    .format('parquet') \
    .option('path', '/workspace/spark/deltalake/delta_local/fish_parquet/') \
    .saveAsTable('deltalake_db.fish_parquet')

👉 Convert the data that was stored as parquet to delta

query = """
CONVERT TO DELTA
parquet.`/workspace/spark/deltalake/delta_local/fish_parquet/`
"""

spark.sql(query)
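  • The same conversion can be done from Python (a minimal sketch):

from delta.tables import DeltaTable

# In-place conversion of the parquet directory into a Delta table
DeltaTable.convertToDelta(spark, "parquet.`/workspace/spark/deltalake/delta_local/fish_parquet/`")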

▪ 2) Converting partitioned parquet data

👉 Write parquet data partitioned by the Species column

fish.write.mode('overwrite') \
    .format('parquet') \
    .partitionBy('Species') \
    .option('path', '/workspace/spark/deltalake/delta_local/fish_parquet_partitioned/') \
    .saveAsTable('deltalake_db.fish_parquet_partitioned')

👉 Convert the partitioned parquet data to delta

query = """
CONVERT TO DELTA 
parquet.`/workspace/spark/deltalake/delta_local/fish_parquet_partitioned/`
PARTITIONED BY (Species STRING)
"""
spark.sql(query)
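  • To verify the conversion, DESCRIBE DETAIL reports the table's format (it should now read delta) along with its partition columns (a minimal check):

spark.sql("DESCRIBE DETAIL delta.`/workspace/spark/deltalake/delta_local/fish_parquet_partitioned/`").show(vertical=True, truncate=False)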
