Hive Kafka Integration

Hive에서 연동을 위한 설정을 진행합니다.

개요

Hive는 Hadoop에 저장된 대용량의 데이터들을 SQL 문법으로 읽고 쓸수있게 해주는 data warehouse software입니다.
간단하게 Hadoop에서 동작하는 대용량 데이터 처리 Database라고 생각해 볼 수 있을것 같습니다.

기본으로 HDFS를 지원하며, Hbase 또는 기타 DBMS에 접근할 수 있도록 개방적인 DataSourcing 기능을 제공하고 있습니다. Message Queue로 많이 활용되는 Kafka도 지원합니다. 이번 시리즈에서는 Hive와 Kafka 연동을 위한 설정부터 Message를 읽고 쓰는 과정까지 기록해보려고 합니다.

이번 글에서는 사용할때 필요한 설정들과 알아야하는 정보들을 위주로 기록합니다. 사용 예시가 궁금하시다면 다음 글을 읽어주세요.

공식문서를 토대로 작성하였습니다.
문서에는 작성되어있지않으나, 사용하려면 추가로 알아야할 내용과 직접 테스트한 예제들을 상세히 기록하고자 합니다.

Hive 3.x 버전에서 진행하였습니다.

설치

사실 별도의 설치는 필요하지않습니다.
공식문서를 한번 살펴보셨다면 아시겠지만, Hive에 이미 내장 되어있습니다.
어떻게 할 지 상상이 안될수도 있지만, SQL을 사용하면 Kafka에 연동을 할 수 있습니다.

테이블 생성

Hive와 Kafka를 연동해주기 위해서는 연결고리를 만들어주어야합니다.
Spring에서는 Binder와 Binding을 사용하면 손쉽게 연결고리를 만들 수 있듯이, Hive에서는 StorageHandler를 사용하여 만들 수 있습니다.
StorageHandler는 table을 정의할 때 함께 명시하면 됩니다. 이 외에도 설정을 추가하면 완벽하게 사용하실수 있습니다.

참쉽죠
다만 Hive에서 Kafka와 연동된 테이블을 만들기 위해서는 몇가지 주의해야하는 사항이 있습니다.

첫번째로는 EXTERNAL 테이블로 생성해야한다는 것입니다.
우리가 사용할 방식은 Hive에서 Kafka에 접근하여 저장된 데이터들을 사용하는 것이기 때문에, EXTERNAL 키워드를 추가해주어야합니다.

두번째로는 KafkaStorageHandler 를 명시해야합니다.
테이블을 정의할 때 STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' 가 함께 적혀있어야 합니다.
적혀 있어야만 Kafka를 연동하여 사용하실 수 있습니다.

세번째로는 테이블설정을 해주어야합니다.
TBLPROPERTIES 키워드 사용하면 테이블에 대한 Property를 설정할 수 있게됩니다. 여기에 Kafka 연동에 필요한 Configuration을 작성해야 합니다.

세가지를 모두 적용하여 DDL을 작성해보면, 아래의 예시처럼 만들어질 수 있습니다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
CREATE EXTERNAL TABLE kafka_table (
    `timestamp` TIMESTAMP,
    `page` STRING,
    `newPage` BOOLEAN,
    `added` INT, 
    `deleted` BIGINT,
    `delta` DOUBLE
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES ( 
    "kafka.topic" = "test-topic",
    "kafka.bootstrap.servers" = "localhost:9092"
);

Kafka metadata

Kafka에 저장된 Message는 Payload 외에도 Key, Partition, Offset, Timestamp 4개의 metadata가 존재합니다. 이 데이터들도 Hive에서 사용할 수 있습니다.
앞서 테이블을 정의할 때 사용하였던 KafkaStorageHandler가 자동으로 metadata들을 컬럼으로 등록해 줍니다.

  • __key (byte array)
  • __partition (int32)
  • __offset (int64)
  • __timestamp (int64)

활용하는 방안은 실사용 예제를 작성한 다음글에서 자세히 설명드리도록 하겠습니다.

Avro 포맷 사용시 주의사항

만약 Confluent Connector를 통해 Avro포맷을 사용한다면, 메시지에서 5-bytes를 제거해주어야합니다.
이 5-bytes중 1-byte는 매직 byte, 4-byte는 schema registry의 schema id에 해당됩니다.
이는 "avro.serde.type"="skip" 그리고 "avro.serde.skip.bytes"="5"를 설정해주면 해결할 수 있습니다.
avro를 사용할 경우, 상세한 내용은 가이드를 꼭 참고하면 좋겠습니다.

confluent avro format 지원은 Hive 4.0버전에 추가될 것으로 보입니다. 3.x 버전을 사용하시는 분이라면, 주의해주시면 좋을것 같습니다. (제가 삽질했던거라서요…)

Serde(Serializer, Deserializer)

Application에서 Kafka로 데이터 전송을 해본 적 있다면, serializer, deserializer를 지정해보신 경험이 있으실 겁니다.
Hive에서도 마찬가지로 여러 serializer, deserializer를 제공하고있습니다.

Supported Serializers and Deserializers description
org.apache.hadoop.hive.serde2.JsonSerDe JSON 포맷
org.apache.hadoop.hive.serde2.OpenCSVSerde CSV 포맷
org.apache.hadoop.hive.serde2.avro.AvroSerDe AVRO 포맷
org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe binary..?
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Plain Text

지정하는 방법은 생각보다 매우 간단합니다.
TBLPROPERTIES에 추가하면 손쉽게 적용할 수 있습니다.
예를 들면 Kafka에서 읽어들일 데이터포맷이 JSON의 경우이거나, 전송할 데이터가 JSON으로 하고 싶다면 아래처럼 추가하면 됩니다.
"kafka.serde.class" = "org.apache.hadoop.hive.serde2.JsonSerDe"

인증

Kafka에도 인증을 해야 데이터 읽고 쓸수 있는 권한을 주는 기능을 제공하고 있습니다.
Kafka 사용하는데 필요한 인증정보들 또한 Hive를 통해 제공할 수 있습니다.

저의 경우에는 별도로 인증을 추가하지 않고 사용했다보니, 인증없이 사용한 경우로 기록하겠습니다.
인증을 사용해야하는 상황이라면, 공식 문서를 읽어보고 진행해 주세요.

인증을 위한 TBLPROPERTIES 값을 추가하지 않은 상태라면, 연결을 시도했을때 에러가 발생합니다.
별도의 설정을 안해두었다면, 당연히 인증없이도 사용가능해야되는거 아닌가? 라고 생각할 수 있을텐데요. (제가 그랬습니다.) Hive는 기본값으로 id, password를 사용하여 인증을 시도하려 합니다.
따라서 인증을 사용하지않는 설정을 추가해주어야만 사용이 가능한데요.

공식문서에 이 문제를 해결하기 위한 방법을 따로 제시하지않아 많이 어려움을 겪었던 부분이었습니다.
결론부터 이야기하면 TBLPROPERTY를 추가하면 해결할 수 있습니다.

  • producer일 경우 : "kafka.producer.security.protocol" = "PLAINTEXT"
  • consumer일 경우 : "kafka.consumer.security.protocol" = "PLAINTEXT"

인증에 상세히 알아보고 싶다면, 문서를 참고하시면 좋을것 같습니다.
후에 서술하겠지만 설정할때 kafka.를 prefix로 지정해두면, kafka에 설정값이 전달할 수 있습니다.

Key Serializer

앞서 설명드렸던 Serializer들은 Value Serializer 였습니다.
예상하던것보다 많이 제공했지만 반대로 Key Serializer는 Byte Serializer밖에 지원하지 않습니다.
다른 Serializer로 변경해서 사용하고 싶더라도 변경할 수 없게 되어 있습니다.
왜 이렇게 구현한지는 모르겠지만, 자세한 내용은 코드를 보면 알 수 있습니다.
궁금하실 수 있을 것 같아, 링크도 첨부합니다.

그외

지금까지 작성한 내용들 이외에도 Kafka를 사용할때, 추가하고 싶은 설정이 더 있을 수 있습니다.
공식문서에서는 설명하진 않지만, Kafka에 사용할 설정값을 전달할 수 있습니다.

  • producer로 사용할 경우 “kafka.producer.{property}” = “{value}”
  • consumer로 사용할 경우 “kafka.consumer.{property}” = “{value}” TBLPROPERTIES에 추가해주면, Kafka에 설정값들이 전달되며 원하는 설정을 적용할 수 있습니다.

어떻게 설정값이 전달될 수 있는지, 구현된 코드가 궁금하다면 아래의 링크를 참고하시면 좋겠습니다.

정리

여기까지 Hive에서 Kafka를 Read, Write하기 위해서 필요한 설정들과, 도움이 되는 정보들에 대해 알아 보았습니다.
다음 글에서는 Kafka에 있는 Message들을 Hive에서 Read하는 과정을 기록해보겠습니다.
감사합니다.

Hugo로 만듦
JimmyStack 테마 사용 중