This repository has been archived by the owner on May 1, 2024. It is now read-only.

Commit b89c71d

convert user activity task to spark
rao-abdul-mannan committed Jan 31, 2018
1 parent 91ff398 commit b89c71d
Showing 9 changed files with 446 additions and 115 deletions.
19 changes: 0 additions & 19 deletions edx/analytics/tasks/common/pathutil.py
@@ -309,22 +309,3 @@ def get_map_input_file(self):
log.warn('mapreduce_map_input_file not defined in os.environ, unable to determine input file path')
self.incr_counter('Event', 'Missing map_input_file', 1)
return ''


class EventLogSelectionMixinSpark(EventLogSelectionDownstreamMixin):
"""
Extract events corresponding to a specified time interval.
"""
path_targets = None

def __init__(self, *args, **kwargs):
super(EventLogSelectionDownstreamMixin, self).__init__(*args, **kwargs)
self.lower_bound_date_string = self.interval.date_a.strftime('%Y-%m-%d') # pylint: disable=no-member
self.upper_bound_date_string = self.interval.date_b.strftime('%Y-%m-%d') # pylint: disable=no-member
path_targets = PathSelectionByDateIntervalTask(
source=self.source,
interval=self.interval,
pattern=self.pattern,
date_pattern=self.date_pattern,
).output()
self.path_targets = [task.path for task in path_targets]
276 changes: 276 additions & 0 deletions edx/analytics/tasks/common/spark.py
@@ -0,0 +1,276 @@
import json
import os
import tempfile
import zipfile

import luigi.configuration
from luigi.contrib.spark import PySparkTask

from edx.analytics.tasks.common.pathutil import EventLogSelectionDownstreamMixin, PathSelectionByDateIntervalTask
from edx.analytics.tasks.util.overwrite import OverwriteOutputMixin

_file_path_to_package_meta_path = {}


def get_package_metadata_paths():
"""
List of package metadata to be loaded on EMR cluster
"""
from distlib.database import DistributionPath

if len(_file_path_to_package_meta_path) > 0:
return _file_path_to_package_meta_path

dist_path = DistributionPath(include_egg=True)
for distribution in dist_path.get_distributions():
metadata_path = distribution.path
for installed_file_path, _hash, _size in distribution.list_installed_files():
absolute_installed_file_path = installed_file_path
if not os.path.isabs(installed_file_path):
absolute_installed_file_path = os.path.join(os.path.dirname(metadata_path), installed_file_path)
normalized_file_path = os.path.realpath(absolute_installed_file_path)
_file_path_to_package_meta_path[normalized_file_path] = metadata_path

return _file_path_to_package_meta_path
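
# The mapping returned above is keyed by the symlink-resolved path of each installed file and
# points at that file's distribution metadata directory, e.g. (hypothetical paths):
#
#   {'/usr/lib/python2.7/site-packages/luigi/__init__.py':
#       '/usr/lib/python2.7/site-packages/luigi-2.7.1.dist-info'}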


def dereference(f):
if os.path.islink(f):
# by joining with the dirname we are certain to get the absolute path
return dereference(os.path.join(os.path.dirname(f), os.readlink(f)))
else:
return f


def create_packages_archive(packages, archive_dir_path):
"""
Create a zip archive for all the packages listed in packages and returns the list of zip file location.
"""
import zipfile
archives_list = []
package_metadata_paths = get_package_metadata_paths()
metadata_to_add = dict()

package_zip_path = os.path.join(archive_dir_path, 'packages.zip')
package_zip = zipfile.ZipFile(package_zip_path, "w", compression=zipfile.ZIP_DEFLATED)
archives_list.append(package_zip_path)

def add(src, dst, package_name):
        # Ensure that any entry points and other egg-info metadata are also transmitted along
        # with this file. If it is associated with any egg-info directories, ship them too.
metadata_path = package_metadata_paths.get(os.path.realpath(src))
if metadata_path:
metadata_to_add[package_name] = metadata_path

package_zip.write(src, dst)

def add_files_for_package(sub_package_path, root_package_path, root_package_name, package_name):
for root, dirs, files in os.walk(sub_package_path):
if '.svn' in dirs:
dirs.remove('.svn')
for f in files:
if not f.endswith(".pyc") and not f.startswith("."):
add(dereference(root + "/" + f),
root.replace(root_package_path, root_package_name) + "/" + f,
package_name)

for package in packages:
# Archive each package
if not getattr(package, "__path__", None) and '.' in package.__name__:
package = __import__(package.__name__.rpartition('.')[0], None, None, 'non_empty')

n = package.__name__.replace(".", "/")

# Check length of path, because the attribute may exist and be an empty list.
if len(getattr(package, "__path__", [])) > 0:
# TODO: (BUG) picking only the first path does not
# properly deal with namespaced packages in different
# directories
p = package.__path__[0]

if p.endswith('.egg') and os.path.isfile(p):
                raise RuntimeError('Not going to archive egg files!!!')
# Add the entire egg file
# p = p[:p.find('.egg') + 4]
# add(dereference(p), os.path.basename(p))

else:
# include __init__ files from parent projects
root = []
for parent in package.__name__.split('.')[0:-1]:
root.append(parent)
module_name = '.'.join(root)
directory = '/'.join(root)

add(dereference(__import__(module_name, None, None, 'non_empty').__path__[0] + "/__init__.py"),
directory + "/__init__.py",
package.__name__)

add_files_for_package(p, p, n, package.__name__)

else:
f = package.__file__
if f.endswith("pyc"):
f = f[:-3] + "py"
if n.find(".") == -1:
add(dereference(f), os.path.basename(f), package.__name__)
else:
add(dereference(f), n + ".py", package.__name__)

# include metadata in the same zip file
metadata_path = metadata_to_add.get(package.__name__)
if metadata_path is not None:
add_files_for_package(metadata_path, metadata_path, os.path.basename(metadata_path), package.__name__)

return archives_list
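
# Illustrative usage sketch (assuming the packages are importable locally, as they are in
# SparkJobTask._load_internal_dependency_on_cluster below):
#
#   tmp_dir = tempfile.mkdtemp()
#   archives = create_packages_archive([edx, luigi], tmp_dir)
#   # archives == [os.path.join(tmp_dir, 'packages.zip')], ready for SparkContext.addPyFile()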


class EventLogSelectionMixinSpark(EventLogSelectionDownstreamMixin):
"""
Extract events corresponding to a specified time interval.
"""
path_targets = None

def __init__(self, *args, **kwargs):
"""
        Call the path selection task to get the list of log files matching the pattern.
"""
super(EventLogSelectionDownstreamMixin, self).__init__(*args, **kwargs)
self.lower_bound_date_string = self.interval.date_a.strftime('%Y-%m-%d') # pylint: disable=no-member
self.upper_bound_date_string = self.interval.date_b.strftime('%Y-%m-%d') # pylint: disable=no-member
path_targets = PathSelectionByDateIntervalTask(
source=self.source,
interval=self.interval,
pattern=self.pattern,
date_pattern=self.date_pattern,
).output()
self.path_targets = [task.path for task in path_targets]

def get_log_schema(self):
"""
        Get the Spark schema for parsing event logs.
        :return: Spark schema (StructType)
"""
from pyspark.sql.types import StructType, StringType
event_schema = StructType().add("POST", StringType(), True).add("GET", StringType(), True)
module_schema = StructType().add("display_name", StringType(), True) \
.add("original_usage_key", StringType(), True) \
.add("original_usage_version", StringType(), True) \
.add("usage_key", StringType(), True)
context_schema = StructType().add("command", StringType(), True) \
.add("course_id", StringType(), True) \
.add("module", module_schema) \
.add("org_id", StringType(), True) \
.add("path", StringType(), True) \
.add("user_id", StringType(), True)

event_log_schema = StructType() \
.add("username", StringType(), True) \
.add("event_type", StringType(), True) \
.add("ip", StringType(), True) \
.add("agent", StringType(), True) \
.add("host", StringType(), True) \
.add("referer", StringType(), True) \
.add("accept_language", StringType(), True) \
.add("event", event_schema) \
.add("event_source", StringType(), True) \
.add("context", context_schema) \
.add("time", StringType(), True) \
.add("name", StringType(), True) \
.add("page", StringType(), True) \
.add("session", StringType(), True)

return event_log_schema

def get_event_log_dataframe(self, spark, *args, **kwargs):
        from pyspark.sql.functions import to_date, date_format
dataframe = spark.read.format('json').load(self.path_targets, schema=self.get_log_schema())
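        # Drop events with a null 'time' field, derive an 'event_date' column from it, and
        # keep only the events that fall on the interval's start date.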
dataframe = dataframe.filter(dataframe['time'].isNotNull()) \
.withColumn('event_date', date_format(to_date(dataframe['time']), 'yyyy-MM-dd'))
dataframe = dataframe.filter(dataframe['event_date'] == self.lower_bound_date_string)
return dataframe


class SparkJobTask(OverwriteOutputMixin, PySparkTask):
"""
    Base class for Spark jobs: wraps PySparkTask with overwrite support and with loading of
    internal dependencies onto the cluster.
"""

_spark = None
_spark_context = None
_sql_context = None
_hive_context = None
_tmp_dir = None

driver_memory = '2g'
executor_memory = '3g'
    always_log_stderr = False  # stderr is logged on failure regardless; set True to also log it on success

def init_spark(self, sc):
"""
        Initialize the Spark session and the SQL and Hive contexts.
:param sc: Spark context
"""
from pyspark.sql import SparkSession, SQLContext, HiveContext
self._sql_context = SQLContext(sc)
self._spark_context = sc
self._spark = SparkSession.builder.getOrCreate()
self._hive_context = HiveContext(sc)

def spark_job(self):
"""
Spark code for the job
"""
raise NotImplementedError

def _load_internal_dependency_on_cluster(self):
"""
creates a zip of package and loads it on spark worker nodes
Loading via luigi configuration does not work as it creates a tar file whereas spark does not load tar files
"""

# import packages to be loaded on cluster
import edx
import luigi
import opaque_keys
import stevedore
import bson
import ccx_keys
import cjson
import boto
import filechunkio
import ciso8601
import chardet
import urllib3
import certifi
import idna
import requests

dependencies_list = []
egg_files = luigi.configuration.get_config().get('spark', 'edx_egg_files', None)
if isinstance(egg_files, basestring):
dependencies_list = json.loads(egg_files)
packages = [edx, luigi, opaque_keys, stevedore, bson, ccx_keys, cjson, boto, filechunkio, ciso8601, chardet,
urllib3, certifi, idna, requests]
self._tmp_dir = tempfile.mkdtemp()
dependencies_list += create_packages_archive(packages, self._tmp_dir)
# dependencies_list.append('s3://edx-analytics-scratch/egg_files/edx_opaque_keys-0.4-py2.7.egg')
if len(dependencies_list) > 0:
            for file_path in dependencies_list:
                self._spark_context.addPyFile(file_path)
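
    # The 'edx_egg_files' setting read above is expected to be a JSON list of egg file paths,
    # for example (hypothetical bucket and path):
    #
    #   [spark]
    #   edx_egg_files = ["s3://my-bucket/eggs/edx_opaque_keys-0.4-py2.7.egg"]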

def run(self):
self.remove_output_on_overwrite()
super(SparkJobTask, self).run()

def _clean(self):
"""Do any cleanup after job here"""
import shutil
shutil.rmtree(self._tmp_dir)

def main(self, sc, *args):
self.init_spark(sc)
self._load_internal_dependency_on_cluster() # load packages on EMR cluster for spark worker nodes
self.spark_job()
self._clean()
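
For orientation, here is a minimal, hypothetical sketch of how these pieces are meant to fit together: a concrete task mixes EventLogSelectionMixinSpark into a SparkJobTask subclass and implements spark_job(). The class name, the output_root parameter, and the aggregation below are illustrative only (this is not the user-activity task added elsewhere in this commit), and get_target_from_url is assumed to come from edx.analytics.tasks.util.url.

import luigi

from edx.analytics.tasks.common.spark import EventLogSelectionMixinSpark, SparkJobTask
from edx.analytics.tasks.util.url import get_target_from_url


class ExampleEventCountTask(EventLogSelectionMixinSpark, SparkJobTask):
    """Hypothetical example: count events per event_source on the interval's start date."""

    output_root = luigi.Parameter()

    def output(self):
        return get_target_from_url(self.output_root)

    def spark_job(self):
        events = self.get_event_log_dataframe(self._spark)
        counts = events.groupBy('event_source').count()
        counts.write.mode('overwrite').csv(self.output_root)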