Elasticsearch Spark

This article describes the process of transferring Elasticsearch index data to Hive using Spark.

Info

This article was originally written in Korean and has been translated using ChatGPT.

Introduction

Because ES is a search engine, it excels at quickly finding data that matches specific conditions.
That capability, however, depends on an indexing stage that consumes significant resources.
Because of this limitation, extracting the data you need from very large datasets can be challenging.

This is why alternative ways of retrieving the data become necessary, and Hadoop is often the first option considered.
If you or a teammate are responsible for sending the data to Elasticsearch, you can also store it efficiently in HDFS.
But what should you do when you are limited to Elasticsearch alone and have no control over how the data is transferred?

Clearly, the first step is to consider extracting the data from Elasticsearch.
This post introduces an approach that fits that scenario:
the procedure for ETLing data out of Elasticsearch with a Spark library, covered at an introductory ‘Getting Started’ level.

Objective

Perform ETL on a specific index from ES to Hive.

Required

  • Basic knowledge of Spark is essential.
  • Basic knowledge of Elasticsearch is essential.

Environment

  • Scala : 2.12
  • Spark : 3.x
  • Java : 8
  • ElasticSearch : 7.x

Dependency

  • elasticsearch-hadoop
  • elasticsearch-spark-30_2.12
  • calcite-core

If you are running Elasticsearch 8, check the release notes for the most suitable library version.
For the elastic-* libraries, use version 7.17.0 or above;
an important patch was included in the 7.17.0 release, which is why this is recommended.

As I recall, calcite-core is used during the deserialization process.
Omitting it can cause problems when building a DataFrame.

Getting Started

Connect

If you follow the official guide, you can use Spark SQL.
However, since Spark SQL works with structured data, mismatches in data structure can cause problems.

I won’t delve into the earlier versions (Spark version < 3.x).

Starting with Spark 3, SparkSession is the standard entry point.
When creating the session, you must provide the connection details for the Elasticsearch cluster you want to use.

For the full picture, refer to the Configuration documentation.
To establish a connection to ES, these two settings are mandatory.

  • es.nodes
  • es.port

If authentication is required, or if the connection needs to go over HTTPS, apply the settings below.

  • es.net.http.auth.user
  • es.net.http.auth.pass
  • es.net.ssl

For authentication details, refer to the linked documentation.
If ES is only reachable in a restricted setup such as a WAN environment, there is an additional parameter you must set.
Once it is enabled, the connector uses only the nodes listed in es.nodes and bypasses all others.

  • es.nodes.wan.only

For in-depth details, please refer to the linked documentation.
Putting all of the above configuration together, the setup looks like the following.

sparkSession = SparkSession.builder()
                           .master(sparkConfiguration.getMaster())
                           .appName(sparkConfiguration.getAppName())
                           .config("spark.es.nodes", "es-nodes")
                           .config("spark.es.port", "es-port")
                           .config("spark.es.net.http.auth.user", "user")
                           .config("spark.es.net.http.auth.pass", "password")
                           .config("spark.es.nodes.wan.only", true)
                           .config("spark.es.net.ssl", true)
                           .getOrCreate();

The elasticsearch-hadoop library offers four distinct ways to integrate with Spark.

  • Native RDD
  • Spark Streaming
  • Spark SQL
  • Spark Structured Streaming

These four can be grouped along two axes:

  • Structured vs Unstructured
  • Streaming vs Non-Streaming(Batch)

If streaming is required, Spark Streaming or Spark Structured Streaming are worth considering.
Otherwise, Native RDD or Spark SQL are the viable options.
Since the goal of this project is to use ES data in Hive, we chose Spark SQL, which can handle a structured format.
From version 1.5 onwards, the DataFrame can be created as shown below. (For versions before 1.5, consult the documentation.)

val df = sql.read.format("es").load("spark/index")

Although Java examples exist, most of the explanations lean towards Scala.
Since we care about maintainability, we chose a Java project, so the examples from here on are shown in Java.

Dataset<Row> df = sparkSession.read()
                              .format("es")
                              .load("template_data");

You can create a DataFrame object with the method above.
If you need to create it through the Java helper API instead, you can do so as shown below.

// Maps comes from Guava (com.google.common.collect.Maps);
// JavaEsSparkSQL comes from org.elasticsearch.spark.sql.api.java.
Map<String, String> cfg = Maps.newHashMap();
// cfg holds the elasticsearch-hadoop configuration for this read.
cfg.put("es.read.metadata", "true");
Dataset<Row> df = JavaEsSparkSQL.esDF(sparkSession, "template_data", cfg);

Read from ES

With the methods covered so far, you have access to the DataFrame.
Now it’s time to fetch and use the data it contains.
First, let’s check what kind of data is stored in the DataFrame.
Calling printSchema(), as shown below, reveals the DataFrame’s structure.

df.printSchema();

root
 |-- _class: string (nullable = true)
 |-- column1: long (nullable = true)
 |-- column2: string (nullable = true)
...

The DataFrame’s schema is built from the mapping defined on the index.
If you have gone through this process yourself, you will notice that metadata such as the document id is missing.
This can be puzzling at first, but it is easily fixed by adding the configuration value shown below.

Dataset<Row> df = sparkSession.read()
                              .format("es")
                              .option("es.read.metadata", "true")
                              .load("template_data");

df.printSchema();

root
 |-- _class: string (nullable = true)
 |-- column1: long (nullable = true)
 |-- column2: string (nullable = true)
...
 |-- _metadata: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

There might be another lingering question on your mind: how are array fields handled? 🤔
The crux of the issue is that an array type cannot be declared in the mapping;
as the guide points out, you only declare the data type of the elements.
So could the DataFrame schema be inferred incorrectly and cause parsing issues? The question naturally follows.
This hurdle can be overcome with the right configuration.
I ran into this challenge myself and used the following setting to resolve it.

Dataset<Row> df = sparkSession.read()
                              .format("es")
                              .option("es.read.metadata", "true")
                              .option("es.read.field.as.array.include", "productMapping,sellerMapping")
                              .load("template_data");

Here I listed the fields that should be parsed as arrays.
As long as the data stored in ES and the index mapping agree on types, parsing the ES data will not produce any discrepancies.
Conversely, you can also specify fields that should not be parsed as arrays, via the es.read.field.as.array.exclude option, as sketched below.
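
A minimal sketch of the exclude variant; the field name "tags" here is hypothetical, standing in for whatever field should not be treated as an array:

// Hedged sketch: "tags" is a hypothetical field that should NOT be read as an array.
Dataset<Row> df = sparkSession.read()
                              .format("es")
                              .option("es.read.field.as.array.exclude", "tags")
                              .load("template_data");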
Though you can manipulate the DataFrame directly, registering a temporary view lets you use SQL, as shown in the following example.

// "columns" is the list of index fields to keep, defined elsewhere in the job.
df.createTempView("tmp_front");
Dataset<Row> filtered_df = sparkSession.sql("select _metadata, " + String.join(",", columns) + " from tmp_front");

Customize Field

Data from ES can’t always be used in its original form; sometimes it has to be transformed.
Consider a field that contains a serialized JSON string: it may need to be deserialized before it can be used as a Map.
This example actually comes from my own experience. 😄
Before getting into the processing itself, let’s briefly go over how Spark works.

In Spark, the program we write is called the Driver. It supervises many workers, orchestrates how the program is executed, and monitors tasks.
Fetching data and running distributed tasks is the job of the Workers (also called Executors). With many Workers cooperating, even large datasets can be processed in a short time.
The Driver could process the data produced by the Workers itself, but that would undeniably slow things down.
The DataFrames created through Spark SQL are stored in a distributed fashion across Spark’s Workers.
So, to process a field efficiently, the processing function must be able to run on each of those Workers.

For this purpose, implementing the function through the Serializable interface is recommended.
However, retrofitting an existing class to implement Serializable can be difficult.
A practical alternative is to define a static function instead.

The process of deserializing a field and registering the result as a new column goes as follows.

  • Develop a UDF (User Defined Function) as a static function for field manipulation.
  • Register this UDF with SparkSession.
  • Incorporate it as a new column in the DataFrame.

Translating these three steps into code looks like the following.

public static UDF1<String, HashMap<String, String>> jsonStringToMap = jsonStr -> {
    ObjectMapper mapper = new ObjectMapper();
    return mapper.readValue(jsonStr, new TypeReference<HashMap<String, String>>() {});
};

@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
    // 1. Register the static UDF with the SparkSession, declaring its result type as map<string,string>.
    sparkSession.udf().register("jsonStringToMap", jsonStringToMap, DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType));

    // 2-3. Filter the columns through a temp view, then add the deserialized field as a new column.
    df.createTempView("tmp_front");
    Dataset<Row> filtered_df = sparkSession.sql("select _metadata, " + String.join(",", columns) + " from tmp_front");
    filtered_df = filtered_df.withColumn("dataFieldMap", functions.callUDF("jsonStringToMap", filtered_df.col("dataFields")));

    return RepeatStatus.FINISHED;
}

Because the UDF can be invoked on each Worker, the distributed DataFrame is processed in parallel.
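
To complete the ETL, the transformed DataFrame still needs to be written into Hive. A minimal sketch of that final step, assuming the session was built with enableHiveSupport() and that target_db.template_data is a placeholder table name:

// Hedged sketch: persist the transformed DataFrame as a Hive table.
// Assumes enableHiveSupport() on the session; the table name is a placeholder.
filtered_df.write()
           .mode("overwrite")   // or "append", depending on the load strategy
           .saveAsTable("target_db.template_data");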

Conclusion

So far, we’ve walked through the process of loading Elasticsearch data into Hive using Spark.
Out of the four available integration methods, I covered only the Spark SQL approach, which fit my requirements, but the others work in much the same way.
I recommend choosing the method that best fits your own situation.

I worked through this based only on the guide documentation, and perhaps because of my limited experience, I ran into a few difficulties along the way. 🫠
I hope this post serves as a helpful reference for anyone undertaking a similar task.
Thanks for reading.
