Write message to kafka using hive

Hive에서 Kafka로 데이터를 전송하는 예제입니다.

개요

앞서 설명해왔던 내용과는 반대로 Hive에서 Kafka에 메시지를 전송하는 과정을 설명하겠습니다.
Kafka 입장에서는 Hive가 Producer 역할을 수행하게 됩니다.

단점

Hive에서 Kafka로 전송할때 필수적인 기능들은 제공하다보니, 일반적인 경우에는 사용하는데 문제 없습니다.
하지만 일부 기능을 제공하지않는데, 그 중 하나가 Header입니다.
Header도 만들어주어야 하는 상황이라면, Hive 말고 다른 Application을 사용하는 것을 권장합니다.

kafka metadata 주의사항

Producer에서 Kafka로 메시지를 전송할때는 메타 데이터들을 포함시켜 전송할 수 있습니다.
포함시키지 않을 경우, Kafka에서 자동으로 생성해주는 것으로 알고 있습니다.
Hive에서도 마찬가지 입니다. __timestamp, __partition, __offset은 kafka에서 기록하는 정보이므로 아래처럼 작성하면 Kafka에서 자동으로 생성해 줄 수 있습니다.

  • __timestamp : -1
  • __partition : null
  • __offset : -1

하지만 __key는 사용할 수 있는 값의 범위는 한정적이다보니 선택지가 제한적이지만, 신중하게 설정해주는 것이 좋습니다.
Kafka는 keypartition을 결정하는데 사용하므로 메시지 분산에 영향을 미칠수 있습니다. 그리고 cleanup.policy 설정값에 따라 key 활용여부가 결정됩니다.
Kafka에서 key값을 기준으로 동일한 partition에 넣어주는 역할을 수행하고 있으며 , log.cleanup.policy = compact 로 설정할경우 key를 활용하기 때문에 byte array로 넣어주는게 좋습니다.

  • Integer to byte array : cast(cast(1 AS STRING) AS BINARY)
  • String to byte array : cast(‘1’ AS BINARY)

JSON 테스트 케이스

JSON 포맷 메시지를 Kafka에 전송해보도록 하겠습니다.

토픽 생성

테스트 메시지를 저장해둘 토픽을 생성합니다.
아래의 예시는 Kafka 설치시 함께 제공되는 스크립트를 사용하여 topic을 만듭니다.
./kafka-topics.sh --create --topic "${토픽토픽}" --replication-factor 1 --partitions 1 --bootstrap-server "{서버서버}"

테이블 생성

Kafka에 전송할 topic을 만들어보았으니, 이제는 실제 전송역할을 수행해줄 table을 생성해보겠습니다.
Kafka Handler를 사용할 수 있도록 KafkaStorageHandler를 지정해줍니다.
TBLPROPERTIES에는 앞서 설명드렸던 글과 비슷하게 진행합니다.
다만, 메시지 전송하는 역할이니 consumer가 아닌 producer로 지정해줍니다.
아래의 예시를 참고하면 좋겠습니다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
CREATE EXTERNAL TABLE test_export_to_kafka(
	`id` STRING,
	`value` STRING
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES(
    "kafka.serde.class" = "org.apache.hadoop.hive.serde2.JsonSerDe",
    "kafka.topic" = "${토픽토픽}",
    "kafka.bootstrap.servers" = "${서버서버}",
    "kafka.producer.security.protocol" = "PLAINTEXT"
);

테스트 데이터 전송

만들어준 테이블에 데이터를 입력해보도록 하겠습니다.
테이블에 명시한 컬럼인 id, value에 대한 값을 필수로 추가합니다.
그 외에는 Kafka MetataData 이며 key, partition, offset, timestamp 순서대로 생각하고 넣어주시면 됩니다.
insert into test_export_to_kafka values ('1', '1', null, null, -1, -1);

ONLY LONG TYPE 테스트 케이스

이번에는 JSON이 아닌 PLAIN TEXT Format으로 데이터를 전송해보도록하겠습니다.

토픽 생성

앞서 설명드렸던 JSON 테스트 케이스와 비슷합니다.
테스트 메시지를 저장해둘 토픽을 생성합니다.
아래의 예시는 Kafka 설치시 함께 제공되는 스크립트를 사용하여 topic을 만듭니다.
./kafka-topics.sh --create --topic "${토픽토픽}" --replication-factor 1 --partitions 1 --bootstrap-server "{서버서버}"

테이블 생성

Kafka Handler와 TBLPROPERTIES는 앞서 설명드렸던 테스트 케이스와 동일하게 설정합니다.
다만 이번엔 PLAIN TEXT Format인 만큼, 컬럼은 하나만 만들어 주겠습니다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
CREATE EXTERNAL TABLE test_export_to_kafka_long(
	id bigint
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES(
    "kafka.serde.class" = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
    "kafka.topic" = "${토픽토픽}",
    "kafka.bootstrap.servers" = "${서버서버}",
    "kafka.producer.security.protocol" = "PLAINTEXT"
);

테스트 데이터 전송

만들어둔 테이블에 데이터를 입력해보도록 하겠습니다.
테이블에 명시한 컬럼인 id에 대한 값을 필수로 추가합니다.
그 외에는 Kafka MetaData에 대한 정보로 이전과 동일하게 추가해줍니다.
insert into test_export_to_kafka_long values (1, null, null, -1, -1);

동일한 key를 가진 메시지가 동일한 partition에 들어가는 테스트

Producer로 사용할 Application을 만들때 보면, Kafka에 전송할 Message가 어떤 Partition에 저장될지 Partitioner를 지정할 수 있습니다.
하지만 Hive에서는 별도로 Partitioner를 지정할 수 없습니다.
Default Partitioner를 사용하는것으로 생각하고 진행하셔야 합니다.

Message의 Key가 동일하다면, 동일한 Partition에 들어가야 합니다.
하지만 Hive를 통해 전송해보는 것은 처음이기 때문에 동일한 Partition에 전송되는지 확인이 필요하였습니다.
이를 확인하기 위해 테스트를 진행하였습니다.

각기 다른 Key를 가진 여러개의 메시지를 전송한 뒤, 앞서 보낸 메시지와 동일한 Key를 가지지만 다른 내용으로 메시지를 전송합니다.
자세한 내용은 아래의 쿼리를 통해 확인할 수 있습니다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
CREATE EXTERNAL TABLE test_export_to_kafka_partition(
	id INT,
	value STRING
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES(
    "kafka.serde.class" = "org.apache.hadoop.hive.serde2.JsonSerDe",
    "kafka.topic" = "${토픽토픽}",
    "kafka.bootstrap.servers" = "${서버서버}",
    "kafka.producer.security.protocol" = "PLAINTEXT"
);

./kafka-topics.sh --create --topic ${토픽토픽} --bootstrap-server {서버서버} --config "cleanup.policy=compact" --partitions 4 --replication-factor 2

INSERT INTO TABLE test_export_to_kafka_partition 
VALUES
(1, '1', CAST('1' AS BINARY), NULL, -1, -1), 
("2", "2", CAST('2' AS binary), -1, -1, -1),
("3", "3", CAST('3' AS binary), -1, -1, -1),
("4", "4", CAST('4' AS binary), -1, -1, -1);

CREATE TEMPORARY TABLE test_partition(id INT, value STRING);

INSERT INTO TABLE test_partition 
VALUES 
(2, '2'),
(3, '3'),
(4, '4');

INSERT INTO table test_export_to_kafka_partition
SELECT 
    id,
    value,
    CAST(CAST(id AS string) AS binary) AS `_key`,
    NULL AS `_partition`,
    -1,
    -1
FROM test_partition;

결과화면은 첨부하지 않았지만 테스트를 진행하니
동일한 Key값을 가진경우, 동일한 Partition에 저장되는 것을 확인할 수 있었습니다.

정리

이번 글을 통해 Hive Table 데이터를 Kafka로 전송가능하다는 것을 확인할 수 있었습니다.

사실 이렇게 데이터를 전송하는게 유일한 방법도 아니고, 다른 좋은 방법들도 존재합니다.
(예를 들면 HiveServer2에 JDBC로 연결할 수도 있고, Spark를 사용할 수도 있습니다.)
설명한 내용들을 읽으셨다면 제약이 많다는 것을 느끼셨을 수 있습니다.
기능 설명에 작성하진 않았지만, 단순히 전송하는 경우라면 문제없지만 다른 테이블과 JOIN해서 바로 전송하게끔 쿼리를 실행한다면 에러가 발생할 수 있습니다.
이 문제를 회피하기위해서는 임시 테이블을 만들어 데이터를 저장한 뒤, 임시테이블을 사용해 데이터 전송하게끔 쿼리 작성해야합니다.
이 외에도 제가 파악하지 못한 제약이 있을수도 있습니다.
이처럼 많은 제약이 존재하지만, 별도로 Application을 만들 필요가 없다는게 큰 장점 입니다.

이러한 방법도 여러 선택지중 하나로 사용할 수 있다는 것을 알게되는 계기가 되었으면 합니다.
여기까지 읽어주셔서 감사합니다.

Hugo로 만듦
JimmyStack 테마 사용 중