Spark란
Spark는 분산처리 플랫폼이며 computing cluster로 이루어져있다. 이러한 cluster에 분산된 리소스는 YARN, Kubernetes, Mesos와 같은 리소스 매니저에의해 관리되며 대부분 YARN과 Kubernetes를 사용한다. Standalone모드도 지원하여 Local환경에서도 작동이 가능하다.
- YARN과 Kubernetes의 차이는?
요즘 YARN에서 Kubernetes로 넘어가는 추세다. YARN은 Spark와 버전충돌이 일어날 가능성이 있는 반면 K8s를 사용할 경우 Spark application을 컨테이너 환경에서 실행하기 때문에 의존성 관리가 더욱 수월하다. YARN에서는 네트워크를 새로 연결하고 Pod를 띄울 필요가 없어 Overhead가 적다고 할 수 있지만, 모든 Job에 대해서 동일한 Resource를 할당해야한다는 점때문에 비효율적일 수 있다. 또한 K8s 위에서 동작한다는 것은 K8s의 다양한 생태계를 사용할 수 있다는 장점이 있다.
pyspark job을 YARN에게 제출하면?
pyspark job을 YARN에게 제출하면 각 노드에 노드메니저를 통해 적당한 노드에 Application Master 컨테이너를 실행한다. Application Master 컨테이너 내에 Pyspark Driver를 실행하고 JVM application Driver를 실행한다. Pyspark driver와 JVM Driver는 Py4J를 통해 통신한다. 만약 Scala나 Java를 사용하게되면 Pyspark driver는 생성되지 않는다. 이어서 JVM driver는 YARN RM에게 드라이버 실행이 완료되었음을 알리고 YARN RM은 다시 노드 메니저를 통해 executor container를 실행한다. 이때 데이터의 Locality를 고려한다. 완료된 후 YARN RM은 application master container에게 container에 대한 상세 정보를 제출한다. executor에는 데이터 처리를 위해 JVM 어플리케이션을 실행하고 driver 어플리케이션은 각 executor의 실행을 관리/감독한다.
pyspark를 사용할때 udf 사용
pyspark를 사용할때 udf 사용을 지양하는데 그 이유는 executor에 실행되는 JVM 어플리케이션 외부에 Python Worker를 따로 두어 실행하기 때문이다. 이 경우 Python worker와 JVM사이에 데이터를 교환하는 과정에서 serialize/deserialize과정으로 인한 overhead가 발생할 수 있고 이 과정은 Spark이 직접 최적화에 관여하지 않기때문이다.
cluster vs client mode
cluster 모드는 driver가 cluster의 한 노드에서 실행되는 것을 말하며 client mode는 YARN에게 driver application 실행을 요청하지 않고 client machine에 직접 실행하고 YARN RM에게 executor 컨테이너의 실행을 요청하는 방식으로 동작한다. 대부분의 경우 cluster 모드를 사용하게 되는데 그 이유는 driver가 executor와 네트워크적으로 가까워 네트워크 지연이 덜 발생하여 성능측면에서 이점이 있다. 또한 client machine에 의존성이 없다. client mode는 주로 interactive한 테스크를 수행할때 사용한다. spark-shell, spark-sql, notebook. client mode를 사용하게 되면 client machine 의 driver를 죽임으로써 테스크를 쉽게 종료할 수 있다는 장점이 있다.
Stage, Task, Job
Job은 실행단위를 action으로 나눈것을 의미하며 Stage는 Spark cluster wide transformation 단위로 나눈 것을 의미한다. task는 stage내의 작업을 병렬적으로 수행되는 단위로 나눈 것을 의미한다.
action은 spark job을 Trigger하는 역할을 하며 그 예로는 take(), collect()등이 있다. transformation은 job을 Trigger하지 않는 연산이며 transformation은 narrow, wide transformation으로 나뉜다. narrow는 병렬적으로 수행되는 연산을 의미한다.(select,filter..), wide transformation은 suffle & sort에 연관된 연산을 의미한다(groupby, join...)
lazy evaluation
spark을 활용할때 일반적으로 우리가 알고싶은 정보는 action이 실행되고난 후에 얻게된다. Job은 여러 transformation으로 구성되는데 이것은 우리가 얻고자하는 결과로 가는 길을 안내하는 것일 뿐이다. 결국 Spark은 매 transformation을 바로 실행하는 것이 아니라 action이 수행되기까지의 transformation을 분석하여 지름길을 알려준다. 이처럼 바로 연산을 하지 않고 action이 수행되어야 연산하는 것을 lazy evaluation이다.
Spark 실행계획
Spark에는 저수준의 API인 RDD와 고수준의 API인 Dataframe, Dataset가 존재한다. RDD API는 물리적으로 분산된 데이터에 세부적인 제어가 필요할때 사용하게 된다. 하지만 대부분의 경우 DataFrame을 사용하는데 대부분의 경우 데이터에 대한 세부적인 제어를 사용자가 직접할 일이 별로 없으며 Spark SQL Engine이 자동으로 최적화를 해주기 때문에 RDD는 잘 사용하지 않는다. Dataset은 DataFrame에서 type의 안정성이 강화된 버전이다. dataset은 데이터의 타입을 컴파일 타임에 확인하고 dataframe을 사용할 경우 runtime이되어야 알 수 있다.
SparkSQL, DataFrmae을 실행하는 경우
1. unresolved logical plan로 표현되어 Spark SQL Engine에게 전달된다.
2. unresolved logical plan은 Catalog를 통해 parsing되어 코드상에 에러가 있는지 확인한다. 문제가 없다면 Logical plan이 된다.
3. Logical plan에대해 최적화 과정을 거쳐 Optimized Logical plan가 생성된다. 이 단계에서 Constant folding, Predicate pushdown, Partition pruning, Null propagation, Boolean expression simplification와 같은 최적화를 적용한다.
- Constant folding: compile time에 상수 표현을 인식하고 처리하는 최적화 과정
- Predicate pushdown: 조건절을 가능한 빠르게 적용하는 것, 파일을 읽어온 후 필터링하는 것이아니라 파일을 읽어올때부터 필요한 데이터만 필터링해서 읽는 것
- Partition pruning:
- Null propagation: nullable하지 않은 column에 대해 isnull연산을 수행했을때 isnull연산을 수행하지않고 false를 반환하는 최적화 과정
- Boolean expression simplification: Boolean expression의 연산을 단순화 하는 과정, 예를 들어
(A and B) or A
라는 표현식은 A
와 동일하고 연산량이 적다. Boolean expression simplification적용하면 (A and B) or A
는 A
로 바뀌어 실행된다.
- Optimized Logical plan은 하나 이상의 Pysical Plan을 생성한다. 이 과정에서 만약 실행 계획에 join 알고리즘이 포함되어 있다면 broadcast, sort merge, shuffle hash join 등을 적용한 각각의 pysical plan을 생성한다. 생성된 Pysical Plan들은 Cost Model을 통해 가장 비용효율적인 Pysical plan 하나가 선택된다.
- broadcast join
- 작은 테이블을 전체 노드에 전달하여 큰 테이블의 shuffle 없이 Join을 하는 방식
- 일반적으로 10MB이하의 작은 테이블이 존재할때 사용
- sort merge join
- 테이블을 정렬한 후 join하는 기법
- 두 테이블이 large data size를 가질 경우 유리
- shuffle 과 sort가 요구되며 skew 파티션이 생성될 가능성이 있음
- shuffle hash join
- 큰 테이블과 작은 테이블을 join할때 유리
- 테이블을 셔플한 후 둘 중 작은 테이블로 hash table을 만들어 join 하는 기법
- shuffling, hashing 모두 필요하기 때문에 자원을 많이 사용하는 단점이 있음
- 일반적으로 성능상의 이유로 sort merge join이 더 선호된다.
-
선택된 Pysical Plan은 RDD Operation을 위해 java byte code로 변환되어 실행된다. Spark는 결국 최적화된 Java byte code를 생성하기때문에 complier로 간주되기도 한다.
Spark 메모리 할당
-
spark.driver.memory
- driver container의 jvm process에 할당된 메모리의 크기
-
spark.driver.memoryOverhead
- driver container의 non-jvm process에 할당된 메모리의 크기
- driverMemory*spark.driver.memoryOverhead 와 384MB 중 큰 값으로 설정됨
-
spark.executor.memory
- executor container의 jvm process에 할당된 메모리의 크기
- 단 yarn.scheduler.maximum-allocation-mb 보다 작은 값으로 설정해야 한다.
- yarn.scheduler.maximum-allocation-mb: 컨테이너에 할당할 수 있는 최대 메모리 크기
- yarn.nodemanager.resource.memory-mb: 각 노드 매니저가 컨테이너 할당에 사용할 메모리 크기
-
spark.executor.memoryOverhead
- executor container의 non-jvm process에 할당된 메모리의 크기
- pyspark process, shuffle exchange, network read buffer 등에 사용됨
-
spark.executor.memory.offHeap.size
- off-heap memory를 활용하면 직접 메모리를 관리해 GC delay를 막을 수 있다.
- Storage Memory, Executor Memory로 활용할 수 있다.
spark.executor.memory.offHeap.enable
=true
-
spark.executor.pyspark.memroy
- 대부분의 pyspark application은 external python 라이브러리를 사용하지 않기때문에 default값은 0. python worker를 위한 추가적인 메모리가 필요하다면 사용
Spark 메모리 관리
-
JVM heap memory는 3부분으로 나뉜다. Reserved Memory, Spark Memory, User Memory
- Reserved Memory: Spark Engine을 위한 공간, 300MB로 고정
- Spark Memory: DataFrame Operation, Caching을 위한 공간
spark.memory.fraction
*spark.executor.memory
- 다시 Storage Memory Pool, Executor Memory Pool로 나뉨
- Storage Memory: Cache memory for DataFrame,
spark.memory.storageFraction
=0.5
- Executor Memory: Buffer memory for DataFrame Operations, short-lived(join...)
- executor container의 Slot(thread)들은 Storage Memory를 공유하고 각각의 Executor Memory를 가진다. Executor Memory를 나누는 방법은 Static, Unified memory manager가 있다. Static는 thread들에게 공평하게 공간을 미리 할당하는 것이고 Unified memory manager는 실행중인 thread들에 한해서 메모리를 할당하는 방법이다. Storage Memory와 Executor Memory는 엄격하게 구분되어 있는 것이 아니여서 빈영역이 있다면 서로의 영역을 공유할 수 있다.
- User Memory: Non-DataFrame Operation을 위한 공간
- User-defined data structures(hash map...), Spark internal metadata, UDFs created by user, RDD conversion operations, RDD lineage and dependency ...
- (1-
spark.memory.fraction
)*spark.executor.memory
Partition number 가 중요한 이유
-
partition의 수를 작게 설정하면
- partition 의 size가 커지고
- Task들은 더 많은 메모리를 필요로하고
- OOM이 발생할 가능성이 높아짐
-
partition의 수를 크게 설정하면
- partition의 size가 작아지고
- network fetch가 자주 발생하여 비효율적인 network I/O를 야기함
- Spark Task Scheduler에 더 많은 부하가 가해짐
AQE(Adaptive Query Execution)
-
Spark는 기본적으로 table에 대한 통계정보를 통해 쿼리를 최적화한다. 하지만 통계정보가 충분치 않을 수 도 있고 쿼리가 실행됨에 따라 나오는 중간결과에 따라 초기에 설정한 실행 계획이 최적의 것이 아닐 수 있다. AQE는 이러한 문제점을 해결한다.
-
wide transformation이 실행되면 shuffle & sort가 실행된다. 이때 partition에 대한 통계정보를 동적으로 계산하여 이 정보를 통해 쿼리 플랜을 조정한다.
-
spark.sql.shuffle.parittions
에의해 shuffle후 partition의 수를 설정할 수 있다. 하지만 실질적으로 데이터를 가진 partition의 수가 spark.sql.shuffle.parittions
보다 작다면 나머지는 빈 partition이 생성된다. 물론 빈 partition에 대해서는 task를 스킵하기만 하면되서 수 밀리초 정도밖에 걸리지 않지만 spark scheduler가 빈 Partition에 대해서도 scheduling, monitoring해야하기 때문에 비효율적이다. 이러한 문제를 해결하기위해 AQE는 동적으로 partition수를 조정한다.
-
Dynamically coalescing shuffle partitions
- 동적으로 shuffle partition의 수를 조절함
- 빈 partition은 없애고, 너무 작은 Partition이 있다면 다른 partition과 합쳐서 큰 partition을 만듬(coalesce)
spark.sql.adaptive.enable
: AQE를 enable
spark.sql.adaptive.coalescePartitions.initialPartitionNum
- partition 수의 최대값
- default value가 없어서 값을 설정하지 않으면
spark.sql.shuffle.parittions
로 설정됨
spark.sql.adaptive.coalescePartitions.minPartitionNum
- coalescing 혹은 partition들을 합칠때 partition 수의 최소값
- default value가 없어서 값을 설정하지 않으면
spark.default.parallelism
으로 설정됨
spark.sql.adaptive.advisoryPartitionSizeInBytes
- 권장되는 파티션 사이즈, default 64MB
spark.sql.adaptive.coalescePartitions.enabled
- 이 값을 true로 설정해야 AQE가 작은 partition을 알아서 합침
-
Dynamically switching join strategies
- 보통 큰 두 테이블을 Join할 경우 Sort merge join을 사용한다. 만약 join의 조건으로 인해 하나의 테이블이 수MB 정도로 작아진다면 Sort merge join보타 broadcast join이 유리할 것이다. 하지만 Spark은 실행하기 이전에 이 정보를 알지 못할 수도 있다. filter column에 대한 통계정보를 구하고 있지 않거나 통계정보가 최신정보가 아닐 수도 있다. AQE를 사용하면 이러한 통계정보를 실행하면서 갱신하여 최적의 join strategy로 변경할 수 있다.
spark.sql.adaptive.enable
: AQE를 enable
spark.sql.adaptive.localShuffleReader.enabled
- AQE broadcast join시 더욱 최적화하여 network traffic을 줄여주는 Custum shuffle reader
- default 값이 true, AQE를 사용한다면 이 값을 false로 할일은 없을 듯
-
Dynamically optimizing skew joins
- 분산 처리 기술에는 skew partition은 여러 비효율을 초래한다. skew partition으로 인해 cpu utilization이 낮아 지거나 OOM exceiption이 발생할 수 있다. OOM이 발생한 경우 application의 memory를 늘려 해결할 수 있지만 이것은 좋은 해결책이 아니다. Spark은 특정 executor의 메모리만 늘릴 수 없기때문에 메모리의 낭비가 일어날 수 있기때문이다. AQE를 사용하면 skew partition을 split해 문제를 해결해준다.
spark.sql.adaptive.enable
: AQE를 enable
spark.sql.adaptive.skewJoin.enabled
: skew join을 enable
spark.sql.adaptive.skewJoin.skewedPartitionFactor
- default 값은 5
- (*)partition 크기의 median의 5배보다 크고 skewedPartitionThresholdInBytes보다도 크면 skew partition으로 간주
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
- default 값은 256MB
- (*) 조건과 동일
Dynamic Partition Pruning (DPP)
-
Fact table과 Dimesion table을 join하려고 할때 equi join하는 필드로 Fact table이 partitioning되어 있는 경우 Dimesion table의 필드를 기준으로 필터링하게되면 직접적으로 predicate pushdown을 적용할 수 없다.
-
이 경우 Dynamic Partition Pruning을 활성화하고 Dimesion table을 broadcast Join하면 Dimesion table을 기준으로 먼저 filtering한 서브쿼리를 이용해 Fact table으로부터 원하는 정보만 읽을 수 있다.
-
Dynamic Partition Pruning이 활성화되기 위한 조건
- Fact-Dimension 환경
- Fact table은 partitioning 되어 있어야함
- Spark Engine에게 dimension table을 broadcast hint 를 줘야함
Caching
- cahce(), persist() 를 통해 dataframe을 caching할 수 있음
- 둘다 lazy transformation이기때문에 action으로 trigger 해줘야함
- action으로 trigger해준다고 모든 것이 caching되는 것이 아니라 job에서 사용한 partition에 대해서만 caching하고 partition의 일부만 저장하는 일은 없다.
- cache vs persist
- cache, persist 모두 caching을 위한 메소드임
- cache() 메소드는 StorageLevel이 MEMORY_AND_DISK 로 설정됨
- persist는 StorageLevel을 custumizing할 수 있음
- useDisk, useMemory, useOffHeap(Boolean): caching을 어떤 곳에 할것인지
- deserialized(Boolean): 데이터를 어떤 format(serialized or deserialized)으로 저장할 것인지, deserialized format은 저장공간은 더 잡아먹지만 deserializing할 필요가 없어서 CPU 자원을 아낄 수 있다. deserialized format이 일반적으로 권장됨
- replication(int): 몇개를 복제해서 저장할 것인가
- caching을 해제할때는 unpersist() (uncache()는 존재하지 않음)
Broadcast
Repartition and Coalesce
- Repartition
- wide transformation이여서 shuffle/sort를 동반함
- shuffle은 비용이 많이 드는 작업이므로 특별한 이유가 없다면 repartition을 수행하면 안됨
- 시나리오
- 특정 column으로 자주 filtering될때
- 동일한 크기의 Partition으로 나누는 것이 필요할때
- 단순히 partition의 숫자를 줄이고 싶을때는 repartition 말고 coalesce를 사용 (shuffle때문)
- repartition
- Hash based partitioning
- columns를 지정해주지 않으면 동일한 크기의 파티션을 생성함
- partition num을 지정해주지 않으면
spark.sql.shuffle.parittions
으로 지정됨
- repartitionByRange: Range of values based partitioning, partition range를 정하기 위해 Sampling을 이용한다.
- Coalesce
- shuffle/sort없이 local partition을 합치는 역할만 함
- 오직 Partition의 숫자를 줄이기위해 사용
- skew partition을 발생시킬 수 있어 OOM의 위험이 있음
Hints
- 사용자가 Spark Engine에게 hint를 줄 수 있음, Spark Engine은 hint를 참고하지만 무조건적으로 따르지는 않음
- Spark SQL을 사용할때는
/* hint [ , ... ] */
와 같은 형태로 작성가능
- Datafrmae을 사용할 경우
.hint()
- Partitioning Hints
- COALESCE
- REPARTITION
- REPARTITION_BY_RANGE
- REBALANCE : query result를 특정 column을 기준으로 적당한 크기로 재조정해줌, AQE enable이 필요
- Join Hints
- BROADCAST alias BROADCASTJOIN and MAPJOIN
- MERGE alias SHUFFLE_MERGE and MERGEJOIN
- SHUFFLE_HASH
- SHUFFLE_REPLICATE_NL
Broadcast Variables
- worker node마다 caching해서 사용하는 immutable 변수
- lazy serialization: worker node가 적어도 해당 variable을 한번은 사용해야 caching됨
- udf가 variable을 사용하는 경우 broadcast variable을 사용하면 반복적인 serialization을 피할 수 있음
- broadcast variable vs lambda closure
- broadcast variable: worker node마다 caching해서 사용
- lambda closure: task 마다 serialization 필요
Accumulators
- Spark Low Level API이며 자주 사용되지는 않는다.
- Global mutable variable
- accumulator는 driver에 상주하며 executor는 internal communication을 통해 accumulator의 값을 조작한다.
- 데이터를 조작할때 동시에 count, sum과 같은 결과를 얻고 싶을때 사용
- Transformation, Action 과정에서 accumulator연산을 할 수 있지만 Transformation은 반복해서 실행될 가능성이 있기때문에 정확한 값을 얻지 못할수도 있다. 따라서 Action에만 적용하는 것을 권장함
- Scala를 사용하면 named accumulator를 사용할 수 있고 named accumulator는 Spark UI에서 확인할 수 있다.
- Custom accumulator를 생성할 수도 있다.
Speculative Execution
- task가 faulty worker node로 인해 완료되지 않는 것으로 의심되면 다른 node에서 해당 작업을 동시에 실행함
- 둘중 하나의 task가 완료되면 나머지 task는 Kill
- Speculative Execution는 Data Skew, 적은 메모리 할당으로 인한 실행 지연을 faulty worker node로 오인하는 문제가 발생할 수 있다.
- Speculative Execution는 overhead를 동반하기 때문에 대다수의 경우 선호되지 않는다.
- Speculative Execution는 비용이 많이 들기때문에 fine tuning을 위한 다양한 옵션을 제공한다
spark.speculation
를 true로 설정하면 사용할 수 있음 (default: false)
spark.speculation.interval
: 확인하는 주기
spark.speculation.multiplier
: 다른 task execution time의 median*multiplier 이상일때 Speculative Execution 고려
spark.speculation.quantile
: 전체 task가 해당 비율을 넘어가면 Speculative Execution을 고려
spark.speculation.minTaskRuntime
: Speculative Execution이 작동하기 위한 최소 task runtime
spark.speculation.task.duration.threshold
: task duration의 hard limit
Dynamic Resource Allocation
- Static Allocation
- application이 실행될때 resource manager에게 자원을 할당받고 application이 완료되기 전에는 자원을 반납하지 않음
- 현재 사용하고 있지 않더라도 할당받은 자원은 application이 끝날때까지 내놓지 않기때문에 resource utilization에 문제가 생길 수 있음
- FCFS 알고리즘
- Dynamic Allocation
- 자원을 필요한만큼만 사용하고 사용하지 않는 자원에 대해서는 release
- resource manager에 의해 수행되는 것이 아니라 Spark가 자체적으로 자원을 release, 자원이 필요할때는 resource manager에게 요청
spark.dynamicAllocation.enabled
spark.dynamicAllocation.shuffleTracking.enabled
:???
spark.dynamicAllocarion.executorIdleTimeout
: executor에서 작업이 얼마 동안 수행이 안되면 application이 executor를 release하는지
spark.dynamicAllocation.schedulerBacklogTimeout
: pending task가 이 기간동안 backlogged 되면 새로운 executor를 요청함
Spark Schedulers
- Spark는 기본적으로 job을 순차적으로 실행한다. job을 병렬처리하려면 multithreading을 사용하면 된다. 하지만 이 경우에 자원을 서로 차지하려는 경우가 생길 수 있다.
- Spark Job Scheduler는 기본적으로 FIFO 정책을 사용한다. 먼저 실행된 Job에게 자원을 할당해주고 그 후에 들어온 Job에게는 남은 자원내에서 할당해주는 방법이다.
- FAIR 정책을 사용할 수도 있는데, 이것은 Round robin 방식이다.
spark.scheduler.mode
: FAIR, FIFO