[Spark] Spark Streaming(PySpark) - Kafka 메세지 가공 및 parquet 파일 저장하기
데이터 이모저모/Spark2023. 12. 26. 14:24[Spark] Spark Streaming(PySpark) - Kafka 메세지 가공 및 parquet 파일 저장하기

🧐 개요 이전 포스트 - [Spark] Spark Streaming(PySpark) - '배치 함수'를 작성하여 작업 경과 시간을 기록하고 Kafka로 발행하기 이전 포스트 - [Spark] Spark Streaming(PySpark)으로 Kafka 메세지 수신하기 이번 포스트는 지난 포스트에 이어, kafka를 통해 수신한 데이터를 가공하는 로직을 소개합니다. 해당 로직은 제가 데모를 돌리면서 실습한 부분이기 때문에 실제 현업에서의 사용 방식과는 차이가 있습니다. 이번 포스트에서 구현한 로직은 아래와 같습니다. Kafka에서 JSON 구조로 작성된 STRING 타입의 value 데이터를 발행 Spark에서 value 데이터의 타입을 Struct 타입으로 변형 Spark에서 데이터를 가공 후 기타 작업을..

[Spark] Spark Streaming(PySpark) - '배치 함수'를 작성하여 작업 경과 시간을 기록하고 Kafka로 발행하기
데이터 이모저모/Spark2023. 12. 24. 22:56[Spark] Spark Streaming(PySpark) - '배치 함수'를 작성하여 작업 경과 시간을 기록하고 Kafka로 발행하기

🧐 개요 이전 포스트 - [Spark] Spark Streaming(PySpark)으로 Kafka 메세지 수신하기 이번 포스트는 저번 포스트에 이어서, Spark Streaming으로 Kafka에 메세지를 발행하는 다른 예제를 소개합니다. 구체적으로는 작업 경과 시간을 기록하여 Kafka로 발행하고자 합니다. 이번에는 해당 기능을 구현하는 과정에서 '배치 함수'를 만들어 각 배치별로 적용될 PySpark 가공 로직을 구성해보도록 하겠습니다. (해당 기능은 추후 Grafana 대시보드에서 Kafka 스크립트의 동작 시간을 실시간 대시보드로 모니터링하기 위한 기능입니다. 해당 내용도 블로그에 포스팅하도록 하겠습니다) 🖍️ 예제 스크립트 간단한 pub/sub 기능을 구현하는 PySpark Streaming 예..

[Spark] Spark Streaming(PySpark)으로 Kafka 메세지 수신하기
데이터 이모저모/Spark2023. 12. 24. 15:51[Spark] Spark Streaming(PySpark)으로 Kafka 메세지 수신하기

🧐 개요 이번 포스트는 Spark Streaming 기능을 사용해 Kafka 채널에 발행된 메세지를 수신하는 방법을 설명합니다. Apache Spark는 Kafka에서 발행한 메세지를 스트리밍 처리하도록 기능적으로 지원하고 있습니다. 일반적으로는 JAVA API 어플리케이션을 통해 Spark Streaming 작업을 구현한다고 들었습니다만 저는 PySpark 스크립트를 작성해 구현해보도록 하겠습니다. 참고로 제가 테스트를 진행하는 환경은 다음과 같습니다. Spark Version: v3.5.0 JAVA Version: openjdk-8-jdk Kafka Host: localhost 🖍️ 공식 가이드 참조하기 Apache Spark 공식 홈페이지에 Kafka 스트리밍 관련 가이드 문서를 제공하고 있습니다...

[Spark] CPU 및 메모리 사용량 설정하기
데이터 이모저모/Spark2023. 12. 22. 16:20[Spark] CPU 및 메모리 사용량 설정하기

🧐 개요 특별한 설정을 추가하지 않으면, Apache Spark는 작업을 수행하는 과정에서 컴퓨터의 CPU 및 메모리 리소스를 전부 사용하도록 설정되어 있습니다. 따라서 무거운 작업을 반복적으로 구동하는 경우, 컴퓨터의 리소스 범위를 초과하게 되어 에러가 발생하거나 시스템 리소스가 다운되는 경우가 발생하게 됩니다(저 역시도 로컬 머신에서 360억 row의 parquet 데이터를 읽는 도중 시스템이 멈추는 상황이 발생하였습니다). 다행히도 Spark에서는 작업 수행 과정에서 사용할 리소스 양을 설정하는 환경 설정을 기능적으로 제공하고 있습니다. 방법이 어렵지 않기 때문에 오늘은 CPU 및 메모리 사용량을 설정하는 방법을 다루겠습니다. 📙 공식 가이드 확인하기 spark-env.sh 스크립트에 Apache ..

[GitHub] 깃허브 레포지토리 잔디가 생기지 않는 경우
클라우드 이모저모/GitHub2023. 12. 17. 14:56[GitHub] 깃허브 레포지토리 잔디가 생기지 않는 경우

🧐 개요 이번 포스트는 GitHub Contribution 연동이 정상적으로 이루어지지 않을 때의 해결 방법을 소개합니다. 작업 내용이 코드로는 반영이 되었는데, Github Contribution으로 인정되지 않는 경우가 발생할 수 있습니다. 이런 현상은 주로 작업 환경을 이전하는 시기에, 입력한 Configuration 정보에 무언가 문제가 있기 때문에 발생합니다. 깃허브의 내용물을 살펴보면, 당일 수행한 커밋 작업들이 제대로 남아 있습니다. 즉 GitHub 활동의 클라우드 연동과 관련된 인증 요소에 무언가 문제가 있는 것입니다. 🛠️ 해결 방법 새로운 컴퓨터에 Git Commit 작업을 처음 수행할 때, username과 password 정보를 입력하도록 안내합니다. 입력한 정보는 git confi..

[GCP] 방화벽 규칙 생성 및 태그 적용하기
클라우드 이모저모/GCP2023. 12. 5. 17:59[GCP] 방화벽 규칙 생성 및 태그 적용하기

🧐 개요 개인 및 팀 단위의 프로젝트 내에서 구글 클라우드 플랫폼(약칭 GCP) 서비스들을 이용하고 있던 차에 이런 메일들을 심심치 않게 받아 왔습니다. 요약해서 이야기하자면 ‘귀하의 인스턴스(구글 가상머신)가 암호화폐 체굴에 사용되었으며, 이것은 우리의 정책 위반이니 잘 처신해라’ 라는 내용입니다. 그런데 당연하지만, 컴퓨터 분야를 이제 막 시작한 취준생이 암호화폐 체굴을 할 수 있을 리가 없죠. 외부 IP로부터 도스(DoS) 공격을 당해 인스턴스가 해킹 및 크래킹을 당해서 해당 상황이 발생하는 것입니다. 즉 뭔가 보안 요소에 문제가 있는 것이죠. VM 인스턴스의 목록을 보니, 크래킹으로 인해 암호화폐 체굴에 사용되던 namenode 인스턴스가 다운되어 있습니다. 이런 빅 데이터 솔루션들의 클러스터 구..

image