[Spark] Buffer 단위로 Kafka Streaming 작업 수행하기
데이터 이모저모/Spark2024. 1. 22. 23:59[Spark] Buffer 단위로 Kafka Streaming 작업 수행하기

🧐 개요 이번 시간에는 Buffer 단위로 Kafka Streaming 작업을 수행하는 예제를 다루도록 하겠습니다. Buffer이란 무엇일까? Buffer은 데이터를 일시적으로 저장하는 메모리 공간입니다. 데이터가 '물'이라면, buffer은 '물을 담는 양동이' 정도로 표현할 수 있을 것입니다. 지속적으로 흐르는 물로 세수를 할 수도 있고, 양동이에 담은 물로 논밭에 물을 줄 수도 있는 것처럼 buffer은 일정한 크기의 데이터를 수집하여 한번에 작업을 수행하도록 도와주는 장치입니다. 일반적으로는 실시간 데이터를 수집하는 파트에서 주로 사용되는 개념입니다. Spark Streaming Kafka 앱에서 Buffer을 사용해야 하는 이유 Spark Streaming 어플리케이션의 작업 속도는 결코 빠르다..

[Spark] Kafka Streaming - 인코딩 된 JSON 데이터 가공하기
데이터 이모저모/Spark2024. 1. 6. 01:10[Spark] Kafka Streaming - 인코딩 된 JSON 데이터 가공하기

🧐 개요 Apache Kafka를 통해 발행된 메세지의 Key 및 Value는 (Kafka의 자체적인 로직으로 인해) 인코딩되어 있습니다. 따라서 해당 데이터를 디코딩하지 않으면 읽을 때 데이터가 2진법으로 표기됩니다. Spark-Streaming-Kafka 앱의 경우에도 수신한 Kafka 메세지를 특정 데이터 타입으로 CAST 하도록 공식 가이드에서도 권고하고 있습니다. 메세지가 일반적인 STRING 또는 INTEGER 데이터 타입인 경우에는 그냥 CAST를 진행해 주면 아무런 문제가 없습니다. 문제는 메세지가 JSON 데이터 타입인 경우에 발생하는데, 현재의 Spark는 인코딩된 STRUCT 데이터를 자동으로 디코딩해주지 못합니다. 디코딩을 위해서는 기존 데이터가 가지고 있던 스키마(schema) 정..

[Spark] PySpark의 기본 데이터 가공 프로세스
데이터 이모저모/Spark2024. 1. 3. 02:52[Spark] PySpark의 기본 데이터 가공 프로세스

🧐 개요 이번 포스트에서는 데이터프레임 가공 과정에서 가장 기본적으로 사용되는 PySpark 로직을 설명하고자 합니다. PySpark의 기본적인 가공 로직들은 다양한 분야 및 데이터 기반의 프로젝트에서 반복적으로 사용되는 내용이지만, 구현 방식이 다채롭고 활용 범위가 넓다보니 꾸준히 사용하지 않으면 빠르게 잊혀지고 내용을 잘못 기억하게 되더군요(내 기억력의 문제인가..). 이를 개인적으로 예방하고 필요한 시기에 찾아볼 수 있도록 하는 차원에서 포스트를 작성합니다. 📁 파일 데이터 불러오기 # read json files df = spark.read.option("multiline", "true").json("") # read csv files df = spark.read.csv("", header=Tru..

[주저리] Kafka는 발행된 메세지의 데이터 타입을 기억할까? (JSON과 string의 비교)
데이터 이모저모/Kafka2024. 1. 2. 02:39[주저리] Kafka는 발행된 메세지의 데이터 타입을 기억할까? (JSON과 string의 비교)

🧐 개요 Apache Kafka 환경에 발행된 메세지는 자동으로 인코딩(직렬화)됩니다. 일전에 Spark Streaming 앱에서 수신한 Kafka 메세지에서 CAST를 진행해주었던 이유도 직렬화 된 데이터를 문자열로 변환시켜주기 위해서였죠. 발행된 메세지를 재사용하기 위해서는 적절한 디코딩을 통해 데이터를 읽기 및 쓰기가 가능한 포맷으로 바꾸어주어야 합니다. Kafka 내부적으로 디폴트로 사용되는 인코더가 있지만 사용자가 직접 Serializer 객체를 정의하여 인코딩을 수행할 수도 있습니다. confluent-kafka 모듈을 학습하던 도중 궁금해졌던 부분은 JSON 데이터의 처리였습니다. 구체적으로는 "Kafka는 str(dict) 데이터와 json.dumps(dict).encode('utf-8'..

[PostgreSQL] 데이터베이스 백업 및 S3 업로드
데이터 이모저모/SQL2023. 12. 27. 23:58[PostgreSQL] 데이터베이스 백업 및 S3 업로드

🧐 개요 이번 포스트는 PostgreSQL 데이터베이스의 백업본을 생성하고 S3 스토리지에 업로드하는 방법을 설명합니다. AWS와 같은 대규모 클라우드 서비스들은 RDS 데이터베이스에 대한 백업 스냅샷을 주기적으로 생성하여 언제든지 데이터베이스를 백업할 수 있도록 기술적으로 지원하고 있습니다(해당 데이터를 외부로 이관할 수 있도록 s3 스토리지에 스냅샷을 업로드할 수도 있죠). 그런데 이러한 서비스들이 포함되어 있기 때문에 동일 사양의 인스턴스 대비 청구되는 비용이 높다는 단점이 있습니다. 다행히도, 대부분의 RDBMS 시스템은 데이터베이스를 백업 및 복원하기 위한 자체 기능을 내장하고 있습니다. 따라서 이번 포스트는 PostgreSQL의 자체 내장 기능을 사용하여 AWS의 스냅샷 기능을 벤치마킹 하도록..

[Spark] Spark Streaming(PySpark) - Kafka 메세지 가공 및 parquet 파일 저장하기
데이터 이모저모/Spark2023. 12. 26. 14:24[Spark] Spark Streaming(PySpark) - Kafka 메세지 가공 및 parquet 파일 저장하기

🧐 개요 이전 포스트 - [Spark] Spark Streaming(PySpark) - '배치 함수'를 작성하여 작업 경과 시간을 기록하고 Kafka로 발행하기 이전 포스트 - [Spark] Spark Streaming(PySpark)으로 Kafka 메세지 수신하기 이번 포스트는 지난 포스트에 이어, kafka를 통해 수신한 데이터를 가공하는 로직을 소개합니다. 해당 로직은 제가 데모를 돌리면서 실습한 부분이기 때문에 실제 현업에서의 사용 방식과는 차이가 있습니다. 이번 포스트에서 구현한 로직은 아래와 같습니다. Kafka에서 JSON 구조로 작성된 STRING 타입의 value 데이터를 발행 Spark에서 value 데이터의 타입을 Struct 타입으로 변형 Spark에서 데이터를 가공 후 기타 작업을..

image