DE임태규 - W5M1 #284

Open
wants to merge 9 commits into base: DE임태규_W5

4 changes: 4 additions & 0 deletions .gitignore
@@ -1,3 +1,7 @@
####### Large files from missions
w2/sentiment_analysis/tweets.csv
w2/sentiment_analysis/webtoon_analysis/webtoon_comments.db

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
33 changes: 0 additions & 33 deletions missions/W1/mtcars.csv

This file was deleted.

Binary file removed slides/W1 Introduction to Data Engineering.pdf
Binary file not shown.
Binary file removed slides/W2 Introduction to Big Data.pdf
Binary file not shown.
206 changes: 206 additions & 0 deletions w5/m1/W5M1.ipynb
@@ -0,0 +1,206 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# W5M1 - RDD"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 라이브러리 및 세션 설정"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pyspark import SparkConf\n",
"from pyspark.sql import SparkSession, Row\n",
"from pyspark.sql.functions import isnan, when, count, col, isnull, avg, min\n",
"import pyspark.sql.functions as F\n",
"from operator import add\n",
"\n",
"spark = SparkSession.builder \\\n",
" .master('spark://spark-master:7077') \\\n",
" .appName('W5M1') \\\n",
" .config('spark.executor.memory', '4gb') \\\n",
" .config(\"spark.executor.cores\", \"5\") \\\n",
" .getOrCreate()\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 데이터 로딩"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"input_file_path = 'hdfs://spark-master:9000/user/hduser/hdfs_data/fhvhv_tripdata_2023-01.parquet'\n",
"output_dir_path = 'hdfs://spark-master:9000/user/spark_user/W5M1_output/'\n",
"ext = 'parquet'\n",
"name = \"TLC-2023-01\"\n",
"\n",
"def load_data_rdd(spark_session, file_path, extension, name):\n",
" if extension == \"csv\":\n",
" data_rdd = spark_session.read.csv(file_path).rdd\n",
" elif extension == \"parquet\":\n",
" data_rdd = spark_session.read.parquet(file_path).rdd\n",
" else:\n",
" raise NotImpelentedError\n",
" data_rdd.setName(name)\n",
" return data_rdd\n",
"\n",
"data_rdd = load_data_rdd(spark, input_file_path, ext, name)\n",
"data_rdd.take(1)\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 데이터 클리닝"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def remove_row_w_none_val(row):\n",
" for val in row:\n",
" if val is None:\n",
" return\n",
" return row\n",
"\n",
"print(\"Before data cleaning: \", data_rdd.count())\n",
"data_rdd = data_rdd.filter(lambda row: remove_row_w_none_val(row))\n",
"print(\"After data cleaning: \", data_rdd.count())\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 변환 로직"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def remove_non_positive_fare(row):\n",
" if row.base_passenger_fare > 0:\n",
" return row\n",
" else:\n",
" return\n",
"\n",
"print(\"Before removing zero or negative fare: \", data_rdd.count())\n",
"data_rdd = data_rdd.filter(lambda row: remove_non_positive_fare(row))\n",
"print(\"After removing zero or negative fare: \", data_rdd.count())\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 데이터 매핑 및 변환"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def extract_and_convert_relevant_columns(row):\n",
" return Row(pickup_datetime=row.pickup_datetime.date(), trip_miles=row.trip_miles, base_passenger_fare=row.base_passenger_fare)\n",
"\n",
"data_rdd = data_rdd.map(lambda row: extract_and_convert_relevant_columns(row))\n",
"data_rdd.take(1)\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 집계"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"total_number_of_trips = data_rdd.count()\n",
"print(f\"total_number_of_trips: {total_number_of_trips} miles\")\n",
"\n",
"total_revenue = data_rdd.map(lambda row: row.base_passenger_fare).reduce(add)\n",
"print(f\"total_revenue: {round(total_revenue, 2)}$\")\n",
"\n",
"average_trip_distance = data_rdd.map(lambda row: row.trip_miles).mean()\n",
"print(f\"average_trip_distance: round(average_trip_distance, 2) miles\")\n",
"\n",
"number_of_trips_per_day = data_rdd.map(lambda row: (row.pickup_datetime, 1)).reduceByKey(add).sortByKey(lambda row: row.pickup_datetime)\n",
"number_of_trips_per_day.take(20)\n",
"\n",
"total_revenue_per_day = data_rdd.map(lambda row: (row.pickup_datetime, row.base_passenger_fare)).reduceByKey(add).sortByKey(lambda row: row.pickup_datetime)\n",
"total_revenue_per_day.take(20)\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 데이터 출력"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Save the output as text\n",
"result = spark.sparkContext.parallelize([\n",
" f\"total_number_of_trips, {total_number_of_trips}\",\n",
" f\"total_revenue, {total_revenue}\",\n",
" f\"average_trip_distance, {average_trip_distance}\",\n",
"])\n",
"result.coalesce(1).saveAsTextFile(output_dir_path + \"result.txt\")\n",
"\n",
"# Save the output as pickle object\n",
"number_of_trips_per_day.coalesce(1).saveAsPickleFile(output_dir_path + \"number_of_trips_per_day\")\n",
"total_revenue_per_day.coalesce(1).saveAsPickleFile(output_dir_path + \"total_revenue_per_day\")\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "base",
"language": "python",
"name": "python3"
},
"language_info": {
"name": "python",
"version": "3.11.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
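
A minimal sketch, assuming the same output_dir_path and spark-master endpoints as in W5M1.ipynb above, of how the saved outputs could be read back for a quick sanity check:

from pyspark.sql import SparkSession

# Assumed: the same standalone master and HDFS output path as W5M1.ipynb above.
spark = SparkSession.builder \
    .master('spark://spark-master:7077') \
    .appName('W5M1-output-check') \
    .getOrCreate()

output_dir_path = 'hdfs://spark-master:9000/user/spark_user/W5M1_output/'

# saveAsTextFile wrote a directory named "result.txt"; collect its part files.
print(spark.sparkContext.textFile(output_dir_path + 'result.txt').collect())

# saveAsPickleFile preserved the (date, value) tuples, so no parsing is needed.
print(spark.sparkContext.pickleFile(output_dir_path + 'number_of_trips_per_day').sortByKey().take(5))
print(spark.sparkContext.pickleFile(output_dir_path + 'total_revenue_per_day').sortByKey().take(5))

Because saveAsPickleFile serializes the Python date keys as-is, sortByKey works directly on the reloaded pairs.
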
78 changes: 78 additions & 0 deletions w5/m1/docker-compose.yml
@@ -0,0 +1,78 @@
services:
spark-master:
container_name: spark-master
hostname: spark-master
build: .
image: spark-standalone-cluster
entrypoint: ['./entrypoint.sh', 'master']
volumes:
- spark-logs:/home/spark_user/spark/spark-events
- W4M2:/home/spark_user/code
- namenode:/home/hduser/data
ports:
- '8080:8080'
- '7077:7077'
- '8888:8888'
- "9870:9870"
- "8088:8088"
networks:
- spark

spark-history-server:
container_name: spark-history
hostname: spark-history-server
build: .
entrypoint: ['./entrypoint.sh', 'history']
depends_on:
- spark-master
volumes:
- spark-logs:/home/spark_user/spark/spark-events
- datanode0:/home/hduser/data
ports:
- '18080:18080'
networks:
- spark

spark-worker1:
container_name: spark-worker1
hostname: spark-worker1
build: .
entrypoint: ['./entrypoint.sh', 'worker']
depends_on:
- spark-master
volumes:
- spark-logs:/home/spark_user/spark/spark-events
- datanode1:/home/hduser/data

ports:
- '11111:8081'
networks:
- spark

spark-worker2:
container_name: spark-worker2
hostname: spark-worker2
build: .
entrypoint: ['./entrypoint.sh', 'worker']
depends_on:
- spark-master
volumes:
- spark-logs:/home/spark_user/spark/spark-events
- datanode2:/home/hduser/data

ports:
- '22222:8081'
networks:
- spark

volumes:
spark-logs:
W4M2:
namenode:
datanode0:
datanode1:
datanode2:

networks:
spark:
driver: bridge