[Apache Spark] 성능 튜닝

연수·2021년 12월 10일
0

spark

목록 보기
23/26

잡의 실행 속도를 높이기 위한 성능 튜닝 방법을 알아보자!

  • 간접적인 성능 향상 → 전체 스파크 애플리케이션이나 스파크 잡에 영향을 미치게 된다.
  • 직접적인 성능 향상 → 개별 스파크 잡, 스테이지, 태스크, 코드 등 특정 영역에만 영향을 주므로 전체 스파크 애플리케이션이나 스파크 잡에는 영향을 미치지 않는다.

 

🧚 간접적인 성능 향상 기법

🏢 설계 방안

  • 좋은 설계 방안으로 애플리케이션을 설계해야 한다.

👉 스칼라 vs 자바 vs 파이썬 vs R

  • 사용자의 환경에 따라 언어를 달리 선택할 수 있다.
  • 단일 머신 머신러닝 → R
  • RDD 트랜스포메이션, UDF 사용하는 경우 → R과 파이썬은 지양
  • 사용자 정의 함수를 정의하면 데이터 타입과 처리 과정을 엄격하게 보장하기 어럽기 때문에, 애플리케이션 주 언어로 파이썬을 사용하고 일부를 스칼라로 변경하거나 스칼라 UDF를 정의해 사용하는 것이 좋다. (사용성, 유지 관리성, 성능 향상)

👉 DataFrame vs SQL vs Dataset vs RDD

  • DataFrame, Dataset, SQL의 속도는 동일하다.
  • 근본적으로 성능을 개선하고 싶다면 UDF 대신 DataFrame이나 SQL을 사용해야 한다. (스파크의 최적화 엔진이 더 '나은' RDD 코드를 만들어낸다.)
  • RDD를 사용하려면 스칼라나 자바를 사용하는 것이 좋다. 파이썬을 사용하면 많은 데이터를 직렬화해야 하는데, 매우 큰 데이터를 직렬화하려면 엄청난 비용이 발생하고 안전성이 떨어진다.

 

🔗 RDD 객체 직렬화

  • Kryo를 이용해 직접 정의한 데이터 타입을 직렬화할 수 있다.

 

🏔️ 클러스터 설정

  • 일반적으로 머신 자체의 성능을 모니터링하는 것이 클러스터 설정을 최적화하는 데 가장 많은 도움이 된다.
  • 클러스터/애플리케이션 규모 산정과 자원 공유 (자원 공유와 스케줄링)
  • 동적 할당

 

⏰ 스케줄링

  • 스케줄러 풀, 동적 할당, max-executor-cores 설정 등의 방법을 스파크 애플리케이션의 병렬 실행에 활용할 수 있다.
  • spark.scheduler.mode 값을 FAIR로 설정
  • --max-executor-cores 인수를 사용해 익스큐터의 코어 수를 조절
  • 클러스터 매니저에 따라 spark.cores.max 값을 설정해 기본값을 변경할 수 있다.

 

🪴 보관용 데이터

적절한 저장소 시스템과 데이터 포맷을 선택해야 한다.

🍄 파일 기반 장기 데이터 저장소

  • CSV 파일, 바이너리 blob 파일, 아파치 파케이 등 다양한 포맷 사용 가능
  • 바이너리 형태 → 구조적 API 사용, 저장된 데이터에 자주 접근하는 경우
  • CSV → 파싱 속도가 느리고 예외 상황이 자주 발생
  • 파케이 → 가장 효율적. 데이터를 바이너리 파일에 컬럼 지향 방식으로 저장한다. 쿼리에서 사용하지 않는 데이터를 빠르게 건너뛸 수 있도록 몇 가지 통계를 함께 저장한다. 스파크는 파케이와 잘 호환된다.

🤐 분할 가능한 파일 포맷과 압축

  • 분할 가능한 포맷을 사용하면 여러 태스크가 파일의 서로 다른 부분을 동시에 읽을 수 있다. → 병렬성 증가
  • ZIP이나 TAR 압축 파일은 분할 불가능. 반면 하둡이나 스파크 같은 병렬 처리 프레임워크는 gzip, bzip2 또는 lz4를 이용해 압축된 파일이라면 분할할 수 있다.

🏓 테이블 파티셔닝

  • 데이터의 날짜 필드 같은 키를 기준으로 개별 디렉터리에 파일을 저장하는 것
  • 키를 기준으로 데이터가 분할되었고 특정 범위의 데이터만 필요하다면 스파크는 관련 없는 데이터 파일을 건너뛸 수 있다.
  • 파티셔닝을 통해 쿼리에서 읽어야 하는 데이터양을 줄여 빠른 처리가 가능하다.
  • 너무 작은 단위로 분할하면 전체 파일의 목록을 읽을 때 오버헤드가 발생한다.

🪣 버켓팅

  • 데이터를 버켓팅하면 스파크가 조인이나 집계를 수행하는 방식에 따라 데이터를 사전 분할할 수 있다. (pre-partition)
  • 데이터를 한두 개 파티션에 치우치지 않고 전체 파티션에 균등하게 분산시킬 수 있다. → 성능과 안정성 향상
  • 주로 파티셔닝과 함께 사용된다.

📁 파일 수

  • 데이터를 파티션이나 버켓으로 구성하려면 파일 수와 저장하려는 파일 크기도 고려해야 한다.
  • 작은 파일이 많은 경우 파일 목록 조회와 파일 읽기 과정에서 부하(네트워크와 잡의 스케줄링 부하)가 발생한다. 반면 적은 수의 대용량 파일이 있다면 태스크 수행 시간이 길어진다. (trade-off)
  • 입력 데이터 파일이 최소 수십 메가바이트의 데이터를 갖도록 파일의 크기를 조정하는 것이 좋다.

🤠 데이터의 지역성

  • 네트워크를 통해 데이터 블록을 교환하지 않고 특정 데이터를 가진 노드에서 동작할 수 있도록 지정하는 것을 의미한다.
  • 저장소 시스템이 스파크와 동일한 노드에 있고 해당 시스템이 데이터 지역성 정보를 제공한다면 스파크는 입력 데이터 블록과 최대한 가까운 노드에 태스크를 할당하려 한다.

📉 통계 수집

  • 스파크의 구조적 API를 사용하면 비용 기반 쿼리 옵티마이저가 내부적으로 동작한다.
  • 비용 기반 옵티마이저가 작동하려면 사용 가능한 테이블과 관련된 통계를 수집 및 유지해야 한다.
  • 통계 → 테이블 수준, 컬럼 수준
  • 통계는 조인, 집계, 필터링 등에서 도움이 된다.

 

🔀 셔플 설정

  • 스파크의 외부 셔플 서비스를 설정하면 머신에서 실행되는 익스큐터가 바쁜 상황에서도 원격 머신에서 셔플 데이터를 읽을 수 있으므로 성능을 높일 수 있다.
  • 그러나 코드가 복잡하고 유지가 어려워 운영 환경에 적합하지 않을 수도 있다.
  • 셔플의 파티션 수는 모든 스파크 잡에서 매우 중요하다. 파티션 수가 너무 적으면 소수의 노드만 작업을 수행하기 때문에 데이터 치우침 현상이 발생한다. 반면 파티션 수가 너무 많으면 파티션을 처리하기 위한 태스크를 많이 실행해야 하므로 부하가 발생한다.
  • 파티션당 최소 수십 메가바이트의 데이터가 포함되는 것이 적당하다.

 

📝 메모리 부족과 가비지 컬렉션

❤️‍🩹 메모리 부족 원인

  1. 애플리케이션 실행 중 메모리를 너무 많이 사용

  2. 가비지 컬렉션이 자주 수행됨

  3. JVM 내에서 객체가 너무 많이 생성되어 더 이상 사용하지 않는 객체를 가비지 컬렉션이 정리하며 실행 속도가 느려짐

    → 구조적 API를 사용하면 스파크 잡의 효율성이 높아지고 JVM 객체를 전혀 생성하지 않으므로 해결 가능! 스파크 SQL은 내부 포맷으로 연산을 수행하기 때문에 메모리 압박을 크게 줄일 수 있다.

🧄 가비지 컬렉션 튜닝

  • JVM 메모리 관리
    • 자바 힙 공간은 Young 영역과 Old 영역으로 나누어진다. Young 영역은 수명이 짧은 객체를 유지한다. 반면 Old 영역은 오래 살아 있는 객체를 대상으로 한다.
    • Young 영역은 Eden, Survivor1, Survivor2 세 영역으로 다시 나뉜다.
  • 가비지 컬렉션 수행 절차
    1. Eden 영역이 가득 차면 Eden 영역에 대한 마이너 가비지 컬렉션이 실행된다. Eden 영역에서 살아남은 객체와 Survivor1의 객체는 Survivor2 영역으로 복제된다.
    2. 두 Survivor 영역을 교체한다.
    3. 객체가 아주 오래되었거나 Survivor2 영역이 가득 차면 객체는 Old 영역으로 옮겨진다.
    4. Old 영억이 거의 가득 차면 풀 가비지 컬렉션이 발생한다. 풀 가비지 컬렉션은 힙 공간의 모든 객체를 추적해 참조 정보가 없는 객체들을 제고한다. 그리고 나머지 객체를 빈 곳으로 옮기는 작업을 수행한다. (가장 느린 가비지 컬렉션 연산이다!!!)
  • 가비지 컬렉션 튜닝의 목표
    1. 수명이 긴 캐시 데이터셋을 Old 영역에 저장

    2. Young 영역에 수명이 짧은 모든 객체를 저장

      → 풀 가비지 컬렉션을 피한다!

  • 가비지 컬렉션 튜닝
    • 가비지 컬렉션 통계를 수집해 가비지 컬렉션의 발생 빈도를 확인한다.
    • 태스크가 완료되기 전에 풀 가비지 컬렉션이 자주 발생하면 태스크를 처리하기 위한 메모리가 부족하기 때문에 캐싱에 사용되는 메모리 양을 줄여야 한다.
    • 마이너 가비지 컬렉션은 많이 발생하지만 메이저 가비지 컬렉션이 자주 발생하지 않는다면 Eden 영역에 메모리를 더 할당한다.
    • HDFS를 사용한다면 태스크에서 사용할 메모리양은 HDFS에서 읽을 데이터 블록 크기로 추정할 수 있다.
    • G1GC 가비지 컬렉터 → 가비지 컬렉션이 병목 현상을 일으키고 각 영역의 크기를 늘려도 더 이상 부하를 줄일 수 없는 상황에서 성능을 높일 수 있다.
    • 풀 가비지 컬렉션의 발생 빈도를 관리하는 것이 가장 중요

 

🧚‍♀️ 직접적인 성능 향상 기법

🤲 병렬화

  • 병렬성을 높여 특정 스테이지의 처리 속도를 높인다.
  • spark.default.parallelism, spark.sql.shuffle.partitions 값을 클러스터 코어 수에 따라 설정
  • 스테이지에서 처리해야 할 데이터 양이 매우 많다면 클러스터의 CPU 코어당 최서 2~3개의 태스크를 할당한다.

 

🐡 향샹된 필터링

  • 스파크 잡에서 데이터 필터링 과정을 가장 먼저 수행한다.
  • 상황에 따라 데이터소스에 필터링을 위임하여 결과와 무관한 데이터를 스파크에서 읽지 않고 작업을 진행한다.
  • 최대한 이른 시점에 많은 양의 데이터를 필터링해 스파크 잡의 수행 속도를 높인다.

 

🎉 파티션 재분배와 병합

  • 파티션 재분배 과정은 셔플을 수반하지만, 클러스터 전체에 걸쳐 데이터가 균등하게 분배되므로 잡의 전체 실행 단계를 최적화할 수 있다.
  • 셔플 대신 노드의 파티션을 하나로 합치는 coalesce 메서드를 실행해 전체 파티션 수를 먼저 줄인다.
  • repartition 메서드는 부하를 분산하기 위해 네트워크로 데이터를 셔플링한다.
  • 파티션 재분배는 조인이나 cache 메서드 호출 시 매우 유용하다.
  • 파티션 재분배는 부하를 유발하지만 애플리케이션의 전체적인 성능과 잡의 병렬성을 높일 수 있다.

⁉️ 사용자 정의 파티셔닝

  • 잡이 여전히 느리게 동작하거나 불안정하다면 RDD를 이용한 사용자 정의 파티셔닝 기법을 적용해 DataFrame보다 더 정밀한 수준으로 클러스터 전반의 데이터 체계를 제어한다.

 

❌ 사용자 정의 함수 (UDF)

  • UDF 사용을 최대한 피한다.
  • UDF는 데이터를 JVM 객체로 변환하고 쿼리에서 레코드 당 여러 번 수행되므로 많은 자원을 소모한다.

 

😼 임시 데이터 저장소 (캐싱)

  • 캐싱은 클러스터의 익스큐터 전반에 걸쳐 만들어진 임시 저장소(메모리나 디스크)에 DataFrame, 테이블, 또는 RDD를 보관해 빠르게 접근할 수 있도록 한다.
  • 캐싱은 지연 연산이기 때문에 데이터에 접근해야 캐싱이 일어난다. 액션을 실행하는 시점에 데이터를 캐싱한다.
  • RDD는 물리적 데이터를 캐시에 저장하는 반면, 구조적 API 캐싱은 물리적 실행 계획을 기반으로 이루어진다.
  • 스파크의 cache 명령은 기본적으로 데이터를 메모리에 저장한다.
  • 예) 초기 DataFrame에서 CSV 파일을 읽어들인 다음 트랜스포메이션을 사용해 여러 신규 DataFrame을 만들어낸다. 이 과정에서 초기 DataFrame을 캐싱하는 코드를 추가해 반복적으로 재연산하는 과정을 피한다.

 

🍟 조인

  • 동등 조인은 최적화하기 쉬우므로 우선적으로 사용하는 것이 좋다.
  • 조인 순서 변경은 내부 조인을 사용해 필터링하는 것과 동일한 효과를 누릴 수 있다.
  • 브로드캐스트 조인 힌트를 사용하면 스파크가 쿼리 실행 계획을 생성할 때 지능적으로 계획을 세울 수 있다.
  • 안정성과 최적화를 위해 카테시안 조인이나 전체 외부 조인 사용은 최대한 피해야 한다.
  • 테이블 통계와 버켓팅은 조인에 상당한 영향을 미친다.

 

🦞 집계

  • 집계 전 충분히 많은 수의 파티션을 가질 수 있도록 데이터를 필터링하는 것이 최선의 방법이다.
  • RDD를 사용해 집계 수행 방식을 정확하게 제어하고 코드의 성능과 안정성을 개선한다.
  • 가능하면 groupByKey 대신 reduceByKey 사용!

 

🎙️ 브로드캐스트 변수

  • 다수의 UDF에서 큰 데이터 조각을 사용한다면 이를 개별 노드에 전송해 읽기 전용 복사본으로 저장하여 잡마다 데이터 조각을 재전송하는 과정을 생략한다.
  • 브로드캐스트 변수는 룩업 테이블이나 머신러닝 모델을 저장하는 데 사용할 수 있다.
  • SparkContext를 이용해 브로드캐스트 변수를 생성하고 임의의 객체를 브로드캐스트한 다음 태스크에서 참조하게 만들 수 있다.

❤️‍🔥 최적화 우선순위

  1. 파티셔닝과 효율적인 바이너리 포맷을 사용해 가능하면 작은 데이터를 읽는다.
  2. 충분한 병렬성을 보장하고 파티셔닝을 사용해 데이터 치우침(skew) 현상을 방지한다.
  3. 최적화된 코드를 제공하는 구조적 API를 최대한 활용한다.
    +스파크 잡에서 적합한 연산을 사용하고 있는지 확인한다.

 

[출처] 스파크 완벽 가이드 (빌 체임버스, 마테이 자하리아 지음)

profile
DCDI

0개의 댓글