
Data Lake with Spark Project

  • The purpose of this project is to apply what I have learned about data lakes, Spark, and AWS by building an ETL pipeline for a data lake hosted on Amazon S3.

Background

  • An imaginary startup called Sparkify has a new music streaming app, and the company's data analytics team is interested in knowing what songs its users are listening to.

  • Before this project, Sparkify has two datasets:

    1. The song dataset resides in s3://udacity-dend/song_data; each JSON file in the directory contains metadata about one real song and its artist. For example, the file TRAAAAW128F429D538.json looks like:

      {
        "num_songs": 1,
        "artist_id": "ARD7TVE1187B99BFB1",
        "artist_latitude": null,
        "artist_longitude": null,
        "artist_location": "California - LA",
        "artist_name": "Casual",
        "song_id": "SOMZWCG12A8C13C480",
        "title": "I Didn't Mean To",
        "duration": 218.93179,
        "year": 0
      }

      The file name is the song's track ID, and the files in the S3 bucket are partitioned by the first three letters of the track ID following the TR prefix:

      s3://udacity-dend/song-data/A/A/A/TRAAAAW128F429D538.json
      s3://udacity-dend/song-data/A/A/C/TRAACCG128F92E8A55.json
      

      Note: this dataset is a subset of real data from the Million Song Dataset.

    2. The log dataset resides in s3://udacity-dend/log_data, and each JSON file in the directory is a log file that contains user activities from the music streaming app. For example, 2018-11-01-events.json looks like:

      {"artist":null,"auth":"Logged In","firstName":"Walter","gender":"M","itemInSession":0,"lastName":"Frye","length":null,"level":"free","location":"San Francisco-Oakland-Hayward, CA","method":"GET","page":"Home","registration":1540919166796.0,"sessionId":38,"song":null,"status":200,"ts":1541105830796,"userAgent":"\"Mozilla\/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit\/537.36 (KHTML, like Gecko) Chrome\/36.0.1985.143 Safari\/537.36\"","userId":"39"}
      {"artist":null,"auth":"Logged In","firstName":"Kaylee","gender":"F","itemInSession":0,"lastName":"Summers","length":null,"level":"free","location":"Phoenix-Mesa-Scottsdale, AZ","method":"GET","page":"Home","registration":1540344794796.0,"sessionId":139,"song":null,"status":200,"ts":1541106106796,"userAgent":"\"Mozilla\/5.0 (Windows NT 6.1; WOW64) AppleWebKit\/537.36 (KHTML, like Gecko) Chrome\/35.0.1916.153 Safari\/537.36\"","userId":"8"}
      {"artist":"Des'ree","auth":"Logged In","firstName":"Kaylee","gender":"F","itemInSession":1,"lastName":"Summers","length":246.30812,"level":"free","location":"Phoenix-Mesa-Scottsdale, AZ","method":"PUT","page":"NextSong","registration":1540344794796.0,"sessionId":139,"song":"You Gotta Be","status":200,"ts":1541106106796,"userAgent":"\"Mozilla\/5.0 (Windows NT 6.1; WOW64) AppleWebKit\/537.36 (KHTML, like Gecko) Chrome\/35.0.1916.153 Safari\/537.36\"","userId":"8"}
      {"artist":null,"auth":"Logged In","firstName":"Kaylee","gender":"F","itemInSession":2,"lastName":"Summers","length":null,"level":"free","location":"Phoenix-Mesa-Scottsdale, AZ","method":"GET","page":"Upgrade","registration":1540344794796.0,"sessionId":139,"song":null,"status":200,"ts":1541106132796,"userAgent":"\"Mozilla\/5.0 (Windows NT 6.1; WOW64) AppleWebKit\/537.36 (KHTML, like Gecko) Chrome\/35.0.1916.153 Safari\/537.36\"","userId":"8"}
      {"artist":"Mr Oizo","auth":"Logged In","firstName":"Kaylee","gender":"F","itemInSession":3,"lastName":"Summers","length":144.03873,"level":"free","location":"Phoenix-Mesa-Scottsdale, AZ","method":"PUT","page":"NextSong","registration":1540344794796.0,"sessionId":139,"song":"Flat 55","status":200,"ts":1541106352796,"userAgent":"\"Mozilla\/5.0 (Windows NT 6.1; WOW64) AppleWebKit\/537.36 (KHTML, like Gecko) Chrome\/35.0.1916.153 Safari\/537.36\"","userId":"8"}
      {"artist":"Tamba Trio","auth":"Logged In","firstName":"Kaylee","gender":"F","itemInSession":4,"lastName":"Summers","length":177.18812,"level":"free","location":"Phoenix-Mesa-Scottsdale, AZ","method":"PUT","page":"NextSong","registration":1540344794796.0,"sessionId":139,"song":"Quem Quiser Encontrar O Amor","status":200,"ts":1541106496796,"userAgent":"\"Mozilla\/5.0 (Windows NT 6.1; WOW64) AppleWebKit\/537.36 (KHTML, like Gecko) Chrome\/35.0.1916.153 Safari\/537.36\"","userId":"8"}
      ...

      The file name represents the date on which the log file was generated, and the files in the S3 bucket are partitioned by year and month:

      s3://udacity-dend/log_data/2018/11/2018-11-12-events.json
      s3://udacity-dend/log_data/2018/11/2018-11-13-events.json
      

      Note: this dataset is generated by an event simulator based on the songs in the song dataset mentioned above.

  • Since it is not easy to query song play data directly from these two datasets, my job is to use Spark to build an ETL pipeline (sketched below) that creates a database, in the form of parquet files on Amazon S3, to help the data analytics team analyze song play data.
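
  • As a concrete illustration of the extraction step, here is a minimal sketch that reads both datasets straight from S3 with Spark's schema-on-read JSON reader and keeps only the NextSong events that represent song plays. It assumes a SparkSession named spark already exists (see the ETL Pipeline section below) and that AWS credentials are configured for the s3a:// scheme:

      # Minimal extraction sketch (assumes `spark` and s3a credentials).
      # The glob patterns mirror how the buckets are partitioned above.
      song_df = spark.read.json("s3a://udacity-dend/song_data/*/*/*/*.json")
      log_df = spark.read.json("s3a://udacity-dend/log_data/*/*/*.json")

      # Only NextSong events correspond to actual song plays.
      songplay_events = log_df.where(log_df.page == "NextSong")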

Database Schema

  • For the purpose of song play analysis, I create a star schema with one fact table, songplays, and four dimension tables: users, songs, artists, and time:

    1. The songplays table holds the records in the log dataset that are associated with song plays, i.e. records with page = NextSong (see the sketch after this list for how it can be assembled). The table has the following columns:

      | Column      | Data Type | Comment |
      | ----------- | --------- | ------- |
      | songplay_id | long      | Auto-generated when a row is inserted into the table. |
      | start_time  | timestamp | The timestamp at which the user starts to play the song; corresponds to the ts (timestamp) field in the log dataset. |
      | user_id     | string    | The user's id; corresponds to the userId field in the log dataset. |
      | level       | string    | The user's level (subscription type); corresponds to the level field in the log dataset. |
      | song_id     | string    | The id of the played song; corresponds to the song_id field in the song dataset and can be retrieved from the songs table after joining the songs and artists tables. |
      | artist_id   | string    | The artist id of the played song; corresponds to the artist_id field in the song dataset and can be retrieved from the artists table after joining the songs and artists tables. |
      | session_id  | long      | The id of the user's listening session; corresponds to the sessionId field in the log dataset. |
      | location    | string    | The user's location; corresponds to the location field in the log dataset. |
      | user_agent  | string    | The device and browser the user played the song from; corresponds to the userAgent field in the log dataset. |

    2. The users table holds data for users of the app. It has the following columns:

      | Column     | Data Type |
      | ---------- | --------- |
      | user_id    | integer   |
      | first_name | string    |
      | last_name  | string    |
      | gender     | string    |
      | level      | string    |

    3. The songs table holds data for all the songs in the song dataset. The columns are:

      | Column    | Data Type |
      | --------- | --------- |
      | song_id   | string    |
      | title     | string    |
      | artist_id | string    |
      | year      | integer   |
      | duration  | double    |

    4. The artists table holds data for all the artists in the song dataset. The columns are:

      | Column    | Data Type |
      | --------- | --------- |
      | artist_id | string    |
      | name      | string    |
      | location  | string    |
      | latitude  | double    |
      | longitude | double    |

    5. The time table holds the timestamps of records in songplays broken down into specific units. The columns are:

      | Column     | Data Type |
      | ---------- | --------- |
      | start_time | timestamp |
      | hour       | integer   |
      | day        | integer   |
      | week       | integer   |
      | month      | integer   |
      | year       | integer   |
      | weekday    | string    |
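
  • To make the schema concrete, here is a sketch of how the songplays fact table and the time dimension could be assembled, continuing from the extraction sketch in the Background section. The join keys (song title, artist name, and duration) are my assumption for matching log events to songs, since the log records carry only names and lengths, not ids:

      from pyspark.sql import functions as F

      # Match each NextSong event to its song by title, artist name, and
      # duration (an assumed join condition; the logs have no song_id).
      songplays = (
          songplay_events
          .join(
              song_df,
              (songplay_events.song == song_df.title)
              & (songplay_events.artist == song_df.artist_name)
              & (songplay_events.length == song_df.duration),
              "left",
          )
          .select(
              F.monotonically_increasing_id().alias("songplay_id"),
              (F.col("ts") / 1000).cast("timestamp").alias("start_time"),
              F.col("userId").alias("user_id"),
              "level",
              "song_id",
              "artist_id",
              F.col("sessionId").alias("session_id"),
              "location",
              F.col("userAgent").alias("user_agent"),
          )
      )

      # The time table simply breaks start_time into its units.
      time_table = (
          songplays.select("start_time").dropDuplicates()
          .withColumn("hour", F.hour("start_time"))
          .withColumn("day", F.dayofmonth("start_time"))
          .withColumn("week", F.weekofyear("start_time"))
          .withColumn("month", F.month("start_time"))
          .withColumn("year", F.year("start_time"))
          .withColumn("weekday", F.date_format("start_time", "E"))
      )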

ETL Pipeline

  • etl.py contains the code for connecting to AWS EMR, creating a Spark session, and running the ETL pipeline: extracting data from AWS S3, transforming it with Spark using the schema-on-read technique, and loading the results back into S3.
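
  • For reference, a minimal sketch of the session setup, under the assumption that dl.cfg has an [AWS] section holding the two keys (the exact layout of dl.cfg is not shown here, and the hadoop-aws version should match the cluster's Hadoop version):

      import configparser
      import os

      from pyspark.sql import SparkSession

      # Read AWS credentials from dl.cfg and expose them so the s3a
      # connector can pick them up. The [AWS] section name is an assumption.
      config = configparser.ConfigParser()
      config.read("dl.cfg")
      os.environ["AWS_ACCESS_KEY_ID"] = config["AWS"]["ACCESS_KEY_ID"]
      os.environ["AWS_SECRET_ACCESS_KEY"] = config["AWS"]["SECRET_ACCESS_KEY"]

      def create_spark_session():
          """Create (or reuse) a SparkSession with the hadoop-aws package for S3 access."""
          return (
              SparkSession.builder
              .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
              .getOrCreate()
          )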

How to Run

  1. Create an IAM role and an EMR cluster on AWS, and obtain the ACCESS_KEY_ID and SECRET_ACCESS_KEY of an IAM identity that can access EMR and S3. Fill this information into the dl.cfg config file (a sketch of how etl.py might consume it appears in the ETL Pipeline section above).

  2. Create an S3 bucket called sparkify-tables, which stores the parquet file output of the ETL pipeline (see the write sketch after this list).

  3. Run the ETL pipeline:

    python etl.py
    
  4. Wait until the ETL pipeline finishes running, then check the tables created in S3. Note that running the pipeline takes a long time because there are many input files. To speed it up, limit the input to part of the directory; for example, use "s3a://udacity-dend/song_data/A/A/A/*.json" instead of "s3a://udacity-dend/song_data/*/*/*/*.json".
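
  The load step writes each table back to S3 as parquet files in the sparkify-tables bucket from step 2. A sketch, assuming tables such as songs_table (hypothetical name), time_table, and songplays have been built as above; the partition columns are my own choice, not something specified in this README:

    # Write tables to the output bucket. Partitioning keeps related rows
    # together on S3 for faster analytics queries (assumed layout).
    output = "s3a://sparkify-tables/"
    songs_table.write.mode("overwrite").partitionBy("year", "artist_id").parquet(output + "songs")
    time_table.write.mode("overwrite").partitionBy("year", "month").parquet(output + "time")
    songplays.write.mode("overwrite").parquet(output + "songplays")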

Results
