[Spark] EMR Configuration

yozzum·2022년 4월 1일
0

Spark

목록 보기
11/21

1. EMR

  • spark, livy, presto, emrfs-site, hdfs-site 등 설정이 가능합니다.
  • EMR 클러스터 생성 시 소프트웨어 편집 > 소프트웨어 설정 편집에서 json 형태로 넣거나, zeppelin/jupyter에서 설정할 수 있습니다.
[{"classification":"spark","properties":
	{"maximizeResourceAllocation":"true"}}]

또는

spark = SparkSession.builder.master("yarn").appName("dbConnectTest"). \
            config("spark.maximizeResourceAllocation", "true"). \
            enableHiveSupport().getOrCreate()

2.1 spark > maximizeResourceAllocation

  • executor가 클러스터의 각 노드에서 최대의 리소스를 활용하도록 합니다.
  • 이 설정을 하면 executor의 최대의 컴퓨팅, 메모리 리소스를 계산합니다.
  • 이에 대응되는 spark-default 세팅을 계산된 값에 맞추어 설정합니다.
  • 설정은 /etc/spark/conf/spark-defaults.conf에서 확인할 수 있습니다.
[{"classification":"spark","properties":
	{"maximizeResourceAllocation":"true"}}]
  • 아래는 maximizeResourceAllocation이 true일 때 자동 설정되는 spark-default 값입니다.
spark.default.parallelism 
spark.driver.memory
spark.executor.memory 
spark.executor.cores
spark.executor.instances

https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html#emr-spark-maximizeresourceallocation

2.2 spark > dynamicAllocation.enabled

  • 워크로드에 기반해서 executor의 수를 자동으로 늘리거나 줄여줍니다.
  • 특히 여러 어플리케이션이 클러스터 리소스를 공유하는 경우 효과적입니다.
[{"classification":"spark","properties":
	{"dynamicAllocation.enabled":"true"}}]
  • 아래는 dynamicAllocation과 연관된 설정들입니다. shuffle.service.enabled는 필수 입니다.
spark.shuffle.service.enabled (필수)
spark.dynamicAllocation.minExecutors (선택) default: 0
spark.dynamicAllocation.maxExecutors (선택) default: infinity
spark.dynamicAllocation.initialExecutors (선택) default: spark.dynamicAllocation.minExecutors 값 사용
spark.dynamicAllocation.executorAllocationRatio (선택) default: 1

https://spark.apache.org/docs/2.4.4/configuration.html#dynamic-allocation

2.3 spark > executor.heartbeatInterval

  • 드라이버는 executor가 살아있는지 수시로 체크하기 위해 heartbeat를 주기적으로 수신합니다.

2.4 spark > network.timeout

  • 값을 높임으로써 드라이버가 executor로 부터 하트비트를 수신 관련해 타임아웃을 발생시키기까지 더 많은 시간을 확보할 수 있습니다.

2.5 spark > hadoop.mapreduce.fileoutputcommitter.algorithm.version

2.6 spark > speculation

  • true로 설정하면, task가 느린 경우 자동으로 재시작합니다.

2.7 spark > sql.execution.arrow.enable

  • spark dataframe을 pandas dataframe으로 변환하여 사용하는경우 true로 설정하여 해당 작업을 위한 최적화를 합니다.

2.8 spark > sql.crossJoin.enabled

  • 쿼리가 카테시안 프로덕트를 포함하는경우, 이 설정을 true로 해야합니다. default는 false 입니다.

3.1. livy-conf > livy.server.session.timeout

  • 통신 방식 : (SparkSession → livy session) ↔ S3
  • spark session이 돌아가는 시간이 livy session timeout 보다 긴 경우에 404 session 0 not found 에러가 발생합니다.
  • 이를 방지하기 위해서 livy.session.timeout을 12시간(최대)으로 늘려 놓습니다.
[{"classification":"livy-conf","properties":
	{"livy.server.session.timeout":"12h"}}]

3.2. livy-conf > livy.impersonation.enabled

[{"classification":"livy-conf","properties":
	{"livy.impersonation.enabled":"true"}}]

4.1 emrfs-site > fs.s3.maxConnections, fs.s3.maxRetries

[{"classification":"emrfs-site","properties":
	{"fs.s3.maxConnections":"100","fs.s3.maxRetries":"20"}}]

5.1 presto-connector-hive > hive.metastore

  • 메타스토어를 glue로 설정한 경우, 해당 계정의 glue catalog database를 바라봅니다.
[{"classification":"presto-connector-hive","properties":
	{"hive.metastore":"glue"}}]

6.1 hdfs-site > dfs.webhdfs.user.provider.user.pattern

[{"classification":"hdfs-site","properties":
	{"dfs.webhdfs.user.provider.user.pattern":"^[A-Za-z0-9_][A-Za-z0-9._-]*[$]?$"}}]

%pyspark
import pyspark
from pyspark.sql import SparkSession, Row
from pyspark import SQLContext, SparkContext, SparkConf
from pyspark.sql import functions as F
import datetime, timeit
import boto3
client = boto3.client('sts')
response = client.assume_role(RoleArn='arn:aws:iam::00000000000000:role/IAM_ROLE_NAME',
RoleSessionName='LDPS-Analysis',
DurationSeconds=43200)
awsCredentials = response['Credentials']

Spark용 S3 접근 코드

spark = SparkSession.builder.master("yarn").appName("dbConnectTest"). \
config("spark.sql.execution.arrow.enable","true"). \
config("spark.maximizeResourceAllocation", "true"). \
config("spark.dynamicAllocation.enabled", "true"). \
config("spark.network.timeout", "10000000"). \
config("spark.executor.heartbeatInterval", "30"). \
config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version","2"). \
config("spark.speculation","false"). \
config("spark.shuffle.service.enabled","true"). \
config("spark.sql.crossJoin.enabled","true"). \
enableHiveSupport().getOrCreate()

hadoop_config = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_config.set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
hadoop_config.set('com.amazonaws.services.s3.enableV4', 'true')
hadoop_config.set('fs.s3a.endpoint', 's3.ap-northeast-2.amazonaws.com')
hadoop_config.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
hadoop_config.set('fs.s3a.access.key', awsCredentials['AccessKeyId'])
hadoop_config.set('fs.s3a.secret.key', awsCredentials['SecretAccessKey'])
hadoop_config.set("fs.s3a.session.token", awsCredentials["SessionToken"])
sqlContext = SQLContext(spark.sparkContext)

profile
yozzum

0개의 댓글