Spark DataFrames
Wrangling Data with DataFrames
The following code blocks illustrate how to wrangle log data from a music streaming service.
First, the libraries are imported, a SparkSession is instantiated, and then the dataset is read.
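# PySpark: session, UDFs, column types, and sort/aggregate helpers
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum  # aliased to avoid shadowing Python's built-in sum
# Local analysis and plotting
import datetime
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
# Create a SparkSession, or reuse the active one
spark = SparkSession.builder.appName("Wrangling Data").getOrCreate()
# Read the JSON log file; Spark infers the schema from the records
path = "data/sparkify_log_small.json"
user_log = spark.read.json(path)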
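By default, spark.read.json expects JSON Lines: one complete JSON object per line (a single multi-line JSON document needs multiLine=True). As a rough plain-Python illustration, not Spark's actual inference code, the sketch below parses a tiny two-record sample built from values shown on this page and takes the union of the keys. In this sample the Home page record has no song key; Spark would still create a song column and fill it with null for that row. The union of keys, sorted by name, matches how the fields are ordered in the schema below. The helpers read_json_lines and infer_columns are hypothetical names used only for this illustration.

```python
import json

# Two JSON Lines records (one object per line) built from values shown on this page.
# The Home page record has no "song" key at all.
SAMPLE = (
    '{"userId": "1046", "page": "NextSong", "song": "Christmas Tears Will Fall", "ts": 1513720872284}\n'
    '{"userId": "1138", "page": "Home", "ts": 1513768456284}\n'
)


def read_json_lines(text):
    """Parse newline-delimited JSON into a list of dicts, skipping blank lines."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


def infer_columns(records):
    """Return the union of keys sorted by name, plus the keys that some records lack."""
    all_keys = set()
    for rec in records:
        all_keys.update(rec)
    sometimes_missing = {k for k in all_keys if any(k not in rec for rec in records)}
    return sorted(all_keys), sometimes_missing


records = read_json_lines(SAMPLE)
columns, sometimes_missing = infer_columns(records)
print(columns)            # ['page', 'song', 'ts', 'userId']
print(sometimes_missing)  # {'song'}
```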
Data Exploration
The next code block shows the schema of the dataset.
user_log.printSchema()
Output:
root
|-- artist: string (nullable = true)
|-- auth: string (nullable = true)
|-- firstName: string (nullable = true)
|-- gender: string (nullable = true)
|-- itemInSession: long (nullable = true)
|-- lastName: string (nullable = true)
|-- length: double (nullable = true)
|-- level: string (nullable = true)
|-- location: string (nullable = true)
|-- method: string (nullable = true)
|-- page: string (nullable = true)
|-- registration: long (nullable = true)
|-- sessionId: long (nullable = true)
|-- song: string (nullable = true)
|-- status: long (nullable = true)
|-- ts: long (nullable = true)
|-- userAgent: string (nullable = true)
|-- userId: string (nullable = true)
The next code block explores the dataset and its statistics:
print(user_log.take(5))
user_log.describe().show()
user_log.describe("artist").show()
user_log.describe("sessionId").show()
print(user_log.count())
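For a numeric column such as sessionId, describe() reports count, mean, stddev, min and max; nulls are ignored, and stddev is the sample standard deviation. The plain-Python sketch below computes the same five statistics for a short, made-up list of session IDs to show what each row of that summary means. describe_numeric is a hypothetical helper, not a Spark function, and None stands in for null.

```python
import statistics


def describe_numeric(values):
    """Rough plain-Python analogue of describe() for one numeric column.

    None stands in for null and is excluded before any statistic is computed.
    """
    present = [v for v in values if v is not None]
    return {
        "count": len(present),
        "mean": statistics.mean(present),
        "stddev": statistics.stdev(present),  # sample standard deviation (divides by n - 1)
        "min": min(present),
        "max": max(present),
    }


# Made-up sessionId values (one null) purely for illustration.
summary = describe_numeric([5132, 5931, None, 5132, 5931])
for name, value in summary.items():
    print(name, value)
```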
The next code blocks select specific columns from the DataFrame and explore them:
user_log.select("page").dropDuplicates().sort('page').show()
Output:
+----------------+
|            page|
+----------------+
|           About|
|       Downgrade|
|           Error|
|            Help|
|            Home|
|           Login|
|          Logout|
|        NextSong|
|   Save Settings|
|        Settings|
|Submit Downgrade|
|  Submit Upgrade|
|         Upgrade|
+----------------+
print(user_log.select(["userId", "firstname", "page", "song"])
      .where(user_log.userId == "1046").collect())
Output:
[Row(userId='1046', firstname='Kenneth', page='NextSong', song='Christmas Tears Will Fall'),
Row(userId='1046', firstname='Kenneth', page='NextSong', song='Be Wary Of A Woman'),
Row(userId='1046', firstname='Kenneth', page='NextSong', song='Public Enemy No.1'),
Row(userId='1046', firstname='Kenneth', page='NextSong', song='Reign Of The Tyrants'),
Row(userId='1046', firstname='Kenneth', page='NextSong', song='Father And Son'),
Row(userId='1046', firstname='Kenneth', page='NextSong', song='No. 5'),
Row(userId='1046', firstname='Kenneth', page='NextSong', song='Seventeen'),
Row(userId='1046', firstname='Kenneth', page='Home', song=None),
Row(userId='1046', firstname='Kenneth', page='NextSong', song='War on war'),
Row(userId='1046', firstname='Kenneth', page='NextSong', song='Killermont Street'),
Row(userId='1046', firstname='Kenneth', page='NextSong', song='Black & Blue'),
Row(userId='1046', firstname='Kenneth', page='Logout', song=None),
Row(userId='1046', firstname='Kenneth', page='Home', song=None),
Row(userId='1046', firstname='Kenneth', page='NextSong', song='Heads Will Roll'),
Row(userId='1046', firstname='Kenneth', page='NextSong', song='Bleed It Out [Live At Milton Keynes]'),
Row(userId='1046', firstname='Kenneth', page='NextSong', song='Clocks'),
Row(userId='1046', firstname='Kenneth', page='NextSong', song='Love Rain'),
Row(userId='1046', firstname='Kenneth', page='NextSong', song="Ry Ry's Song (Album Version)"),
Row(userId='1046', firstname='Kenneth', page='NextSong', song='The Invisible Man'),
Row(userId='1046', firstname='Kenneth', page='NextSong', song='Catch You Baby (Steve Pitron & Max Sanna Radio Edit)'),
Row(userId='1046', firstname='Kenneth', page='NextSong', song='Ask The Mountains'),
Row(userId='1046', firstname='Kenneth', page='NextSong', song='Given Up (Album Version)'),
Row(userId='1046', firstname='Kenneth', page='NextSong', song='El Cuatrero'),
Row(userId='1046', firstname='Kenneth', page='NextSong', song='Hero/Heroine'),
Row(userId='1046', firstname='Kenneth', page='NextSong', song='Spring'),
Row(userId='1046', firstname='Kenneth', page='NextSong', song='Rising Moon'),
Row(userId='1046', firstname='Kenneth', page='NextSong', song='Tough Little Boys'),
Row(userId='1046', firstname='Kenneth', page='NextSong', song="Qu'Est-Ce Que T'Es Belle"),
Row(userId='1046', firstname='Kenneth', page='NextSong', song='Secrets'),
Row(userId='1046', firstname='Kenneth', page='NextSong', song='Under The Gun')]
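Each call in these queries has a direct plain-Python counterpart: select keeps only some fields, where (or filter) keeps only the rows that satisfy a condition, and dropDuplicates() followed by sort() yields the sorted distinct values. The sketch below applies those steps to a few hypothetical rows stored as dictionaries. Unlike Spark, it runs eagerly on a single machine, so treat it only as a mental model; select_where is an illustrative name, not part of any library.

```python
# Hypothetical log rows as plain dicts, with a few fields taken from the outputs above.
rows = [
    {"userId": "1046", "firstname": "Kenneth", "page": "NextSong", "song": "Christmas Tears Will Fall"},
    {"userId": "1046", "firstname": "Kenneth", "page": "Home", "song": None},
    {"userId": "1138", "firstname": "Kelly", "page": "Home", "song": None},
    {"userId": "1138", "firstname": "Kelly", "page": "Help", "song": None},
]


def select_where(rows, columns, predicate):
    """Keep the rows that satisfy predicate, projected onto the given columns."""
    return [{col: row[col] for col in columns} for row in rows if predicate(row)]


# Counterpart of select("page").dropDuplicates().sort("page")
distinct_pages = sorted({row["page"] for row in rows})

# Counterpart of select([...]).where(user_log.userId == "1046").collect()
kenneth = select_where(rows, ["userId", "firstname", "page", "song"],
                       lambda row: row["userId"] == "1046")

print(distinct_pages)  # ['Help', 'Home', 'NextSong']
print(len(kenneth))    # 2
```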
Calculating Statistics by Hour
The following code block calculates the number of songs played in each hour of the day.
# UDF: epoch milliseconds -> hour of day, in the local time zone of the Python worker.
# No return type is given, so the UDF returns strings.
get_hour = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).hour)
user_log = user_log.withColumn("hour", get_hour(user_log.ts))

# Count song plays per hour; cast the string hour so that "10" sorts after "9"
songs_in_hour = user_log.filter(user_log.page == "NextSong") \
    .groupby(user_log.hour).count() \
    .orderBy(user_log.hour.cast("float"))
songs_in_hour.show()

# Bring the small aggregated result into pandas and plot it
songs_in_hour_pd = songs_in_hour.toPandas()
songs_in_hour_pd["hour"] = pd.to_numeric(songs_in_hour_pd["hour"])
plt.scatter(songs_in_hour_pd["hour"], songs_in_hour_pd["count"])
plt.xlim(-1, 24)
plt.ylim(0, 1.2 * max(songs_in_hour_pd["count"]))
plt.xlabel("Hour")
plt.ylabel("Songs played")
plt.show()
Here is what the code block above does:
The first line defines a user-defined function (UDF) that converts the epoch timestamp ts, given in milliseconds, to a datetime and returns its hour of the day. Because no return type is specified, the UDF returns the hour as a string.
Next, it adds a new column, "hour", to the DataFrame.
Then, it counts the "NextSong" page requests in each hour and orders the result by hour, cast to a number so that the string values sort numerically.
Next, it calls show() on the result. Transformations such as withColumn, filter, and groupby are lazy, so nothing is computed until an action such as show() runs.
Finally, the Spark DataFrame is converted to a pandas DataFrame and the statistics are visualized with Matplotlib.
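The hour computation and the per-hour counts can be reproduced in plain Python, which makes one detail visible: datetime.fromtimestamp without a tz argument uses the local time zone of the process that runs it, so the UDF above depends on the time zone of the Spark workers. The hours shown on this page (22 for ts 1513720872284 and 11 for ts 1513768454284) are consistent with UTC. The sketch below, with hypothetical helpers hour_of and songs_per_hour, passes the time zone explicitly and returns the counts in numeric hour order.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone


def hour_of(ts_ms, tz=timezone.utc):
    """Hour of day for an epoch timestamp in milliseconds, in an explicit time zone."""
    return datetime.fromtimestamp(ts_ms / 1000.0, tz=tz).hour


def songs_per_hour(events, tz=timezone.utc):
    """Count NextSong events per hour of day, returned as (hour, count) in numeric order."""
    counts = Counter(hour_of(e["ts"], tz) for e in events if e["page"] == "NextSong")
    return sorted(counts.items())


# Timestamps taken from the outputs on this page.
events = [
    {"page": "NextSong", "ts": 1513720872284},          # 2017-12-19 22:01 UTC
    {"page": "Submit Downgrade", "ts": 1513768454284},  # 2017-12-20 11:14 UTC, not a song play
    {"page": "NextSong", "ts": 1513814880284},          # 2017-12-21 00:08 UTC
]

print(songs_per_hour(events))                                 # [(0, 1), (22, 1)]
print(hour_of(1513720872284, timezone(timedelta(hours=-5))))  # 17: same instant at UTC-5
```

Pinning the zone this way makes the hourly statistics reproducible regardless of where the code runs.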
Drop Rows with Missing Values
It turns out there are no missing values in the userId or sessionId columns.
user_log_valid = user_log.dropna(how="any", subset=["userId", "sessionId"])
user_log_valid.count()
# Output would be 10000
But there are userId values that are empty strings, which dropna() does not treat as missing.
user_log.select("userId").dropDuplicates().sort("userId").show()
Output:
+------+
|userId|
+------+
|      |
|    10|
|   100|
|  1000|
|  1003|
|  1005|
|  1006|
|  1017|
|  1019|
|  1020|
|  1022|
|  1025|
|  1030|
|  1035|
|  1037|
|   104|
|  1040|
|  1042|
|  1043|
|  1046|
+------+
only showing top 20 rows
The following code block filters out all entries whose userId is an empty string.
user_log_valid = user_log_valid.filter(user_log_valid["userId"] != "")
user_log_valid.count()
# Output would be 9664
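The two cleaning steps are both needed because a null and an empty string are different values: dropna() removes only nulls, so the empty-string userId values have to be filtered out explicitly. The plain-Python sketch below mirrors both steps on three hypothetical rows, with None standing in for null; the helper names are illustrative. One difference from Spark: there, userId != "" evaluates to null for a null userId, so filter() would drop such a row as well, whereas Python's None != "" is True. Here that does not matter because the nulls are already gone after the first step.

```python
def drop_missing(rows, subset):
    """Like dropna(how="any", subset=...): drop rows where any listed field is None."""
    return [row for row in rows if all(row.get(col) is not None for col in subset)]


def drop_empty_user_ids(rows):
    """Like filter(userId != ""): drop rows whose userId is an empty string."""
    return [row for row in rows if row["userId"] != ""]


# Hypothetical rows: one valid, one with an empty userId, one with a null userId.
rows = [
    {"userId": "1046", "sessionId": 5132},
    {"userId": "", "sessionId": 5931},
    {"userId": None, "sessionId": 5931},
]

step1 = drop_missing(rows, ["userId", "sessionId"])  # the empty string survives this step
step2 = drop_empty_user_ids(step1)
print(len(rows), len(step1), len(step2))  # 3 2 1
```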
Users Downgrade Their Accounts
First, let's find a user who downgraded their service from a paid one to a free one.
user_log_valid.filter("page = 'Submit Downgrade'").show()
Output:
+------+---------+---------+------+-------------+--------+------+-----+--------------------+------+----------------+-------------+---------+----+------+-------------+--------------------+------+----+
|artist|     auth|firstName|gender|itemInSession|lastName|length|level|            location|method|            page| registration|sessionId|song|status|           ts|           userAgent|userId|hour|
+------+---------+---------+------+-------------+--------+------+-----+--------------------+------+----------------+-------------+---------+----+------+-------------+--------------------+------+----+
|  null|Logged In|    Kelly|     F|           24|  Newton|  null| paid|Houston-The Woodl...|   PUT|Submit Downgrade|1513283366284|     5931|null|   307|1513768454284|Mozilla/5.0 (Wind...|  1138|  11|
+------+---------+---------+------+-------------+--------+------+-----+--------------------+------+----------------+-------------+---------+----+------+-------------+--------------------+------+----+
Apparently, userId 1138, a user called Kelly, downgraded her service. Let's look at her user activity:
user_log.select(["userId", "firstname", "page", "level", "song"]).where(user_log.userId == "1138").collect()
Output:
[Row(userId='1138', firstname='Kelly', page='Home', level='paid', song=None),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Everybody Everybody'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Gears'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Use Somebody'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Love Of My Life (1993 Digital Remaster)'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Down In The Valley Woe'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Treat Her Like A Lady'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song="Everybody Thinks You're An Angel"),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Fourteen Wives'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Love On The Rocks'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Breakeven'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Leaf House'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='NAISEN KANSSA'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song="You're In My Heart"),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Roll On Down The Highway'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Plasticities (Remix)'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Secrets'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Hello'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='I Never Told You'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Love Break Me'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='One Touch One Bounce'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Undo'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Overdue (Blackbeard Remix)'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Slave To Love (1999 Digital Remaster)'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Stronger'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='All Of Us (Album Version)'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Sehr kosmisch'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='March Of The Celts'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Electricity'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Aces High'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Bananeira'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='The General'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='HÃ\x83©roe De Leyenda (VersiÃ\x83³n Maxi)'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song="Don't Stop The Music"),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song="You're The One"),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Entering White Cecilia'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Piccolo Cesare'),
Row(userId='1138', firstname='Kelly', page='Help', level='paid', song=None),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Last Christmas (Album Version)'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='You Shook Me'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Going Steady'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='My Name Is'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Undo'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Secrets'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Good Times Gone (Album Version)'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Angelito'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Batdance ( LP Version )'),
Row(userId='1138', firstname='Kelly', page='Home', level='paid', song=None),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='DiÃ\x83¡kdal'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Whirring'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Potholderz (feat. Count Bass D)'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Seaside'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Louder Than A Bomb'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Just Like You'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song="You're The One"),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Turn It Again (Album Version)'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Everywhere I Go'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song="Easy Skankin'"),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Roses'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Killing Me Softly With His Song'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='The Razor (Album Version)'),
Row(userId='1138', firstname='Kelly', page='NextSong', level='paid', song='Idols and Anchors'),
Row(userId='1138', firstname='Kelly', page='Downgrade', level='paid', song=None),
Row(userId='1138', firstname='Kelly', page='Submit Downgrade', level='paid', song=None),
Row(userId='1138', firstname='Kelly', page='Home', level='free', song=None),
Row(userId='1138', firstname='Kelly', page='NextSong', level='free', song='Bones'),
Row(userId='1138', firstname='Kelly', page='Home', level='free', song=None),
Row(userId='1138', firstname='Kelly', page='NextSong', level='free', song='Grenouilles Mantidactylus (Small Frogs)')]
Next, we create a new column, downgraded, which flags the log entries where users downgrade their accounts: 1 for a "Submit Downgrade" page request and 0 otherwise.
flag_downgrade_event = udf(lambda x: 1 if x == "Submit Downgrade" else 0, IntegerType())
user_log_valid = user_log_valid.withColumn("downgraded", flag_downgrade_event("page"))
user_log_valid.head()
Output:
Row(artist='Showaddywaddy', auth='Logged In', firstName='Kenneth',
gender='M', itemInSession=112, lastName='Matthews', length=232.93342,
level='paid', location='Charlotte-Concord-Gastonia, NC-SC', method='PUT',
page='NextSong', registration=1509380319284, sessionId=5132,
song='Christmas Tears Will Fall', status=200, ts=1513720872284,
userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"',
userId='1046', hour='22', downgraded=0)
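Next, we use a Window function and a cumulative sum to label each user's log entries as occurring either before or after a downgrade event. Because the window orders each user's rows by ts in descending order, the running sum of downgraded at a given row counts the downgrade events at or after that row's timestamp: rows up to and including the downgrade get phase 1, and rows after it get phase 0.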
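from pyspark.sql import Window

# Per user, order events newest first; the RANGE frame spans from the newest event
# up to the current row, including rows that share its ts
windowval = Window.partitionBy("userId").orderBy(desc("ts")) \
    .rangeBetween(Window.unboundedPreceding, 0)
# phase = number of downgrade events at or after this row's timestamp
user_log_valid = user_log_valid.withColumn("phase", Fsum("downgraded").over(windowval))
user_log_valid.select(["userId", "firstname", "ts", "page", "level", "phase"]) \
    .where(user_log_valid.userId == "1138").sort("ts").collect()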
Output:
[Row(userId='1138', firstname='Kelly', ts=1513729066284, page='Home', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513729066284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513729313284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513729552284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513729783284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513730001284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513730263284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513730518284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513730768284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513731182284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513731435284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513731695284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513731857284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513732160284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513732302284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513732540284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513732770284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513732994284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513733223284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513733456284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513733738284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513733941284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513734289284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513734598284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513734863284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513735174284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513735385284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513736040284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513736237284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513736446284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513736709284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513736915284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513737160284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513737460284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513737728284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513737936284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513738144284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513738197284, page='Help', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513738432284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513763195284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513763453284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513763622284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513763890284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513764238284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513764462284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513764782284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513765075284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513765075284, page='Home', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513765448284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513765655284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513765818284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513766091284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513766189284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513766385284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513766599284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513766838284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513767203284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513767413284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513767643284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513768012284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513768242284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513768452284, page='NextSong', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513768453284, page='Downgrade', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513768454284, page='Submit Downgrade', level='paid', phase=1),
Row(userId='1138', firstname='Kelly', ts=1513768456284, page='Home', level='free', phase=0),
Row(userId='1138', firstname='Kelly', ts=1513814880284, page='NextSong', level='free', phase=0),
Row(userId='1138', firstname='Kelly', ts=1513821430284, page='Home', level='free', phase=0),
Row(userId='1138', firstname='Kelly', ts=1513833144284, page='NextSong', level='free', phase=0)]
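The phase column can be reproduced in plain Python to make the window semantics concrete. The sketch below uses a hypothetical label_phases helper, not anything Spark provides: it groups events by userId, walks each user's events from newest to oldest, and keeps a running count of "Submit Downgrade" events. Like rangeBetween(Window.unboundedPreceding, 0), it treats rows with identical timestamps as peers that share one running total. The sample events are a subset of Kelly's log above plus one of Kenneth's.

```python
from collections import defaultdict
from itertools import groupby


def is_downgrade(event):
    """Counterpart of the downgraded column: 1 for a Submit Downgrade request, else 0."""
    return 1 if event["page"] == "Submit Downgrade" else 0


def label_phases(events):
    """Add phase = number of downgrade events at or after each event's timestamp, per user.

    Mirrors a running sum over a window partitioned by userId, ordered by ts descending,
    with a RANGE frame from unbounded preceding to the current row: events that share a
    timestamp are peers and receive the same total.
    """
    by_user = defaultdict(list)
    for event in events:
        by_user[event["userId"]].append(event)

    labeled = []
    for user_events in by_user.values():
        newest_first = sorted(user_events, key=lambda e: e["ts"], reverse=True)
        running = 0
        for _, peers in groupby(newest_first, key=lambda e: e["ts"]):
            peers = list(peers)
            running += sum(is_downgrade(e) for e in peers)
            labeled.extend({**e, "phase": running} for e in peers)
    return sorted(labeled, key=lambda e: (e["userId"], e["ts"]))


# A few of Kelly's events from the output above, plus one of Kenneth's.
events = [
    {"userId": "1138", "ts": 1513768452284, "page": "NextSong"},
    {"userId": "1138", "ts": 1513768453284, "page": "Downgrade"},
    {"userId": "1138", "ts": 1513768454284, "page": "Submit Downgrade"},
    {"userId": "1138", "ts": 1513768456284, "page": "Home"},
    {"userId": "1138", "ts": 1513814880284, "page": "NextSong"},
    {"userId": "1046", "ts": 1513720872284, "page": "NextSong"},
]

for e in label_phases(events):
    print(e["userId"], e["ts"], e["page"], e["phase"])
```

With rowsBetween instead of rangeBetween in the Spark version, rows sharing a timestamp would be summed one at a time in an unspecified order, so an event tied with the downgrade could end up with either phase; the RANGE frame avoids that.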