AWS S3를 사용하기 위해 하나의 버킷을 접근가능한 IAM Access Key, Secret Key를
application.proerties
에 하드코딩으로 사용하고 있었습니다.
이번에 AWS FTR 인증을 받으면서 키값들을 Secret Manager를 통해 관리하고, 일정 주기 마다 Rotation을 진행해야 한다는 피드백을 받아 구성해본 내용을 정리해보려고 합니다.
/actuator/refresh
엔드포인트 호출여러대의 Application에게 IAM Access key 변경 이벤트를 전달할 broker를 세팅해볼 것입니다.
ec2에 kafka cluster 구성, k8s로 배포 하는 방법도 보긴 했지만, Cluster 구성 및
SASL/SCRAM
,TSL(SSL)
같은 보안 구성을 AWS 콘솔에서 좀 더 쉽게 구성해보기 위해 선택했습니다.
사실 한번 사용해보고 싶었...ㅎㅎㅎ
기본적인 생성은 링크로 대체
클러스터 생성 참고 링크
이 일감을 하면서
첫번째 난관
에 봉착했습니다.
그건 바로 클러스터를SASL/SCRAM
로 구성하고,public access
를 활성화를 하기 위해 여러번 클러스터를 만들고 지우고를 반복했다...
allow.everyone.if.no.acl.found=false
옵션 클러스터 구성에 추가true로 설정되면 슈퍼유저뿐만 아니라 모든 사용자가 리소스에 액세스
public access
를 활성화 할 수 있습니다.AmazonMSK
prefix가 필수적으로 들어갸야 합니다.{
'username' : 'msk_test_user',
'password' : 'msk_test_password'
}
topic 생성은 생략합니다.
kafka-acls.sh
를 이용해 유저마다 특정 Topic 권한을 부여 할 수 있습니다.
./kafka-acls.sh --authorizer-properties \
zookeeper.connect={사진에서_빨간_화살표_clipboard_복사_붙여넣기} \
--add --allow-principal "User:msk_test_user" \
--operation All --topic="s3.test.secret.manager" \
--group=*
s3.test.secret.manager
topic에 msk_test_user
라는 유저에게 모든 권한을 부여하는 명령어 입니다.이 부분은 제가 참고한 사이트에서
변경된 부분말 기술할 예정입니다.
Access key 주기적 변경계층 추가
AWS Lambda python3 에서 기본적으로 제공하는 library중에 kafka library를 제공하지 않아 추가해 줘야 합니다.
(Apple Silicon mac 기준입니다.)
계층 생성
mkdir python
cd python
pip3 install kafka-python --platform manylinux2014_x86_64 --target . --python-version 3.11
kafka_python
(다르게 하실분들은 다음 설정을 잘해주세요.)함수에 계층 추가
kafka_python
을 선택, 최신 버전 선택 후 추가--platform manylinux2014_x86_64
옵션을 제외하고 계속 시도를 했습니다.# import...
from datetime import datetime
from kafka import KafkaProducer
# lambda_handler...
def lambda_handler(event, context):
# ...
elif step == "finishSecret":
# finishSecret code...
finish_kafka_publish(step)
logging.info(">>> finish end")
#
def finish_kafka_publish(step):
# info
kafka_servers = [
## public msk endpoint
]
# create producer
producer = KafkaProducer(
api_version=(0, 10),
bootstrap_servers=kafka_servers,
security_protocol="SASL_SSL",
sasl_mechanism="SCRAM-SHA-512",
sasl_plain_username="msk_test_user",
sasl_plain_password="msk_test_password",
value_serializer=lambda x: dumps(x).encode('utf-8'))
message = { 'eventDateTime' : time.time(), 'step': step }
logger.info(">>> message %s", message)
# publish event
topic_name='s3.test.secret.manager'
producer.send(topic_name, value=message)
producer.flush()
logger.info(">>> end of finish_kafka_publish")
Access Key 교체 lambda 함수 마지막 스텝에
finish_kafka_publish
부분이 추가되었습니다.
키 교체 불가능
https://secretmanager.awsurl.com
이렇게 public 대역대로 호출하는 부분이 내장되어 있어 public endpoint로 구성하게 되었습니다. (이걸로 하루 소비..)<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-starter-aws-secrets-manager-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
@Slf4j
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value("${secret-manager.kafka.bootstrap-servers:}")
private String bootstrapServers;
@Value("${secret-manager.kafka.username:}")
private String username;
@Value("${secret-manager.kafka.password:}")
private String password;
@Bean
public ConsumerFactory<String, SecretManagerDto> secretManagerConsumerFactory() {
final String HID = CommonUtils.getHID();
// Print config
log.info("Kafka Config[bootstrapServers: {}]", bootstrapServers);
log.info("Kafka Config[username: {}]", username);
log.info("Kafka Config[consumer groupId: {}]", HID);
// Setting config
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, HID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" + username + "\" password=\"" + password + "\";");
JsonDeserializer<SecretManagerDto> deserializer = new JsonDeserializer<>(SecretManagerDto.class, false);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), deserializer);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, SecretManagerDto> secretManagerKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, SecretManagerDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(secretManagerConsumerFactory());
return factory;
}
public enum SecretManagerRotationStep {
CREATE_SECRET("createSecret")
, SET_SECRET("setSecret")
, TEST_SECRET("testSecret")
, FINISH_SECRET("finishSecret")
;
private final String value;
SecretManagerRotationStep(String value) { this.value = value; }
@JsonValue
public String value() {
return value;
}
}
@Data
public class SecretManagerDto {
private String eventDateTime;
private SecretManagerRotationStep step;
}
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaConsumerListener {
private final RestService restService;
@KafkaListener(topics = "${secret-manager.kafka.topic}", containerFactory = "secretManagerKafkaListenerContainerFactory")
public void receiveSecretManager(ConsumerRecord<String, SecretManagerDto> record) {
final SecretManagerDto value = record.value();
log.info(">>> receiveSecretManager step: {}, datetime: {}", value.getStep(), value.getEventDateTime());
switch (value.getStep()) {
case CREATE_SECRET: {
log.debug("CREATE_SECRET");
break;
}
case SET_SECRET: {
log.debug("SET_SECRET");
break;
}
case TEST_SECRET: {
log.debug("TEST_SECRET");
break;
}
case FINISH_SECRET: {
log.debug("FINISH_SECRET");
restService.rotationAccessKey();
break;
}
default: {
log.debug("DEFAULT");
}
}
}
}
@Slf4j
@Service
public class RestService {
@Value("${server.port}")
private String serverPort;
@Value("${secret-manager.rotation.ray.token}")
private String rotationToken;
public void rotationAccessKey() {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.add(JwtTokenProvider.TOKEN_HEADER_NAME, rotationToken);
URI uri = UriComponentsBuilder.fromHttpUrl("http://localhost:" + serverPort).path("/actuator/refresh").build().encode().toUri();
ResponseEntity<Object> response = RestUtils.exchange(uri, HttpMethod.POST, new HttpEntity<>(headers), Object.class);
if(response.getStatusCode() != HttpStatus.OK) {
throw new CertException(response.getStatusCode().value(), response.getStatusCode().getReasonPhrase());
}
}
}
public class AmazonConfig {
@RefreshScope
@Bean(destroyMethod = "close", initMethod = "getClient")
public S3Config s3Config() {
//...
}
}
s3Config Bean을 Refresh 하여 Application을 종료하지 않아도 변경된 Access key로 s3 client를 다시 만들도록 구성
s3 client를 사용하는 중에 refresh가 되면 s3 client destroy 함수에서 s3 client를 close 시켜 버려
http socket closed가 되버립니다. 이 때, s3에 업로드, 조회 시 400 에러가 발생하게 됩니다.
물론 0.1초에서 1초 사이 짧은 시간에 다시 빈을 생성해서 이 후 문제는 없습니다.
Spring Config의 kafka-bus를 사용할까? Topic을 하나 만들어서 할까 하는 고민을 했습니다.
저는 kafka-bus를 사용하면 application에/actuator/refresh-bus
endpoint만 불려도 나머지 application들도 같이 refresh가 된다는 것이 좋았지만, Application이 하나의 application topic을 consuming하게 만들어 이밴트 발행시 자신이 직접 자신의 설정값을 변경하도록 하고 싶었습니다.
해당 s3 iam을 사용하는 application이 추가될 때마다 lambda에서 http request 하는 코드가 추가될 것이라고 생각했기 때문에 자신이 알아서 Consuming하고 refresh하도록 하고싶었습니다.
부족한 부분, 궁금하신 점, 이렇게 구성했으면 어땠을까?, 문제점의 해결방안 등의 부분에 의견이 있다면 언제든지 피드백 주세요. 감사합니다.!