![[Spark] Kafka Streaming - 인코딩 된 JSON 데이터 가공하기](https://img1.daumcdn.net/thumb/R750x0/?scode=mtistory2&fname=https%3A%2F%2Fblog.kakaocdn.net%2Fdn%2Ft6KLl%2FbtsC7pHSBni%2Ff6K01DwyHPpcoippkUZIOk%2Fimg.png)
🧐 개요
Apache Kafka를 통해 발행된 메세지의 Key 및 Value는 (Kafka의 자체적인 로직으로 인해) 인코딩되어 있습니다. 따라서 해당 데이터를 디코딩하지 않으면 읽을 때 데이터가 2진법으로 표기됩니다. Spark-Streaming-Kafka 앱의 경우에도 수신한 Kafka 메세지를 특정 데이터 타입으로 CAST 하도록 공식 가이드에서도 권고하고 있습니다. 메세지가 일반적인 STRING 또는 INTEGER 데이터 타입인 경우에는 그냥 CAST를 진행해 주면 아무런 문제가 없습니다.
문제는 메세지가 JSON 데이터 타입인 경우에 발생하는데, 현재의 Spark는 인코딩된 STRUCT 데이터를 자동으로 디코딩해주지 못합니다. 디코딩을 위해서는 기존 데이터가 가지고 있던 스키마(schema) 정보가 필요합니다. 따라서 복잡한 구조의 JSON 데이터를 전송하거나, 구조가 유동적으로 바뀌는 JSON 데이터를 전송하는 경우 스크립트를 일일히 수정해주어야 한다는 문제가 있습니다.
정리하자면 다음과 같이 이야기할 수 있을 것입니다.
- Spark-Streaming-Kafka 앱에서 JSON 데이터를 다루기 위해서는 2가지가 필요하다
- Kafka 채널에 JSON 데이터 타입의 메세지 발행하기
- Spark 앱에 디코딩에 사용할 JSON 데이터 스키마 제공하기
이번 포스트는 Spark 앱에 JSON 데이터의 스키마를 제공하는 과정을 설명합니다. PySpark 기반의 데이터 가공 로직을 사용하고 있으므로 스크립트는 Python 문법으로 작성되어 있습니다.
🔎 외부 자료 참조하기
우선, 관련 내용을 정리하신 분이 있어 글을 참고하였습니다.
해당 글에서는 Spark 어플리케이션에서 Kafka 채널에 발행된 JSON 데이터를 읽는 로직을 설명하고 있습니다.
Reading json message from Kafka topic and process using Spark Structured Streaming and write it…
Spark Structured Streaming example
medium.com
해당 스크립트에서는 Struct 타입의 스키마 정보를 직접 입력하는 방식을 사용하고 있습니다. 스키마의 구조가 앞으로 바뀔 일이 없다면, 이런 식으로 직접 스키마를 입력하는 것이 좋은 방법일 것입니다. 스키마를 제공하기 위해서 준수해야 할 형식자를 확인할 수 있습니다.
schema = StructType([
StructField("row_id", DoubleType()),
StructField("res_id", DoubleType()),
...
StructField("user_ratings", ArrayType(StructType([
StructField("rating_text", StringType()),
StructField("user_id", StringType()),
StructField("rating", StringType())
]))),
])
제공해야 할 스키마는 각 필드의 데이터 타입을 StructField 모듈을 사용하여 정의하며, 컬럼명과 데이터 타입을 내부에 기재하고 있습니다.
🖍️ 1번 방법 - 스키마 정보 제공하기
이제 스크립트를 작성해 보도록 하겠습니다. 전체 스크립트의 구성은 아래와 같습니다.
def infer_schema(json_data:dict):
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, ArrayType
def infer_field(name, value):
if isinstance(value, bool):
return StructField(name, BooleanType(), True)
elif isinstance(value, int):
return StructField(name, IntegerType(), True)
elif isinstance(value, list):
element_type = infer_field(name, value[0]).dataType
return StructField(name, ArrayType(element_type), True)
elif isinstance(value, dict):
nested_fields = [infer_field(sub_name, sub_value) for sub_name, sub_value in value.items()]
return StructField(name, StructType(nested_fields), True)
else:
return StructField(name, StringType(), True)
fields = [infer_field(name, value) for name, value in json_data.items()]
return StructType(fields)
def receive_json(batchDF, batchId, spark):
from pyspark.sql.functions import from_json
import json
try:
# Create Schema
json_data = batchDF \
.selectExpr(
"CAST(value AS STRING)"
) \
.select("value").first()[0]
schema = infer_schema(json.loads(json_data))
print(schema)
# Change Datatype
df = batchDF \
.selectExpr(
"CAST(key AS STRING)",
"CAST(value AS STRING)"
) \
.withColumn("value", from_json("value", schema))
# Test
df.write \
.format("console") \
.save()
# Test
df \
.selectExpr(
"key",
"value.message",
"value.date"
) \
.write \
.format("console") \
.save()
except Exception as E:
print(f">>>>>>>> {E}")
batchDF \
.selectExpr(
"CAST(key AS STRING)",
"CAST(value AS STRING)"
) \
.write \
.format("console") \
.save()
해당 스크립트는 Spark-Streaming-Kafka 어플리케이션의 query에 입력하는 함수 모듈의 내용입니다.
이하에서는 각 스크립트의 세부적인 내용을 설명하겠습니다.
스키마 구조 생성하기
Python의 json 타입 데이터를 호출하여 스키마 데이터를 생성하는 모듈이 필요합니다. JSON 데이터의 각 열이 어떤 데이터 타입인지 확인하고, 그 순서에 맞게 스키마를 생성하도록 모듈을 구성해야 합니다.
def infer_schema(json_data:dict):
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, ArrayType
def infer_field(name, value):
if isinstance(value, bool):
return StructField(name, BooleanType(), True)
elif isinstance(value, int):
return StructField(name, IntegerType(), True)
elif isinstance(value, list):
element_type = infer_field(name, value[0]).dataType
return StructField(name, ArrayType(element_type), True)
elif isinstance(value, dict):
nested_fields = [infer_field(sub_name, sub_value) for sub_name, sub_value in value.items()]
return StructField(name, StructType(nested_fields), True)
else:
return StructField(name, StringType(), True)
fields = [infer_field(name, value) for name, value in json_data.items()]
return StructType(fields)
infer_field 함수에서는 입력받은 컬럼명과 데이터의 타입을 확인하여 StructField를 반환하는 로직을 수행하고 있습니다. 데이터 타입이 STRING, INTEGER 또는 BOOL인 경우에는 단순히 출력을 진행하면 되지만, ARRAY 또는 STRUCT 구조인 경우에는 하위 항목이 포함되어 있으므로 다음과 같이 처리해야 합니다.
- INTEGER: 데이터의 0번째 인덱스에 대하여 함수의 재귀적 호출
- STRUCT: 전체 데이터의 Key, Value 값에 대하여 함수의 재귀적 호출
이런 식으로 함수를 재귀적으로 호출할 경우 하위 컬럼에 대한 반복 작업을 단일 함수로 쉽게 구현할 수 있습니다. 다행히도 스키마 구조가 리스트의 형태로 감싸져 있기 때문에 STRUCT 데이터 타입을 재귀적으로 정의할 수 있습니다(아니었다면 복수의 함수를 통해 복잡한 로직을 구성해야 했을 것입니다).
def receive_json(batchDF, batchId, spark):
from pyspark.sql.functions import from_json
import json
try:
# Create Schema
json_data = batchDF \
.selectExpr(
"CAST(value AS STRING)"
) \
.select("value").first()[0]
schema = infer_schema(json.loads(json_data))
print(schema)
Spark 어플리케이션에서는 STRING 데이터 타입으로 디코딩한 value 데이터를 추출하고, 이를 디코딩시킨 후 앞에서 정의한 infer_schema 함수에 json 데이터를 전달합니다. Kafka 메세지를 수신한 직후 생성되는 데이터프레임은 단일 row 구조로 되어 있으므로, first() 함수를 사용하여 데이터를 선정해도 특별한 문제가 없습니다.
"왜 바로 json 타입으로 디코딩하지 않고, 불편하게 스키마를 만들까?" 라는 의문이 들으실 수 있습니다. 스키마를 만들어주는 이유는, Spark의 STRUCT 데이터 타입과 Python의 json 데이터 타입이 상호 호환되지 않기 때문입니다. 따라서 PySpark에 udf를 정의하여 데이터 타입을 json으로 인코딩해도 Spark는 여전히 해당 데이터의 타입을 STRING으로 인식합니다.
* 동일한 이유로, STRING으로 CAST를 진행한 JSON 데이터 역시 json(decoding) 타입으로 구성되어 있습니다. 인코딩 과정에서는 기존 데이터의 형식자가 유실되지 않기 때문에 해당 로직을 통해 복원이 가능한 것입니다.
데이터 타입 변환하기
# Change Datatype
df = batchDF \
.selectExpr(
"CAST(key AS STRING)",
"CAST(value AS STRING)"
) \
.withColumn("value", from_json("value", schema))
앞에서 생성한 스키마 데이터를 from_json 함수에 부여합니다. from_json 함수는 스키마 구조를 기반으로 STRING 데이터를 STRUCT 데이터로 변환해주는 역할을 수행합니다.
🖍️ 2번 방법 - RDD 사용하기
앞에서 사용한 방법은 JSON 데이터의 스키마 구조를 직접 생성하였으며, batchDF 내부의 데이터 타입을 변환시키는 방법이었습니다. 하지만 batchDF의 나머지 요소들을 필요로 하지 않는 경우, value 데이터만 추출하여 새로운 데이터프레임을 만들 수도 있습니다.
Spark는 RDD라는 추상화된 데이터 타입을 경유해서 다양한 형식의 데이터들을 읽고 처리할 수 있습니다. 이러한 특성을 응용해 JSON 데이터를 RDD로 변환하고, 이를 read.json() 함수로 읽어 (JSON 파일을 직접 읽을 때와 동일한 로직으로) 새로운 데이터프레임을 생성합니다.
def receive_json(batchDF, batchId, spark):
import json
try:
# Create Schema
json_data = batchDF \
.selectExpr(
"CAST(value AS STRING)"
) \
.select("value").first()[0]
# Change Datatype to JSON
json_value = json.loads(json_data)
# Create RDD with JSON
rdd_json = spark.sparkContext.parallelize([json_value])
# Create Dataframe with RDD
df_json = spark.read.json(rdd_json, multiLine=True)
# Test
df_json.write \
.format("console") \
.save()
except Exception as E:
print(f">>>>>> {E}")
해당 로직을 사용할 경우의 장점은, Spark가 아닌 Python 로직을 사용해 JSON 데이터를 정의하므로 Spark 스키마를 제공할 필요가 없다는 점입니다. 구체적으로 해당 스크립트는 JSON 데이터를 병렬화하여 RDD 데이터를 생성하고, 이를 데이터프레임 형태로 변환하는 방식을 사용하고 있습니다. 스키마를 제공했을 경우와 스크립트 실행 속도 측면에서도 큰 차이는 없습니다.
다만, 이런 방식을 채택할 경우 BatchDF 내에 포함된 Kafka 정보 컬럼들을 사용하지 않게 된다는 단점도 있습니다. 따라서 해당 로직을 사용하면서 Kafka 데이터도 함께 사용해야 한다면, BatchDF 컬럼에서 데이터를 일부 추출하거나 생성한 데이터프레임을 BatchDF와 JOIN하여 사용해야 합니다.
🔎 결과

(맨 앞줄의 ERROR 내용과 하단의 빈 데이터프레임은 Spark 앱 실행시 초기에 빈 데이터로 로직이 1회 동작하기 때문입니다)
value 데이터에 Key 값이 표기되지 않는 것으로 보아, 데이터가 정상적으로 STRUCT 타입으로 변경되었습니다. 추가 예제로 작성한 하위 컬럼을 지정하는 로직도 잘 동작하고 있습니다. 먼 길을 돌아간 것 같지만, JSON 데이터를 정상적으로 가공할 수 있게 되었군요.
📝 마치며
내용적인 부분이 많지 않음에도 이번 포스트는 작성에 꽤 오랜 시간이 걸렸습니다. 해당 로직을 찾아내는 과정에서 많은 가설의 과정을 거쳐야 했기 때문입니다. Python 환경과 Spark 환경에서 데이터 타입이 어떤 식으로 취급되는지, Kafka에 발행된 메세지는 어떻게 인코딩 및 디코딩되는지 등을 직접 테스트해야 했기 때문에 상당히 많은 시간이 소요되었습니다. 막상 작성된 게시글을 보니 내용이 무척 적은 것 같아 웃기면서도 슬픈 기분이네요. 아무쪼록 이것으로 Spark 환경에서 다양한 데이터 타입에 대하여 조금 더 유동적으로 대처할 수 있게 된 것 같습니다.
'데이터 이모저모 > Spark' 카테고리의 다른 글
[Spark] Buffer 단위로 Kafka Streaming 작업 수행하기 (2) | 2024.01.22 |
---|---|
[Spark] PySpark의 기본 데이터 가공 프로세스 (1) | 2024.01.03 |
[Spark] Spark Streaming(PySpark) - Kafka 메세지 가공 및 parquet 파일 저장하기 (1) | 2023.12.26 |
[Spark] Spark Streaming(PySpark) - '배치 함수'를 작성하여 작업 경과 시간을 기록하고 Kafka로 발행하기 (0) | 2023.12.24 |
[Spark] Spark Streaming(PySpark)으로 Kafka 메세지 수신하기 (0) | 2023.12.24 |
발자취를 로그처럼 남기고자 하는 초보 개발자의 블로그