delta-spark
This post covers how to use the delta-spark library. Creating a container from the hyunsoolee0506/pyspark-cloud:3.5.1 image is recommended, but this exercise can also be run in Google Colab. The data comes from the two CSV files inside delta_data.zip.

delta_data.zip

Run the container so that a local directory is mapped to /workspace/spark:

docker run -d \
--name pyspark \
-p 8888:8888 \
-p 4040:4040 \
-v [your local directory]:/workspace/spark \
hyunsoolee0506/pyspark-cloud:3.5.1
Once the container is up, connect to port 8888 to open the Jupyter Lab development environment.

The required libraries are already installed in the hyunsoolee0506/pyspark-cloud:3.5.1 image, but on Colab you need to install them first by running the command below.

pip install pyspark==3.5.1 delta-spark==3.2.0 pyarrow findspark

Next, create a SparkSession with the Delta Lake extensions enabled and a database for this exercise.
from delta import *
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
builder = SparkSession.builder.appName("DeltaLakeLocal") \
.enableHiveSupport() \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()
spark.sql("CREATE DATABASE IF NOT EXISTS deltalake_db")
spark.sql("SHOW DATABASES").show()
---
+------------+
| namespace|
+------------+
| default|
|deltalake_db|
+------------+
"""
Table name : trainer
Schema :
- id : INT
- name : STRING
- age : INT
- hometown : STRING
- prefer_type : STRING
- badge_count : INT
- level : STRING
"""
spark.sql(f"""
CREATE TABLE IF NOT EXISTS deltalake_db.trainer (
id INT,
name STRING,
age INT,
hometown STRING,
prefer_type STRING,
badge_count INT,
level STRING
)
USING csv
OPTIONS (
path '/workspace/spark/deltalake/dataset/trainer_data.csv',
header 'true',
inferSchema 'true',
delimiter ','
)
""")
spark.sql("SHOW TABLES FROM deltalake_db").show()
---
+------------+-------------+-----------+
| namespace| tableName|isTemporary|
+------------+-------------+-----------+
|deltalake_db| trainer| false|
+------------+-------------+-----------+
The LOCAL_DELTA_PATH variable holds the directory where the delta table will be stored. This directory can be a local path, or a path on cloud storage such as S3 or GCS.
LOCAL_DELTA_PATH = '/workspace/spark/deltalake/delta_local/trainer_delta/'
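For reference, a cloud-storage location would follow the same pattern; a minimal sketch (the bucket names are hypothetical, and the matching S3A/GCS connectors and credentials would have to be configured separately):

# LOCAL_DELTA_PATH = 's3a://my-bucket/deltalake/trainer_delta/'   # hypothetical S3 bucket
# LOCAL_DELTA_PATH = 'gs://my-bucket/deltalake/trainer_delta/'    # hypothetical GCS bucket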
query = f"""
CREATE TABLE IF NOT EXISTS deltalake_db.trainer_delta (
id INT,
name STRING,
age INT,
hometown STRING,
prefer_type STRING,
badge_count INT,
level STRING
)
USING delta
LOCATION '{LOCAL_DELTA_PATH}'
"""
spark.sql(query)
spark.sql("SHOW TABLES FROM deltalake_db").show()
---
+------------+-------------+-----------+
| namespace| tableName|isTemporary|
+------------+-------------+-----------+
|deltalake_db| trainer| false|
|deltalake_db|trainer_delta| false|
+------------+-------------+-----------+
Now insert the data from the trainer table into the delta table.

query = """
INSERT INTO deltalake_db.trainer_delta
SELECT * FROM deltalake_db.trainer;
"""
spark.sql(query)
spark.sql('SELECT * FROM deltalake_db.trainer_delta').show(5)
---
+---+-----------+---+--------+-----------+-----------+------------+
| id| name|age|hometown|prefer_type|badge_count| level|
+---+-----------+---+--------+-----------+-----------+------------+
| 1| Brian| 28| Seoul| Electric| 8| Master|
| 2| Sabrina| 23| Busan| Water| 6| Advanced|
| 3| Susan| 18| Gwangju| Rock| 7| Expert|
| 4| Martin| 20| Incheon| Grass| 5| Advanced|
| 5| Gabrielle| 30| Daegu| Flying| 6| Advanced|
+---+-----------+---+--------+-----------+-----------+------------+
After the insert, you can see that a _delta_log folder has been created inside the table directory. Let's check that the path is in delta format. Here, spark is the variable holding the SparkSession created earlier.

DeltaTable.isDeltaTable(spark, LOCAL_DELTA_PATH)
---
True
1) Load the table by its registered name
dt = DeltaTable.forName(spark, "deltalake_db.trainer_delta")
2) Load the table by the path where it is stored
dt = DeltaTable.forPath(spark, LOCAL_DELTA_PATH)
In both cases the result is stored as a delta.tables.DeltaTable object. To inspect the data, convert it to a Spark DataFrame and query it with the show() method.

dt.toDF().show(5)
---
+---+---------+---+--------+-----------+-----------+--------+
| id| name|age|hometown|prefer_type|badge_count| level|
+---+---------+---+--------+-----------+-----------+--------+
| 1| Brian| 28| Seoul| Electric| 8| Master|
| 2| Sabrina| 23| Busan| Water| 6|Advanced|
| 3| Susan| 18| Gwangju| Rock| 7| Expert|
| 4| Martin| 20| Incheon| Grass| 5|Advanced|
| 5|Gabrielle| 30| Daegu| Flying| 6|Advanced|
+---+---------+---+--------+-----------+-----------+--------+
You can also create an empty table of type delta.tables.DeltaTable. There are three builder methods; here we will only practice create (a short sketch of the other two follows the example below).

- create : creates a new DeltaTable. Raises an error if the table already exists.
- createIfNotExists : creates a new DeltaTable. Does not raise an error if the table already exists.
- createOrReplace : creates a new DeltaTable, or replaces an existing table with the same name.

my_dt = DeltaTable.create(spark) \
.tableName("my_table") \
.addColumn("id", "INT") \
.addColumn("name", "STRING") \
.addColumn("age", "INT") \
.execute()
my_dt.toDF().show()
---
+---+----+---+
| id|name|age|
+---+----+---+
+---+----+---+
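For completeness, the other two builders follow the same pattern; a minimal sketch (the table name my_table2 is made up for illustration):

# createIfNotExists: does nothing if my_table2 already exists
DeltaTable.createIfNotExists(spark) \
    .tableName("my_table2") \
    .addColumn("id", "INT") \
    .addColumn("name", "STRING") \
    .execute()

# createOrReplace: replaces my_table2 if it exists, otherwise creates it
DeltaTable.createOrReplace(spark) \
    .tableName("my_table2") \
    .addColumn("id", "INT") \
    .addColumn("name", "STRING") \
    .addColumn("age", "INT") \
    .execute()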
The replace method is used, much like createOrReplace, when you want to replace an existing DeltaTable with a table that has a new schema.

df = spark.createDataFrame([('Ryan', 31), ('Alice', 27), ('Ruby', 24)], ["name", "age"])
my_dt = DeltaTable.replace(spark) \
.tableName("my_table") \
.addColumns(df.schema) \
.execute()
my_dt.toDF().show()
---
+----+---+
|name|age|
+----+---+
+----+---+
The update method modifies rows that match a condition. Here we set the level column to 'Delta_Update' for rows whose id is between 5 and 10.

dt.update(
condition="id >= 5 AND id <= 10",
set={'level' : "'Delta_Update'"}
)
Next, delete all rows whose prefer_type is 'Rock'.
dt.delete(
condition="prefer_type = 'Rock'"
)
merge() is very useful for upserting (updating or inserting) or deleting data between two tables. The merge builder supports the following clauses (the example below uses whenMatchedUpdate and whenNotMatchedInsert; a shorter sketch using the *All variants follows its output):

- whenMatchedDelete : when a source record matches a target record, delete that record
- whenMatchedUpdate : when a source record matches a target record, update that record
- whenMatchedUpdateAll : when a source record matches a target record, update all columns with the source data
- whenNotMatchedBySourceDelete : delete target records that do not appear in the source data
- whenNotMatchedBySourceUpdate : update target records that do not appear in the source data
- whenNotMatchedInsert : when a source record does not exist in the target table, insert a new record
- whenNotMatchedInsertAll : when a source record does not exist in the target table, insert all columns
- withSchemaEvolution : when the schema has changed (e.g. a new column was added in the source data), automatically update the target table's schema

1) Create the target and source tables
# Target table: keep only 5 rows from the existing trainer table
dt.delete(
condition="id > 5"
)
# Source table
data = [
(1, "Brian", 29, "Seoul", "Electric", 9, "GrandMaster"),
(3, "Susan", 19, "Gwangju", "Rock", 8, "Master"),
(7, "Alex", 25, "Jeju", "Fire", 3, "Beginner"),
(8, "Emily", 22, "Ulsan", "Psychic", 5, "Intermediate")
]
columns = ["id", "name", "age", "hometown", "prefer_type", "badge_count", "level"]
source_df = spark.createDataFrame(data, columns)
2) Run the merge

Rows in source_df whose id already exists in dt are updated, and rows whose id does not exist are inserted.

dt.alias("target") \
.merge(
source=source_df.alias("source"),
condition="target.id = source.id"
) \
.whenMatchedUpdate(
set={
"name": "source.name",
"age": "source.age",
"hometown": "source.hometown",
"prefer_type": "source.prefer_type",
"badge_count": "source.badge_count",
"level": "source.level"
}
) \
.whenNotMatchedInsert(
values={
"id": "source.id",
"name": "source.name",
"age": "source.age",
"hometown": "source.hometown",
"prefer_type": "source.prefer_type",
"badge_count": "source.badge_count",
"level": "source.level"
}
) \
.execute()
dt.toDF().show()
---
+---+---------+---+--------+-----------+-----------+------------+
| id| name|age|hometown|prefer_type|badge_count| level|
+---+---------+---+--------+-----------+-----------+------------+
| 1| Brian| 29| Seoul| Electric| 9| GrandMaster|
| 2| Sabrina| 23| Busan| Water| 6| Advanced|
| 3| Susan| 19| Gwangju| Rock| 8| Master|
| 4| Martin| 20| Incheon| Grass| 5| Advanced|
| 5|Gabrielle| 30| Daegu| Flying| 6| Advanced|
| 7| Alex| 25| Jeju| Fire| 3| Beginner|
| 8| Emily| 22| Ulsan| Psychic| 5|Intermediate|
+---+---------+---+--------+-----------+-----------+------------+
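Since every column here comes straight from the source, the same upsert can be written more compactly with the *All variants; a minimal sketch that is equivalent for this schema:

dt.alias("target") \
    .merge(
        source=source_df.alias("source"),
        condition="target.id = source.id"
    ) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()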
The detail() method returns table metadata such as its location, size, and number of files.

dt.detail().show(truncate=False)
---
+------+------------------------------------+----+-----------+---------------------------------------------------------+-----------------------+-----------------------+----------------+-----------------+--------+-----------+----------+----------------+----------------+------------------------+
|format|id |name|description|location |createdAt |lastModified |partitionColumns|clusteringColumns|numFiles|sizeInBytes|properties|minReaderVersion|minWriterVersion|tableFeatures |
+------+------------------------------------+----+-----------+---------------------------------------------------------+-----------------------+-----------------------+----------------+-----------------+--------+-----------+----------+----------------+----------------+------------------------+
|delta |e3db23a9-cc4c-4700-95ee-a8b4e06dfbf9|NULL|NULL |file:/workspace/spark/deltalake/delta_local/trainer_delta|2025-03-28 07:31:56.941|2025-04-01 01:15:22.165|[] |[] |1 |3984 |{} |1 |2 |[appendOnly, invariants]|
+------+------------------------------------+----+-----------+---------------------------------------------------------+-----------------------+-----------------------+----------------+-----------------+--------+-----------+----------+----------------+----------------+------------------------+
The history() method returns the table's transaction log as a DataFrame, one row per version.

dt.history().show(truncate=False)
---
+-------+-----------------------+------+--------+------------+-----------------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+-----------------------------------+
|version|timestamp |userId|userName|operation |operationParameters |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics |userMetadata|engineInfo |
+-------+-----------------------+------+--------+------------+-----------------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+-----------------------------------+
|3 |2025-04-01 01:15:22.165|NULL |NULL |DELETE |{predicate -> ["(prefer_type#3178 = Rock)"]} |NULL|NULL |NULL |2 |Serializable |false |{numRemovedFiles -> 1, numRemovedBytes -> 3997, numCopiedRows -> 89, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 789, numDeletionVectorsUpdated -> 0, numDeletedRows -> 1, scanTimeMs -> 494, numAddedFiles -> 1, numAddedBytes -> 3984, rewriteTimeMs -> 294} |NULL |Apache-Spark/3.5.1 Delta-Lake/3.2.0|
|2 |2025-04-01 01:13:45.637|NULL |NULL |UPDATE |{predicate -> ["((id#3174 >= 5) AND (id#3174 <= 10))"]} |NULL|NULL |NULL |1 |Serializable |false |{numRemovedFiles -> 1, numRemovedBytes -> 3980, numCopiedRows -> 84, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 1604, numDeletionVectorsUpdated -> 0, scanTimeMs -> 984, numAddedFiles -> 1, numUpdatedRows -> 6, numAddedBytes -> 3997, rewriteTimeMs -> 618}|NULL |Apache-Spark/3.5.1 Delta-Lake/3.2.0|
|1 |2025-03-28 07:32:09.347|NULL |NULL |WRITE |{mode -> Append, partitionBy -> []} |NULL|NULL |NULL |0 |Serializable |true |{numFiles -> 1, numOutputRows -> 90, numOutputBytes -> 3980} |NULL |Apache-Spark/3.5.1 Delta-Lake/3.2.0|
|0 |2025-03-28 07:31:57.505|NULL |NULL |CREATE TABLE|{partitionBy -> [], clusterBy -> [], description -> NULL, isManaged -> false, properties -> {}}|NULL|NULL |NULL |NULL |Serializable |true |{} |NULL |Apache-Spark/3.5.1 Delta-Lake/3.2.0|
+-------+-----------------------+------+--------+------------+-----------------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+-----------------------------------+
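The full history output is quite wide; when you only want to see what happened and when, selecting a few columns from the returned DataFrame is enough (a small sketch using the column names visible in the output above):

dt.history() \
    .select("version", "timestamp", "operation", "readVersion") \
    .show(truncate=False)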
Running the generate function creates a _symlink_format_manifest directory inside the delta table directory, and inside it a manifest file pointing to the parquet files of the current version. Because _symlink_format_manifest exposes the delta table as plain parquet files, engines that read tables through such manifests (for example Presto or Athena) can query it.

dt.generate("symlink_format_manifest")
restoreToVersion rolls the table back to the state it had at the given version number.

dt.restoreToVersion(1)
dt.toDF().show(10)
restoreToTimestamp restores the table to the state it had at the time stored in SEARCH_TIME, so we can look at that snapshot.

SEARCH_TIME = '2025-03-28 07:32:00'
dt.restoreToTimestamp(SEARCH_TIME)
dt.toDF().show(10)
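Note that restore actually rewrites the current state of the table. If you only want to look at an old snapshot without changing the table, Delta's time-travel read options are a lighter alternative; a sketch reusing the path and timestamp above:

# Read a past version without restoring the table
spark.read.format("delta") \
    .option("versionAsOf", 1) \
    .load(LOCAL_DELTA_PATH) \
    .show(5)

# Or read by timestamp
spark.read.format("delta") \
    .option("timestampAsOf", SEARCH_TIME) \
    .load(LOCAL_DELTA_PATH) \
    .show(5)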
The executeZOrderBy option applies Z-Order clustering on the specified columns, physically reorganizing the data. It pays off when queries frequently filter or join on those columns.

# Standard optimization (file compaction)
dt.optimize().executeCompaction()
# Optimization targeting a specific column
dt.optimize().executeZOrderBy('level')
vacuum removes data files that the table no longer references. The retentionHours parameter sets the retention threshold: only unreferenced files older than this many hours are deleted.

dt.vacuum(
retentionHours=10
)
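One caveat: by default Delta Lake rejects retention intervals shorter than 168 hours (7 days) to protect concurrent readers, so vacuum(retentionHours=10) will normally raise an error. For an experiment you can disable the safety check first (use with care on real data):

# Disable the minimum-retention safety check (experiments only)
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")

dt.vacuum(retentionHours=10)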
Finally, read the trainer_data.csv file and save it as a parquet table at a given path, so that it can then be converted to delta.

# 1. Read the CSV file into a DataFrame
DATA_PATH = '/workspace/spark/deltalake/dataset/trainer_data.csv'
SAVE_PATH = '/workspace/spark/deltalake/delta_local/spark-warehouse/trainer_parquet/'
df = spark.read.csv(DATA_PATH, header=True, inferSchema=True)
# 2. Create the table
query = f"""
CREATE TABLE IF NOT EXISTS deltalake_db.trainer_parquet (
id INT,
name STRING,
age INT,
hometown STRING,
prefer_type STRING,
badge_count INT,
level STRING
)
USING parquet
LOCATION '{SAVE_PATH}'
"""
spark.sql(query)
# 3. Save the DataFrame to the table and path
df.write.mode("overwrite") \
.option("path", SAVE_PATH) \
.saveAsTable("deltalake_db.trainer_parquet")
# Convert the saved table to delta
DeltaTable.convertToDelta(spark, "deltalake_db.trainer_parquet")
# Convert the parquet files stored at the path to delta
DeltaTable.convertToDelta(spark, f"parquet.`{SAVE_PATH}`")
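After the conversion, the path behaves like any other delta table and can be handled with the DeltaTable API; a quick check reusing SAVE_PATH from above:

converted_dt = DeltaTable.forPath(spark, SAVE_PATH)
print(DeltaTable.isDeltaTable(spark, SAVE_PATH))   # True once the conversion has run
converted_dt.toDF().show(5)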