convert TotalEventsDailyTask task to spark #486
rao-abdul-mannan committed Jan 24, 2018
1 parent 4d9d9ae commit 91ff398
Showing 2 changed files with 76 additions and 1 deletion.
19 changes: 19 additions & 0 deletions edx/analytics/tasks/common/pathutil.py
@@ -309,3 +309,22 @@ def get_map_input_file(self):
            log.warn('mapreduce_map_input_file not defined in os.environ, unable to determine input file path')
            self.incr_counter('Event', 'Missing map_input_file', 1)
            return ''


class EventLogSelectionMixinSpark(EventLogSelectionDownstreamMixin):
    """
    Extract events corresponding to a specified time interval.
    """
    path_targets = None

    def __init__(self, *args, **kwargs):
        super(EventLogSelectionMixinSpark, self).__init__(*args, **kwargs)
        self.lower_bound_date_string = self.interval.date_a.strftime('%Y-%m-%d')  # pylint: disable=no-member
        self.upper_bound_date_string = self.interval.date_b.strftime('%Y-%m-%d')  # pylint: disable=no-member
        # Resolve the concrete event log paths for the interval up front so the
        # Spark driver can load them directly.
        path_targets = PathSelectionByDateIntervalTask(
            source=self.source,
            interval=self.interval,
            pattern=self.pattern,
            date_pattern=self.date_pattern,
        ).output()
        self.path_targets = [task.path for task in path_targets]
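
The mixin resolves the concrete event log paths for the requested interval once, at construction time, so a Spark job can read them directly instead of going through Hadoop input splits. A minimal sketch of how a consuming task might look (ExampleEventLineCountTask and its body are illustrative only, not part of this commit):

from luigi.contrib.spark import PySparkTask

from edx.analytics.tasks.common.pathutil import EventLogSelectionMixinSpark


class ExampleEventLineCountTask(EventLogSelectionMixinSpark, PySparkTask):
    """Illustrative only: count raw event log lines in the selected interval."""

    def main(self, sc, *args):
        # path_targets was populated by the mixin's __init__.
        lines = sc.textFile(','.join(self.path_targets))
        print(lines.count())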
58 changes: 57 additions & 1 deletion edx/analytics/tasks/monitor/overall_events.py
@@ -5,8 +5,9 @@
import luigi

from edx.analytics.tasks.common.mapreduce import MapReduceJobTask
from edx.analytics.tasks.common.pathutil import EventLogSelectionMixin
from edx.analytics.tasks.common.pathutil import EventLogSelectionMixin, EventLogSelectionMixinSpark
from edx.analytics.tasks.util.url import get_target_from_url
from luigi.contrib.spark import PySparkTask

log = logging.getLogger(__name__)

@@ -34,3 +35,58 @@ def reducer(self, key, values):

    def output(self):
        return get_target_from_url(self.output_root)


class SparkTotalEventsDailyTask(EventLogSelectionMixinSpark, PySparkTask):
    """Produce a dataset for total events within a given time period."""

    driver_memory = '2g'
    executor_memory = '3g'

    output_root = luigi.Parameter()

    def __init__(self, *args, **kwargs):
        super(SparkTotalEventsDailyTask, self).__init__(*args, **kwargs)

    # TODO: rename this method to output after testing is complete
    def output_dir(self):
        return get_target_from_url(self.output_root)

    def main(self, sc, *args):
        from pyspark.sql import SparkSession
        from pyspark.sql.functions import date_format, to_date
        from pyspark.sql.types import StringType, StructType

        spark = SparkSession.builder.getOrCreate()

        # Declare only the fields this job needs; any other keys in the raw
        # JSON events are ignored when the schema is applied.
        event_schema = StructType().add("POST", StringType(), True).add("GET", StringType(), True)
        module_schema = StructType().add("display_name", StringType(), True) \
            .add("original_usage_key", StringType(), True) \
            .add("original_usage_version", StringType(), True) \
            .add("usage_key", StringType(), True)
        context_schema = StructType().add("command", StringType(), True) \
            .add("course_id", StringType(), True) \
            .add("module", module_schema) \
            .add("org_id", StringType(), True) \
            .add("path", StringType(), True) \
            .add("user_id", StringType(), True)

        event_log_schema = StructType() \
            .add("username", StringType(), True) \
            .add("event_type", StringType(), True) \
            .add("ip", StringType(), True) \
            .add("agent", StringType(), True) \
            .add("host", StringType(), True) \
            .add("referer", StringType(), True) \
            .add("accept_language", StringType(), True) \
            .add("event", event_schema) \
            .add("event_source", StringType(), True) \
            .add("context", context_schema) \
            .add("time", StringType(), True) \
            .add("name", StringType(), True) \
            .add("page", StringType(), True) \
            .add("session", StringType(), True)

        # Load the event logs selected by the mixin, derive a calendar date
        # from each event's timestamp, keep events from the interval's start
        # date, and count events for that date.
        df = spark.read.format('json').load(self.path_targets, schema=event_log_schema)
        df = df.withColumn('event_date', date_format(to_date(df['time']), 'yyyy-MM-dd'))
        df = df.filter(df['event_date'] == self.lower_bound_date_string).groupBy('event_date').count()
        df.repartition(1).write.csv(self.output_dir().path, mode='overwrite', sep='\t')
        # df.repartition(1).rdd.map(lambda row: '\t'.join(map(str, row))).saveAsTextFile(self.output_dir().path)
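
For a quick local run, the task could also be scheduled through luigi's Python API; a rough sketch with placeholder values (the interval date and output path are illustrative, and the remaining event-log parameters such as source and pattern are assumed to come from configuration):

import luigi
from luigi.date_interval import Date

from edx.analytics.tasks.monitor.overall_events import SparkTotalEventsDailyTask

luigi.build(
    [SparkTotalEventsDailyTask(
        interval=Date(2018, 1, 22),                       # single day to count
        output_root='s3://example-bucket/total_events/',  # placeholder output location
    )],
    local_scheduler=True,
)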
