[Spark Structured Streaming] 은 1개의 Streaming Aggregation만 지원한다.

Woong·2022년 6월 23일
0

Apache Spark

목록 보기
16/22

개요

  • Spark Structured Streaming 에서는 여러 streaming aggregations 를 지원하지 않는다.
  • 에러 메시지
    Multiple streaming aggregations are not supported with streaming DataFrames/Datasets

무엇이 안되는 것인가?

  • 아래처럼 여러번 transform을 할 수 없다.
* Stream input 을 읽어서
* DataFrame1 로 transform 하고
* 이를 DataFrame2 로 transform   <-- 지원 X
  • 여러 종류의 aggregation 은 당연히 지원된다.
dataFrame.agg(
    functions.max("priority").alias("priority"),
    sum("cnt").alias("cnt")
)

foreachBatch 를 활용한 대응 방안

  • foreachBatch 를 통해 추가적인 aggregation 을 적용하는 방안
ds1
    .withWatermark("rdate", "5 seconds")
    .filter(col("device_id").isNotNull)
    .groupBy(window(col("rdate"), "5 seconds", "5 seconds"), col("group_id"), col("device_id"))
    .agg(
        functions.last("priority").alias("priority"),
        sum("category").alias("category")
    )
    .select("window.end", "group_id", "device_id", "priority", "category")

    .writeStream
    .foreachBatch( (batchDF: DataFrame, batchId: Long) =>
        sumGroupPriority(batchDF, batchId)
    )
    .outputMode("append")
    .start()
    .awaitTermination()
  • 2번째 transform 을 foreachBatch 에 등록할 함수에서 지정
def sumGroupPriority(df:DataFrame, batchId:Long): Unit = {
    val formatter = new SimpleDateFormat("yyyy_MM_dd")

    val connectionProperties = new Properties()
        connectionProperties.put("driver", "org.mariadb.jdbc.Driver")
        connectionProperties.put("user", "myuser")
        connectionProperties.put("password", "mypassword")

    val subDataFrame = df.groupBy("end", "group_id")
        .agg(sum("priority").alias("priority"),
        sum("category").alias("category")
        )
        .select("end", "group_id", "priority", "category")
        .withColumnRenamed("end", "rdate")

    subDataFrame.write.mode("append")
        .jdbc(
              "jdbc:mysql://localhost:3306/test",
              "test_tb_%s".format(
                  formatter.format(Calendar.getInstance().getTime)),
              connectionProperties)
}

reference

0개의 댓글