java.lang.OutOfMemoryError: GC overhead limit exceeded
spark.table(..)
, spark.sql(..)
로 테이블 접근만 했는데 GC OOM 발생한 상황InMemoryFileIndex
단계에서 OOM 발생%jdbc(hive)
DESCRIBE EXTENDED <db_name>.<table_name> PARTITION (base_dt='2025-04-09')
Detailed Partition Information
을 통해 partition 확인시, 파일 수가 적지 않았던 상황spark.table(..)
, spark.sql(..)
에서는 schema 지정이 불가능하였음.basePath
옵션으로 테이블 루트 디렉토리를 명시, 파티션 추론에 힌트 제공# footer 읽지 않도록 schema 명시
from pyspark.sql.types import StructType, StructField, StringType
base_path = "hdfs://..."
schema = StructType([
StructField("user_id", StringType()),
StructField("...", StringType()),
...
])
df = spark.read.schema(schema).option("basePath", base_path).parquet(f"{base_path}/base_dt={}")
spark.conf.set("spark.sql.parquet.mergeSchema", "false")
.schema(...)
명시할 경우 대소문자 구분.schema(...)
를 사용할 경우 Spark는 footer를 무시하고 지정된 schema 이름으로 직접 매핑 시도table_schema = StructType([
StructField("user_id", StringType()),
StructField("...", StringType()),
....
])
spark.read.schema(table_schema).option('basePath', basePath).parquet("...")
# 정확히 일치하도록 schema 정의
StructField("base_dt", StringType())
basePath
옵션을 지정하지 않고 partition key 를 포함한 full path 로 접근시, partition key 는 select 를 통해 가져올 수 없었다.
basePath
옵션으로 테이블 루트 디렉토리를 명시하는 것으로 해결
Dynamic partition strict mode requires at least one static partition column. To turn this off set hive.exec.dynamic.partition.mode=nonstrict
strict mode
로 되어 있음 → 모든 partition column을 동적으로 지정할 경우 insert 에서 에러 발생insertInto()
사용 시 Spark는 partition column을 DataFrame 컬럼 값으로 넣기 때문에nonstrict
로 세션 설정 변경 필요spark.sql("SET hive.exec.dynamic.partition=true")
spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")
partitionBy
와 insertInto
는 같이 사용 불가insertInto
는 파티션 정보가 메타스토어 테이블에 정의되어있을 때 사용하고 이를 따르기 때문# 오류 발생
df2.write.format('hive').mode("append").partitionBy('base_dt').insertInto(target_table)
insertInto()
column 수, 순서 불일치 문제The target table has 31 column(s), but the inserted data has 7 column(s)
insertInto()
는 테이블 스키마와 DataFrame 스키마가 정확히 일치해야한다.insert
하려면 모든 컬럼이 있어야 하고 순서도 일치해야 정상 동작insertInto
는 name-based 가 아닌 순서 position-based (순서 기반) 이기 때문에 타깃 테이블의 컬럼 순서에 맞춰 재정렬 후 insert 해야한다.saveAsTable
은 name-based 이기 때문에 무관)# 파티션 컬럼 값이 동적으로 쓸 수 있도록 nonstrict 모드로 설정
spark.sql("SET hive.exec.dynamic.partition = true")
spark.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
# insertInto 는 name 이 아닌 컬럼 순서이므로 타깃 테이블 순서에 맞춰 재정렬
target_schema = spark.table(target_table).schema
target_cols = [field.name for field in target_schema.fields]
df2 = df2.select(*target_cols)
df2.write.format('hive').mode("append").insertInto(target_table)