![[Spark/Spotify] 군집화(KMeans/clustering)를 기반으로 한 음원 추천 시스템 구현하기](https://img1.daumcdn.net/thumb/R750x0/?scode=mtistory2&fname=https%3A%2F%2Fblog.kakaocdn.net%2Fdn%2FbfqpVs%2FbtsC7opk2U2%2FRnidR5JV0scZRxTHlyko4k%2Fimg.png)
🧐 개요
이번 포스트는 pyspark.ml 내부의 군집화(clustering) 모델을 기반으로 한 추천 시스템 로직 구현 과정을 설명합니다.
군집화란, 유사한 특정을 지닌 데이터들을 그룹으로 묶어주는 분류 방식입니다. 군집화는 비지도학습이기 때문에 사용자의 설정 부여 없이 군집을 자체적으로 생성하므로 '추천 시스템에 과연 적합할까?' 라는 의문이 들으실 수도 있습니다. 그래서 해당 포스트에서는 군집화를 통해 사용자 데이터를 군집화 분석하여 분포 위치를 파악하고, 빅데이터 내 해당 위치 인근에 있는 데이터를 선별하여 추천하는 방식을 설명합니다.
📁 데이터 준비
실습 환경에서 제가 준비한 데이터셋은 아래와 같습니다. 모든 데이터셋은 동일한 스키마를 가지고 있습니다.
- Spotify 특정 사용자의 플레이리스트(복수)에 기록된 Track 데이터
- 개인적으로 수집한 Spotify Track 빅데이터
저는 2개의 데이터셋을 합치고(UNION) 스케일링한 후 다시 분할하였지만, 해당 포스트에서는 이미 합쳐진 데이터를 사용하겠습니다.
데이터들이 공통적으로 가지고 있는 스키마의 구조는 다음과 같습니다. 해당 데이터는 Spotify API에서 'tracks' 및 'audio-features' 엔드포인트를 통해 수집한 데이터입니다.
데이터 학습 로직 미리보기
데이터 학습 로직을 구현하는 과정에서는 다음의 2가지 요소를 고려해야 합니다.
- Playlist Track Data가 수집된 Local Track Bigdata 내에 없을 수도 있다(수집된 데이터의 양이 적음)
- Dataset의 규격(스키마)이 동일하기 때문에 병합 가능
- 2개의 Dataset을 병합(UNION)하는 과정이 필요
- 데이터 스케일링은 전체 데이터셋에 대하여 진행해야 한다
- Train Dataset과 Test Dataset을 별도로 스케일링하면 범위가 과대/과소 반영될 위험이 있음
- 병합한 Dataset을 분할하기 위한 Playlist Track ID 리스트가 필요
- Vector Assemble은 Dataset 분할 후 수행해야 한다
- Vector Assemble 후 Dataset을 분할(Train/Test)하면 spark 오류 발생
따라서 해당 요소들을 고려하여 데이터 학습 로직은 2개의 데이터셋을 병합하여 스케일링을 수행하고, 다시 Train/Test로 분할하여 학습을 진행하도록 구성되어 있습니다.
임의의 Playlist Track ID 리스트 만들기
이번 포스트에서는 Union Dataset을 제공하고 있으므로, 사용자 플레이리스트에 있는 Track들의 ID 리스트를 임의로 만들도록 하겠습니다.
from pyspark.sql import SparkSession
# Build Spark Session
spark = SparkSession.builder \
.appName("recommendation_demo") \
.getOrCreate()
# Load Dataset
dw_tracks = spark.read.parquet("file:///다운받으신_폴더_절대경로/tracks")
dw_audioFeatures = spark.read.parquet("file:///다운받으신_폴더_절대경로/tracks_audioFeatures")
df = dw_tracks.join(dw_audioFeatures, "id", "inner")
# Create Playlist Track ID List
# 임의로 120곡의 Track ID 선별
track_list = [row.id for row in df.limit(120).collect()]
해당 로직을 통해 생성한 'track_list'가 원래 유저의 플레이리스트 내에 있었던 곡들이라고 가정하겠습니다.
(실제로 구현하기 위해서는 Spotify Access Token을 발급받아 API Request를 전송해야 합니다)
🖍️ 데이터 프로세싱
Dataset 스케일링 (Standard Scaling - 정규분포 기반)
군집화를 정상적으로 수행하기 위해서는 반영할 데이터들의 범위를 스케일링하는 과정이 필요합니다. 군집화는 좌표 사이의 거리 계산을 통해 데이터를 분류하기 때문에 원본 데이터를 그대로 입력하면 값이 큰 컬럼이 우선 반영될 가능성이 높기 때문입니다.
from pyspark.sql.functions import mean, stddev
raw_scaled_df = df
# Standard Scaling
for col_name in raw_scaled_df.columns[1:]:
mean_val = raw_scaled_df.select(mean(col_name)).collect()[0][0]
stds_val = raw_scaled_df.select(stddev(col_name)).collect()[0][0]
raw_scaled_df = raw_scaled_df.withColumn(col_name, (col(col_name) - mean_val) / stds_val)
해당 스크립트에서는 각 컬럼 데이터의 평균값과 표준편차를 계산하여 원본 데이터를 스케일링하고 있습니다.
Spark 내부에는 pyspark.ml.feature.StandardScaler 라는 클래스가 있습니다. 해당 클래스를 통해 Vector 컬럼의 Standard Scaling을 간편하게 진행할 수 있습니다. 하지만 Vector 컬럼의 데이터는 Dataset을 Train/Test로 분할할 경우 오류가 발생하기 때문에 현재의 추천시스템 로직에는 사용할 수 없습니다.
from pyspark.sql.functions import col
# Split Union Dataset
scaled_train = raw_scaled_df.filter(col("id").isin(track_list))
scaled_test = raw_scaled_df.filter(~col("id").isin(track_list))
스케일링이 완료되었으면 이제 Dataset을 다시 Train/Test로 분할해야 합니다. 앞서 생성한 'track_list' 정보를 filter 함수에 제공하여 데이터셋을 분할할 수 있습니다.
데이터 군집화(KMeans) 및 예측
이번 포스트에서는 가장 기본적인 군집화 모델인 KMeans를 사용하여 데모를 진행하였습니다. Spark ML의 KMeans 모델은 사용법이 정형화되어 있기 때문에 공식 문서를 그대로 참조하여 수행합니다.
Spark의 모든 ML 클래스는 예측 작업을 수행함에 있어 '벡터화된 특성 데이터 컬럼'을 필요로 합니다. 일반적으로는 pyspark.ml.feature 내 VectorAssembler 클래스를 사용하여 입력할 컬럼들을 단일 벡터 컬럼으로 묶어주는 작업을 수행하고, 해당 벡터 컬럼을 통해 예측을 수행하게 됩니다.
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
selected_features = ["popularity", "key", "mode", "time_signature", "tempo", "acousticness", "danceability", "energy", "instrumentalness", "liveness", "loudness", "speechiness", "valence"]
# Define Assembler
assembler = VectorAssembler(inputCols=selected_features, outputCol="features")
# Assemble Features
df_assembled_train = assembler.transform(scaled_train)
df_assembled_test = assembler.transform(scaled_test)
# Create Model == Train Dataset
kmeans = KMeans(featuresCol="features", k=4, seed=1)
model = kmeans.fit(df_assembled_train)
# Return Cluster Centers
centers = model.clusterCenters()
# Test Dataset
df_result = model.transform(df_assembled_test)
해당 로직에서는 이하의 프로세스를 진행하고 있습니다.
- assembler을 정의하여 각 Dataset에 Vector 특성 컬럼 생성
- Train Dataset을 통해 데이터 학습 및 모델 정의 / 각 클러스터의 중앙 좌표값 저장
- Test Dataset 예측(데이터 군집화)
해당 로직을 통해 저장한 '각 클러스터의 중앙 좌표값' 정보는 추후 데이터를 선별하는 과정에서 사용되는 매우 중요한 데이터입니다.
각 데이터 군집에서 데이터 선별하기
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import numpy as np
# Define UDF
def calculate_distance(vector):
return float(np.linalg.norm(vector.toArray() - numpy_coordinates))
calculate_distance_udf = udf(calculate_distance, DoubleType())
# Create Recommend List
recommend_list = []
for i in range(4):
numpy_coordinates = centers[i]
df_with_distance = df_result \
.filter(f"prediction={i}") \
.select("id", "features") \
.withColumn('distance', calculate_distance_udf(col('features'))) \
.orderBy("distance", ascending=True)
collect_list = [row.id for row in df_with_distance.limit(5).collect()]
recommend_list += collect_list
해당 로직에서는 이하의 프로세스를 진행하고 있습니다.
- spark udf(User Defined Function)를 정의하여 좌표 사이의 거리 계산
- 각 군집별로 데이터의 좌표값을 중앙값과 비교하여 거리 컬럼 생성
- 거리 컬럼을 기준으로 Ascending Sort 하여 가장 가까운 거리의 데이터 5개씩 선별
이전의 로직에서 생성한 '각 클러스터의 중앙 좌표값' 데이터가 Numpy Array 형태로 생성되어있기 때문에, 거리를 비교하는 과정에서 Spark Dataframe 내 데이터를 Numpy Array 형태로 변형시켜 구현하고 있습니다.
🔎 결과
플레이리스트 데이터를 기반으로 한 추천곡 리스트가 완성되었습니다!
Spotify API 서비스를 사용하면 해당 추천 결과를 직접 Spotify 계정에 반영하는 등 다양한 방식의 확장도 가능합니다.
📝 마치며
개인 프로젝트로 진행하고 있는 'Spotify Demo Project'에서 '사용자의 플레이리스트를 기반으로 한 추천 시스템'을 만들고 적용해 보았습니다. 아직 로직 구현에 있어 심도 깊은 데이터 분석을 진행하지는 못했으나, 제가 즐겨 듣는 음악들과 성향이 유사한 곡들이 추가되는 것을 확인할 수 있었습니다(정말 마음에 드는 곡은 20곡 중 2~3곡 정도지만..).
해당 로직은 앞서 언급한 로직처럼 pyspark 내부의 ml 라이브러리를 기반으로 데이터를 군집화(Clustering)하여 선별한 것인데, 데이터를 기반으로 한 추천 시스템을 구현할 경우 군집화가 단순하면서도 효과적인 강력한 메서드라는 생각이 들었습니다. 중요하게 생각되는 컬럼들을 선별하고 스케일링만 진행한다면, 추가적인 분석 없이 빠르게 결과를 반환할 수 있으니까요. 컬럼을 조금 더 구체적인 조건을 통해 선별하고, 추가적인 로직을 통해 보완하면 보다 완성도 높은 추천 시스템을 구현할 수 있을 것 같습니다.
* 해당 프로젝트의 GitHub 소스 코드는 아래의 링크에서 확인하실 수 있습니다.
GitHub - Spotify-DemoProject/recommendation: Trial & Error to Build Recommendation System Using pypark.ml
Trial & Error to Build Recommendation System Using pypark.ml - GitHub - Spotify-DemoProject/recommendation: Trial & Error to Build Recommendation System Using pypark.ml
github.com
발자취를 로그처럼 남기고자 하는 초보 개발자의 블로그