Merge pull request #472 from edx/brian/upgrade-luigi2c
Upgrade to fork of Luigi 2.7.
brianhw authored Jan 24, 2018
2 parents df48fa9 + 13a6acb commit 4d9d9ae
Showing 74 changed files with 336 additions and 429 deletions.
4 changes: 2 additions & 2 deletions Makefile
@@ -36,7 +36,7 @@ endif
requirements:
pip install -U -r requirements/pre.txt
pip install -U -r requirements/default.txt --no-cache-dir --upgrade-strategy only-if-needed
pip install -U -r requirements/extra.txt --no-cache-dir
pip install -U -r requirements/extra.txt --no-cache-dir --upgrade-strategy only-if-needed

test-requirements: requirements
pip install -U -r requirements/test.txt --no-cache-dir --upgrade-strategy only-if-needed
@@ -89,7 +89,7 @@ coverage: test coverage-local
docs-requirements:
pip install -U -r requirements/pre.txt
pip install -U -r requirements/docs.txt --no-cache-dir --upgrade-strategy only-if-needed
pip install -U -r requirements/extra.txt --no-cache-dir
pip install -U -r requirements/extra.txt --no-cache-dir --upgrade-strategy only-if-needed
python setup.py install --force

docs-local:
13 changes: 3 additions & 10 deletions config/devstack.cfg
@@ -23,9 +23,9 @@ engine = hadoop
marker = hdfs://localhost:9000/edx-analytics-pipeline/marker/

[event-logs]
pattern = .*tracking.log.*
pattern = [".*tracking.log.*"]
expand_interval = 2 days
source = hdfs://localhost:9000/data/
source = ["hdfs://localhost:9000/data/"]

[event-export]
output_root = hdfs://localhost:9000/edx-analytics-pipeline/event-export/output/
@@ -58,13 +58,6 @@ output_root = hdfs://localhost:9000/edx-analytics-pipeline/activity/
[enrollments]
interval_start = 2013-11-01

[enrollment-reports]
src = hdfs://localhost:9000/data/
destination = hdfs://localhost:9000/edx-analytics-pipeline/enrollment_reports/output/
offsets = hdfs://localhost:9000/edx-analytics-pipeline/enrollment_reports/offsets.tsv
blacklist = hdfs://localhost:9000/edx-analytics-pipeline/enrollment_reports/course_blacklist.tsv
history = hdfs://localhost:9000/edx-analytics-pipeline/enrollment_reports/enrollment_history.tsv

[financial-reports]
shoppingcart-partners = {"DEFAULT": "edx"}

@@ -79,7 +72,7 @@ dropoff_threshold = 0.05

[elasticsearch]
# Point to the vagrant host's port 9201 where we assume elasticsearch is running
host = http://172.17.0.1:9201/
host = ["http://172.17.0.1:9201/"]

[module-engagement]
alias = roster
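The string-valued settings above become JSON-style lists because the corresponding task parameters are now declared with `luigi.ListParameter` (see the Python diffs further down), and stock Luigi 2.x parses a ListParameter value as JSON; this fork appears to follow the same convention. A minimal sketch of the idea, using a hypothetical task name:

```python
# Minimal sketch (hypothetical task name): a ListParameter bound to a config
# entry parses a JSON-style list such as ["hdfs://localhost:9000/data/"].
import luigi


class EventLogSourcesTask(luigi.Task):
    # In stock Luigi 2.x, ListParameter.parse() uses json.loads, so the config
    # value must be valid JSON: source = ["hdfs://localhost:9000/data/"]
    source = luigi.ListParameter(
        config_path={'section': 'event-logs', 'name': 'source'},
        default=[],
    )

    def run(self):
        for url in self.source:
            print(url)
```

On the command line the same parameter is also given as a JSON string, e.g. `--source '["hdfs://localhost:9000/data/"]'`.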
31 changes: 10 additions & 21 deletions config/test.cfg
@@ -21,17 +21,14 @@ marker = s3://fake/marker/
remote_log_level = DEBUG

[event-logs]
pattern = .*tracking.log-(?P<date>\d{8}).*\.gz
.*tracking.notalog-(?P<date>\d{8}).*\.gz
# pattern = [".*tracking.log-(?P<date>\d{8}).*\.gz", ".*tracking.notalog-(?P<date>\d{8}).*\.gz"]
pattern = [".*tracking.log-(?P<date>\\\\d{8}).*\\\\.gz", ".*tracking.notalog-(?P<date>\\\\d{8}).*\\\\.gz"]
expand_interval = 2 days
source = s3://fake/input/
s3://fake/input2/
source = ["s3://fake/input/", "s3://fake/input2/"]

[segment-logs]
pattern = .*segment.log-(?P<date>\d{8}).*\.gz
.*segment.notalog-(?P<date>\d{8}).*\.gz
source = s3://fake/segment/input/
s3://fake/segment/input2/
pattern = [".*segment.log-(?P<date>\\\\d{8}).*\\\\.gz", ".*segment.notalog-(?P<date>\\\\d{8}).*\\\\.gz"]
source = ["s3://fake/segment/input/", "s3://fake/segment/input2/"]

[event-export]
output_root = s3://fake/
@@ -69,13 +66,6 @@ overwrite_n_days = 14
interval_start = 2013-11-01
overwrite_n_days = 14

[enrollment-reports]
src = s3://fake/input/
destination = s3://fake/enrollment_reports/output/
offsets = s3://fake/enrollment_reports/offsets.tsv
blacklist = s3://fake/enrollment_reports/course_blacklist.tsv
history = s3://fake/enrollment_reports/enrollment_history.tsv

[financial-reports]
shoppingcart-partners = {"OpenCraftX": "OCX", "DEFAULT": "edx"}

@@ -96,9 +86,9 @@ access_token = acceptance
credentials = s3://fake/vertica_creds.json
schema = testing
read_timeout = 600
standard_roles = data_engineering_team
restricted_roles = data_engineering_team
business_intelligence_team_roles = data_engineering_team
standard_roles = ["data_engineering_team"]
restricted_roles = ["data_engineering_team"]
business_intelligence_team_roles = ["data_engineering_team"]
event_retention_interval = 3 days

[payment-reconciliation]
@@ -129,8 +119,7 @@ host=test.cybersource.com/test
interval_start = 2015-09-01

[payment]
cybersource_merchant_ids = test
empty_test
cybersource_merchant_ids = ["test", "empty_test"]

[videos]
dropoff_threshold = 0.05
@@ -141,7 +130,7 @@ alias = roster
number_of_shards = 5

[course-catalog-api]
partner_short_codes = openedx
partner_short_codes = ["openedx"]
api_root_url = http://example.com/api/v1/

[ccx]
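As with `source`, each `pattern` entry is now one element of a JSON list; the regexes keep a named `date` capture group, and (per the `PathSetTask` / `EventLogSelectionDownstreamMixin` docstrings further down) multiple patterns are OR'd when selecting files. A rough sketch of that OR'd matching, not the pipeline's actual implementation:

```python
# Rough sketch of OR'd pattern matching with a named "date" group; the helper
# name and sample URL are made up for illustration.
import re

PATTERNS = [
    r".*tracking\.log-(?P<date>\d{8}).*\.gz",
    r".*tracking\.notalog-(?P<date>\d{8}).*\.gz",
]


def match_any(url, patterns=PATTERNS):
    """Return the first match of url against any pattern, or None."""
    for pattern in patterns:
        match = re.match(pattern, url)
        if match:
            return match
    return None


match = match_any("s3://fake/input/tracking.log-20180101-host.gz")
print(match.group('date') if match else "no match")  # -> 20180101
```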
4 changes: 2 additions & 2 deletions docs/source/gen_tasks.py
@@ -7,7 +7,7 @@
import sys
import time
import stevedore
from luigi.task import Register
from luigi.task_register import Register


SPHINX_DIR = os.path.dirname(__file__)
@@ -81,7 +81,7 @@ def gen_sphinx_tasks(entry_point, labels, *_args, **kwargs):
sys.exit('Unable to write to {file_name}'.format(file_name=tocfile_name))

# For each Task, sorted by class name
tasks = Register.get_reg()
tasks = Register._get_reg()
for name in sorted(tasks):
cls = tasks[name]
module = cls.__module__
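In Luigi 2.x the task registry lives in `luigi.task_register`, and the accessor used here is the internal `_get_reg()`. A short sketch of enumerating registered tasks with it (private API, so exact behaviour may vary by Luigi version):

```python
# Sketch only: enumerate Luigi's task registry via the internal _get_reg()
# (protected API in Luigi 2.x).
import luigi
from luigi.task_register import Register


class ExampleTask(luigi.Task):
    """Throwaway task so the registry has an entry."""
    label = luigi.Parameter(default='example')


tasks = Register._get_reg()  # maps task family name -> task class
for family_name in sorted(tasks):
    print(family_name, '->', tasks[family_name])
```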
9 changes: 5 additions & 4 deletions docs/source/luigi_sphinx.py
@@ -5,6 +5,7 @@
import inspect
import luigi

from luigi.parameter import _no_value

def append_parameters(_app, _what, _name, obj, _options, lines):
"""
@@ -36,13 +37,13 @@ def append_parameters(_app, _what, _name, obj, _options, lines):

# Mark configured parameters (requires protected-access)
# pylint: disable=W0212
if hasattr(membervalue, '_Parameter__config') and membervalue._Parameter__config is not None:
param['default'] = ' pulled from ``{section}.{name}``'.format(**membervalue._Parameter__config)
if hasattr(membervalue, '_config_path') and membervalue._config_path is not None:
param['default'] = 'pulled from ``{section}.{name}``'.format(**membervalue._config_path)
param['type'] = u'{type}, configurable'.format(**param)

# Mark optional parameters
elif hasattr(membervalue, 'default'):
param['default'] = membervalue.default
elif hasattr(membervalue, '_default') and membervalue._default != _no_value:
param['default'] = membervalue._default
param['type'] = u'{type}, optional'.format(**param)

if 'default' in param:
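The Sphinx hook now introspects the private `_config_path` and `_default` attributes (with the `_no_value` sentinel) that Luigi 2.x parameters carry, instead of the old `_Parameter__config` / `default` names. A hedged sketch of that introspection against a made-up task:

```python
# Hedged sketch: inspect the private _config_path/_default attributes of
# Luigi 2.x parameters; DemoTask is made up for illustration.
import luigi
from luigi.parameter import _no_value


class DemoTask(luigi.Task):
    configured = luigi.Parameter(config_path={'section': 'demo', 'name': 'configured'})
    optional = luigi.Parameter(default='fallback')
    required = luigi.Parameter()


for param_name, param in DemoTask.get_params():
    if getattr(param, '_config_path', None) is not None:
        print(param_name, 'pulled from {section}.{name}'.format(**param._config_path))
    elif getattr(param, '_default', _no_value) != _no_value:
        print(param_name, 'optional, default =', param._default)
    else:
        print(param_name, 'required')
```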
3 changes: 1 addition & 2 deletions edx/analytics/tasks/common/elasticsearch_load.py
@@ -47,8 +47,7 @@ class ElasticsearchIndexTask(OverwriteOutputMixin, MapReduceJobTask):
"""

host = luigi.Parameter(
is_list=True,
host = luigi.ListParameter(
config_path={'section': 'elasticsearch', 'name': 'host'},
description='Hostnames for the elasticsearch cluster nodes. They can be specified in any of the formats'
' accepted by the elasticsearch-py library. This includes complete URLs such as http://foo.com/, or'
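The consolidated `luigi.ListParameter` keeps the docstring's contract: the value is a list of hosts in any form elasticsearch-py accepts. A tiny sketch of handing that list to the client, assuming elasticsearch-py is installed:

```python
# Sketch only: elasticsearch-py accepts a list of hosts; the URL here is the
# devstack value from the config above.
from elasticsearch import Elasticsearch

hosts = ["http://172.17.0.1:9201/"]   # e.g. the [elasticsearch] host setting
es = Elasticsearch(hosts=hosts)
print(es.ping())  # True when the cluster is reachable
```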
16 changes: 7 additions & 9 deletions edx/analytics/tasks/common/mapreduce.py
@@ -11,8 +11,7 @@
from hashlib import md5

import luigi
import luigi.hadoop
import luigi.hdfs
import luigi.contrib.hadoop
import luigi.task
from luigi import configuration

@@ -37,8 +36,7 @@ class MapReduceJobTaskMixin(object):
description='The input_format for Hadoop job to use. For example, when '
'running with manifest file, specify "oddjob.ManifestTextInputFormat" for input_format.',
)
lib_jar = luigi.Parameter(
is_list=True,
lib_jar = luigi.ListParameter(
default=[],
significant=False,
description='A list of library jars that the Hadoop job can make use of.',
@@ -55,7 +53,7 @@ )
)


class MapReduceJobTask(MapReduceJobTaskMixin, luigi.hadoop.JobTask):
class MapReduceJobTask(MapReduceJobTaskMixin, luigi.contrib.hadoop.JobTask):
"""
Execute a map reduce job. Typically using Hadoop, but can execute the
job in process as well.
@@ -133,7 +131,7 @@ def input_hadoop(self):
return convert_to_manifest_input_if_necessary(manifest_id, super(MapReduceJobTask, self).input_hadoop())


class MapReduceJobRunner(luigi.hadoop.HadoopJobRunner):
class MapReduceJobRunner(luigi.contrib.hadoop.HadoopJobRunner):
"""
Support more customization of the streaming command.
@@ -160,11 +158,11 @@ def __init__(self, libjars_in_hdfs=None, input_format=None):
)


class EmulatedMapReduceJobRunner(luigi.hadoop.JobRunner):
class EmulatedMapReduceJobRunner(luigi.contrib.hadoop.JobRunner):
"""
Execute map reduce tasks in process on the machine that is running luigi.
This is a modified version of luigi.hadoop.LocalJobRunner. The key differences are:
This is a modified version of luigi.contrib.hadoop.LocalJobRunner. The key differences are:
* It gracefully handles .gz input files, decompressing them and streaming them directly to the mapper. This mirrors
the behavior of hadoop's default file input format. Note this only works for files that support `tell()` and
@@ -252,7 +250,7 @@ class MultiOutputMapReduceJobTask(MapReduceJobTask):
output_root = luigi.Parameter(
description='A URL location where the split files will be stored.',
)
delete_output_root = luigi.BooleanParameter(
delete_output_root = luigi.BoolParameter(
default=False,
significant=False,
description='If True, recursively deletes the `output_root` at task creation.',
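The changes in this file are mostly mechanical renames for Luigi 2.x: `luigi.hadoop` / `luigi.hdfs` become `luigi.contrib.hadoop` / `luigi.contrib.hdfs`, `Parameter(is_list=True)` becomes `ListParameter`, and `BooleanParameter` becomes `BoolParameter`. A hypothetical job task written against the new layout (illustrative only, not code from this repo):

```python
import luigi
import luigi.contrib.hadoop
import luigi.contrib.hdfs


class InputText(luigi.ExternalTask):
    """Pre-existing HDFS text files produced outside the pipeline."""
    path = luigi.Parameter()

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget(self.path)


class WordCount(luigi.contrib.hadoop.JobTask):
    """Toy word count illustrating the Luigi 2.x module and parameter names."""

    source = luigi.ListParameter(default=[])      # was luigi.Parameter(is_list=True)
    verbose = luigi.BoolParameter(default=False)  # was luigi.BooleanParameter

    def requires(self):
        return [InputText(path=path) for path in self.source]

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/tmp/word_count_output')

    def mapper(self, line):
        for word in line.split():
            yield word, 1

    def reducer(self, key, values):
        yield key, sum(values)
```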
25 changes: 21 additions & 4 deletions edx/analytics/tasks/common/mysql_load.py
@@ -59,15 +59,22 @@ def requires(self):
if self.required_tasks is None:
self.required_tasks = {
'credentials': ExternalURL(url=self.credentials),
'insert_source': self.insert_source_task
}
if not self.insert_source_task_dynamically:
self.required_tasks['insert_source'] = self.insert_source_task

return self.required_tasks

@property
def insert_source_task(self):
"""Defines task that provides source of data for insertion."""
raise NotImplementedError

@property
def insert_source_task_dynamically(self):
"""Declare if task that provides source of data for insertion should be a dynamic dependency."""
return False

@property
def table(self):
"""Provides name of database table."""
@@ -170,9 +177,11 @@ def create_database(self):
def rows(self):
"""Return/yield tuples or lists corresponding to each row to be inserted """
try:
with self.input()['insert_source'].open('r') as fobj:
for line in fobj:
yield line.strip('\n').split('\t')
if self.insert_source_task is not None:
input_target = self.insert_source_task.output()
with input_target.open('r') as fobj:
for line in fobj:
yield line.strip('\n').split('\t')
except RuntimeError:
# While calling finish on an input target, Luigi throws a RuntimeError exception if the subprocess command
# to read the input returns a non-zero return code. As all of the data's been read already, we choose to ignore
@@ -320,6 +329,14 @@ def run(self):
Normally you don't want to override this.
"""
# Use dynamic dependencies here to make sure that the tasks on
# which this depends have been run.
if self.insert_source_task_dynamically and self.insert_source_task is not None:
log.debug('Yielding dependency dynamically at runtime for %s: %s', self, self.insert_source_task)
yield self.insert_source_task
else:
yield []

if not (self.table and self.columns):
raise Exception("table and columns need to be specified")

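The new `insert_source_task_dynamically` switch moves the insert-source dependency out of `requires()` and yields it from `run()` instead; Luigi treats a generator `run()` as declaring dynamic dependencies and suspends the task until they complete. A simplified sketch of that general pattern (not the repo's MySQL loader itself):

```python
# Sketch of Luigi dynamic dependencies: run() yields tasks it discovers at
# runtime, and Luigi suspends the task until those dependencies are complete.
import luigi


class Extract(luigi.Task):
    def output(self):
        return luigi.LocalTarget('/tmp/extract.tsv')

    def run(self):
        with self.output().open('w') as out:
            out.write('a\t1\nb\t2\n')


class LoadIntoDatabase(luigi.Task):
    def output(self):
        return luigi.LocalTarget('/tmp/load.marker')

    def run(self):
        source = Extract()
        yield source  # dynamic dependency: scheduled only when run() executes

        with source.output().open('r') as rows, self.output().open('w') as marker:
            for line in rows:
                # A real loader would INSERT each row into the database here.
                marker.write(line)


if __name__ == '__main__':
    luigi.build([LoadIntoDatabase()], local_scheduler=True)
```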
28 changes: 12 additions & 16 deletions edx/analytics/tasks/common/pathutil.py
@@ -13,10 +13,10 @@
import re

import luigi
import luigi.format
import luigi.hdfs
import luigi.contrib.hdfs
import luigi.contrib.hdfs.format
import luigi.task
from luigi.date_interval import DateInterval
from luigi.date_interval import Custom

from edx.analytics.tasks.util import eventlog
from edx.analytics.tasks.util.s3_util import ScalableS3Client, generate_s3_sources, get_s3_bucket_key_names
@@ -30,21 +30,19 @@ class PathSetTask(luigi.Task):
A task to select a subset of files in an S3 bucket or local FS.
"""
src = luigi.Parameter(
is_list=True,
src = luigi.ListParameter(
config_path={'section': 'event-logs', 'name': 'source'},
description='A URL pointing to a folder in s3:// or local FS.',
)
include = luigi.Parameter(
is_list=True,
include = luigi.ListParameter(
default=('*',),
description='A list of patterns to use to select. Multiple patterns are OR\'d.',
)
manifest = luigi.Parameter(
default=None,
description='A URL pointing to a manifest file location.',
)
include_zero_length = luigi.BooleanParameter(
include_zero_length = luigi.BoolParameter(
default=False,
description='If True, include files/directories with size zero.',
)
@@ -64,7 +62,7 @@ def generate_file_list(self):
source = url_path_join(src, path)
yield ExternalURL(source)
elif src.startswith('hdfs'):
for source, size in luigi.hdfs.listdir(src, recursive=True, include_size=True):
for source, size in luigi.contrib.hdfs.listdir(src, recursive=True, include_size=True):
if not self.include_zero_length and size == 0:
continue
elif any(fnmatch.fnmatch(source, include_val) for include_val in self.include):
@@ -109,8 +107,7 @@ def output(self):
class EventLogSelectionDownstreamMixin(object):
"""Defines parameters for passing upstream to tasks that use EventLogSelectionMixin."""

source = luigi.Parameter(
is_list=True,
source = luigi.ListParameter(
config_path={'section': 'event-logs', 'name': 'source'},
description='A URL to a path that contains log files that contain the events. (e.g., s3://my_bucket/foo/).',
)
@@ -122,8 +119,7 @@
description='A time interval to add to the beginning and end of the interval to expand the windows of '
'files captured.',
)
pattern = luigi.Parameter(
is_list=True,
pattern = luigi.ListParameter(
config_path={'section': 'event-logs', 'name': 'pattern'},
description='A regex with a named capture group for the date that approximates the date that the events '
'within were emitted. Note that the search interval is expanded, so events don\'t have to be in exactly '
@@ -150,7 +146,7 @@ class PathSelectionByDateIntervalTask(EventLogSelectionDownstreamMixin, luigi.Wr

def __init__(self, *args, **kwargs):
super(PathSelectionByDateIntervalTask, self).__init__(*args, **kwargs)
self.interval = DateInterval(
self.interval = Custom(
self.interval.date_a - self.expand_interval,
self.interval.date_b + self.expand_interval
)
@@ -200,9 +196,9 @@ def _get_s3_urls(self, source):

def _get_hdfs_urls(self, source):
"""Recursively list all files inside the source directory on the hdfs filesystem."""
if luigi.hdfs.exists(source):
if luigi.contrib.hdfs.exists(source):
# listdir raises an exception if the source doesn't exist.
for source in luigi.hdfs.listdir(source, recursive=True):
for source in luigi.contrib.hdfs.listdir(source, recursive=True):
yield source

def _get_local_urls(self, source):
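`luigi.date_interval.DateInterval` is replaced here by its concrete subclass `Custom`, which takes explicit start and end dates; that is all the expanded window needs. A small sketch of the expansion with made-up dates:

```python
# Sketch: widen a date interval on both sides using luigi.date_interval.Custom,
# the concrete interval class; the dates and delta are made up.
import datetime
from luigi.date_interval import Custom

interval = Custom(datetime.date(2018, 1, 1), datetime.date(2018, 1, 8))
expand_interval = datetime.timedelta(days=2)

expanded = Custom(
    interval.date_a - expand_interval,
    interval.date_b + expand_interval,
)

print(expanded.date_a, expanded.date_b)  # 2017-12-30 2018-01-10
```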
(The remaining changed files in this commit are not shown here.)
