Spotify's editorial playlists are "carefully curated by [its] music experts and genre specialists from around the globe" (Spotify for Artists). Every artist knows the importance of being added to a Spotify editorial playlist. As an artist, I know that when I get the notification that a song of mine has been added to a Spotify playlist, it will gain thousands of streams a day for as long as it stays on that playlist. One of my favorite playlists to be added to in the indie genre is 'undercurrents.' Wouldn't it be great if, as an artist, I could write and release songs that I knew were best suited for this playlist? Knowing what type of song a playlist curator is looking for could help an artist choose which song to pitch. Further, I think it's important for artists to know how their songs are categorized in terms of features. We may think we have an upbeat, danceable song, but if Spotify categorizes it differently, we'd want to know that. Are there certain aspects of a song that affect whether or not it is playlisted? My objective is to identify those aspects and to predict, based on them, whether or not a song will be playlisted.
Using spotipy, I first pulled the tracks and artist names from 'undercurrents.' Then, I created a dataset of every artist's discography with an indicator variable = 1 if the song is on 'undercurrents' and 0 otherwise. Next, I wrote a function that added features of every track to the dataset. Finally, I scaled the features and dropped any songs that did not have features available. The resulting dataset has 1547 songs by 64 unique artists with 10 song features.
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
import pandas as pd
import numpy as np
auth_manager = SpotifyClientCredentials()
sp = spotipy.Spotify(auth_manager=auth_manager)
# returns dataframe of all playlist tracks, track ids, artists, and artist ids given a playlist uri
def playlist_df(playlist):
    results = sp.playlist_tracks(playlist, fields='items')
    names, track_ids, artists, artist_ids = [], [], [], []
    for item in results['items']:
        track = item['track']
        names.append(track['name'])
        track_ids.append(track['id'])
        artists.append(track['artists'][0]['name'])   # primary artist only
        artist_ids.append(track['artists'][0]['id'])
    return pd.DataFrame({'Track': names, 'Track ID': track_ids,
                         'Artist': artists, 'Artist ID': artist_ids})
# returns lists of artist's discography in tracks and ids given artist id
def artist_albums(artist_id):
track_ids = []
albums = []
tracks = []
results = sp.artist_albums(artist_id, album_type='album')
albums.extend(results['items'])
while results['next']:
results = sp.next(results)
albums.extend(results['items'])
unique = set() # skip duplicate albums
for album in albums:
name = album['name'].lower()
if name not in unique:
unique.add(name)
this_tracks, this_id = show_album_tracks(album)
tracks += this_tracks
track_ids += this_id
this_tracks, this_id = top_tracks(artist_id)
tracks += this_tracks
track_ids += this_id
tracks, ids = unique_tracks(tracks, track_ids)
return tracks, ids
# given lists of tracks and ids, returns lists of only unique tracks and ids
def unique_tracks(ls_t, ls_i):
unique_t = []
unique_i = []
for i in range(0,len(ls_t)):
if ls_t[i] not in unique_t:
unique_t.append(ls_t[i])
unique_i.append(ls_i[i])
return unique_t, unique_i
# given album, returns list of all tracks and track ids from that album
def show_album_tracks(album):
ids = []
tracks = []
track_names = []
results = sp.album_tracks(album['id'])
tracks.extend(results['items'])
while results['next']:
results = sp.next(results)
tracks.extend(results['items'])
for track in tracks:
track_names.append(track['name'])
ids.append(track['id'])
return track_names, ids
# returns top tracks of artist
def top_tracks(artist_id):
t = []
d = []
tracks = sp.artist_top_tracks(artist_id)
    for track in tracks['tracks']:
        t.append(track['name'])
        d.append(track['id'])
return t, d
# function that adds all artists' discographies to df given current df's artist IDs
def add_songs(df):
    old_df = df
    rows = []
    for i in range(len(old_df['Artist ID'])):
        artist = old_df['Artist'][i]
        artist_id = old_df['Artist ID'][i]
        tracks, ids = artist_albums(artist_id)
        for track, track_id in zip(tracks, ids):
            rows.append({'Track': track, 'Track ID': track_id,
                         'Artist': artist, 'Artist ID': artist_id})
    # pd.concat replaces the deprecated DataFrame.append
    df2 = pd.concat([old_df, pd.DataFrame(rows)], ignore_index=True)
    df2 = df2.sort_values(by=['Artist']).reset_index(drop=True)
    df2['Playlisted'] = df2['Track'].isin(old_df['Track']).astype(int)
    return df2
# function that adds features to df for every song
def features(df):
danceability = []
energy = []
key = []
loudness = []
mode = []
speechiness = []
acousticness = []
instrumentalness = []
liveness = []
valence = []
tempo = []
dur = []
sig = []
ls = [danceability, energy, key, loudness, mode, speechiness,acousticness, instrumentalness, liveness, valence,
tempo, dur, sig]
quotes = ['danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness','acousticness', 'instrumentalness', 'liveness', 'valence',
'tempo', 'duration_ms', 'time_signature']
    for id_ in df['Track ID']:
        feats = sp.audio_features(id_)
        if feats == [None]:
            # no audio features available for this track
            for item in ls:
                item.append(np.nan)
        else:
            feats = feats[0]
            for col, key in zip(ls, quotes):
                col.append(feats[key])
dic = {'Danceability': danceability, 'Energy': energy, 'Key': key, 'Loudness': loudness, 'Mode': mode,
'Speechiness': speechiness, 'Acoustic': acousticness, 'Instrumental': instrumentalness,
'Liveness': liveness, 'Valence': valence, 'Tempo': tempo, 'Dur': dur, 'Sig': sig}
return dic
from sklearn.preprocessing import StandardScaler
# function that cleans the dataframe (drops null values, resets the index, and scales the features)
def clean(df):
df = df.dropna()
df = df.reset_index(drop = True)
addback = [df['Playlisted'], df['Artist ID'], df['Artist'], df['Track ID'], df['Track'],]
to_scale = df.drop(columns = ['Playlisted','Track', 'Track ID', 'Artist','Artist ID'])
scaler = StandardScaler()
data_scaled = pd.DataFrame(scaler.fit_transform(to_scale), columns = to_scale.columns)
for column in addback:
data_scaled = pd.concat([pd.DataFrame(column), data_scaled], axis = 1)
return data_scaled
def main():
playlist_id = 'spotify:playlist:37i9dQZF1DX9myttyycIxA'
df = playlist_df(playlist_id)
df = add_songs(df)
feats = pd.DataFrame(features(df))
df = pd.concat([feats, df], axis = 1)
df = clean(df)
df.to_csv(r'/Users/harlanhutton/Documents/spotify/undercurrentsdata.csv', index = False)
main()
from IPython.display import display, HTML
data = pd.read_csv('/Users/harlanhutton/Documents/spotify/undercurrentsdata.csv')
data = data.drop(columns = ['Track', 'Track ID', 'Artist', 'Artist ID'])
def getDfSummary(dat):
'''
Get descriptive stats
'''
    #Get the names of the columns (use the dat argument, not the global data)
    cols = dat.columns.values
c_summ = []
#Outer Loop for the cols
for c in cols:
#Count the NAs
missing = sum(pd.isnull(dat[c]))
#Use describe to get summary statistics, and also drop the 'count' row
sumval = dat[c].describe().drop(['count'])
#Now count distinct values...note that nunique removes missing values for you
distinct = dat[c].nunique()
        #Append missing and distinct to sumval (pd.concat replaces the deprecated Series.append)
        sumval = pd.concat([sumval, pd.Series([missing, distinct], index=['missing', 'distinct'])])
#Add each sumval to a list and then convert the entire thing to a DS
c_summ.append(sumval)
return pd.DataFrame(c_summ, index=cols)
desc = getDfSummary(data)
display(HTML(desc.to_html()))
Looking at the above description of the data, I can confirm that there are no missing values and that every column except the target variable is standardized with mean 0 and standard deviation 1. Looking at the number of distinct values, the variables range from binary (like mode) to almost entirely unique (like duration or tempo).
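As a quick sanity check on the scaling (a minimal sketch, assuming only the data frame loaded above), each feature column should report a mean of roughly 0 and a standard deviation of roughly 1:
# every column except the target should be standardized
scaled_cols = data.columns.drop('Playlisted')
print(data[scaled_cols].mean().round(2))
print(data[scaled_cols].std().round(2))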
As another means of exploratory analysis, I want to look at the correlation matrix of the features. There are a few correlations between features that make sense intuitively, like loudness and energy, acousticness and energy, valence and danceability, and valence and energy. Valence is how positive (happy, cheerful) or negative (sad, angry) a song sounds. Thinking ahead to model selection, if I use a tree-based algorithm I will not need to correct for these correlations, as tree-based methods are generally robust to correlated features. If I use a linear algorithm, which is more sensitive to highly correlated features, I may want to drop some features or transform the data with Principal Component Analysis (PCA). However, because I don't have an overwhelming number of features, and most have correlations below 0.3, I will probably not need PCA.
Y = data['Playlisted']
X = data.drop('Playlisted', axis=1)
corr = X.corr()
corr.style.background_gradient(cmap='coolwarm')
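For reference, the PCA fallback mentioned above would look roughly like the sketch below; it assumes the scaled feature matrix X defined just above, and keeping enough components to explain ~95% of the variance is an illustrative choice rather than a tuned one.
from sklearn.decomposition import PCA
# Illustrative only: project the scaled features onto the principal components
# that together explain ~95% of the variance, for use with a linear model.
pca = PCA(n_components=0.95)
X_pca = pca.fit_transform(X)
print(X_pca.shape)
print(pca.explained_variance_ratio_.round(3))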
Next, I want to take a look at the mutual information between the features and the target variable. This gives me some clues as to the strongest drivers of playlisting, and it can also guide dimensionality reduction if I decide to go that route. In the graph below, I can see that key, mode, and time signature do not have a strong relationship with Playlisted. The rest of the mutual information scores make intuitive sense, though I was surprised to see that as loudness increases, the likelihood of being playlisted decreases.
import sklearn.metrics as skm
import matplotlib.pyplot as plt
def plotMI(dat, lab, width = 0.35, signed = 0):
'''
Draw a bar chart of the normalized MI between each X and Y
'''
    X = dat.drop(lab, axis=1)
Y = dat[[lab]].values
cols = X.columns.values
mis = []
#Start by getting MI
for c in cols:
mis.append(skm.normalized_mutual_info_score(Y.ravel(), X[[c]].values.ravel()))
    #Get signs from the correlation with the target (drop the target's self-correlation)
    corrs = dat.corr()[lab]
    corrs = corrs[corrs.index != lab]
    df = pd.DataFrame(list(zip(mis, cols)), columns = ['MI', 'Lab'])
    df = pd.concat([df, pd.DataFrame(list(corrs), columns = ['corr'])], axis=1, ignore_index = False, join = "inner")
if signed == 0:
makeBar(df, 'MI', 'Lab', width)
else:
makeBarSigned(df, 'MI', 'Lab', width)
def makeBarSigned(df, h, lab, width):
'''
    Draw a bar chart of the MI per feature, colored by the sign of its correlation with the target
'''
df_s = df.sort_values(by = [h], ascending = False)
#Get a barplot
ind = np.arange(df_s.shape[0])
labs = df_s[[lab]].values.ravel()
h_pos = (df_s[['corr']].values.ravel() > 0) * df_s.MI
h_neg = (df_s[['corr']].values.ravel() < 0) * df_s.MI
fig = plt.figure(facecolor = 'w', figsize = (12, 6))
ax = plt.subplot(111)
plt.subplots_adjust(bottom = 0.25)
rec = ax.bar(ind + width, h_pos, width, color='r', label = 'Positive')
rec = ax.bar(ind + width, h_neg, width, color='b', label = 'Negative')
ax.set_xticks(ind + getTickAdj(labs, width))
ax.set_xticklabels(labs, rotation = 45, size = 14)
plt.legend()
def makeBar(df, h, lab, width):
'''
    Draw a bar chart of the MI per feature
'''
df_s = df.sort_values(by = [h], ascending = False)
#Get a barplot
ind = np.arange(df_s.shape[0])
labs = df_s[[lab]].values.ravel()
fig = plt.figure(facecolor = 'w', figsize = (12, 6))
ax = plt.subplot(111)
plt.subplots_adjust(bottom = 0.25)
rec = ax.bar(ind + width, df_s[h].values, width, color='r')
ax.set_xticks(ind + getTickAdj(labs, width))
ax.set_xticklabels(labs, rotation = 45, size = 14)
def getTickAdj(labs, width):
lens = list(map(len, labs))
lens = -1 * width * (lens - np.mean(lens)) / np.max(lens)
return lens
plotMI(data, 'Playlisted', 0.4, signed=1)
plt.title('Mutual Information Between Feature and Playlisted');
plt.ylabel('Mutual Information');
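To make the signs in the chart easier to read off, here is a quick supplementary print of each feature's raw correlation with the target, which is the same quantity used to color the bars above:
# correlation of each feature with Playlisted; this is the sign source for the bars above
print(data.corr()['Playlisted'].drop('Playlisted').sort_values().round(2))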
A final exploratory step is variable importance. This is one more way to see which features are likely to contribute most to a potential model. Because time signature and mode are again the least important, I'm going to drop them from the dataset when I do dimensionality reduction, confident that this will not affect a future model's accuracy. Because loudness did better than energy in terms of both mutual information and variable importance, I'm going to drop energy as well to avoid that correlation of 0.8.
# add features and their importances to a dictionary
from sklearn.tree import DecisionTreeClassifier
dt = DecisionTreeClassifier()
dt.fit(X,Y)
feature_importance = dt.feature_importances_
imp_dict = dict(zip(X.columns.values, feature_importance))
# sort features by importance (descending) and plot
sorted_dict = {k: v for k, v in sorted(imp_dict.items(), reverse = True, key=lambda item: item[1])}
plt.figure(facecolor = 'w', figsize = (12, 6))
plt.bar(sorted_dict.keys(), sorted_dict.values(), color = 'cornflowerblue')
plt.xticks(range(X.shape[1]), sorted_dict.keys(), fontsize=12, rotation=45)
plt.title('Feature Importance')
plt.xlabel('Feature')
plt.ylabel('Importance')
plt.show()
Now that I have a good grasp of my features, I am going to run a bakeoff of different models. Because my data is imbalanced, I will also upsample the minority class (I will first run the bakeoff on the data without upsampling and then compare model performance). To confirm my feature analysis, I will also assess each model's performance with and without the dropped features.
Algorithms: Logistic Regression, Naive Bayes, Random Forest, Decision Tree, K Nearest Neighbors, Gradient Boosted Tree
Hyperparameters: Need to choose appropriate ranges for every model's specific hyperparams.
Features: All included vs. without energy, signature, and mode
Imbalanced Data: Without upsampling playlisted songs vs. with upsampling playlisted songs
I will use AUC, accuracy, precision, recall, and F1 to evaluate the models. I will split the data into 80% train and 20% test.
from sklearn.model_selection import *
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.naive_bayes import GaussianNB
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import *
from imblearn.over_sampling import SMOTE
models = [LogisticRegression(), GaussianNB(), RandomForestClassifier(), DecisionTreeClassifier(), KNeighborsClassifier(), GradientBoostingClassifier()]
labels = ["Log Reg", "Naive Bayes", "Random Forest", "Decision Tree", "K Nearest Neighbors", "Gradient Boosting" ]
def plot_roc_curve(models, X_train, y_train, X_test, y_test, dropped, upsampled, labels, design_params, ax1, ax2):
fpr = []
tpr = []
auc = []
acc = []
recall = []
precision = []
these_labels = []
f_scores = []
    if dropped:
        # drop the low-importance / highly correlated features identified earlier
        X_train = X_train.drop(['Sig', 'Mode', 'Energy'], axis=1)
        X_test = X_test.drop(['Sig', 'Mode', 'Energy'], axis=1)
for i in range(len(models)):
mod = models[i].fit(X_train, y_train)
preds = mod.predict_proba(X_test)[:,1]
this_fpr,this_tpr,thresh = roc_curve(y_test, preds)
auc.append( roc_auc_score(y_test, preds) )
fpr.append( this_fpr )
tpr.append( this_tpr )
acc.append(accuracy_score(y_test, mod.predict(X_test)))
recall.append(recall_score(y_test,mod.predict(X_test) ))
precision.append(precision_score(y_test, mod.predict(X_test), zero_division = 0))
f_scores.append(f1_score(y_test, mod.predict(X_test)))
axs[ax1,ax2].plot(this_fpr,this_tpr, label = labels[i] + ', AUC = {}, F1 = {}'.format(round(auc[i],2), round(f_scores[i],2)))
these_labels.append(labels[i] + design_params)
axs[ax1,ax2].plot([0,1],[0,1], 'b--', label = 'Random Chance')
axs[ax1,ax2].title.set_text('ROC Curve {}'.format(design_params))
axs[ax1,ax2].set_xlabel('False Positive Rate')
axs[ax1,ax2].set_ylabel('True Positive Rate')
axs[ax1,ax2].legend();
df = pd.DataFrame(these_labels, columns = ['Model'])
df['Accuracy'],df['AUC'],df['Recall'],df['Precision'],df['F1 Score'] = acc,auc,recall,precision,f_scores
return df
fig, axs = plt.subplots(2, 2, figsize=(18, 12))
X_train, X_test, y_train, y_test = train_test_split(data.drop('Playlisted', axis=1), data['Playlisted'], test_size=0.2, random_state=42)
sm = SMOTE(random_state=12, sampling_strategy = 1.0)
up_X_train, up_y_train = sm.fit_resample(X_train, y_train)  # oversample the minority (playlisted) class in the training set only
subset_imbalanced = plot_roc_curve(models, X_train, y_train, X_test, y_test, dropped = True, upsampled = False, labels = labels, design_params = ": feat. subset, not upsampled", ax1 = 0, ax2 = 0)
allfeats_imbalanced = plot_roc_curve(models, X_train, y_train, X_test, y_test, dropped = False, upsampled = False, labels = labels, design_params = ": all feats, not upsampled", ax1 = 0, ax2 = 1)
subset_upsampled = plot_roc_curve(models, up_X_train, up_y_train, X_test, y_test, dropped = True, upsampled = True, labels = labels, design_params = ": feat. subset, upsampled", ax1 = 1, ax2 = 0)
allfeats_upsampled = plot_roc_curve(models, up_X_train, up_y_train, X_test, y_test, dropped = False, upsampled = True, labels = labels, design_params = ": all feats, upsampled", ax1 = 1, ax2 = 1)
comparison = pd.concat([subset_upsampled, allfeats_upsampled], ignore_index = True)
comparison.sort_values(by=['AUC'], ascending = False)
Based on these out-of-the-box models, it is apparent that upsampling is important for model performance; without it, the models learn to score well simply by predicting the majority class. Based on AUC, the K Nearest Neighbors, Random Forest, and Gradient Boosting models performed best, whether using all features or the feature subset. Because dimensionality isn't a big problem for this project and these models are fairly robust to correlated features, I'm going to tune the hyperparameters for these three models using all of the features.
rf_baseline = comparison.loc[comparison['Model'] == 'Random Forest: all feats, upsampled']
rf_baseline.reset_index(drop = True)
from sklearn.model_selection import RandomizedSearchCV
# Number of trees in random forest
n_estimators = [int(x) for x in np.linspace(start = 200, stop = 2000, num = 10)]
# Number of features to consider at every split
max_features = ['auto', 'sqrt']
# Maximum number of levels in tree
max_depth = [int(x) for x in np.linspace(10, 110, num = 11)]
max_depth.append(None)
# Minimum number of samples required to split a node
min_samples_split = [2, 5, 10]
# Minimum number of samples required at each leaf node
min_samples_leaf = [1, 2, 4]
# Method of selecting samples for training each tree
bootstrap = [True, False]
# Create the random grid
random_grid = {'n_estimators': n_estimators,
'max_features': max_features,
'max_depth': max_depth,
'min_samples_split': min_samples_split,
'min_samples_leaf': min_samples_leaf,
'bootstrap': bootstrap}
print(random_grid)
# Use the random grid to search for best hyperparameters
# First create the base model to tune
rf = RandomForestClassifier()
# Random search of parameters, using 3 fold cross validation,
# search across 100 different combinations, and use all available cores
rf_random = RandomizedSearchCV(estimator = rf, param_distributions = random_grid, n_iter = 100, cv = 3, verbose=2, random_state=42, n_jobs = -1)
# Fit the random search model
rf_random.fit(up_X_train, up_y_train)
print(rf_random.best_estimator_)
# best estimator: bootstrap = False, max_depth = 40, n_estimators = 1400
rf_best = RandomForestClassifier(bootstrap=False, max_depth=40, n_estimators=1400)
auc = []
acc = []
recall = []
precision = []
f_score = []
rf_best.fit(up_X_train,up_y_train)
preds = rf_best.predict_proba(X_test)[:,1]
auc.append(roc_auc_score(y_test, preds))
acc.append(accuracy_score(y_test, rf_best.predict(X_test)))
recall.append(recall_score(y_test,rf_best.predict(X_test)))
precision.append(precision_score(y_test, rf_best.predict(X_test), zero_division = 0))
f_score.append(f1_score(y_test, rf_best.predict(X_test)))
df = pd.DataFrame()
df['Model'],df['Accuracy'],df['AUC'],df['Recall'],df['Precision'],df['F1 Score'] = ['Random Forest Tuned'],acc,auc,recall,precision,f_score
# compare the tuned model against the baseline random forest (pd.concat replaces the deprecated append)
pd.concat([rf_baseline, df], ignore_index = True)
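The same RandomizedSearchCV pattern carries over to the other two shortlisted models. As a rough sketch for Gradient Boosting (the parameter ranges below are illustrative assumptions, not tuned values), the search would reuse the upsampled training data from above:
# Illustrative only: randomized search over a small Gradient Boosting grid,
# reusing the upsampled training set; the ranges are assumptions, not tuned values.
gb_grid = {'n_estimators': [100, 300, 500, 1000],
           'learning_rate': [0.01, 0.05, 0.1, 0.2],
           'max_depth': [2, 3, 4, 5],
           'subsample': [0.7, 0.85, 1.0]}
gb_random = RandomizedSearchCV(estimator = GradientBoostingClassifier(), param_distributions = gb_grid,
                               n_iter = 50, cv = 3, random_state = 42, n_jobs = -1)
gb_random.fit(up_X_train, up_y_train)
print(gb_random.best_estimator_)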