Write messages to Kafka using Hive

This is a demonstration of data migration from Hive to Kafka.

Info

This article was originally written in Korean and has been translated using ChatGPT.

Overview

Unlike the previous articles, this time I'll walk through the process of sending messages from Hive to Kafka.
Viewed from Kafka’s standpoint, Hive takes on the role of a Producer.

Drawback

Sending from Hive to Kafka covers the core functionality, so it is suitable for most scenarios.
However, a few features are not supported, such as record Headers.
If you need to include a Header, consider using a separate application rather than Hive.

Considerations for Kafka metadata

When a Producer sends a message to Kafka, it can attach related metadata.
If the metadata is omitted, Kafka fills it in automatically.
The same holds true for Hive: since __timestamp, __partition, and __offset are values managed by Kafka, passing them as shown below lets Kafka populate these fields on its own.

  • __timestamp : -1
  • __partition : null
  • __offset : -1

For __key, on the other hand, the range of usable values is limited, so the choices are restricted, but it is still worth configuring carefully.
Kafka uses the key to determine the partition, which affects how messages are distributed, and how the key is applied also depends on the cleanup.policy configuration.
Because records with the same key are assigned to the same partition, when log.cleanup.policy = compact is configured it is advantageous to pass the key as a byte array; a short example follows the list below.

  • Integer to byte array : cast(cast(1 AS STRING) AS BINARY)
  • String to byte array : cast('1' AS BINARY)
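
As a minimal sketch of the points above, assume a hypothetical Kafka-backed table demo_kafka(id STRING, value STRING) that has already been created with the KafkaStorageHandler; an insert that sets only the key and leaves the rest to Kafka could then look like this:

-- demo_kafka is a hypothetical table created with the KafkaStorageHandler.
-- Only the key is set; __partition, __offset, and __timestamp are passed as
-- null / -1 so that Kafka fills them in by itself.
INSERT INTO demo_kafka VALUES ('a', 'hello', CAST('a' AS BINARY), NULL, -1, -1);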

JSON test case

I’ll proceed to transmit a message in JSON format to Kafka.

Creating a topic

I’m creating a topic to store test messages.
Below is an example that leverages a script bundled with the Kafka installation to establish a topic.
./kafka-topics.sh --create --topic "${topic}" --replication-factor 1 --partitions 1 --bootstrap-server "${servers}"
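
If you want to double-check the topic configuration, the same script can describe it (placeholders as above):
./kafka-topics.sh --describe --topic "${topic}" --bootstrap-server "${servers}"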

Creating a table

With the topic ready, we'll now create the table that performs the actual transmission.
Enable the Kafka handler by specifying the KafkaStorageHandler.
The TBLPROPERTIES configuration is almost the same as described previously.
However, since this table sends messages, configure the producer-side properties instead of the consumer-side ones.
The example below can serve as a reference.

CREATE EXTERNAL TABLE test_export_to_kafka(
	`id` STRING,
	`value` STRING
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES(
    "kafka.serde.class" = "org.apache.hadoop.hive.serde2.JsonSerDe",
    "kafka.topic" = "${topic}",
    "kafka.bootstrap.servers" = "${servers}",
    "kafka.producer.security.protocol" = "PLAINTEXT"
);
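
Once the table is created, a quick DESCRIBE should show that, in addition to id and value, the handler exposes the Kafka metadata columns discussed earlier (__key, __partition, __offset, __timestamp):
DESCRIBE test_export_to_kafka;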

Sending test data

We will proceed to insert data into the table we created.
It’s essential to provide values for the columns explicitly mentioned in the table, which are id and value.
Regarding Kafka Metadata, you can include them in the following order: key, partition, offset, and timestamp.
insert into test_export_to_kafka values ('1', '1', null, null, -1, -1);
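
To confirm that the message actually reached Kafka, one option is the console consumer bundled with the Kafka installation (same placeholders as before); the record should show up as a JSON document containing the id and value fields:
./kafka-console-consumer.sh --topic "${topic}" --bootstrap-server "${servers}" --from-beginning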

Test case with only a LONG type

This time, we will attempt to send data in PLAIN TEXT format, rather than JSON.

Creating a topic

It is similar to the JSON test case I explained earlier.
We are creating a topic to store test messages.
The example below creates the topic using a script provided with Kafka installation.
./kafka-topics.sh --create --topic "${topic}" --replication-factor 1 --partitions 1 --bootstrap-server "${servers}"

Creating a table

Kafka Handler and TBLPROPERTIES are configured in the same manner as previously described in the test case.
However, this time, as we are using the PLAIN TEXT format, we will create only one column.

CREATE EXTERNAL TABLE test_export_to_kafka_long(
	id bigint
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES(
    "kafka.serde.class" = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
    "kafka.topic" = "${topic}",
    "kafka.bootstrap.servers" = "${servers}",
    "kafka.producer.security.protocol" = "PLAINTEXT"
);

Sending test data

We will proceed to insert data into the table we created.
It is mandatory to include a value for the id column, as specified in the table.
For the remaining columns, you can add Kafka Metadata information as done previously.
insert into test_export_to_kafka_long values (1, null, null, -1, -1);
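
As a quick sanity check, the handler also allows the topic to be read back through the same table (to my understanding), so a simple SELECT can confirm the record along with the metadata Kafka assigned:

-- Reads the topic back through the Kafka-backed table, including metadata columns.
SELECT id, `__partition`, `__offset`, `__timestamp` FROM test_export_to_kafka_long;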

Testing messages with the same key being placed in the same partition

When creating an application to be used as a Producer, you have the option to specify a Partitioner that determines the partition where Kafka will store the message.
However, in Hive, there is no separate provision to specify a Partitioner.
It should be assumed that the Default Partitioner is in use.

Messages with identical keys should be placed in the same partition.
However, since this was my first time sending through Hive, I wanted to verify that they really do end up in the same partition, so I ran a test.

After sending several messages with different keys, I then send additional messages that reuse those keys to see where they land.
The queries below show the details.

CREATE EXTERNAL TABLE test_export_to_kafka_partition(
	id INT,
	value STRING
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES(
    "kafka.serde.class" = "org.apache.hadoop.hive.serde2.JsonSerDe",
    "kafka.topic" = "${topic}",
    "kafka.bootstrap.servers" = "${servers}",
    "kafka.producer.security.protocol" = "PLAINTEXT"
);

./kafka-topics.sh --create --topic "${topic}" --bootstrap-server "${servers}" --config "cleanup.policy=compact" --partitions 4 --replication-factor 2

INSERT INTO TABLE test_export_to_kafka_partition
VALUES
(1, '1', CAST('1' AS BINARY), NULL, -1, -1),
(2, '2', CAST('2' AS BINARY), NULL, -1, -1),
(3, '3', CAST('3' AS BINARY), NULL, -1, -1),
(4, '4', CAST('4' AS BINARY), NULL, -1, -1);

CREATE TEMPORARY TABLE test_partition(id INT, value STRING);

INSERT INTO TABLE test_partition 
VALUES 
(2, '2'),
(3, '3'),
(4, '4');

INSERT INTO TABLE test_export_to_kafka_partition
SELECT
    id,
    value,
    CAST(CAST(id AS STRING) AS BINARY) AS `__key`,
    NULL AS `__partition`,
    -1,
    -1
FROM test_partition;
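
As with the earlier read-back check, the Kafka-backed table can also be queried directly, so one way to verify the result without a separate consumer is to group the records by key and partition; each key should then appear in exactly one partition:

-- Reads the topic back and checks which partition each key ended up in.
SELECT CAST(`__key` AS STRING) AS msg_key, `__partition`, COUNT(*) AS cnt
FROM test_export_to_kafka_partition
GROUP BY CAST(`__key` AS STRING), `__partition`;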

Although I haven't included a screenshot of the results, the test confirmed that data with the same key value is stored in the same partition.

Conclusion

I have confirmed through this article that it is possible to transmit Hive Table data to Kafka.

In fact, there are several methods to transfer data, and there are other good alternatives available. (For example, you can connect to HiveServer2 via JDBC or use Spark.)
While I didn't call it out explicitly in the feature description, you may have noticed some limitations. For instance, if you try to send data directly from a query that JOINs other tables, errors can occur. To work around this, store the data in a temporary table first and then send it with a query against that temporary table, as in the partition test above.
There may be other constraints that I haven’t identified.
Despite these constraints, a significant advantage is that you don’t need to create a separate application.

I hope this shows that this method can be one of the options available to you.
Thank you for reading this far.
