사전준비
docker-compose.yml
es01:
image: docker.elastic.co/elasticsearch/elasticsearch:8.13.0
environment:
- node.name=es01
- cluster.name=es-docker-cluster
- discovery.type=single-node
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- xpack.license.self_generated.type=basic
- xpack.security.enabled=false
- xpack.security.enrollment.enabled=false
- http.host=0.0.0.0
volumes:
- ./elastic/es01/data:/usr/share/elasticsearch/data
ports:
- 9200:9200
networks:
-
depends_on:
-
권한 부여
※ Permission denied 오류를 해결하기 위해 필요
chmod -R 777 ./elastic
로컬에서 테스트
docker-compose.yml
logstash:
image: docker.elastic.co/logstash/logstash:8.13.0
environment:
- DB_HOST=$DB_HOST
- DB_PORT=$DB_PORT
- DB_DATABASE=$DB_DATABASE
- DB_USERNAME=$DB_USERNAME
- DB_PASSWORD=$DB_PASSWORD
- xpack.monitoring.enabled=false
- xpack.monitoring.elasticsearch.hosts=["http://es01:9200"]
- http.host=0.0.0.0
- http.port=9600
- path.data=/usr/share/logstash/data
volumes:
- ./elastic/ls/jars/mysql-connector-j-8.3.0.jar/:/usr/share/logstash/logstash-core/lib/jars/mysql-connector-j-8.3.0.jar # mysql 사용을 위한 jdbc-connector 다운
- ./elastic/ls/config:/usr/share/logstash/config
- ./elastic/ls/pipeline:/usr/share/logstash/pipeline
ports:
- 9600:9600
networks:
-
depends_on:
- es01
.env
파일에서 관리되도록 설정path.data
: Logstash 데이터 저장 경로 설정※ volumes
MariaDB와 jdbc 플러그인을 연결하기 위한 mysql-connector jar 파일 다운 필요
https://dev.mysql.com/downloads/connector/j/?os=26
→ volume을 이용해 컨테이너 내부의 jars폴더에 jar파일을 넣어준다.
호스트 머신의 config 폴더와 pipeline 폴더를 컨테이너와 공유
→ 호스트 환경에서 pipeline과 config 내용을 수정할 수 있기 위함
pipelines.yml - 파이프라인 정의
# 데이터 적재 파이프라인 1
- pipeline.id: main
pipeline.workers: 1
pipeline.batch.size: 1
path.config: '/usr/share/logstash/pipeline/logstash-main.conf'
# 데이터 적재 파이프라인 2
- pipeline.id: call
pipeline.workers: 1
pipeline.batch.size: 1
path.config: '/usr/share/logstash/pipeline/logstash-call.conf'
path.config
: 각 파이프라인 설정 파일 경로
logstash.conf - 파이프라인 설정 파일
input {
# jdbc 플러그인 → DB 연동 관련
jdbc {
jdbc_validate_connection => true
jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mysql-connector-j-8.3.0.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://${DB_HOST}:${DB_PORT}/${DB_DATABASE}?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Seoul"
jdbc_user => "${DB_USERNAME}"
jdbc_password => "${DB_PASSWORD}"
lowercase_column_names => false
use_column_value => true
tracking_column => "updatedAt"
tracking_column_type => "timestamp"
schedule => "*/5 * * * * *"
statement => "SELECT
qna.qna_idx AS qnaIdx,
...
FROM ${DB_DATABASE}.table AS qna"
last_run_metadata_path => "/usr/share/logstash/pipeline/last-run-metadata/last-value-1.yml"
}
}
filter {
# mutate → 필드 변환 관련
mutate {
remove_field => ["@version", "@timestamp"]
}
}
output {
# elasticsearch 플러그인 → elasticsearch 적재 관련
elasticsearch{
hosts => ["http://es01:9200"]
ssl_verification_mode => none
index => "main"
document_id => "%{qnaIdx}"
template => "/usr/share/logstash/pipeline/template-main.json"
template_overwrite => true
}
}
last-value.yml
: 가장 마지막으로 조회한 데이터의 updated_at을 기록lowercase
옵션을 false로 지정하여 qnaidx → qnaIdx로 저장 가능하게 설정qnaIdx
필드의 값을 가져와서 각 document의 ID로 사용
template.json - 엘라스틱 서치 인덱스 설정 매핑
⚠ Failed to install template
→ Elasticsearch 8버전 이상 부터는 new index template 형식을 사용하야함
(참고)
https://stackoverflow.com/questions/76519564/logstash-elasticsearch-failed-to-install-template-got-response-code-400
{
"index_patterns": ["main-*"],
"priority": 1,
"template": {
"settings": {
....
},
"mappings": {
"properties": {
"field1": {
"type": "keyword"
},
"field2": {
"type": "long"
},
"field3": {
"type": "text",
"fields": {
"ngram": {
"type": "text",
"analyzer": "my_ngram_analyzer"
}
}
},
...
}
}
}
}
.gitignore
# Logstash
/elastic/ls/pipeline/last-run-metadata/last-value-1.yml
/elastic/ls/pipeline/last-run-metadata/last-value-2.yml
Elasticsearch 적재 확인
{host ip}:4095/_cat/indices?v
{host ip}:4095/{index name}/_search
{host ip}:4095/{index name}
log4j2.properties → 로그 출력 설정 파일
status = error
name = LogstashPropertiesConfig
appenders = console, rolling
loggers = jdbcinput
/* Console창에 info 하위 레벨 로그가 출력되도록 */
appender.console.type = Console
appender.console.name = plain_console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = [%d{ISO8601}][%-5p][%-25c]%notEmpty{[%X{pipeline.id}]}%notEmpty{[%X{plugin.id}]} %m%n
appender.console.filter.threshold.type = ThresholdFilter
appender.console.filter.threshold.level = info
rootLogger.level = info
rootLogger.appenderRef.console.ref = ${sys:ls.log.format}_console
/* Log 파일에는 warn 하위 레벨 로그만 출력되도록 */
appender.rolling.type = RollingFile
appender.rolling.name = rolling
# 해당 경로에 로그 파일 저장
appender.rolling.fileName = /usr/share/logstash/logs/logstash.log
# 만약에 파일에 들어있는 로그 수가 많을 경우 "일"별로 로그 파일을 분리하여 기록
appender.rolling.filePattern = /usr/share/logstash/logs/logstash-%d{yyyyMMdd}.log
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.time.type = TimeBasedTriggeringPolicy
appender.rolling.policies.time.interval = 1
appender.rolling.policies.time.modulate = true
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 30
appender.rolling.filter.threshold.type = ThresholdFilter
appender.rolling.filter.threshold.level = warn
rootLogger.level = info
rootLogger.appenderRef.rolling.ref = rolling
/* 단, 5초마다 생성되는 jdbc 관련 로그는 Error Level일 경우에만 출력되도록 */
logger.jdbcinput.name = logstash.inputs.jdbc
logger.jdbcinput.level = ERROR
※ 기본으로 제공되는 또 하나의 파일 log4j2.file.properties는 제거하자. (log4j2.properties와 충돌 이슈)
docker-compose.yaml
volumes:
...
- ./logs/logstash:/usr/share/logstash/logs # 이 부분 추가
권한 부여
chmod -R 777 ./logs/logstash
세팅 끝.
※ 참고
https://velog.io/@baebae/docker-환경에서-elasticsearch와-logstash를-이용해-mysql-데이터-연동하기