전체 코드는 https://github.com/goldcrestwilma/localstack-sqs에서 확인할 수 있다.
LocalStack은 클라우드 애플리케이션 개발을 위한 사용하기 쉬운 테스트/모킹 프레임워크다.
실제 AWS 환경에서 제공하는 동일한 기능을 사용할 수 있다.
우선 도커에서 SQS를 사용하기 위해 docker-compose.yml
파일에서 aws 환경을 설정한다.
version: '3.8'
services:
aws:
image: 'localstack/localstack'
container_name: 'localstack'
environment:
- SERVICES=sqs
- DEFAULT_REGION=us-east-1
- DEBUG=1
- DATA_DIR=/tmp/localstack/data
- AWS_ACCESS_KEY_ID=qwer
- AWS_SECRET_ACCESS_KEY=1234
- AWS_DEFAULT_REGION=us-east-1
ports:
- '4566:4566'
AWS_ACCESS_KEY_ID
, AWS_SECRET_ACCESS_KEY
는 임의로 지정해도 된다.
도커 실행 후 LocalStack 컨테이너는 만든다.
./gradlew composeup
LocalStack 컨테이너로 직접 접속한다.
docker exec -it localstack sh
샘플 큐를 생성한다.
aws --endpoint-url=http://localhost:4566 sqs create-queue --queue-name sample-queue.fifo --attributes FifoQueue=true
메시지를 발행하는 Producer 코드를 만든다.
cloud:
aws:
stack:
auto: false #enables the automatic stack name detection for the application.
region:
static: us-east-1
credentials:
access-key: qwer
secret-key: 1234
end-point:
uri: http://localhost:4566
앞서 docker-compose.yml
에서 설정했던 access-key
, secret-key
넣어준다.
endpoint
는 로컬에서 실행하기 때문에 localhost
이고 SQS 서비스 포트(4566
)를 설정해준다.
@Configuration
public class SQSConfig {
@Value("${cloud.aws.region.static}")
private String region;
@Value("${cloud.aws.credentials.access-key}")
private String accessKeyId;
@Value("${cloud.aws.credentials.secret-key}")
private String secretAccessKey;
@Value("${cloud.aws.end-point.uri}")
private String sqsUrl;
@Primary
@Bean
public AmazonSQSAsync amazonSQSAsync() {
return AmazonSQSAsyncClientBuilder.standard()
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(sqsUrl, region))
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKeyId, secretAccessKey)))
.build();
}
@Bean
public QueueMessagingTemplate queueMessagingTemplate() {
return new QueueMessagingTemplate(amazonSQSAsync());
}
@Bean
public ObjectMapper objectMapper() {
return new ObjectMapper();
}
}
AWS 접속 EndPoint
와 권한(Credential
) 설정을 한다.
@Slf4j
@Service
@RequiredArgsConstructor
public class SQSEventPublisher {
private final AmazonSQS amazonSQS;
private final ObjectMapper objectMapper;
public void publishEvent(JsonNode message) {
log.info("Generating event : {}", message);
String queueUri = "http://localhost:4566/000000000000/sample-queue.fifo";
try {
SendMessageRequest sendMessageRequest = new SendMessageRequest().withQueueUrl(queueUri)
.withMessageBody(objectMapper.writeValueAsString(message))
.withMessageGroupId("SampleMessage")
.withMessageDeduplicationId(UUID.randomUUID().toString());
amazonSQS.sendMessage(sendMessageRequest);
log.info("Event has been published in SQS.");
} catch (JsonProcessingException e) {
log.error("JsonProcessingException e : {} and stacktrace : {}", e.getMessage(), e);
} catch (Exception e) {
log.error("Exception occurred while pushing event to sqs : {} and stacktrace ; {}", e.getMessage(), e);
}
}
}
이미 생성되어 있는 Queue(sample-queue.fifo
)로 메시지를 발행하는 역할을 한다.
❗❓ 여기서 주의 사항은
withMessageGroupId()
String 형태로 넣어주게 되는데 공백이 있으면 에러가 발생한다.
@RestController
@RequiredArgsConstructor
public class PublisherController {
private final SQSEventPublisher sqsEventPublisher;
@PostMapping("/sendMessage")
public ResponseEntity<String> sendMessage(@RequestBody JsonNode jsonNode) {
sqsEventPublisher.publishEvent(jsonNode);
return ResponseEntity.ok().build();
}
}
테스트를 위해 컨트롤러를 만들고 해당 endpoint(sendMessage
)로 메시지를 보낸다. 🚀
POST http://localhost:8080/sendMessage
Content-Type: application/json
{
"message": "hello world"
}
이번엔 발행된 메시지를 소비하는 Consumer
코드를 만들어보자.
cloud:
aws:
stack:
auto: false
region:
static: us-east-1
auto: false
credentials:
access-key: qwer
secret-key: 1234
queue:
name: sample-queue.fifo
uri: http://localhost:4566
server:
port: 8081
logging:
level:
com.amazonaws.util.EC2MetadataUtils: error
Producer와 동시에 실행하기 때문에 포트가 겹칠수도 있어서 별도로 포트를 지정한다.
logging.level.com.amazonaws.util.EC2MetadataUtils
: error
SdkClientException
예외가 발생하는데 이를 무시하기 위한 옵션이다.@Configuration
public class SQSConfig {
@Value("${cloud.aws.region.static}")
private String region;
@Value("${cloud.aws.credentials.access-key}")
private String accessKeyId;
@Value("${cloud.aws.credentials.secret-key}")
private String secretAccessKey;
@Value("${cloud.aws.queue.uri}")
private String sqsUrl;
@Primary
@Bean
public AmazonSQSAsync amazonSQSAsync() {
return AmazonSQSAsyncClientBuilder.standard()
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(sqsUrl, region))
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKeyId, secretAccessKey)))
.build();
}
@Bean
public QueueMessagingTemplate queueMessagingTemplate() {
return new QueueMessagingTemplate(amazonSQSAsync());
}
@Bean
protected MessageConverter messageConverter(ObjectMapper objectMapper) {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setObjectMapper(objectMapper);
converter.setSerializedPayloadClass(String.class);
converter.setStrictContentTypeMatch(false);
return converter;
}
}
MessageConverter: 테스트 편의상 큐로부터 수신받는 메시지를 String 형태로 변환하기 위해 사용했다.
@Slf4j
@Service
public class SQSListener {
@SqsListener(value = "${cloud.aws.queue.name}", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void receiveMessage(@Payload String payload, @Headers Map<String, String> headers, Acknowledgment acknowledgment) {
log.info("payload: {}", payload);
log.info("headers {}", headers);
acknowledgment.acknowledge();
}
}
@Payload
를 String 형태로 받을 수 있다.