Spark DataFrames

Wrangling Data with DataFrames

The following code blocks illustrate how to wrangle log data from a music service.

First, the libraries are imported, a SparkSession is instantiated, and then the dataset is read.

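# PySpark: session, user-defined functions, column types and column helpers
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
# Alias avoids shadowing Python's built-in sum()
from pyspark.sql.functions import sum as Fsum
# Window specifications, needed for the cumulative-sum step
from pyspark.sql.window import Window

# Used to convert epoch timestamps to datetimes
import datetime

# Local analysis and plotting of aggregated results
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Create (or reuse) a SparkSession for this application
spark = SparkSession.builder.appName("Wrangling Data").getOrCreate()

# Read the log file; by default each line is expected to be one JSON record
path = "data/sparkify_log_small.json"
user_log = spark.read.json(path)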

Data Exploration

The next code block shows the schema of the dataset.
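
Spark infers the schema of a JSON dataset by scanning its records, and `user_log.printSchema()` prints the result as a tree. By default, JSON integers are inferred as `long` and floating-point numbers as `double`. The sketch below is a simplified plain-Python illustration of that idea, not Spark's actual inference algorithm. The records and the helpers `spark_type_name`, `infer_schema` and `print_schema` are hypothetical.

```python
import json

# Hypothetical JSON-lines records, loosely shaped like a music-service log.
RAW_LINES = [
    '{"userId": "u1", "page": "NextSong", "length": 277.89, "ts": 1513720872284}',
    '{"userId": "u2", "page": "Home", "length": null, "ts": 1513720878284}',
]

# Python types produced by json.loads -> Spark SQL type names.
# bool is checked before int because bool is a subclass of int.
TYPE_NAMES = [(bool, "boolean"), (int, "long"), (float, "double"), (str, "string")]


def spark_type_name(value):
    """Return the Spark SQL type name this sketch assigns to a JSON value."""
    for py_type, name in TYPE_NAMES:
        if isinstance(value, py_type):
            return name
    return "string"  # simplification for nested or unusual values


def infer_schema(lines):
    """Map each field to the type of its first non-null value, sorted by name."""
    schema = {}
    for line in lines:
        for field, value in json.loads(line).items():
            if value is None:
                schema.setdefault(field, None)  # type still unknown
            elif schema.get(field) is None:
                schema[field] = spark_type_name(value)
    # Fields that were only ever null fall back to "string" in this sketch.
    return {field: (name or "string") for field, name in sorted(schema.items())}


def print_schema(schema):
    """Print the schema in a printSchema()-like tree layout."""
    print("root")
    for field, name in schema.items():
        print(f" |-- {field}: {name} (nullable = true)")


print_schema(infer_schema(RAW_LINES))
```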

The next code block explores the dataset and its statistics:
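
Summary statistics of this kind are typically produced with `user_log.describe()`. That call reports count, mean, stddev, min and max for each column. The plain-Python sketch below computes the same five figures for one numeric column of hypothetical values. It assumes, as Spark does, that stddev is the sample standard deviation and that nulls are excluded. The `describe` helper is hypothetical.

```python
import statistics


def describe(values):
    """Return count, mean, stddev, min and max of a column, ignoring None.

    stddev is the sample standard deviation (n - 1 in the denominator).
    """
    present = [v for v in values if v is not None]
    return {
        "count": len(present),
        "mean": statistics.mean(present),
        "stddev": statistics.stdev(present),
        "min": min(present),
        "max": max(present),
    }


# Hypothetical song lengths in seconds; None stands for a missing value.
lengths = [200.0, 250.0, None, 300.0]
print(describe(lengths))
```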

The next code blocks select specific columns from the DataFrame and explore them:
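
A common way to explore a single column is to list its distinct values, for example `user_log.select("page").dropDuplicates().sort("page")` in PySpark. The plain-Python sketch below mirrors that select, deduplicate and sort chain on hypothetical records. The helpers `select` and `distinct_sorted` are hypothetical, and the sketch assumes the column contains no null values.

```python
def select(rows, *columns):
    """Keep only the named columns of each row (like DataFrame.select)."""
    return [{c: row.get(c) for c in columns} for row in rows]


def distinct_sorted(rows, column):
    """Distinct values of one column in ascending order (assumes no None values)."""
    return sorted({row[column] for row in rows})


# Hypothetical log records.
rows = [
    {"userId": "u1", "page": "NextSong", "level": "paid"},
    {"userId": "u2", "page": "Home", "level": "free"},
    {"userId": "u1", "page": "NextSong", "level": "paid"},
    {"userId": "u3", "page": "Logout", "level": "free"},
]
print(select(rows, "userId", "page")[:2])
print(distinct_sorted(rows, "page"))  # ['Home', 'Logout', 'NextSong']
```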

Calculating Statistics by Hour

The following code block calculates the number of songs users listen to in each hour of the day.

The code block works as follows:

  • The first line defines a user-defined function (UDF) that converts the timestamp from epoch time to a datetime and extracts the hour of the day.

  • Next, we add a new column, "hour", to the DataFrame.

  • Then, the "next song" page requests are counted per hour, and the result is ordered by "hour".

  • Next, we call show() on the DataFrame. Spark evaluates transformations lazily, so the statement above is not executed until an action such as show() is called.

  • Finally, the Spark DataFrame is converted to a pandas DataFrame, and the statistics are visualized using Matplotlib.
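
The steps above can be sketched in plain Python to show what the Spark job computes, minus the plotting. Two details are assumptions: that `ts` holds epoch time in milliseconds, and that song plays are logged with `page == "NextSong"`. The sketch uses UTC so the result is reproducible. A UDF that calls `datetime.datetime.fromtimestamp` without a time zone would use the machine's local time instead. The helpers `get_hour`, `songs_per_hour` and `ms` are hypothetical.

```python
import datetime
from collections import Counter


def get_hour(ts_ms):
    """Hour of day (0-23, UTC) for an epoch timestamp in milliseconds."""
    return datetime.datetime.fromtimestamp(ts_ms / 1000.0, tz=datetime.timezone.utc).hour


def songs_per_hour(rows):
    """Count NextSong events per hour, sorted by hour (filter -> groupBy -> orderBy)."""
    counts = Counter(get_hour(row["ts"]) for row in rows if row["page"] == "NextSong")
    return sorted(counts.items())


def ms(hour, minute=0):
    """Epoch milliseconds for a hypothetical event on 2017-12-19 at hour:minute UTC."""
    dt = datetime.datetime(2017, 12, 19, hour, minute, tzinfo=datetime.timezone.utc)
    return int(dt.timestamp() * 1000)


# Hypothetical events.
rows = [
    {"page": "NextSong", "ts": ms(22, 1)},
    {"page": "NextSong", "ts": ms(22, 30)},
    {"page": "Home", "ts": ms(22, 45)},
    {"page": "NextSong", "ts": ms(23, 5)},
    {"page": "NextSong", "ts": ms(0, 10)},
]
print(songs_per_hour(rows))  # [(0, 1), (22, 2), (23, 1)]
```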

Drop Rows with Missing Values

It turns out there are no missing values in the userId or sessionId columns.

However, some userId values are empty strings.

The following code block filters out all rows whose userId is an empty string.
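
Dropping rows with null values does not remove empty strings. In PySpark, for example, `dropna(how="any", subset=["userId", "sessionId"])` leaves such rows in place, so a separate filter such as `user_log.filter(user_log["userId"] != "")` is needed. The plain-Python sketch below shows the difference between the two steps on hypothetical records. The helpers `drop_nulls` and `drop_empty_user_ids` are hypothetical.

```python
def drop_nulls(rows, columns):
    """Drop rows where any of the given columns is None (like dropna(how='any'))."""
    return [row for row in rows if all(row.get(c) is not None for c in columns)]


def drop_empty_user_ids(rows):
    """Keep only rows whose userId is a non-empty string."""
    return [row for row in rows if row["userId"] != ""]


# Hypothetical records.
rows = [
    {"userId": "u1", "sessionId": 10},
    {"userId": "", "sessionId": 11},    # empty userId, not null
    {"userId": "u2", "sessionId": None},
]
no_nulls = drop_nulls(rows, ["userId", "sessionId"])
print(len(no_nulls))                       # 2: the empty string survives
print(len(drop_empty_user_ids(no_nulls)))  # 1
```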

Users Downgrade Their Accounts

First, let's find a user who downgraded their service from a paid one to a free one.
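
One way to find such users is to filter for the page that records a completed downgrade. The sketch below assumes that page is named `"Submit Downgrade"`, which is an assumption about this dataset. In PySpark the filter would be along the lines of `user_log.filter(user_log.page == "Submit Downgrade")`. The sketch collects the distinct user IDs from hypothetical records, and the `users_who_downgraded` helper is hypothetical.

```python
def users_who_downgraded(rows, page_name="Submit Downgrade"):
    """Distinct userIds with at least one downgrade event, in sorted order."""
    return sorted({row["userId"] for row in rows if row["page"] == page_name})


# Hypothetical records.
rows = [
    {"userId": "u1", "page": "NextSong", "level": "paid"},
    {"userId": "u1", "page": "Submit Downgrade", "level": "paid"},
    {"userId": "u2", "page": "Home", "level": "free"},
    {"userId": "u3", "page": "Submit Downgrade", "level": "paid"},
]
print(users_who_downgraded(rows))  # ['u1', 'u3']
```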

Apparently, userId 1138, a user called Kelly, downgraded her service. Let's look at her user activity:
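
Inspecting one user's activity amounts to three steps: filtering on `userId`, keeping a few informative columns, and ordering by time. In PySpark these are `filter`, `select` and `orderBy("ts")`. The sketch below does the same on hypothetical records and also reports where the user's `level` changes between consecutive events. The columns `ts`, `page` and `level` and the helpers `user_activity` and `level_changes` are assumptions for illustration.

```python
def user_activity(rows, user_id, columns=("ts", "page", "level")):
    """One user's events, projected to `columns` and ordered by timestamp."""
    events = [row for row in rows if row["userId"] == user_id]
    events.sort(key=lambda row: row["ts"])
    return [tuple(row[c] for c in columns) for row in events]


def level_changes(activity):
    """(ts, old_level, new_level) wherever consecutive events differ in level."""
    return [
        (ts, prev_level, level)
        for (_, _, prev_level), (ts, _, level) in zip(activity, activity[1:])
        if level != prev_level
    ]


# Hypothetical records, deliberately out of time order.
rows = [
    {"userId": "u1", "ts": 3, "page": "NextSong", "level": "free"},
    {"userId": "u1", "ts": 1, "page": "NextSong", "level": "paid"},
    {"userId": "u2", "ts": 2, "page": "Home", "level": "free"},
    {"userId": "u1", "ts": 2, "page": "Submit Downgrade", "level": "paid"},
]
activity = user_activity(rows, "u1")
print(activity)
print(level_changes(activity))  # [(3, 'paid', 'free')]
```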

Next, we will create a new column, downgraded, which flags the log entries where users downgrade their accounts.
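
The flag is a 0/1 indicator per log entry. In PySpark it could be built with a UDF, for example `udf(lambda page: 1 if page == "Submit Downgrade" else 0, IntegerType())`, or without a UDF via `when(...).otherwise(...)`. The page name is an assumption about this dataset. The plain-Python sketch below applies the same rule to hypothetical records. The helpers `flag_downgrade` and `with_downgraded` are hypothetical.

```python
def flag_downgrade(page):
    """1 if the log entry is a downgrade submission, else 0 (page name assumed)."""
    return 1 if page == "Submit Downgrade" else 0


def with_downgraded(rows):
    """Return copies of rows with a new 'downgraded' column, like withColumn."""
    return [{**row, "downgraded": flag_downgrade(row["page"])} for row in rows]


# Hypothetical records.
rows = [
    {"userId": "u1", "page": "NextSong"},
    {"userId": "u1", "page": "Submit Downgrade"},
    {"userId": "u1", "page": "Home"},
]
print([row["downgraded"] for row in with_downgraded(rows)])  # [0, 1, 0]
```

Like `withColumn`, the sketch returns new rows instead of modifying the input.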

Next, we use a window function and a cumulative sum to label each user's log entries as occurring either before or after a downgrade event.
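
The window logic can be sketched in plain Python. Assume a window partitioned by `userId`, ordered by `ts` descending, with a frame from the start of the partition to the current row. In PySpark this would be roughly `Window.partitionBy("userId").orderBy(desc("ts")).rangeBetween(Window.unboundedPreceding, 0)`, used with `Fsum("downgraded").over(...)`. Each entry's running sum then counts the downgrades at or after it. Entries after a user's last downgrade get phase 0, and earlier ones get 1, 2, and so on. The `add_phase` helper and the records are hypothetical. The sketch assumes timestamps are unique within a user; with `rangeBetween`, rows sharing a `ts` would share one frame.

```python
from collections import defaultdict


def add_phase(rows):
    """Add a 'phase' column: per user, the number of downgrades at or after each entry.

    Mirrors a cumulative sum over a window partitioned by userId and ordered by
    ts descending, assuming ts values are unique within each user.
    """
    by_user = defaultdict(list)
    for row in rows:
        by_user[row["userId"]].append(row)

    result = []
    for user_rows in by_user.values():
        running = 0
        # Walk from the newest entry to the oldest, accumulating the flag.
        for row in sorted(user_rows, key=lambda r: r["ts"], reverse=True):
            running += row["downgraded"]
            result.append({**row, "phase": running})
    return result


# Hypothetical records with the downgraded flag already computed.
rows = [
    {"userId": "u1", "ts": 1, "level": "paid", "downgraded": 0},
    {"userId": "u1", "ts": 2, "level": "paid", "downgraded": 1},
    {"userId": "u1", "ts": 3, "level": "free", "downgraded": 0},
    {"userId": "u2", "ts": 1, "level": "free", "downgraded": 0},
]
phases = {(r["userId"], r["ts"]): r["phase"] for r in add_phase(rows)}
print(phases)
```

Ordering by descending `ts` lets a forward-running sum count future downgrades. This is why the entry at the downgrade itself, and everything before it, ends up in phase 1.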
