Read messages from Kafka using Hive

This is a tutorial on how to read data from Kafka into Hive.

Info

This article was originally written in Korean and has been translated using ChatGPT.

Overview

In the previous post, we covered the configuration needed to integrate Kafka with Hive.
Moving forward, I'll explain how to read from and write to Kafka, starting in this post with reading data from Kafka.
You can think of it as Hive taking on the role of a Kafka consumer.
One difference from a typical consumer is that Hive is not assigned to a consumer group.

JSON

This section assumes that the data being read from Kafka is formatted as JSON.

Sample data

We'll use the following data structure as our reference example; feel free to add more fields based on your requirements.

{
    "id": 12345678,
    "value": "hello"
}

Table Definition

Let's define a table that mirrors the structure of the JSON above.
The id field holds an integer value, while the value field holds a string.
In Hive, integers can be represented with the INT or BIGINT data types, and strings with STRING.

Based on this, the DDL query can be written as follows.

CREATE EXTERNAL TABLE test_export_to_kafka(
    `id` BIGINT,
    `value` STRING
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES(
    "kafka.serde.class" = "org.apache.hadoop.hive.serde2.JsonSerDe",
    "kafka.topic" = "etl.hive.catalog.used",
    "kafka.bootstrap.servers" = "{broker servers}",
    "kafka.consumer.security.protocol" = "PLAINTEXT"
);

The table name, test_export_to_kafka, can be changed to whatever you prefer.
The table columns should be defined to match the JSON structure being ingested: column names should mirror the JSON keys, and column data types should be consistent with the corresponding JSON values.
Since Hive offers a broader range of data types than JSON, refer to the Hive data types documentation to pick the most suitable one.
Setting "kafka.serde.class" = "org.apache.hadoop.hive.serde2.JsonSerDe" tells Hive to deserialize the messages as JSON.
The remaining TBLPROPERTIES need little attention; just set kafka.topic to the desired topic name and kafka.bootstrap.servers to your broker addresses.

READ

SELECT *
FROM test_export_to_kafka

If everything has been configured correctly, you should see the messages in the topic.
If something is misconfigured, you may get error messages or see the columns come back as NULL.
Double-check that your setup matches the steps described so far.
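For a quick sanity check before scanning the whole topic, a limited read like the sketch below (using the table defined above) should show populated id and value columns; NULLs usually point to a SerDe or schema mismatch.

-- Fetch a few rows to confirm the JSON fields are deserialized correctly.
SELECT `id`, `value`
FROM test_export_to_kafka
LIMIT 10;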

TEXT

This section covers reading plain text, as opposed to structured data such as JSON.

Sample data

Since it's plain text, the example is straightforward.
We'll use basic numeric values for demonstration.

1234
5678
91011

Table Definition

CREATE EXTERNAL TABLE test_export_to_kafka(
    `id` BIGINT
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES(
    "kafka.serde.class" = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
    "kafka.topic" = "etl.hive.catalog.used",
    "kafka.bootstrap.servers" = "{servers}",
    "kafka.consumer.security.protocol" = "PLAINTEXT"
);

This closely mirrors the JSON case we covered above.
The differences are the SerDe and the column definition.
For the SerDe, switch to "kafka.serde.class" = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe".

READ

SELECT *
FROM test_export_to_kafka

If everything has been configured properly, you should see the messages from the topic.
If something is misconfigured, you may get error messages or see the columns come back as NULL.
Double-check that your configuration matches the steps described so far.
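Because the single column is declared as BIGINT, the plain-text payload can be used directly in numeric expressions. A minimal sketch, assuming the table above:

-- Aggregate the numeric messages read from the topic.
SELECT COUNT(*) AS message_count, MAX(`id`) AS max_value
FROM test_export_to_kafka;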

Utilizing Kafka Metadata

Kafka stores metadata alongside the message payload itself.
When you define a table with the Kafka storage handler, this metadata is automatically added as columns, so whenever Hive fetches a message it also retrieves the accompanying metadata.
Because the metadata is exposed as columns, you can use it in SQL WHERE conditions.
For example, you can pull only data produced after a given timestamp, or only data sitting in a particular partition.
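The metadata columns exposed by the Kafka storage handler include __key, __partition, __offset, and __timestamp. A quick way to inspect them, assuming the JSON table defined earlier:

-- Show the Kafka metadata columns alongside the message payload.
SELECT `__partition`, `__offset`, `__timestamp`, `id`, `value`
FROM test_export_to_kafka
LIMIT 5;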

Fields like __partition and __offset use integer types, which makes them particularly easy to work with, as in the conditions below (a full query follows the list).

  • ex1) __partition = 0
  • ex2) __offset > 5000
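
Combined into a full query (a sketch, assuming the JSON table defined earlier), the conditions above look like this:

-- Read only messages from partition 0 with an offset greater than 5000.
SELECT *
FROM test_export_to_kafka
WHERE `__partition` = 0
  AND `__offset` > 5000;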

__timestamp, however, is stored as an int64 Unix timestamp.
This differs from the human-readable time notation we're used to, so it requires a conversion and is a bit inconvenient to work with.
To make it easier to use in conditions, it's best to convert the comparison value as shown below.
The function to use is unix_timestamp, as demonstrated in the following example.

unix_timestamp('2022-03-08 00:00:00.000', 'yyyy-MM-dd HH:mm:ss.SSS') * 1000

  • The first parameter is the time you want to convert.
  • The second parameter is the time format of the value given in the first parameter.

With this function, you can convert a specific time into a Unix timestamp and compare it against __timestamp.
Bear in mind, however, that the __timestamp Hive exposes is in milliseconds, while unix_timestamp returns seconds, so the units need to be aligned; that is why the result is multiplied by 1000. Here's an example.

ex) unix_timestamp('2022-03-08 00:00:00.000', 'yyyy-MM-dd HH:mm:ss.SSS') * 1000 < __timestamp
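
As a complete query (again a sketch, assuming the JSON table defined earlier), this reads only the messages produced after 2022-03-08 00:00:00:

-- Read only messages whose Kafka timestamp is later than the given time.
SELECT *
FROM test_export_to_kafka
WHERE `__timestamp` > unix_timestamp('2022-03-08 00:00:00.000', 'yyyy-MM-dd HH:mm:ss.SSS') * 1000;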

Supplementary Example

-- convert the UTC __timestamp to JST
select FROM_UTC_TIMESTAMP(`__timestamp`, 'JST') from test_kafka_product_received limit 1;

-- print timestamp, unixtime to timestamp
select current_timestamp(), from_unixtime(unix_timestamp(), 'yyyy-MM-dd HH:mm:ss.SSS');

-- print converted unixtime
select unix_timestamp('2022-03-07 14:00:00.000', 'yyyy-MM-dd HH:mm:ss.SSS');
select unix_timestamp('2022-03-07 06:11:41', 'yyyy-MM-dd HH:mm:ss');

Summary

With what we've covered in this post, you can now read JSON or plain-text data from Kafka using Hive.
Filtering on the metadata columns makes it easy to extract only the Kafka messages that match specific criteria.
One limitation I found is that these tables could not be joined with other tables in Hive (I ran into authentication-related issues when I tried).
That constraint holds back its full potential, but with further improvements its usefulness could grow considerably.

In the next post, I'll cover sending data from Hive to Kafka.
Thanks for reading.
