🧐 개요
이번 시간에는 Buffer 단위로 Kafka Streaming 작업을 수행하는 예제를 다루도록 하겠습니다.
Buffer이란 무엇일까?
Buffer은 데이터를 일시적으로 저장하는 메모리 공간입니다. 데이터가 '물'이라면, buffer은 '물을 담는 양동이' 정도로 표현할 수 있을 것입니다. 지속적으로 흐르는 물로 세수를 할 수도 있고, 양동이에 담은 물로 논밭에 물을 줄 수도 있는 것처럼 buffer은 일정한 크기의 데이터를 수집하여 한번에 작업을 수행하도록 도와주는 장치입니다. 일반적으로는 실시간 데이터를 수집하는 파트에서 주로 사용되는 개념입니다.
Spark Streaming Kafka 앱에서 Buffer을 사용해야 하는 이유
Spark Streaming 어플리케이션의 작업 속도는 결코 빠르다고 할 수 없습니다. 보다 정확하게 이야기하면 아래와 같이 이야기할 수 있습니다.
- '대용량의 데이터를 한번에 처리하는 측면'에서의 Spark의 속도는 매우 빠르다.
- '단일 Batch 수준의 데이터를 처리하는 측면'에서의 Spark의 속도는 매우 느리다.
Spark는 대용량의 데이터를 Scale-Out이 가능한 분산 환경에서 처리하기 위해 매우 복잡한 구조로 설계되었습니다. 따라서 아주 가벼운 수준의 작업은 단일 스크립트를 동작시키는 것에 비해 매우 느린 속도로 동작하게 되며, 상시 가동중인 스트리밍 어플리케이션이라 하더라도 내부 로직과 모듈을 로드하는 데에 짧지 않은 시간이 소요됩니다.
따라서 buffer을 통해 충분한 양의 데이터를 모으고, 모인 데이터를 한 번에 처리할 수 있도록 구성하는 것이 Spark 작업에 있어 유리합니다.
Python 환경에서 Buffer 구성 시 고려해야 할 점
* Buffer을 구성하고 관리하기 위한 다양한 외부 프로그램과 모듈, 그리고 오픈 소스 소프트웨어들이 있지만, 지금의 제가 알지 못하므로 Python 환경에서 단순하게 Buffer을 구현하고 있습니다. 추후 관련 내용을 학습하여 포스트를 작성하도록 하겠습니다.
Spark Dataframe은 Python 환경의 리스트 내부에 추가되지 않습니다. Python 환경의 리스트는 로컬 메모리에 저장되는 단일 머신에서의 데이터이지만, Spark Dataframe은 분산 환경에서 관리되는 RDD 기반의 불변성 데이터이기 때문입니다. 따라서 해당 포스트에서는 '일반적인 Python 작업에 소요되는 시간이 적다는 점을 감안'하여, Buffer 리스트에 데이터를 축적할 수 있도록 Dataframe을 Python 환경의 딕셔너리로 변환하는 방식을 사용합니다.
* 요약하면 아래의 방식으로 스크립트가 동작합니다.
BatchDF 입력 -> 딕셔너리 데이터 생성 -> buffer 리스트에서 저장 : 일정 크기 도달 시 Spark Dataframe 생성 -> 배치 작업 수행
🖍️ 스크립트 작성하기
Spark App tree 구조
app
.
├── app.py # kafka streaming 어플리케이션
└── lib
├── __init__.py
├── buffer # buffer 데이터(dict)를 생성하는 모듈 모음
│ ├── __init__.py
│ └── export_datas.py
├── dataframe # 생성된 dataframe 관련 작업을 수행하는 모듈 모음
│ ├── __init__.py
│ └── print_to_terminal.py
└── json # buffer 데이터(dict)를 dataframe 형태로 병합하는 모듈 모음
├── __init__.py
└── create_dataframe.py
디렉토리의 tree 구조는 위와 같이 구성되어 있습니다.
Spark 어플리케이션의 기본 스크립트인 app.py를 최대한 간략한 형태로 작성하기 위해, 모든 기능들은 모듈화하여 lib 디렉토리 내부에 배치하고 있습니다.
lib/buffer/export_datas.py
def export_all(batchDF, spark):
from pyspark.sql.functions import col
from pyspark.sql.types import StringType
first_row_dict = batchDF \
.withColumn("key", col("key").cast(StringType())) \ # 2진 데이터 decoding
.withColumn("value", col("value").cast(StringType())) \ # 2진 데이터 decoding
.select(*batchDF.columns).first().asDict() # 전체 컬럼 데이터를 선정하여 dict로 변환
print(first_row_dict) # test
return first_row_dict
해당 라이브러리는 buffer에 추가할 딕셔너리 데이터를 만드는 함수들을 보관하고 있습니다.
예제 함수는 Kafka 서버 내에 인코딩되어 있는 Key 및 Value값을 디코딩한 후, 모든 컬럼 요소를 추출하여 딕셔너리를 생성합니다.
이전 포스트에서 설명한 바와 같이, Kafka 채널에 발행되는 메세지는 2진 데이터로 인코딩되어 있기 때문에
lib/json/create_dataframe.py
def create_dataframe(buffer:list, spark):
from pyspark.sql.functions import explode
from pyspark.sql import Row
print(buffer) # test
rdd = spark.sparkContext.parallelize([Row(**item) for item in buffer])
df = spark.createDataFrame(rdd)
df.show() # test
return df
해당 라이브러리는 buffer 리스트에 모인 딕셔너리 데이터들을 병합하여 Spark Dataframe을 만드는 함수들을 보관하고 있습니다.
Spark는 딕셔너리 데이터를 RDD로 만들고, RDD 데이터를 데이터프레임으로 만들 수 있도록 기능적인 지원을 하고 있습니다. 해당 함수에선 buffer 리스트를 각 항목에 대하여 RDD 데이터로 변환한 후 이를 기반으로 Spark Dataframe을 만들고 있습니다.
lib/dataframe/print_to_terminal.py
def print_dataframe(df_buffer, spark):
df_buffer \
.write \
.format("console") \
.save()
해당 라이브러리는 buffer 기반으로 생성된 Spark Dataframe으로 작업을 수행하는 함수들을 보관하고 있습니다.
해당 예제에서는 간단한 테스트를 위해 단순 터미널 출력 기능을 수행하고 있습니다.
app.py
from pyspark.sql import SparkSession
# set spark master
# [임시]
spark = SparkSession.builder \
.appName("KafkaStreaming") \
.config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
.master("spark://neivekim76.local:7077") \
.getOrCreate()
# set kafka servers & topics
kafka_bootstrap_servers = 'localhost:9092' # [임시]
input_kafka_topic = "spark_streaming" # [임시]
# set buffer & batch
buffer, batch = [], []
buffer_size = 5
# read stream
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
.option("subscribe", input_kafka_topic) \
.load()
def set_buffer(batchDF, batchId, spark):
from lib.buffer.export_datas import export_all
from lib.json.create_dataframe import create_dataframe
from lib.dataframe.print_to_terminal import print_dataframe
try:
global buffer, batch # 글로벌(함수 바깥) 변수 사용
data = export_all(batchDF=batchDF, spark=spark) # buffer 데이터(dict) 생성
buffer.append(data)
batch.append(batchId)
if len(buffer) >= buffer_size:
df_buffer = create_dataframe(data=data, spark=spark) # dataframe 생성
print_dataframe(df_buffer=df_buffer, spark=spark) # batch 작업 수행
print(">>>>>> Let's check out batches " + str(batch)) # batch 확인 / need to fix
buffer, batch = [], [] # buffer 및 batch 초기화
except Exception as E: # 초기 blank batchDF에 대한 예외처리 필요
print(f">>>>>> Oops, error appeared.. \n{E}")
# set query
query = df \
.writeStream \
.foreachBatch(lambda batchDF, batchId: set_buffer(batchDF, batchId, spark)) \
.start()
# await termination
query.awaitTermination()
스트리밍을 수행하는 어플리케이션 스크립트입니다. 해당 스크립트에서의 주요 특징들을 정리하면 아래와 같습니다.
- spark-sql-kafka 패키지를 추가합니다.
- buffer 리스트를 지속적으로 갱신 및 초기화하기 위해 어플리케이션 스크립트에 직접 buffer을 정의합니다.
- buffer 프로세스를 구현하는 함수를 정의합니다.
- 외부 변수를 직접 사용하기 위해 global 메소드를 사용합니다.
- try & except 기반의 예외 처리 로직을 구성합니다. Spark Structured Streaming 어플리케이션은 초기 빈 데이터프레임을 입력받기 때문에 예외 처리를 수행하지 않으면 어플리케이션 동작이 중단됩니다.
- buffer 프로세스 작업 수행을 위한 라이브러리 내 모듈들을 호출해 사용합니다.
🔎 실행 결과
앞서 작성한 스크립트를 실행한 결과입니다. 초기에는 blank batchDF에 대한 예외 처리가 수행되었고, 각 배치를 통해 입력된 데이터가 딕셔너리의 형태로 변환됨을 확인할 수 있습니다. buffer이 지정한 크기에 도달했을 때 데이터프레임이 정상 출력되고, 이를 통해 수행한 작업과 누적된 배치 ID 정보들까지 정상적으로 확인할 수 있습니다.
조금 복잡하게 구현했지만, 이제 실시간 데이터 처리를 원하는 크기 단위로 묶어서 처리할 수 있게 되었습니다!
📝 마치며
프로그래밍을 배우면서, '높은 수준의 기술 스택 구현'을 이행하는 경우와 '낮은 수준의 기술 스택의 조합'을 이행하는 경우가 공존하는 것 같습니다. 이번에 다룬 buffer 로직은 단순 Python 로직들을 복수로 조합한 내용이라 후자에 가까운 것 같습니다. 어느 한 쪽이 더 중요하다고 이야기할 수는 없지만, '아키텍처의 완성도를 결정하는 것이 한 끗 차이일 수도 있겠구나', 라는 고민을 갖게 하는 과정이었습니다.
'데이터 이모저모 > Spark' 카테고리의 다른 글
[Spark] Kafka Streaming - 인코딩 된 JSON 데이터 가공하기 (1) | 2024.01.06 |
---|---|
[Spark] PySpark의 기본 데이터 가공 프로세스 (1) | 2024.01.03 |
[Spark] Spark Streaming(PySpark) - Kafka 메세지 가공 및 parquet 파일 저장하기 (1) | 2023.12.26 |
[Spark] Spark Streaming(PySpark) - '배치 함수'를 작성하여 작업 경과 시간을 기록하고 Kafka로 발행하기 (0) | 2023.12.24 |
[Spark] Spark Streaming(PySpark)으로 Kafka 메세지 수신하기 (0) | 2023.12.24 |
발자취를 로그처럼 남기고자 하는 초보 개발자의 블로그