2. java filter plugin 개발

brody·2020년 6월 12일
0

로그스태시 소스 빌드하기

로컬에 logstash 소스가 있어야만 filter plugin 빌드가 가능하다.

  1. logstash download

    https://github.com/elastic/logstash.git
  2. 빌드하기

    gradlew.bat assemble
  3. 해당 위치에 jar가 생성되었는지 확인

    {플젝위치}/build/libs에 logstash-core-*.*.*.jar생성되었는지 확인
  4. 환경변수 설정

    set LS_SRC_HOME = {LOGSTASHCORE플젝위치}

filter plugin받아 작성하기

  1. filter - example download

    https://github.com/logstash-plugins/logstash-filter-java_filter_example
  2. C:\Users\SDS.gradle\gradle.properties에 아래 추가

    LOGSTASH_CORE_PATH={LOGSTASHCORE플젝위치}/logstash-core/
    
    #LOGSTASH_CORE_PATH=$LS_SRC_HOME/logstash-core/
    #->아래꺼로 해도 된다고 하는데 뭔가 잘 안맞아서 그냥 절대경로로 세팅함..
    #프록시가 있다면 그것도 여기에 넣어줘야함
    systemProp.proxySet=true
    
    systemProp.http.proxyHost={IP}
    systemProp.http.proxyPort={PORT}
    systemProp.http.nonProxyHosts={exclude할주소}|localhost
    
    systemProp.https.proxyHost={IP}
    systemProp.https.proxyPort={PORT}
    systemProp.https.nonProxyHosts={exclude할주소}|localhost
  3. build.gradle수정 (dependency가져오기, 이름바꾸기)

    나는 기존 maven repository에 배포된 것들을 가져와야해서 다음과 같이 gradle 설정을 바꿨다.

    아래와같이 maven url 모두 추가 (settings.xml에 있는그대로)

    repositories{
        maven {
            credentials {
                username '{id}'
                password '{password}'
            }
            url "{url}"
        }
    }

    아래와 같이 dependencies 추가 (pom.xml에 있는 그대로)

    compile '{groupID}:{artifactId}:{version(RELEASE일 경우 +)}'

    이름 바꿈

    pluginInfo.pluginClass     = "{yourNewFilterName}"
    pluginInfo.pluginName =  "{your_new_filter_name}"
  1. java plugin작성..

    dependency로 가져온 것들 사용해가며 filter 작성..

    위에서 지정한 {pluginInfo.pluginClass}로 class생성하여 만듬

  1. gem build
gradlew.bat gem
  1. 아래와같이 파일 생김
logstash-filter-your_new_name-1.00.gem
  1. plugin install

    여기서 삽질을 많이했는데, window에서는 경로의 backslash를 forwardslash로 바꿔야한다.

    For Windows platforms: Substitute backslashes for forward slashes as appropriate in the command.

    또 local plugin을 설치하는데 자꾸 https://rubygems.org 에 연결할수 없다고 나오는지 모르겠지만.. 환경변수에 아래와 같이 프록시를 설정했다.

    HTTPS_PROXY=https://{IP}:{PORT}
    HTTP_PROXY=http://{IP}:{PORT}
    bin\logstash-plugin.bat install --no-verify --local {경로}\logstash-filter-your_new_name-1.00.gem
    Installation successful
  2. test

    input {
      kafka {
        bootstrap_servers => "{IP:PORT}"
        topics => "{ACTION_NAME}"
        group_id => "log-to-es-group"
        consumer_threads => 1
      }
    }
    
    filter {
    	your_new_filter_name {}
    }
    
    output { 
      elasticsearch {
        hosts => ["{ES_HOST}"]
        index => "log-insert"
      }
      stdout { codec => rubydebug }
    }

문제는 kafka input에서 byte[]의 kafkaMessage를 filter로 줄 때 String으로 변환하여 주고있다. 직렬화된 byte[]를 특정 Class로 역직렬화 하고 싶은데, String으로 변환해서 들어오니 Class -> byte[] -> String -> byte[] -> Class의 비효율이 생긴다.

또 String -> byte[] -> Class로 변환하려고 할때 아래와 같은 exception이 발생함..
byte변환 시 getBytes("8859_1")로 charsetName을 주면 누락이 없다고 해서 kafka input 설정의 encoding도 아래와 같이 놓고 시도해보았지만 변환은 계속 안된다.

	codec => plain {
                   charset => "ISO-8859-1"
		    }
invalid stream header: EFBFBDEF

몇일째 삽질중이다. 로그스태시 input에서 어떤 일이 발생하는지 디버깅하기가 쉽지 않다. 다만 그냥 어떤 stream을 그대로 es에 손쉽게 때려박기는 용이한 서비스같다. 일단 flume을 작성해보고 생각하자.

profile
일하며 하는 기록

0개의 댓글