[Spark] Spark Streaming

Denver·2022년 12월 18일
0

Hadoop

목록 보기
7/9
post-thumbnail

Spark Docs에 나오는 Spark Streaming 예제
localhost:9999에서 입력받은 글자 단어 세기

0. 실행 환경

AWS EC2 t2.xlarge
OS : Red Hat 9.1
Python : 3.9
Spark : 3.3.1
Scala : 2.12.15
Java : OpenJDK 64-Bit Server VM, 1.8.0_352

1. Streaming Test

1-1. streaming.py 생성

vi streaming.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Create SparkSession
spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

# localhost:9999 streaming input -> Create DataFrame
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()


# Split input by " " as word
words = lines.select(
   explode(
       split(lines.value, " ")
   ).alias("word")
)

# Count words
wordCounts = words.groupBy("word").count()

# Print number of words
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()



# DataFrame으로 실행
words_df = lines_df.select(expr("explode(split(value, ' ')) as word"))
counts_df = words_df.groupBy("word").count()
word_count_query = counts_df.writeStream.format("console")\
                            .outputMode("complete")\
                            .option("checkpointLocation", ".checkpoint")\
                            .start()
word_count_query.awaitTermination()

1-2. streaming 실행

spark-submit structured_network_wordcount.py localhost 9999

1-3. Netcat실행

# 추가 세션 실행 후 명령어 입력
nc -lk 9999
# -> 글자 입력

1-4. 결과


2. readSteam Options

# socket(테스트용 : UTF-8 읽어옴. fault-tolerant 보장 x) 
readStream("socket") \
.option("host", "localhost")\
.option("port", 9999)\

# rate source(테스트용 : 초당 지정된 수 만큼 데이터 생성)

# kafka source
readStream("kafka")\
.option("subscribe", "topic1") \
.load()
 
# file source
# 지원 파일 형식 : text, csv, json, orc, parquet 
serSchema = StructType().add("name", "string").add("age", "integer")
csvDF = spark \
    .readStream \
    .option("sep", ";") \
    .schema(userSchema) \
    .csv("/path/to/directory") 

Q.

  1. readstream("socket").option("host", HOST)
    HOST에 locahost말고 되는지?

  2. kafka, 파일로 테스트해보기

  3. checkpointLocation 오류 해결

word_count_query = df.writeStream.format("console")\
                            .outputMode("complete")\
                            .option("checkpointLocation", ".checkpoint")\
                            .start()

참고 자료

structured-streaming-programming-guide
structured-streaming-programming-guide - KO

profile
까먹었을 미래의 나를 위해

0개의 댓글