Whole File-Based RDDs

이상민·2023년 3월 26일
0

spark

목록 보기
9/17
$ vi file1.json
{
  "firstName":"Fred",
  "lastName":"Flintstone",
  "userId":123
}

$ vi file2.json
{
  "firstName":"Barney",
  "lastName":"Rubble",
  "userId":234
}

$ ./bin/spark-shell

파일을 executor로 복사하기 위해서는 hdfs 상에 파일을 생성해야 함
여기서는 local에서 실행

scala> import scala.util.parsing.json.JSON
import scala.util.parsing.json.JSON

scala> val myRDD = sc.wholeTextFiles("./*.json")
myRDD: org.apache.spark.rdd.RDD[(String, String)] = ./*.json MapPartitionsRDD[1] at wholeTextFiles at <console>:24

spark는 파일을 line 단위로 읽는데, wholeTextFiles를 사용하면 파일 단위로 파일을 읽는다.

scala> myRDD.collect.foreach(println)
(file:/skybluelee/spark3/file1.json,{
  "firstName":"Fred",
  "lastName":"Flintstone",
  "userId":123
}
)
(file:/skybluelee/spark3/file2.json,{
  "firstName":"Barney",
  "lastName":"Rubble",
  "userId":234
}
)

print는 줄바꿈 없이, println은 줄바꿈을 하고 출력함

scala> val myRDD2 = myRDD.map(pair => JSON.parseFull(pair._2).get.asInstanceOf[Map[String,Any]])
warning: one deprecation (since 1.0.6); for details, enable `:setting -deprecation' or `:replay -deprecation'
myRDD2: org.apache.spark.rdd.RDD[Map[String,Any]] = MapPartitionsRDD[2] at map at <console>:24

parseFull(pair._2)
-> pair._1: file:/skybluelee/spark3/file1.json
pair._2: {
"firstName":"Fred",
"lastName":"Flintstone",
"userId":123
}
pair._2에 대해 parsing
Map[String,Any]
String: "firstName", "lastName", "userId" -> 항상 str값이고
Any: "Fred", "Flintstone", 123 -> 자료형이 여러가지임

scala> val arrMap = myRDD2.take(2)
arrMap: Array[Map[String,Any]] = Array(Map(firstName -> Fred, lastName -> Flintstone,\
userId -> 123.0), Map(firstName -> Barney, lastName -> Rubble, userId -> 234.0))
scala> arrMap.foreach(x => println(x.getOrElse("firstName",null)))
Fred
Barney

x = Map
firstName이면 갖고오고 그렇지 않다면 갖고오지 않는다.

spark sql

scala> val myDF = spark.read.option("multiLine", true).json("./*.json")
myDF: org.apache.spark.sql.DataFrame = [firstName: string, lastName: string ... 1 more field]

.json: 알아서 json을 parsing해줌

scala> myDF.printSchema()
root
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- userId: long (nullable = true)


scala> myDF.select("firstName").show()
+---------+
|firstName|
+---------+
|     Fred|
|   Barney|
+---------+

0개의 댓글