Hive Kafka Integration

This article covers the Hive configuration needed to integrate with Kafka.

Info

This article was originally written in Korean and has been translated using ChatGPT.

Overview

Hive is data warehouse software that reads and writes large volumes of data stored in Hadoop using SQL syntax.
In simple terms, you can think of it as a large-scale data processing database running on Hadoop.

By default it works with HDFS, and it offers an open data source feature that allows access to HBase or other DBMSs. It also supports Kafka, which is widely used as a message queue. In this series, I document the configuration for integrating Hive with Kafka, as well as the process of reading and writing messages.

This article focuses on the necessary configuration and the essential information for getting started.
If you are interested in usage examples, please read the next article.

It is based on the official documentation.
I also cover additional information that the documentation does not mention but that is required in practice, along with examples I tested myself.

Everything here was done on Hive 3.x.

Install

In fact, no separate installation is required.
As the official documentation shows, Kafka support is already built into Hive.
It might be hard to imagine, but you can integrate with Kafka using SQL.

Create Table

To integrate Hive and Kafka, you need to create a link between them.
Just as Spring lets you create a connection easily with a Binder and Binding, Hive does this with a StorageHandler.
You specify the StorageHandler when defining a table and, with a few more settings, you can use the table seamlessly.

However, there are a few things to be cautious of when creating a table in Hive that integrates with Kafka.

The first thing is that you must create it as an EXTERNAL table.
Since the method we will be using involves Hive accessing and using the data stored in Kafka, you need to add the EXTERNAL keyword.

The second thing is that you must specify the KafkaStorageHandler.
When defining the table, it should include STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'.
Only with this specified can you integrate and use Kafka.

The third thing is that you need to configure the table.
Using the TBLPROPERTIES keyword, you can set the properties for the table.
Within this, you must write the configuration necessary for integrating with Kafka.

Applying all three points, the DDL can be written as in the example below.

CREATE EXTERNAL TABLE kafka_table (
    `timestamp` TIMESTAMP,
    `page` STRING,
    `newPage` BOOLEAN,
    `added` INT, 
    `deleted` BIGINT,
    `delta` DOUBLE
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES ( 
    "kafka.topic" = "test-topic",
    "kafka.bootstrap.servers" = "localhost:9092"
);

Kafka metadata

A message stored in Kafka consists not only of the payload but also of four pieces of metadata: key, partition, offset, and timestamp.
This metadata is also available in Hive.
The KafkaStorageHandler specified in the table definition automatically registers it as the following columns.

  • __key (byte array)
  • __partition (int32)
  • __offset (int64)
  • __timestamp (int64)
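
As a minimal sketch of how these columns can be referenced, the query below reads them alongside a payload column. It assumes the kafka_table defined earlier; the partition number and offset are arbitrary values chosen for illustration.

```sql
-- Sketch only: assumes the kafka_table from the DDL above.
-- Metadata column names start with underscores, so they are quoted with backticks.
SELECT `__partition`, `__offset`, `__timestamp`, `page`
FROM kafka_table
WHERE `__partition` = 0      -- illustrative partition
  AND `__offset` >= 100      -- illustrative starting offset
LIMIT 10;
```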

I will elaborate on how to use them in the next article, which contains real-world examples.

Precautions When Using Avro Format

If you are using the Avro format through the Confluent connector, you need to skip the first 5 bytes of each message.
Of these 5 bytes, 1 byte is the magic byte and 4 bytes hold the schema ID from the Schema Registry.
You can handle this by setting "avro.serde.type"="skip" and "avro.serde.skip.bytes"="5".
If you are using Avro, refer to the guide for the details.

Support for the Confluent Avro format appears to have been added in Hive 4.0.
If you are on 3.x, be careful. (I had a hard time with it…)
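
For reference, here is a rough sketch of how the two skip properties might be applied to an existing table. It assumes the kafka_table defined earlier and a Hive version that supports these properties (which, as noted above, appears to mean 4.0). Depending on your setup, an Avro schema property may also be required; it is not shown here.

```sql
-- Sketch only: assumes kafka_table exists and that the Hive version
-- in use recognizes the avro.serde.* properties.
ALTER TABLE kafka_table SET TBLPROPERTIES (
    "kafka.serde.class" = "org.apache.hadoop.hive.serde2.avro.AvroSerDe",
    "avro.serde.type" = "skip",        -- skip a fixed-size header before decoding
    "avro.serde.skip.bytes" = "5"      -- 1 magic byte + 4-byte schema ID
);
```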

SerDe (Serializer, Deserializer)

If you’ve ever sent data from an application to Kafka, you’ve probably specified a serializer and deserializer.
Similarly, Hive offers various serializers and deserializers.

Supported Serializers and Deserializers                    Description
org.apache.hadoop.hive.serde2.JsonSerDe                    JSON
org.apache.hadoop.hive.serde2.OpenCSVSerde                 CSV
org.apache.hadoop.hive.serde2.avro.AvroSerDe               AVRO
org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe   Binary
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe         Plain text

Specifying one is simpler than you might think.
Just add it to TBLPROPERTIES.
For instance, to read JSON data from Kafka or to send data in JSON format, add the following:
"kafka.serde.class" = "org.apache.hadoop.hive.serde2.JsonSerDe"
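
In context, a full table definition using this property might look like the sketch below. The table name kafka_json_table and its columns are hypothetical; the topic and broker address are reused from the earlier example.

```sql
-- Sketch only: kafka_json_table and its columns are hypothetical names.
CREATE EXTERNAL TABLE kafka_json_table (
    `page` STRING,
    `added` INT
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
    "kafka.topic" = "test-topic",
    "kafka.bootstrap.servers" = "localhost:9092",
    -- Value SerDe used to read and write the message payload
    "kafka.serde.class" = "org.apache.hadoop.hive.serde2.JsonSerDe"
);
```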

Authentication

Kafka can require authentication before granting permission to read and write data.
The authentication details Kafka needs can also be supplied through Hive.

In my case I used Kafka without any authentication, so I document that setup here. If you need authentication, please follow the official documentation.

If you do not add the authentication-related TBLPROPERTIES values, an error occurs when you attempt to connect.
You might think, “If I haven’t set it up, shouldn’t it just work without authentication?” (That’s what I thought.)
By default, however, Hive tries to authenticate using an ID and password.
Therefore, you have to add configuration explicitly to use Kafka without authentication.

The official documentation did not provide a specific solution for this problem, so it was a challenging part for me.
To get straight to the point, you can resolve it by adding the following to TBLPROPERTIES.

  • Producer: "kafka.producer.security.protocol" = "PLAINTEXT"
  • Consumer: "kafka.consumer.security.protocol" = "PLAINTEXT"
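
Put together, a table that both reads and writes without authentication could be defined roughly as follows. This is a sketch: the table name kafka_plaintext_table and its columns are hypothetical, and the topic and broker address are reused from the earlier example.

```sql
-- Sketch only: kafka_plaintext_table and its columns are hypothetical names.
CREATE EXTERNAL TABLE kafka_plaintext_table (
    `page` STRING,
    `added` INT
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
    "kafka.topic" = "test-topic",
    "kafka.bootstrap.servers" = "localhost:9092",
    -- Disable authentication for both directions
    "kafka.producer.security.protocol" = "PLAINTEXT",
    "kafka.consumer.security.protocol" = "PLAINTEXT"
);
```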

If you want to dig deeper into authentication, this document is a helpful reference.
As I mention later, if you prefix a property with kafka. when configuring the table, its value is passed through to Kafka.

Key Serializer

The serializers described earlier are value serializers.
Although more value serializers are provided than you might expect, only the byte serializer is supported for keys.
You cannot switch to a different key serializer even if you want to.
I’m not sure why it was implemented this way, but the code shows the details.
In case you are curious, I’m also providing a link.

Additional Configuration

Beyond what has been covered so far, there may be other settings you want to apply when using Kafka.
Although it is not described in the official documentation, you can pass configuration values through to Kafka.

  • Producer: "kafka.producer.{property}" = "{value}"
  • Consumer: "kafka.consumer.{property}" = "{value}"

When added to TBLPROPERTIES, these values are passed to Kafka, so you can apply the settings you want.
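
As an illustration of the pattern, the sketch below sets one consumer property and one producer property on an existing table. max.poll.records and compression.type are standard Kafka client settings picked as examples; I have not confirmed how the storage handler treats each individual property, so take the specific values as assumptions.

```sql
-- Sketch only: assumes kafka_table exists; property values are illustrative.
ALTER TABLE kafka_table SET TBLPROPERTIES (
    "kafka.consumer.max.poll.records" = "500",     -- passed to the Kafka consumer
    "kafka.producer.compression.type" = "gzip"     -- passed to the Kafka producer
);
```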

If you’re curious about how the configuration values are passed or the implemented code, it would be helpful to refer to the link below.

Summary

So far, we’ve looked at the configuration and background information needed to read from and write to Kafka from Hive.
In the next article, I will document the process of reading messages from Kafka in Hive.
Thank you.
