![[Spark] PySpark의 기본 데이터 가공 프로세스](https://img1.daumcdn.net/thumb/R750x0/?scode=mtistory2&fname=https%3A%2F%2Fblog.kakaocdn.net%2Fdn%2FbIfC9j%2FbtsCTYkaiwQ%2FEROiDkoqGHE7I118HJC2F1%2Fimg.png)
🧐 개요
이번 포스트에서는 데이터프레임 가공 과정에서 가장 기본적으로 사용되는 PySpark 로직을 설명하고자 합니다.
PySpark의 기본적인 가공 로직들은 다양한 분야 및 데이터 기반의 프로젝트에서 반복적으로 사용되는 내용이지만, 구현 방식이 다채롭고 활용 범위가 넓다보니 꾸준히 사용하지 않으면 빠르게 잊혀지고 내용을 잘못 기억하게 되더군요(내 기억력의 문제인가..). 이를 개인적으로 예방하고 필요한 시기에 찾아볼 수 있도록 하는 차원에서 포스트를 작성합니다.
📁 파일 데이터 불러오기
# read json files
df = spark.read.option("multiline", "true").json("<json_파일_폴더_경로>")
# read csv files
df = spark.read.csv("<csv_파일_폴더_경로>", header=True, inferSchema=True)
# read parquet files
df = spark.read.parquet("<parquet_파일_폴더_경로>")
# set path
local_dir = "file://<로컬_절대경로>"
hdfs_dir = "hdfs://<hdfs_절대경로>"
s3a_dir = "s3a://<버킷_이름>/<aws_s3_절대경로>"
파일 데이터를 불러 데이터프레임을 생성하는 로직을 설명하고 있습니다. 데이터를 불러올 때 고려해야 할 부분들을 정리하면 다음과 같습니다.
- 일반적으로 json 데이터는 복잡한 구조를 쉽게 확인할 수 있도록 여러 줄로 구성되어 있습니다. 여러 줄로 나뉘어진 json 데이터를 데이터프레임으로 만들 경우, multiline=true 옵션을 부여해야 합니다(해당 옵션은 기본적으로 false로 설정되어 있습니다).
- PySpark에서 데이터 파일을 읽어 데이터프레임을 생성할 경우, 파일의 디렉토리를 정확하게 지정해주는 것이 중요합니다. 파일이 위치한 디렉토리에 따라 절대 경로 앞에 file:// 과 같은 접두어가 위치해야 합니다.
🔎 스키마 구조 파악하기
이미 1차적인 가공이 완료된 csv 또는 parquet 파일들의 경우 대부분 스키마 구조를 파악할 필요가 없습니다. 하지만 매우 복잡한 구조를 가진 json 데이터를 사용할 경우, 필요로 하는 데이터가 어느 위치에 있는지를 파악하는 과정이 중요합니다. Spark에서는 데이터프레임의 스키마 구조를 파악할 수 있도록 기능적으로 지원하고 있습니다.
아래의 명령어를 통해 데이터프레임의 스키마 구조를 확인할 수 있습니다.
df.printSchema()
root
|-- artists: array (nullable = true) #
| |-- element: struct (containsNull = true)
| | |-- external_urls: struct (nullable = true)
| | | |-- spotify: string (nullable = true)
| | |-- followers: struct (nullable = true)
| | | |-- href: string (nullable = true)
| | | |-- total: long (nullable = true)
| | |-- genres: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- href: string (nullable = true)
| | |-- id: string (nullable = true)
| | |-- images: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- height: long (nullable = true)
| | | | |-- url: string (nullable = true)
| | | | |-- width: long (nullable = true)
| | |-- name: string (nullable = true)
| | |-- popularity: long (nullable = true)
| | |-- type: string (nullable = true)
| | |-- uri: string (nullable = true)
예제 스키마 구조입니다. Spotify API에서 제공하는 artist response 데이터입니다. 보시는 바와 같이, 불러온 json에 대하여 아무런 가공 처리도 하지 않은 상태입니다.
기본 고려 요소들
해당 스키마 구조를 확인하면서 필요로 하는 데이터를 선별하는 1차 가공을 시도해 보겠습니다. 우선 필요로 하는 컬럼들의 위치를 파악하고, 데이터 추출을 위한 로직을 구성해야 합니다. 경험적으로 데이터를 가공하면서 중요하게 고려하게 된 부분들은 아래와 같습니다.
- 데이터가 array 타입인 컬럼은 반드시 explode 작업을 수행해야 합니다. explode 작업을 진행하지 않은 데이터를 SELECT할 경우 상위 데이터와 마찬가지로 array 형태로 전환되며, 내부에 중복 데이터가 발생하게 됩니다. explode 작업은 withColumn 또는 select 로직과 조합하여 아래와 같이 진행할 수 있습니다.
df \
.select(explode("artists").alias("artists")))
df \
.withColumn("artists", explode("artists"))
- 데이터가 array 또는 struct 타입인 컬럼의 하위 컬럼을 SELECT할 수 있습니다.
예를 들어, 위의 데이터에 대하여 아래의 식이 수행 가능합니다.
# 컬럼 추가하기 : artists(array 타입)
df \
.withColumn("artists", explode("artists")) \
.withColumn("id", expr("artists.id"))
# 컬럼 선정하기 : artists(array 타입), external_urls(struct 타입)
df \
.withColumn("artists", explode("artists")) \
.select("artists.followers", "artists.external_urls.spotify")
- 데이터가 array 타입인 컬럼의 하위 컬럼은 filter 조건으로 지정할 수 없습니다(array 타입인 컬럼의 하위 컬럼에 대하여 필터 조건을 수행하려면 array 컬럼에 대한 explode 작업을 수행해야 합니다).
예를 들어, 위의 데이터에 대해서는 아래의 식을 수행할 수 없습니다.
df \
.filter("artists.followers.total >= 200000")
가공 프로세스
앞서 언급한 고려 요소들을 기반으로 데이터 가공 프로세스를 구성하면 다음과 같습니다(해당 프로세스는 주니어 개발자의 시점에서 구성한 것으로, 현업에서의 로직과는 차이가 있을 수 있습니다).
- 데이터 스키마 구조 확인
- array 타입인 데이터의 explode 진행
- filter 작업을 통한 데이터 축소 ** 3번 로직과 2번 로직의 순서는 바뀔 수 있음 **
- (+추가) 필요로 하는 컬럼 생성 (withColumn)
- 사용할 컬럼 선정 (select)
🛠️ 데이터 가공 함수
Spark 내에는 매우 다양한 데이터 가공 로직이 내재되어 있지만, 원천 데이터에 대한 1차 가공 과정에서는 withColumn, explode, selectExpr, 그리고 (select + expr) 조합까지 총 4가지 정도의 함수를 주로 사용하게 됩니다.
df \
.withColumn('artists', explode('artists')) \
.withColumn("image", explode("artists.images")) \
.filter("image.height == 640") \
.selectExpr(
"artists.id as artist_id",
"artists.name as artist_name",
"explode(artists.genres) as genre", # 단일 데이터만 존재하는 array는 마지막에 explode
"artists.followers.total as followers",
"image.url as artist_image_url"
)
앞서 확인한 스키마 구조를 가공한 예시입니다. 일전에 설정한 가공 프로세스에 맞추어 구성되어 있습니다. 각 함수의 세부적인 내용은 아래에서 살펴보도록 하겠습니다.
select
select 함수는 필요로 하는 컬럼을 선별하는 용도로 사용합니다.
# 컬럼 지정하기
df.select('자동차', '공구', '과일.열대과일', '동물.새.*', .. )
--> 자동차 | 공구 | 열대과일 | 참새 | 기러기 | 백조 | ..
# 표현식 지정하기
from pyspark.sql.functions import expr
df.select(expr('새.긴부리').alias('긴부리새'), '과자')
--> 긴부리새 | 과자
from pyspark.sql.functions import explode
df.select('자동차', explode('과일').alias('과일_리스트'))
--> 자동차 | 과일_리스트
앞서 언급한 것처럼, struct 타입이나 (explode 과정을 거친) array 타입의 하위 컬럼을 선택할 수도 있습니다. Spark에서는 하위 컬럼 전체를 선택할 수 있도록 와일드카드(*)도 지원합니다.
위의 예시에서 간단한 select("upper_col.lower_col") 구조 대신 번거로운 select(expr("upper_col.lower_col").alias("name")) 구조를 사용하는 이유는, 해당 위치의 컬럼명을 바꾸어 주기 위해서입니다. 복잡한 구조의 데이터는 복수의 'id' column을 지니는 경우가 많기 때문에, 이름을 'people_id' 등으로 바꾸어 각 항목의 의미가 혼동되지 않도록 수정해주는 작업이 필요합니다.
selectExpr
selectExpr 함수는 select 함수와 expr 함수를 결합해서 사용하는 형태입니다. 단일 로직에 여러개의 select 작업을 구현할 때 사용되지만, 해당 로직은 spark.sql을 기반으로 구동되기에 sql을 지원하지 않는 일부 어플리케이션과 호환되지 않을 수 있다고 하니 이를 고려해서 사용해야 합니다.
df \
.selectExpr(
"음료수.탄산 as 탄산음료",
"탄산음료.용기.규격 as 탄산음료_용기_규격"
)
selectExpr을 통해 스크립트를 구현하면 sql 기반으로 작업을 수행하기 때문에 expr 함수를 호출할 필요가 없다는 장점이 있습니다. 개인적으로는 코드의 간결성을 높이는 용도로 사용하면 좋을 것 같습니다.
withColumn
withColumn 함수는 (주로 기존 컬럼의 데이터를 가공하여) 새로운 컬럼을 추가하는 용도로 사용합니다.
from pyspark.sql.functions import round, explode
df.withColumn("용기_용량", round("액체.음료수.용기.용량"))
-> .. | 용기_용량
df.withColumn("음료수_리스트", explode("액체.음료수"))
--> .. | 음료수_리스트
새로운 컬럼을 추가하는 로직이기 때문에 위와 같이 새로운 컬럼의 이름을 지정해주어야 합니다. 위에서는 round 함수를 사용하여 반올림을 진행하고, explode 함수를 사용하여 리스트 병렬화를 수행하고 있습니다.
filter
filter 함수는 특정 조건으로 데이터(열)를 선별하는 용도로 사용합니다.
df.filter("사람.키 <= 180")
filter 함수는 가급적 초반에 배치하여 Spark가 가공해야 할 데이터의 볼륨을 축소시키는 역할로 사용할 수 있습니다. 일반적으로 (실시간 streaming 처리가 아닌) 대부분의 데이터 가공 작업은 용량이 큰 데이터를 대상으로 진행되기 때문에 가급적 다루는 데이터의 양이 적을수록 좋습니다.
📝 마치며
데이터프레임의 가공 로직은 Spark 엔진을 기반으로 한 데이터 분석 및 ML 로직 구현 과정에서 자주 사용됩니다. 데이터 전처리 및 ML Pipeline의 설계를 위해서는 더욱 다양하고 복잡한 함수들이 필요하지만, 기본 로직에 대한 숙련도를 충분히 갖추면 보다 빠른 이해에 크게 도움이 될 것입니다.
'데이터 이모저모 > Spark' 카테고리의 다른 글
[Spark] Buffer 단위로 Kafka Streaming 작업 수행하기 (2) | 2024.01.22 |
---|---|
[Spark] Kafka Streaming - 인코딩 된 JSON 데이터 가공하기 (1) | 2024.01.06 |
[Spark] Spark Streaming(PySpark) - Kafka 메세지 가공 및 parquet 파일 저장하기 (1) | 2023.12.26 |
[Spark] Spark Streaming(PySpark) - '배치 함수'를 작성하여 작업 경과 시간을 기록하고 Kafka로 발행하기 (0) | 2023.12.24 |
[Spark] Spark Streaming(PySpark)으로 Kafka 메세지 수신하기 (0) | 2023.12.24 |
발자취를 로그처럼 남기고자 하는 초보 개발자의 블로그