빅데이터/Apache Nifi / / 2023. 9. 3. 18:45

Apache Nifi 프로세서

  • 주요 프로세서
  • ExecuteSQ
  • ConvertRecord
  • ConvertAvroToJSON
  • JoltTransformJSON
  • SplitAvro
  • PutDatabaseRecord
  • UpdateRecord
  • InvokeHTTP

 

1. 주요 프로세서

   
ExecuteSQL 제공된 SQL 선택 쿼리를 실행합니다. 쿼리 결과는 Avro 형식으로 변환됩니다
ConvertRecord 구성된 레코드 리더 및 레코드 쓰기 컨트롤러 서비스를 사용하여 레코드를 한 데이터 형식에서 다른 데이터 형식으로 변환합니다. 판독기와 기록기는 일치하는 스키마로 구성되어야 합니다. 이는 스키마의 필드 이름이 동일해야 함을 의미합니다.
ConvertAvroToJSON 바이너리 Avro 레코드를 JSON 객체로 변환합니다. 이 프로세서는 Avro 필드를 JSON 필드에 직접 매핑하므로 결과 JSON이 Avro 문서와 동일한 계층 구조를 갖게 됩니다.
JoltTransformJSON Jolt 사양 목록을 플로우 파일 JSON 페이로드에 적용합니다. 새로운 FlowFile이 생성되고 ‘성공’ 관계로 라우팅됩니다
SplitAvro 구성된 출력 크기에 따라 바이너리로 인코딩된 Avro 데이터 파일을 더 작은 파일로 분할합니다. 출력 전략은 더 작은 파일이 Avro 데이터 파일인지 아니면 FlowFile 속성에 메타데이터가 있는 베어 Avro 레코드인지 결정합니다
ConvertAvroToJSON 바이너리 Avro 레코드를 JSON 객체로 변환합니다. 이 프로세서는 Avro 필드를 JSON 필드에 직접 매핑하므로 결과 JSON이 Avro 문서와 동일한 계층 구조를 갖게 됩니다
PutDatabaseRecord 지정된 RecordReader를 사용하여 수신 흐름 파일의 레코드를 입력합니다
UpdateRecord 레코드 지향 데이터가 포함된 FlowFile의 콘텐츠를 업데이트 합니다
InvokeHTTP 구성 가능한 HTTP 엔드포인트와 상호작용할 수 있는 HTTP 클라이언트 프로세서입니다.

 

2. ExecuteSQL

데이터베이스 SQL SELECT 쿼리를 실행하여 Avro 형식으로 데이터를 얻는 프로세서입니다

 

Configure Properties 구성에서 가장 중요한 부분은 Service 연결입니다

  • Database Connection Pooling Service

위와 같은 이미지에서 노란색행의 No value set을 눌러줍니다

 

등록해 둔 service가 있으면 목록에 나오고,
첫 진행이라면 하단의 Create new service… 를 눌러서 등록을 진행합니다

 

다시 한번 Add Controller Service라는 레이어가 뜨고,

Compatible Controller Services에서 DBCPConnectionPool 1.22.0을 눌러줍니다

 

다음과 같이 Value에 새로 생성한 커넥션 풀이 보입니다

(만약 Value에 생성한 커넥션 풀이 보이지 않는다면 No value set을 다시 눌러주어 생성된 연결을 선택합니다)

 

이때부터 좌측 상단의 Invalid 를 신경 써 주면서 프로퍼티 설정을 진행합니다.

Invalid는 모든 연결이 정상일 때 메시지가 사라집니다

이어서 파란색 박스의 화살표를 눌러줍니다

 

이번 레이어에서도 Invalid를확인할 수 있습니다.

아직 설정이 완료되지 않아서 뜨고 있으니 우측 톱니바퀴 모양을 눌러 설정을 진행합니다

 

Properties 탭에서 Database 설정 정보를 입력 및 세팅합니다

  • Database Connection URL : 데이터베이스 연결 URL
  • Database Driver Class Name : 데이터베이스 드라이버 클래스 (ex postgresql = org.postgresql.Driver)
  • Database Driver Location : 데이터베이스 드라이버 위치 (각 데이터베이스 공식 홈페이지에 파일 제공)
  • Database User
  • Password

 

위 이미지에 우측 파란색 박스의 ⓥ를 눌러 Attribute 추가할 수 있습니다

잘 설정하고 나왔다면 이전 레이어팝업에 제일 우측 톱니바퀴 옆에 Disable 버튼이 활성화 또는 비활성화되어있습니다

번개모양으로 활성화되어 있고 State 열에 Enabled로 뜬다면 정상입니다. (이때 Invalid 뜨던 것도 사라집니다)

 

여기까지 잘 진행되었다면 마지막으로,

Configure Processor 레이어에서 ⓥ를 눌러주어 프로세서 검증을 할 수 있습니다

이때 좌측 상단의 Invalid 뜨던 것도 사라집니다

 

3. ConvertRecord

ConvertRecord 프로세서부터는 ExecuteSQL 프로세서를 작성해 봤다면 쉽게 구성할 수 있습니다

ConverRecord는 구성된 레코드 리더 및 레코드 쓰기 컨트롤러 서비스를 사용하여

레코드를 한 데이터 형식에서 다른 데이터 형식으로 변환합니다

 

 

ConvertRecord Configure Properties 구성에서도 가장 중요한 부분은 Service 연결입니다

ConvertRecord에서는 레코드 리더와 레코드 쓰기 서비스를 작성합니다

 

작성 패턴은 ExecuteSQL과 동일하고 작성 후 위 이미지와 같이 확인할 수 있습니다

 

JsonTreeReader에서의 중요점

  • Schema 명으로 접근가능
  • Starting Field Name 설정을 통해 시작 필드를 설정할 수 있습니다
    ( InvokeHTTP 프로세서 전송데이터 payload 접근 )

 

일반적으로 ConvertRecord를 단일로 사용하기보다는

목표 데이터 형태에 맞는 ConvertAvroToJSON (DB to JSON), JoltTransformJSON (HTTP Shift to JSON)로 사용

 

4. ConvertAvroToJSON

ConvertAvroToJSON 프로세서는 Avro 레코드를 JSON 객체로 변환합니다

 

ExecuteSQL를 통해서 Avro 형식으로 데이터가 들어오면 JSON 형태로 변환합니다.

 

ConvertAvroToJSON Configure Processor Properties 탭에서 다음과 같은 설정을 할 수 있습니다.

  • JSON container options : 레코드 형태 설정
  • Wrap Single Record : 단일 레코드 여부 설정
  • Avro schema : 스키마 설정

 

 

5. JoltTransformJSON

JoltTransformJSON 프로세서는 Jolt 사양 목록을 플로우 파일 JSON 페이로드에 적용합니다.

 

새로운 FlowFile로 라우팅 되기 때문에 프로세서 다음의 레코드 형태를 유의하여 사용해야 합니다.

 

JoltTransformJSON 사용 예시로 위 이미지처럼 구성 할 수 있으며, Configure Properties 탭에서 Jolt Specification을 작성하기 전 ADVANCED 버튼을 통해 사전 결과를 얻어 볼 수 있습니다

 

 

 

 

 

아래 이미지는 ADVANCED 버튼을 눌러서 진입한 JoltTransform DSL 레이어입니다.


6. SplitAvro

구성된 출력 크기에 따라 바이너리로 인코딩된 Avro 데이터 파일을 더 작은 파일로 분할합니다.

 

출력 전략은 더 작은 파일이 Avro 데이터 파일이 아니면 FlowFile 속성에 메타데이터가 있는 bare Avro 레코드로 결정됩니다. 출력은 항상 이진 인코딩됩니다.

 

 

7. UpdateRecord

레코드 지향 데이터가 포함된 FlowFile의 콘텐츠를 업데이트 합니다. 이 프로세서를 사용하려면 하나 이상의 사용자 정의 속성을 추가해야 합니다. 속성 이름은 업데이트해야 하는 필드를 결정하는 RecordPath를 나타내야 합니다. 

 

 

 

 

 

 

 

8. PutDatabaseRecord

PutDatabaseRecord 프로세서는 지정된 RecordReader를 사용하여 수신 흐름 파일의 레코드(여러가지 가능)을 입력합니다. 이러한 레코드는 SQL 문으로 변환되어 단일 트랜잭션으로 실행됩니다. 오류가 발생하면 플로우 파일은 실패 또는 재시도로 라우팅 되고, 레코드가 성공적으로 전송되면 수신 플로우 파일은 성공으로 라우팅됩니다.

Properties 탭에서 다음과 같은 설정을 할 수 있습니다.

  • Record Reader : 
  • Database Type : 
  • Statement Type : 
  • Database Connection Pooling Service : 
  • Schema Name : 
  • Table Name : 

9. InvokeHTTP

구성 가능한 HTTP 엔드포인트와 상호작용할 수 있는 HTTP 클라이언트 프로세서입니다. FlowFile 속성은 HTTP 헤더로 변환되고 FlowFile 내용은 요청 본문으로 포함됩니다. 

'빅데이터 > Apache Nifi' 카테고리의 다른 글

Apache Nifi SplitJson, JsonPath Expression  (0) 2023.11.10
Apache Nifi Cron Schedule 작성  (0) 2023.11.09
Apache Nifi 실행  (0) 2023.09.01
Apache Nifi 란?  (1) 2023.08.30
Apache Nifi 설치하기  (0) 2023.07.24
  • 네이버 블로그 공유
  • 네이버 밴드 공유
  • 페이스북 공유