Spring Boot에서 Google Cloud Pub/Sub 활용

Caesars·2021년 8월 20일
1

GCP 도입

목록 보기
1/8

해당 글에서는 스프링 부트 환경에서 Google Cloud Pub/Sub 예제를 소개합니다. 발행만 하고 구독을 하는 부분은 없습니다.


Pub/Sub 이란

일반적으로 메시지를 전송할때 HTTP Request/Response 방식을 많이 사용합니다. Rest Api 등 이 대표적으로 sender가 receiver에게 직접 메시지를 전송합니다.
하지만 Pub/Sub 모델에서는 publisher는 어떤 subscriber가 있는지 모르는 상태에서 메시지를 전송하고 subscriber도 publisher에 대한 정보 없이 구독 중인 메시지만을 전송 받을 수 있습니다.
또한 한 채널에 한 수신자만 메시지를 수신하는 메시지 큐잉과 달리 Pub/Sub은 수신자 모두에게 메시지를 전송가능합니다.

Request/Response


장점

  • 심플하고 구현하기 쉽다.

단점

  • 클라이언트-서버 사이 높은 의존성

Pub/Sub


장점

  • 수신자를 확장하기 쉽다.
  • 마이크로 서비스에 적합

단점

  • 복잡도 증가

시작하기

해당 링크 에 나와있는 데로 서비스 계정을 만들고 계정 키를 만들어 다운로드 합니다.

설정

build.gradle

compile("org.springframework.cloud:spring-cloud-gcp-starter-pubsub:1.2.5.RELEASE")
compile("org.springframework.integration:spring-integration-core")

application.yml

spring:
  cloud:
      gcp:
        credentials:
            location: classpath:/gcp-dev-key.json
        project-id: *your *project *Id
        topic: *your *topic
        
spring:
  profiles: local
  cloud:
    gcp:
      credentials:
          location: file:src/main/resources/gcp-dev-key.json
      project-id: *your *project *Id
      topic: *your *topic

local 환경에서 실행할 경우에는 json파일을 resources 디렉토리 하단에 위치시키면 된다.
jar 파일을 생성해 실행할 경우에는 classpath:/ 를 붙여주면 된다.
보통 resources 디렉토리는 classpath로 지정되있을 것이다.

추가로 Spring boot 에서는 기본적으로는 네 군데의 위치에서 파일을 읽는다고 한다.

프로젝트의 루트 = /config/ file:./config/
프로젝트의 루트 = file:./
프로젝트의 클래스패스의 /config/ = classpath:/config/
프로젝트의 클래스패스 = classpath:/

코드


@Configuration
public class PubSubApplication {

    private static final Log LOGGER = LogFactory.getLog(PubSubApplication.class);

    @Value("${spring.cloud.gcp.topic}")
    private String topic;

    // Outbound channel adapter

    // tag::messageSender[]
    @Bean
    @ServiceActivator(inputChannel = "pubsubOutputChannel")
    public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
        return new PubSubMessageHandler(pubsubTemplate, topic);
    }
    // end::messageSender[]

    // tag::messageGateway[]
    @MessagingGateway(defaultRequestChannel = "pubsubOutputChannel")
    public interface PubsubOutboundGateway {
        void sendToPubsub(String text);
    }
    // end::messageGateway[]
}

import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.groobee.g2stats.pubsub.PubSubApplication.PubsubOutboundGateway;

@Aspect
@Component
public class PubSubAop {

    @Autowired
    private PubsubOutboundGateway messagingGateway;

    @After("execution(* method1(..))"
            + " or execution(* method2(..))"
            + " or execution(* method3(..))")
    public void sendToTopic(JoinPoint joinPoint) throws Throwable {
        Data data = (Data) joinPoint.getArgs()[1];
        messagingGateway.sendToPubsub(data.toString());
    }
}

messageSender의 PubSubMessageHandler는 PubSubTemplate을 사용하여 pubsubOutputChannel에 메시지를 게시합니다.

메시지를 발행해야 하는 경우가 몇몇 API에서 공통으로 필요해서 해당 API에서 각각 호출하기 보다 AOP로 만들어서 처리했습니다.


DEADLINE_EXCEEDED 에러 (2022-08)

상황

Pub/Sub을 운영환경에 적용 후 때때로 DeadlineExceededException 에러가 발생하였습니다. DeadlineExceededException 은 요청 기한을 초과할 때 발생하는 에러인데 서버에서 Publisher를 통해 발송한 메시지의 응답이 5초를 초과하는 상황이었습니다.

원인

처음에는 네트워킹 이슈라고 생각했지만 비슷한 상황에 처한 사람의 의견을 보니 네트워킹 문제보다는 일시적으로 너무 많은 요청을 보내는 것이라고 판단했습니다. 실제로 모니터링 차트를 보면 초당 발송 건수가 250을 넘어설때마다 에러가 발생하고 있었습니다.

수정

직접 흐름을 제어하는 방식으로 수정하려 했기에 기존에 쓰던 PubSubTemplate이 아니라 FlowControlSettings 을 활용한 Publisher를 사용했습니다. LimitExceeded에 도달하면 더 이상 메시지를 발송하지 않고 중단하게 됩니다. 또한 네트워킹 비용을 줄이기 위해 요청을 보내기 전에 데이터를 압축하는 로직을 추가했습니다. gRPC용 Pub/Sub 압축은 Gzip 알고리즘을 사용합니다.


Publisher publisher = null;

try {
   	BatchingSettings batchingSettings = BatchingSettings.newBuilder()
		.setFlowControlSettings(FlowControlSettings.newBuilder().setLimitExceededBehavior(LimitExceededBehavior.Block).build())
		.build();

	publisher = Publisher.newBuilder(topic)
		.setEnableCompression(true)				// 데이터 압축
		.setCompressionBytesThreshold(10L)
		.setBatchingSettings(batchingSettings)
		.build();

    String message = this.objMapper.writeValueAsString(value);
    ByteString data = ByteString.copyFromUtf8(message);
    PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

    publisher.publish(pubsubMessage);

	} finally {
    	if (publisher != null) {
        	publisher.shutdown();
            publisher.awaitTermination(1, TimeUnit.MINUTES);
        }
    }

gradle

implementation 'com.google.cloud:google-cloud-pubsub:1.120.11'

참고 :

messaging-gcp-pubsub guide
Google Cloud 인증
https://docs.mulesoft.com/google-pubsub-connector/1.0/google-pubsub-connector-reference
https://cloud.google.com/pubsub/docs/publisher?hl=ko#java
https://cloud.google.com/pubsub/docs/spring

profile
잊기전에 저장

0개의 댓글