Spring Batch에서 메타 데이터를 쌓을 일이 생겼는데 대상 서비스가 Athena였다.
Spring Batch가 실행되는 인스턴스에서 다른 AWS 계정의 Athena에 접근해야 했기에 credentials가 필요했고, 해당 값은 Secrets Manager에 암호화해 저장해 둔 값을 불러와 사용하기로 협의했다.
비즈니스 프로세스는 다음과 같다.
Secrets Manager에서 새 보안 암호 생성은 간단했다.
보안 암호 유형은 다른 유형의 보안 암호를 선택 후
다른 계정의 credentials 값들을 넣어준다.
생성하게 되면 암호 이름과 arn이 나오게 되는데
소스에서 이름이나 arn 아무거나 사용해도 무관하다.
Dependency 정의
// aws secrets manager config
implementation 'io.awspring.cloud:spring-cloud-starter-aws-secrets-manager-config:2.4.2'
// aws athena
implementation 'software.amazon.awssdk:athena:2.25.4'
application.yml 정의
athena 테이블 정의
athena:
  database: example_dmp_glue
  results-bucket: s3://example/convertdata/am/daily-meta
  limit: 2000
  client-execution-timeout: 60000
  retry-sleep: 1000
# TA에서 환경별 키명으로 값을 주어서 이렇게 정의
aws:
  secrets-manager:
    user-key: ${spring.profiles.active}-key
    user-value: ${spring.profiles.active}-value
AthenaClientFactory Interface 정의
/**
 * Factory abstraction for obtaining an {@code AthenaClient}.
 */
public interface AthenaClientFactory {
/**
 * Creates an Athena client.
 *
 * @return the {@code AthenaClient} produced by the implementation
 */
AthenaClient createClient();
}
AthenaClientFactoryImpl 구현
/**
 * {@link AthenaClientFactory} implementation that builds the Athena client from credentials
 * read out of AWS Secrets Manager.
 */
public class AthenaClientFactoryImpl implements AthenaClientFactory {
// Identifier of the secret to fetch, injected from the aws.secrets-manager.name property.
// NOTE(review): the application.yml sample in this document does not define that property;
// confirm it is configured elsewhere.
@Value("${aws.secrets-manager.name}")
private String secretName;
// Name of the field inside the secret's JSON payload that holds the access key.
@Value("${aws.secrets-manager.user-key}")
private String secretsUserKey;
// Name of the field inside the secret's JSON payload that holds the secret key.
@Value("${aws.secrets-manager.user-value}")
private String secretsUserValue;
/**
 * Builds an Athena client for the ap-northeast-2 region, authenticated with the static
 * access key / secret key pair loaded through {@code getSecretsEntity()}.
 *
 * @return a newly built {@code AthenaClient}
 */
@Bean
public AthenaClient createClient() {
    final SecretsEntity loaded = getSecretsEntity();
    final StaticCredentialsProvider provider = StaticCredentialsProvider.create(
            AwsBasicCredentials.create(loaded.getAccessKey(), loaded.getSecretKey()));

    return AthenaClient.builder()
            .region(Region.AP_NORTHEAST_2)
            .credentialsProvider(provider)
            .build();
}
/**
 * Builds an AWS Secrets Manager client bound to the ap-northeast-2 region.
 *
 * NOTE(review): this method is both private and annotated with @Bean, and it is also invoked
 * directly by getSecretsEntity(); confirm the annotation is intended.
 *
 * @return the Secrets Manager client produced by the standard builder
 */
@Bean
private AWSSecretsManager createSecretsManagerClient() {
    final String regionId = Region.AP_NORTHEAST_2.id();
    AWSSecretsManagerClientBuilder builder =
            AWSSecretsManagerClientBuilder.standard().withRegion(regionId);
    return builder.build();
}
/**
 * Fetches the secret identified by {@code secretName} from AWS Secrets Manager and extracts
 * the access key and secret key stored under the configured JSON field names
 * ({@code secretsUserKey} / {@code secretsUserValue}).
 *
 * <p>Fails fast when the credentials cannot be resolved, instead of handing back an entity
 * with null fields that would only blow up later when the Athena credentials are built.
 *
 * @return a {@code SecretsEntity} with both access key and secret key populated
 * @throws IllegalStateException if the secret has no string payload, or if either configured
 *         field is missing or blank in the payload
 * @throws RuntimeException if the payload is not valid JSON (wraps the Jackson exception);
 *         failures of the Secrets Manager call itself are logged and rethrown unchanged
 */
private SecretsEntity getSecretsEntity() {
    GetSecretValueRequest request = new GetSecretValueRequest().withSecretId(secretName);

    GetSecretValueResult result;
    try {
        // NOTE(review): the client obtained here is never shut down in this method;
        // confirm who owns its lifecycle.
        result = createSecretsManagerClient().getSecretValue(request);
    } catch (Exception e) {
        // Pass the exception itself so the stack trace is not lost.
        log.error("getSecrets : {}", e.getMessage(), e);
        throw e;
    }

    String secretString = result.getSecretString();
    if (secretString == null) {
        throw new IllegalStateException(
                "Secret '" + secretName + "' has no SecretString payload");
    }

    Map<String, String> secretsMap;
    try {
        secretsMap = jsonToMap(secretString);
    } catch (JsonProcessingException e) {
        // The location part of a Jackson message can embed the parsed source, which here
        // is the secret itself. Strip it before the exception reaches logs or stack traces.
        e.clearLocation();
        log.error("getSecrets JsonProcessingException : {}", e.getOriginalMessage());
        throw new RuntimeException(e);
    }

    String accessKey = secretsMap == null ? null : secretsMap.get(secretsUserKey);
    String secretKey = secretsMap == null ? null : secretsMap.get(secretsUserValue);
    if (accessKey == null || accessKey.trim().isEmpty()
            || secretKey == null || secretKey.trim().isEmpty()) {
        // Only the configured field names are reported, never the secret values.
        throw new IllegalStateException(
                "Secret '" + secretName + "' does not contain non-empty fields '"
                        + secretsUserKey + "' and '" + secretsUserValue + "'");
    }

    SecretsEntity secrets = new SecretsEntity();
    secrets.setAccessKey(accessKey);
    secrets.setSecretKey(secretKey);
    return secrets;
}
/**
 * Deserializes a JSON object into a {@code Map<String, String>}.
 *
 * <p>This conversion would be unnecessary if the key names inside the Secrets Manager
 * secret were fixed.
 *
 * @param json the JSON text to parse
 * @return the parsed key/value pairs
 * @throws JsonProcessingException if the text cannot be parsed into the target type
 */
public Map<String, String> jsonToMap(String json) throws JsonProcessingException {
    return new ObjectMapper().readValue(json, new TypeReference<Map<String, String>>() {});
}
AthenaCommon 구현
@Slf4j
@Component
@RequiredArgsConstructor
public class AthenaCommon {
private final ConfigProperties configProperties;
public String submitAthenaQuery(AthenaClient athenaClient, String query) {
try {
// The QueryExecutionContext allows us to set the database
QueryExecutionContext queryExecutionContext = QueryExecutionContext.builder()
.database(configProperties.getDatabase())
.build();
// The result configuration specifies where the results of the query should go
ResultConfiguration resultConfiguration = ResultConfiguration.builder()
.outputLocation(configProperties.getResultsBucket())
.build();
StartQueryExecutionRequest startQueryExecutionRequest = StartQueryExecutionRequest.builder()
.queryString(query)
.queryExecutionContext(queryExecutionContext)
.resultConfiguration(resultConfiguration)
.build();
StartQueryExecutionResponse startQueryExecutionResponse = athenaClient.startQueryExecution(startQueryExecutionRequest);
return startQueryExecutionResponse.queryExecutionId();
} catch (AthenaException e) {
log.error("AthenaException : {}", e.getMessage());
}
return "";
}
// Wait for an Amazon Athena query to complete, fail or to be cancelled
public void waitForQueryToComplete(AthenaClient athenaClient, String queryExecutionId) {
GetQueryExecutionRequest getQueryExecutionRequest = GetQueryExecutionRequest.builder()
.queryExecutionId(queryExecutionId).build();
GetQueryExecutionResponse getQueryExecutionResponse;
boolean isQueryStillRunning = true;
while (isQueryStillRunning) {
getQueryExecutionResponse = athenaClient.getQueryExecution(getQueryExecutionRequest);
String queryState = getQueryExecutionResponse.queryExecution().status().state().toString();
if (queryState.equals(QueryExecutionState.FAILED.toString())) {
throw new RuntimeException("The Amazon Athena query failed to run with error message: " + getQueryExecutionResponse
.queryExecution().status().stateChangeReason());
} else if (queryState.equals(QueryExecutionState.CANCELLED.toString())) {
throw new RuntimeException("The Amazon Athena query was cancelled.");
} else if (queryState.equals(QueryExecutionState.SUCCEEDED.toString())) {
isQueryStillRunning = false;
} else {
// Sleep an amount of time before retrying again
try {
Thread.sleep(configProperties.getRetrySleep());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
log.debug("The current status is: " + queryState);
}
}
}
쿼리 사용
/**
 * Runs the given query on Athena and returns the rows produced by
 * {@code processResultRows}.
 *
 * <p>A client is obtained from {@code athenaClientFactory} for this call and is always closed
 * when the method exits, including when submitting, waiting or result processing throws.
 *
 * @param query query string to execute
 * @return the summaries built from the query result
 */
private List<SummaryEntity> startSelectQuery(String query) {
    log.debug("Query: {}", query);

    // try-with-resources closes the client on every exit path; the wait step throws
    // RuntimeException for failed or cancelled queries, which previously skipped close().
    try (AthenaClient athenaClient = athenaClientFactory.createClient()) {
        String queryExecutionId = athenaCommon.submitAthenaQuery(athenaClient, query);
        athenaCommon.waitForQueryToComplete(athenaClient, queryExecutionId);
        return processResultRows(athenaClient, queryExecutionId);
    }
}