[NiFi] Local To DB 실습

CHAN LIM·2024년 1월 5일
0

NiFi

목록 보기
10/13
post-thumbnail

Intro.

생각해보니 Local To Snowflake는 해봤는데,
Local에서 DB로 Loading을 해보지 않아서 생각난 김에 진행.

  • 또한 바로 CSV TO Json을 포함한 실습이다.

0. 전체 프로세스

  • 대부분 이전에 사용했던 프로세서들이다.
    • 앞으로는 각 프로세서의 동작을 파악하기 쉽게 이름을 수정하려고 한다.

1. Processor 생성

1.1 GetFile

Local에서 File을 가져오는 프로세서

  • 이전에 사용했던 프로세서이므로 익숙할 것
  • Input Directory 속성만 설정하면 된다.
    • 파일을 가져올 디렉토리 경로 설정.

1.2 SplitText

텍스트를 여러 개의 작은 텍스트 파일로 분할하는 프로세서

  • Line Split Count
    • 헤더 행을 제외하고 각 분할 파일에 추가될 행 수.
    • 해당 실습은 한 줄 씩 분할할 것이므로 1로 설정
  • Maximum Fragment Size
    • 헤더 행을 포함한 각 분할 파일의 최대 크기.
  • Header Line Count
    • 헤더의 일부로 간주되어야 하는 행 수.
  • Header Line Marker Characters
    • 헤더 라인을 나타내는 데이터 파일 라인의 첫 번째 문자.
  • Remove Trailing Newlines
    • 각 분할 파일의 끝에서 줄 바꿈을 제거할지 여부.

1.3 UpdateAttribute

Flowfile의 속성을 변경하는 프로세서

  • 이름이 다른 건, 사용자가 용도에 맞게 수정했기 때문이다.
    • 따라서 당황하지 않아도 된다.
  • UpdateAttribute 관하여 이전 정리
    • 해당 프로세서를 활용한다면, 매우 복잡하고 다양하기 때문에 추후에 따로 정리할 필요성을 느낌.

일단, 목적은 Flowfile에 Schema Name을 추가하는 것이므로
위와 같이 설정해준다.

  • 속성 추가 방법은 마찬가지로 +를 클릭하여 추가한다.

1.4 ConvertRecord

Record를 변환하는 새로운 Processor

  • Record Reader
    • 들어오는 데이터를 읽는 데 사용할 컨트롤러 서비스를 지정한다.
    • CSV 파일을 읽을 것이기 때문에 CSVReader로 지정
  • Record Writer
    • 레코드 작성에 사용할 컨트롤러 서비스를 지정한다.
    • CSV를 Json으로 변환할 것이기 때문에 JsonRecordSetWriter로 지정
  • Include Zero Record FlowFile
    • 들어오는 FlowFile을 변환할 때 변환 결과 데이터가 없는 경우,
      이 속성은 해당 관계에 FlowFile을 보낼지 여부를 지정한다.

Controller Service

먼저! CSV TO Json 실습과 설정이 다르므로 주의한다.

  • AvroSchemaRegistry
    • 스키마 등록 및 접근을 위한 서비스를 제공한다.
    • name:
      • 스키마 이름
    • value:
      • 실제 스키마의 텍스트 표현을 나타내는 동적 속성으로 스키마를 등록

  • CSVReader

  • Schema Access Strategy
    • 데이터 해석에 사용할 스키마를 얻는 방법을 지정
    • 먼저, Schema Access Strategy를 `Use 'Schema Name' Property로 변경한다.
  • Schema Registry
    • 스키마 레지스트리에 사용할 컨트롤러 서비스를 지정
    • 바로 직전에 생성한 AvroSchemaRegistry를 Schema Registry에 등록한다.

  • JsonRecordSetWriter
    • 레코드의 스키마를 데이터에 추가하는 방법을 지정
  • Schema Access Strategy
    • 데이터 해석에 사용할 스키마를 얻는 방법을 지정
    • 먼저, Schema Access Strategy를 `Use 'Schema Name' Property로 변경한다.
  • Schema Registry
    • 스키마 레지스트리에 사용할 컨트롤러 서비스를 지정
    • 바로 직전에 생성한 AvroSchemaRegistry를 Schema Registry에 등록한다.


1.5 LogMessage 1

Log를 띄우는 프로세서

  • 프로세서 이름에서 확인하듯이, CSV Header를 걸러내기 위함이다.
    • 데이터베이스 스키마가 ID (int), Name (string) 이므로
      해당 데이터 형태가 아니라면 여기서 Filtering 된다.
    • (Header는 보통 string, string --- 형식이기 때문에 걸러진다.)

1.6 PutDatabaseRecord

  • Record Reader
    • JsonTreeReader를 생성해서 그대로 적용해준다.
      • JsonTree 형식의 데이터를 읽기 위함이다.
      • CSV To JSON으로 변환되어 적용되기 때문이다.
  • D.C.P.S
    • MySQL과 연결하므로, 이전에 만들어둔 ConnectionPool를 설정한다.

1.5 LogMessage 2

성공 Log를 띄우는 프로세서


2. 실행

2.1 실행 전 확인,

MySQL 데이터베이스 TEST2 테이블 비어있는 것을 확인

실습할 간단한 csv 파일 확인


2.2 실행

LogMessage

2024-01-05 13:37:34,641 INFO [Timer-Driven Process Thread-3] o.a.nifi.processors.standard.LogMessage LogMessage[id=d735a963-018c-1000-37cf-ac0e0a8aa1cc] ^^^^^^^^^^^^^^HEADER^^^^^^^^^^^^^^LO2DB
2024-01-05 13:37:34,651 INFO [Timer-Driven Process Thread-6] o.a.nifi.processors.standard.LogMessage LogMessage[id=d6fc98a1-018c-1000-d0ec-ea77aa0de1d3] ****************************SUCCESS****************************LO2DB
2024-01-05 13:37:34,678 INFO [Timer-Driven Process Thread-6] o.a.nifi.processors.standard.LogMessage LogMessage[id=d6fc98a1-018c-1000-d0ec-ea77aa0de1d3] ****************************SUCCESS****************************LO2DB
2024-01-05 13:37:34,678 INFO [Timer-Driven Process Thread-6] o.a.nifi.processors.standard.LogMessage LogMessage[id=d6fc98a1-018c-1000-d0ec-ea77aa0de1d3] ****************************SUCCESS****************************LO2DB
2024-01-05 13:37:34,678 INFO [Timer-Driven Process Thread-6] o.a.nifi.processors.standard.LogMessage LogMessage[id=d6fc98a1-018c-1000-d0ec-ea77aa0de1d3] ****************************SUCCESS****************************LO2DB
2024-01-05 13:37:34,678 INFO [Timer-Driven Process Thread-6] o.a.nifi.processors.standard.LogMessage LogMessage[id=d6fc98a1-018c-1000-d0ec-ea77aa0de1d3] ****************************SUCCESS****************************LO2DB
2024-01-05 13:37:34,678 INFO [Timer-Driven Process Thread-6] o.a.nifi.processors.standard.LogMessage LogMessage[id=d6fc98a1-018c-1000-d0ec-ea77aa0de1d3] ****************************SUCCESS****************************LO2DB
2024-01-05 13:37:34,697 INFO [Timer-Driven Process Thread-6] o.a.nifi.processors.standard.LogMessage LogMessage[id=d6fc98a1-018c-1000-d0ec-ea77aa0de1d3] ****************************SUCCESS****************************LO2DB
2024-01-05 13:37:34,697 INFO [Timer-Driven Process Thread-6] o.a.nifi.processors.standard.LogMessage LogMessage[id=d6fc98a1-018c-1000-d0ec-ea77aa0de1d3] ****************************SUCCESS****************************LO2DB
2024-01-05 13:37:34,710 INFO [Timer-Driven Process Thread-8] o.a.nifi.processors.standard.LogMessage LogMessage[id=d6fc98a1-018c-1000-d0ec-ea77aa0de1d3] ****************************SUCCESS****************************LO2DB
2024-01-05 13:37:34,710 INFO [Timer-Driven Process Thread-8] o.a.nifi.processors.standard.LogMessage LogMessage[id=d6fc98a1-018c-1000-d0ec-ea77aa0de1d3] ****************************SUCCESS****************************LO2DB
2024-01-05 13:37:34,710 INFO [Timer-Driven Process Thread-8] o.a.nifi.processors.standard.LogMessage LogMessage[id=d6fc98a1-018c-1000-d0ec-ea77aa0de1d3] ****************************SUCCESS****************************LO2DB
2024-01-05 13:37:34,734 INFO [Timer-Driven Process Thread-6] o.a.nifi.processors.standard.LogMessage LogMessage[id=d6fc98a1-018c-1000-d0ec-ea77aa0de1d3] ****************************SUCCESS****************************LO2DB
2024-01-05 13:37:34,734 INFO [Timer-Driven Process Thread-6] o.a.nifi.processors.standard.LogMessage LogMessage[id=d6fc98a1-018c-1000-d0ec-ea77aa0de1d3] ****************************SUCCESS****************************LO2DB
2024-01-05 13:37:34,748 INFO [Timer-Driven Process Thread-9] o.a.nifi.processors.standard.LogMessage LogMessage[id=d6fc98a1-018c-1000-d0ec-ea77aa0de1d3] ****************************SUCCESS****************************LO2DB
2024-01-05 13:37:34,748 INFO [Timer-Driven Process Thread-9] o.a.nifi.processors.standard.LogMessage LogMessage[id=d6fc98a1-018c-1000-d0ec-ea77aa0de1d3] ****************************SUCCESS****************************LO2DB
2024-01-05 13:37:34,760 INFO [Timer-Driven Process Thread-1] o.a.nifi.processors.standard.LogMessage LogMessage[id=d6fc98a1-018c-1000-d0ec-ea77aa0de1d3] ****************************SUCCESS****************************LO2DB

출처

How to read data from local and store it into MySQL table in NiFi

profile
클라우드, 데이터, DevOps 엔지니어 지향 || 글보단 사진 지향

0개의 댓글