[Spark] Pyspark 로 Hive 연동시 시행착오

Woong·2025년 4월 15일
0

Apache Spark

목록 보기
26/26

spark sql 로 테이블 로딩시 parquet 파일 수가 많아 OOM 발생

java.lang.OutOfMemoryError: GC overhead limit exceeded
  • spark.table(..), spark.sql(..) 로 테이블 접근만 했는데 GC OOM 발생한 상황
    • InMemoryFileIndex 단계에서 OOM 발생
    • Spark 는 디렉토리 하위 모든 Parquet 파일을 스캔 시도
%jdbc(hive)
DESCRIBE EXTENDED <db_name>.<table_name> PARTITION (base_dt='2025-04-09')
  • 위 hive 쿼리로 Detailed Partition Information 을 통해 partition 확인시, 파일 수가 적지 않았던 상황
    • spark 실행시 동적 스키마 추론 과정에서 Parquet footer를 읽으며 모든 파일의 스키마 병합을 시도, OOM 발생한 것으로 판단
    • -> table 스키마를 정의하는 것으로 schema 추론 단계를 스킵하여 해결
      • spark.table(..), spark.sql(..) 에서는 schema 지정이 불가능하였음.
    • 추가로 basePath 옵션으로 테이블 루트 디렉토리를 명시, 파티션 추론에 힌트 제공
# footer 읽지 않도록 schema 명시
from pyspark.sql.types import StructType, StructField, StringType

base_path = "hdfs://..."
schema = StructType([
    StructField("user_id", StringType()),
    StructField("...", StringType()),
	...
])

df = spark.read.schema(schema).option("basePath", base_path).parquet(f"{base_path}/base_dt={}")
  • 비고) Parquet schema 병합 방지하는 방법도 있으나, 크게 도움되지는 않았음
spark.conf.set("spark.sql.parquet.mergeSchema", "false")

Spark 에서 .schema(...) 명시할 경우 대소문자 구분

  • Parquet 포맷은 실제 컬럼 이름이 file footer에 저장
  • .schema(...)를 사용할 경우 Spark는 footer를 무시하고 지정된 schema 이름으로 직접 매핑 시도
    • spark 에서는 대소문자 구분을 하기 때문에 이때 컬럼 이름이 대소문자가 다르거나 오타가 있으면 해당 컬럼을 찾지 못하고 null 처리
table_schema = StructType([
    StructField("user_id", StringType()),
    StructField("...", StringType()),
    ....
])

spark.read.schema(table_schema).option('basePath', basePath).parquet("...")

# 정확히 일치하도록 schema 정의
StructField("base_dt", StringType()) 

partition key 를 column 으로 가져오기

  • basePath 옵션을 지정하지 않고 partition key 를 포함한 full path 로 접근시, partition key 는 select 를 통해 가져올 수 없었다.

    • spark 에서 partition key 를 포함한 full path 를 루트 디렉토리로 추론하기 때문
  • basePath 옵션으로 테이블 루트 디렉토리를 명시하는 것으로 해결

insertInto 관련 오류

Hive Dynamic Partition 에러

 Dynamic partition strict mode requires at least one static partition column. To turn this off set hive.exec.dynamic.partition.mode=nonstrict 
  • Hive 기본 설정은 strict mode로 되어 있음 → 모든 partition column을 동적으로 지정할 경우 insert 에서 에러 발생
  • insertInto() 사용 시 Spark는 partition column을 DataFrame 컬럼 값으로 넣기 때문에
    • dynamic insert로 판단되어 fail
    • nonstrict 로 세션 설정 변경 필요
spark.sql("SET hive.exec.dynamic.partition=true")
spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")

partitionByinsertInto 는 같이 사용 불가

  • insertInto 는 파티션 정보가 메타스토어 테이블에 정의되어있을 때 사용하고 이를 따르기 때문
# 오류 발생
df2.write.format('hive').mode("append").partitionBy('base_dt').insertInto(target_table)
  • partition key 는 hive 테이블 생성시 정의하고 spark 단에서는 제거하여 해결

insertInto() column 수, 순서 불일치 문제

The target table has 31 column(s), but the inserted data has 7 column(s)
  • insertInto()는 테이블 스키마와 DataFrame 스키마가 정확히 일치해야한다.
  • 컬럼 수가 다르거나, 순서가 다르면 오류 발생
  • Hive 테이블에 insert하려면 모든 컬럼이 있어야 하고 순서도 일치해야 정상 동작
    • insertInto 는 name-based 가 아닌 순서 position-based (순서 기반) 이기 때문에 타깃 테이블의 컬럼 순서에 맞춰 재정렬 후 insert 해야한다.
    • (saveAsTable 은 name-based 이기 때문에 무관)
# 파티션 컬럼 값이 동적으로 쓸 수 있도록 nonstrict 모드로 설정
spark.sql("SET hive.exec.dynamic.partition = true")
spark.sql("SET hive.exec.dynamic.partition.mode = nonstrict")

# insertInto 는 name 이 아닌 컬럼 순서이므로 타깃 테이블 순서에 맞춰 재정렬
target_schema = spark.table(target_table).schema
target_cols = [field.name for field in target_schema.fields]
df2 = df2.select(*target_cols)

df2.write.format('hive').mode("append").insertInto(target_table)

0개의 댓글