📃 Spark Java를 이용해 JSON 구조의 데이터 다루기
package org.programming.spark;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;
import org.programming.spark.model.DataLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
/**
 * Demo entry point: reads one-JSON-record-per-line access-log data from the classpath
 * resource {@code 1995.json}, parses it into typed columns, counts records per host, and
 * prints each host's records collected into a JSON array.
 */
public class SparkMain {
    private static final Logger log = LoggerFactory.getLogger(SparkMain.class);

    /** Classpath resource (under resources) holding one JSON log record per line. */
    private static final String DATASET_RESOURCE = "1995.json";

    public static void main(String[] args) {
        SparkSession session = SparkSession.builder()
                .appName("Spark Programming")
                .master("local[*]")
                .getOrCreate();
        // Always stop the session, even when the job fails, so the local Spark context
        // and its resources are released.
        try {
            run(session);
        } finally {
            session.stop();
        }
    }

    /** Runs the parse / count / join / aggregate pipeline on an already-created session. */
    private static void run(SparkSession session) {
        log.info("Start Spark Session Programming");

        // Dataset path: the file lives under resources, i.e. on the classpath.
        String path = resourcePath(DATASET_RESOURCE);
        log.info("path: {}", path);

        // Read the dataset as plain text; each line ends up in the "value" column.
        Dataset<String> rowDataSet = session.read().textFile(path);

        // Turn each JSON line into individual columns using the declared schema.
        Dataset<Row> parsed = rowDataSet
                .select(functions.from_json(functions.col("value"), logSchema()).as("js"))
                .select("js.*");

        // Encoder used to convert rows into DataLog beans.
        Encoder<DataLog> dataLogEncoder = Encoders.bean(DataLog.class);

        // Convert to DataLog. The "dataLogDS" alias is kept because DataLog.countByHost
        // may refer to columns through it.
        Dataset<DataLog> dataLogDS = parsed.as(dataLogEncoder).as("dataLogDS");

        // Count records per host.
        Dataset<Row> countByHostDS = DataLog.countByHost(dataLogDS);

        // Attach each host's count to every record of that host.
        Dataset<Row> joinedDS = countByHostDS.join(dataLogDS, "host");

        // Collect each host's records into an array of structs and serialize it as a JSON
        // array. The intermediate column gets its own name rather than reusing the source
        // column name "http_reply"; it is dropped afterwards, so the printed columns are
        // "host" and "http_reply_json".
        joinedDS.groupBy(functions.col("host"))
                .agg(functions.collect_list(
                                functions.struct("timestamp", "request", "http_reply", "bytes", "count"))
                        .as("requests"))
                .withColumn("http_reply_json", functions.to_json(functions.col("requests")))
                .drop(functions.col("requests"))
                .show(10, false);
    }

    /** Builds the schema of one JSON log record; every field is declared nullable. */
    private static StructType logSchema() {
        List<StructField> fieldList = new ArrayList<>();
        fieldList.add(DataTypes.createStructField("host", DataTypes.StringType, true));
        fieldList.add(DataTypes.createStructField("timestamp", DataTypes.StringType, true));
        fieldList.add(DataTypes.createStructField("request", DataTypes.StringType, true));
        fieldList.add(DataTypes.createStructField("http_reply", DataTypes.IntegerType, true));
        fieldList.add(DataTypes.createStructField("bytes", DataTypes.IntegerType, true));
        return DataTypes.createStructType(fieldList);
    }

    /**
     * Resolves a classpath resource to a file-system path string usable by Spark.
     *
     * @param name resource name relative to the classpath root
     * @return decoded absolute file-system path of the resource
     * @throws IllegalStateException if the resource is missing or its URL is not a valid URI
     */
    private static String resourcePath(String name) {
        URL url = SparkMain.class.getClassLoader().getResource(name);
        if (url == null) {
            throw new IllegalStateException("Resource not found on classpath: " + name);
        }
        try {
            // URL.getPath() keeps percent-escapes (e.g. %20 for spaces, encoded non-ASCII
            // directory names); going through a URI decodes them into a real path.
            // NOTE(review): assumes the resource is a plain file, not packed inside a jar.
            return Paths.get(url.toURI()).toString();
        } catch (URISyntaxException e) {
            throw new IllegalStateException("Invalid resource URI: " + url, e);
        }
    }
}
{"host":"199.72.81.55","timestamp":"1995-07-01T06:00:01.000+02:00","request":"GET /history/apollo/ HTTP/1.0","http_reply":200,"bytes":6245}
{"host":"unicomp6.unicomp.net","timestamp":"1995-07-01T06:00:06.000+02:00","request":"GET /shuttle/countdown/ HTTP/1.0","http_reply":200,"bytes":3985}
{"host":"199.120.110.21","timestamp":"1995-07-01T06:00:09.000+02:00","request":"GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0","http_reply":200,"bytes":4085}
{"host":"burger.letters.com","timestamp":"1995-07-01T06:00:11.000+02:00","request":"GET /shuttle/countdown/liftoff.html HTTP/1.0","http_reply":304,"bytes":0}
{"host":"199.120.110.21","timestamp":"1995-07-01T06:00:11.000+02:00","request":"GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0","http_reply":200,"bytes":4179}
{"host":"burger.letters.com","timestamp":"1995-07-01T06:00:12.000+02:00","request":"GET /images/NASA-logosmall.gif HTTP/1.0","http_reply":304,"bytes":0}
{"host":"burger.letters.com","timestamp":"1995-07-01T06:00:12.000+02:00","request":"GET /shuttle/countdown/video/livevideo.gif HTTP/1.0","http_reply":200,"bytes":0}
{"host":"205.212.115.106","timestamp":"1995-07-01T06:00:12.000+02:00","request":"GET /shuttle/countdown/countdown.html HTTP/1.0","http_reply":200,"bytes":3985}
{"host":"d104.aa.net","timestamp":"1995-07-01T06:00:13.000+02:00","request":"GET /shuttle/countdown/ HTTP/1.0","http_reply":200,"bytes":3985}
{"host":"129.94.144.152","timestamp":"1995-07-01T06:00:13.000+02:00","request":"GET / HTTP/1.0","http_reply":200,"bytes":7074}
{"host":"unicomp6.unicomp.net","timestamp":"1995-07-01T06:00:14.000+02:00","request":"GET /shuttle/countdown/count.gif HTTP/1.0","http_reply":200,"bytes":40310}
{"host":"unicomp6.unicomp.net","timestamp":"1995-07-01T06:00:14.000+02:00","request":"GET /images/NASA-logosmall.gif HTTP/1.0","http_reply":200,"bytes":786}
{"host":"unicomp6.unicomp.net","timestamp":"1995-07-01T06:00:14.000+02:00","request":"GET /images/KSC-logosmall.gif HTTP/1.0","http_reply":200,"bytes":1204}
{"host":"d104.aa.net","timestamp":"1995-07-01T06:00:15.000+02:00","request":"GET /shuttle/countdown/count.gif HTTP/1.0","http_reply":200,"bytes":40310}
{"host":"d104.aa.net","timestamp":"1995-07-01T06:00:15.000+02:00","request":"GET /images/NASA-logosmall.gif HTTP/1.0","http_reply":200,"bytes":786}
{"host":"d104.aa.net","timestamp":"1995-07-01T06:00:15.000+02:00","request":"GET /images/KSC-logosmall.gif HTTP/1.0","http_reply":200,"bytes":1204}
{"host":"129.94.144.152","timestamp":"1995-07-01T06:00:17.000+02:00","request":"GET /images/ksclogo-medium.gif HTTP/1.0","http_reply":304,"bytes":0}
{"host":"199.120.110.21","timestamp":"1995-07-01T06:00:17.000+02:00","request":"GET /images/launch-logo.gif HTTP/1.0","http_reply":200,"bytes":1713}
package org.programming.spark.model;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
/**
 * One HTTP access-log record, populated from a parsed JSON line through a Spark bean encoder.
 *
 * <p>Field names deliberately mirror the JSON keys (including the snake_case
 * {@code http_reply}) so that the bean property names line up with the parsed column names.
 */
@Getter
@Setter
@NoArgsConstructor
public class DataLog {
    /** Requesting host: a host name or an IP address in the sample data. */
    private String host;
    /** Request time, kept as the raw string from the JSON (not parsed into a date type). */
    private String timestamp;
    /** Raw HTTP request line, e.g. {@code GET / HTTP/1.0}. */
    private String request;
    /** HTTP reply status code (e.g. 200 or 304 in the sample data). */
    private Integer http_reply;
    /** Byte count reported by the log; presumably the response size — confirm with the data source. */
    private Integer bytes;

    /**
     * Counts records per host.
     *
     * @param dataLogDataset parsed log records
     * @return a dataset with one row per distinct host and columns {@code host} and {@code count}
     */
    public static Dataset<Row> countByHost(Dataset<DataLog> dataLogDataset) {
        // Group on the column directly so this works no matter what alias (if any)
        // the caller gave the dataset.
        return dataLogDataset.groupBy("host").count();
    }
}
실행하면 아래와 같이 출력된 데이터 프레임을 확인할 수 있다.