[마이크로서비스] 비동기 메시지 통신 Spring Cloud Stream, 레디스 (9)

hyeokjin·2022년 8월 16일
0

microservice

목록 보기
9/13
post-thumbnail

스프링 클라우드 스트림

스프링 클라우드 스트림은 경량 메시지 처리 기능을 마이크로서비스에 쉽게 통합하는 기술이며,
애플리케이션에서 발생하는 비동기 이벤트를 사용하는 지능형 마이크로서비스를 구축할 수 있다.
또한 RabbitMQ와 카프카 같은 메시지 브로커와 마이크로서비스를 빠르게 통합할 수 있다.

비동기 메시지 통신 카프카에 자세히 알고 싶다면 아래 해당 글을 참고하자.
카프카와 스프링 클라우드 스트림 관련하여 정리되어있다.

카프카, 데이터 플랫폼의 최강자(정리) + Spring Cloud Stream 연동

이번 시간에는 라이선싱 서비스에서 조직 서비스 데이터베이스에 접근할 때 캐싱을 하는 레디스를 두고 라이선싱 서비스와 조직 서비스간에는 카프카를 이용해 비동기 메시지 통신을 구현하고자 한다.

여기서 레디스는 특정 라이선스와 연관된 조직 데이터에 대해 분산 레디스 캐시를 확인한다. 캐시에 없다면 조직 서비스를 호출하고 결과를 레디스 해시에 캐싱한다.

분산 캐시로 레디스를 사용하는 것은 다음과 같은 장점이 있다.

  • 일반적으로 보유한 데이터 검색 성능을 향상 시킨다 : 캐시를 사용해 데이터베이스에서 읽어 오지 않고도 일부 주요 서비스의 성능을 크게 향상시킬 수 있다.

  • 데이터를 보유한 데이터베이스 테이블에 대한 부하를 줄인다 : 레디스 서버를 사용하면 데이터베이스에 액세스하는 대신 기본 키로 읽기를 구현할 수 있으므로 비용 효율이 훨씬 더 높다

  • 주 데이터 저장소 또는 데이터베이스에 성능 문제가 있다면 정상적으로 저하될 수 있도록 회복력을 높인다 : 캐시에 보관하는 데이터양에 따라 캐싱 솔루션은 데이터 저장소에 액세스할 때 발생할 수 있는 오류 수를 줄이는 데 도움이 될 수 있다.

    	메시징을 사용한 서비스 간 상태 변화 전달

아래는 진행하고자하는 서비스들의 소스를 참고할 깃헙 주소이다.

https://github.com/hyeokjinON/microservice_study/tree/master/chapter10

아파치 카프카 및 레디스 도커 구성

메시지 생산자의 도커 환경에 카프카 및 레디스 서비스를 추가 먼저 하자.
docker-compose.yml 내용은 다음과 같다.

docker-compose.yml

version: '2.1'
services:
  database:
    image: postgres:latest
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: "postgres"
      POSTGRES_PASSWORD: "postgres"
      POSTGRES_DB:       "ostock_dev"
    volumes:
        - ./init.sql:/docker-entrypoint-initdb.d/1-init.sql
        - ./data.sql:/docker-entrypoint-initdb.d/2-data.sql
    networks:
      backend:
        aliases:
          - "database"
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 10s
      timeout: 5s
      retries: 5
  configserver:
    image: ostock/configserver:0.0.1-SNAPSHOT
    ports:
       - "8071:8071"
    environment:
      ENCRYPT_KEY: "fje83Ki8403Iod87dne7Yjsl3THueh48jfuO9j4U2hf64Lo"
    networks:
      backend:
        aliases:
          - "configserver"
  eurekaserver:
    image: ostock/eurekaserver:0.0.1-SNAPSHOT
    ports:
      - "8070:8070"
    depends_on:
      database:
        condition: service_healthy
      configserver:
        condition: service_started  
    networks:
      backend:
        aliases:
          - "eurekaserver"
  gatewayserver:
    image: ostock/gatewayserver:0.0.1-SNAPSHOT
    ports:
      - "8072:8072"
    environment:
      PROFILE: "default"
      SERVER_PORT: "8072"
      CONFIGSERVER_URI: "http://configserver:8071"
      EUREKASERVER_URI: "http://eurekaserver:8070/eureka/"
      EUREKASERVER_PORT: "8070"
      CONFIGSERVER_PORT: "8071"
    depends_on:
      database:
        condition: service_healthy
      configserver:
        condition: service_started
      eurekaserver:
        condition: service_started
    networks:
      backend:
        aliases:
          - "gateway"
# 추가된 부분은 다음과 같다 start
  zookeeper:
    image: zookeeper:3.7.0
    ports:
      - 2181:2181
    networks:
      backend:
        aliases:
          - "zookeeper"
  kafkaserver:
    image: wurstmeister/kafka:latest
    ports:
      - 9092:9092
    environment:
      - KAFKA_ADVERTISED_HOST_NAME=kafka
      - KAFKA_ADVERTISED_PORT=9092
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CREATE_TOPICS=dresses:1:1,ratings:1:1
    volumes:
      - "/var/run/docker.sock:/var/run/docker.sock"
    depends_on:
      - zookeeper
    networks:
      backend:
        aliases:
          - "kafka"
  redisserver:
    image: redis:alpine
    ports:
      - 6379:6379
    networks:
      backend:
        aliases:
          - "redis"
# 추가된 부분은 다음과 같다 end
  licensingservice:
    image: ostock/licensing-service:0.0.3-SNAPSHOT
    environment:
      PROFILE: "dev"
      CONFIGSERVER_URI: "http://configserver:8071"
      CONFIGSERVER_PORT:   "8071"
      DATABASESERVER_PORT: "5432"
      ENCRYPT_KEY:       "IMSYMMETRIC"
    depends_on:
      database:
        condition: service_healthy
      configserver:
        condition: service_started
      kafkaserver:
        condition: service_started
    ports:
      - "8080:8080"
    networks:
      - backend
  organizationservice:
    image: ostock/organization-service:0.0.1-SNAPSHOT
    environment:
      PROFILE: "dev"
      CONFIGSERVER_URI: "http://configserver:8071"
      CONFIGSERVER_PORT:   "8071"
      DATABASESERVER_PORT: "5432"
      ENCRYPT_KEY:       "IMSYMMETRIC"
    depends_on:
      database:
        condition: service_healthy
      configserver:
        condition: service_started
      kafkaserver:
        condition: service_started
    ports:
      - "8081:8081"
    networks:
      - backend
    

networks:
  backend:
    driver: bridge
 

조직 서비스에서 메시지 생산자 작성

pom.xml에 의존성 추가를 한다

pom.xml

		<dependency>
	    	<groupId>org.springframework.cloud</groupId>
	    	<artifactId>spring-cloud-stream</artifactId>
        </dependency>
          <dependency>
	    	<groupId>org.springframework.cloud</groupId>
	    	<artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>

메이븐 의존성을 정의 후, 애플리케이션 스프링 클라우드 스트림의 메시지 브로커와 바인딩하도록 지정한다. @EnableBinding 애너테이션을 추가한다.

OrganizationServiceApplication.java

package com.optimagrowth.organization;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;

@SpringBootApplication
@RefreshScope
// 스프링 클라우드 스트림이 메시지 브로커에 애플리케이션을 바인딩하도록 지정한다.
@EnableBinding(Source.class)
public class OrganizationServiceApplication {
	
	public static void main(String[] args) {
		SpringApplication.run(OrganizationServiceApplication.class, args);
	}

}

메시지 발행하는 코드를 구현하기 첫 번째 단계로
UserContext 변수를 ThreadLocal로 만든다.

UserContext.java

package com.optimagrowth.organization.utils;

import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;

@Component
public class UserContext {
    public static final String CORRELATION_ID = "tmx-correlation-id";
    public static final String AUTH_TOKEN     = "Authorization";
    public static final String USER_ID        = "tmx-user-id";
    public static final String ORG_ID         = "tmx-org-id";

    private static final ThreadLocal<String> correlationId= new ThreadLocal<String>();
    private static final ThreadLocal<String> authToken= new ThreadLocal<String>();
    private static final ThreadLocal<String> userId = new ThreadLocal<String>();
    // 변수를 ThreadLocal로 저장하면 현재 스레드에 대한 데이터를 스레드별로 저장할 수 있다. 여기에 설정된 정보는 그 값을 설정한 스레드만 읽을 수 있다.
    private static final ThreadLocal<String> orgId = new ThreadLocal<String>();


    public static String getCorrelationId() { return correlationId.get(); }
    public static void setCorrelationId(String cid) {correlationId.set(cid);}

    public static String getAuthToken() { return authToken.get(); }
    public static void setAuthToken(String aToken) {authToken.set(aToken);}

    public static String getUserId() { return userId.get(); }
    public static void setUserId(String aUser) {userId.set(aUser);}

    public static String getOrgId() { return orgId.get(); }
    public static void setOrgId(String aOrg) {orgId.set(aOrg);}

    public static HttpHeaders getHttpHeaders(){
        HttpHeaders httpHeaders = new HttpHeaders();
        httpHeaders.set(CORRELATION_ID, getCorrelationId());

        return httpHeaders;
    }

}

SimpleSourceBean.java는 메시지를 발행하는 로직을 만든다

SimpleSourceBean.java

package com.optimagrowth.organization.events.source;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import com.optimagrowth.organization.events.model.OrganizationChangeModel;
import com.optimagrowth.organization.utils.ActionEnum;
import com.optimagrowth.organization.utils.UserContext;

@Component
public class SimpleSourceBean {
    private Source source;

    private static final Logger logger = LoggerFactory.getLogger(SimpleSourceBean.class);

	// 서비스에서 사용되는 Source 인터페이스 구현체를 주입한다.
    @Autowired
    public SimpleSourceBean(Source source){
        this.source = source;
    }

    public void publishOrganizationChange(ActionEnum action, String organizationId){
       logger.debug("Sending Kafka message {} for Organization Id: {}", action, organizationId);
        OrganizationChangeModel change =  new OrganizationChangeModel(
                OrganizationChangeModel.class.getTypeName(),
                action.toString(),
                organizationId,
                UserContext.getCorrelationId()); // 자바 POJO 메시지를 발행한다.

		// Source 클래스에서 정의된 채널에서 전달된 메시지를 발송한다.
        source.output().send(MessageBuilder.withPayload(change).build());
    }
}

매개변수로 사용된 ActionEnum.java는 다음과 같다

ActionEnum.java

package com.optimagrowth.organization.utils;

public enum ActionEnum {
   GET,
   SAVE,
   UPDATED,
   DELETED
}

실제 메시지 발행은 publishOrganizationChange() 메서드에서 수행된다
이 메서드에서 OrganizationChangeModel를 살펴보자

OrganizationChangeModel.java

package com.optimagrowth.organization.events.model;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

@Getter @Setter @ToString
public class OrganizationChangeModel {
	private String type;
	private String action;
	private String organizationId;
	private String correlationId;

	public OrganizationChangeModel(String type, String action, String organizationId, String correlationId) {
		super();
		this.type = type;
		this.action = action;
		this.organizationId = organizationId;
		this.correlationId = correlationId;
	}
}

서비스의 스프링 클라우드 스트림 Source를 카프카 메시지 브로커와 메시지 토픽에 매핑하는 구성을 설정하기 위해 configServer 구성정보 프로퍼티에서 스프링 클라우드 스트림을 구성한다.

organization-service.properties

# 메시지를 작성할 메시지 큐 또는 토픽이름이다.
spring.cloud.stream.bindings.output.destination=orgChangeTopic
# 송수신할 메시지 타입이다
spring.cloud.stream.bindings.output.content-type=application/json
# 이 프로퍼티들은 카프카와 주키퍼의 네트워크 위치를 제공한다( 로컬에서 실행되도록 /etc/hosts 파일에 127.0.0.1 kafka를 정의하거나 localhost를 사용한다)
spring.cloud.stream.kafka.binder.zkNodes=kafka
spring.cloud.stream.kafka.binder.brokers=kafka

조직 서비스에서 메시지 발행하는 서비스를 작성하자

OrganizationService.java

package com.optimagrowth.organization.service;

import java.util.Optional;
import java.util.UUID;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.optimagrowth.organization.events.source.SimpleSourceBean;
import com.optimagrowth.organization.model.Organization;
import com.optimagrowth.organization.repository.OrganizationRepository;
import com.optimagrowth.organization.utils.ActionEnum;

@Service
public class OrganizationService {
	
	private static final Logger logger = LoggerFactory.getLogger(OrganizationService.class);
	
    @Autowired
    private OrganizationRepository repository;
    
    // 조직 서비스에 SimpleSourceBean을 주입하려고 자동연결 한다.
    @Autowired
    SimpleSourceBean simpleSourceBean;

    public Organization findById(String organizationId) {
    	Optional<Organization> opt = repository.findById(organizationId);
    	simpleSourceBean.publishOrganizationChange(ActionEnum.GET, organizationId);
        return (opt.isPresent()) ? opt.get() : null;
    }	

    public Organization create(Organization organization){
    	organization.setId( UUID.randomUUID().toString());
        organization = repository.save(organization);
        // 조직 데이터를 변경하는 모든 메서드는 publishOrganizationChange() 메서드를 호출한다.
        simpleSourceBean.publishOrganizationChange(ActionEnum.CREATED, organization.getId());
        return organization;

    }

    public void update(Organization organization){
    	repository.save(organization);
        simpleSourceBean.publishOrganizationChange(ActionEnum.UPDATED, organization.getId());
    }

    public void delete(String organizationId){
    	repository.deleteById(organizationId);
    	simpleSourceBean.publishOrganizationChange(ActionEnum.DELETED, organizationId);
    }
    
    @SuppressWarnings("unused")
	private void sleep(){
		try {
			Thread.sleep(3000);
		} catch (InterruptedException e) {
			logger.error(e.getMessage());
		}
	}
}

라이선싱 서비스에서 메시지 소비자 작성

조직 서비스가 조직 데이터를 변경할 때마다 카프카에 메시지를 발행하도록 조직 서비스를 수정 했다. 이제 스프링 클라우드 스트림을 사용하는 서비스가 어떻게 소비하는지 살펴보자.

라이선싱 서비스를 수정한다.

pom.xml

		<dependency>
	    	<groupId>org.springframework.cloud</groupId>
	    	<artifactId>spring-cloud-stream</artifactId>
        </dependency>
          <dependency>
	    	<groupId>org.springframework.cloud</groupId>
	    	<artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>

licensing-service.properties


# 입력채널을 orgChangesTopic 큐에 매핑한다.
spring.cloud.stream.bindings.inboundOrgChanges.destination= orgChangeTopic
spring.cloud.stream.bindings.inboundOrgChanges.content-type= application/json
# 서비스별로 한번에 처리 하려고 group 프로퍼티를 사용한다.
spring.cloud.stream.bindings.inboundOrgChanges.group= licensingGroup
spring.cloud.stream.kafka.binder.zkNodes= kafka
spring.cloud.stream.kafka.binder.brokers=kafka

LicenseServiceApplication.java

// 유입되는 메시지를 수신하고자 Sink 인터페이스에 정의된 채널을 사용하도록 서비스를 설정한다.
@EnableBinding(Sink.class)
public class LicenseServiceApplication {
	// 입력 채널에서 메시지를 받을 때마다 이 메서드를 실행한다.
	@StreamListener(Sink.INPUT)
	public void loggerSink(OrganizationChangeModel orgChange) {
		logger.debug("Received {} event for the organization id {}", orgChange.getAction(), orgChange.getOrganizationId());
	}
}

postman을 이용하여 살펴보자

http://localhost:8072/organization/v1/organization/
엔드포인트에서 다음 메시지 내용을 POST 호출 해본다

body

{
  "name":"Ostock",
  "contactName":"illary Huaylupo",
  "contactEmail":"illaryhs@gmail.com",
  "contactPhone":"888888888"
}

로그를 확인해 보자

조직 서비스 카프카 메시지를 전송했음을 나타내는 조직 서비스의 로그 메시지와
SAVE 이벤트 메시지를 수신했음을 나타내는 라이선싱 서비스 로그 메시지를 볼 수 있다.

organizationservice_1  | 2022-08-16 13:48:37.236 DEBUG 1 --- [nio-8081-exec-4] c.o.o.events.source.SimpleSourceBean     : Sending Kafka message CREATED for Organization Id: b23eb9f2-2b0d-46dc-a0e7-ecdfb0017fcf

licensingservice_1     | 2022-08-16 13:48:37.279 DEBUG 1 --- [container-0-C-1] c.o.l.e.h.OrganizationChangeHandler      : Received SAVE event for the organization id b23eb9f2-2b0d-46dc-a0e7-ecdfb0017fcf

캐시 검색을 위한 레디스

이제 여기서 레디스를 이용한 분산 캐싱 예제를 구축해보자.
라이선싱 서비스는 특정 라이선스와 연관된 조직 데이터에 대해 분산 레디스 캐시를 확인한다.
캐시에 없다면 조직 서비스를 호출하고 결과를 레디스 해시에 캐싱한다.

먼저 스프링 데이터 레디스 의존성으로 라이선싱 서비스를 구성한다.

pom.xml

       <dependency>
		    <groupId>org.springframework.data</groupId>
		    <artifactId>spring-data-redis</artifactId>
		 </dependency>
		<dependency>
		    <groupId>redis.clients</groupId>
		    <artifactId>jedis</artifactId>
		    <type>jar</type>
		</dependency>

레디스 서버에 대한 데이터베이스 커넥션을 설정한다.

LicenseServiceApplication.java

	// 레디스 서버에 대한 데이터베이스 커넥션을 설정
	@Bean
	JedisConnectionFactory jedisConnectionFactory() {
		String hostname = serviceConfig.getRedisServer();
		int port = Integer.parseInt(serviceConfig.getRedisPort());
	    RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration(hostname, port);
	    //redisStandaloneConfiguration.setPassword(RedisPassword.of("yourRedisPasswordIfAny"));
	    return new JedisConnectionFactory(redisStandaloneConfiguration);
	}

	// 레디스 서버에 액션을 실행할 RedisTemplate을 생성한다.
	@Bean
	public RedisTemplate<String, Object> redisTemplate() {
		RedisTemplate<String, Object> template = new RedisTemplate<>();
		template.setConnectionFactory(jedisConnectionFactory());
		return template;
	}

라이선싱 서비스를 위한 구성 파일에 정의할 사용자 지정 매개변수를 추출하는 클래스를 추가한다.

ServiceConfig.java

package com.optimagrowth.license.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import lombok.Getter;

@Component @Getter
public class ServiceConfig{

  @Value("${redis.server}")
  private String redisServer="";

  @Value("${redis.port}")
  private String redisPort="";
  
}

컨피그 서비스 구성정보 저장소에서 레디스 서버의 호스트와 포트를 정의한다.

licensing-service.properties

redis.server = redis
redis.port = 6379

레디스와 통신하도록 라이선싱 서비스를 설정하는 기본 작업을 완료했다.

🎈 레디스 구성 설정에서 다음과 같은 커넥션 설정도 지정할 수도 있다. 구성 설정에 대해 더 궁금하다면 추가로 검색해보자.

레디스 저장소 정의

레디스는 크고 분산된 인메모리 해시맵과 같은 역할을 하는 키-값 데이터 저장소다.
레디스 저장소에 액세스하려고 스프링 데이터를 사용하기 때문에 리포지터리 클래스를 정의해야 한다.

CrudRepository에서 확장한 OrganizationRedisRepository는 레디스에서 데이터를 저장하고 조회하는 모든 CRUD 로직을 포함한다.

OrganizationRedisRepository.java

package com.optimagrowth.license.repository;

import org.springframework.data.repository.CrudRepository;
import org.springframework.stereotype.Repository;

import com.optimagrowth.license.model.Organization;

@Repository
public interface OrganizationRedisRepository extends CrudRepository<Organization,String>  {
}

리포지터리에서 사용할 모델을 만든다.

레디스 서버에 여러 해시와 데이터 구조가 포함될 수 있다.
따라서 레디스와 통신할 때마다 작업을 수행할 데이터 구조의 이름을 레디스에 전달해야 한다.

Organization.java

package com.optimagrowth.license.model;

import javax.persistence.Id;

import org.springframework.data.redis.core.RedisHash;
import org.springframework.hateoas.RepresentationModel;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

@Getter @Setter @ToString
// 조직 데이터가 저장될 레디스 서버의 해시이름을 설정한다.
@RedisHash("organization")
public class Organization extends RepresentationModel<Organization> {

	@Id
	String id;
    String name;
    String contactName;
    String contactEmail;
    String contactPhone;
    
}

이제 조직 데이터가 필요할 때 마다 호출되기 전에 레디스 캐시를 먼저 확인하도록 서비스를 수정하자.

코드의 위치는 조직 서비스를 호출하기 위해 RestTemplate 를 사용하는 OrganizationRestTemplateClient.java 파일이다

OrganizationRestTemplateClient.java

package com.optimagrowth.license.service.client;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

import com.optimagrowth.license.model.Organization;
import com.optimagrowth.license.repository.OrganizationRedisRepository;
import com.optimagrowth.license.utils.UserContext;

@Component
public class OrganizationRestTemplateClient {
	@Autowired
	RestTemplate restTemplate;

	// OrganizationRestTemplateClient에서 OrganizationRedisRepository를 자동연결한다.
	@Autowired
	OrganizationRedisRepository redisRepository;

	private static final Logger logger = LoggerFactory.getLogger(OrganizationRestTemplateClient.class);

	public Organization getOrganization(String organizationId){
		logger.debug("In Licensing Service.getOrganization: {}", UserContext.getCorrelationId());

		// 레디스에 데이터 존재유무 체크한다.
        Organization organization = checkRedisCache(organizationId);

		// 레디스에서 조직데이터 조회가 되지 않는다면 조직 서비스를 호출하여 원본 조직 데이터베이스에서 데이터를 조회하고 cacheOrganizationObject()을 통해 레디스에 저장한다.
        if (organization != null){
            logger.debug("I have successfully retrieved an organization {} from the redis cache: {}", organizationId, organization);
            return organization;
        }

        logger.debug("Unable to locate organization from the redis cache: {}.", organizationId);
        
		ResponseEntity<Organization> restExchange =
				restTemplate.exchange(
						"http://gateway:8072/organization/v1/organization/{organizationId}",
						HttpMethod.GET,
						null, Organization.class, organizationId);
		
		/*Save the record from cache*/
        organization = restExchange.getBody();
        if (organization != null) {
            cacheOrganizationObject(organization);
        }

		return restExchange.getBody();
	}

	// 레디스에서 조직 ID로 Organization클래스를 조회하고 없으면 null을 반환한다(orElse(null))
	private Organization checkRedisCache(String organizationId) {
		try {
			return redisRepository.findById(organizationId).orElse(null);
		}catch (Exception ex){
			logger.error("Error encountered while trying to retrieve organization {} check Redis Cache.  Exception {}", organizationId, ex);
			return null;
		}
	}
	
	private void cacheOrganizationObject(Organization organization) {
        try {
        	// 레디스에서 조직데이터를 저장한다.
        	redisRepository.save(organization);
        }catch (Exception ex){
            logger.error("Unable to cache organization {} in Redis. Exception {}", organization.getId(), ex);
        }
    }
}

postman을 이용하여 로깅 메시지를 살펴보자

http://localhost:8072/license/v1/organization/e839ee96-28de-4f67-bb79-870ca89743a0/license/279709ff-e6d5-4a54-8b55-a5c37542025b

GET 요청을 보내면 로그에서 두 개의 출력 결과를 볼 수 있다.

첫 번째 출력은 조직 레코드 ID에 대해 라이선싱 서비스의 엔드포인트를 처음 액세스 시도한 시간을 보여준다. 즉, 레디스 캐시에 레코드가 없어서 조직 서비를 호출해서 데이터를 조회했다.

licensingservice_1     | 2022-08-16 13:57:31.677 DEBUG 1 --- [nio-8080-exec-1] c.o.l.s.c.OrganizationRestTemplateClient : Unable to locate organization from the redis cache: e839ee96-28de-4f67-bb79-870ca89743a0.

그 다음 두번째 호출했을때 조직 레코드가 레디스에 캐싱되어 데이터를 가져온 것을 확인할 수 있다.

licensingservice_1     | 2022-08-16 13:57:38.007 DEBUG 1 --- [nio-8080-exec-2] 		c.o.l.s.c.OrganizationRestTemplateClient : I have successfully retrieved an organization e839ee96-28de-4f67-bb79-870ca89743a0 from the redis cache: Organization(id=e839ee96-28de-4f67-bb79-870ca89743a0, 	name=Ostock, contactName=Illary Huaylupo, 	contactEmail=illaryhs@gmail.com, contactPhone=888888888)

사용자 정의 채널 설정

스프링 클라우드 스트림의 Source 및 Sink 인터페이스가 함께 패키징된 기본 입출력 채널을 사용하여 라이선싱 및 조직 서비스간 메시징 통합을 했다.
하지만 두 개이상의 채널을 정의하거나 고유한 채널 이름을 사용하고 싶다면 자체 인터페이스를 정의하여 입출력 채널을 노출할 수 있다.

다음은 라이선싱 서비스에서 클래스를 반환하는 메서드를 정의한다.

CustomChannels.java

package com.optimagrowth.license.events;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface CustomChannels {
	
    // 채널 이름을 지정한다. 노출된 채널에 대한 SubscribableChannel 클래스를 반환한다
    @Input("inboundOrgChanges")
    SubscribableChannel orgs();
    
}

메시지 발행을 위한 출력 채널을 정의하려면 다음과 같이 사용한다

@OutputChannel("outboundOrg")
MessageChannel outboundOrg();

새로운 사용자 정의 채널을 통해 서비스를 변경해보자.
그 전에 LicenseServiceApplication에 사용했던 코드는 삭제한다

LicenseServiceApplication

//@EnableBinding(Sink.class)
public class LicenseServiceApplication {

	//@StreamListener(Sink.INPUT)
	//public void loggerSink(OrganizationChangeModel orgChange) {
	//	logger.debug("Received {} event for the organization id {}", orgChange.getAction(), orgChange.getOrganizationId());
	//}
}

다음은 새로운 사용자 정의 채널을 구현한 코드다.

OrganizationChangeHandler.java

package com.optimagrowth.license.events.handler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

import com.optimagrowth.license.events.CustomChannels;
import com.optimagrowth.license.events.model.OrganizationChangeModel;

// Sink 대신 CustomChannels로 바뀐 모습이다.
@EnableBinding(CustomChannels.class)
public class OrganizationChangeHandler {

    private static final Logger logger = LoggerFactory.getLogger(OrganizationChangeHandler.class);
    
    // 추후 레디스 데이터에 대해 CRUD 작업을 위해 사용할 예정이다.
    private OrganizationRedisRepository organizationRedisRepository;

	// 모든 메타데이터 추출 작업을 수행하는 유틸리티 클래스다.
    @StreamListener("inboundOrgChanges")
    // 데이터가 진행하는 액션을 확인한다
    public void loggerSink(OrganizationChangeModel organization) {
    	
        logger.debug("Received a message of type " + organization.getType());
        
        switch(organization.getAction()){
            case "GET":
                logger.debug("Received a GET event from the organization service for organization id {}", organization.getOrganizationId());
                break;
            case "SAVE":
                logger.debug("Received a SAVE event from the organization service for organization id {}", organization.getOrganizationId());
                break;
            case "UPDATE":
                logger.debug("Received a UPDATE event from the organization service for organization id {}", organization.getOrganizationId());
                break;
            case "DELETE":
                logger.debug("Received a DELETE event from the organization service for organization id {}", organization.getOrganizationId());
                break;
            default:
                logger.error("Received an UNKNOWN event from the organization service of type {}", organization.getType());
                break;
        }
    }


}

postman으로 조직을 만들고 ID값 기준으로 조회하여 테스트 해보자

POST 호출
http://localhost:8072/organization/v1/organization

body

{
"name":"HYEOKJIN",
"contactName":"CONTRACT_HYEOKJIN",
"contactEmail":"HYEOKJIN@gmail.com",
"contactPhone":"55555555"
}

로그를 살펴본다.
조직 서비스가 카프카 메시지를 보냈음을 나타낸다

organizationservice_1  | 2022-08-16 14:09:03.566 DEBUG 1 --- [nio-8081-exec-3] c.o.o.events.source.SimpleSourceBean     : Sending Kafka message SAVE for Organization Id: 19d6aae1-81bc-466c-a83a-29aa463c9e85

라이선싱 서비스에서 조직 메시지를 받을 때 OrganizationChangeHandler의 액션 메시지다. 레디스에 SAVE 값을 저장하는 로그부분이다

licensingservice_1     | 2022-08-16 14:09:03.625 DEBUG 1 --- [container-0-C-1] c.o.l.e.h.OrganizationChangeHandler      : Received a message of type com.optimagrowth.organization.events.model.OrganizationChangeModel
licensingservice_1     | 2022-08-16 14:09:03.625 DEBUG 1 --- [container-0-C-1] c.o.l.e.h.OrganizationChangeHandler      : Received a SAVE event from the organization service for organization id 19d6aae1-81bc-466c-a83a-29aa463c9e85

이제 이 ID 값으로 조회해보자

GET 호출
http://localhost:8072/organization/v1/organization/19d6aae1-81bc-466c-a83a-29aa463c9e85

조직 서비스에서 라이선싱 서비스 OrganizationChangeHandler 부분 GET 카프카 이벤트를 받았음을 나타낸다.

organizationservice_1  | 2022-08-16 14:12:07.012 DEBUG 1 --- [nio-8081-exec-7] c.o.o.events.source.SimpleSourceBean     : Sending Kafka message GET for Organization Id: 19d6aae1-81bc-466c-a83a-29aa463c9e85

licensingservice_1     | 2022-08-16 14:12:07.094 DEBUG 1 --- [container-0-C-1] c.o.l.e.h.OrganizationChangeHandler      : Received a message of type com.optimagrowth.organization.events.model.OrganizationChangeModel
licensingservice_1     | 2022-08-16 14:12:07.094 DEBUG 1 --- [container-0-C-1] c.o.l.e.h.OrganizationChangeHandler      : Received a GET event from the organization service for organization id 19d6aae1-81bc-466c-a83a-29aa463c9e85

🧨 다음 챕터에는 스프링 클라우드 슬루스와 집킨을 이용한 분석 추적에 대해 알아보겠다

profile
노옵스를향해

0개의 댓글