LocalStack을 활용하여 AWS SQS 사용해보기

김민규·2023년 2월 20일
1

java

목록 보기
4/7
post-thumbnail

전체 코드는 https://github.com/goldcrestwilma/localstack-sqs에서 확인할 수 있다.

LocalStack 이란?

LocalStack클라우드 애플리케이션 개발을 위한 사용하기 쉬운 테스트/모킹 프레임워크다.
실제 AWS 환경에서 제공하는 동일한 기능을 사용할 수 있다.

🛠 Configuration

우선 도커에서 SQS를 사용하기 위해 docker-compose.yml 파일에서 aws 환경을 설정한다.

docker-compose.yml

version: '3.8'
services:
  aws:
    image: 'localstack/localstack'
    container_name: 'localstack'
    environment:
      - SERVICES=sqs
      - DEFAULT_REGION=us-east-1
      - DEBUG=1
      - DATA_DIR=/tmp/localstack/data
      - AWS_ACCESS_KEY_ID=qwer
      - AWS_SECRET_ACCESS_KEY=1234
      - AWS_DEFAULT_REGION=us-east-1
    ports:
      - '4566:4566'

AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY는 임의로 지정해도 된다.
도커 실행 후 LocalStack 컨테이너는 만든다.

./gradlew composeup

LocalStack 컨테이너로 직접 접속한다.

docker exec -it localstack sh

샘플 큐를 생성한다.

aws --endpoint-url=http://localhost:4566 sqs create-queue --queue-name sample-queue.fifo --attributes FifoQueue=true

📬 Producer

메시지를 발행하는 Producer 코드를 만든다.

application.yml

cloud:
  aws:
    stack:
      auto: false #enables the automatic stack name detection for the application.
    region:
      static: us-east-1
    credentials:
      access-key: qwer
      secret-key: 1234
    end-point:
      uri: http://localhost:4566

앞서 docker-compose.yml에서 설정했던 access-key, secret-key 넣어준다.
endpoint는 로컬에서 실행하기 때문에 localhost이고 SQS 서비스 포트(4566)를 설정해준다.

SQSConfig.java

@Configuration
public class SQSConfig {

    @Value("${cloud.aws.region.static}")
    private String region;

    @Value("${cloud.aws.credentials.access-key}")
    private String accessKeyId;

    @Value("${cloud.aws.credentials.secret-key}")
    private String secretAccessKey;

    @Value("${cloud.aws.end-point.uri}")
    private String sqsUrl;

    @Primary
    @Bean
    public AmazonSQSAsync amazonSQSAsync() {
        return AmazonSQSAsyncClientBuilder.standard()
            .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(sqsUrl, region))
            .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKeyId, secretAccessKey)))
            .build();
    }

    @Bean
    public QueueMessagingTemplate queueMessagingTemplate() {
        return new QueueMessagingTemplate(amazonSQSAsync());
    }

    @Bean
    public ObjectMapper objectMapper() {
        return new ObjectMapper();
    }
}

AWS 접속 EndPoint와 권한(Credential) 설정을 한다.

SQSEventPublisher.java

@Slf4j
@Service
@RequiredArgsConstructor
public class SQSEventPublisher {

    private final AmazonSQS amazonSQS;
    private final ObjectMapper objectMapper;

    public void publishEvent(JsonNode message) {
        log.info("Generating event : {}", message);
        String queueUri = "http://localhost:4566/000000000000/sample-queue.fifo";
        try {
            SendMessageRequest sendMessageRequest = new SendMessageRequest().withQueueUrl(queueUri)
                .withMessageBody(objectMapper.writeValueAsString(message))
                .withMessageGroupId("SampleMessage")
                .withMessageDeduplicationId(UUID.randomUUID().toString());
            amazonSQS.sendMessage(sendMessageRequest);
            log.info("Event has been published in SQS.");
        } catch (JsonProcessingException e) {
            log.error("JsonProcessingException e : {} and stacktrace : {}", e.getMessage(), e);
        } catch (Exception e) {
            log.error("Exception occurred while pushing event to sqs : {} and stacktrace ; {}", e.getMessage(), e);
        }
    }
}

이미 생성되어 있는 Queue(sample-queue.fifo)로 메시지를 발행하는 역할을 한다.

❗❓ 여기서 주의 사항은 withMessageGroupId() String 형태로 넣어주게 되는데 공백이 있으면 에러가 발생한다.

PublisherController.java

@RestController
@RequiredArgsConstructor
public class PublisherController {

    private final SQSEventPublisher sqsEventPublisher;

    @PostMapping("/sendMessage")
    public ResponseEntity<String> sendMessage(@RequestBody JsonNode jsonNode) {
        sqsEventPublisher.publishEvent(jsonNode);
        return ResponseEntity.ok().build();
    }
}

테스트를 위해 컨트롤러를 만들고 해당 endpoint(sendMessage)로 메시지를 보낸다. 🚀

POST http://localhost:8080/sendMessage
Content-Type: application/json

{
	"message": "hello world"
}

📭 Consumer

이번엔 발행된 메시지를 소비하는 Consumer 코드를 만들어보자.

application.yml

cloud:
  aws:
    stack:
      auto: false
    region:
      static: us-east-1
      auto: false
    credentials:
      access-key: qwer
      secret-key: 1234
    queue:
      name: sample-queue.fifo
      uri: http://localhost:4566

server:
  port: 8081

logging:
  level:
    com.amazonaws.util.EC2MetadataUtils: error

Producer와 동시에 실행하기 때문에 포트가 겹칠수도 있어서 별도로 포트를 지정한다.

logging.level.com.amazonaws.util.EC2MetadataUtils: error

  • AWS EC2 인스턴스에서 구동하지 않을 경우 SdkClientException 예외가 발생하는데 이를 무시하기 위한 옵션이다.

SQSConfig.java

@Configuration
public class SQSConfig {

    @Value("${cloud.aws.region.static}")
    private String region;

    @Value("${cloud.aws.credentials.access-key}")
    private String accessKeyId;

    @Value("${cloud.aws.credentials.secret-key}")
    private String secretAccessKey;

    @Value("${cloud.aws.queue.uri}")
    private String sqsUrl;

    @Primary
    @Bean
    public AmazonSQSAsync amazonSQSAsync() {
        return AmazonSQSAsyncClientBuilder.standard()
            .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(sqsUrl, region))
            .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKeyId, secretAccessKey)))
            .build();
    }

    @Bean
    public QueueMessagingTemplate queueMessagingTemplate() {
        return new QueueMessagingTemplate(amazonSQSAsync());
    }

    @Bean
    protected MessageConverter messageConverter(ObjectMapper objectMapper) {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setObjectMapper(objectMapper);
        converter.setSerializedPayloadClass(String.class);
        converter.setStrictContentTypeMatch(false);
        return converter;
    }
}

MessageConverter: 테스트 편의상 큐로부터 수신받는 메시지를 String 형태로 변환하기 위해 사용했다.

SQSListener.java

@Slf4j
@Service
public class SQSListener {

    @SqsListener(value = "${cloud.aws.queue.name}", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
    public void receiveMessage(@Payload String payload, @Headers Map<String, String> headers, Acknowledgment acknowledgment) {
        log.info("payload: {}", payload);
        log.info("headers {}", headers);
        acknowledgment.acknowledge();
    }
}

@Payload를 String 형태로 받을 수 있다.


참조

profile
Backend Engineer, Vim User

0개의 댓글