Spring Batch에서 AWS Athena Query & Secrets Manager

Chans·2024년 4월 1일
0

개요

Spring Batch에서 메타 데이터를 쌓을 일이 생겼는데 대상 서비스가 Athena였다.
Spring Batch가 실행되는 인스턴스에서 다른 AWS 계정의 Athena에 접근할 수 있어야 했기에 credentials가 필요했고, 해당 값은 Secrets Manager에 암호화해 저장한 값을 불러와 사용하기로 협의했다.

비즈니스 프로세스는 다음과 같다.

  1. Athena Select
  2. Albamon DB
  3. Athena Insert

Secrets Manager 생성

Secrets Manager에서 새 보안 암호 생성은 간단했다.

보안 암호 유형은 다른 유형의 보안 암호를 선택 후
다른 계정의 credentials 값들을 넣어준다.

생성하게 되면 암호 이름과 arn이 나오게 되는데
소스에서 이름이나 arn 아무거나 사용해도 무관하다.

Dependency 정의

// aws secrets manager config
implementation 'io.awspring.cloud:spring-cloud-starter-aws-secrets-manager-config:2.4.2'

// aws athena
implementation 'software.amazon.awssdk:athena:2.25.4'

application.yml 정의

# athena 테이블 정의
# Athena query settings.
# NOTE(review): these look like the values AthenaCommon reads through ConfigProperties
# (getDatabase / getResultsBucket / getRetrySleep); the binding class is not shown — confirm.
athena:
  database: example_dmp_glue
  results-bucket: s3://example/convertdata/am/daily-meta
  limit: 2000
  client-execution-timeout: 60000
  # presumably milliseconds: AthenaCommon passes getRetrySleep() to Thread.sleep — confirm the binding
  retry-sleep: 1000

# TA에서 환경별 키명으로 값을 주어서 이렇게 정의
# Names of the JSON keys that AthenaClientFactoryImpl looks up inside the secret;
# both are derived from the active Spring profile.
# NOTE(review): AthenaClientFactoryImpl also injects aws.secrets-manager.name, which is
# not defined in this snippet — confirm it is configured elsewhere.
aws:
  secrets-manager:
    user-key: ${spring.profiles.active}-key
    user-value: ${spring.profiles.active}-value

AthenaClientFactory Interface 정의

/**
 * Factory abstraction for obtaining an {@link AthenaClient}.
 */
public interface AthenaClientFactory {
    /**
     * Supplies an Athena client.
     *
     * @return the Athena client
     */
    AthenaClient createClient();
}

AthenaClientFactoryImpl 구현

public class AthenaClientFactoryImpl implements AthenaClientFactory {

	@Value("${aws.secrets-manager.name}")
	private String secretName;

	// secret manager 사용
	@Value("${aws.secrets-manager.user-key}")
	private String secretsUserKey;

    // secret manager 사용
	@Value("${aws.secrets-manager.user-value}")
	private String secretsUserValue;

    @Bean
	public AthenaClient createClient() {
		SecretsEntity secrets = getSecretsEntity();
		AwsBasicCredentials awsCredentials = AwsBasicCredentials.create(secrets.getAccessKey(), secrets.getSecretKey());
		return AthenaClient.builder()
				.region(Region.AP_NORTHEAST_2)
				.credentialsProvider(StaticCredentialsProvider.create(awsCredentials))
				.build();
	}
	
    @Bean
	private AWSSecretsManager createSecretsManagerClient() {
		return AWSSecretsManagerClientBuilder
				.standard()
				.withRegion(Region.AP_NORTHEAST_2.id())
				.build();
	}
    
    private SecretsEntity getSecretsEntity() {
		GetSecretValueRequest getSecretValueRequest = new GetSecretValueRequest()
				.withSecretId(secretName);

		GetSecretValueResult getSecretValueResult;
		SecretsEntity secrets = new SecretsEntity();
		try {
			AWSSecretsManager client = createSecretsManagerClient();
			getSecretValueResult = client.getSecretValue(getSecretValueRequest);
		} catch (Exception e) {
			log.error("getSecrets : {}", e.getMessage());
			throw e;
		}

		if (getSecretValueResult.getSecretString() != null) {
			String secretValues = getSecretValueResult.getSecretString();
			Map<String, String> secretsMap;
			try {
				secretsMap = jsonToMap(secretValues);
				if (ObjectUtils.isNotEmpty(secretsMap)) {
					secrets.setAccessKey(secretsMap.get(secretsUserKey));
					secrets.setSecretKey(secretsMap.get(secretsUserValue));
				}
			} catch (JsonProcessingException e) {
				log.error("getSecrets JsonProcessingException : {}", e.getMessage());
				throw new RuntimeException(e);
			}
		}

		return secrets;
	}

    // secret manager의 키값이 고정이라면 이런식의 변환은 없어도 됨ㅠ
	public Map<String, String> jsonToMap(String json) throws JsonProcessingException {
		ObjectMapper objectMapper = new ObjectMapper();
		TypeReference<Map<String, String>> typeReference = new TypeReference<>() {};
		return objectMapper.readValue(json, typeReference);
	}

AthenaCommon 구현

@Slf4j
@Component
@RequiredArgsConstructor
public class AthenaCommon {

	private final ConfigProperties configProperties;

	public String submitAthenaQuery(AthenaClient athenaClient, String query) {
		try {
			// The QueryExecutionContext allows us to set the database
			QueryExecutionContext queryExecutionContext = QueryExecutionContext.builder()
					.database(configProperties.getDatabase())
					.build();

			// The result configuration specifies where the results of the query should go
			ResultConfiguration resultConfiguration = ResultConfiguration.builder()
					.outputLocation(configProperties.getResultsBucket())
					.build();

			StartQueryExecutionRequest startQueryExecutionRequest = StartQueryExecutionRequest.builder()
					.queryString(query)
					.queryExecutionContext(queryExecutionContext)
					.resultConfiguration(resultConfiguration)
					.build();

			StartQueryExecutionResponse startQueryExecutionResponse = athenaClient.startQueryExecution(startQueryExecutionRequest);
			return startQueryExecutionResponse.queryExecutionId();

		} catch (AthenaException e) {
			log.error("AthenaException : {}", e.getMessage());
		}

		return "";
	}

	// Wait for an Amazon Athena query to complete, fail or to be cancelled
	public void waitForQueryToComplete(AthenaClient athenaClient, String queryExecutionId) {
		GetQueryExecutionRequest getQueryExecutionRequest = GetQueryExecutionRequest.builder()
				.queryExecutionId(queryExecutionId).build();

		GetQueryExecutionResponse getQueryExecutionResponse;
		boolean isQueryStillRunning = true;
		while (isQueryStillRunning) {
			getQueryExecutionResponse = athenaClient.getQueryExecution(getQueryExecutionRequest);
			String queryState = getQueryExecutionResponse.queryExecution().status().state().toString();
			if (queryState.equals(QueryExecutionState.FAILED.toString())) {
				throw new RuntimeException("The Amazon Athena query failed to run with error message: " + getQueryExecutionResponse
						.queryExecution().status().stateChangeReason());
			} else if (queryState.equals(QueryExecutionState.CANCELLED.toString())) {
				throw new RuntimeException("The Amazon Athena query was cancelled.");
			} else if (queryState.equals(QueryExecutionState.SUCCEEDED.toString())) {
				isQueryStillRunning = false;
			} else {
				// Sleep an amount of time before retrying again
				try {
					Thread.sleep(configProperties.getRetrySleep());
				} catch (InterruptedException e) {
					throw new RuntimeException(e);
				}
			}
			log.debug("The current status is: " + queryState);
		}
	}
}

쿼리 사용

/**
 * Runs {@code query} on Athena, waits for it to finish and hands the execution id to
 * {@code processResultRows}.
 *
 * <p>The client is created per call and closed when the method exits, including when the
 * wait or the result processing throws. Note that {@code submitAthenaQuery} returns an empty
 * id when submission fails; the wait call is then issued with that empty id.
 *
 * @param query SQL text to run
 * @return the rows produced by {@code processResultRows}
 */
private List<SummaryEntity> startSelectQuery(String query) {
	log.debug("Query: {}", query);

	// try-with-resources: the original close() call was skipped whenever a step above it threw.
	try (AthenaClient athenaClient = athenaClientFactory.createClient()) {
		String queryExecutionId = athenaCommon.submitAthenaQuery(athenaClient, query);
		athenaCommon.waitForQueryToComplete(athenaClient, queryExecutionId);
		return processResultRows(athenaClient, queryExecutionId);
	}
}

0개의 댓글

Powered by GraphCDN, the GraphQL CDN