1. EMR
EMR settings can be applied as a configuration classification (JSON) at cluster creation:
[{"classification":"spark","properties":
{"maximizeResourceAllocation":"true"}}]
or at runtime when building the SparkSession:
spark = (
    SparkSession.builder
    .master("yarn")
    .appName("dbConnectTest")
    .config("spark.maximizeResourceAllocation", "true")
    .enableHiveSupport()
    .getOrCreate()
)
Note that maximizeResourceAllocation is evaluated by EMR when the cluster is provisioned, so the classification form is the reliable way to set it; the session-level config is shown for completeness.
2.1 spark > maximizeResourceAllocation
[{"classification":"spark","properties":
{"maximizeResourceAllocation":"true"}}]
When enabled, EMR calculates and sets the following properties automatically based on the core-node hardware:
spark.default.parallelism
spark.driver.memory
spark.executor.memory
spark.executor.cores
spark.executor.instances
2.2 spark > dynamicAllocation.enabled
[{"classification":"spark","properties":
{"dynamicAllocation.enabled":"true"}}]
spark.shuffle.service.enabled (required)
spark.dynamicAllocation.minExecutors (optional) default: 0
spark.dynamicAllocation.maxExecutors (optional) default: infinity
spark.dynamicAllocation.initialExecutors (optional) default: the value of spark.dynamicAllocation.minExecutors
spark.dynamicAllocation.executorAllocationRatio (optional) default: 1
https://spark.apache.org/docs/2.4.4/configuration.html#dynamic-allocation
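For example, a spark-defaults classification that enables dynamic allocation with bounded executor counts could look like this (the executor counts are illustrative, not recommendations):
[{"classification":"spark-defaults","properties":
{"spark.dynamicAllocation.enabled":"true","spark.shuffle.service.enabled":"true",
"spark.dynamicAllocation.minExecutors":"2","spark.dynamicAllocation.maxExecutors":"20"}}]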
2.3 spark > executor.heartbeatInterval
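How often each executor heartbeats to the driver (default 10s); it must stay well below spark.network.timeout. A sketch with an illustrative value:
[{"classification":"spark-defaults","properties":
{"spark.executor.heartbeatInterval":"30s"}}]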
2.4 spark > network.timeout
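Default timeout for all network interactions (default 120s); often raised when long GC pauses cause executors to be dropped. A sketch with an illustrative value:
[{"classification":"spark-defaults","properties":
{"spark.network.timeout":"600s"}}]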
2.5 spark > hadoop.mapreduce.fileoutputcommitter.algorithm.version
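With algorithm version 2, tasks move their output to the final location at task commit instead of job commit, which speeds up writes to object stores such as S3. A sketch:
[{"classification":"spark-defaults","properties":
{"spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version":"2"}}]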
2.6 spark > speculation
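When true, Spark re-launches slow tasks speculatively; it is commonly disabled for jobs that write to S3, since duplicate task attempts can leave duplicate output. A sketch:
[{"classification":"spark-defaults","properties":
{"spark.speculation":"false"}}]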
2.7 spark > sql.execution.arrow.enabled
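Enables Apache Arrow for pandas <-> Spark DataFrame conversion (toPandas and createDataFrame from pandas). A sketch:
[{"classification":"spark-defaults","properties":
{"spark.sql.execution.arrow.enabled":"true"}}]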
2.8 spark > sql.crossJoin.enabled
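Allows Spark SQL to execute cartesian products without an explicit CROSS JOIN (needed on Spark 2.x; implicit in 3.0+). A sketch:
[{"classification":"spark-defaults","properties":
{"spark.sql.crossJoin.enabled":"true"}}]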
3.1 livy-conf > livy.server.session.timeout
[{"classification":"livy-conf","properties":
{"livy.server.session.timeout":"12h"}}]
3.2 livy-conf > livy.impersonation.enabled
[{"classification":"livy-conf","properties":
{"livy.impersonation.enabled":"true"}}]
4.1 emrfs-site > fs.s3.maxConnections, fs.s3.maxRetries
[{"classification":"emrfs-site","properties":
{"fs.s3.maxConnections":"100","fs.s3.maxRetries":"20"}}]
5.1 presto-connector-hive > hive.metastore
[{"classification":"presto-connector-hive","properties":
{"hive.metastore":"glue"}}]
6.1 hdfs-site > dfs.webhdfs.user.provider.user.pattern
[{"classification":"hdfs-site","properties":
{"dfs.webhdfs.user.provider.user.pattern":"^[A-Za-z0-9_][A-Za-z0-9._-]*[$]?$"}}]
%pyspark
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext, Row
from pyspark.sql import functions as F
import datetime, timeit
import boto3
# Assume an IAM role via STS and keep the temporary credentials for S3 access.
# DurationSeconds=43200 (12h) requires the role's maximum session duration to allow it.
client = boto3.client('sts')
response = client.assume_role(
    RoleArn='arn:aws:iam::00000000000000:role/IAM_ROLE_NAME',
    RoleSessionName='LDPS-Analysis',
    DurationSeconds=43200,
)
awsCredentials = response['Credentials']
spark = (
    SparkSession.builder
    .master("yarn")
    .appName("dbConnectTest")
    .config("spark.sql.execution.arrow.enabled", "true")  # Arrow-backed pandas conversion
    .config("spark.maximizeResourceAllocation", "true")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.network.timeout", "10000000")  # unitless values are parsed as seconds
    .config("spark.executor.heartbeatInterval", "30s")  # bare numbers are milliseconds, so give a unit
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
    .config("spark.speculation", "false")
    .config("spark.shuffle.service.enabled", "true")  # required by dynamic allocation
    .config("spark.sql.crossJoin.enabled", "true")
    .enableHiveSupport()
    .getOrCreate()
)
# Configure the Hadoop S3A connector with the temporary credentials from the assumed role.
hadoop_config = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_config.set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
hadoop_config.set('com.amazonaws.services.s3.enableV4', 'true')  # SigV4 request signing
hadoop_config.set('fs.s3a.endpoint', 's3.ap-northeast-2.amazonaws.com')
hadoop_config.set('fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider')
hadoop_config.set('fs.s3a.access.key', awsCredentials['AccessKeyId'])
hadoop_config.set('fs.s3a.secret.key', awsCredentials['SecretAccessKey'])
hadoop_config.set('fs.s3a.session.token', awsCredentials['SessionToken'])
sqlContext = SQLContext(spark.sparkContext)  # legacy entry point; spark itself usually suffices
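With the session and the s3a credentials in place, data access goes through the s3a scheme. A minimal usage sketch; the bucket, path, and column name below are placeholders:
df = spark.read.parquet('s3a://some-bucket/some/path/')
df.groupBy('some_column').count().show()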