이 글에서는 Logstash를 활용해서 ElasticSearch와 MySQL에 데이터를 동기화해볼 것이다.
앞에 글에서 데이터 동기화를 위해서 Spring 서버에서 DB에 저장할 때 수동으로 ElasticSearch에도 저장을 해줬는데,
이번에는 Logstash와 JDBC 입력 플러그인을 통해서 동기화를 시도해 볼 것이다.
위 그림처럼 환경을 잡을 것이다.
로컬 PC에 설치돼있는 MySQL을 활용할 것이고, ElasticSearch, Logstash는 docker로 띄울 것이다.
다음 설정을 .env 파일에 추가해주자.
LOGSTASH_CONFIG=./resources/logstash.conf
MYSQL_HOST=host.docker.internal # 로컬에 mysql을 사용하기 때문에 이렇게 설정함. docker로 띄운다면 변경해야함.
MYSQL_PORT=3306
MYSQL_DB=ElkKafka
MYSQL_USER=root
MYSQL_PASSWORD=1234
JDBC_DRIVER_URL=https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.23/mysql-connector-java-8.0.23.jar
JDBC_DRIVER_PATH=/usr/share/logstash/logstash-core/lib/jars/mysql-connector-java-8.0.23.jar
앞에 글에서 만든 Product
엔티티에 시간과 관련된 필드를 추가해주자.
spring main 클래스에 @EnableJpaAuditing
를 추가하는 것도 잊지말자.
@Getter
@EntityListeners(AuditingEntityListener.class)
@MappedSuperclass
public class BaseTimeEntity {
@Column(updatable = false)
@CreatedDate
private LocalDateTime createdAt;
@LastModifiedDate
private LocalDateTime updatedAt;
}
@Data
@Entity
@NoArgsConstructor
public class Product extends BaseTimeEntity {
...
}
여기서 updatedAt
을 기준으로 logstash에서 데이터를 가져올 것이다.
Logstash의 JDBC 입력 플러그인은 주기적으로 MySQL을 폴링하는 루프를 실행한다.
지금 이 설정은 10초마다 DB에서 데이터를 가져오는 설정이다.
모든 데이터를 가져오는 것이 아니라 마지막 폴링 시간 이후에 업데이트 된 데이터를 가져오고, ElasticSearch로 밀어 넣는 것이다.
input {
jdbc {
jdbc_driver_library => "${JDBC_DRIVER_PATH}"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://${MYSQL_HOST}:${MYSQL_PORT}/${MYSQL_DB}"
jdbc_user => "${MYSQL_USER}"
jdbc_password => "${MYSQL_PASSWORD}"
schedule => "*/10 * * * * *" # 10초마다 실행
# 변경 내역을 추적할 데이터베이스의 컬럼을 지정
tracking_column => "updated_at"
use_column_value => true
# sql_last_value -> logstash에서 내부적으로 다루는 마지막 실행 시간 변수
statement => "SELECT * FROM product WHERE updated_at > :sql_last_value" # 마지막 실행 시간 이후에 업데이트된 updated_at 컬럼 값을 기준으로 데이터를 가져옴
# 마지막 실행 메타데이터 파일의 경로
last_run_metadata_path => "/usr/share/logstash/.logstash_jdbc_last_run"
}
}
output {
elasticsearch {
hosts => ["http://es01:9200"]
index => "product"
document_id => "%{id}"
user => "elastic"
password => "changeme"
}
}
docker-compose.yml
에 services에 logstash를 추가해주자.
jdbc를 활용하기 위해서는 드라이버가 필요한데 만약 없다면 다운로드 받도록 구성했다.
logstash:
image: docker.elastic.co/logstash/logstash:${STACK_VERSION}
volumes:
- ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
environment:
- xpack.monitoring.elasticsearch.hosts=http://es01:9200
- MYSQL_HOST=${MYSQL_HOST}
- MYSQL_PORT=${MYSQL_PORT}
- MYSQL_DB=${MYSQL_DB}
- MYSQL_USER=${MYSQL_USER}
- MYSQL_PASSWORD=${MYSQL_PASSWORD}
- JDBC_DRIVER_URL=${JDBC_DRIVER_URL}
- JDBC_DRIVER_PATH=${JDBC_DRIVER_PATH}
depends_on:
- es01
entrypoint: [ "/bin/sh", "-c" ]
command: >
"if [ ! -f ${JDBC_DRIVER_PATH} ]; then
curl -o ${JDBC_DRIVER_PATH} ${JDBC_DRIVER_URL};
fi &&
/usr/local/bin/docker-entrypoint"
ports:
- "5044:5044" # Logstash input port (optional)
Logstash를 통해 데이터를 가져오면서, 저장할 때와 업데이트할 때 ES와 관련된 코드가 제거됐다.
@Service
@Transactional
@RequiredArgsConstructor
public class ProductServiceV2WithJdbc {
private final ProductRepository repository;
private final ProductSearchRepository searchRepository;
public Product save(ProductRequest product) {
Product result = new Product(product.getPrice(), product.getStock(), product.getName());
repository.save(result);
// searchRepository.save(new ProductInfo(result)); // 필요없어짐.
return result;
}
public int purchase(long productId) {
Product product = repository.findById(productId).orElseThrow(RuntimeException::new);
product.buy();
// 업데이트에 대한 처리가 필요없어짐
return product.getStock();
}
public void delete(long productId) {
repository.deleteById(productId);
searchRepository.deleteById(productId); // isDeleted 필드가 따로 없다면 이렇게 수동 제거 필요.
}
}
지금 DB에는 createdAt과 updatedAt을 추가했는데 ElasticSearch의 Document에는 필드를 추가하지 않았다.
createdAt과 updatedAt 필드를 Document에도 추가해주자.
@Getter
@ToString
@NoArgsConstructor
@Document(indexName = Indices.PRODUCT_INDEX)
public class ProductInfo {
...
private LocalDateTime createdAt;
private LocalDateTime updatedAt;
...
}
이렇게 추가를 하고 바로 적용해보면 새로운 데이터를 넣어도 createdAt과 updatedAt이 null로 적용된 것을 확인할 수 있다.
그 이유는 ElasticSearch에는 createdAt, updatedAt으로 필드명이 설정되지만
DB에서 필드명은 createdAt, updatedAt이기 때문이다.
이 문제를 해결하기 위한 방법은 2가지가 있는데 2번 방법을 사용하길 권장한다.
logstash.conf 파일에 다음 내용을 추가해주자.
이러면 DB의 created_at이 createdAt으로 updated_at이 udatedAt으로 들어가게 된다.
filter {
mutate {
rename => {
"created_at" => "createdAt"
"updated_at" => "updatedAt"
}
}
}
기본적으로 DB나 ElasticSearch에서는 스네이크 케이스를 주로 사용한다.
JPA에서도 그렇기 때문에 카멜 케이스의 필드명을 DB에서는 스네이크 케이스로 적용시켜준다.
ElasticSearch도 이 방식을 따르도록 설정을 약간만 수정하면 된다.
fieldNamingStrategy
부분을 추가해주자.
@Configuration
public class ElasticSearchConfig extends ElasticsearchConfiguration {
...
@Override
protected FieldNamingStrategy fieldNamingStrategy() {
return new SnakeCaseFieldNamingStrategy();
}
}