Accumulator

이상민·2023년 3월 26일
0

spark

목록 보기
11/17

task들의 결과 값을 driver에서 확인 가능

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).repartition(10).foreach(x => accum.add(x))

scala> accum.value
res1: Long = 10

scala> sc.parallelize(Array(1, 2, 3, 4)).repartition(10).foreach(x => accum.add(x))

scala> accum.value
res3: Long = 20

값이 계속해서 증가하지만
Spark shell web UI: http://spark-master-01:4040/ 안에서는 이전의 값 확인 불가능하며 각 executor의 결과와 최종 결과만 나와있다.

scala> val rdd = sc.range(1, 5)
rdd: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[11] at range at <console>:23

scala> rdd.foreach(accum.add(_))

scala> accum.value
res5: Long = 30

scala> rdd.foreach{x => accum.add(x); accum.add(x);}

scala> accum.value
res7: Long = 50
scala> val accum2 = sc.collectionAccumulator[Long]("My Accumlator2")
accum2: org.apache.spark.util.CollectionAccumulator[Long] = CollectionAccumulator(id: 151, name: Some(My Accumlator2), value: [])

scala> rdd.foreach{x => accum.add(x); accum2.add(x);}

scala> accum.value
res9: Long = 60

scala> accum2.value
res10: java.util.List[Long] = [1, 3, 2, 4]

0개의 댓글