Installing MongoDB and Connecting It to Spark

이상민 · July 16, 2023

MongoDB installation

MongoDB official homepage
Install gnupg and curl

$ sudo apt-get install gnupg curl

Import the MongoDB public GPG key

$ curl -fsSL https://pgp.mongodb.com/server-6.0.asc | \
   sudo gpg -o /usr/share/keyrings/mongodb-server-6.0.gpg \
   --dearmor

Create the /etc/apt/sources.list.d/mongodb-org-6.0.list

$ echo "deb [ arch=amd64,arm64 signed-by=/usr/share/keyrings/mongodb-server-6.0.gpg ] https://repo.mongodb.org/apt/ubuntu focal/mongodb-org/6.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-6.0.list

The list entry above targets focal (Ubuntu 20.04); with a different Ubuntu version the packages will not download correctly.

$ sudo apt-get update

$ sudo apt-get install -y mongodb-org

Running MongoDB

$ sudo systemctl start mongod
$ sudo systemctl daemon-reload   # use this if start fails
$ sudo systemctl status mongod   # check the status
$ sudo systemctl restart mongod  # restart

$ sudo systemctl enable mongod
# if the instance (e.g. the Ubuntu VM) is shut down and started again, MongoDB stops with it;
# enable makes mongod come back up automatically every time the server reboots

$ mongosh

I replaced the existing zeppelin-mongodb-0.10.1.jar file and renamed it.

Editing mongod.conf

$ sudo vi /etc/mongod.conf

Change bindIp to 0.0.0.0 so connections from outside the host (e.g. Spark executors on other nodes) are accepted.
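
With bindIp opened up, a quick check from another machine confirms the setting took effect. Below is a minimal sketch using pymongo; the host name mongodb-server is a placeholder for the actual address of the machine running mongod.

# Minimal connectivity check; "mongodb-server" is a placeholder host name.
from pymongo import MongoClient

client = MongoClient("mongodb://mongodb-server:27017", serverSelectionTimeoutMS=5000)
try:
    client.admin.command("ping")  # raises ServerSelectionTimeoutError if unreachable
    print("MongoDB is reachable")
finally:
    client.close()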

Connecting Spark and MongoDB through PySpark

MongoDB Getting Started
The official docs cover everything from getting started through write and read examples.

$ ./bin/pyspark --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \
              --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \
              --packages org.mongodb.spark:mongo-spark-connector_2.12:10.2.0

Running the command above downloads mongo-spark-connector_2.12:10.2.0 and then the jar files it depends on.
A problem comes up here: this does not work properly in a --master yarn (YARN cluster) environment.
The files downloaded at this point are:
mongodb-driver-sync, mongodb-driver-core, bson, bson-record-codec
MongoDB Connector for Spark
For Spark 3.1 or later with MongoDB 4.0 or later, use mongo-spark-connector 10.2.0; the compatible driver jars are version 4.8.2.
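
Instead of relying on --packages at launch time, the same connector coordinates can be set when building the session. This is just a sketch for the local[*] case, assuming the machine can reach Maven Central to resolve the jars:

# Sketch: let Spark resolve mongo-spark-connector 10.2.0 (and its 4.8.2 driver jars) itself.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("mongo-connector-resolution") \
    .master("local[*]") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.2.0") \
    .getOrCreate()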

Checking in Spark local[*]

Currently there is a collection called people in a database called movie, and its documents are as follows.

movie> db.people.find()
[
  {
    _id: ObjectId("64b557991bbe86e3bfdd7afa"),
    name: 'Michael',
    salary: 3000
  },
  {
    _id: ObjectId("64b557991bbe86e3bfdd7afb"),
    name: 'Andy',
    salary: 4500
  },
  {
    _id: ObjectId("64b557991bbe86e3bfdd7afc"),
    name: 'Justin',
    salary: 3500
  },
  {
    _id: ObjectId("64b557991bbe86e3bfdd7afd"),
    name: 'Berta',
    salary: 4000
  }
]

Loading the data

/skybluelee/spark3 $ ./bin/pyspark

>>> df = spark.read.format("mongodb") \
...     .option("spark.mongodb.read.database", "movie") \
...     .option("spark.mongodb.read.collection", "people") \
...     .option("uri", "mongodb://localhost:27017/movie.people") \
...     .load()

The official documentation does not mention the read.database and read.collection options, but without them the values cannot be read correctly.
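
The write direction is not covered above, but by analogy with the read options it would presumably look like the sketch below. The write.* option names and the people_copy collection are assumptions, not something verified here.

# Hedged sketch of the mirror-image write; option names follow the read-side pattern
# and "people_copy" is a hypothetical target collection.
df.write.format("mongodb") \
    .option("spark.mongodb.write.database", "movie") \
    .option("spark.mongodb.write.collection", "people_copy") \
    .option("uri", "mongodb://localhost:27017/movie.people_copy") \
    .mode("append") \
    .save()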

pymongo

Kafka is already being used on the YARN cluster, so I tried accessing MongoDB through the YARN cluster as well, but that did not work, so pymongo is used instead. The environment is Zeppelin, and the interpreter is the default Spark interpreter.

Read

%pyspark
from pymongo import MongoClient

# Configure the MongoDB client
client = MongoClient('mongodb://localhost:27017')
db = client['movie']  # select the MongoDB database to use
collection = db['movie_data']  # select the MongoDB collection to use

# Read documents from the collection
documents = collection.find()

# Iterate over the documents and print each one
for document in documents:
    print(document)
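
If the documents read with pymongo ultimately need to become a Spark DataFrame, they can be converted on the driver. A minimal sketch, assuming the Zeppelin-provided spark session and a collection small enough to fit in driver memory; _id is dropped because ObjectId does not map to a Spark type:

%pyspark
# Sketch: convert the pymongo cursor into a Spark DataFrame on the driver.
docs = [{k: v for k, v in doc.items() if k != "_id"}  # drop the ObjectId field
        for doc in collection.find()]
df = spark.createDataFrame(docs)
df.show()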

Write

%pyspark
from pyspark.sql import SparkSession
from pymongo import MongoClient

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Write to MongoDB") \
    .getOrCreate()

# MongoDB connection details
mongo_host = "localhost"  # Replace with the actual MongoDB server host (the port is passed separately below)
mongo_port = 27017  # Replace with the appropriate port number
database_name = "movie"  # Replace with the desired database name
collection_name = "people"  # Replace with the desired collection name

# Connect to MongoDB
client = MongoClient(mongo_host, mongo_port)

# Access the MongoDB database
db = client[database_name]

# Get the PySpark DataFrame
#df = spark.read.format("json").options(header=True, inferSchema=True).load("hdfs://spark-master-01:9000/skybluelee/movie-partitioned-json/YEAR=2019/MONTH=1/DAY=11_20/*.json")
df = spark.read.format("json").options(header=True, inferSchema=True).load("hdfs://spark-master-01:9000/skybluelee/data/resources/employees.json")

# Convert the DataFrame to a list of dictionaries
data = df.rdd.map(lambda row: row.asDict()).collect()

# Insert data into the MongoDB collection
db[collection_name].insert_many(data)

# Close the MongoDB connection
client.close()

# Stop the SparkSession
spark.stop()
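
One caveat with the write path above: df.rdd...collect() pulls the entire DataFrame to the driver before inserting. For larger data, writing per partition from the executors avoids that, as in the rough sketch below; it assumes pymongo is installed on every worker and that MongoDB is reachable from them (the host name here is a placeholder).

%pyspark
# Sketch: insert each partition from the executors instead of collecting to the driver.
def write_partition(rows):
    from pymongo import MongoClient  # imported on the executor
    client = MongoClient("mongodb://mongodb-server:27017")  # placeholder host
    collection = client["movie"]["people"]
    batch = [row.asDict() for row in rows]
    if batch:
        collection.insert_many(batch)
    client.close()

df.foreachPartition(write_partition)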

Code for using the YARN cluster

The job itself is executed on YARN.

from pyspark import SparkConf
from pyspark.sql import SparkSession
conf = SparkConf()
conf.setAll([('spark.driver.extraClassPath', '/skybluelee/spark3/jars/mongo-spark-connector_2.12-10.2.0.jar:/skybluelee/spark3/jars/mongodb-driver-sync-4.8.2.jar:/skybluelee/spark3/jars/mongodb-driver-core-4.8.2.jar:/skybluelee/spark3/jars/bson-record-codec-4.8.2.jar:/skybluelee/spark3/jars/bson-4.8.2.jar')])

spark = SparkSession.builder \
    .appName("MongoDB to DataFrame") \
    .config(conf=conf) \
    .getOrCreate()

df = spark.read.format("mongodb") \
    .option("spark.mongodb.read.database", "movie") \
    .option("spark.mongodb.read.collection", "people") \
    .option("uri", "mongodb://localhost:27017/movie.people") \
    .load()
df.printSchema()
root
 |-- _id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: integer (nullable = true)

Up to printSchema(), the output is normal.

>>> df.show()
23/07/17 15:35:22 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (spark-worker-01 executor 1): com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=localhost:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connection refused (Connection refused)}}]
        at com.mongodb.internal.connection.BaseCluster.getDescription(BaseCluster.java:184)
        at com.mongodb.internal.connection.SingleServerCluster.getDescription(SingleServerCluster.java:46)
        at com.mongodb.client.internal.MongoClientDelegate.getConnectedClusterDescription(MongoClientDelegate.java:144)
        at com.mongodb.client.internal.MongoClientDelegate.createClientSession(MongoClientDelegate.java:101)
        at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.getClientSession(MongoClientDelegate.java:291)
        at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:183)
        at com.mongodb.client.internal.MongoIterableImpl.execute(MongoIterableImpl.java:133)
        at com.mongodb.client.internal.MongoIterableImpl.iterator(MongoIterableImpl.java:90)
        at com.mongodb.client.internal.MongoIterableImpl.cursor(MongoIterableImpl.java:95)
        at com.mongodb.spark.sql.connector.read.MongoBatchPartitionReader.getCursor(MongoBatchPartitionReader.java:108)
        at com.mongodb.spark.sql.connector.read.MongoBatchPartitionReader.next(MongoBatchPartitionReader.java:72)
        at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:93)
        at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:130)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
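
The trace itself hints at the cause: the task runs on spark-worker-01, and with the URI set to mongodb://localhost:27017 the executor tries to connect to its own localhost, where no mongod is running, hence Connection refused. A presumable fix, sketched below, is to point the URI at the host that actually runs MongoDB (together with the bindIp: 0.0.0.0 change made earlier); mongodb-server is again a placeholder host name.

# Sketch: use the real MongoDB host instead of localhost so worker nodes resolve it correctly.
df = spark.read.format("mongodb") \
    .option("spark.mongodb.read.database", "movie") \
    .option("spark.mongodb.read.collection", "people") \
    .option("uri", "mongodb://mongodb-server:27017/movie.people") \
    .load()
df.show()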

Setting the master to yarn in the config

from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf()
conf.setAll([('spark.driver.extraClassPath', '/skybluelee/spark3/jars/mongo-spark-connector_2.12-10.2.0.jar:/skybluelee/spark3/jars/mongodb-driver-sync-4.8.2.jar:/skybluelee/spark3/jars/mongodb-driver-core-4.8.2.jar:/skybluelee/spark3/jars/bson-record-codec-4.8.2.jar:/skybluelee/spark3/jars/bson-4.8.2.jar')])

spark = SparkSession.builder \
    .appName("MongoDB to DataFrame") \
    .master("yarn") \
    .config(conf=conf) \
    .getOrCreate()

df = spark.read.format("mongodb") \
    .option("spark.mongodb.read.database", "movie") \
    .option("spark.mongodb.read.collection", "people") \
    .option("uri", "mongodb://localhost:27017/movie.people") \
    .load()
    
df.printSchema()

df.show()
