일단 해보기 : Spring ElasticSearch 검색 기능 만들기 (2) - Logstash JDBC로 DB 동기화

Jang990·2024년 7월 13일
0

일단해보기

목록 보기
4/7

이 글에서는 Logstash를 활용해서 ElasticSearch와 MySQL에 데이터를 동기화해볼 것이다.
앞에 글에서 데이터 동기화를 위해서 Spring 서버에서 DB에 저장할 때 수동으로 ElasticSearch에도 저장을 해줬는데,
이번에는 Logstash와 JDBC 입력 플러그인을 통해서 동기화를 시도해 볼 것이다.

logstash 추가하기


위 그림처럼 환경을 잡을 것이다.
로컬 PC에 설치돼있는 MySQL을 활용할 것이고, ElasticSearch, Logstash는 docker로 띄울 것이다.

.env에 추가하기

다음 설정을 .env 파일에 추가해주자.

LOGSTASH_CONFIG=./resources/logstash.conf
MYSQL_HOST=host.docker.internal # 로컬에 mysql을 사용하기 때문에 이렇게 설정함. docker로 띄운다면 변경해야함.
MYSQL_PORT=3306
MYSQL_DB=ElkKafka
MYSQL_USER=root
MYSQL_PASSWORD=1234
JDBC_DRIVER_URL=https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.23/mysql-connector-java-8.0.23.jar
JDBC_DRIVER_PATH=/usr/share/logstash/logstash-core/lib/jars/mysql-connector-java-8.0.23.jar

updated_at 필드 추가하기

앞에 글에서 만든 Product 엔티티에 시간과 관련된 필드를 추가해주자.
spring main 클래스에 @EnableJpaAuditing를 추가하는 것도 잊지말자.

@Getter
@EntityListeners(AuditingEntityListener.class)
@MappedSuperclass
public class BaseTimeEntity {
    @Column(updatable = false)
    @CreatedDate
    private LocalDateTime createdAt;

    @LastModifiedDate
    private LocalDateTime updatedAt;
}
@Data
@Entity
@NoArgsConstructor
public class Product extends BaseTimeEntity {
   ...
}

여기서 updatedAt을 기준으로 logstash에서 데이터를 가져올 것이다.

logstash.conf 파일 추가하기

Logstash의 JDBC 입력 플러그인은 주기적으로 MySQL을 폴링하는 루프를 실행한다.
지금 이 설정은 10초마다 DB에서 데이터를 가져오는 설정이다.

모든 데이터를 가져오는 것이 아니라 마지막 폴링 시간 이후에 업데이트 된 데이터를 가져오고, ElasticSearch로 밀어 넣는 것이다.

input {
  jdbc {
    jdbc_driver_library => "${JDBC_DRIVER_PATH}"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://${MYSQL_HOST}:${MYSQL_PORT}/${MYSQL_DB}"
    jdbc_user => "${MYSQL_USER}"
    jdbc_password => "${MYSQL_PASSWORD}"
    schedule => "*/10 * * * * *" # 10초마다 실행

    # 변경 내역을 추적할 데이터베이스의 컬럼을 지정
    tracking_column => "updated_at"
    use_column_value => true

    # sql_last_value -> logstash에서 내부적으로 다루는 마지막 실행 시간 변수
    statement => "SELECT * FROM product WHERE updated_at > :sql_last_value" #  마지막 실행 시간 이후에 업데이트된 updated_at 컬럼 값을 기준으로 데이터를 가져옴

    # 마지막 실행 메타데이터 파일의 경로
    last_run_metadata_path => "/usr/share/logstash/.logstash_jdbc_last_run"
  }
}

output {
  elasticsearch {
    hosts => ["http://es01:9200"]
    index => "product"
    document_id => "%{id}"
    user => "elastic"
    password => "changeme"
  }
}

docker-compose.yml에 추가하기

docker-compose.yml에 services에 logstash를 추가해주자.
jdbc를 활용하기 위해서는 드라이버가 필요한데 만약 없다면 다운로드 받도록 구성했다.

  logstash:
    image: docker.elastic.co/logstash/logstash:${STACK_VERSION}
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
    environment:
      - xpack.monitoring.elasticsearch.hosts=http://es01:9200
      - MYSQL_HOST=${MYSQL_HOST}
      - MYSQL_PORT=${MYSQL_PORT}
      - MYSQL_DB=${MYSQL_DB}
      - MYSQL_USER=${MYSQL_USER}
      - MYSQL_PASSWORD=${MYSQL_PASSWORD}
      - JDBC_DRIVER_URL=${JDBC_DRIVER_URL}
      - JDBC_DRIVER_PATH=${JDBC_DRIVER_PATH}
    depends_on:
      - es01
    entrypoint: [ "/bin/sh", "-c" ]
    command: >
      "if [ ! -f ${JDBC_DRIVER_PATH} ]; then
         curl -o ${JDBC_DRIVER_PATH} ${JDBC_DRIVER_URL};
       fi &&
       /usr/local/bin/docker-entrypoint"
    ports:
      - "5044:5044" # Logstash input port (optional)

V2-JDBC. Logstash JDBC로 동기화하기

Logstash를 통해 데이터를 가져오면서, 저장할 때와 업데이트할 때 ES와 관련된 코드가 제거됐다.

@Service
@Transactional
@RequiredArgsConstructor
public class ProductServiceV2WithJdbc {
    private final ProductRepository repository;
    private final ProductSearchRepository searchRepository;

    public Product save(ProductRequest product) {
        Product result = new Product(product.getPrice(), product.getStock(), product.getName());
        repository.save(result);
//        searchRepository.save(new ProductInfo(result)); // 필요없어짐.
        return result;
    }

    public int purchase(long productId) {
        Product product = repository.findById(productId).orElseThrow(RuntimeException::new);
        product.buy();

        // 업데이트에 대한 처리가 필요없어짐

        return product.getStock();
    }

    public void delete(long productId) {
        repository.deleteById(productId);
        searchRepository.deleteById(productId); // isDeleted 필드가 따로 없다면 이렇게 수동 제거 필요.
    }
}

document 수정하기

지금 DB에는 createdAt과 updatedAt을 추가했는데 ElasticSearch의 Document에는 필드를 추가하지 않았다.

컬럼 추가

createdAt과 updatedAt 필드를 Document에도 추가해주자.

@Getter
@ToString
@NoArgsConstructor
@Document(indexName = Indices.PRODUCT_INDEX)
public class ProductInfo {
    ...
    
    private LocalDateTime createdAt;
    private LocalDateTime updatedAt;
    
    ...
}

하지만 null...

이렇게 추가를 하고 바로 적용해보면 새로운 데이터를 넣어도 createdAt과 updatedAt이 null로 적용된 것을 확인할 수 있다.

그 이유는 ElasticSearch에는 createdAt, updatedAt으로 필드명이 설정되지만
DB에서 필드명은 createdAt, updatedAt이기 때문이다.

이 문제를 해결하기 위한 방법은 2가지가 있는데 2번 방법을 사용하길 권장한다.

해결방법 1. logstash.conf 변경

logstash.conf 파일에 다음 내용을 추가해주자.
이러면 DB의 created_at이 createdAt으로 updated_at이 udatedAt으로 들어가게 된다.

filter {
  mutate {
    rename => {
      "created_at" => "createdAt"
      "updated_at" => "updatedAt"
    }
  }
}

해결방법 2. 스네이크 케이스로 변경 (권장)

기본적으로 DB나 ElasticSearch에서는 스네이크 케이스를 주로 사용한다.
JPA에서도 그렇기 때문에 카멜 케이스의 필드명을 DB에서는 스네이크 케이스로 적용시켜준다.

ElasticSearch도 이 방식을 따르도록 설정을 약간만 수정하면 된다.

fieldNamingStrategy 부분을 추가해주자.

@Configuration
public class ElasticSearchConfig extends ElasticsearchConfiguration {
    ...

    @Override
    protected FieldNamingStrategy fieldNamingStrategy() {
        return new SnakeCaseFieldNamingStrategy();
    }
}
profile
공부한 내용을 적지 말고 이해한 내용을 설명하자

0개의 댓글