🌊 A Delta Lake Guide for Beginners - Hands-On (Part 2: Using the delta-spark Library)

NewNewDaddy Β· April 1, 2025

0. INTRO

  • μ•žμ„  κΈ€ 🌊 Delta Lake μž…λ¬Έμžλ₯Ό μœ„ν•œ κ°€μ΄λ“œ - μ‹€μ „νŽΈ(Part 1. 둜컬 ν™˜κ²½)μ—μ„œλŠ” Pyspark Docker Container ν™˜κ²½μ—μ„œ Pysparkλ₯Ό ν™œμš©ν•˜μ—¬ delta μœ ν˜•μ˜ νŒŒμΌλ“€μ„ μƒμ„±ν•˜κ³  λ‹€λ€„λ³΄λŠ” μ‹€μŠ΅μ„ μ§„ν–‰ν•˜μ˜€μŠ΅λ‹ˆλ‹€.
  • 이번 Part 2 μ‹€μŠ΅μ—μ„œλŠ” μ„ΈλΆ€ λ‚΄μš©μ€ μœ μ‚¬ν•˜μ§€λ§Œ, Delta ν˜•μ‹μ˜ 데이터λ₯Ό Spark둜 보닀 κ°„νŽΈν•˜κ²Œ λ‹€λ£° 수 μžˆλ„λ‘ λ„μ™€μ£ΌλŠ” delta-spark 라이브러리λ₯Ό ν™œμš©ν•΄λ³΄λŠ” λ‚΄μš©μ„ λ‹΄μ•˜μŠ΅λ‹ˆλ‹€.
  • μ‹€μŠ΅ ν™˜κ²½μ˜ 경우 hyunsoolee0506/pyspark-cloud:3.5.1 μ΄λ―Έμ§€λ‘œ μ»¨ν…Œμ΄λ„ˆλ₯Ό μƒμ„±ν•˜μ‹œλŠ” 것을 ꢌμž₯λ“œλ¦½λ‹ˆλ‹€λ§Œ 이번 μ‹€μŠ΅μ˜ κ²½μš°λŠ” Google Colabμ—μ„œ 진행해도 λ¬΄λ°©ν•©λ‹ˆλ‹€.
  • μ‹€μŠ΅μ—μ„œλŠ” μ•„λž˜ delta_data.zip파일 μ•ˆμ— μžˆλŠ” 두 κ°€μ§€ CSV 파일 데이터λ₯Ό μ‚¬μš©ν•˜μ˜€μŠ΅λ‹ˆλ‹€.

    πŸ‘‰ delta_data.zip


1️⃣ Basic Environment Setup

β–ͺ 1) Creating the Docker Container

  • μ‚¬μš©μžμ˜ μ»΄ν“¨ν„°μ—μ„œ volume으둜 μ‚¬μš©ν•  디렉토리와 μ»¨ν…Œμ΄λ„ˆ λ‚΄λΆ€ /workspace/spark 디렉토리가 λ§€ν•‘λ˜λ„λ‘ μ„€μ •ν•©λ‹ˆλ‹€.
docker run -d \
    --name pyspark \
    -p 8888:8888 \
    -p 4040:4040 \
    -v [your directory]:/workspace/spark \
    hyunsoolee0506/pyspark-cloud:3.5.1
  • μœ„ λͺ…λ Ήμ–΄ μ‹€ν–‰ ν›„ 8888 포트둜 μ ‘μ†ν•˜λ©΄ juypter lab 개발 ν™˜κ²½μ— 접속할 수 μžˆμŠ΅λ‹ˆλ‹€.

β–ͺ 2) Installing Libraries

  • μ‹€μŠ΅μ— ν•„μš”ν•œ λΌμ΄λΈŒλŸ¬λ¦¬λ“€μ„ μ„€μΉ˜ν•©λ‹ˆλ‹€. hyunsoolee0506/pyspark-cloud:3.5.1 μ΄λ―Έμ§€μ—λŠ” 이미 μ„€μΉ˜λ˜μ–΄ μžˆμ§€λ§Œ colab의 경우 μ•„λž˜ μ½”λ“œ 싀행을 톡해 λΌμ΄λΈŒλŸ¬λ¦¬λ“€μ„ μ„€μΉ˜ν•΄μ•Ό ν•©λ‹ˆλ‹€.
pip install pyspark==3.5.1 delta-spark==3.2.0 pyarrow findspark

β–ͺ 3) Configuring PySpark for Delta Lake

from delta import *
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Register the Delta SQL extension and the Delta catalog on the session builder
builder = SparkSession.builder.appName("DeltaLakeLocal") \
    .enableHiveSupport() \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

# configure_spark_with_delta_pip attaches the delta-spark JARs to the session
spark = configure_spark_with_delta_pip(builder).getOrCreate()

2️⃣ Creating Delta-Format Tables

β–ͺ 1) Creating a Database

spark.sql("CREATE DATABASE IF NOT EXISTS deltalake_db")

spark.sql("SHOW DATABASES").show()

---
+------------+
|   namespace|
+------------+
|     default|
|deltalake_db|
+------------+

β–ͺ 2) Creating a Regular Table

"""
ν…Œμ΄λΈ” 이름 : trainer
μŠ€ν‚€λ§ˆ : 
  - id β†’ INT
  - name β†’ STRING
  - age β†’ INT
  - hometown β†’ STRING
  - prefer_type β†’ STRING
  - badge_count β†’ INT
  - level β†’ STRING
""" 

spark.sql(f"""
CREATE TABLE IF NOT EXISTS deltalake_db.trainer (
  id INT,
  name STRING,
  age INT,
  hometown STRING,
  prefer_type STRING,
  badge_count INT,
  level STRING
)
USING csv
OPTIONS (
  path '/workspace/spark/deltalake/dataset/trainer_data.csv',
  header 'true',
  inferSchema 'true',
  delimiter ','
)
""")

spark.sql("SHOW TABLES FROM deltalake_db").show()

---
+------------+-------------+-----------+
|   namespace|    tableName|isTemporary|
+------------+-------------+-----------+
|deltalake_db|      trainer|      false|
+------------+-------------+-----------+

β–ͺ 3) Creating a Delta Table

Store the directory where the Delta table will live in the LOCAL_DELTA_PATH variable. This can be a local directory, or a path on cloud storage such as S3 or GCS.

LOCAL_DELTA_PATH = '/workspace/spark/deltalake/delta_local/trainer_delta/'

query = f"""
CREATE TABLE IF NOT EXISTS deltalake_db.trainer_delta (
  id INT,
  name STRING,
  age INT,
  hometown STRING,
  prefer_type STRING,
  badge_count INT,
  level STRING
)
USING delta
LOCATION '{LOCAL_DELTA_PATH}'
"""
spark.sql(query)

spark.sql("SHOW TABLES FROM deltalake_db").show()

---
+------------+-------------+-----------+
|   namespace|    tableName|isTemporary|
+------------+-------------+-----------+
|deltalake_db|      trainer|      false|
|deltalake_db|trainer_delta|      false|
+------------+-------------+-----------+

β–ͺ 4) Inserting Data into the Delta Table

  • CSV둜 μƒμ„±ν•œ 일반 ν…Œμ΄λΈ”μ˜ 데이터λ₯Ό delta ν…Œμ΄λΈ”μ— μ‚½μž…ν•©λ‹ˆλ‹€.
query = """
INSERT INTO deltalake_db.trainer_delta
SELECT * FROM deltalake_db.trainer;
"""
spark.sql(query)

spark.sql('SELECT * FROM deltalake_db.trainer_delta').show(5)

---
+---+-----------+---+--------+-----------+-----------+------------+
| id|       name|age|hometown|prefer_type|badge_count|       level|
+---+-----------+---+--------+-----------+-----------+------------+
|  1|      Brian| 28|   Seoul|   Electric|          8|      Master|
|  2|    Sabrina| 23|   Busan|      Water|          6|    Advanced|
|  3|      Susan| 18| Gwangju|       Rock|          7|      Expert|
|  4|     Martin| 20| Incheon|      Grass|          5|    Advanced|
|  5|  Gabrielle| 30|   Daegu|     Flying|          6|    Advanced|
+---+-----------+---+--------+-----------+-----------+------------+
  • 디렉토리λ₯Ό ν™•μΈν•˜λ©΄ μ•„λž˜μ™€ 같이 _delta_log 폴더가 μƒμ„±λœ 것을 확인할 수 μžˆμŠ΅λ‹ˆλ‹€.

3️⃣ Reading Data with DeltaTable

β–ͺ 1) Checking for Delta Format

  • 디렉토리에 μ €μž₯된 파일이 delta ν˜•μ‹μΈμ§€ ν™•μΈν•©λ‹ˆλ‹€.
  • μ—¬κΈ°μ„œ νŒŒλΌλ―Έν„°μ— λ“€μ–΄κ°€λŠ” sparkλŠ” μœ„μ— μƒμ„±ν•œ SparkSession에 λŒ€ν•œ 값이 λ‹΄κ²¨μžˆλŠ” λ³€μˆ˜μž…λ‹ˆλ‹€.
DeltaTable.isDeltaTable(spark, LOCAL_DELTA_PATH)

---
True

β–ͺ 2) Reading a Delta Table

  • delta ν…Œμ΄λΈ”μ„ 읽은 방식은 두 κ°€μ§€κ°€ μžˆμŠ΅λ‹ˆλ‹€.

1) Reading by the registered table name

dt = DeltaTable.forName(spark, "deltalake_db.trainer_delta")

2) Reading by the path where the table is stored

dt = DeltaTable.forPath(spark, LOCAL_DELTA_PATH)

β–ͺ 3) Converting a DeltaTable to a Spark DataFrame

  • μœ„μ˜ μ½”λ“œλ‘œ μ½μ–΄μ˜€κ²Œ 되면 delta.tables.DeltaTable νƒ€μž…μœΌλ‘œ μ €μž₯λ©λ‹ˆλ‹€. λ”°λΌμ„œ 데이터λ₯Ό μ‘°νšŒν•˜κΈ° μœ„ν•΄μ„œλŠ” spark dataframe으둜 λ³€ν™˜ ν›„ show() λ©”μ†Œλ“œλ‘œ μ‘°νšŒν•©λ‹ˆλ‹€.
dt.toDF().show(5)

---
+---+---------+---+--------+-----------+-----------+--------+
| id|     name|age|hometown|prefer_type|badge_count|   level|
+---+---------+---+--------+-----------+-----------+--------+
|  1|    Brian| 28|   Seoul|   Electric|          8|  Master|
|  2|  Sabrina| 23|   Busan|      Water|          6|Advanced|
|  3|    Susan| 18| Gwangju|       Rock|          7|  Expert|
|  4|   Martin| 20| Incheon|      Grass|          5|Advanced|
|  5|Gabrielle| 30|   Daegu|     Flying|          6|Advanced|
+---+---------+---+--------+-----------+-----------+--------+
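
Since toDF() returns an ordinary Spark DataFrame, the usual DataFrame API applies to it; a small sketch using the pyspark.sql.functions import from earlier:

# Filter and aggregate the Delta table like any other DataFrame
dt.toDF().filter(F.col("age") >= 25) \
    .groupBy("level").count() \
    .show()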

4️⃣ Creating Tables with the DeltaTable API

β–ͺ 1) create

  • delta.tables.DeltaTable νƒ€μž…μ˜ λΉ„μ–΄μžˆλŠ” ν…Œμ΄λΈ”μ„ μƒμ„±ν•©λ‹ˆλ‹€.
  • create κ΄€λ ¨ν•΄μ„œλŠ” μ•„λž˜ μ„Έ κ°€μ§€ μ’…λ₯˜κ°€ μžˆλŠ”λ° ν™œμš©λ²•μ΄ λΉ„μŠ·ν•˜λ―€λ‘œ create λ©”μ†Œλ“œλ§Œ μ‹€μŠ΅ν•΄λ³΄λ„λ‘ ν•˜κ² μŠ΅λ‹ˆλ‹€.
    • create : μƒˆλ‘œμš΄ DeltaTable을 μƒμ„±ν•©λ‹ˆλ‹€. ν…Œμ΄λΈ”μ΄ 이미 μ‘΄μž¬ν•˜λ©΄ 였λ₯˜κ°€ λ°œμƒν•©λ‹ˆλ‹€.
    • createIfNotExists : μƒˆλ‘œμš΄ DeltaTable을 μƒμ„±ν•©λ‹ˆλ‹€. ν…Œμ΄λΈ”μ΄ 이미 μ‘΄μž¬ν•΄λ„ 였λ₯˜κ°€ λ‚˜μ§€ μ•ŠμŠ΅λ‹ˆλ‹€.
    • createOrReplace : μƒˆλ‘œμš΄ DeltaTable을 μƒμ„±ν•˜κ±°λ‚˜ λ™μΌν•œ μ΄λ¦„μ˜ κΈ°μ‘΄ ν…Œμ΄λΈ”μ„ λŒ€μ²΄ν•©λ‹ˆλ‹€.
my_dt = DeltaTable.create(spark) \
          .tableName("my_table") \
          .addColumn("id", "INT") \
          .addColumn("name", "STRING") \
          .addColumn("age", "INT") \
          .execute()
          
my_dt.toDF().show()

---
+---+----+---+
| id|name|age|
+---+----+---+
+---+----+---+
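
The other two builders follow the same pattern; a minimal sketch with the same illustrative schema:

# Succeeds silently if my_table already exists
DeltaTable.createIfNotExists(spark) \
    .tableName("my_table") \
    .addColumn("id", "INT") \
    .execute()

# Replaces my_table if it exists, creates it otherwise
DeltaTable.createOrReplace(spark) \
    .tableName("my_table") \
    .addColumn("id", "INT") \
    .execute()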

β–ͺ 2) replace

  • μœ„μ˜ createOrReplace λ©”μ†Œλ“œμ™€ λΉ„μŠ·ν•˜κ²Œ κΈ°μ‘΄ DeltaTable을 μƒˆλ‘œμš΄ μŠ€ν‚€λ§ˆμ˜ ν…Œμ΄λΈ”λ‘œ λŒ€μ²΄ν•  λ•Œ μ‚¬μš©ν•©λ‹ˆλ‹€.
df = spark.createDataFrame([('Ryan', 31), ('Alice', 27), ('Ruby', 24)], ["name", "age"])

my_dt = DeltaTable.replace(spark) \
    .tableName("my_table") \
    .addColumns(df.schema) \
    .execute()

my_dt.toDF().show()

---
+----+---+
|name|age|
+----+---+
+----+---+

5️⃣ Updating, Deleting, and Merging DeltaTables

β–ͺ 1) UPDATE

  • dataframeμ—μ„œ idκ°€ 5~10에 ν•΄λ‹Ήν•˜λŠ” row의 level μ»¬λŸΌμ„ 'Delta_Update'둜 λ³€κ²½ν•©λ‹ˆλ‹€.
dt.update(
    condition="id >= 5 AND id <= 10",
    # note the nested quotes: the value is the SQL string literal 'Delta_Update'
    set={'level' : "'Delta_Update'"}
)

β–ͺ 2) DELETE

  β€’ Delete all rows whose prefer_type column is 'Rock'.

dt.delete(
    condition="prefer_type = 'Rock'"
)

β–ͺ 3) MERGE

  • merge()λŠ” ν…Œμ΄λΈ”μ— 데이터λ₯Ό upsert(μ—…λ°μ΄νŠΈ λ˜λŠ” μ‚½μž…)ν•˜κ±°λ‚˜ μ‚­μ œν•˜λŠ” 데 맀우 μœ μš©ν•œ κΈ°λŠ₯μž…λ‹ˆλ‹€.
  • merge λ©”μ†Œλ“œμ˜ μ˜΅μ…˜
    • whenMatchedDelete : μ†ŒμŠ€μ™€ λŒ€μƒ ν…Œμ΄λΈ”μ˜ λ ˆμ½”λ“œκ°€ 맀칭될 λ•Œ, ν•΄λ‹Ή λ ˆμ½”λ“œλ₯Ό μ‚­μ œ
    • whenMatchedUpdate : μ†ŒμŠ€μ™€ λŒ€μƒ ν…Œμ΄λΈ”μ˜ λ ˆμ½”λ“œκ°€ 맀칭될 λ•Œ, ν•΄λ‹Ή λ ˆμ½”λ“œλ₯Ό μ—…λ°μ΄νŠΈ
    • whenMatchedUpdateAll : μ†ŒμŠ€μ™€ λŒ€μƒ ν…Œμ΄λΈ”μ˜ λ ˆμ½”λ“œκ°€ 맀칭될 λ•Œ, λͺ¨λ“  μ»¬λŸΌμ„ μ†ŒμŠ€ λ°μ΄ν„°λ‘œ μ—…λ°μ΄νŠΈ
    • whenNotMatchedBySourceDelete : μ†ŒμŠ€ 데이터에 μ—†λŠ” λŒ€μƒ ν…Œμ΄λΈ”μ˜ λ ˆμ½”λ“œλ₯Ό μ‚­μ œ
    • whenNotMatchedBySourceUpdate : μ†ŒμŠ€ 데이터에 μ—†λŠ” λŒ€μƒ ν…Œμ΄λΈ”μ˜ λ ˆμ½”λ“œλ₯Ό μ—…λ°μ΄νŠΈ
    • whenNotMatchedInsert : μ†ŒμŠ€ 데이터가 λŒ€μƒ ν…Œμ΄λΈ”μ— μ—†λŠ” 경우, μƒˆλ‘œμš΄ λ ˆμ½”λ“œλ₯Ό μ‚½μž…
    • whenNotMatchedInsertAll : μ†ŒμŠ€ 데이터가 λŒ€μƒ ν…Œμ΄λΈ”μ— μ—†λŠ” 경우, λͺ¨λ“  μ»¬λŸΌμ„ μ‚½μž…
    • withSchemaEvolution : μŠ€ν‚€λ§ˆκ°€ λ³€κ²½λœ 경우(예: μ†ŒμŠ€ 데이터에 μƒˆλ‘œμš΄ 컬럼이 좔가됨), λŒ€μƒ ν…Œμ΄λΈ”μ˜ μŠ€ν‚€λ§ˆλ₯Ό μžλ™μœΌλ‘œ μ—…λ°μ΄νŠΈ

1) Creating the target and source tables

# Target table β†’ trim the existing trainer table down to its first 5 rows
dt.delete(
    condition="id > 5"
)

# Source table
data = [
    (1, "Brian", 29, "Seoul", "Electric", 9, "GrandMaster"),
    (3, "Susan", 19, "Gwangju", "Rock", 8, "Master"),
    (7, "Alex", 25, "Jeju", "Fire", 3, "Beginner"),
    (8, "Emily", 22, "Ulsan", "Psychic", 5, "Intermediate")
]

columns = ["id", "name", "age", "hometown", "prefer_type", "badge_count", "level"]
source_df = spark.createDataFrame(data, columns)

2) Running the merge

  • source_df ν…Œμ΄λΈ” 데이터 쀑 κΈ°μ‘΄ dt와 idκ°€ κ²ΉμΉ˜λŠ” 행은 μ—…λ°μ΄νŠΈ, idκ°€ μ—†λŠ” 행은 μΆ”κ°€ν•˜λŠ” μž‘μ—…μ„ μˆ˜ν–‰ν•©λ‹ˆλ‹€.
dt.alias("target") \
    .merge(
        source=source_df.alias("source"),
        condition="target.id = source.id"
    ) \
    .whenMatchedUpdate(
        set={
            "name": "source.name",
            "age": "source.age",
            "hometown": "source.hometown",
            "prefer_type": "source.prefer_type",
            "badge_count": "source.badge_count",
            "level": "source.level"
        }
    ) \
    .whenNotMatchedInsert(
        values={
            "id": "source.id",
            "name": "source.name",
            "age": "source.age",
            "hometown": "source.hometown",
            "prefer_type": "source.prefer_type",
            "badge_count": "source.badge_count",
            "level": "source.level"
        }
    ) \
    .execute()
    
dt.toDF().show()

---
+---+---------+---+--------+-----------+-----------+------------+
| id|     name|age|hometown|prefer_type|badge_count|       level|
+---+---------+---+--------+-----------+-----------+------------+
|  1|    Brian| 29|   Seoul|   Electric|          9| GrandMaster|
|  2|  Sabrina| 23|   Busan|      Water|          6|    Advanced|
|  3|    Susan| 19| Gwangju|       Rock|          8|      Master|
|  4|   Martin| 20| Incheon|      Grass|          5|    Advanced|
|  5|Gabrielle| 30|   Daegu|     Flying|          6|    Advanced|
|  7|     Alex| 25|    Jeju|       Fire|          3|    Beginner|
|  8|    Emily| 22|   Ulsan|    Psychic|          5|Intermediate|
+---+---------+---+--------+-----------+-----------+------------+
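
When every column should simply be taken from the source, the same upsert can be written more compactly with the *All variants (equivalent to the merge above):

dt.alias("target") \
    .merge(
        source=source_df.alias("source"),
        condition="target.id = source.id"
    ) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()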

6️⃣ Querying DeltaTable Metadata

β–ͺ 1) detail

  • Delta ν…Œμ΄λΈ”μ˜ 상세 정보(μŠ€ν‚€λ§ˆ, 속성, 메타데이터 λ“±)λ₯Ό 확인할 λ•Œ μ‚¬μš©ν•©λ‹ˆλ‹€.
dt.detail().show(truncate=False)

---
+------+------------------------------------+----+-----------+---------------------------------------------------------+-----------------------+-----------------------+----------------+-----------------+--------+-----------+----------+----------------+----------------+------------------------+
|format|id                                  |name|description|location                                                 |createdAt              |lastModified           |partitionColumns|clusteringColumns|numFiles|sizeInBytes|properties|minReaderVersion|minWriterVersion|tableFeatures           |
+------+------------------------------------+----+-----------+---------------------------------------------------------+-----------------------+-----------------------+----------------+-----------------+--------+-----------+----------+----------------+----------------+------------------------+
|delta |e3db23a9-cc4c-4700-95ee-a8b4e06dfbf9|NULL|NULL       |file:/workspace/spark/deltalake/delta_local/trainer_delta|2025-03-28 07:31:56.941|2025-04-01 01:15:22.165|[]              |[]               |1       |3984       |{}        |1               |2               |[appendOnly, invariants]|
+------+------------------------------------+----+-----------+---------------------------------------------------------+-----------------------+-----------------------+----------------+-----------------+--------+-----------+----------+----------------+----------------+------------------------+

β–ͺ 2) history

  • Delta ν…Œμ΄λΈ”μ— μˆ˜ν–‰λœ μž‘μ—… 기둝(μ“°κΈ°, μ—…λ°μ΄νŠΈ, μ‚­μ œ λ“±)을 확인할 λ•Œ μ‚¬μš©ν•©λ‹ˆλ‹€.
dt.history().show(truncate=False)

---
+-------+-----------------------+------+--------+------------+-----------------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+-----------------------------------+
|version|timestamp              |userId|userName|operation   |operationParameters                                                                            |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                                                                                                                                                                                                                                                                                              |userMetadata|engineInfo                         |
+-------+-----------------------+------+--------+------------+-----------------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+-----------------------------------+
|3      |2025-04-01 01:15:22.165|NULL  |NULL    |DELETE      |{predicate -> ["(prefer_type#3178 = Rock)"]}                                                   |NULL|NULL    |NULL     |2          |Serializable  |false        |{numRemovedFiles -> 1, numRemovedBytes -> 3997, numCopiedRows -> 89, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 789, numDeletionVectorsUpdated -> 0, numDeletedRows -> 1, scanTimeMs -> 494, numAddedFiles -> 1, numAddedBytes -> 3984, rewriteTimeMs -> 294} |NULL        |Apache-Spark/3.5.1 Delta-Lake/3.2.0|
|2      |2025-04-01 01:13:45.637|NULL  |NULL    |UPDATE      |{predicate -> ["((id#3174 >= 5) AND (id#3174 <= 10))"]}                                        |NULL|NULL    |NULL     |1          |Serializable  |false        |{numRemovedFiles -> 1, numRemovedBytes -> 3980, numCopiedRows -> 84, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 1604, numDeletionVectorsUpdated -> 0, scanTimeMs -> 984, numAddedFiles -> 1, numUpdatedRows -> 6, numAddedBytes -> 3997, rewriteTimeMs -> 618}|NULL        |Apache-Spark/3.5.1 Delta-Lake/3.2.0|
|1      |2025-03-28 07:32:09.347|NULL  |NULL    |WRITE       |{mode -> Append, partitionBy -> []}                                                            |NULL|NULL    |NULL     |0          |Serializable  |true         |{numFiles -> 1, numOutputRows -> 90, numOutputBytes -> 3980}                                                                                                                                                                                                                                                                  |NULL        |Apache-Spark/3.5.1 Delta-Lake/3.2.0|
|0      |2025-03-28 07:31:57.505|NULL  |NULL    |CREATE TABLE|{partitionBy -> [], clusterBy -> [], description -> NULL, isManaged -> false, properties -> {}}|NULL|NULL    |NULL     |NULL       |Serializable  |true         |{}                                                                                                                                                                                                                                                                                                                            |NULL        |Apache-Spark/3.5.1 Delta-Lake/3.2.0|
+-------+-----------------------+------+--------+------------+-----------------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+-----------------------------------+
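
history() returns a regular DataFrame, so the rather wide output above can be trimmed to the columns of interest:

# Keep just the columns needed to follow the table's change history
dt.history().select("version", "timestamp", "operation").show(truncate=False)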

β–ͺ 3) generate

  • Delta Lake ν…Œμ΄λΈ”μ˜ 데이터λ₯Ό μ™ΈλΆ€ μ‹œμŠ€ν…œ(예: Apache Hive, Presto, Amazon Athena λ“±)μ—μ„œ 읽을 수 μžˆλ„λ‘ λ§€λ‹ˆνŽ˜μŠ€νŠΈ νŒŒμΌμ„ μƒμ„±ν•˜λŠ” ν•¨μˆ˜μž…λ‹ˆλ‹€.
  • generate ν•¨μˆ˜λ₯Ό μ‹€ν–‰ν•˜λ©΄ delta ν…Œμ΄λΈ” 디렉토리에 _symlink_format_manifest 디렉토리가 μƒμ„±λ˜κ³ , κ·Έ μ•ˆμ— ν˜„μž¬ λ²„μ „μ˜ parquet νŒŒμΌμ„ κ°€λ¦¬ν‚€λŠ” λ©”λ‹ˆνŽ˜μŠ€νŠΈ 파일이 μž‘μ„±λ©λ‹ˆλ‹€.
  • Presto, Trino, Amazon Athena, Apache Hive와 같은 엔진은 Delta Lake의 둜그 기반 νŠΈλžœμž­μ…˜ μ‹œμŠ€ν…œμ„ 직접 μ§€μ›ν•˜μ§€ μ•ŠμŠ΅λ‹ˆλ‹€. ν•˜μ§€λ§Œ _symlink_format_manifestλ₯Ό μ‚¬μš©ν•˜λ©΄ delta ν…Œμ΄λΈ”μ„ parquet 파일둜 ν‘œν˜„ν•œ λ§€λ‹ˆνŽ˜μŠ€νŠΈλ₯Ό μ œκ³΅ν•˜λ―€λ‘œ, μ΄λŸ¬ν•œ μ—”μ§„μ—μ„œ ν…Œμ΄λΈ”μ„ 쿼리할 수 μžˆμŠ΅λ‹ˆλ‹€.
dt.generate("symlink_format_manifest")
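
For reference, the external engine then reads the table through the manifest. A hedged sketch of the Hive/Athena-side DDL (the table name and S3 location are illustrative and assume the Delta table was written to S3; this statement is run in Hive/Athena, not in Spark):

athena_ddl = """
CREATE EXTERNAL TABLE trainer_delta (
  id INT, name STRING, age INT, hometown STRING,
  prefer_type STRING, badge_count INT, level STRING
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://my-bucket/trainer_delta/_symlink_format_manifest/'
"""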


7️⃣ Time Travel Queries

  • λ°μ΄ν„°μ˜ κ³Όκ±° 버전을 μ‘°νšŒν•˜λŠ” Time Travel 쿼리의 경우 νŠΉμ • 버전을 κΈ°μ€€μœΌλ‘œ μ‘°νšŒν•˜λŠ” κ²½μš°μ™€ νŠΉμ • μ‹œκ°„(timestamp)을 κΈ°μ€€μœΌλ‘œ μ‘°νšŒν•˜λŠ” 경우, μ΄λ ‡κ²Œ 두 κ°€μ§€κ°€ κ°€λŠ₯ν•©λ‹ˆλ‹€.

β–ͺ 1) Version

dt.restoreToVersion(1)

dt.toDF().show(10)

β–ͺ 2) Timestamp

  • SEARCH_TIME에 μ €μž₯된 μ‹œκ°„μ— μ‘΄μž¬ν–ˆλ˜ ν…Œμ΄λΈ” λͺ¨μŠ΅μ„ μ‘°νšŒν•©λ‹ˆλ‹€.
SEARCH_TIME = '2025-03-28 07:32:00'

dt.restoreToTimestamp(SEARCH_TIME)

dt.toDF().show(10)
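
To query a past version without restoring, the Delta reader accepts versionAsOf / timestampAsOf options; a minimal sketch reusing the path and timestamp from above:

# Read-only time travel: the table itself is not modified
old_df = spark.read.format("delta") \
    .option("versionAsOf", 1) \
    .load(LOCAL_DELTA_PATH)

# Or by timestamp
old_df = spark.read.format("delta") \
    .option("timestampAsOf", SEARCH_TIME) \
    .load(LOCAL_DELTA_PATH)

old_df.show(10)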

8️⃣ Optimizing File Layout

  • μž‘κ²Œ λ‚˜λˆ μ„œ μ €μž₯된 parquet νŒŒμΌλ“€μ„ 합쳐 μš©λŸ‰μ€ 크게, 파일 μˆ˜λŠ” 적게 λ§Œλ“€μ–΄ 데이터λ₯Ό κ΄€λ¦¬ν•©λ‹ˆλ‹€. 이 과정을 톡해 Delta ν…Œμ΄λΈ”μ˜ 쿼리 μ„±λŠ₯을 ν–₯μƒμ‹œν‚¬ 수 μžˆμŠ΅λ‹ˆλ‹€.
  • executeZOrderBy μ˜΅μ…˜μ€ μ§€μ •λœ μ»¬λŸΌμ— λŒ€ν•΄ Z-Order ν΄λŸ¬μŠ€ν„°λ§μ„ μ μš©ν•˜μ—¬ 데이터λ₯Ό 물리적으둜 μž¬λ°°μΉ˜μ‹œν‚΅λ‹ˆλ‹€. μΏΌλ¦¬μ—μ„œ ν•΄λ‹Ή μ»¬λŸΌμ— λŒ€ν•œ ν•„ν„°λ§μ΄λ‚˜ 쑰인이 λΉˆλ²ˆν•  λ•Œ νš¨κ³Όμ μž…λ‹ˆλ‹€.

β–ͺ 1) OPTIMIZE

# Standard compaction
dt.optimize().executeCompaction()

# Compaction with Z-Ordering on a specific column
dt.optimize().executeZOrderBy('level')

β–ͺ 2) VACUUM

  • 더 이상 ν•„μš” μ—†λŠ” 였래된 데이터 버전을 μ‚­μ œν•˜μ—¬ μ €μž₯ 곡간을 확보할 λ•Œ μ‚¬μš©ν•©λ‹ˆλ‹€.
  • 기본은 7일(168μ‹œκ°„)이며, retentionHours νŒŒλΌλ―Έν„°λ₯Ό 톡해 νŠΉμ • μ‹œκ°„ μ΄ν›„μ˜ 데이터λ₯Ό μ§€μš°λ„λ‘ ν•  수 μžˆμŠ΅λ‹ˆλ‹€.
dt.vacuum(
    retentionHours=10
)
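
Note that by default Delta rejects retention periods shorter than 168 hours as a safety measure, so a value like 10 only works if that check is disabled first. Disabling it can break time travel to vacuumed versions, so use it with care:

# Allow vacuum retention below the 168-hour default (use with caution)
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")

dt.vacuum(retentionHours=10)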

9️⃣ Converting Parquet to Delta

β–ͺ 1) Saving Data as Parquet

  • trainer_data.csv νŒŒμΌμ„ 읽어와 ν…Œμ΄λΈ”κ³Ό κ²½λ‘œμ— parquet νƒ€μž…μœΌλ‘œ μ €μž₯ν•©λ‹ˆλ‹€.
# 1. CSV νŒŒμΌμ„ DataFrame으둜 읽기
DATA_PATH = '/workspace/spark/deltalake/dataset/trainer_data.csv'
SAVE_PATH = '/workspace/spark/deltalake/delta_local/spark-warehouse/trainer_parquet/'

df = spark.read.csv(DATA_PATH, header=True, inferSchema=True)

# 2. Create the table
query = f"""
CREATE TABLE IF NOT EXISTS deltalake_db.trainer_parquet (
  id INT,
  name STRING,
  age INT,
  hometown STRING,
  prefer_type STRING,
  badge_count INT,
  level STRING
)
USING parquet
LOCATION '{SAVE_PATH}'
"""
spark.sql(query)

# 3. Save the DataFrame to the table and path
df.write.mode("overwrite") \
    .option("path", SAVE_PATH) \
    .saveAsTable("deltalake_db.trainer_parquet")

β–ͺ 2) Converting to Delta

# Convert the registered table to Delta
DeltaTable.convertToDelta(spark, "deltalake_db.trainer_parquet")

# Convert the parquet files stored in the directory to Delta
DeltaTable.convertToDelta(spark, f"parquet.`{SAVE_PATH}`")
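
If the parquet data were partitioned, convertToDelta would also need the partition schema as a third argument; a hedged sketch (the year partition column is illustrative):

# For partitioned parquet data, pass the partition schema along with the path
DeltaTable.convertToDelta(spark, f"parquet.`{SAVE_PATH}`", "year INT")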
