![[Spark] Spark Streaming(PySpark)으로 Kafka 메세지 수신하기](https://img1.daumcdn.net/thumb/R750x0/?scode=mtistory2&fname=https%3A%2F%2Fblog.kakaocdn.net%2Fdn%2FbqIpLd%2FbtsCBLj0fjg%2FpJK860c5LjXczHyXvpiefk%2Fimg.png)
🧐 개요
이번 포스트는 Spark Streaming 기능을 사용해 Kafka 채널에 발행된 메세지를 수신하는 방법을 설명합니다. Apache Spark는 Kafka에서 발행한 메세지를 스트리밍 처리하도록 기능적으로 지원하고 있습니다. 일반적으로는 JAVA API 어플리케이션을 통해 Spark Streaming 작업을 구현한다고 들었습니다만 저는 PySpark 스크립트를 작성해 구현해보도록 하겠습니다.
참고로 제가 테스트를 진행하는 환경은 다음과 같습니다.
- Spark Version: v3.5.0
- JAVA Version: openjdk-8-jdk
- Kafka Host: localhost
🖍️ 공식 가이드 참조하기
Apache Spark 공식 홈페이지에 Kafka 스트리밍 관련 가이드 문서를 제공하고 있습니다.
Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) - Spark 3.5.0 Documentation
Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) Structured Streaming integration for Kafka 0.10 to read data from and write data to Kafka. Linking For Scala/Java applications using SBT/Maven project definitions, link
spark.apache.org
가이던스 내용을 확인해보면, kafka 데이터를 조회하기 위해 spark-sql-kafka 의존성 패키지가 필요합니다.
해당 의존성 패키지는 --packages 수식어를 통해 추가할 수 있습니다.
해당 명령어를 실행하면 spark가 필요로 하는 의존성 패키지를 자동으로 jar 디렉토리에 저장 및 설치합니다.
직접 jar 파일 디렉토리에 다운로드해도 스크립트 실행 시 해당 명령어를 지속적으로 입력해야 합니다.
🖍️ PySpark 스크립트 작성하기
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
공식 가이드에 소개된 Spark Stream 스크립트
앞에서 제시된 Spark 공식 가이드 스크립트를 참조하여 Kafka 데이터를 스트리밍하는 PySpark 스크립트를 작성해 보겠습니다.
위에서 가공한 데이터프레임으로 특정 작업을 수행하는 쿼리를 정의해야 합니다.
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
conf = SparkConf().setAppName("streaming") \
.setMaster("spark://workspace:7077")
sparkContext = SparkContext(conf=conf)
spark = SparkSession(sparkContext=sparkContext)
kafka_bootstrap_servers = "localhost:9092"
kafka_topic = "hoonie"
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
.option("subscribe", kafka_topic) \
.load()
transformed_df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
query = transformed_df \
.writeStream \
.outputMode("append") \
.format("console") \
.start()
query.awaitTermination()
발행된 메세지의 Key와 Value 값을 출력하는 Spark Streaming 스크립트입니다.
- setMaster("spark://workspace:7077") 부분은 로컬 환경에 동작중인 Spark의 마스터 host 이름을 지정한 것입니다. 실제 Spark를 구동하시는 환경에 맞게 수정해주셔야 합니다.
🛠️ Spark Submit 실행
작성한 스크립트는 아래의 명령어를 통해 실행할 수 있습니다.
$SPARK_HOME/bin/spark-submit \\
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 \\
<PySpark_스크립트_절대경로>
저는 의존성 패키지의 스칼라 버전을 2.12로 지정하였습니다. 실제로 각자의 Python 환경에 설치된 jar 파일들의 스칼라 버전을 확인하시고 맞는 버전을 지정해주시면 됩니다.
🧑🏭 어플리케이션 동작 테스트
위에서 실행시킨 어플리케이션의 동작 테스트를 해 보겠습니다.
어플리케이션이 종료되지 않고 동작되는 것을 확인했다면, 또 다른 터미널창을 열어서 Kafka 메세지를 발행하도록 하겠습니다. 일반적으로는 사용되지 않지만, 터미널 환경에서 사용자 입력으로 Kafka 채널에 메세지를 발행할 수 있습니다.
아래의 명령어를 사용해 사용자입력 창으로 접근합니다.
sudo /usr/local/kafka/bin/kafka-console-producer.sh \\
--broker-list localhost:9092 \\
--topic hoonie \\
--property "parse.key=true" \\
--property "key.separator=:"
- topic은 사용 중이신 kafka의 토픽 이름으로 수정하셔야 합니다. 저는 hoonie 채널을 사용하고 있습니다.
테스트로 메세지를 입력해봅니다. DEMO 키에 Hello, Kafka! 메세지를 발행하도록 하겠습니다.
Kafka 어플리케이션에서 key와 value를 출력하고 있습니다. 이런 식으로 스크립트의 정상 동작 여부를 확인할 수 있습니다.
🛠️ 추가: Kafka 메세지 수신 및 재발행
단일 PySpark 어플리케이션에서 복수의 쿼리를 사용하기 위해선 쿼리별 체크포인트 디렉토리를 별도/추가로 설정해주어야 합니다. 체크포인트 디렉토리는 쿼리의 상태를 저장하고 작업의 지속성을 제공하는 역할을 수행합니다. 단일 쿼리를 수행할 경우 특별히 지정해주지 않아도 되지만, 다중 쿼리를 수행할 경우 별도의 저장 공간을 지정해야 합니다.
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
conf = SparkConf().setAppName("streaming") \
.setMaster("spark://workspace:7077")
sparkContext = SparkContext(conf=conf)
spark = SparkSession(sparkContext=sparkContext)
kafka_bootstrap_servers = "localhost:9092"
input_kafka_topic = "hoonie"
output_kafka_topic = "hoonie_back"
checkpoint_dir = "file:///home/hooniegit/git/study/rdd-manufacture/demo/kafka/checkpoint"
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
.option("subscribe", input_kafka_topic) \
.load()
transformed_df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
query1 = transformed_df \
.writeStream \
.outputMode("append") \
.format("console") \
.option("checkpointLocation", f"{checkpoint_dir}/query1_checkpoint") \
.start()
query2 = transformed_df \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.outputMode("append") \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
.option("topic", output_kafka_topic) \
.option("checkpointLocation", f"{checkpoint_dir}/query2_checkpoint") \
.start()
query1.awaitTermination()
query2.awaitTermination()
해당 스크립트에서는 query2를 추가로 정의하여 hoonie 채널에서 수신한 데이터를 hoonie_back 채널에 재발행합니다. 각 쿼리의 체크포인트 디렉토리를 별도로 설정해 주었습니다.
📝 마치며
비록 대용량 데이터의 분석 및 과학 측면에서 더욱 유명하지만, Spark는 실시간 데이터의 스트리밍 및 가공 목적으로 사용되는 대표적인 기술 스택입니다. Spark를 통해 Kafka와 같은 대규모 pub/sub 시스템으로 실시간 전송되는 데이터를 가공 및 적재하여 보다 효율적이고 직관적인 데이터 파이프라인을 구축할 수 있습니다.
'데이터 이모저모 > Spark' 카테고리의 다른 글
[Spark] Kafka Streaming - 인코딩 된 JSON 데이터 가공하기 (1) | 2024.01.06 |
---|---|
[Spark] PySpark의 기본 데이터 가공 프로세스 (1) | 2024.01.03 |
[Spark] Spark Streaming(PySpark) - Kafka 메세지 가공 및 parquet 파일 저장하기 (1) | 2023.12.26 |
[Spark] Spark Streaming(PySpark) - '배치 함수'를 작성하여 작업 경과 시간을 기록하고 Kafka로 발행하기 (0) | 2023.12.24 |
[Spark] CPU 및 메모리 사용량 설정하기 (1) | 2023.12.22 |
발자취를 로그처럼 남기고자 하는 초보 개발자의 블로그