Broadcast


broadcast variable
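
A broadcast variable ships a read-only value from the driver to every executor once and caches it there, instead of serializing the value into each task closure. A minimal sketch of the usual lookup pattern (the Map below is hypothetical, not part of this session):

val lookup = sc.broadcast(Map(1 -> "one", 2 -> "two"))   // shipped to each executor once
val names = sc.makeRDD(Seq(1, 2)).map(k => lookup.value.getOrElse(k, "?"))
names.collect   // Array("one", "two")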

Launch the standalone cluster (SPARK_WORKER_INSTANCES=3 start-worker.sh --cores 8 --memory 8G)
$ ./sbin/start-master.sh --host spark-master-01 --port 7177 --webui-port 8180
$ SPARK_WORKER_INSTANCES=3 ./sbin/start-worker.sh spark://spark-master-01:7177 --host spark-master-01 --port 12345 --webui-port 8181 --cores 8 --memory 8G --work-dir ./work_dir_4_worker

Launch the Spark shell (with --total-executor-cores 6 and --executor-cores 2, this gives 3 executors)
$ ./bin/spark-shell --master spark://spark-master-01:7177 --executor-cores 2 --executor-memory 2G --total-executor-cores 6
scala> val m = Array(1, 2, 3)
m: Array[Int] = Array(1, 2, 3)

scala> val broadcastVar = sc.broadcast(m)
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

scala> broadcastVar.value.hashCode
res1: Int = 1036277238

scala> m.hashCode
res2: Int = 1036277238

On the driver, the original m and broadcastVar.value are identical.
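
Scala arrays use the default identity hashCode, so the matching values above imply reference equality: on the driver, broadcastVar.value hands back the locally stored object. A quick check (not part of the original session):

broadcastVar.value eq m   // should be true on the driver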

scala> val rdd = sc.makeRDD(1 to 100, 5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:23

scala> val rdd1 = rdd.map{r => println(s"map #1 broadcastVar.value.hashCode ${broadcastVar.value.hashCode} taskAttemptId ${org.apache.spark.TaskContext.get.taskAttemptId} stageId ${org.apache.spark.TaskContext.get.stageId}"); r}
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:24

scala> val rdd2 = rdd1.repartition(10)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at repartition at <console>:23

repartition is used here to trigger a shuffle, so that the following map runs in a different stage.
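
rdd2.toDebugString makes the boundary visible: the ShuffledRDD that repartition inserts into the lineage is exactly where Spark cuts the job into two stages. A sketch (output abbreviated; the indentation step in the real output marks the cut):

rdd2.toDebugString
// (10) MapPartitionsRDD[5] ... ShuffledRDD[3] ...
//  +-(5) MapPartitionsRDD[1] ... ParallelCollectionRDD[0] ...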

scala> val rdd3 = rdd2.map{r => println(s"map #2 broadcastVar.value.hashCode ${broadcastVar.value.hashCode} taskAttemptId ${org.apache.spark.TaskContext.get.taskAttemptId} stageId ${org.apache.spark.TaskContext.get.stageId}"); r}
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at map at <console>:24

scala> rdd3.count
res3: Long = 100

Check each executor's stdout via the master web UI: http://spark-master-01:8180/ -> Running Applications (1) -> Application ID -> Executor Summary (3) -> stdout.
Within a single executor, every printed hashCode is identical -> the broadcast value is copied to each executor only once.
taskAttemptId and stageId may still differ across the printed lines.
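
Rather than eyeballing each executor's stdout, the executor ID can be collected next to an identity hash. A sketch (not part of the original session; SparkEnv is a developer API):

val copies = rdd.map { _ =>
  (org.apache.spark.SparkEnv.get.executorId, System.identityHashCode(broadcastVar.value))
}.distinct.collect
copies.foreach(println)   // expect one identity hash per executor ID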

driver variable vs broadcast variable

The same experiment again, this time with the plain driver-side array (data) captured in the map closure alongside the broadcast handle (bcData).

scala> val data = Array(1, 2, 3)
data: Array[Int] = Array(1, 2, 3)

scala> val bcData = sc.broadcast(data)
bcData: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> bcData.value
res0: Array[Int] = Array(1, 2, 3)

scala> bcData.value.hashCode
res1: Int = 1380065170

scala> data.hashCode
res2: Int = 1380065170

The Spark shell was stopped and restarted here to make the output easier to inspect in the web UI.

scala> val rdd = sc.makeRDD(1 to 10, 5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:23

scala> rdd.getNumPartitions
res3: Int = 5

scala> val rdd1 = rdd.map{r =>
     |   println(s"""
     |     map #1 data ${data.toList} data.hashCode ${data.hashCode} taskAttemptId ${org.apache.spark.TaskContext.get.taskAttemptId} stageId ${org.apache.spark.TaskContext.get.stageId}
     |     map #1 bcData ${bcData.value.toList} bcData.hashCode ${bcData.value.hashCode} taskAttemptId ${org.apache.spark.TaskContext.get.taskAttemptId} stageId ${org.apache.spark.TaskContext.get.stageId}
     |   """)
     |   r
     | }
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:25

scala> val rdd2 = rdd1.repartition(10)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at repartition at <console>:23

scala> rdd2.getNumPartitions
res4: Int = 10

scala> val rdd3 = rdd2.map{r =>
     |   println(s"""
     |     map #2 data ${data.toList} data.hashCode ${data.hashCode} taskAttemptId ${org.apache.spark.TaskContext.get.taskAttemptId} stageId ${org.apache.spark.TaskContext.get.stageId}
     |     map #2 bcData ${bcData.value.toList} bcData.hashCode ${bcData.value.hashCode} taskAttemptId ${org.apache.spark.TaskContext.get.taskAttemptId} stageId ${org.apache.spark.TaskContext.get.stageId}
     |   """)
     |   r
     | }
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at map at <console>:25

scala> rdd3.count
res5: Long = 10

scala> rdd3.collect
res6: Array[Int] = Array(1, 2, 7, 5, 6, 8, 3, 4, 9, 10)

Check each executor's stdout via the master web UI: http://spark-master-01:8180/ -> Running Applications (1) -> Application ID -> Executor Summary (3) -> stdout.
data.hashCode (the driver variable) differs per task, even within the same executor.
bcData.value.hashCode (the broadcast) is identical within an executor.
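
The mechanism behind the difference: data is captured in the map closure, so it is serialized into every task and each task deserializes its own fresh copy, while bcData serializes only a small handle and the first task on an executor fetches and caches the actual value for later tasks to reuse. A sketch that surfaces both effects in one collect (not part of the original session; SparkEnv is a developer API):

val check = rdd.mapPartitions { _ =>
  Iterator((org.apache.spark.SparkEnv.get.executorId,   // where this task ran
            System.identityHashCode(data),              // fresh copy per task
            System.identityHashCode(bcData.value)))     // one copy per executor
}
check.collect.foreach(println)
// the data column should vary across tasks on the same executor,
// while the bcData column repeats for every task on that executor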
