RabbitMQ 사용 시 겪은 문제

봄도둑·2022년 8월 4일
0

Spring 개인 노트

목록 보기
8/17

RabbitMQ 사용 시 겪은 에러 모음입니다. 아직 미해결된 문제도 있어 해당 문제에 대한 해결 방법도 같이 알려주시면 감사하겠습니다!

1. ListenerExecutionFailedException: Failed to convert message

📖 publisher에서 object 타입을 메시지에 담아 보낼 때 consumer에서 발생하는 에러

  • publisher에서 특정 객체를 object 타입으로 메시지에 담아 전송
  • publisher는 문제 없이 메시지를 정상적으로 발송
  • 해당 메시지를 사용하는 consumer는 이 메시지를 convert하는 과정에서 문제 발생
    • 여기서 핵심으로 봐야 할 것은 맨 첫 줄 org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Failed to convert message
  • 이러한 현상은 consumer가 메시지에 들어 있는 object를 적합한 형태로 변환하려 했으나 적합한 클래스를 찾지 못해 convert가 불가능한 현상을 말함
  • ex) publisher에서 메시지를 보낼 때 해당 오브젝트를 TestAVo 로 보냄 → consumer에는 TestAVo 의 클래스가 없기 때문에 메시지에 들어 있는 object를 적합한 형태로 변환하지 못해 문제가 발생

1-2. 해결 방법

  • 해결 방법은 여러 가지가 있음 → MQ에서 사용할 메시지를 사용자가 임의로 지정한 공통 객체를 이용하거나(SagaEventMessage) 매번 rabbitTemplate에서 설정 변경을 통해 들어오는 object를 적합한 타입으로 변환해주어야 함

MSA에서 메시지를 주고 받을 때 공통으로 사용할 클래스 생성 => 여기서는 SagaEventMessage 클래스라는 custom class 사용


public class SagaEventMessage {
    private String messageId;
    private Map<String, Object> header = new HashMap<>();
    private Object payload;
    private String correlationId = "";
    private Object correlationData = null;

    public SagaEventMessage() {
        this.messageId = UUID.randomUUID().toString();
    }

    public SagaEventMessage messageId(String messageId) {
        this.messageId = messageId;
        return this;
    }

    public SagaEventMessage header(Map<String, Object> header) {
        this.header = header;
        return this;
    }

		//..

    public <T> T getPayloadAsType(Class<T> clazz) {
        Gson gson = new Gson();
        String payload = gson.toJson(this.payload);
        return gson.fromJson(payload, clazz);
    }

    //..

    @Override
    public String toString() {
        return "Message{" +
                "messageId='" + messageId + '\'' +
                ", header=" + header +
                ", payload=" + payload +
                ", correlationId='" + correlationId + '\'' +
                ", correlationData=" + correlationData +
                '}';
    }
}
  • 위의 객체를 주고 받는 형태로 사용하면 됨
  • 메시지를 발행할 때 메시지 발행을 전담할 publisher 클래스 생성
@Slf4j
@Component
@AllArgsConstructor
public class SagaEventPublisher {

    private final String applicationName;

    private final RabbitTemplate rabbitTemplate;

    public void send(String routingKey, String replyKey, Object payload, String correlationId, Map<String, Object> header) {
        try {
	            //parameter 값 정리 및 ack 여부 확인
            };
            
            SagaEventMessage sagaEventMessage = new SagaEventMessage().payload(payload).header(header).correlationId(id);
            rabbitTemplate.convertAndSend("command." + applicationName, routingKey, sagaEventMessage);
        } catch (AmqpException e) {
            log.error("send AmqpException");
        }
    }
		
		//메소드 오버로딩
    public void send(String routingKey, Object payload, String correlationId, Map<String, Object> header) {
        try {
	            //parameter 값 정리 및 ack 여부 확인
            };
           
            SagaEventMessage sagaEventMessage = new SagaEventMessage().payload(payload).header(header).correlationId(id);
            rabbitTemplate.convertAndSend("command." + applicationName, routingKey, sagaEventMessage);
        } catch (AmqpException e) {
            log.error("send AmqpException");
        }
    }
}
  • 이제 메시지를 발송할 때 해당 publisher의 send 메소드를 이용해 발행하면 됨 → consumer는 해당 메시지에 담겨 있는 객체가 SagaEventMessage이기 때문에 별도의 변환 과정을 거치지 않고 메시지를 convert해서 사용할 수 있음
  • 메시지를 받을 handler 클래스 생성
@Slf4j
@Component
@RequiredArgsConstructor
public class ServiceEventHandler {

    private final Gson gson;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = 큐이름, durable = "true"),
            exchange = @Exchange(value = 익스체인지이름, type = "fanout")
    ))
	public void method1(SagaEventMessage message) {
		String payload = message.getPayload().toString();
		//SagaEventMessge에 object 타입으로 넣어둔 payload(이게 본체)
		//얘를 받는 게 consumer의 목적 -> 사용할 수 있는 String 타입으로 변환해서 사용
		
	}
  • 위의 메세지를 주고 받을 공용 클래스를 사용하지 않을 거라면, 동일한 모양의 DTO를 만들어서 양 쪽 서비스가 그 DTO를 들고 있어야 함 ⇒ 그런데 MSA로 개발하게 되면 이러한 같은 모양의 DTO를 서로 통신하는 서비스들 모두가 들고 있어야 하는데 이게 맞을까??

2. Broker not available; cannot force queue declarations during start: java.io.IOException

2021-12-27 19:34:16.952  INFO 1780 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2021-12-27 19:34:16.982  INFO 1780 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#158e9f6e:0/SimpleConnection@6986f93e [delegate=amqp://username@127.0.0.1:5672/, localPort= 59240]

Spring boot 가동 시 rabbitMQ connection이 정상적으로 진행되지 않고 수 차례 시도하는 현상 발생 → 수차례 커넥션 시도 후 Broker not available; cannot force queue declarations during start: java.io.IOException 로그 출력

  • 원인 : 서로 다른 exchange에 동일한 routing key로 binding 되어 있는 queue가 있었음 → routing key 변경 후 해당 현상 해결

3. direct binding 문제

  • 해당 문제는 amqp의 exchange type을 제대로 이해 하지 못해 발생한 문제

현재 겪고 있는 문제

service A에서 test.exchange.publishA로 메시지 발행 → 메시지를 받아서 읽어야 하는 얘들은 service B와 C → 그런데 한 놈만 받아서 읽음

예상 : test.exchange.publishA에 하나의 큐만 바인딩 되어 있음 → B 또는 C가 해당 큐에서 메시지를 가져와 처리 → 큐에 남은 메시지가 없기 때문에 남은 녀석은 가만히 있게 됨


⇒ 큐가 하나이기 때문에 해당 큐에서 누군가 메시지를 받아서 처리하면 다른 녀석은 큐가 비게 되면서 처리 하지 못하는 멍때리는 현상 발생

해결점 :

  1. 큐를 2개로 나눈다면? ⇒ 현재 exchange type은 direct → direct에서 2개의 큐가 바인딩되어 있으면 2개의 큐에 메시지가 모두 들어갈까? 아니면 하나의 큐에만 들어가는 걸까? → direct 메시지여도 2개의 큐가 바인딩 될 수 있음

(사진 출처 : https://www.rabbitmq.com/tutorials/tutorial-four-python.html)

  • direct exchange의 경우 동일한 routing key로 여러 queue에 binding할 수 있다!

4. retry 속성 미적용 문제(미해결)

retry:
  enabled: true
  initial-interval: 3s
  max-attempts: 2
  max-interval: 6s
  multiplier: 2

retry 속성은 먹히지 않음 → why?

dead letter queue 구현 중 retry 설정 값 수정을 통해 동작해야 하는데 이 부분에서 동작하지 않고 있음

profile
배워서 내일을 위해 쓰자

0개의 댓글