RDD Map-Side Join

이상민 · March 26, 2023

Map-Side Join

Used when joining a large table with a relatively small table (see the sketch below).
Large table <=> Fact table
Small table <=> Dimension table
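
Concretely, the pattern is: broadcast the small (dimension) table to every executor, then replace a shuffle join with a plain map over the large (fact) table. In miniature (a sketch; dimension and fact are placeholder pair RDDs):

val dimension = sc.parallelize(Seq(("k1", "value1"), ("k2", "value2")))
val fact      = sc.parallelize(Seq(("k1", "row-a"), ("k2", "row-b"), ("k1", "row-c")))

val lookup = sc.broadcast(dimension.collectAsMap)       // small table to every executor
fact.map{case (k, rest) => (lookup.value(k), rest)}     // map-side lookup, no shuffle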

Starting the Standalone Cluster
$ ./sbin/start-master.sh
$ ./sbin/start-worker.sh spark://spark-master-01:7177

Starting the Spark Shell
$ ./bin/spark-shell --master spark://spark-master-01:7177
- Fact table (Large table)....
scala> val flights = sc.parallelize(List(
     |   ("SEA", "JFK", "DL", "418",  "7:00"),
     |   ("SFO", "LAX", "AA", "1250", "7:05"),
     |   ("SFO", "JFK", "VX", "12",   "7:05"),
     |   ("JFK", "LAX", "DL", "424",  "7:10"),
     |   ("LAX", "SEA", "DL", "5737", "7:10")))
flights: org.apache.spark.rdd.RDD[(String, String, String, String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:23

- Dimension table #1 (Small table)....
scala> val airports = sc.makeRDD(List(
     |   ("JFK", "John F. Kennedy International Airport", "New York", "NY"),
     |   ("LAX", "Los Angeles International Airport", "Los Angeles", "CA"),
     |   ("SEA", "Seattle-Tacoma International Airport", "Seattle", "WA"),
     |   ("SFO", "San Francisco International Airport", "San Francisco", "CA")))
airports: org.apache.spark.rdd.RDD[(String, String, String, String)] = ParallelCollectionRDD[1] at makeRDD at <console>:23

- Dimension table #2 (Small table)....
scala> val airlines = sc.parallelize(List(
     |   ("AA", "American Airlines"),
     |   ("DL", "Delta Airlines"),
     |   ("VX", "Virgin America")))
airlines: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[2] at parallelize at <console>:23

The Scala output shows that these tables are RDDs, i.e., their data lives on the executors. Since a broadcast is sent from the driver to the executors, each table must first be brought back to the driver, which is what collectAsMap below does.

// ("JFK", "John F. Kennedy International Airport", "New York", "NY")
scala> val airportsMap = sc.broadcast(airports.map{case (a, b, c, d) => (a, c)}.collectAsMap)
airportsMap: org.apache.spark.broadcast.Broadcast[scala.collection.Map[String,String]] = Broadcast(1)

scala> val airlinesMap = sc.broadcast(airlines.collectAsMap)
airlinesMap: org.apache.spark.broadcast.Broadcast[scala.collection.Map[String,String]] = Broadcast(3)

collect brings the data back as an array, while collectAsMap brings it back as a map, i.e., as (key, value) pairs. Unlike collect, therefore, collectAsMap cannot always be used: it is only available on pair RDDs, as the sketch below shows.
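
For illustration, a minimal contrast of the two calls (a sketch; pairs is a hypothetical throwaway RDD):

// collectAsMap is defined on PairRDDFunctions, so it only exists for RDD[(K, V)];
// a tuple RDD such as flights (5 fields) has no collectAsMap.
val pairs = sc.parallelize(Seq(("AA", "American Airlines"), ("DL", "Delta Airlines")))
pairs.collect       // Array((AA,American Airlines), (DL,Delta Airlines))
pairs.collectAsMap  // Map(AA -> American Airlines, DL -> Delta Airlines)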

// ("SEA", "JFK", "DL", "418",  "7:00")
scala> flights.map{case (a, b, c, d, e) => (
     |   airportsMap.value.get(a).get,
     |   airportsMap.value.get(b).get,
     |   airlinesMap.value.get(c).get,
     |   d,
     |   e)}.collect.foreach(println)
(Seattle,New York,Delta Airlines,418,7:00)
(San Francisco,Los Angeles,American Airlines,1250,7:05)
(San Francisco,New York,Virgin America,12,7:05)
(New York,Los Angeles,Delta Airlines,424,7:10)
(Los Angeles,Seattle,Delta Airlines,5737,7:10)
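
Note that Map.get returns an Option, so the .get calls above throw NoSuchElementException if a code is missing from a lookup table. A defensive sketch of the same map-side join, falling back to the raw code via getOrElse:

flights.map{case (a, b, c, d, e) => (
  airportsMap.value.getOrElse(a, a),   // keep the raw code if no airport entry
  airportsMap.value.getOrElse(b, b),
  airlinesMap.value.getOrElse(c, c),   // keep the raw code if no airline entry
  d,
  e)}.collect.foreach(println)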

RDD Join (Shuffle)

Join 1

flights
("SEA", "JFK", "DL", "418",  "7:00")
airports
("JFK", "John F. Kennedy International Airport", "New York", "NY")

scala> val flightsJoin = flights.keyBy(_._1).join(airports.keyBy(_._1))
flightsJoin: org.apache.spark.rdd.RDD[(String, ((String, String, String, String, String), (String, String, String, String)))] = MapPartitionsRDD[9] at join at <console>:24

flights.keyBy(_._1)  -> ("SEA", ("SEA", "JFK", "DL", "418", "7:00"))
airports.keyBy(_._1) -> ("JFK", ("JFK", "John F. Kennedy International Airport", "New York", "NY"))

scala> flightsJoin.collect.foreach(println)
(LAX,((LAX,SEA,DL,5737,7:10),(LAX,Los Angeles International Airport,Los Angeles,CA)))
(SFO,((SFO,LAX,AA,1250,7:05),(SFO,San Francisco International Airport,San Francisco,CA)))
(SFO,((SFO,JFK,VX,12,7:05),(SFO,San Francisco International Airport,San Francisco,CA)))
(SEA,((SEA,JFK,DL,418,7:00),(SEA,Seattle-Tacoma International Airport,Seattle,WA)))
(JFK,((JFK,LAX,DL,424,7:10),(JFK,John F. Kennedy International Airport,New York,NY)))

scala> val flights2 = flightsJoin.map(x => (x._2._2._3, x._2._1._2, x._2._1._3, x._2._1._4, x._2._1._5))
flights2: org.apache.spark.rdd.RDD[(String, String, String, String, String)] = MapPartitionsRDD[10] at map at <console>:23

scala> flights2.collect.foreach(println)
(Los Angeles,SEA,DL,5737,7:10)
(San Francisco,LAX,AA,1250,7:05)
(San Francisco,JFK,VX,12,7:05)
(Seattle,JFK,DL,418,7:00)
(New York,LAX,DL,424,7:10)
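
The positional accessors (x._2._2._3, ...) are hard to read. An equivalent pattern-match form (a sketch producing the same result; flights2Alt is a hypothetical name) makes the fields explicit:

val flights2Alt = flightsJoin.map {
  // shape: (key, ((origin, dest, carrier, num, time), (code, airportName, city, state)))
  case (_, ((_, dest, carrier, num, time), (_, _, originCity, _))) =>
    (originCity, dest, carrier, num, time)
}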

Join 2

scala> val flightsJoin2 = flights2.keyBy(_._2).join(airports.keyBy(_._1))
flightsJoin2: org.apache.spark.rdd.RDD[(String, ((String, String, String, String, String), (String, String, String, String)))] = MapPartitionsRDD[15] at join at <console>:24

scala> flightsJoin2.collect.foreach(println)
(LAX,((San Francisco,LAX,AA,1250,7:05),(LAX,Los Angeles International Airport,Los Angeles,CA)))
(LAX,((New York,LAX,DL,424,7:10),(LAX,Los Angeles International Airport,Los Angeles,CA)))
(SEA,((Los Angeles,SEA,DL,5737,7:10),(SEA,Seattle-Tacoma International Airport,Seattle,WA)))
(JFK,((San Francisco,JFK,VX,12,7:05),(JFK,John F. Kennedy International Airport,New York,NY)))
(JFK,((Seattle,JFK,DL,418,7:00),(JFK,John F. Kennedy International Airport,New York,NY)))

scala> val flights3 = flightsJoin2.map(x => (x._2._1._1, x._2._2._3, x._2._1._3, x._2._1._4, x._2._1._5))
flights3: org.apache.spark.rdd.RDD[(String, String, String, String, String)] = MapPartitionsRDD[16] at map at <console>:23

scala> flights3.collect.foreach(println)
(San Francisco,Los Angeles,AA,1250,7:05)
(New York,Los Angeles,DL,424,7:10)
(Los Angeles,Seattle,DL,5737,7:10)
(San Francisco,New York,VX,12,7:05)
(Seattle,New York,DL,418,7:00)

Join 3

scala> val flightsJoin3 = flights3.keyBy(_._3).join(airlines.keyBy(_._1))
flightsJoin3: org.apache.spark.rdd.RDD[(String, ((String, String, String, String, String), (String, String)))] = MapPartitionsRDD[21] at join at <console>:24

scala> flightsJoin3.collect.foreach(println)
(DL,((New York,Los Angeles,DL,424,7:10),(DL,Delta Airlines)))
(DL,((Los Angeles,Seattle,DL,5737,7:10),(DL,Delta Airlines)))
(DL,((Seattle,New York,DL,418,7:00),(DL,Delta Airlines)))
(AA,((San Francisco,Los Angeles,AA,1250,7:05),(AA,American Airlines)))
(VX,((San Francisco,New York,VX,12,7:05),(VX,Virgin America)))

scala> val flights4 = flightsJoin3.map(x => (x._2._1._1, x._2._1._2, x._2._2._2, x._2._1._4, x._2._1._5))
flights4: org.apache.spark.rdd.RDD[(String, String, String, String, String)] = MapPartitionsRDD[22] at map at <console>:23

scala> flights4.collect.foreach(println)
(New York,Los Angeles,Delta Airlines,424,7:10)
(Los Angeles,Seattle,Delta Airlines,5737,7:10)
(Seattle,New York,Delta Airlines,418,7:00)
(San Francisco,Los Angeles,American Airlines,1250,7:05)
(San Francisco,New York,Virgin America,12,7:05)

All Joins at Once

scala> flights.
     |   keyBy(_._1).join(airports.keyBy(_._1)).
     |   map(x => (x._2._2._3, x._2._1._2, x._2._1._3, x._2._1._4, x._2._1._5)).
     |   keyBy(_._2).join(airports.keyBy(_._1)).
     |   map(x => (x._2._1._1, x._2._2._3, x._2._1._3, x._2._1._4, x._2._1._5)).
     |   keyBy(_._3).join(airlines.keyBy(_._1)).
     |   map(x => (x._2._1._1, x._2._1._2, x._2._2._2, x._2._1._4, x._2._1._5)).
     |   collect.foreach(println)
(New York,Los Angeles,Delta Airlines,424,7:10)
(Los Angeles,Seattle,Delta Airlines,5737,7:10)
(Seattle,New York,Delta Airlines,418,7:00)
(San Francisco,Los Angeles,American Airlines,1250,7:05)
(San Francisco,New York,Virgin America,12,7:05)

A map-side join, which involves no shuffle, runs faster than a join that requires a shuffle.
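
One way to check this on your own cluster (a sketch): spark.time prints the wall-clock time of the enclosed block.

// Map-side (broadcast) version: no shuffle stage
spark.time {
  flights.map { case (a, b, c, d, e) =>
    (airportsMap.value(a), airportsMap.value(b), airlinesMap.value(c), d, e)
  }.count()
}

// Shuffle version: the join introduces a shuffle
spark.time {
  flights.keyBy(_._1).join(airports.keyBy(_._1)).count()
}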

Big Table, No Shuffle (Map-Side Join)

Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
2008,1,3,4,1343,1325,1451,1435,WN,588,N240WN,68,70,55,16,18,HOU,LIT,393,4,9,0,,0,16,0,0,0,0
2008,1,3,4,1125,1120,1247,1245,WN,1343,N523SW,82,85,71,2,5,HOU,MAF,441,3,8,0,,0,NA,NA,NA,NA,NA
...

scala> val flights_air = sc.textFile("/skybluelee/data/airline_on_time/2008.csv").
     |   filter(!_.startsWith("Year")).
     |   map{x =>
     |     val arr = x.split(",")
     |     val origin = arr(16)
     |     val dest = arr(17)
     |     val uniquecarrier = arr(8)
     |     val flightnum = arr(9)
     |     val deptime = arr(4)
     |     (origin, dest, uniquecarrier, flightnum, deptime)
     |   }
flights_air: org.apache.spark.rdd.RDD[(String, String, String, String, String)] = MapPartitionsRDD[37] at map at <console>:25

filter(!_.startsWith("Year")) keeps only the lines that do not start with "Year", i.e., it filters out the header line.

"iata","airport","city","state","country","lat","long"
"00M","Thigpen ","Bay Springs","MS","USA",31.95376472,-89.23450472
"00R","Livingston Municipal","Livingston","TX","USA",30.68586111,-95.01792778
...

scala> val airports_air = sc.textFile("/skybluelee/data/airline_on_time_ext/airports.csv").
     |   map(_.replaceAll("\"", "")).
     |   filter(!_.startsWith("iata")).
     |   map{x =>
     |     val arr = x.split(",")
     |     val iata = arr(0)
     |     val airport = arr(1)
     |     val city = arr(2)
     |     val state = arr(3)
     |     (iata, airport, city, state)
     |   }
airports_air: org.apache.spark.rdd.RDD[(String, String, String, String)] = MapPartitionsRDD[42] at map at <console>:26

"Thigpen " 의 경우 str임을 나타내기 위해 "를 사용하였지만 사용하는 입장에서는 불필요하다. 따라서 "를 제거하기 위해 replace 함수를 사용한다.

Code,Description
"02Q","Titan Airways"
"04Q","Tradewind Aviation"
"05Q","Comlux Aviation, AG"
...

scala> val airlines_air = sc.textFile("/skybluelee/data/airline_on_time_ext/carriers.csv").
     |   map(_.replaceAll("\"", "")).
     |   filter(!_.startsWith("Code")).
     |   map{x =>
     |     val arr = x.split(",")
     |     val code = arr(0)
     |     val description = arr(1)
     |     (code, description)
     |   }
airlines_air: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[47] at map at <console>:26
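
One caveat with this parse: once the quotes are stripped, split(",") also splits descriptions that themselves contain a comma, so "Comlux Aviation, AG" from the sample above becomes code "05Q" with description "Comlux Aviation", dropping " AG". A sketch of a safer variant (airlines_safe is a hypothetical name) that splits on the first comma only:

val airlines_safe = sc.textFile("/skybluelee/data/airline_on_time_ext/carriers.csv").
  map(_.replaceAll("\"", "")).
  filter(!_.startsWith("Code")).
  map{x =>
    val arr = x.split(",", 2)  // limit 2: split at the first comma only
    (arr(0), arr(1))
  }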

scala> val airportsMap_air = sc.broadcast(airports_air.map{case (a, b, c, d) => (a, c)}.collectAsMap)
airportsMap_air: org.apache.spark.broadcast.Broadcast[scala.collection.Map[String,String]] = Broadcast(19)

scala> val airlinesMap_air = sc.broadcast(airlines_air.collectAsMap)
airlinesMap_air: org.apache.spark.broadcast.Broadcast[scala.collection.Map[String,String]] = Broadcast(21)

scala> flights_air.map{case (a, b, c, d, e) => (
     |   airportsMap_air.value.get(a).get,
     |   airportsMap_air.value.get(b).get,
     |   airlinesMap_air.value.get(c).get,
     |   d,
     |   e)}.count()
res2: Long = 2389217

scala> flights_air.map{case (a, b, c, d, e) => (
     |   airportsMap_air.value.get(a).get,
     |   airportsMap_air.value.get(b).get,
     |   airlinesMap_air.value.get(c).get,
     |   d,
     |   e)}.take(5).foreach(println)
(Houston,Little Rock,Southwest Airlines Co.,588,1343)
(Houston,Midland,Southwest Airlines Co.,1343,1125)
(Houston,Midland,Southwest Airlines Co.,3841,2009)
(Houston,Orlando,Southwest Airlines Co.,3,903)
(Houston,Orlando,Southwest Airlines Co.,25,1423)

A map-side join is faster than a shuffle-based RDD join. Unless an RDD join is genuinely required, prefer Spark SQL or a map-side join.

Big Table: Spark SQL (Broadcast Join)

scala> val flightsDF_air = flights_air.toDF("origin", "destination", "airline", "flight_num", "flight_time")
flightsDF_air: org.apache.spark.sql.DataFrame = [origin: string, destination: string ... 3 more fields]

scala> val airportsDF_air = airports_air.toDF("code", "name", "city", "state")
airportsDF_air: org.apache.spark.sql.DataFrame = [code: string, name: string ... 2 more fields]

scala> val airlinesDF_air = airlines_air.toDF("code", "name")
airlinesDF_air: org.apache.spark.sql.DataFrame = [code: string, name: string]
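
The broadcast hint and the $"col" syntax used below come from the following imports; spark-shell auto-imports spark.implicits._, but the functions import may need to be added explicitly:

import org.apache.spark.sql.functions.broadcast  // broadcast-join hint
import spark.implicits._                         // $"col" ColumnName syntax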

scala> flightsDF_air.as("a").
     |   join(broadcast(airportsDF_air.as("b")), $"a.origin" === $"b.code").
     |   join(broadcast(airportsDF_air.as("c")), $"a.destination" === $"c.code").
     |   join(broadcast(airlinesDF_air.as("d")), $"a.airline" === $"d.code").
     |   select($"b.city".as("origin"), $"c.city".as("destination"), $"d.name".as("airline"), $"a.flight_num", $"a.flight_time").
     |   show(false)
+-------+-----------+----------------------+----------+-----------+
|origin |destination|airline               |flight_num|flight_time|
+-------+-----------+----------------------+----------+-----------+
|Houston|Little Rock|Southwest Airlines Co.|588       |1343       |
|Houston|Midland    |Southwest Airlines Co.|1343      |1125       |
|Houston|Midland    |Southwest Airlines Co.|3841      |2009       |
|Houston|Orlando    |Southwest Airlines Co.|3         |903        |
|Houston|Orlando    |Southwest Airlines Co.|25        |1423       |
|Houston|Orlando    |Southwest Airlines Co.|51        |2024       |
|Houston|Orlando    |Southwest Airlines Co.|940       |1753       |
|Houston|Orlando    |Southwest Airlines Co.|2621      |622        |
|Houston|Chicago    |Southwest Airlines Co.|389       |1944       |
|Houston|Chicago    |Southwest Airlines Co.|519       |1453       |
|Houston|Chicago    |Southwest Airlines Co.|894       |2030       |
|Houston|Chicago    |Southwest Airlines Co.|969       |708        |
|Houston|Chicago    |Southwest Airlines Co.|2174      |1749       |
|Houston|Chicago    |Southwest Airlines Co.|2445      |1217       |
|Houston|Chicago    |Southwest Airlines Co.|2974      |954        |
|Houston|New Orleans|Southwest Airlines Co.|41        |1758       |

.show() displays the first 20 rows by default (the table above is truncated); pass a row count, e.g. .show(5, false), to change it.
