diff --git a/edx/analytics/tasks/common/pathutil.py b/edx/analytics/tasks/common/pathutil.py index ae56dc01b3..ee6cacd6eb 100644 --- a/edx/analytics/tasks/common/pathutil.py +++ b/edx/analytics/tasks/common/pathutil.py @@ -309,22 +309,3 @@ def get_map_input_file(self): log.warn('mapreduce_map_input_file not defined in os.environ, unable to determine input file path') self.incr_counter('Event', 'Missing map_input_file', 1) return '' - - -class EventLogSelectionMixinSpark(EventLogSelectionDownstreamMixin): - """ - Extract events corresponding to a specified time interval. - """ - path_targets = None - - def __init__(self, *args, **kwargs): - super(EventLogSelectionDownstreamMixin, self).__init__(*args, **kwargs) - self.lower_bound_date_string = self.interval.date_a.strftime('%Y-%m-%d') # pylint: disable=no-member - self.upper_bound_date_string = self.interval.date_b.strftime('%Y-%m-%d') # pylint: disable=no-member - path_targets = PathSelectionByDateIntervalTask( - source=self.source, - interval=self.interval, - pattern=self.pattern, - date_pattern=self.date_pattern, - ).output() - self.path_targets = [task.path for task in path_targets] diff --git a/edx/analytics/tasks/common/spark.py b/edx/analytics/tasks/common/spark.py new file mode 100644 index 0000000000..ddf87cb540 --- /dev/null +++ b/edx/analytics/tasks/common/spark.py @@ -0,0 +1,276 @@ +import json +import os +import tempfile +import zipfile + +import luigi.configuration +from luigi.contrib.spark import PySparkTask + +from edx.analytics.tasks.common.pathutil import EventLogSelectionDownstreamMixin, PathSelectionByDateIntervalTask +from edx.analytics.tasks.util.overwrite import OverwriteOutputMixin + +_file_path_to_package_meta_path = {} + + +def get_package_metadata_paths(): + """ + List of package metadata to be loaded on EMR cluster + """ + from distlib.database import DistributionPath + + if len(_file_path_to_package_meta_path) > 0: + return _file_path_to_package_meta_path + + dist_path = DistributionPath(include_egg=True) + for distribution in dist_path.get_distributions(): + metadata_path = distribution.path + for installed_file_path, _hash, _size in distribution.list_installed_files(): + absolute_installed_file_path = installed_file_path + if not os.path.isabs(installed_file_path): + absolute_installed_file_path = os.path.join(os.path.dirname(metadata_path), installed_file_path) + normalized_file_path = os.path.realpath(absolute_installed_file_path) + _file_path_to_package_meta_path[normalized_file_path] = metadata_path + + return _file_path_to_package_meta_path + + +def dereference(f): + if os.path.islink(f): + # by joining with the dirname we are certain to get the absolute path + return dereference(os.path.join(os.path.dirname(f), os.readlink(f))) + else: + return f + + +def create_packages_archive(packages, archive_dir_path): + """ + Create a zip archive for all the packages listed in packages and returns the list of zip file location. + """ + import zipfile + archives_list = [] + package_metadata_paths = get_package_metadata_paths() + metadata_to_add = dict() + + package_zip_path = os.path.join(archive_dir_path, 'packages.zip') + package_zip = zipfile.ZipFile(package_zip_path, "w", compression=zipfile.ZIP_DEFLATED) + archives_list.append(package_zip_path) + + def add(src, dst, package_name): + # Ensure any entry points and other egg-info metadata is also transmitted along with + # this file. If it is associated with any egg-info directories, ship them too. 
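+        # The metadata directory itself is recorded in metadata_to_add and archived once per
+        # package, after all of that package's module files have been added.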
+        metadata_path = package_metadata_paths.get(os.path.realpath(src))
+        if metadata_path:
+            metadata_to_add[package_name] = metadata_path
+
+        package_zip.write(src, dst)
+
+    def add_files_for_package(sub_package_path, root_package_path, root_package_name, package_name):
+        for root, dirs, files in os.walk(sub_package_path):
+            if '.svn' in dirs:
+                dirs.remove('.svn')
+            for f in files:
+                if not f.endswith(".pyc") and not f.startswith("."):
+                    add(dereference(root + "/" + f),
+                        root.replace(root_package_path, root_package_name) + "/" + f,
+                        package_name)
+
+    for package in packages:
+        # Archive each package
+        if not getattr(package, "__path__", None) and '.' in package.__name__:
+            package = __import__(package.__name__.rpartition('.')[0], None, None, 'non_empty')
+
+        n = package.__name__.replace(".", "/")
+
+        # Check length of path, because the attribute may exist and be an empty list.
+        if len(getattr(package, "__path__", [])) > 0:
+            # TODO: (BUG) picking only the first path does not
+            # properly deal with namespaced packages in different
+            # directories
+            p = package.__path__[0]
+
+            if p.endswith('.egg') and os.path.isfile(p):
+                raise RuntimeError('Not going to archive egg files!!!')
+                # Add the entire egg file
+                # p = p[:p.find('.egg') + 4]
+                # add(dereference(p), os.path.basename(p))
+
+            else:
+                # include __init__ files from parent projects
+                root = []
+                for parent in package.__name__.split('.')[0:-1]:
+                    root.append(parent)
+                    module_name = '.'.join(root)
+                    directory = '/'.join(root)
+
+                    add(dereference(__import__(module_name, None, None, 'non_empty').__path__[0] + "/__init__.py"),
+                        directory + "/__init__.py",
+                        package.__name__)
+
+                add_files_for_package(p, p, n, package.__name__)
+
+        else:
+            f = package.__file__
+            if f.endswith("pyc"):
+                f = f[:-3] + "py"
+            if n.find(".") == -1:
+                add(dereference(f), os.path.basename(f), package.__name__)
+            else:
+                add(dereference(f), n + ".py", package.__name__)
+
+        # include metadata in the same zip file
+        metadata_path = metadata_to_add.get(package.__name__)
+        if metadata_path is not None:
+            add_files_for_package(metadata_path, metadata_path, os.path.basename(metadata_path), package.__name__)
+
+    return archives_list
+
+
+class EventLogSelectionMixinSpark(EventLogSelectionDownstreamMixin):
+    """
+    Extract events corresponding to a specified time interval.
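+    Provides the list of matching event log paths (path_targets) and helpers for loading them into a Spark DataFrame.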
+ """ + path_targets = None + + def __init__(self, *args, **kwargs): + """ + Call path selection task to get list of log files matching the pattern + """ + super(EventLogSelectionDownstreamMixin, self).__init__(*args, **kwargs) + self.lower_bound_date_string = self.interval.date_a.strftime('%Y-%m-%d') # pylint: disable=no-member + self.upper_bound_date_string = self.interval.date_b.strftime('%Y-%m-%d') # pylint: disable=no-member + path_targets = PathSelectionByDateIntervalTask( + source=self.source, + interval=self.interval, + pattern=self.pattern, + date_pattern=self.date_pattern, + ).output() + self.path_targets = [task.path for task in path_targets] + + def get_log_schema(self): + """ + Get spark based schema for processing event logs + :return: Spark schema + """ + from pyspark.sql.types import StructType, StringType + event_schema = StructType().add("POST", StringType(), True).add("GET", StringType(), True) + module_schema = StructType().add("display_name", StringType(), True) \ + .add("original_usage_key", StringType(), True) \ + .add("original_usage_version", StringType(), True) \ + .add("usage_key", StringType(), True) + context_schema = StructType().add("command", StringType(), True) \ + .add("course_id", StringType(), True) \ + .add("module", module_schema) \ + .add("org_id", StringType(), True) \ + .add("path", StringType(), True) \ + .add("user_id", StringType(), True) + + event_log_schema = StructType() \ + .add("username", StringType(), True) \ + .add("event_type", StringType(), True) \ + .add("ip", StringType(), True) \ + .add("agent", StringType(), True) \ + .add("host", StringType(), True) \ + .add("referer", StringType(), True) \ + .add("accept_language", StringType(), True) \ + .add("event", event_schema) \ + .add("event_source", StringType(), True) \ + .add("context", context_schema) \ + .add("time", StringType(), True) \ + .add("name", StringType(), True) \ + .add("page", StringType(), True) \ + .add("session", StringType(), True) + + return event_log_schema + + def get_event_log_dataframe(self, spark, *args, **kwargs): + from pyspark.sql.functions import to_date, udf, struct, date_format + dataframe = spark.read.format('json').load(self.path_targets, schema=self.get_log_schema()) + dataframe = dataframe.filter(dataframe['time'].isNotNull()) \ + .withColumn('event_date', date_format(to_date(dataframe['time']), 'yyyy-MM-dd')) + dataframe = dataframe.filter(dataframe['event_date'] == self.lower_bound_date_string) + return dataframe + + +class SparkJobTask(OverwriteOutputMixin, PySparkTask): + """ + Wrapper for spark task + """ + + _spark = None + _spark_context = None + _sql_context = None + _hive_context = None + _tmp_dir = None + + driver_memory = '2g' + executor_memory = '3g' + always_log_stderr = False # log stderr if spark fails, True for verbose log + + def init_spark(self, sc): + """ + Initialize spark, sql and hive context + :param sc: Spark context + """ + from pyspark.sql import SparkSession, SQLContext, HiveContext + self._sql_context = SQLContext(sc) + self._spark_context = sc + self._spark = SparkSession.builder.getOrCreate() + self._hive_context = HiveContext(sc) + + def spark_job(self): + """ + Spark code for the job + """ + raise NotImplementedError + + def _load_internal_dependency_on_cluster(self): + """ + creates a zip of package and loads it on spark worker nodes + + Loading via luigi configuration does not work as it creates a tar file whereas spark does not load tar files + """ + + # import packages to be loaded on cluster + import edx + import 
luigi + import opaque_keys + import stevedore + import bson + import ccx_keys + import cjson + import boto + import filechunkio + import ciso8601 + import chardet + import urllib3 + import certifi + import idna + import requests + + dependencies_list = [] + egg_files = luigi.configuration.get_config().get('spark', 'edx_egg_files', None) + if isinstance(egg_files, basestring): + dependencies_list = json.loads(egg_files) + packages = [edx, luigi, opaque_keys, stevedore, bson, ccx_keys, cjson, boto, filechunkio, ciso8601, chardet, + urllib3, certifi, idna, requests] + self._tmp_dir = tempfile.mkdtemp() + dependencies_list += create_packages_archive(packages, self._tmp_dir) + # dependencies_list.append('s3://edx-analytics-scratch/egg_files/edx_opaque_keys-0.4-py2.7.egg') + if len(dependencies_list) > 0: + for file in dependencies_list: + self._spark_context.addPyFile(file) + + def run(self): + self.remove_output_on_overwrite() + super(SparkJobTask, self).run() + + def _clean(self): + """Do any cleanup after job here""" + import shutil + shutil.rmtree(self._tmp_dir) + + def main(self, sc, *args): + self.init_spark(sc) + self._load_internal_dependency_on_cluster() # load packages on EMR cluster for spark worker nodes + self.spark_job() + self._clean() diff --git a/edx/analytics/tasks/insights/tests/test_user_activity.py b/edx/analytics/tasks/insights/tests/test_user_activity.py index fe431543f7..b1a08d3228 100644 --- a/edx/analytics/tasks/insights/tests/test_user_activity.py +++ b/edx/analytics/tasks/insights/tests/test_user_activity.py @@ -1,6 +1,5 @@ """ Tests for tasks that collect enrollment events. - """ import datetime import json @@ -12,8 +11,9 @@ from edx.analytics.tasks.common.tests.map_reduce_mixins import MapperTestMixin, ReducerTestMixin from edx.analytics.tasks.insights.user_activity import ( - ACTIVE_LABEL, PLAY_VIDEO_LABEL, POST_FORUM_LABEL, PROBLEM_LABEL, InsertToMysqlCourseActivityTask, UserActivityTask + InsertToMysqlCourseActivityTask, UserActivityTask ) +from edx.analytics.tasks.util.constants import PredicateLabels from edx.analytics.tasks.util.tests.opaque_key_mixins import InitializeLegacyKeysMixin, InitializeOpaqueKeysMixin @@ -91,21 +91,21 @@ def test_whitespace_username(self): def test_good_dummy_event(self): line = self.create_event_log_line() event = tuple(self.task.mapper(line)) - expected = ((self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, ACTIVE_LABEL)),) + expected = ((self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, PredicateLabels.ACTIVE_LABEL)),) self.assertEquals(event, expected) def test_play_video_event(self): line = self.create_event_log_line(event_source='browser', event_type='play_video') event = tuple(self.task.mapper(line)) - expected = ((self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, ACTIVE_LABEL)), - (self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, PLAY_VIDEO_LABEL))) + expected = ((self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, PredicateLabels.ACTIVE_LABEL)), + (self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, PredicateLabels.PLAY_VIDEO_LABEL))) self.assertEquals(event, expected) def test_problem_event(self): line = self.create_event_log_line(event_source='server', event_type='problem_check') event = tuple(self.task.mapper(line)) - expected = ((self.expected_date_string, 
(self.encoded_course_id, self.username, self.expected_date_string, ACTIVE_LABEL)), - (self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, PROBLEM_LABEL))) + expected = ((self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, PredicateLabels.ACTIVE_LABEL)), + (self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, PredicateLabels.PROBLEM_LABEL))) self.assertEquals(event, expected) @data(('edx.forum.thread.created', True), ('edx.forum.response.created', True), ('edx.forum.comment.created', True), @@ -115,11 +115,11 @@ def test_post_forum_event(self, event_type, is_labeled_forum): line = self.create_event_log_line(event_source='server', event_type=event_type) event = tuple(self.task.mapper(line)) if is_labeled_forum: - expected = ((self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, ACTIVE_LABEL)), - (self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, POST_FORUM_LABEL))) + expected = ((self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, PredicateLabels.ACTIVE_LABEL)), + (self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, PredicateLabels.POST_FORUM_LABEL))) else: # The voted event is not a "discussion activity" and thus does not get the POST_FORUM_LABEL - expected = ((self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, ACTIVE_LABEL)),) + expected = ((self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, PredicateLabels.ACTIVE_LABEL)),) self.assertEquals(event, expected) def test_exclusion_of_events_by_source(self): @@ -154,13 +154,13 @@ def test_multiple(self): outputs.append(output) expected = ( - (self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, ACTIVE_LABEL)), - (self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, PLAY_VIDEO_LABEL)), - (self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, ACTIVE_LABEL)), - (self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, PLAY_VIDEO_LABEL)), - ('2013-12-24', (self.encoded_course_id, self.username, '2013-12-24', ACTIVE_LABEL)), - ('2013-12-24', (self.encoded_course_id, self.username, '2013-12-24', PROBLEM_LABEL)), - ('2013-12-16', (self.encoded_course_id, self.username, '2013-12-16', ACTIVE_LABEL)), + (self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, PredicateLabels.ACTIVE_LABEL)), + (self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, PredicateLabels.PLAY_VIDEO_LABEL)), + (self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, PredicateLabels.ACTIVE_LABEL)), + (self.expected_date_string, (self.encoded_course_id, self.username, self.expected_date_string, PredicateLabels.PLAY_VIDEO_LABEL)), + ('2013-12-24', (self.encoded_course_id, self.username, '2013-12-24', PredicateLabels.ACTIVE_LABEL)), + ('2013-12-24', (self.encoded_course_id, self.username, '2013-12-24', PredicateLabels.PROBLEM_LABEL)), + ('2013-12-16', (self.encoded_course_id, self.username, '2013-12-16', PredicateLabels.ACTIVE_LABEL)), ) self.assertItemsEqual(outputs, expected) @@ -183,10 +183,10 @@ def setUp(self): def test_multiple(self): 
values = ( - (self.encoded_course_id, self.username, '2013-12-01', ACTIVE_LABEL), - (self.encoded_course_id, self.username, '2013-12-01', ACTIVE_LABEL), - (self.encoded_course_id, self.username, '2013-12-01', PLAY_VIDEO_LABEL), - (self.encoded_course_id, self.username, '2013-12-01', PLAY_VIDEO_LABEL), + (self.encoded_course_id, self.username, '2013-12-01', PredicateLabels.ACTIVE_LABEL), + (self.encoded_course_id, self.username, '2013-12-01', PredicateLabels.ACTIVE_LABEL), + (self.encoded_course_id, self.username, '2013-12-01', PredicateLabels.PLAY_VIDEO_LABEL), + (self.encoded_course_id, self.username, '2013-12-01', PredicateLabels.PLAY_VIDEO_LABEL), ) mock_output_file = Mock() @@ -194,9 +194,9 @@ def test_multiple(self): self.task.multi_output_reducer('2013-12-01', values, mock_output_file) self.assertEquals(len(mock_output_file.write.mock_calls), 4) - expected_string = '\t'.join((self.encoded_course_id, self.username, '2013-12-01', ACTIVE_LABEL, '2')) + expected_string = '\t'.join((self.encoded_course_id, self.username, '2013-12-01', PredicateLabels.ACTIVE_LABEL, '2')) self.assertIn(call(expected_string), mock_output_file.write.mock_calls) - expected_string = '\t'.join((self.encoded_course_id, self.username, '2013-12-01', PLAY_VIDEO_LABEL, '2')) + expected_string = '\t'.join((self.encoded_course_id, self.username, '2013-12-01', PredicateLabels.PLAY_VIDEO_LABEL, '2')) self.assertIn(call(expected_string), mock_output_file.write.mock_calls) diff --git a/edx/analytics/tasks/insights/user_activity.py b/edx/analytics/tasks/insights/user_activity.py index 67f55eb8cf..7272190c29 100644 --- a/edx/analytics/tasks/insights/user_activity.py +++ b/edx/analytics/tasks/insights/user_activity.py @@ -8,10 +8,13 @@ import luigi.date_interval import edx.analytics.tasks.util.eventlog as eventlog +import edx.analytics.tasks.util.opaque_key_util as opaque_key_util from edx.analytics.tasks.common.mapreduce import MapReduceJobTaskMixin, MultiOutputMapReduceJobTask from edx.analytics.tasks.common.mysql_load import MysqlInsertTask from edx.analytics.tasks.common.pathutil import EventLogSelectionDownstreamMixin, EventLogSelectionMixin +from edx.analytics.tasks.common.spark import EventLogSelectionMixinSpark, SparkJobTask from edx.analytics.tasks.insights.calendar_task import CalendarTableTask +from edx.analytics.tasks.util.constants import PredicateLabels from edx.analytics.tasks.util.decorators import workflow_entry_point from edx.analytics.tasks.util.hive import BareHiveTableTask, HivePartitionTask, WarehouseMixin, hive_database_name from edx.analytics.tasks.util.overwrite import OverwriteOutputMixin @@ -20,24 +23,17 @@ log = logging.getLogger(__name__) -ACTIVE_LABEL = "ACTIVE" -PROBLEM_LABEL = "ATTEMPTED_PROBLEM" -PLAY_VIDEO_LABEL = "PLAYED_VIDEO" -POST_FORUM_LABEL = "POSTED_FORUM" - +logging.getLogger('boto').setLevel(logging.INFO) class UserActivityTask(OverwriteOutputMixin, WarehouseMixin, EventLogSelectionMixin, MultiOutputMapReduceJobTask): """ Categorize activity of users. - Analyze the history of user actions and categorize their activity. Note that categories are not mutually exclusive. A single event may belong to multiple categories. For example, we define a generic "ACTIVE" category that refers to any event that has a course_id associated with it, but is not an enrollment event. Other events, such as a video play event, will also belong to other categories. - The output from this job is a table that represents the number of events seen for each user in each course in each category on each day. 
- """ output_root = None @@ -73,28 +69,26 @@ def get_predicate_labels(self, event): if event_type.startswith('edx.course.enrollment.'): return [] - labels = [ACTIVE_LABEL] + labels = [PredicateLabels.ACTIVE_LABEL] if event_source == 'server': if event_type == 'problem_check': - labels.append(PROBLEM_LABEL) + labels.append(PredicateLabels.PROBLEM_LABEL) if event_type.startswith('edx.forum.') and event_type.endswith('.created'): - labels.append(POST_FORUM_LABEL) + labels.append(PredicateLabels.POST_FORUM_LABEL) if event_source in ('browser', 'mobile'): if event_type == 'play_video': - labels.append(PLAY_VIDEO_LABEL) + labels.append(PredicateLabels.PLAY_VIDEO_LABEL) return labels def _encode_tuple(self, values): """ Convert values into a tuple containing encoded strings. - Parameters: Values is a list or tuple. - This enforces a standard encoding for the parts of the key. Without this a part of the key might appear differently in the key string when it is coerced to a string by luigi. For @@ -144,6 +138,40 @@ def run(self): return super(UserActivityTask, self).run() +class UserActivityTaskSpark(EventLogSelectionMixinSpark, WarehouseMixin, SparkJobTask): + """ + UserActivityTask converted to spark + """ + + output_root = luigi.Parameter() + + def output(self): + return get_target_from_url(self.output_root) + + def spark_job(self): + from edx.analytics.tasks.util.spark_util import get_event_predicate_labels, get_course_id + from pyspark.sql.functions import udf, struct, split, explode, lit + from pyspark.sql.types import ArrayType, StringType + df = self.get_event_log_dataframe(self._spark) + # register udfs + get_labels = udf(get_event_predicate_labels, StringType()) + get_courseid = udf(get_course_id, StringType()) + df = df.filter( + (df['event_source'] != 'task') & + ~ df['event_type'].startswith('edx.course.enrollment.') & + (df['username'] != '') + ) + # passing complete row to UDF + df = df.withColumn('all_labels', get_labels(df['event_type'], df['event_source'])) \ + .withColumn('course_id', get_courseid(df['context'])) + df = df.filter(df['course_id'] != '') # remove rows with empty course_id + df = df.withColumn('label', explode(split(df['all_labels'], ','))) + result = df.select('course_id', 'username', 'event_date', 'label') \ + .groupBy('course_id', 'username', 'event_date', 'label').count() + result = result.withColumn('dt', lit(result['event_date'])) # generate extra column for partitioning + result.repartition(1).write.partitionBy('dt').csv(self.output().path, mode='overwrite', sep='\t') + + class UserActivityDownstreamMixin(WarehouseMixin, EventLogSelectionDownstreamMixin, MapReduceJobTaskMixin): """All parameters needed to run the UserActivityTableTask task.""" @@ -225,10 +253,8 @@ def columns(self): class CourseActivityPartitionTask(WeeklyIntervalMixin, UserActivityDownstreamMixin, HivePartitionTask): """ Number of users performing each category of activity each ISO week. - All references to weeks in here refer to ISO weeks. Note that ISO weeks may belong to different ISO years than the Gregorian calendar year. - If, for example, you wanted to analyze all data in the past week, you could run the job on Monday and pass in 1 to the "weeks" parameter. This will not analyze data for the week that contains the current day (since it is not complete). It will only compute data for the previous week. 
diff --git a/edx/analytics/tasks/launchers/remote.py b/edx/analytics/tasks/launchers/remote.py index f04109452b..672b4007e6 100755 --- a/edx/analytics/tasks/launchers/remote.py +++ b/edx/analytics/tasks/launchers/remote.py @@ -119,7 +119,7 @@ def run_task_playbook(inventory, arguments, uid): env_var_string = ' '.join('{0}={1}'.format(k, v) for k, v in env_vars.iteritems()) - command = 'cd {code_dir} && . $HOME/.bashrc && {env_vars}{bg}{data_dir}/venv/bin/launch-task {task_arguments}{end_bg}'.format( + command = 'cd {code_dir} && . $HOME/.bashrc && . {data_dir}/venv/bin/activate && {env_vars}{bg}launch-task {task_arguments}{end_bg}'.format( env_vars=env_var_string + ' ' if env_var_string else '', data_dir=data_dir, code_dir=code_dir, diff --git a/edx/analytics/tasks/monitor/overall_events.py b/edx/analytics/tasks/monitor/overall_events.py index 5bb5c3eb9d..8a6d5f508b 100644 --- a/edx/analytics/tasks/monitor/overall_events.py +++ b/edx/analytics/tasks/monitor/overall_events.py @@ -5,9 +5,9 @@ import luigi from edx.analytics.tasks.common.mapreduce import MapReduceJobTask -from edx.analytics.tasks.common.pathutil import EventLogSelectionMixin, EventLogSelectionMixinSpark +from edx.analytics.tasks.common.pathutil import EventLogSelectionMixin +from edx.analytics.tasks.common.spark import EventLogSelectionMixinSpark, SparkJobTask from edx.analytics.tasks.util.url import get_target_from_url -from luigi.contrib.spark import PySparkTask log = logging.getLogger(__name__) @@ -37,56 +37,16 @@ def output(self): return get_target_from_url(self.output_root) -class SparkTotalEventsDailyTask(EventLogSelectionMixinSpark, PySparkTask): +class SparkTotalEventsDailyTask(EventLogSelectionMixinSpark, SparkJobTask): """Produce a dataset for total events within a given time period.""" - driver_memory = '2g' - executor_memory = '3g' - output_root = luigi.Parameter() - def __init__(self, *args, **kwargs): - super(SparkTotalEventsDailyTask, self).__init__(*args, **kwargs) - - # TODO: rename this method to output after testing is complete - def output_dir(self): + def output(self): return get_target_from_url(self.output_root) - def main(self, sc, *args): - from pyspark.sql import SparkSession - from pyspark.sql.types import * - from pyspark.sql.functions import to_date, udf, struct, date_format - spark = SparkSession.builder.getOrCreate() - event_schema = StructType().add("POST", StringType(), True).add("GET", StringType(), True) - module_schema = StructType().add("display_name", StringType(), True) \ - .add("original_usage_key", StringType(), True) \ - .add("original_usage_version", StringType(), True) \ - .add("usage_key", StringType(), True) - context_schema = StructType().add("command", StringType(), True) \ - .add("course_id", StringType(), True) \ - .add("module", module_schema) \ - .add("org_id", StringType(), True) \ - .add("path", StringType(), True) \ - .add("user_id", StringType(), True) - - event_log_schema = StructType() \ - .add("username", StringType(), True) \ - .add("event_type", StringType(), True) \ - .add("ip", StringType(), True) \ - .add("agent", StringType(), True) \ - .add("host", StringType(), True) \ - .add("referer", StringType(), True) \ - .add("accept_language", StringType(), True) \ - .add("event", event_schema) \ - .add("event_source", StringType(), True) \ - .add("context", context_schema) \ - .add("time", StringType(), True) \ - .add("name", StringType(), True) \ - .add("page", StringType(), True) \ - .add("session", StringType(), True) - - df = 
spark.read.format('json').load(self.path_targets, schema=event_log_schema) - df = df.withColumn('event_date', date_format(to_date(df['time']), 'yyyy-MM-dd')) - df = df.filter(df['event_date'] == self.lower_bound_date_string).groupBy('event_date').count() - df.repartition(1).write.csv(self.output_dir().path, mode='overwrite', sep='\t') - # df.repartition(1).rdd.map(lambda row: '\t'.join(map(str, row))).saveAsTextFile(self.output_dir().path) + def spark_job(self): + df = self.get_event_log_dataframe(self._spark) + df = df.groupBy('event_date').count() + df.repartition(1).write.csv(self.output().path, mode='overwrite', sep='\t') + # df.repartition(1).rdd.map(lambda row: '\t'.join(map(str, row))).saveAsTextFile(self.output_dir().path) \ No newline at end of file diff --git a/edx/analytics/tasks/monitor/total_events_report.py b/edx/analytics/tasks/monitor/total_events_report.py index d4c50fd43f..4e46875e61 100644 --- a/edx/analytics/tasks/monitor/total_events_report.py +++ b/edx/analytics/tasks/monitor/total_events_report.py @@ -7,8 +7,10 @@ from edx.analytics.tasks.common.mapreduce import MapReduceJobTaskMixin from edx.analytics.tasks.common.pathutil import EventLogSelectionDownstreamMixin -from edx.analytics.tasks.monitor.overall_events import TotalEventsDailyTask +from edx.analytics.tasks.monitor.overall_events import TotalEventsDailyTask, SparkTotalEventsDailyTask from edx.analytics.tasks.util.url import ExternalURL, get_target_from_url +from edx.analytics.tasks.util.overwrite import OverwriteOutputMixin + log = logging.getLogger(__name__) @@ -66,19 +68,20 @@ def run(self): output_file.write('\n') -class TotalEventsReportWorkflow(MapReduceJobTaskMixin, TotalEventsReport, EventLogSelectionDownstreamMixin): +class TotalEventsReportWorkflow( + MapReduceJobTaskMixin, + OverwriteOutputMixin, + TotalEventsReport, + EventLogSelectionDownstreamMixin): """ Generates report for an event count by date for all events. - """ def requires(self): - return TotalEventsDailyTask( - mapreduce_engine=self.mapreduce_engine, - lib_jar=self.lib_jar, - n_reduce_tasks=self.n_reduce_tasks, + return SparkTotalEventsDailyTask( source=self.source, output_root=self.counts, + overwrite=self.overwrite, pattern=self.pattern, interval=self.interval ) diff --git a/edx/analytics/tasks/util/constants.py b/edx/analytics/tasks/util/constants.py new file mode 100644 index 0000000000..1c7e8bf1b5 --- /dev/null +++ b/edx/analytics/tasks/util/constants.py @@ -0,0 +1,7 @@ +class PredicateLabels(object): + """Constants for predicate labels.""" + + ACTIVE_LABEL = "ACTIVE" + PROBLEM_LABEL = "ATTEMPTED_PROBLEM" + PLAY_VIDEO_LABEL = "PLAYED_VIDEO" + POST_FORUM_LABEL = "POSTED_FORUM" diff --git a/edx/analytics/tasks/util/spark_util.py b/edx/analytics/tasks/util/spark_util.py new file mode 100644 index 0000000000..b1f46dfb14 --- /dev/null +++ b/edx/analytics/tasks/util/spark_util.py @@ -0,0 +1,78 @@ +"""Support for spark tasks""" +import edx.analytics.tasks.util.opaque_key_util as opaque_key_util +from edx.analytics.tasks.util.constants import PredicateLabels + + +def get_event_predicate_labels(event_type, event_source): + """ + Creates labels by applying hardcoded predicates to a single event. + Don't pass whole event row to any spark UDF as it generates a different output than expected + """ + # We only want the explicit event, not the implicit form. 
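+    # Labels are returned as a single comma-separated string because callers register this function
+    # as a StringType UDF and then split/explode the resulting column into individual label rows.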
+
+    labels = PredicateLabels.ACTIVE_LABEL
+
+    # task and enrollment events are excluded by a Spark DataFrame filter in the calling task;
+    # filtering there reduces the number of records early and speeds up the job
+
+    if event_source == 'server':
+        if event_type == 'problem_check':
+            labels += ',' + PredicateLabels.PROBLEM_LABEL
+
+        if event_type.startswith('edx.forum.') and event_type.endswith('.created'):
+            labels += ',' + PredicateLabels.POST_FORUM_LABEL
+
+    if event_source in ('browser', 'mobile'):
+        if event_type == 'play_video':
+            labels += ',' + PredicateLabels.PLAY_VIDEO_LABEL
+
+    return labels
+
+
+def get_key_value_from_event(event, key, default_value=None):
+    """
+    Get a value from the event data by key, falling back to default_value if the key is absent.
+    PySpark Row objects do not support dict.get(), so the lookup is done manually.
+    """
+    try:
+        default_value = event[key]
+    except KeyError:
+        pass
+    return default_value
+
+
+def get_course_id(event_context, from_url=False):
+    """
+    Gets the course_id from the event's context.
+    Pass only the context field to this UDF, not the whole event row; passing the entire row produces unexpected output.
+    """
+    if event_context == '' or event_context is None:
+        # Assume it's old, and not worth logging...
+        return ''
+
+    # Get the course_id from the data, and validate.
+    course_id = opaque_key_util.normalize_course_id(get_key_value_from_event(event_context, 'course_id', ''))
+    if course_id:
+        if opaque_key_util.is_valid_course_id(course_id):
+            return course_id
+
+    return ''
+
+    # TODO : make it work with url as well
+    # Try to get the course_id from the URLs in `event_type` (for implicit
+    # server events) and `page` (for browser events).
+    # if from_url:
+    #     source = get_key_value_from_event(event, 'event_source')
+    #
+    #     if source == 'server':
+    #         url = get_key_value_from_event(event, 'event_type', '')
+    #     elif source == 'browser':
+    #         url = get_key_value_from_event(event, 'page', '')
+    #     else:
+    #         url = ''
+    #
+    #     course_key = opaque_key_util.get_course_key_from_url(url)
+    #     if course_key:
+    #         return unicode(course_key)
+    #
+    #     return ''