![[주저리] Kafka는 발행된 메세지의 데이터 타입을 기억할까? (JSON과 string의 비교)](https://img1.daumcdn.net/thumb/R750x0/?scode=mtistory2&fname=https%3A%2F%2Fblog.kakaocdn.net%2Fdn%2F7dYUk%2FbtsCOy0EcBQ%2FNDsj51OY9gUhlez0kIO3tK%2Fimg.png)
🧐 개요
Apache Kafka 환경에 발행된 메세지는 자동으로 인코딩(직렬화)됩니다. 일전에 Spark Streaming 앱에서 수신한 Kafka 메세지에서 CAST를 진행해주었던 이유도 직렬화 된 데이터를 문자열로 변환시켜주기 위해서였죠. 발행된 메세지를 재사용하기 위해서는 적절한 디코딩을 통해 데이터를 읽기 및 쓰기가 가능한 포맷으로 바꾸어주어야 합니다. Kafka 내부적으로 디폴트로 사용되는 인코더가 있지만 사용자가 직접 Serializer 객체를 정의하여 인코딩을 수행할 수도 있습니다.
confluent-kafka 모듈을 학습하던 도중 궁금해졌던 부분은 JSON 데이터의 처리였습니다. 구체적으로는 "Kafka는 str(dict) 데이터와 json.dumps(dict).encode('utf-8') 데이터를 동일하게 취급할까?" 라는 의문이 들었습니다. 둘이 아무런 차이를 가지고 있지 않다면, 굳이 메세지 발행 모듈을 여러 종류로 세분화 할 필요가 없으니까요(str_publisher, json_publisher 등..).
Kafka의 인코딩 과정을 거치지 않는다면 전자의 데이터는 json.loads 로직을 통해 디코딩할 수 없습니다. 과연 인코딩한 데이터는 어떤 모습을 하고 있을까요? 간단한 메세지 발행 테스트를 통해 확인해보도록 하겠습니다.
🖍️ 테스트 진행
메세지 발행하기
def publish_message(conf, topic:str, key:str, message):
from confluent_kafka import Producer
try:
producer = Producer(conf)
producer.produce(topic=topic, key=key, value=message)
producer.flush()
except Exception as E:
print(E)
def publish_message_json(conf, topic:str, key:str, message:dict):
from confluent_kafka import Producer
import json
try:
producer = Producer(conf)
producer.produce(topic=topic, key=key, value=json.dumps(message).encode('utf-8'))
producer.flush()
except Exception as E:
print(E)
if __name__ == "__main__":
conf = {'bootstrap.servers': 'localhost:9092'}
topic = "serializingProducer"
key = "2024-01-02"
message = {'intro':'hello, kafka!'}
publish_message(conf=conf, topic=topic, key=key, message=str(message)) #1
publish_message_json(conf=conf, topic=topic, key=key, message=message) #2
메세지는 다음과 같은 로직을 통해 발행하였습니다. #1 로직은 str 데이터를 발행하고, #2 로직은 json 모듈을 사용한 인코딩 데이터를 발행하고 있습니다.
발행된 데이터의 상태를 확인해볼까요? 두 가지 데이터 모두 비슷한 형식으로 출력되고 있습니다. 차이점이 있다면, json 모듈을 사용해 인코딩한 데이터는 작은따옴표가 큰따옴표로 바뀌어 있습니다. 문론 해당 변화는 Kafka와는 관련이 없는 Python 스크립트 환경에서 발생하는 변화입니다.
메세지 수신 및 인코딩하기
def consume_messages(conf, topic:str):
import json
from confluent_kafka import Consumer
consumer = Consumer(conf)
consumer.subscribe([topic])
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
key = msg.key().decode('utf-8') if msg.key() else None
try:
value= json.loads(msg.value().decode('utf-8')) if msg.value() else None
except json.JSONDecodeError:
value = None
value_type = type(value)
print(f"key: {key}, value: {value}, value type: {value_type}")
except KeyboardInterrupt:
pass
finally:
consumer.close()
if __name__ == "__main__":
conf = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'TEST',
'auto.offset.reset': 'earliest'
}
topic = "serializingProducer"
consume_messages(conf=conf, topic=topic)
메세지 수신 코드에서는 발행된 메세지의 value 값을 디코딩하고, 해당 데이터를 dict 타입으로 변환하고 있습니다. 간단한 테스트용 스크립트이기 때문에 KafkaError 등은 구현하지 않았습니다.
🔎 결과
(응? 혹시나.. 하는 마음에 2번 실행했습니다)
#1 로직의 데이터는 데이터 타입 변환에 실패하여 예외 처리로 인해 None 값이 호출되었습니다. #2 로직의 데이터는 정상적으로 dict 데이터 타입으로 변환되었습니다. 결론적으로 Python 환경에서의 차이와 마찬가지로, Kafka를 경유하여 인코딩 된 데이터도 JSON 타입의 딕셔너리 데이터와 string 타입의 딕셔너리 데이터를 구분하고 있습니다.
이로서 Kafka 환경에 데이터를 발행할 때, 발행하는 데이터의 타입이 유지된다는 사실을 확인할 수 있습니다.
📝 마치며
이번 테스트를 통해서 Kafka의 인코딩이 입력된 데이터 타입을 보존한다는 사실을 확인하게 되었습니다. 그동안 Kafka에 메세지를 발행할 때 동일하게 취급되는 것으로 오해하여 항상 문자열 데이터를 전송하였었는데, 덕분에 PySpark Streaming 어플리케이션의 데이터 수신 로직도 from_json 모듈을 사용하는 등 잘못 구성하게 된 것 같습니다. 지금이라도 문제점을 파악하였으니, 해당 부분을 수정하고 스크립트를 새롭게 구성해야겠네요.
'데이터 이모저모 > Kafka' 카테고리의 다른 글
[주저리] 개념으로 이해하는 메세지 큐(Message Queue)와 카프카(Kafka) (1) | 2023.12.24 |
---|
발자취를 로그처럼 남기고자 하는 초보 개발자의 블로그