개요
이전 글에서 Kafka와 Hive 연동에 사용하는 설정들에 대해서 알아보았습니다.
이제부터는 Kafka에서 데이터를 읽어들이고, 쓰는 작업에 대해서 설명하려고 합니다.
이번 글에서는 Kafka에 있는 데이터들을 읽어들이는 것부터 진행해보도록 하겠습니다.
Hive를 Kafka Consumer로 사용하는 방벙리라 생각하시면 이해하기 쉬우실 겁니다.
차이점이 있다면, 일반 Consumer와는 다르게 Consumer Group으로 지정되지 않습니다.
JSON
Kafka로부터 읽어들일 데이터들이 json format 일 경우를 가정하고, 진행하는 예제입니다.
Sample data
아래와 같은 형태의 데이터를 예시를 기준으로 설명드리도록 하겠습니다.
필요에 따라 더 많은 필드들을 추가하실 수 있습니다.
|
|
테이블 정의
json 스키마와 동일한 구조를 가지는 테이블을 정의해보도록 하겠습니다. id 필드는 정수값을 가지고, value 필드는 문자열을 가지고 있습니다. Hive에서 정수값은 INT 또는 BIGINT Data Type으로 정의할 수 있습니다. 그리고 문자열은 STRING으로 정의합니다.
이 정보들을 토대로 DDL쿼리를 만들어보면 아래와 같습니다.
|
|
test_export_to_kafka
은 테이블명으로, 원하는대로 사용할 수 있습니다.
테이블에서 사용하는 컬럼들은 읽어들일 json 구조대로 작성해주면 됩니다.
컬럼명은 json key와 동일하게 작성하고, 컬럼의 타입은 json value의 데이터 타입과 동일하게 맞춰주면 됩니다.
hive 데이터 타입은 json보다 다양하므로, 문서를 참고해 적절한 타입을 선택해서 사용하는 것을 권장합니다.
"kafka.serde.class" = "org.apache.hadoop.hive.serde2.JsonSerDe"
으로 지정해줌으로서, hive가 json데이터를 deserialize할 수 있게 되었습니다.
그 외에 특별히 TBLPROPERTIES에서 살펴봐야할 내용은 없습니다.
kafka.topic
만 사용하고 싶은 kafka topic명으로 변경해주면 됩니다.
READ
|
|
정상적으로 설정되었다면, topic에 들어있는 메시지가 출력되는것을 확인하실 수 있습니다.
하지만 설정이 잘못되었다면 에러메시지가 출력되거나, 각 컬럼이 NULL로 출력 될 것입니다.
여기까지 설명했던 내용과 다르게 설정한 내용이 있지않은지 확인해주세요.
TEXT
JSON처럼 구조화된 데이터가 아니라, PLAIN TEXT를 읽어오는 예제입니다.
Sample data
PLAIN TEXT라, 예제도 특별할 것은 없습니다.
단순히 숫자값을 예시로 사용해보도록 하겠습니다.
|
|
테이블 정의
|
|
앞서 설명했던 JSON 테스트 케이스와 유사합니다.
다른점이 있다면 Serde와 컬럼을 다르게 지정해준 것입니다.
Serde의 경우 "kafka.serde.class" = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"
로 변경하면 됩니다.
READ
|
|
정상적으로 설정되었다면, topic에 들어있는 메시지가 출력되는것을 확인하실 수 있습니다.
하지만 설정이 잘못되었다면 에러메시지가 출력되거나, 각 컬럼이 NULL로 출력 될 것입니다.
여기까지 설명했던 내용과 다르게 설정한 내용이 있지않은지 확인해주세요.
Kafka Metadata 활용
Kafka에는 메시지와 함께 메타 데이터들이 함께 기록됩니다.
테이블을 정의할때 메타 데이터들도 자동으로 컬럼으로 추가됩니다. Hive에서 메시지를 읽어올때, 메타 데이터도 함께 가져오게 됩니다.
메타 데이터가 컬럼으로 정의되다보니, SQL 조건문으로 활용할 수 있습니다.
예를 들면 특정 시간 이후로 유입된 데이터만 가져오던가, 특정 파티션에 있는 데이터만 가져올 수 있습니다.
__partition
와 __offset
의 경우에는 정수 타입을 사용하기 때문에 사용하기 편리합니다.
- ex1)
__partition = 0
- ex2)
__offset > 5000
하지만 __timestamp
의 경우에는 unix_timestamp를 사용하기에 int64로 기록됩니다.
이는 일상에서 사용하는 시간 표기방식과 다르고, 변환이 필요하기에 사용하기 불편합니다.
편하게 조건문으로 활용하기 위해서는 아래처럼 변경해서 사용하는것을 추천합니다.
unix_timestamp라는 함수를 사용 하면 되는데 아래 예제를 보겠습니다.
unix_timestamp('2022-03-08 00:00:00.000', 'yyyy-MM-dd HH:mm:ss.SSS') * 1000
- 첫번째 인자는 변환하고 싶은 시간을 작성합니다.
- 두번째 인자는 첫번째 인자에 입력한 시간포맷을 작성합니다.
함수를 사용하여 특정 시간을 unix_timestamp로 변경 할 수 있게 되었고, __timestamp
비교 연산을 할 수 있게되었습니다.
하지만 hive로 통해 읽어들인 __timestamp
는 마이크로초 단위이기 때문에 단위를 맞춰주는 추가연산이 필요합니다.
이때문에 1000을 곱하였습니다. 아래는 사용 예제입니다.
ex) unix_timestamp('2022-03-08 00:00:00.000', 'yyyy-MM-dd HH:mm:ss.SSS') * 1000 < __timestamp
추가 예제
|
|
정리
이번 글을 통해 Hive에서 Kafka에 저장된 JSON 또는 PLAIN-TEXT를 읽어서 사용할 수 있게 되었습니다.
metadata를 필터링 용도로 사용한다면, Kafka에 원하는 조건을 가진 메시지를 손쉽게 추출해 볼 수 있겠습니다.
아쉬운 점으로는 Hive에 저장되어있는 다른 테이블들과 join이 되지 않는다는 점입니다. (제가 시도했을때는 인증으로 인한 문제가 발생했습니다.)
이 때문에 사용용도가 많이 제약되었지만, 개선된다면 활용도가 높을 것으로 보입니다.
다음 글에서는 Hive데이터들을 Kafka로 전송하는 과정을 기록해보겠습니다.
감사합니다.