[AWS] IAM Secret Key 교체 with MSK

허주환·2024년 2월 26일
0

AWS

목록 보기
1/1

AWS S3를 사용하기 위해 하나의 버킷을 접근가능한 IAM Access Key, Secret Key를
application.proerties에 하드코딩으로 사용하고 있었습니다.
이번에 AWS FTR 인증을 받으면서 키값들을 Secret Manager를 통해 관리하고, 일정 주기 마다 Rotation을 진행해야 한다는 피드백을 받아 구성해본 내용을 정리해보려고 합니다.

0.Tech

  • AWS
    • IAM
    • S3
    • Secret Manager
    • Lambda (Python 3)
    • MSK
  • Application
    • Spring boot for Java 1.8

1. Architecture

Architecture

  1. Secret Manager 에서 Key Change Event 발생
  2. Lambda 교체함수 실행 > finish step일 때 Kafka Topic에 publish
  3. 해당 Topic을 Consuming 하는 Application에서 자신의 /actuator/refresh 엔드포인트 호출
  4. 변경된 Access key 적용

2. MSK 구성

여러대의 Application에게 IAM Access key 변경 이벤트를 전달할 broker를 세팅해볼 것입니다.

MSK를 선택한 이유

ec2에 kafka cluster 구성, k8s로 배포 하는 방법도 보긴 했지만, Cluster 구성 및 SASL/SCRAM, TSL(SSL) 같은 보안 구성을 AWS 콘솔에서 좀 더 쉽게 구성해보기 위해 선택했습니다.
사실 한번 사용해보고 싶었...ㅎㅎㅎ

클러스터 생성

기본적인 생성은 링크로 대체
클러스터 생성 참고 링크

클러스터 구성 주의사항

이 일감을 하면서 첫번째 난관에 봉착했습니다.
그건 바로 클러스터를 SASL/SCRAM로 구성하고, public access를 활성화를 하기 위해 여러번 클러스터를 만들고 지우고를 반복했다...

  • allow.everyone.if.no.acl.found=false 옵션 클러스터 구성에 추가
    • 특정 리소스와 일치하는 리소스 패턴이 없으면 리소스에 연결된 ACL이 없음, true로 설정되면 슈퍼유저뿐만 아니라 모든 사용자가 리소스에 액세스
    • 유저 인증 기반에서는 이 옵션을 넣어 인증된 유저만 접근 가능하도록 해야만 public access를 활성화 할 수 있습니다.
      allow.everyone.if.no.acl.found-option

유저 추가

  • Secret Manager를 통해 유저를 추가할 수 있습니다.
    • 보안암호이름에 AmazonMSK prefix가 필수적으로 들어갸야 합니다.
      MSK_Secret_manager_ex
    • 보안 암호 값은 json 형식으로 표현하면 아래와 같습니다.
      {
        'username' : 'msk_test_user',
        'password' : 'msk_test_password'
      }
    • MSK Cluster에 여러 계정을 추가할 경우 보안 암호를 여러개 연결 시키면 됩니다.
      MSK_Add_User

MSK User Docs

유저 Topic 권한 부여

topic 생성은 생략합니다.
kafka-acls.sh 를 이용해 유저마다 특정 Topic 권한을 부여 할 수 있습니다.

./kafka-acls.sh --authorizer-properties \
zookeeper.connect={사진에서_빨간_화살표_clipboard_복사_붙여넣기} \
--add --allow-principal "User:msk_test_user" \
--operation All --topic="s3.test.secret.manager" \
--group=*

MSK_Zookeeper_connect

  • s3.test.secret.manager topic에 msk_test_user라는 유저에게 모든 권한을 부여하는 명령어 입니다.

3. Access key 주기적 교체

이 부분은 제가 참고한 사이트에서
변경된 부분말 기술할 예정입니다.
Access key 주기적 변경

계층 추가

AWS Lambda python3 에서 기본적으로 제공하는 library중에 kafka library를 제공하지 않아 추가해 줘야 합니다.
(Apple Silicon mac 기준입니다.)

  • 계층 생성

    mkdir python
    cd python
    pip3 install kafka-python --platform manylinux2014_x86_64 --target . --python-version 3.11
    • 계층 이름을 kafka_python (다르게 하실분들은 다음 설정을 잘해주세요.)
    • python 파일 압축 후 계층 생성에서 업로드 진행
      Lambda_layer_add
  • 함수에 계층 추가

    • kafka_python을 선택, 최신 버전 선택 후 추가
      Lambda_layer_apply
  • 계측 추가에서의 난관
    • Aws Lambda가 계층 생성시 x86_64기반을 선택한 것을 망각하고 현재 작업중인 노트북(Apple Silicon / M1)에서
      --platform manylinux2014_x86_64 옵션을 제외하고 계속 시도를 했습니다.
    • 저와 같은 Apple Silicon을 사용하고 계신분은 저 옵션을 필수적으로 넣어주시거나 호환 아키텍처를 잘 선택해 주세요.!!!

교체 Lambda 함수

# import... 
from datetime import datetime
from kafka import KafkaProducer

# lambda_handler...
def lambda_handler(event, context):
# ...
	elif step == "finishSecret":
    # finishSecret code...
    
    finish_kafka_publish(step)
    logging.info(">>> finish end")

#
def finish_kafka_publish(step):
    # info
    kafka_servers = [
        ## public msk endpoint
    ]
    
    # create producer
    producer = KafkaProducer(
        api_version=(0, 10),
        bootstrap_servers=kafka_servers,
        security_protocol="SASL_SSL",
        sasl_mechanism="SCRAM-SHA-512",
        sasl_plain_username="msk_test_user",
        sasl_plain_password="msk_test_password",
        value_serializer=lambda x: dumps(x).encode('utf-8'))
    
    message = { 'eventDateTime' : time.time(), 'step': step }
    logger.info(">>> message %s", message)
    
    # publish event
    topic_name='s3.test.secret.manager' 
    producer.send(topic_name, value=message)
    producer.flush()
    logger.info(">>> end of finish_kafka_publish")

Access Key 교체 lambda 함수 마지막 스텝에
finish_kafka_publish 부분이 추가되었습니다.

  • Lambda 함수에서의 난관
    • Lambda 함수를 private subnet으로 vpc 구성을 하고 싶어 kafka_servers도 private msk endpoint로 구성하고 키 교체 테스트를 진행
    • 이렇게 하면 키 교체 불가능
    • Lambda 키 교체 함수 중에 Secret Manager를 https://secretmanager.awsurl.com 이렇게 public 대역대로 호출하는 부분이 내장되어 있어 public endpoint로 구성하게 되었습니다. (이걸로 하루 소비..)

4. Spring boot 구성

pom.xml

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
  <groupId>io.awspring.cloud</groupId>
  <artifactId>spring-cloud-starter-aws-secrets-manager-config</artifactId>
</dependency>

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
  • secret manager의 값을 가져오기 위해
    • spring-cloud-starter-aws-secrets-manager-config
  • Event Comsuming을 위해
    • org.springframework.kafka
  • Event Consuming 이후 변경된 access key를 Application에서 refresh 하도록
    • spring-boot-starter-actuator

KafkaConsumerConfig.java

@Slf4j
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${secret-manager.kafka.bootstrap-servers:}")
    private String bootstrapServers;
    @Value("${secret-manager.kafka.username:}")
    private String username;
    @Value("${secret-manager.kafka.password:}")
    private String password;

    @Bean
    public ConsumerFactory<String, SecretManagerDto> secretManagerConsumerFactory() {
        final String HID = CommonUtils.getHID();

        // Print config
        log.info("Kafka Config[bootstrapServers: {}]", bootstrapServers);
        log.info("Kafka Config[username: {}]", username);
        log.info("Kafka Config[consumer groupId: {}]", HID);

        // Setting config
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, HID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
        props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" + username + "\" password=\"" + password + "\";");

        JsonDeserializer<SecretManagerDto> deserializer = new JsonDeserializer<>(SecretManagerDto.class, false);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), deserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, SecretManagerDto> secretManagerKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, SecretManagerDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(secretManagerConsumerFactory());
        return factory;
    }

Event Enum

public enum SecretManagerRotationStep {
    CREATE_SECRET("createSecret")
    , SET_SECRET("setSecret")
    , TEST_SECRET("testSecret")
    , FINISH_SECRET("finishSecret")
    ;

    private final String value;
    SecretManagerRotationStep(String value) { this.value = value; }

    @JsonValue
    public String value() {
        return value;
    }
}

DTO

@Data
public class SecretManagerDto {
    private String eventDateTime;
    private SecretManagerRotationStep step;
}

KafkaConsumerListener.java


@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaConsumerListener {

    private final RestService restService;

    @KafkaListener(topics = "${secret-manager.kafka.topic}", containerFactory = "secretManagerKafkaListenerContainerFactory")
    public void receiveSecretManager(ConsumerRecord<String, SecretManagerDto> record) {
        final SecretManagerDto value = record.value();

        log.info(">>> receiveSecretManager step: {}, datetime: {}", value.getStep(), value.getEventDateTime());
        switch (value.getStep()) {
            case CREATE_SECRET: {
                log.debug("CREATE_SECRET");
                break;
            }
            case SET_SECRET: {
                log.debug("SET_SECRET");
                break;
            }
            case TEST_SECRET: {
                log.debug("TEST_SECRET");
                break;
            }
            case FINISH_SECRET: {
                log.debug("FINISH_SECRET");
                restService.rotationAccessKey();
                break;
            }
            default: {
                log.debug("DEFAULT");
            }
        }
    }
}

RestService.java


@Slf4j
@Service
public class RestService {
    @Value("${server.port}")
    private String serverPort;

    @Value("${secret-manager.rotation.ray.token}")
    private String rotationToken;

    public void rotationAccessKey() {
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        headers.add(JwtTokenProvider.TOKEN_HEADER_NAME, rotationToken);

        URI uri = UriComponentsBuilder.fromHttpUrl("http://localhost:" + serverPort).path("/actuator/refresh").build().encode().toUri();

        ResponseEntity<Object> response = RestUtils.exchange(uri, HttpMethod.POST, new HttpEntity<>(headers), Object.class);

        if(response.getStatusCode() != HttpStatus.OK) {
            throw new CertException(response.getStatusCode().value(), response.getStatusCode().getReasonPhrase());
        }
    }
}

AmazonConfig.java

public class AmazonConfig {
	@RefreshScope
    @Bean(destroyMethod = "close", initMethod = "getClient")
    public S3Config s3Config() {
    	//...
    }
}

s3Config Bean을 Refresh 하여 Application을 종료하지 않아도 변경된 Access key로 s3 client를 다시 만들도록 구성

5. 문제점

s3 client를 사용하는 중에 refresh가 되면 s3 client destroy 함수에서 s3 client를 close 시켜 버려
http socket closed가 되버립니다. 이 때, s3에 업로드, 조회 시 400 에러가 발생하게 됩니다.
물론 0.1초에서 1초 사이 짧은 시간에 다시 빈을 생성해서 이 후 문제는 없습니다.

6. 마무리

Spring Config의 kafka-bus를 사용할까? Topic을 하나 만들어서 할까 하는 고민을 했습니다.
저는 kafka-bus를 사용하면 application에 /actuator/refresh-bus endpoint만 불려도 나머지 application들도 같이 refresh가 된다는 것이 좋았지만, Application이 하나의 application topic을 consuming하게 만들어 이밴트 발행시 자신이 직접 자신의 설정값을 변경하도록 하고 싶었습니다.
해당 s3 iam을 사용하는 application이 추가될 때마다 lambda에서 http request 하는 코드가 추가될 것이라고 생각했기 때문에 자신이 알아서 Consuming하고 refresh하도록 하고싶었습니다.
부족한 부분, 궁금하신 점, 이렇게 구성했으면 어땠을까?, 문제점의 해결방안 등의 부분에 의견이 있다면 언제든지 피드백 주세요. 감사합니다.!

profile
Junior BE Developer

0개의 댓글