[Centos7] ELK를 이용한 관제 시스템

배찌 (배찌)·2023년 3월 13일
0

설치 기록

목록 보기
11/12

구성

Elastic server

  • 192.168.0.71
  • 2v cpu
  • 6gb ram
  • 100gb hdd

Logstash server

  • 192.168.0.70
  • 2v cpu
  • 6gb ram
  • 100gb hdd

Filebeat server

  • 192.168.0.200
  • 2v cpu
  • 4gb ram
  • 50gb hdd

ELK란?

ELK란 Elasticsearch, Logstash, Kibana의 약자이다.
오픈 소스 프로젝트로, 대규모 집합을 검색, 분석 및 시각화 하는데 사용된다.
한마디로 각 서버에서 발생하는 로그를 중앙 집중화 하여, 로그 데이터를 수집, 분석 및 시각화하는 도구이다.

로그분석, 보안 분석, 인프라 모니터링 등 다양한 분야에서 사용할 수 있다.

Elasticsearch, Logstash, Kibana 역할

  • Logstash : 다양한 소스에서 데이터를 수집하고 반환하여 ElasticSearch로 보낼 수 있도록 한다.
  • Elasicsearch : 실시간 분산 검색 및 분석 엔진이다. 데이터를 저장하고 검색하는 데 사용된다.
  • Kibana : Elasticsearch의 데이터를 시각화하고 대시보드 및 보고서를 생성하는 데 사용된다.

Filebeat

Filebeat는 ELK stack의 일부로, 로그 데이터를 Elasticsearch 또는 Logstash로 전송하는 경량 로그 수집기이다.

기본적인 구조

ELK를 구성하기 전 기본적인 데이터의 흐름 이해가 필요하다.

출처 : https://www.gspann.com/resources/blogs/understanding-a-centralized-logging-platform-elk-stack/

  1. RawData에서 데이터를 수집하여 Logstash에 전달한다.
  2. Logstash이 전달받은 불필요한 데이터를 필터링하고 남은 데이터를 Elasticsearch로 전송한다.
  3. Logstash에서 전송한 데이터를 Elasticsearch에서 저장한다.
  4. kibana와 elasticsearch를 연결하여 elasticsearch의 데이터를 kibana에서 시각화 한다.

그렇기에 기본적으로 WEB서버가 필요하며, WEB서버에서 Filebeat를 설치하여 로그를 Logstash에 전송해야한다.

Elasticsearch & Kibana install

ELK는 yum과 패키지 설치가 있지만 프로젝트 진행 중이기에 yum으로 빠르게 설치한다.

Elasticsearch, Logstash는 java로 개발이 되어 데이터를 읽거나 전송하는 모든 관련된 ELK는 openjdk를 설치한다.

yum -y install java-11-openjdk

ES_JAVA_HOME 전역변수 등록

cat >> /etc/profile <<EOF
export ES_JAVA_HOME=/usr/lib/jvm/java-11-openjdk-11.0.18.0.10-1.el7_9.x86_64

EOF

Elastic은 기본적으로 JAVA_HOME이 아닌 ES_JAVA_HOME을 사용하기에 등록해주어야 한다.

source /etc/profile

Elasticsearch 설치

ELK 레포지터리 추가

rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
cat >> /etc/yum.repos.d/elasticsearch.repo <<EOF

[elasticsearch]
name=Elasticsearch repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=0
autorefresh=1
type=rpm-md

EOF

yum install -y --enablerepo=elasticsearch elasticsearch

Elasticsearch 설정 편집

vi /etc/elasticsearch/elasticsearch.yml


여기서 몇가지 주의할점이 있다.
우선 많은 글들이 하나의 서버에서 ELK를 모두 설치하여 network.host를 localhost로 하는 경우가 많다.
그런데 여기서 IP로 설정하게 되면 실행이 되지 않는 경우가 생긴다.
그렇기 때문에 discovery.seed_hosts를 수정한다.
http.port의 경우 logstash에서 elasticsearch로 전송하는 port를 의미한다. 보안을 생각한다면 9200번이 아닌 임의의 번호로 수정하는것을 추천한다.

나의 경우엔 cluster.initial_master_nodes도 수정하였는데 elastic을 시작하였는데 에러가 뜬다면 나와 같이 수정해보면 좋다.

이제 방화벽을 열어준다.

firewall-cmd --add-port=9200/tcp --permanent
firewall-cmd --reload
systemctl enable elasticsearch
systemctl restart elasticsearch

이제 정상적으로 작동됬는지 확인해보자

curl http://192.168.0.71:9200


다행히 정상적으로 되었다. name역시 node-1로 인식된것을 볼수가 있다.

이제 정상적으로 되었으니 kibana에서 로그인 할 Elasticsearch 유저를 생성해줘야한다.

우선 .yml파일에 추가 적인 내용이 되야한다.

vi /etc/elasticsearch/elasticsearch.yml
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true

X-pack 보안 기능을 활성화 하면 elasticserarch에서 제공하는 사용자 인증 및 권한 부여, SSL/TLS 보안 등의 기능을 지원한다.
여기서 ssl을 이용함으로써 유저와 패스워드를 입력하여 kibana에서 elasticsearch에 접속할수 있는 방법을 가지게 된것이다.

그리고 이제 elasticsearch에서 제공하는 내장 스크립트를 실행하여 Elasticsearch 클러스터에 대한 보안 계정을 설정정/보호하며, 인증 및 권한 부여를 사용하여 Elasticsearch 클러스터에 대한 액세스를 제한한다.

/usr/share/elasticsearch/bin/elasticsearch-setup-passwords interactive

실행하면 내장 스크립트가 진행되며 대화형 스크립트가 진행되고, 패스워드만 지정해주면 된다.

Kibana 설치

kibana 레포지터리 추가

rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearc
cat >> /etc/yum.repos.d/kibana.repo <<EOF

[kibana-7.x]
name=Kibana repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

EOF

kibana 설정

vi /etc/kibana/kibana.yml

해당 줄을 찾아서 수정하면된다.

server.port: 5601

server.host: <kibana ip>

elasticsearch.hosts: ["http://<elasticsearch ip>:<port>"]

나의 경우야 두개의 ip가 문제가 생길 일은 없지만 분리를 하게 된다면 이 부분역시 중요하다고 할 수 있다.

방화벽 설정

firewall-cmd --add-port=5601/tcp --permanent
firewall-cmd --reload
systemctl restart kibana

접속 테스트

http://<elasticsearch ip>:<port>

정상적으로 접속이 되었다.
하지만 지금 이 부분은 익명 사용자이기에 설정을 할수가 없다.
그렇기에 이제부터 로그인 설정을 하는데, kibana는 elasticsearch의 데이터를 시각화 하기에 elastic user로 접속하게 설정해야한다.

vi /etc/kibana/kibana.yml

kibana에서 다음과 같은 행을 찾아보면 주석처리가 되어있을것이다.

#elasticsearch.username: "user"
#elasticsearch.password: "password"

나의 경우 다음과 같이 변경하였다.


다시 재시작해준다.

systemctl enable kibana
systemctl restart kibana


정상적으로 로그인 페이지가 뜬다면 설정이 완료 되었다.

Logstash install

yum -y install java-11-openjdk

레포지터리 추가

rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
cat >> /etc/yum.repos.d/logstash.repo <<EOF

[logstash-7.x]
name=Elastic repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

EOF

logstash 설치

yum install logstash


Logstash에서는 그닥 설정할게 많지는 않다. 하지만 주의깊게 설정해야 할 것은 있다.

그건 바로 Pipeline설정이다. Filebeat에서 Logstash로 데이터를 전송할때 logstash에서는 Pipeline을 만들어 Pipeline에서 정보를 받아들이고 정제하여 인덱스형태로 elasticsearch로 전송하는 형태이다.

그렇기에 Filebeat 설정이 우선시 되어야 하니 Filebeat에서 추가로 설정한다

Filebeat install

여기서는 사전에 apache web server를 설치하였다.

이 점 유의하며 진행한다.

yum -y install java-11-openjdk.x86_64

filebeat 레포지터리 설치

rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
cat >> /etc/yum.repos.d/filebeat.repo <<EOF

[logstash-7.x]
name=Elastic repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

EOF

filebeat 설치

yum -y install filebeat

이제부터 데이터를 전송하기 위해서 설정해야 할것이 몇가지가 있다.

우선 전송해야될 로그를 크게 두가지로 나누었다.

/var/log/httpd/access_log
/var/log/secure

access_log는 접속하는 로그이고, secure는 ssh로 접속하는 로그가 남는다.

그렇다면 우리는 기본적으로 이러한 형태로 만들수 있다.

vi /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/httpd/access_log
    - /var/log/secure
•••

output.logstash:
  hosts: ["192.168.0.70:7077"]
  index => "web-%{+YYYY.MM.dd}"

여기서 output.lostash와 filebeat.inputs은 기본적으로 있는 내용이니 검색하여 찾아가길 바란다.

이렇게 설정하면 192.168.0.70 ip에 7077/tcp포트로 access_log, /var/log/secure를 전송한다는 로그가 만들어 진것이다.

이제 연결되는지 확인하기 위해서 Logstash에서 설정해야한다.

Logstash Pipeline 설정

현재 filebeat가 7077로 포트가 들어오며, elasticsearch는 9200번으로 데이터를 전달 받는다.

그러면 기본적인 형태를 만들어보자

파이프라인 파일 생성

vi /etc/logstash/conf.d/web_logs.conf
input {
  beats {
    port => 7077
  }
}

filter {

}

output {
	hosts => ["http://192.168.0.71:9200"]
    index => "web-%{+YYYY.MM.dd}"
    user => "elastic"
    password => "123qwe"
}

없는 내용이 많지만 기본적으로 input으로 받아들이고 필터링을 거쳐서 output을 통해 elasticsearch로 데이터를 전송한다. 이때 ssl인증을 켰기 때문에 데이터를 전송할때 user, password가 추가로 기입이 되어야한다.

지금 생각해보면 Elastic이 아니라 Logstash 계정으로 전달해도 문제가 없지 않았을까 싶은데 시간이 없어 테스트를 해보진 못했다.

실행 전 방화벽을 연다.

firewall-cmd --add-port=7077/tcp --permanent
firewall-cmd --reload

실행

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/web_logs.conf --config.reload.automatic 

이렇게 했다면 포트가 정상적으로 열렸을것이다.
그러면 이제 filebeat에서 테스트를 진행한다.

filebeat test output


오 한번에 됬다.
만약 안된다면 selinux, firewalld를 체크해보길 바란다.

정상적으로 진행되니 실행

systemctl restart filebeat

이제 Logstash에서 정상적으로 전달되는지 확인하기 위해서 다음과 같은 conf파일을 생성한다

input {
  beats {
    port => 7077
  }
}

filter {

}

output {
	stdout { codec => "rubydebug" }
}

이렇게하고 다시 실행해준다.

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/web_logs.conf --config.reload.automatic 

이렇게 되면 콘솔로 다음과 같은 로그가 나오게 된다.

          "input" => {
        "type" => "log"
    },
            "ecs" => {
        "version" => "1.12.0"
    },
          "agent" => {
        "ephemeral_id" => "18e58759-ce22-4f35-8ecb-e24da108cac2",
                "type" => "filebeat",
             "version" => "7.17.9",
                "name" => "localhost.localdomain",
            "hostname" => "localhost.localdomain",
                  "id" => "f0d5147f-78d0-4091-afec-1f80dbb612bd"
    }
          "input" => {
        "type" => "log"
    },
            "ecs" => {
        "version" => "1.12.0"
    },
          "agent" => {
            "hostname" => "localhost.localdomain",
                "name" => "localhost.localdomain",
                  "id" => "f0d5147f-78d0-4091-afec-1f80dbb612bd",
             "version" => "7.17.9",
                "type" => "filebeat",
        "ephemeral_id" => "c96f2b0c-1ae6-4c09-b860-b9771d9dea72"
    },
       "@version" => "1",
           "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
           "host" => {
             "hostname" => "localhost.localdomain",
         "architecture" => "x86_64",
                 "name" => "localhost.localdomain",
                   "id" => "14a9d15737c742e08a3a5fc0caf24fdf",
        "containerized" => false,
                   "ip" => [
            [0] "192.168.0.200",
            [1] "fe80::df6c:fbac:1121:5b9d",
            [2] "192.168.122.1"
        ],
                  "mac" => [
            [0] "00:0c:29:a7:b5:ff",
            [1] "52:54:00:2f:be:b8",
            [2] "52:54:00:2f:be:b8"
        ],
                   "os" => {
            "platform" => "centos",
                "name" => "CentOS Linux",
              "family" => "redhat",
              "kernel" => "3.10.0-1160.83.1.el7.x86_64",
             "version" => "7 (Core)",
            "codename" => "Core",
                "type" => "linux"
        }
    }
}

중략을 하긴 했지만 정상적으로 데이터가 전송이 되는것을 확인 할수 있다.

이제 다시 수정한다.

input {
  beats {
    port => 7077
  }
}

filter {

}

output {
	hosts => ["http://192.168.0.71:9200"]
    index => "web-%{+YYYY.MM.dd}"
    user => "elastic"
    password => "123qwe"
}

이렇게 수정하여 전송을 하게되면, 정상확인을 위해서 다음과 같은 방법으로 웹사이트에 접속한다.

http://<elasticsearch ip>:9200/_cat/indices?v

다른 테스트를 진행하다보니 많이 생성됫긴하지만 로그에 web-20xx-xx-xx이라고 되어있는 index가 보일것이다.

그렇다면 정상적으로 전달이 된것이다.
만약 pipeline에서 index를 설정하지않앗다면 filebeat로 표시가 될것이다.

여기서 부터는 kibana idnex partten을 설정해야하는데 multi pipeline을 설정해야 하기 때문에 Multiple pipeline이 필요없는 사람은 Kibana index partten으로 넘어가도 좋다.

Logstash Multiple Pipeline

이 Multiple Pipeline의 주 목적은 한 서버의 로그를 web log와 secure log를 분리하여 다른 index에 표현하는것이 주목적이다.

그렇기에 처음에는 Logstash에서 두개의 pipeline id를 만들고 로그 디렉토리 path를 이용하여 분리하면 되지 않을까?
라고 생각한것은 잘 되지 않았다.

그리하여 log정보에 tag를 달아 logstash에서 tag를 기준으로 정보를 분리하여 두개의 index로 표현하는 방법으로 선택하였다.

이를 위해서는 filebeat에서는 fields를 사용하여 분리하였다.

filebeat.yml

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/httpd/access_log
    - /var/log/httpd/error_log
  fields:
    application: apache
  fields_under_root: true

- type: log
  paths:
    - /var/log/secure
  fields:
    application: secure
  fields_under_root: true

output.logstash:
  hosts: ["192.168.0.70:7077"]

이렇게 되면 두개의 log에 두개의 fields로 나뉘어 보내게 된다.

logstash.conf

input {
beats {
  port => 7077
}
}

filter {
      if [fields][application] == "apache" {

      }else if [fields][application] == "secure" {

     }
}


output {
      if [fields][application] == "apache" {
          stdout { codec => "rubydebug" }
      }else if [fields][application] == "secure" {
          stdout { codec => "rubydebug" }
     }
}

테스트를 진행하기 위해서 콘솔창에서 확인하였는데 나오지가 않았다.
이상하다고 여겨 if문을 모두 주석처리하고 input되어 오는 데이터를 찾아봣다.

                   "ip" => [
            [0] "192.168.0.200",
            [1] "fe80::df6c:fbac:1121:5b9d",
            [2] "192.168.122.1"
        ],
                  "mac" => [
            [0] "00:0c:29:a7:b5:ff",
            [1] "52:54:00:2f:be:b8",
            [2] "52:54:00:2f:be:b8"
        ],
        "containerized" => false,
         "architecture" => "x86_64",
             "hostname" => "localhost.localdomain",
                 "name" => "localhost.localdomain",
                   "id" => "14a9d15737c742e08a3a5fc0caf24fdf"
    },
          "input" => {
        "type" => "log"
    },
    "application" => "secure",
            "ecs" => {
        "version" => "1.12.0"
    },
          "agent" => {
        "ephemeral_id" => "18e58759-ce22-4f35-8ecb-e24da108cac2",
                "type" => "filebeat",
             "version" => "7.17.9",
                "name" => "localhost.localdomain",
            "hostname" => "localhost.localdomain",
                  "id" => "f0d5147f-78d0-4091-afec-1f80dbb612bd"
    }
          "input" => {
        "type" => "log"
    },
    "application" => "apache",
            "ecs" => {
        "version" => "1.12.0"
    },
          "agent" => {
            "hostname" => "localhost.localdomain",
                "name" => "localhost.localdomain",
                  "id" => "f0d5147f-78d0-4091-afec-1f80dbb612bd",
             "version" => "7.17.9",
                "type" => "filebeat",
        "ephemeral_id" => "c96f2b0c-1ae6-4c09-b860-b9771d9dea72"
    },
       "@version" => "1",
           "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
           "host" => {
             "hostname" => "localhost.localdomain",
         "architecture" => "x86_64",
                 "name" => "localhost.localdomain",
                   "id" => "14a9d15737c742e08a3a5fc0caf24fdf",
        "containerized" => false,
                   "ip" => [
            [0] "192.168.0.200",
            [1] "fe80::df6c:fbac:1121:5b9d",
            [2] "192.168.122.1"
        ],
                  "mac" => [
            [0] "00:0c:29:a7:b5:ff",
            [1] "52:54:00:2f:be:b8",
            [2] "52:54:00:2f:be:b8"
        ],
                   "os" => {
            "platform" => "centos",
                "name" => "CentOS Linux",
              "family" => "redhat",
              "kernel" => "3.10.0-1160.83.1.el7.x86_64",
             "version" => "7 (Core)",
            "codename" => "Core",
                "type" => "linux"
        }
    }
}

나는 fields 메인으로 오지않을까? 라고 생각했지만 실제로는 application을 기준으로 분리가 되었다

"application" => "secure"
•••
"application" => "apache"

그렇기에 fields를 삭제하고 application으로 분리하였다.

input {
  beats {
    port => 7077
  }
}

filter {
        if [application] == "apache" {

        }else if [application] == "secure" {

       }
}


output {
        if [application] == "apache" {
            stdout { codec => "rubydebug" }
        }else if [application] == "secure" {
            stdout { codec => "rubydebug" }
       }
}

그 결과는 역시 제대로 되었고, 이제 Elasticserach로 전송하는 내용으로 추가하였다.

input {
  beats {
    port => 7077
  }
}

filter {
        if [application] == "apache" {

        }else if [application] == "secure" {

                }
}


output {
       if [application] == "apache" {
         elasticsearch {
                        hosts => ["http://192.168.0.71:9200"]
                        index => "apache-%{+YYYY.MM.dd}"
                        user => "elastic"
                        password => "123qwe"
                }
    } else if [application] == "secure" {
         elasticsearch {
                        hosts => ["http://192.168.0.71:9200"]
                        index => "secure-%{+YYYY.MM.dd}"
                        user => "elastic"
                        password => "123qwe"
                }
       }
}

그 결과는 다음과 같이 정상적으로 전달되는것을 확인할 수 가 있었다.

Kibana Index Patterns

elasticserach까지 정상적으로 데이터를 전송하였다면 이제 elastic에 있는 index를 kibana에서 시각화하는 작업만이 남았다.


사이트에 접속하여 왼쪽 상단의 메뉴판을 누르고 "Slack Managerment"를 눌러준다
그다음 Kibana 탭에서 index patterns을 눌러준다

나는 사전에 테스트를 진행하여 인덱스 패턴이 있지만 보통 없기때문에 create라는 창이 오른쪽에서 나온다.

Create Index Pattern을 눌러주면 다음과 같은 창이 나온다.

우리는 현재 logstash에서의 index를 다음과 같이 설정했다.

index => "apache-%{+YYYY.MM.dd}"
index => "secure-%{+YYYY.MM.dd}"

앞의 apache, secure말고는 모두 변동사항의 index이다.

그렇기에 을 이용하여 apache뒤에 을 붙여 앞단에 apache가 적혀있는 모든 인덱스 내용을 불러들일 수 있다.

정상적인 index라면 다음과 같이 Timestamp field가 자동으로 생성되고, @timestamp를 선택하면 정상적으로 index pattern이 생성된다.

이제 홈으로 돌아가서 "Analytics"에 "Discover"로 접속한다.

정상적으로 index pattern이 설정 되었다면 다음과 같은 창이 나온다.

하지만 이 방법은 추천하는 방식은 아니다. 이렇게되면 모든 로그를 받아들이기에 불필요한 로그역시 도출되어 정작 중요한 로그를 보지 못할 가능성이 높다.

ELK는 오픈소스이기 때문에 조금만 찾아보면 필터가 나온다. 나의 경우 다음과 같이 필터를 설정하였다.

filter{
        if [application] == "apache" {
                grok { match => [ 'message', '%{IP:client_ip} %{USER:ident} %{USER:auth} [%{HTTPDATE:timestamp}] "%{WORD:http_method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}" %{NUMBER:response_code} %{NUMBER:response_size} "(?:%{URI:referrer}|-)" "%{DATA:user_agent}" ' ]
                        }
                geoip { source => "client_ip"
                        target => "geoip"
                        database => "/etc/logstash/maps/GeoLite2-City.mmdb" }

                }

        else if [application] == "secure" {

                        grok { match => [ 'message', '%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:hostname} sshd[%{POSINT:pid}]: Accepted %{WORD:auth_method} for %{USERNAME:user} from %{IP:src_ip} port %{NUMBER:src_port} ssh2: %{WORD:key_type} %{WORD:hash}' ]
                                }

                        geoip { source => "src_ip"
                        target => "geoip"
                        database => "/etc/logstash/maps/GeoLite2-City.mmdb" }


        }
}
profile
Never give up Impossible is nothing

0개의 댓글