Elasticsearch Spark

Elastic Search Index데이터를 Spark를 사용해 Hive에 저장하는 과정을 설명합니다.

Introduction

ES가 검색엔진이다보니, 설정한 조건에 맞는 데이터를 빠르게 찾아낼 수 있습니다.
하지만 조건으로 설정하기 위해서는 색인하는 과정이 필요하고, 색인을 하기위해서는 많은 리소스가 필요로 합니다.
이러한 제약조건으로 인해, 많은 양의 데이터 속에서 조건에 맞는 데이터를 찾아 효율적으로 추출하기는 어렵습니다.

따라서 다른 방법으로 데이터를 조회해야할텐데, 이러한 상황이라면 대부분의 경우 Hadoop을 사용을 검토합니다.
만약 Elasticsearch에 저장할 데이터를 전송하는 역할을 팀원 또는 본인이라면, 적절한 방법으로 HDFS에 기록할 수 있습니다.
하지만 데이터 전송에 대한 제어권이 없고, Elasticsearch만 사용이 가능하다면 어떻게 해야할까요?

당연히 Elasticsearch에서 데이터 추출을 검토해야할 것입니다.
이러한 상황에 사용하기 적합한 방법을 설명하고자 합니다.
Spark에서 library를 사용해, Elasticsearch 데이터를 ETL하는 과정을 Getting Started 정도의 내용으로 설명합니다.

Objective

ES에 색인되어있는 특정 index를 Hive로 ETL한다.

Required

  • Spark에 조금의 지식이 필요합니다.
  • Elasticsearch에 조금의 지식이 필요합니다.

Environment

  • Scala : 2.12
  • Spark : 3.x
  • Java : 8
  • ElasticSearch : 7.x

Dependency

  • elasticsearch-hadoop
  • elasticsearch-spark-30_2.12
  • calcite-core

ElasticSearch 8버전으로 사용한다면, Release Note를 보고 적절한 버전을 찾는것을 권장합니다.
elastic-* 라이브러리는 7.17.0 버전 이상을 사용하는게 좋습니다.
7.17.0 버전에서 중요한 패치가 이루어졌기 때문입니다.

calcite-core는 deserialize할때 사용했던것으로 기억합니다.
추가하지않으면, Dataframe을 생성하는 과정에 문제가 발생합니다.

Getting Started

Connect

가이드대로 하면 Spark SQL을 사용할 수있습니다. 하지만 Spark SQL은 구조화된 데이터이므로, 동일한 데이터구조를 가지지않을경우 문제가 생길수있습니다.

구버전은(Spark version < 3.x) 설명하진 않겠습니다.

Spark3버전부터는 SparkSession을 생성해 사용하게 됩니다.
Session을 생성하면서, 연결하고자 하는 es의 정보들을 입력해주어야합니다.

Configuration을 읽으면 상세한 내용을 확인할 수 있습니다.
ES에 연결하기위해서는 아래의 2개는 필수적으로 작성해주어야 합니다.

  • es.nodes
  • es.port

인증이 필요하거나, https연결이 필요하지않을 경우 아래와 같은 설정을 입력해줍니다.

  • es.net.http.auth.user
  • es.net.http.auth.pass
  • es.net.ssl

인증과 관련된 내용은 링크를 확인해주세요. 제한된 환경인 WAN환경에 구축된 ES를 사용해야하는 경우 추가해주어야하는 파라미터가 있습니다. 이 파라미터를 추가할 경우, es.nodes 에 명시된 nodes만 사용합니다. 다른 node는 탐색하지 않습니다.

  • es.nodes.wan.only

상세 내용은 링크에서 보실수 있습니다. 위와같은 설정 정보들을 모두 사용하면 아래와 같습니다.

1
2
3
4
5
6
7
8
9
sparkSession = SparkSession.builder()
                           .master(sparkConfiguration.getMaster())
                           .appName(sparkConfiguration.getAppName())
                           .config("spark.es.nodes", "es-nodes")
                           .config("spark.es.port", "es-port")
                           .config("spark.es.net.http.auth.user", "user")
                           .config("spark.es.net.http.auth.pass", "password")
                           .config("spark.es.nodes.wan.only", true)
                           .config("spark.es.net.ssl", true)

ElasticSearch hadoop은 Spark가 접근해올 수 있는 방법을 4가지 제공하고있습니다.

  • Native RDD
  • Spark Streaming
  • Spark SQL
  • Spark Structured Streaming

위 4가지를 성격에 따라 나누어 보았습니다.

  • Structured vs Unstructured
  • Streaming vs Non-Streaming(Batch)

Streaming을 고려해야한다면, Spark Streaming이나 Spark Structured Streaming을 검토해볼수있습니다.
그게아니라면 Native RDD나 Spark SQL을 고려해볼 수 있습니다. ES데이터를 Hive에서 사용할 목적으로 진행하는 프로젝트이므로, Structure 형태를 가질수 있는 Spark SQL로 진행하였습니다. 1.5 이상부터는 아래처럼 Dataframe을 생성할 수 있습니다. (1.5 이전 버전은 가이드 문서를 통해 확인해 주세요.)

1
val df = sql.read.format("es").load("spark/index")

예시는 Java도 있긴하지만, Scala로 위주로 설명하는게 대다수입니다.
유지 보수를 위해 Java Project로 진행해야했기때문에, 사용 예시는 앞으로 Java로 보여드리겠습니다.

1
2
3
Dataset<Row> df = sparkSession.read()
                              .format("es")
                              .load("template_data");

위와같은 방법으로 DataFrame 객체를 생성할 수 있습니다.
만약에 Java DataFrame 객체를 생성해야 한다면, 아래와 같이 할 수 있습니다.

1
2
3
4
Map<String, String> cfg = Maps.newHashMap();
// cfg는 configuration을 추가하기위한 객체입니다.
cfg.put("es.read.metadata", "true");
Dataset<Row> df = JavaEsSparkSQL.esDF(sparkSession, "template_data", cfg);

Read from ES

지금까지 설명드린 방법을 통해 DataFrame에 접근할 수 있게 되었습니다.
이제는 데이터를 읽어와서 사용해보도록 하겠습니다.
우선은 DataFrame에 어떤 데이터들이 있는지 확인해보겠습니다.
아래와 같이 printSchema()함수를 사용하면, DataFrame이 어떤 구조를 갖추었는지 알 수 있습니다.

1
2
3
4
5
6
7
df.printSchema();

root
 |-- _class: string (nullable = true)
 |-- column1: long (nullable = true)
 |-- column2: string (nullable = true)
...

index에 설정된 mapping정보를 토대로 DataFrame의 Schema를 구성합니다.
여기까지 직접 진행해보시면 아시겠지만, document id와 같은 metadata가 없습니다. 이부분은 좀 의아하긴 하지만, 설정값을 추가하면 쉽게 해결할 수 있습니다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
Dataset<Row> df = sparkSession.read()
                                      .format("es")
                                      .option("es.read.metadata", "true")
                                      .load("template_data");

df.printSchema();

root
|-- _class: string (nullable = true)
|-- column1: long (nullable = true)
|-- column2: string (nullable = true)
...
|-- _metadata: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

또 한가지 의아한 점이 있을 겁니다. array 필드는 어떻게 되는거지? 🤔
mapping에는 array 타입을 지정해줄 수 없기 때문입니다.
가이드 문서를 살펴봐도 element에 사용되는 데이터 타입을 명시하도록 안내하고 있습니다.
DataFrame Schema가 잘못 지정되서, 파싱 에러가 발생할 수 있지 않을까? 질문으로 이어집니다.
이 부분도 마찬가지로 설정으로 해결가능하게끔 만들어져 있습니다.
저도 이 문제를 겪었다보니, 설정으로 해결하였습니다.

1
2
3
4
5
Dataset<Row> df = sparkSession.read()
                                      .format("es")
                                      .option("es.read.metadata", "true")
                                      .option("es.read.field.as.array.include", "productMapping,sellerMapping")
                                      .load("template_data");

어떤 필드를 array 로 파싱해야하는지 명시하였습니다.
이것과 반대로 array로 파싱하면 안되는 필드를 명시할 수도 있습니다.
es.read.field.as.array.exclude 값을 설정해주면 됩니다.
ES에 저장된 데이터들과 mapping의 데이터 타입이 불일치 하지않는 이상, 여기까지 진행했다면 ES데이터를 파싱하는데 문제가 발생하지 않습니다.
DataFrame을 직접 Handling할 수도 있지만, 임시적으로 사용할 View를 생성해주면 아래처럼 SQL을 사용할 수도있습니다.

1
2
df.createTempView("tmp_front");
Dataset<Row> filtered_df = sparkSession.sql("select _metadata, " + String.join(",", columns) + " from tmp_front");

Customize Field

ES에 있는 데이터를 그대로 사용하는것이 아니라, 변환을 거쳐 사용해야할 수도 있습니다.
예를들면 Serialized Json String이 필드로 존재한다면, Deserialize한 후에 Map으로 사용하는 경우가 있습니다.
물론 예시는 저의 사례입니다. ㅎ
Processing을 하기에 앞서 Spark가 어떻게 실행되는지 간단히 이야기해보겠습니다.

우리가 작성한 프로그램은 Spark에서 Driver라고 불리며, 다수의 워커 관리 및 운영프로그램의 실행관리 및 모니터링과 같은 역할을 수행합니다.
데이터 조회와 분산처리의 경우에는 Worker(Executor)가 할당되어 처리하게됩니다. 다수의 Worker가 처리하기 때문에 큰 규모의 데이터를 처리하더라도 빠른시간안에 처리가 가능합니다.
물론 Worker에서 처리한 데이터를 Driver에서 처리하게 할수는 있지만, 상당히 느려질 것입니다.
Spark SQL로 만든 DataFrame은 Spark의 Worker에 분산되어 저장되어있습니다.
따라서 필드의 Processing을 위해서는 다수의 Worker에서 함수를 실행할 수 있어야합니다.

이를 위해서는 Serializable Interface로 구현해주면 됩니다.
하지만 이미 만들어진 Class를 Serializable로 변경하는 작업은 쉽지않을 겁니다.
대안으로 static function으로 만들어서 해결할 수 있습니다.

예시로 말씀드렸던 Deserialize한 후 컬럼으로 등록하는 과정을 나열하면 아래와 순서대로 되겠습니다.

  • 필드를 가공할 수 있는 UDF(User Defined Function)을 static function으로 만든다.
  • SparkSession에 UDF 등록
  • DataFrame의 컬럼으로 추가

위의 3가지 과정을 코드로 작성하면 아래와 같습니다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
public static UDF1<String, HashMap<String, String>> jsonStringToMap = jsonStr -> {
    ObjectMapper mapper = new ObjectMapper();
    return mapper.readValue(jsonStr, new TypeReference<HashMap<String, String>>() {});
};

@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
    sparkSession.udf().register("jsonStringToMap", jsonStringToMap, DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType));

    df.createTempView("tmp_front");
    Dataset<Row> filtered_df = sparkSession.sql("select _metadata, " + String.join(",", columns) + " from tmp_front");
    filtered_df = filtered_df.withColumn("dataFieldMap", functions.callUDF("jsonStringToMap", filtered_df.col("dataFields")));
}

이렇게 UDF 함수를 각 Worker에서 호출할 수 있게 됨으로서, 분산저장된 DataFrame을 병렬로 Processing할 수 있습니다.

Conclusion

여기까지 Spark를 사용해 Elasticsearch 데이터를 Hive에 연동하는 과정까지 진행해보았습니다. 4가지 방식중 저의 니즈에 맞는 Spark-SQL만 설명드렸지만, 사용하는 방법은 나머지도 비슷비슷합니다. 상황에 맞는 적절한 방식을 선택해서 진행하시면 되겠습니다.

Guide 문서만 보고 시작을 헀는데, 실력이 부족해서 그런지 삽질 좀 했었습니다. 🫠
이와 비슷한 일을 하셔야하는 분들에게 도움이 되었으면 해서 기록으로 남겨봅니다.
감사합니다.

Reference

Hugo로 만듦
JimmyStack 테마 사용 중