We recommend creating the environment from the hyunsoolee0506/pyspark-cloud:3.5.1 image, but this local-environment exercise can also be done in Google Colab.
The data used here comes from the two CSV files inside the delta_data.zip archive.
📌 delta_data.zip
Run the container so that a directory of your choice is mapped to /workspace/spark:
docker run -d \
--name pyspark \
-p 8888:8888 \
-p 4040:4040 \
-v [user directory]:/workspace/spark \
hyunsoolee0506/pyspark-cloud:3.5.1
Connecting to port 8888 opens the JupyterLab development environment.
The libraries below are already installed in the hyunsoolee0506/pyspark-cloud:3.5.1 image, but on Google Colab you need to install them by running:
pip install pyspark==3.5.1 delta-spark==3.2.0 pyarrow findspark
from delta import *
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# Build a SparkSession with the Delta Lake extension and catalog configured
builder = SparkSession.builder.appName("DeltaLakeLocal") \
.enableHiveSupport() \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()
Create a new database named deltalake_db.
spark.sql("CREATE DATABASE IF NOT EXISTS deltalake_db")
spark.sql("SHOW DATABASES").show()
---
+------------+
| namespace|
+------------+
| default|
|deltalake_db|
+------------+
Create a trainer table backed by the data in the trainer_data.csv file.
- Table name: trainer
- Schema:
- id → INT
- name → STRING
- age → INT
- hometown → STRING
- prefer_type → STRING
- badge_count → INT
- level → STRING
query = f"""
CREATE TABLE IF NOT EXISTS deltalake_db.trainer (
id INT,
name STRING,
age INT,
hometown STRING,
prefer_type STRING,
badge_count INT,
level STRING
)
USING csv
OPTIONS (
path '[path to trainer_data.csv]',
header 'true',
inferSchema 'true',
delimiter ','
)
"""
spark.sql(query)
# Verify that the table was created
spark.sql("SHOW TABLES FROM deltalake_db").show()
---
+------------+---------+-----------+
| namespace|tableName|isTemporary|
+------------+---------+-----------+
|deltalake_db| trainer| false|
+------------+---------+-----------+
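As a quick sanity check (a minimal sketch, reusing the table just created), the CSV-backed table can be queried directly:
# Preview a few rows of the csv-backed trainer table
spark.sql("SELECT * FROM deltalake_db.trainer LIMIT 5").show()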
Because CSV data cannot be loaded into a delta table at the moment the table is created, the delta table has to be built in two steps:
1. Create an empty delta table.
2. Insert the data from the csv table into the delta table.
Create a delta table named trainer_delta. Its data is configured to be stored under /workspace/spark/deltalake/delta_local/trainer_delta/.
query = f"""
CREATE TABLE IF NOT EXISTS deltalake_db.trainer_delta (
id INT,
name STRING,
age INT,
hometown STRING,
prefer_type STRING,
badge_count INT,
level STRING
)
USING delta
LOCATION '/workspace/spark/deltalake/delta_local/trainer_delta/'
"""
spark.sql(query)
Insert the data from the trainer table into the trainer_delta table.
query = """
INSERT INTO deltalake_db.trainer_delta
SELECT * FROM deltalake_db.trainer;
"""
spark.sql(query)
You can check that a _delta_log/ folder and new parquet files have been created under the table path.
LOCAL_DELTA_PATH = '/workspace/spark/deltalake/delta_local/trainer_delta'
df = spark.read.format("delta").load(LOCAL_DELTA_PATH)
df.show(5)
---
+---+------+---+--------+-----------+-----------+------------+
| id| name|age|hometown|prefer_type|badge_count| level|
+---+------+---+--------+-----------+-----------+------------+
| 1| Brian| 28| Seoul| Electric| 8| Master|
| 3| Susan| 18| Gwangju| Rock| 7| Expert|
| 6| Vicki| 17| Daejeon| Ice| 4|Intermediate|
| 9|Olivia| 45| Incheon| Psychic| 3|Intermediate|
| 10| Mark| 16| Gangwon| Fire| 4|Intermediate|
+---+------+---+--------+-----------+-----------+------------+
only showing top 5 rows
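As an optional check (a minimal sketch, assuming the path above is visible to the Python driver), the table directory can be listed to see the newly written files:
import os
# The table root holds part-*.parquet data files plus the _delta_log/ folder
print(os.listdir(LOCAL_DELTA_PATH))
# The transaction log holds zero-padded JSON commit files
print(os.listdir(os.path.join(LOCAL_DELTA_PATH, '_delta_log')))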
Reading the table with the delta.`<path>` SQL syntax:
query = f"SELECT * FROM delta.`{LOCAL_DELTA_PATH}`"
spark.sql(query)
Reading it by table name through the catalog:
spark.table('deltalake_db.trainer_delta')
# Create a dataframe excluding Beginner trainers
df_1 = df.filter(F.col('level') != 'Beginner')
# Overwrite the existing path
df_1.write \
.format('delta') \
.mode('overwrite') \
.save(LOCAL_DELTA_PATH)
# Check the data
df = spark.read.format("delta").load(LOCAL_DELTA_PATH)
df.select('level').distinct().show()
---
+------------+
| level|
+------------+
| Expert|
| Advanced|
| Master|
|Intermediate|
+------------+
# Create a dataframe excluding Advanced trainers
df_2 = df_1.filter(F.col('level') != 'Advanced')
# Overwrite the existing path
df_2.write \
.format('delta') \
.mode('overwrite') \
.save(LOCAL_DELTA_PATH)
# Check the data
df = spark.read.format("delta").load(LOCAL_DELTA_PATH)
df.select('level').distinct().show()
---
+------------+
| level|
+------------+
| Expert|
| Master|
|Intermediate|
+------------+
You can also see that metadata (.json files) is added inside the _delta_log/ folder with each change.
Now query the change history of the delta table. In the output below you can spot:
- the operation that created the trainer_delta table (no data yet)
- the first operation that inserted data from the trainer table
query = "DESCRIBE HISTORY deltalake_db.trainer_delta"
spark.sql(query).show(vertical=True, truncate=False)
---
-RECORD 0--------------------------------------------------------------------------------------------------------------
version | 3
timestamp | 2025-03-21 02:09:39.085
userId | NULL
userName | NULL
operation | WRITE
operationParameters | {mode -> Overwrite, partitionBy -> []}
job | NULL
notebook | NULL
clusterId | NULL
readVersion | 2
isolationLevel | Serializable
isBlindAppend | false
operationMetrics | {numFiles -> 1, numOutputRows -> 42, numOutputBytes -> 3125}
userMetadata | NULL
engineInfo | Apache-Spark/3.5.1 Delta-Lake/3.2.0
-RECORD 1--------------------------------------------------------------------------------------------------------------
version | 2
timestamp | 2025-03-21 02:08:32.646
userId | NULL
userName | NULL
operation | WRITE
operationParameters | {mode -> Overwrite, partitionBy -> []}
job | NULL
notebook | NULL
clusterId | NULL
readVersion | 1
isolationLevel | Serializable
isBlindAppend | false
operationMetrics | {numFiles -> 1, numOutputRows -> 85, numOutputBytes -> 3868}
userMetadata | NULL
engineInfo | Apache-Spark/3.5.1 Delta-Lake/3.2.0
-RECORD 2--------------------------------------------------------------------------------------------------------------
version | 1
timestamp | 2025-03-21 01:46:21.446
userId | NULL
userName | NULL
operation | WRITE
operationParameters | {mode -> Append, partitionBy -> []}
job | NULL
notebook | NULL
clusterId | NULL
readVersion | 0
isolationLevel | Serializable
isBlindAppend | true
operationMetrics | {numFiles -> 1, numOutputRows -> 90, numOutputBytes -> 3980}
userMetadata | NULL
engineInfo | Apache-Spark/3.5.1 Delta-Lake/3.2.0
-RECORD 3--------------------------------------------------------------------------------------------------------------
version | 0
timestamp | 2025-03-21 01:43:25.471
userId | NULL
userName | NULL
operation | CREATE TABLE
operationParameters | {partitionBy -> [], clusterBy -> [], description -> NULL, isManaged -> false, properties -> {}}
job | NULL
notebook | NULL
clusterId | NULL
readVersion | NULL
isolationLevel | Serializable
isBlindAppend | true
operationMetrics | {}
userMetadata | NULL
engineInfo | Apache-Spark/3.5.1 Delta-Lake/3.2.0
📌 Loading the initial version (version 0) of the table
df_pre = spark.read \
.format("delta") \
.option("versionAsof", 0) \
.load(LOCAL_DELTA_PATH)
df_pre.select('level').distinct().show()
---
+-----+
|level|
+-----+
+-----+
📌 Loading the version 2 table
df_pre = spark.read \
.format("delta") \
.option("versionAsof", 2) \
.load(LOCAL_DELTA_PATH)
df_pre.select('level').distinct().show()
---
+------------+
| level|
+------------+
| Expert|
| Advanced|
| Master|
|Intermediate|
+------------+
📌 Time Travel queries in SQL
df_pre = spark.sql("SELECT * FROM deltalake_db.trainer_delta VERSION AS OF 3")
df_pre.select('level').distinct().show()
---
+------------+
| level|
+------------+
| Expert|
| Master|
|Intermediate|
+------------+
📌 Loading the table state at a specific point in time
TABLE_TIMESTAMP = "2025-03-21T02:09:00"
spark.read.format("delta") \
.option("timestampAsOf", TABLE_TIMESTAMP) \
.table("deltalake_db.trainer_delta")
📌 Loading the table at a specific timestamp in SQL
TABLE_TIMESTAMP = "2025-03-21T02:09:00"
spark.sql(f"SELECT * FROM deltALake_db.trainer_delta TIMESTAMP AS OF '{TABLE_TIMESTAMP}'")
- Existing columns: ['id', 'name', 'age', 'hometown', 'prefer_type', 'badge_count', 'level']
- New table columns: ['id', 'name', 'age', 'hometown', 'prefer_type', 'badge_count', 'level', 'dummy_col']
📌 Overwriting the table after adding a 'dummy_col' column, which changes the schema
LOCAL_DELTA_PATH = '/workspace/spark/deltalake/delta_local/trainer_delta'
# Load the table
df = spark.table("deltalake_db.trainer_delta")
# Add a 'dummy_col' column
df_diff = df.withColumn('dummy_col', F.lit(1))
# Attempt to write with the changed schema
df_diff.write \
.format('delta') \
.mode('overwrite') \
.save(LOCAL_DELTA_PATH)
# The overwrite fails because its schema differs from the existing table's schema
# 👇👇👇👇👇
---
AnalysisException: [_LEGACY_ERROR_TEMP_DELTA_0007] A schema mismatch detected when writing to the Delta table (Table ID: 31dbae5e-d042-467b-9454-e483fdad97bb).
To enable schema migration using DataFrameWriter or DataStreamWriter, please set:
'.option("mergeSchema", "true")'.
For other operations, set the session configuration
spark.databricks.delta.schema.autoMerge.enabled to "true". See the documentation
specific to the operation for details.
Table schema:
root
-- id: integer (nullable = true)
-- name: string (nullable = true)
-- age: integer (nullable = true)
-- hometown: string (nullable = true)
-- prefer_type: string (nullable = true)
-- badge_count: integer (nullable = true)
-- level: string (nullable = true)
Data schema:
root
-- id: integer (nullable = true)
-- name: string (nullable = true)
-- age: integer (nullable = true)
-- hometown: string (nullable = true)
-- prefer_type: string (nullable = true)
-- badge_count: integer (nullable = true)
-- level: string (nullable = true)
-- dummy_col: integer (nullable = true)
To overwrite your schema or change partitioning, please set:
'.option("overwriteSchema", "true")'.
Note that the schema can't be overwritten when using
'replaceWhere'.
option("mergeSchema", "true")
์ต์
์ ์ถ๊ฐํด์ฃผ์ด์ผ ํฉ๋๋ค.df_diff.write \
.format('delta') \
.mode('overwrite') \
.option("mergeSchema", "true") \
.save(LOCAL_DELTA_PATH)
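To confirm the merge succeeded, a quick check sketch (reusing LOCAL_DELTA_PATH from above):
# dummy_col should now be part of the table schema
df_check = spark.read.format('delta').load(LOCAL_DELTA_PATH)
df_check.printSchema()
df_check.select('dummy_col').distinct().show()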
https://docs.databricks.com/aws/en/sql/language-manual/delta-optimize
Delta tables keep accumulating files by default, so many small files pile up over time. This degrades query performance and increases read overhead. OPTIMIZE merges the data into larger files to improve performance.
| Optimization method | Description |
|---|---|
| Basic OPTIMIZE | Merges small files; improves read performance |
| Z-Ordering | Sorts data by frequently filtered columns → speeds up scan-heavy queries |
| Partition-based OPTIMIZE | Selectively optimizes only frequently queried partitions (specific dates, regions, etc.) |
query = "OPTIMIZE deltalake_db.trainer_delta"
spark.sql(query)
query = """
OPTIMIZE deltalake_db.trainer_delta
ZORDER BY (trainer_id, region)
"""
spark.sql(query)
query = """
OPTIMIZE deltalake_db.trainer_delta
WHERE level = 'Master'
"""
spark.sql(query)
With delta-format data, old data files must not be deleted directly from the directory; they should be removed with the VACUUM command so that table integrity is preserved and later operations keep working smoothly.
Once a VACUUM has run, Time Travel queries to versions older than the deleted files are no longer possible.
# Check the current setting
spark.conf.get("spark.databricks.delta.retentionDurationCheck.enabled")
-> 'true'
# Disable the retention duration check
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
Run the VACUUM command. The DRY RUN option reports what would be deleted without actually performing the deletion.
# Default VACUUM command (deletes files older than 168 hours)
spark.sql("VACUUM deltalake_db.trainer_delta").show(truncate=False)
# Delete files older than the current version
spark.sql("VACUUM deltalake_db.trainer_delta RETAIN 0 HOURS DRY RUN").show(truncate=False)
# Delete files older than 2 days
spark.sql("VACUUM deltalake_db.trainer_delta RETAIN 2 DAYS DRY RUN").show(truncate=False)
Set the default retention period when creating a delta table.
query = f"""
CREATE TABLE IF NOT EXISTS deltalake_db.trainer_delta_2 (
id INT,
name STRING,
age INT,
hometown STRING,
prefer_type STRING,
badge_count INT,
level STRING
)
USING delta
LOCATION '/workspace/spark/deltalake/delta_local/trainer_delta_2/'
TBLPROPERTIES ('delta.deletedFileRetentionDuration' = 'interval 2 days');
"""
spark.sql(query)
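The same retention property can also be applied to an existing table; a sketch using the trainer_delta table from earlier:
# Change the deleted-file retention period on an existing delta table
spark.sql("""
    ALTER TABLE deltalake_db.trainer_delta
    SET TBLPROPERTIES ('delta.deletedFileRetentionDuration' = 'interval 2 days')
""")
# Check the table properties
spark.sql("SHOW TBLPROPERTIES deltalake_db.trainer_delta").show(truncate=False)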
This feature converts data already stored in parquet format into a delta table.
📌 Save the fish_data.csv data as parquet
# Read the csv file
fish = spark.read.option('header', 'true').csv('fish_data.csv')
# Save to a local directory and register it in the catalog
fish.write \
.mode('overwrite') \
.format('parquet') \
.option('path', '/workspace/spark/deltalake/delta_local/fish_parquet/') \
.saveAsTable('deltalake_db.fish_parquet')
📌 Convert the data stored as parquet to delta
query = """
CONVERT TO DELTA
parquet.`/workspace/spark/deltalake/delta_local/fish_parquet/`
"""
spark.sql(query)
📌 Write the parquet data partitioned by the Species column
fish.write.mode('overwrite')\
.format('parquet') \
.partitionBy('Species') \
.option('path', '/workspace/spark/deltalake/delta_local/fish_parquet_partitioned/') \
.saveAsTable('deltalake_db.fish_parquet_partitioned')
📌 Convert the partitioned parquet data to delta
query = """
CONVERT TO DELTA
parquet.`/workspace/spark/deltalake/delta_local/fish_parquet_partitioned/`
PARTITIONED BY (Species STRING)
"""
spark.sql(query)
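To confirm the conversion, a quick check sketch using DESCRIBE DETAIL (the format column should now report delta and Species should appear as a partition column):
spark.sql("DESCRIBE DETAIL delta.`/workspace/spark/deltalake/delta_local/fish_parquet_partitioned/`") \
    .select('format', 'partitionColumns', 'numFiles').show(truncate=False)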
Hello, thanks for the great post. I have one question: I followed the part about force-merging the schema, but is this approach also possible when migrating while preserving the existing data?