flink_project의 환경을 기준으로 실행
IDE는 IntelliJ를 사용
Job.scala 위치에 우클릭 -> New -> Scala Class -> Object 선택후 파일 이름 선택
ThisBuild / resolvers ++= Seq(
"Apache Development Snapshot Repository" at "https://repository.apache.org/content/repositories/snapshots/",
Resolver.mavenLocal
)
name := "flink-project"
version := "0.1-SNAPSHOT"
organization := "org.example"
ThisBuild / scalaVersion := "2.12.7"
val flinkVersion = "1.15.2"
val json4sVersion = "4.0.6"
val awsSdkVersion = "1.12.300"
val flinkDependencies = Seq(
"org.apache.flink" %% "flink-scala" % flinkVersion,
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
"org.apache.flink" % "flink-clients" % flinkVersion,
"org.apache.flink" % "flink-connector-kafka" % flinkVersion,
"org.apache.flink" % "flink-connector-elasticsearch7" % flinkVersion,
"org.json4s" %% "json4s-jackson" % json4sVersion,
"com.amazonaws" % "aws-java-sdk-ssm" % awsSdkVersion
)
lazy val root = (project in file(".")).
settings(
libraryDependencies ++= flinkDependencies
)
assembly / mainClass := Some("org.example.Job")
// make run command include the provided dependencies
Compile / run := Defaults.runTask(Compile / fullClasspath,
Compile / run / mainClass,
Compile / run / runner
).evaluated
// stays inside the sbt console when we press "ctrl-c" while a Flink programme executes with "run" or "runMain"
Compile / run / fork := true
Global / cancelable := true
// exclude Scala library from assembly
assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false)
package org.example
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object KafkaExample {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment // 스트리밍 변수 생성
val kafkaSource = KafkaSource.builder()
.setBootstrapServers("localhost:29092") // bootstrap server 설정
.setTopics("flink-test") // topic 설정
.setGroupId("flink") // group id 설정
.setStartingOffsets(OffsetsInitializer.earliest()) // 데이터를 어떤 순서대로 받는지 결정
.setValueOnlyDeserializer(new SimpleStringSchema()) // streaming 데이터를 어떻게 받을지 설정
.build() // 위의 경우 string으로 받음
val inputStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
inputStream.print()
env.execute("Flink Kafka Example")
}
}
latest()의 경우 새로 들어온 메시지에 대해서만 처리. 기존에 토픽에 전송한 데이터가 나오지 않음
committedOffsets()의 경우 consumer가 정상적으로 commit이 된 시점부터 읽어옴.
우클릭 후 Run 실행
consumer가 동작
우측 상단의 빨간 네모를 클릭하여 종료
topic에 오랫동안 메시지가 전송되지 않으면 자동 종료 되기도 한다.
동일한 방식으로 Scala Class -> Object로 파일 생성
build.sbt에 dependency 추가
OpenSearch 버전을 1.3으로 낮춰야 함. 권장 버전과 호환이 되지 않음
package org.example
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.connector.sink2.SinkWriter
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.connector.elasticsearch.sink.{Elasticsearch7SinkBuilder, RequestIndexer}
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
import scala.collection.JavaConverters.mapAsJavaMap
object KafkaExample {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment // 스트리밍 변수 생성
val kafkaSource = KafkaSource.builder()
.setBootstrapServers("localhost:29092") // bootstrap server 설정
.setTopics("flink-test") // topic 설정
.setGroupId("flink") // group id 설정
.setStartingOffsets(OffsetsInitializer.earliest()) // 데이터를 어떤 순서대로 받는지 결정
.setValueOnlyDeserializer(new SimpleStringSchema()) // streaming 데이터를 어떻게 받을지 설정
.build() // 위의 경우 string으로 받음
val inputStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
inputStream.print()
val processStream = inputStream // 데이터 가공
processStream.sinkTo(
new Elasticsearch7SinkBuilder[String]
.setBulkFlushMaxActions(1) // 한번에 몇개의 데이터를 넣을지 결정
.setHosts(new HttpHost("<https://를 제외한 opensearch 도메인 엔드포인트(search...)>", 443, "https"))
.setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
indexer.add(createIndexRequest(element))
).setConnectionUsername("master")
.setConnectionPassword("passwd")
.build())
def createIndexRequest(element: (String)): IndexRequest = {
val map = Map (
"data" -> element.asInstanceOf[AnyRef]
)
Requests.indexRequest.index("flink-index").source(mapAsJavaMap(map))
}
// 실제로는 {"data": "수신 msg"} 식으로 저장될 것
env.execute("Flink Kafka2ElasticSearch Example")
}
}
GET _cat/indices?s=i:desc
green open flink-index XSYoa7s9RcuJM7SZ0aoh4A 5 1 6 0 31.5kb 15.7kb
flink-index란 collection이 추가됨.
GET flink_index/_search
{
"took": 6,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 6,
"relation": "eq"
},
"max_score": 1.0,
"hits": [
{
"_index": "flink-index",
"_type": "_doc",
"_id": "_RUP7oUBnp6WJWXPT4pi",
"_score": 1.0,
"_source": {
"data" : """{"num":1}"""
}
},
{
"_index": "flink-index",
"_type": "_doc",
"_id": "_RUP7oUBnp6WJWXPT4pi",
"_score": 1.0,
"_source": {
"data" : "hi"
}
},
...
]
}
}
가공하는 단계에서 문제가 발생할 때 처리하는 방식
scala class -> Trait
package org.example
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.{Collector, OutputTag}
trait Parser extends ProcessFunction[String, MSG]{ // String이 input, MSG가 output
val outputTag: OutputTag[String] = new OutputTag[String]("side-output"){}
def parse(input: String): MSG
override def processElement(input: String, context: ProcessFunction[String, MSG]#Context, collector: Collector[MSG]) = {
try{
val event = parse(input)
collector.collect(event) // 이벤트가 제대로 파싱되면 collect로 넘김
} catch { // 오류 발생시
case exception: Throwable =>
context.output(outputTag, exception.toString) // side output tag를 바탕으로 Exception 메시지 전달
}
}
}
scala class -> Case Class
dependency에 json 추가
package org.example
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods
case class MSG(num: Int)
class MessageParser extends Parser{
override def parse(input: String): MSG = { // return type이 MSG
// String으로 넘어온 json 형태의 input값을 num이란 필드가 존재하는 case class MSG로 변환
implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats
JsonMethods.parse(input).extract[MSG]
}
}
scala class -> Object
package org.example
import org.apache.flink.api.connector.sink2.SinkWriter
import org.apache.flink.connector.elasticsearch.sink.{Elasticsearch7SinkBuilder, RequestIndexer}
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests
import java.time.ZonedDateTime
import scala.collection.JavaConverters.mapAsJavaMap
object ElasticSearchSink {
def getSink = { // 정상적으로 파싱된 데이터
new Elasticsearch7SinkBuilder[MSG]
.setBulkFlushMaxActions(1) // 한번에 몇개의 데이터를 넣을지 결정
.setHosts(new HttpHost("<도메인 엔드포인트>", 443, "https"))
.setEmitter { (element: MSG, context: SinkWriter.Context, indexer: RequestIndexer) =>
val map = Map(
"data" -> element.num.asInstanceOf[AnyRef],
"event_time" -> ZonedDateTime.now().asInstanceOf[AnyRef]
)
indexer.add(Requests.indexRequest.index("flink-index").source(mapAsJavaMap(map)))
}.setConnectionUsername("master")
.setConnectionPassword("passwd")
.build()
}
def getErrorSink = { // 오류가 발생한 데이터
new Elasticsearch7SinkBuilder[String]
.setBulkFlushMaxActions(1) // 한번에 몇개의 데이터를 넣을지 결정
.setHosts(new HttpHost("<도메인 엔드포엔트>", 443, "https"))
.setEmitter { (element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
val map = Map(
"error" -> element.asInstanceOf[AnyRef],
"error_time" -> ZonedDateTime.now().asInstanceOf[AnyRef]
)
indexer.add(Requests.indexRequest.index("flink-error").source(mapAsJavaMap(map)))
}.setConnectionUsername("master")
.setConnectionPassword("passwd")
.build()
}
}
package org.example
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
object Kafka2ElasticSearch {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment // 스트리밍 변수 생성
val kafkaSource = KafkaSource.builder()
.setBootstrapServers("localhost:29092") // bootstrap server 설정
.setTopics("flink-test") // topic 설정
.setGroupId("flink") // group id 설정
.setStartingOffsets(OffsetsInitializer.earliest()) // 데이터를 어떤 순서대로 받는지 결정
.setValueOnlyDeserializer(new SimpleStringSchema()) // streaming 데이터를 어떻게 받을지 설정
.build() // 위의 경우 string으로 받음
val inputStream: DataStream[String] = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
inputStream.print()
val parser = new MessageParser // MSG에서 정의
val processStream = inputStream // 데이터 가공
.process(parser) // parser에서 ProcessFunction을 상속
val errorStream = processStream
.getSideOutput(parser.outputTag)
errorStream.print() // parsing이 실패한 데이터는 print
errorStream.sinkTo(ElasticSearchSink.getErrorSink)
processStream.sinkTo(ElasticSearchSink.getSink)
env.execute("Flink Kafka2ElasticSearch Example")
}
}
실행 후
2> {"num":1}
2> hi
2> com.fasterxml.jackson.coreJsonParseException: Unrecognized token 'hi': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (String)"hi"; line: 1, column: 3]
...
json 형태로 전송된 값은 제대로 실행되지만, string으로만 보낸 'hi'는 오류가 발생하는 것을 확인
GET _cat/indices?s=i:desc
green open flink-index XSYoa7s9RcuJM7SZ0aoh4A 5 1 2 0 15.1kb 7.5kb
green open flink-error BKdsji388SSLK23kds0sAA 5 1 5 0 45.3kb 22.5kb
{
"_index": "flink-index",
"_type": "_doc",
"_id": "SDkdjfke993Dks_ID",
"_version": 1
"_score": null,
"_source": {
"data" : 2,
"event_time": "2023-08-13T08:21:16.861351+09:00"
}
},
"_field": {
"event_time": [
"2023-08-13T14:31:15.861Z"
]
},
"sort": [
1674743475861
]
}
{
"_index": "flink-error",
"_type": "_doc",
"_id": "Sslk390sdjDKkjdwSL",
"_version": 1
"_score": null,
"_source": {
"error" : "com.fasterxml.jackson.coreJsonParseException: Unrecognized token 'hi': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')\n at [Source: (String)"hi"; line: 1, column: 3]",
"error_time": "2023-08-13T08:21:17.8179421+09:00"
}
},
"_field": {
"event_time": [
"2023-08-13T15:13:42.863Z"
]
},
"sort": [
1674743475833
]
}
같은 코드를 여러번 실행 혹은 같은 메시지를 여러번 전송하더라도 한번만 저장하도록 만드는 방식
사용중인 코드의 일부분인 .setStartingOffsets(OffsetsInitializer.earliest())
를 보면 맨처음 받은 데이터를 전부 읽어옴. 만약 재배포를 실행한다면 수많은 데이터를 처음부터 다시 읽게 됨.
elasticsearch의 경우 같은 위와 같은 경우 같은 document 내용물이라도 _id
가 전부 다르게 사용됨.
object ElasticSearchSink {
def getSink = { // 정상적으로 파싱된 데이터
new Elasticsearch7SinkBuilder[MSG]
.setBulkFlushMaxActions(1) // 한번에 몇개의 데이터를 넣을지 결정
.setHosts(new HttpHost("<도메인 엔드포인트>", 443, "https"))
.setEmitter { (element: MSG, context: SinkWriter.Context, indexer: RequestIndexer) =>
val map = Map(
"data" -> element.num.asInstanceOf[AnyRef],
"event_time" -> ZonedDateTime.now().asInstanceOf[AnyRef]
)
indexer.add(Requests.indexRequest.index("flink-index")
.id(element.num.toString) // 추가
.source(mapAsJavaMap(map)))
}.setConnectionUsername("master")
.setConnectionPassword("passwd")
.build()
}
id 또한 지정하여 같은 내용이 중복되지 않도록 getSink
함수를 변경
에러를 처리하는 경우 멱등성을 필요로하지 않으므로 변경하지 않음
id 필드가 고정됨.
{
"_index": "flink-index",
"_type": "_doc",
"_id": "1",
"_version": 2
"_score": null,
"_source": {
"data" : 2,
"event_time": "2023-08-13T08:26:15.846315+09:00"
}
},
"_field": {
"event_time": [
"2023-08-13T14:31:15.861Z"
]
},
"sort": [
1674743475861
]
}
단 위의 경우 version이 올라가는 것을 확인할 수 있음. 데이터를 덮어 씌우는 것.
overwrite를 원하지 않는 경우 같은 데이터가 존재하는 경우 id를 비교하고 동일한 id라면 exception을 사용하는 방법도 있음.
object ElasticSearchSink {
def getSink = { // 정상적으로 파싱된 데이터
new Elasticsearch7SinkBuilder[MSG]
.setBulkFlushMaxActions(1) // 한번에 몇개의 데이터를 넣을지 결정
.setHosts(new HttpHost("<도메인 엔드포인트>", 443, "https"))
.setEmitter { (element: MSG, context: SinkWriter.Context, indexer: RequestIndexer) =>
val map = Map(
"data" -> element.num.asInstanceOf[AnyRef],
"event_time" -> ZonedDateTime.now().asInstanceOf[AnyRef]
)
indexer.add(Requests.indexRequest.index("flink-index")
.id(element.num.toString)
.create(true) // 추가
.source(mapAsJavaMap(map)))
}.setConnectionUsername("master")
.setConnectionPassword("passwd")
.build()
}
...ElasticSearch exception [type=version_conflict_engine_exception, reason=[1]: version conflict, document already exists (current version [3])]]
.create(true)
를 사용하면 동일한 이벤트가 색인되는 것을 막을 수 있음.
개발자로서 배울 점이 많은 글이었습니다. 감사합니다.