[Spark] Handling JSON-Structured Data in Java

최지영 · April 7, 2023

📃 Handling JSON-structured data with Spark in Java

🎈 Steps


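  1. Read the JSON-structured data from the resources directory
  2. Declare the schema
  3. Convert the JSON data into a DataFrame that matches the schema
  4. Convert the rows to the DataLog class (in Scala this would be a case class)
  5. Count the records per host
  6. Convert each host's records into a JSON array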

📃 Main code


package org.programming.spark;

import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;
import org.programming.spark.model.DataLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

public class SparkMain {

    private static final Logger Log = LoggerFactory.getLogger(SparkMain.class);

    public static void main(String[] args) {

        SparkSession session = SparkSession.builder()
                .appName("Spark Programming")
                .master("local[*]")
                .getOrCreate();
        
        Log.info("Start Spark Session Programming");
        
        /**
         *  Dataset path: the file lives inside resources
         */
        String path = SparkMain.class.getClassLoader().getResource("1995.json").getPath();
        Log.info("path: {}", path);
        /**
         *  Read the dataset as text, one JSON document per line
         */
        Dataset<String> rowDataSet = session.read().textFile(path);

        /**
         *  Declare the schema of the JSON data
         */
        List<StructField> fieldList = new ArrayList<>();
        fieldList.add(DataTypes.createStructField("host", DataTypes.StringType, true));
        fieldList.add(DataTypes.createStructField("timestamp", DataTypes.StringType, true));
        fieldList.add(DataTypes.createStructField("request", DataTypes.StringType, true));
        fieldList.add(DataTypes.createStructField("http_reply", DataTypes.IntegerType, true));
        fieldList.add(DataTypes.createStructField("bytes", DataTypes.IntegerType, true));

        StructType schemas = DataTypes.createStructType(fieldList);

        /***
         *  Parse each JSON string and expand its fields into separate columns
         ***/
        Dataset<Row> parsed = rowDataSet
                .select(functions.from_json(functions.column("value"), schemas)
                        .as("js"))
                .select("js.*");

        /***
         *  Encoder used to convert rows to the DataLog class
         ***/
        Encoder<DataLog> dataLogEncoder = Encoders.bean(DataLog.class);

        /**
         * Convert to the DataLog class and count the records per host
         */
        Dataset<DataLog> dataLogDS = parsed.as(dataLogEncoder).as("dataLogDS");
        Dataset<Row> countByHostDS = DataLog.countByHost(dataLogDS).as("countDS");

        /**
         *  Join the per-host counts back onto the individual records
         */
        Dataset<Row> joinedDS = countByHostDS.join(dataLogDS, "host");

        /**
         *  Collect each host's records into a JSON array
         */
        joinedDS.groupBy(functions.col("host"))
                .agg(functions.collect_list(
                        functions.struct("timestamp", "request", "http_reply","bytes","count")
                        )
                        .as("http_reply"))
                .withColumn("http_reply_json", functions.to_json(functions.col("http_reply")))
                .drop(functions.col("http_reply"))
                .show(10, false);

        // Stop the session so local Spark resources are released before the JVM exits
        session.stop();
    }
}
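
To make steps 5 and 6 easier to follow without a Spark runtime, here is a minimal plain-Java sketch of the same idea: count records per host, attach that count to every record, and join each host's records into one JSON array string. The names HostGroupingSketch and LogRecord are hypothetical and not part of the original project. The string building is a hand-rolled approximation, so the exact text produced by Spark's to_json may differ in detail, and Spark's collect_list does not guarantee the element order that this sketch preserves.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical plain-Java stand-in for steps 5 and 6; it does not use Spark.
class HostGroupingSketch {

    // Minimal holder mirroring the five JSON fields (illustrative only).
    static final class LogRecord {
        final String host;
        final String timestamp;
        final String request;
        final int httpReply;
        final int bytes;

        LogRecord(String host, String timestamp, String request, int httpReply, int bytes) {
            this.host = host;
            this.timestamp = timestamp;
            this.request = request;
            this.httpReply = httpReply;
            this.bytes = bytes;
        }
    }

    // Step 5: number of records per host, in first-seen host order.
    static Map<String, Long> countByHost(List<LogRecord> records) {
        return records.stream().collect(
                Collectors.groupingBy(r -> r.host, LinkedHashMap::new, Collectors.counting()));
    }

    // Step 6: one JSON array string per host; every element carries the host's count.
    static Map<String, String> toJsonArrayByHost(List<LogRecord> records) {
        Map<String, Long> counts = countByHost(records);
        Map<String, List<String>> grouped = new LinkedHashMap<>();
        for (LogRecord r : records) {
            String element = "{\"timestamp\":\"" + escape(r.timestamp) + "\","
                    + "\"request\":\"" + escape(r.request) + "\","
                    + "\"http_reply\":" + r.httpReply + ","
                    + "\"bytes\":" + r.bytes + ","
                    + "\"count\":" + counts.get(r.host) + "}";
            grouped.computeIfAbsent(r.host, h -> new ArrayList<>()).add(element);
        }
        Map<String, String> result = new LinkedHashMap<>();
        grouped.forEach((host, elements) ->
                result.put(host, "[" + String.join(",", elements) + "]"));
        return result;
    }

    // Escapes only backslashes and double quotes; enough for the sample data,
    // but not a complete JSON string encoder.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        // Three records taken from the sample 1995.json file.
        List<LogRecord> records = Arrays.asList(
                new LogRecord("199.72.81.55", "1995-07-01T06:00:01.000+02:00",
                        "GET /history/apollo/ HTTP/1.0", 200, 6245),
                new LogRecord("burger.letters.com", "1995-07-01T06:00:11.000+02:00",
                        "GET /shuttle/countdown/liftoff.html HTTP/1.0", 304, 0),
                new LogRecord("burger.letters.com", "1995-07-01T06:00:12.000+02:00",
                        "GET /images/NASA-logosmall.gif HTTP/1.0", 304, 0));
        toJsonArrayByHost(records)
                .forEach((host, json) -> System.out.println(host + " -> " + json));
    }
}
```

Because the count is a per-host value, it is repeated in every element of that host's array, which mirrors what the join followed by collect_list does in the Spark version.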

📃 1995.json file


{"host":"199.72.81.55","timestamp":"1995-07-01T06:00:01.000+02:00","request":"GET /history/apollo/ HTTP/1.0","http_reply":200,"bytes":6245}
{"host":"unicomp6.unicomp.net","timestamp":"1995-07-01T06:00:06.000+02:00","request":"GET /shuttle/countdown/ HTTP/1.0","http_reply":200,"bytes":3985}
{"host":"199.120.110.21","timestamp":"1995-07-01T06:00:09.000+02:00","request":"GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0","http_reply":200,"bytes":4085}
{"host":"burger.letters.com","timestamp":"1995-07-01T06:00:11.000+02:00","request":"GET /shuttle/countdown/liftoff.html HTTP/1.0","http_reply":304,"bytes":0}
{"host":"199.120.110.21","timestamp":"1995-07-01T06:00:11.000+02:00","request":"GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0","http_reply":200,"bytes":4179}
{"host":"burger.letters.com","timestamp":"1995-07-01T06:00:12.000+02:00","request":"GET /images/NASA-logosmall.gif HTTP/1.0","http_reply":304,"bytes":0}
{"host":"burger.letters.com","timestamp":"1995-07-01T06:00:12.000+02:00","request":"GET /shuttle/countdown/video/livevideo.gif HTTP/1.0","http_reply":200,"bytes":0}
{"host":"205.212.115.106","timestamp":"1995-07-01T06:00:12.000+02:00","request":"GET /shuttle/countdown/countdown.html HTTP/1.0","http_reply":200,"bytes":3985}
{"host":"d104.aa.net","timestamp":"1995-07-01T06:00:13.000+02:00","request":"GET /shuttle/countdown/ HTTP/1.0","http_reply":200,"bytes":3985}
{"host":"129.94.144.152","timestamp":"1995-07-01T06:00:13.000+02:00","request":"GET / HTTP/1.0","http_reply":200,"bytes":7074}
{"host":"unicomp6.unicomp.net","timestamp":"1995-07-01T06:00:14.000+02:00","request":"GET /shuttle/countdown/count.gif HTTP/1.0","http_reply":200,"bytes":40310}
{"host":"unicomp6.unicomp.net","timestamp":"1995-07-01T06:00:14.000+02:00","request":"GET /images/NASA-logosmall.gif HTTP/1.0","http_reply":200,"bytes":786}
{"host":"unicomp6.unicomp.net","timestamp":"1995-07-01T06:00:14.000+02:00","request":"GET /images/KSC-logosmall.gif HTTP/1.0","http_reply":200,"bytes":1204}
{"host":"d104.aa.net","timestamp":"1995-07-01T06:00:15.000+02:00","request":"GET /shuttle/countdown/count.gif HTTP/1.0","http_reply":200,"bytes":40310}
{"host":"d104.aa.net","timestamp":"1995-07-01T06:00:15.000+02:00","request":"GET /images/NASA-logosmall.gif HTTP/1.0","http_reply":200,"bytes":786}
{"host":"d104.aa.net","timestamp":"1995-07-01T06:00:15.000+02:00","request":"GET /images/KSC-logosmall.gif HTTP/1.0","http_reply":200,"bytes":1204}
{"host":"129.94.144.152","timestamp":"1995-07-01T06:00:17.000+02:00","request":"GET /images/ksclogo-medium.gif HTTP/1.0","http_reply":304,"bytes":0}
{"host":"199.120.110.21","timestamp":"1995-07-01T06:00:17.000+02:00","request":"GET /images/launch-logo.gif HTTP/1.0","http_reply":200,"bytes":1713}
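
The schema in the main code keeps the timestamp field as a plain string. If a typed value is ever needed, the sample values look like ISO-8601 date-times with a UTC offset, which java.time can parse directly. The following is a small sketch under that assumption; TimestampSketch is a hypothetical helper and is not used anywhere in the original code.

```java
import java.time.Instant;
import java.time.OffsetDateTime;

// Hypothetical helper: the original pipeline treats "timestamp" as a String.
class TimestampSketch {

    // Parses an ISO-8601 date-time with offset, such as the values in 1995.json.
    static OffsetDateTime parse(String raw) {
        return OffsetDateTime.parse(raw);
    }

    public static void main(String[] args) {
        OffsetDateTime ts = parse("1995-07-01T06:00:01.000+02:00");
        Instant utc = ts.toInstant(); // same moment expressed in UTC
        System.out.println("offset=" + ts.getOffset() + ", utc=" + utc);
    }
}
```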

📃 DataLog class

package org.programming.spark.model;

import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

@Getter
@Setter
@NoArgsConstructor
public class DataLog {
        private String host;
        private String timestamp;
        private String request;
        private Integer http_reply;
        private Integer bytes;

        // Count the records per host. Grouping directly on the dataset avoids
        // depending on the "dataLogDS" alias assigned by the caller.
        public static Dataset<Row> countByHost(Dataset<DataLog> dataLogDataset){
                return dataLogDataset
                        .groupBy("host")
                        .count();
        }
}
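
Encoders.bean expects a Java bean, so the field names in DataLog matter: Lombok generates accessors such as getHttp_reply and setHttp_reply, and bean conventions turn those back into the property name http_reply, matching the schema column. The sketch below illustrates this with the JDK's own java.beans.Introspector on a hand-written class. PlainDataLog and DataLogBeanSketch are hypothetical names, only two of the five fields are shown, and whether Spark uses exactly this mechanism internally is an assumption here.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration of JavaBean property naming; it does not use Spark or Lombok.
class DataLogBeanSketch {

    // Hand-written equivalent of what @Getter/@Setter would generate for two fields.
    public static class PlainDataLog {
        private String host;
        private Integer http_reply;

        public String getHost() { return host; }
        public void setHost(String host) { this.host = host; }
        public Integer getHttp_reply() { return http_reply; }
        public void setHttp_reply(Integer http_reply) { this.http_reply = http_reply; }
    }

    // Returns the property names that standard bean introspection derives from the accessors.
    static Set<String> propertyNames(Class<?> beanClass) {
        try {
            Set<String> names = new TreeSet<>();
            // Object.class as the stop class excludes the inherited "class" property.
            for (PropertyDescriptor pd
                    : Introspector.getBeanInfo(beanClass, Object.class).getPropertyDescriptors()) {
                names.add(pd.getName());
            }
            return names;
        } catch (IntrospectionException e) {
            throw new IllegalStateException("Could not introspect " + beanClass, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(propertyNames(PlainDataLog.class));
    }
}
```

The practical takeaway is that renaming a field to camelCase (for example httpReply) would change the bean property name, so the JSON schema column would need to be renamed or aliased to match.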

📜 Result


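Running the code prints a DataFrame with one row per host: a host column and an http_reply_json column that holds the host's records as a JSON array.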

