Passing Functions to Spark

이상민 · March 25, 2023

local

$ ./bin/spark-shell

If no master is specified, spark-shell defaults to local[*], which uses every core on the local machine.

scala> var counter = 0
counter: Int = 0

The counter variable lives in the driver.

partition: 1

scala> val rdd = sc.parallelize(1 to 10, 1)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:23

scala> rdd.getNumPartitions
res0: Int = 1

scala> rdd.foreach{x => counter += x; println(s"counter : $counter");}
counter : 1
counter : 3
counter : 6
counter : 10
counter : 15
counter : 21
counter : 28
counter : 36
counter : 45
counter : 55

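Scala's s-string is similar to Python's f-string: inside the string, whatever follows $ is replaced by that variable's value.
The counter incremented above lives in the executor. The closure, including counter, is copied from the driver to the executor, and the function runs on that copy. The println output shows up in the console here because in local mode the executor runs inside the same process as the shell; the updated value is not sent back to the driver.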

scala> println(s"Counter value : $counter ")
Counter value : 0

This reads the driver's counter, so it prints the initial value, 0.
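
Since the driver's counter never sees the executor-side updates, a sum like this is usually gathered with an accumulator or a reduce action instead. The following is a minimal sketch, not part of the original session; the object name, app name, and the local[2] master are assumptions made so the example is self-contained.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CounterWithAccumulator {
  // Returns (accumulator result, reduce result) for the numbers 1 to 10.
  def run(): (Long, Int) = {
    // Assumption: local master with two threads, chosen only for this sketch.
    val conf = new SparkConf().setAppName("counter-with-accumulator").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val rdd = sc.parallelize(1 to 10, 2)

      // Tasks add to the accumulator; Spark merges the updates back on the driver.
      val acc = sc.longAccumulator("counter")
      rdd.foreach(x => acc.add(x))

      // reduce combines the elements on the executors and returns the result to the driver.
      val total = rdd.reduce(_ + _)

      (acc.value.longValue(), total)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (fromAccumulator, fromReduce) = run()
    println(s"accumulator : $fromAccumulator") // 55
    println(s"reduce      : $fromReduce")      // 55
  }
}
```

In spark-shell the same calls can be made directly on the existing sc, without creating a new SparkContext.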

partition: 2

scala> val rdd2 = sc.parallelize(1 to 10, 2)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:23

scala> rdd2.getNumPartitions
res3: Int = 2

scala> rdd2.foreach{x => counter += x; println(s"task ${org.apache.spark.TaskContext.get.taskAttemptId} value ${x} counter : $counter");}
task 1 value 1 counter : 1
task 1 value 2 counter : 3
task 1 value 3 counter : 6
task 1 value 4 counter : 10
task 1 value 5 counter : 15
task 2 value 6 counter : 6
task 2 value 7 counter : 13
task 2 value 8 counter : 21
task 2 value 9 counter : 30
task 2 value 10 counter : 40

scala> println(s"Counter value : $counter ")
Counter value : 0
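
The per-task totals above (15 and 40) and the unchanged driver value can be imitated without Spark. The sketch below is only an analogy under stated assumptions: it uses plain Java serialization to hand each simulated task its own copy of a hypothetical Counter object, which is similar in spirit to how a task receives its closure, but it is not Spark's actual code path.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for the state captured by the closure.
class Counter(var value: Int) extends java.io.Serializable

object TaskCopySketch {
  // Serialize and deserialize to obtain an independent copy of the object.
  def copyOf[T <: java.io.Serializable](obj: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // A simulated task: works on its own copy and never touches the original.
  def runTask(driverCounter: Counter, partition: Seq[Int]): Int = {
    val local = copyOf(driverCounter)
    partition.foreach(x => local.value += x)
    local.value
  }

  def main(args: Array[String]): Unit = {
    val counter = new Counter(0)
    val partitions = Seq(1 to 5, 6 to 10)
    val perTask = partitions.map(p => runTask(counter, p))
    println(s"per-task counters : $perTask")          // List(15, 40)
    println(s"driver counter    : ${counter.value}")  // 0
  }
}
```

Each simulated task starts from 0 because it copies the driver's object at its initial state, which matches the way task 2 restarts at 6 in the transcript.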

cluster

$ ./sbin/start-master.sh
$ ./sbin/start-worker.sh spark://spark-master-01:7177

$ ./bin/spark-shell --master spark://spark-master-01:7177
scala> var counter = 0
counter: Int = 0

scala> val rdd = sc.parallelize(1 to 10, 1)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:23

scala> rdd.getNumPartitions
res0: Int = 1

scala> rdd.foreach{x => counter += x; println(s"counter : $counter");}

Nothing is printed. On a standalone cluster the function runs on the executors, so its output cannot be seen from the driver. The counter output can be found in the master Web UI under Running Applications (1) -> stdout.
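
To see elements in the driver's console rather than in the executor's stdout, the data can be brought back to the driver first with an action such as take or collect. This is a minimal sketch, not from the original session; the object name and the local[2] master are assumptions for a self-contained run, and on the cluster above the master URL would be the spark:// address instead.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PrintOnDriver {
  // Returns the first n elements of the RDD as a local array on the driver.
  def firstElements(n: Int): Array[Int] = {
    // Assumption: local master, used only so the sketch runs on one machine.
    val conf = new SparkConf().setAppName("print-on-driver").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val rdd = sc.parallelize(1 to 10, 1)
      // take(n) ships at most n elements to the driver; collect() would ship all of them.
      rdd.take(n)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit =
    // This println runs on the driver, so the output appears in the driver's console.
    firstElements(5).foreach(x => println(s"value : $x"))
}
```

take is the safer choice for large datasets, since collect pulls the entire RDD into driver memory.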

Copying driver variables to tasks

scala> val m = Array(1,2,3,4,5)
m: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val data = sc.makeRDD(1 to 10, 2)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at <console>:23

scala> val data1 = data.map{r => println(s"map #1 m.hashCode ${m.hashCode} taskAttemptId ${org.apache.spark.TaskContext.get.taskAttemptId}"); r}
data1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at map at <console>:24

scala> val data2 = data1.map{r => println(s"map #2 m.hashCode ${m.hashCode} taskAttemptId ${org.apache.spark.TaskContext.get.taskAttemptId}"); r}
data2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at map at <console>:24

scala> data2.getNumPartitions
res6: Int = 2

scala> m.hashCode
res7: Int = 473819174

scala> data2.count
res8: Long = 10

The following appears in the executor's stdout:

map #1 m.hashCode 370057016 taskAttemptId 3
map #2 m.hashCode 370057016 taskAttemptId 3
map #1 m.hashCode 370057016 taskAttemptId 3
map #2 m.hashCode 370057016 taskAttemptId 3
map #1 m.hashCode 370057016 taskAttemptId 3
map #2 m.hashCode 370057016 taskAttemptId 3
map #1 m.hashCode 370057016 taskAttemptId 3
map #2 m.hashCode 370057016 taskAttemptId 3
map #1 m.hashCode 370057016 taskAttemptId 3
map #2 m.hashCode 370057016 taskAttemptId 3
map #1 m.hashCode 1116474388 taskAttemptId 4
map #2 m.hashCode 1116474388 taskAttemptId 4
map #1 m.hashCode 1116474388 taskAttemptId 4
map #2 m.hashCode 1116474388 taskAttemptId 4
map #1 m.hashCode 1116474388 taskAttemptId 4
map #2 m.hashCode 1116474388 taskAttemptId 4
map #1 m.hashCode 1116474388 taskAttemptId 4
map #2 m.hashCode 1116474388 taskAttemptId 4
map #1 m.hashCode 1116474388 taskAttemptId 4
map #2 m.hashCode 1116474388 taskAttemptId 4

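The hashCode differs from task to task, so each task received its own copy of the variable. An Array's hashCode is identity-based, so a different value means a different object, and neither value matches the driver's 473819174.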

Running the same action again:

scala> data2.count
res9: Long = 10

map #1 m.hashCode 112823513 taskAttemptId 5
map #2 m.hashCode 112823513 taskAttemptId 5
map #1 m.hashCode 112823513 taskAttemptId 5
map #2 m.hashCode 112823513 taskAttemptId 5
map #1 m.hashCode 112823513 taskAttemptId 5
map #2 m.hashCode 112823513 taskAttemptId 5
map #1 m.hashCode 112823513 taskAttemptId 5
map #2 m.hashCode 112823513 taskAttemptId 5
map #1 m.hashCode 112823513 taskAttemptId 5
map #2 m.hashCode 112823513 taskAttemptId 5
map #1 m.hashCode 258219112 taskAttemptId 6
map #2 m.hashCode 258219112 taskAttemptId 6
map #1 m.hashCode 258219112 taskAttemptId 6
map #2 m.hashCode 258219112 taskAttemptId 6
map #1 m.hashCode 258219112 taskAttemptId 6
map #2 m.hashCode 258219112 taskAttemptId 6
map #1 m.hashCode 258219112 taskAttemptId 6
map #2 m.hashCode 258219112 taskAttemptId 6
map #1 m.hashCode 258219112 taskAttemptId 6
map #2 m.hashCode 258219112 taskAttemptId 6

The variable is copied again every time an action runs.
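
When the same read-only value is needed by many tasks or across several actions, a broadcast variable is the usual alternative to capturing it in the closure. The sketch below is an illustration, not something run in the original session; the object name and the local[2] master are assumptions.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastSketch {
  // Counts how many of the numbers 1 to 10 are contained in the broadcast array.
  def run(): Long = {
    // Assumption: local master, used only so the sketch is self-contained.
    val conf = new SparkConf().setAppName("broadcast-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val m = Array(1, 2, 3, 4, 5)
      // The closure captures only the small Broadcast handle, not the array itself.
      val bm = sc.broadcast(m)
      val data = sc.makeRDD(1 to 10, 2)
      // Tasks read the array through bm.value.
      data.filter(r => bm.value.contains(r)).count()
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"matches : ${run()}") // 5
}
```

For an array of five integers the difference is negligible; broadcasting pays off when the shared value is large.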

scala> var count = 0
scala> val rdd = sc.textFile("README.md")
scala> rdd.partitions.size
scala> rdd.foreach { n =>
  count = count + 1
  val tc = org.apache.spark.TaskContext.get
  println(s"""|-------------------
              |partitionId: ${tc.partitionId}
              |element: ${n}
              |stageId: ${tc.stageId}
              |attemptNum: ${tc.attemptNumber}
              |taskAttemptId: ${tc.taskAttemptId}
              |count: ${count}
              |-------------------""".stripMargin)
}
scala> println(count)

Output printed by the tasks (excerpt):

-------------------
partitionId: 0
element: # Apache Spark
stageId: 0
attemptNum: 0
taskAttemptId: 0
count: 1
-------------------
...
-------------------
partitionId: 1
element:     ./bin/pyspark
stageId: 0
attemptNum: 0
taskAttemptId: 1
count: 2
-------------------

The stageId and taskAttemptId values can be checked on the Stages tab of the Spark Web UI at http://spark-master-01:4040/.
When a driver variable is copied to the executors for each task, it travels over the network, so it must be serializable.

not serializable

scala> class Customer(val id: String)
defined class Customer

scala> val n = new Customer("skybluelee")
n: Customer = Customer@78500fc9

scala> val rdd = sc.range(1, 10, numSlices=3)
rdd: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[3] at range at <console>:23

scala> rdd.map{r => println(s"n.hashCode ${n.hashCode} task ${org.apache.spark.TaskContext.get.taskAttemptId}"); r}.count
org.apache.spark.SparkException: Task not serializable
...
Caused by: java.io.NotSerializableException: Customer

To fix this, make the class extend Serializable:

scala> class Customer(val id: String) extends Serializable
defined class Customer

scala> val n = new Customer("skybluelee")
n: Customer = Customer@402a6c1

scala> val rdd = sc.range(1, 10, numSlices=3)
rdd: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[5] at range at <console>:23

scala> rdd.map{r => println(s"n.hashCode ${n.hashCode} task ${org.apache.spark.TaskContext.get.taskAttemptId}"); r}.count
res4: Long = 9
-------------------
n.hashCode 760345915 task 3
n.hashCode 760345915 task 3
n.hashCode 760345915 task 3
n.hashCode 1881337265 task 2
n.hashCode 1881337265 task 2
n.hashCode 1881337265 task 2
n.hashCode 1294539176 task 4
n.hashCode 1294539176 task 4
n.hashCode 1294539176 task 4

Three tasks appear, one per partition, each with its own copy of n.
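
Whether an object will survive being shipped to a task can be checked locally before submitting a job. The sketch below is a hedged illustration using plain Java serialization, which is what Spark uses for closures; the class names PlainCustomer and SerializableCustomer and the helper isJavaSerializable are hypothetical and exist only for this example.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical classes mirroring the two Customer definitions above.
class PlainCustomer(val id: String)
class SerializableCustomer(val id: String) extends java.io.Serializable

object SerializableCheck {
  // Tries to write the object with Java serialization and reports whether it succeeded.
  def isJavaSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    println(isJavaSerializable(new PlainCustomer("skybluelee")))        // false
    println(isJavaSerializable(new SerializableCustomer("skybluelee"))) // true
  }
}
```

A check like this only covers the object itself; every field it references must be serializable as well, or be marked @transient.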
