diff --git a/edx/analytics/tasks/common/bigquery_extract.py b/edx/analytics/tasks/common/bigquery_extract.py new file mode 100644 index 0000000000..68d2ab4d7f --- /dev/null +++ b/edx/analytics/tasks/common/bigquery_extract.py @@ -0,0 +1,318 @@ +""" +Collection of abstract tasks for extracting tables from BigQuery. + +BigQuery data can ONLY dump into Google Cloud Storage (GCS), not S3. Therefore, this module also +provides GCSToS3Task. +""" +import errno +import json +import logging +import subprocess +import time + +import luigi + +from edx.analytics.tasks.util.overwrite import OverwriteOutputMixin +from edx.analytics.tasks.util.s3_util import canonicalize_s3_url +from edx.analytics.tasks.util.url import ExternalURL, GCSMarkerTarget, get_target_from_url, url_path_join + +log = logging.getLogger(__name__) + +try: + from google.cloud import bigquery + from google.oauth2 import service_account + bigquery_available = True # pylint: disable=invalid-name +except ImportError: + log.warn('Unable to import Bigquery libraries') + # On hadoop slave nodes we don't have bigquery libraries installed, + # so just fail noisily if we attempt to use these libraries there. + bigquery_available = False # pylint: disable=invalid-name + + +class BigQueryExtractDownstreamMixin(OverwriteOutputMixin): + """ + Common luigi parameters for all BigQuery extract tasks and wrapper tasks. + """ + credentials = luigi.Parameter() + project = luigi.Parameter() + dataset = luigi.Parameter() + + +class BigQueryExtractTask(BigQueryExtractDownstreamMixin, luigi.Task): + """ + Abstract Task to extract one BigQuery table into Google Cloud Storage (GCS). + + This task outputs a GCSMarkerTarget which represents the destination prefix for the dumped table + which gets broken up (automatically) into several 1GB JSON gzip'd files. + """ + + output_target = None + required_tasks = None + + # For example, output_url might be: + # + # gs://bucket/table + # + # in which case the following output files could be generated: + # + # gs://bucket/table/_SUCCESS + # gs://bucket/table/_metadata + # gs://bucket/table/0000001.json.gz + # gs://bucket/table/0000002.json.gz + # gs://bucket/table/0000003.json.gz + output_url = luigi.Parameter( + description='The GCS URL prefix of the output files, NOT including the filename pattern or trailing slash.' + ) + + def requires(self): # pylint: disable=missing-docstring + if self.required_tasks is None: + self.required_tasks = { + 'credentials': ExternalURL(url=self.credentials), + } + return self.required_tasks + + def output(self): # pylint: disable=missing-docstring + if self.output_target is None: + self.output_target = GCSMarkerTarget( + self.output_url, + credentials_file=self.input()['credentials'], + ) + return self.output_target + + @property + def table(self): + """ + Name (str) of the table in BQ to extract. + + If this is a sharded table, do not include the date suffix, e.g. return + "ga_sessions_" instead of "ga_sessions_20190408". + """ + raise NotImplementedError + + @property + def output_compression_type(self): + """ + The type of compression to use for the output files. + + Return None for no compression. + + All available options are listed here: + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.extract.compression + """ + return 'GZIP' + + @property + def output_format(self): + """ + The file format to use for the output files. 
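+        Defaults to newline-delimited JSON; the .json / .json.gz filename extensions assumed by
+        _get_destination_url_pattern() should be kept in sync if this is overridden.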
+ + All available options are listed here: + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.extract.destinationFormat + """ + return bigquery.DestinationFormat.NEWLINE_DELIMITED_JSON + + def _get_table_identifier(self): + """ + Construct a table identifier for the table to be extracted. + + Override this method to include partition or shard suffixes. By default, this simply + returns the passed-in table without any suffixes. + + Returns: + str: The table to extract. + """ + return self.table + + def _get_destination_url_pattern(self): + """ + Generate the BQ-compatible GCS destination URL pattern including a single asterisk to + specify where the table dumps fan-out. + + For example, if self.output_url is: + + gs://bucket/table + + then this function might return: + + gs://bucket/table/*.json.gz + + Returns: + str: GCS URL pattern in BQ extract_table format. + """ + if self.output_compression_type == 'GZIP': + filename_extension = 'json.gz' + else: + filename_extension = 'json' + + # This will cause the leaf files themselves to just be named like "0000001.json.gz". + filename_pattern = '*.{}'.format(filename_extension) + + return url_path_join( + self.output_url, + filename_pattern, + ) + + def _get_job_id(self): + """ + Returns: + str: Unique identifier to assign to the BQ job. + """ + return 'extract_{table}_{timestamp}'.format(table=self._get_table_identifier(), timestamp=int(time.time())) + + def _make_bq_client(self): + """ + Construct a BigQuery client using the credentials file input target. + """ + with self.input()['credentials'].open('r') as credentials_file: + json_creds = json.load(credentials_file) + self.project_id = json_creds['project_id'] + credentials = service_account.Credentials.from_service_account_info(json_creds) + return bigquery.Client(credentials=credentials, project=self.project_id) + + def run(self): # pylint: disable=missing-docstring + self.check_bigquery_availability() + + client = self._make_bq_client() + dataset_ref = client.dataset(self.dataset, project=self.project) + table_name = self._get_table_identifier() + table_ref = dataset_ref.table(table_name) + + job_config = bigquery.job.ExtractJobConfig() + job_config.destination_format = self.output_format + if self.output_compression_type: + job_config.compression = self.output_compression_type + job = client.extract_table( + table_ref, + self._get_destination_url_pattern(), + job_config=job_config, + location='US', + job_id=self._get_job_id(), + ) + log.info("Starting BigQuery Extract job.") + job.result() # Waits for job to complete. + + self.output().touch_marker() + + def check_bigquery_availability(self): + """ + Call to ensure fast failure if this machine doesn't have the Bigquery libraries available. + """ + if not bigquery_available: + raise ImportError('Bigquery library not available.') + + +class BigQueryExtractShardedTask(BigQueryExtractTask): # pylint: disable=abstract-method + """ + Abstract class for extracting BigQuery tables to GCS, specifically for tables sharded by date. + + Be sure to include a trailing underscore when overriding the self.table property, if the table + name needs one. + """ + date = luigi.DateParameter() + + def _get_date_suffix(self): + """ + Generate a date sharding suffix in BQ's typical format (YYYYMMDD). + + Returns: + str: BQ date suffix. 
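+                For example, a date parameter of 2019-04-08 yields '20190408'.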
+ """ + iso_date_string = self.date.isoformat() + return iso_date_string.replace('-', '') + + def _get_table_identifier(self): + """ + Override super's method to add the shard date suffix to the table identifier. + """ + return '{}{}'.format(self.table, self._get_date_suffix()) + + +class CopyGCSToS3Task(OverwriteOutputMixin, luigi.Task): + """ + Abstract Task to copy data from Google Cloud Storage (GCS) to Amazon S3. + + Input to this task is a GCSMarkerTarget, and output is an S3MarkerTarget. + For example, if input and output URLs were as follows: + + input = gs://bucket/foo + output = s3+https://bucket/bar + + and the contents of the input were: + + gs://bucket/foo/_SUCCESS + gs://bucket/foo/_metadata + gs://bucket/foo/0000001.json.gz + gs://bucket/foo/0000002.json.gz + gs://bucket/foo/0000003.json.gz + + then, the following output files would be generated: + + s3://bucket/bar/_SUCCESS + s3://bucket/bar/_metadata + s3://bucket/bar/0000001.json.gz + s3://bucket/bar/0000002.json.gz + s3://bucket/bar/0000003.json.gz + """ + + output_url = luigi.Parameter( + description='The S3 URL prefix of the output files, NOT including the filename pattern or extension.' + ) + + output_target = None + required_tasks = None + + def requires(self): # pylint: disable=missing-docstring + if self.required_tasks is None: + self.required_tasks = { + 'source': self.insert_source_task, + } + return self.required_tasks + + def output(self): # pylint: disable=missing-docstring + if self.output_target is None: + self.output_target = get_target_from_url(self.output_url, marker=True) + return self.output_target + + @property + def insert_source_task(self): + """ + Override this to define the source task that outputs a GCSMarkerTarget to copy from. + """ + raise NotImplementedError + + def _copy_data_from_gcs_to_s3(self, source_path, destination_path): + """ + Recursively copy a "directory" from GCS to S3. + """ + # Exclude any luigi marker files which should not be copied. The pattern is a Python + # regular expression. 
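+        # gsutil rsync's -x flag skips any source path matching this regex; the _SUCCESS marker is
+        # instead re-created on the S3 side by touch_marker() once the copy succeeds.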
+ exclusion_pattern = r'.*_SUCCESS$|.*_metadata$' + command = [ + 'gsutil', '-m', 'rsync', '-x', exclusion_pattern, + source_path, + canonicalize_s3_url(destination_path), + ] + log.info( + 'Invoking the following command to copy from GCS to S3: %s', + ' '.join(command), + ) + try: + return_code = subprocess.call(command) + except OSError as err: + if err.errno == errno.ENOENT: + log.error('Check that gsutil is installed.') + raise + if return_code != 0: + raise RuntimeError('Error {code} while syncing {source} to {destination}'.format( + code=return_code, + source=source_path, + destination=destination_path, + )) + + def run(self): # pylint: disable=missing-docstring + log.debug("Starting GCS Extract job.") + self._copy_data_from_gcs_to_s3( + self.input()['source'].path, + self.output_url, + ) + self.output().touch_marker() diff --git a/edx/analytics/tasks/common/pathutil.py b/edx/analytics/tasks/common/pathutil.py index ee6cacd6eb..07bb1f02ad 100644 --- a/edx/analytics/tasks/common/pathutil.py +++ b/edx/analytics/tasks/common/pathutil.py @@ -17,9 +17,10 @@ import luigi.contrib.hdfs.format import luigi.task from luigi.date_interval import Custom +from luigi.contrib.s3 import S3Client from edx.analytics.tasks.util import eventlog -from edx.analytics.tasks.util.s3_util import ScalableS3Client, generate_s3_sources, get_s3_bucket_key_names +from edx.analytics.tasks.util.s3_util import generate_s3_sources, get_s3_bucket_key_names from edx.analytics.tasks.util.url import ExternalURL, UncheckedExternalURL, get_target_from_url, url_path_join log = logging.getLogger(__name__) @@ -57,7 +58,7 @@ def generate_file_list(self): if src.startswith('s3'): # connect lazily as needed: if self.s3_conn is None: - self.s3_conn = ScalableS3Client().s3 + self.s3_conn = S3Client().s3 for _bucket, _root, path in generate_s3_sources(self.s3_conn, src, self.include, self.include_zero_length): source = url_path_join(src, path) yield ExternalURL(source) @@ -186,7 +187,7 @@ def _get_requirements(self): def _get_s3_urls(self, source): """Recursively list all files inside the source URL directory.""" - s3_conn = ScalableS3Client().s3 + s3_conn = S3Client().s3 bucket_name, root = get_s3_bucket_key_names(source) bucket = s3_conn.get_bucket(bucket_name) for key_metadata in bucket.list(root): diff --git a/edx/analytics/tasks/tests/acceptance/__init__.py b/edx/analytics/tasks/tests/acceptance/__init__.py index c1db23c20d..464e336a4f 100644 --- a/edx/analytics/tasks/tests/acceptance/__init__.py +++ b/edx/analytics/tasks/tests/acceptance/__init__.py @@ -9,10 +9,10 @@ import boto import pandas from pandas.util.testing import assert_frame_equal, assert_series_equal +from luigi.contrib.s3 import S3Client from edx.analytics.tasks.common.pathutil import PathSetTask from edx.analytics.tasks.tests.acceptance.services import db, elasticsearch_service, fs, hive, task, vertica -from edx.analytics.tasks.util.s3_util import ScalableS3Client from edx.analytics.tasks.util.url import get_target_from_url, url_path_join log = logging.getLogger(__name__) @@ -24,7 +24,7 @@ def when_s3_available(function): s3_available = getattr(when_s3_available, 's3_available', None) if s3_available is None: try: - connection = ScalableS3Client().s3 + connection = S3Client().s3 # ^ The above line will not error out if AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY # are set, so it can't be used to check if we have a valid connection to S3. 
Instead: connection.get_all_buckets() @@ -142,7 +142,7 @@ class AcceptanceTestCase(unittest.TestCase): def setUp(self): try: - self.s3_client = ScalableS3Client() + self.s3_client = S3Client() except Exception: self.s3_client = None diff --git a/edx/analytics/tasks/tests/acceptance/test_database_export.py b/edx/analytics/tasks/tests/acceptance/test_database_export.py index 489a6cb5fa..000e35834e 100644 --- a/edx/analytics/tasks/tests/acceptance/test_database_export.py +++ b/edx/analytics/tasks/tests/acceptance/test_database_export.py @@ -14,10 +14,11 @@ import gnupg +from luigi.contrib.s3 import S3Client + from edx.analytics.tasks.tests.acceptance import AcceptanceTestCase, when_exporter_available from edx.analytics.tasks.tests.acceptance.services import shell from edx.analytics.tasks.util.opaque_key_util import get_filename_safe_course_id, get_org_id_for_course -from edx.analytics.tasks.util.s3_util import ScalableS3Client from edx.analytics.tasks.util.url import url_path_join log = logging.getLogger(__name__) @@ -198,7 +199,7 @@ def validate_exporter_output(self, org_id, exported_filename): """ today = datetime.datetime.utcnow().strftime('%Y-%m-%d') - bucket = ScalableS3Client().s3.get_bucket(self.config.get('exporter_output_bucket')) + bucket = S3Client().s3.get_bucket(self.config.get('exporter_output_bucket')) export_id = '{org}-{date}'.format(org=org_id, date=today) filename = export_id + '.zip' key = bucket.lookup(self.output_prefix + filename) diff --git a/edx/analytics/tasks/util/s3_util.py b/edx/analytics/tasks/util/s3_util.py index 280820a269..cf36378ed0 100644 --- a/edx/analytics/tasks/util/s3_util.py +++ b/edx/analytics/tasks/util/s3_util.py @@ -122,21 +122,6 @@ def func(name): return (n for n in names if func(n)) -# TODO: Once we upgrade to boto3 (luigi>=2.7.6), delete this class! In boto3/luigi>=2.7.6, we must -# NOT pass `host` to the s3 client or else it will throw a KeyError. boto3 will already default to -# s3.amazonaws.com. -class ScalableS3Client(S3Client): - """ - S3 client that adds support for defaulting host name to s3.amazonaws.com. 
- """ - - def __init__(self, *args, **kwargs): - if 'host' not in kwargs: - kwargs['host'] = self._get_s3_config('host') or 's3.amazonaws.com' - - super(ScalableS3Client, self).__init__(*args, **kwargs) - - class S3HdfsTarget(HdfsTarget): """HDFS target that supports writing and reading files directly in S3.""" @@ -157,7 +142,7 @@ def open(self, mode='r'): else: safe_path = self.path.replace('s3n://', 's3://') if not hasattr(self, 's3_client'): - self.s3_client = ScalableS3Client() + self.s3_client = S3Client() return AtomicS3File(safe_path, self.s3_client, policy=DEFAULT_KEY_ACCESS_POLICY) diff --git a/edx/analytics/tasks/util/url.py b/edx/analytics/tasks/util/url.py index 1d70af0391..fcfb7d0bb6 100644 --- a/edx/analytics/tasks/util/url.py +++ b/edx/analytics/tasks/util/url.py @@ -10,6 +10,7 @@ """ from __future__ import absolute_import +import json import logging import os import time @@ -19,30 +20,57 @@ import luigi.configuration import luigi.contrib.hdfs import luigi.contrib.s3 +from luigi.contrib.gcs import GCSClient, GCSTarget from luigi.contrib.hdfs import format as hdfs_format from luigi.contrib.hdfs.target import HdfsTarget from luigi.contrib.s3 import S3Target -from edx.analytics.tasks.util.s3_util import DEFAULT_KEY_ACCESS_POLICY, S3HdfsTarget, ScalableS3Client +from edx.analytics.tasks.util.s3_util import DEFAULT_KEY_ACCESS_POLICY, S3HdfsTarget log = logging.getLogger(__name__) +try: + from google.oauth2 import service_account +except ImportError: + log.warn('Unable to import google cloud libraries') + class MarkerMixin(object): - """This mixin handles Targets that cannot accurately be measured by the existence of data files, and instead need - another positive marker to indicate Task success.""" + """ + This mixin handles Targets that cannot accurately be measured by the existence of data files, and instead need + another positive marker to indicate Task success. + """ # Check if the marker file is readable after being written, and if not then block for up to 10 minutes until a read # is successful. confirm_marker_file_after_writing = True + def _marker_path(self): + """ + Construct a new path representing the success marker. + """ + return self.path + "/_SUCCESS" + def exists(self): # pragma: no cover - """Completion of this target is based solely on the existence of the marker file.""" - return self.fs.exists(self.path + "/_SUCCESS") + """ + Completion of this target is based solely on the existence of the marker file. + """ + return self.fs.exists(self._marker_path()) + + def new_with_credentials(self, path): + """ + Create a new target, copying any credentials from the current one. + + This is just a default implementation which does not copy any credentials, override if + necessary. + """ + return self.__class__(path=path) def touch_marker(self): # pragma: no cover - """Generate the marker file using file system native to the parent Target.""" - marker = self.__class__(path=self.path + "/_SUCCESS") + """ + Generate the marker file using file system native to the parent Target. + """ + marker = self.new_with_credentials(self._marker_path()) marker.open("w").close() if self.confirm_marker_file_after_writing: @@ -61,23 +89,68 @@ def touch_marker(self): # pragma: no cover class S3MarkerTarget(MarkerMixin, S3Target): - """An S3 Target that uses a marker file to indicate success.""" + """ + An S3 Target that uses a marker file to indicate success. 
+ """ class HdfsMarkerTarget(MarkerMixin, HdfsTarget): - """An HDFS Target that uses a marker file to indicate success.""" + """ + An HDFS Target that uses a marker file to indicate success. + """ class LocalMarkerTarget(MarkerMixin, luigi.LocalTarget): - """A Local Target that uses a marker file to indicate success.""" + """ + A Local Target that uses a marker file to indicate success. + """ class S3HdfsMarkerTarget(MarkerMixin, S3HdfsTarget): - """An S3 HDFS Target that uses a marker file to indicate success.""" + """ + An S3 HDFS Target that uses a marker file to indicate success. + """ + + +class GCSMarkerTarget(MarkerMixin, GCSTarget): + """ + A Google Cloud Storage (GCS) Target that uses a marker file to indicate success. + """ + def __init__(self, *args, **kwargs): + """ + Customize constructor to add support for reading a credentials file. + + Extra Args: + credentials_file (file-like object): A file that contains GCP credentials JSON. + """ + if 'credentials_file' in kwargs: + with kwargs['credentials_file'].open('r') as credentials_file: + json_creds = json.load(credentials_file) + log.info( + ( + "GCSMarkerTarget being constructed with credentials file for the following GCP " + "service account: %s" + ), + json_creds['client_email'], + ) + credentials = service_account.Credentials.from_service_account_info(json_creds) + kwargs['client'] = GCSClient(oauth_credentials=credentials) + del kwargs['credentials_file'] + else: + log.info("GCSMarkerTarget being constructed WITHOUT credentials.") + super(GCSMarkerTarget, self).__init__(*args, **kwargs) + + def new_with_credentials(self, path): + """ + Overriding so that we can pass `self.fs` to the new marker. + """ + return GCSMarkerTarget(path=path, client=self.fs) class ExternalURL(luigi.ExternalTask): - """Simple Task that returns a target based on its URL""" + """ + Simple Task that returns a target based on its URL + """ url = luigi.Parameter() def output(self): @@ -85,14 +158,18 @@ def output(self): class UncheckedExternalURL(ExternalURL): - """A ExternalURL task that does not verify if the source file exists, which can be expensive for S3 URLs.""" + """ + A ExternalURL task that does not verify if the source file exists, which can be expensive for S3 URLs. + """ def complete(self): return True class IgnoredTarget(HdfsTarget): - """Dummy target for use in Hadoop jobs that produce no explicit output file.""" + """ + Dummy target for use in Hadoop jobs that produce no explicit output file. + """ def __init__(self): super(IgnoredTarget, self).__init__(is_tmp=True) @@ -110,6 +187,7 @@ def open(self, mode='r'): 's3n': S3HdfsTarget, 'file': luigi.LocalTarget, 's3+https': S3Target, + 'gs': GCSTarget, } DEFAULT_MARKER_TARGET_CLASS = LocalMarkerTarget @@ -119,11 +197,14 @@ def open(self, mode='r'): 's3n': S3HdfsMarkerTarget, 'file': LocalMarkerTarget, 's3+https': S3MarkerTarget, + 'gs': GCSMarkerTarget, } def get_target_class_from_url(url, marker=False): - """Returns a luigi target class based on the url scheme""" + """ + Returns a luigi target class based on the url scheme + """ parsed_url = urlparse.urlparse(url) if marker: @@ -139,8 +220,11 @@ def get_target_class_from_url(url, marker=False): # everything else off the url and pass that in to the target. 
url = parsed_url.path if issubclass(target_class, S3Target): - kwargs['client'] = ScalableS3Client() kwargs['policy'] = DEFAULT_KEY_ACCESS_POLICY + if issubclass(target_class, GCSTarget): + raise NotImplementedError( + 'Attempting to construct a GCSTarget via get_target_from_url() which is currently unsupported.' + ) url = url.rstrip('/') args = (url,) @@ -149,7 +233,9 @@ def get_target_class_from_url(url, marker=False): def get_target_from_url(url, marker=False): - """Returns a luigi target based on the url scheme""" + """ + Returns a luigi target based on the url scheme + """ cls, args, kwargs = get_target_class_from_url(url, marker) return cls(*args, **kwargs) @@ -167,9 +253,8 @@ def url_path_join(url, *extra_path): url=http://foo.com/bar, extra_path=../baz -> http://foo.com/bar/../baz Args: - url (str): The URL to modify. - extra_path (str): The path to join with the current URL path. + extra_path (list of str): The path(s) to join with the current URL path. Returns: The URL with the path component joined with `extra_path` argument. diff --git a/edx/analytics/tasks/warehouse/ga_imports.py b/edx/analytics/tasks/warehouse/ga_imports.py new file mode 100644 index 0000000000..51215099b2 --- /dev/null +++ b/edx/analytics/tasks/warehouse/ga_imports.py @@ -0,0 +1,135 @@ +""" +Tasks for loading GA360 data from BigQuery -> Google Cloud Storage -> S3 -> Snowflake. + +For a given date, the following tasks will be sequenced: + +1. BigQueryExtractGATask +2. CopyGAFromGCSToS3Task +3. SnowflakeLoadGATask + +You need only schedule SnowflakeLoadGATask, and luigi will schedule dependent +tasks as necessary. + +SnowflakeLoadGAIntervalTask can be used to load multiple days of GA data, +useful when combined with Jenkins to load the "last 3 days" for automatic +recovery from downtime. +""" +import logging + +import luigi + +from edx.analytics.tasks.common.bigquery_extract import BigQueryExtractShardedTask, CopyGCSToS3Task +from edx.analytics.tasks.common.snowflake_load import SnowflakeLoadJSONTask +from edx.analytics.tasks.util.url import url_path_join + +log = logging.getLogger(__name__) + + +class BigQueryExtractGATask(BigQueryExtractShardedTask): + """ + Task that extracts GA data for the specified date from BQ into GCS. + """ + + credentials = luigi.Parameter( + config_path={'section': 'ga-imports', 'name': 'bq_credentials'} + ) + project = luigi.Parameter( + config_path={'section': 'ga-imports', 'name': 'bq_project'} + ) + dataset = luigi.Parameter( + config_path={'section': 'ga-imports', 'name': 'bq_dataset'} + ) + + @property + def table(self): + """ + Name of the table in BQ to extract. + """ + return 'ga_sessions_' # This is a sharded table, so the table name needs a trailing underscore. + + +class CopyGAFromGCSToS3Task(CopyGCSToS3Task): + """ + Task that copies GA data for the specified date from GCS into S3. + """ + + date = luigi.DateParameter() + gcs_intermediate_url_prefix = luigi.Parameter( + config_path={'section': 'ga-imports', 'name': 'gcs_intermediate_url_prefix'} + ) + + @property + def insert_source_task(self): + gcs_intermediate_url = url_path_join( + self.gcs_intermediate_url_prefix, + self.date.isoformat(), + ) + return BigQueryExtractGATask( + date=self.date, + output_url=gcs_intermediate_url, + ) + + +class SnowflakeLoadGATask(SnowflakeLoadJSONTask): + """ + Load GA data for the specified date from s3 into Snowflake. 
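+
+    Scheduling this task is sufficient to run the whole pipeline for a date: the upstream
+    CopyGAFromGCSToS3Task and BigQueryExtractGATask dependencies are scheduled automatically
+    by luigi.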
+ """ + + s3_intermediate_url_prefix = luigi.Parameter( + config_path={'section': 'ga-imports', 'name': 's3_intermediate_url_prefix'} + ) + credentials = luigi.Parameter( + config_path={'section': 'ga-imports', 'name': 'sf_credentials'} + ) + role = luigi.Parameter( + config_path={'section': 'ga-imports', 'name': 'sf_role'} + ) + database = luigi.Parameter( + config_path={'section': 'ga-imports', 'name': 'sf_database'} + ) + schema = luigi.Parameter( + config_path={'section': 'ga-imports', 'name': 'sf_schema'} + ) + warehouse = luigi.Parameter( + config_path={'section': 'ga-imports', 'name': 'sf_warehouse'} + ) + + @property + def insert_source_task(self): + s3_intermediate_url = url_path_join( + self.s3_intermediate_url_prefix, + self.date.isoformat(), + ) + return CopyGAFromGCSToS3Task( + date=self.date, + output_url=s3_intermediate_url, + ) + + @property + def table(self): + """ + Provides the name of the database table. + """ + return 'ga_sessions' + + @property + def file_format_name(self): + """ + Given name for a Snowflake file format definition for the s3 location + of the GA data. + """ + return 'ga_sessions_json_format' + + +class SnowflakeLoadGAIntervalTask(luigi.WrapperTask): + """ + Fan-out the SnowflakeLoadGATask. Load GA data for the specified date range. + """ + + interval = luigi.DateIntervalParameter( + description='The range of received dates for which to load GA records.', + ) + + def requires(self): + for date in reversed([d for d in self.interval]): # pylint: disable=not-an-iterable + yield SnowflakeLoadGATask(date=date) diff --git a/requirements/base.txt b/requirements/base.txt index b92552600d..ad5c26f4c2 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -10,7 +10,7 @@ ansible==1.4.5 asn1crypto==0.24.0 # via cryptography bcrypt==3.1.6 # via paramiko boto==2.48.0 -cffi==1.12.2 # via bcrypt, cryptography, pynacl +cffi==1.12.3 # via bcrypt, cryptography, pynacl cryptography==2.6.1 # via paramiko ecdsa==0.13 enum34==1.1.6 # via cryptography @@ -25,4 +25,4 @@ pycrypto==2.6.1 pynacl==1.3.0 # via paramiko pyyaml==5.1 # via ansible six==1.10.0 -wheel==0.30.0 \ No newline at end of file +wheel==0.30.0 diff --git a/requirements/default.in b/requirements/default.in index d226ba55b3..a7e60afb8f 100644 --- a/requirements/default.in +++ b/requirements/default.in @@ -4,23 +4,23 @@ -r base.txt argparse==1.2.1 # Python Software Foundation License -boto3==1.4.8 # Apache 2.0 +boto3==1.9.131 # Apache 2.0 ciso8601==1.0.3 # MIT edx-ccx-keys==0.2.1 # AGPL edx-opaque-keys==0.4 # AGPL elasticsearch==1.7.0 # Apache filechunkio==1.8 # MIT -google-cloud-bigquery==0.27.0 -google-api-python-client==1.7.7 +google-cloud-bigquery==1.11.2 +google-api-python-client==1.7.8 graphitesend==0.10.0 # Apache html5lib==1.0b3 # MIT isoweek==1.3.3 # BSD numpy==1.11.3 # BSD paypalrestsdk==1.9.0 # Paypal SDK License -psycopg2==2.6.2 # LGPL +psycopg2==2.8.1 # LGPL pygeoip==0.3.2 # LGPL python-cjson==1.1.0 # LGPL -python-dateutil==2.6.1 # BSD +python-dateutil==2.7.5 # BSD # There seems to be an issue(see: https://pagure.io/python-daemon/issue/18) with dependency installation on the # latest version of python-daemon(2.2.0), so we pin it to an earlier version. 
python-daemon==2.1.2 @@ -35,7 +35,7 @@ user-agents==0.3.2 # MIT vertica-python==0.6.11 # MIT yarn-api-client==0.2.3 # BSD snowflake-connector-python==1.7.9 - --e git+https://github.com/edx/luigi.git@eb45bcc52243de11b2b16a81229ac584fe1e601b#egg=luigi # Apache License 2.0 +httplib2==0.12.1 +luigi==2.8.3 # Apache License 2.0 -e git+https://github.com/edx/pyinstrument.git@a35ff76df4c3d5ff9a2876d859303e33d895e78f#egg=pyinstrument # BSD diff --git a/requirements/default.txt b/requirements/default.txt index 15c829e1ef..888adc0a59 100644 --- a/requirements/default.txt +++ b/requirements/default.txt @@ -6,28 +6,26 @@ # --no-binary ansible --e git+https://github.com/edx/luigi.git@eb45bcc52243de11b2b16a81229ac584fe1e601b#egg=luigi -e git+https://github.com/edx/pyinstrument.git@a35ff76df4c3d5ff9a2876d859303e33d895e78f#egg=pyinstrument ansible==1.4.5 argparse==1.2.1 asn1crypto==0.24.0 -azure-common==1.1.18 # via azure-storage-blob, azure-storage-common, snowflake-connector-python +azure-common==1.1.19 # via azure-storage-blob, azure-storage-common, snowflake-connector-python azure-nspkg==3.0.2 # via azure-common, azure-storage-nspkg azure-storage-blob==1.5.0 # via snowflake-connector-python azure-storage-common==1.4.0 # via azure-storage-blob azure-storage-nspkg==3.1.0 # via azure-storage-common backports-abc==0.5 # via tornado bcrypt==3.1.6 -boto3==1.4.8 +boto3==1.9.131 boto==2.48.0 -botocore==1.8.50 # via boto3, s3transfer, snowflake-connector-python +botocore==1.12.133 # via boto3, s3transfer, snowflake-connector-python cachetools==3.1.0 # via google-auth certifi==2019.3.9 # via requests, snowflake-connector-python, tornado -cffi==1.12.2 +cffi==1.12.3 chardet==3.0.4 # via requests ciso8601==1.0.3 cryptography==2.6.1 -distlib==0.2.2 docutils==0.14 # via botocore, python-daemon ecdsa==0.13 edx-ccx-keys==0.2.1 @@ -36,14 +34,15 @@ elasticsearch==1.7.0 enum34==1.1.6 filechunkio==1.8 future==0.17.1 # via snowflake-connector-python, vertica-python -futures==3.2.0 # via azure-storage-blob, google-cloud-core, s3transfer -google-api-python-client==1.7.7 +futures==3.2.0 # via azure-storage-blob, google-api-core, s3transfer +google-api-core==1.9.0 # via google-cloud-bigquery, google-cloud-core +google-api-python-client==1.7.8 google-auth-httplib2==0.0.3 # via google-api-python-client -google-auth==1.6.3 # via google-api-python-client, google-auth-httplib2, google-cloud-bigquery, google-cloud-core -google-cloud-bigquery==0.27.0 -google-cloud-core==0.27.1 # via google-cloud-bigquery +google-auth==1.6.3 # via google-api-core, google-api-python-client, google-auth-httplib2 +google-cloud-bigquery==1.11.2 +google-cloud-core==0.29.1 # via google-cloud-bigquery google-resumable-media==0.3.2 # via google-cloud-bigquery -googleapis-common-protos==1.5.9 # via google-cloud-core +googleapis-common-protos==1.5.9 # via google-api-core graphitesend==0.10.0 html5lib==1.0b3 httplib2==0.12.1 @@ -54,18 +53,19 @@ isoweek==1.3.3 jinja2==2.8.1 jmespath==0.9.4 # via boto3, botocore lockfile==0.12.2 # via python-daemon +luigi==2.8.3 markupsafe==1.1.1 numpy==1.11.3 paramiko==2.4.2 paypalrestsdk==1.9.0 pbr==5.1.3 # via stevedore -protobuf==3.7.1 # via google-cloud-core, googleapis-common-protos -psycopg2==2.6.2 +protobuf==3.7.1 # via google-api-core, google-cloud-bigquery, googleapis-common-protos +psycopg2==2.8.1 pyasn1-modules==0.2.4 # via google-auth, snowflake-connector-python pyasn1==0.4.5 pycparser==2.19 pycrypto==2.6.1 -pycryptodomex==3.8.0 # via snowflake-connector-python +pycryptodomex==3.8.1 # via snowflake-connector-python 
pygeoip==0.3.2 pyjwt==1.7.1 # via snowflake-connector-python pymongo==3.7.2 # via edx-opaque-keys @@ -73,22 +73,22 @@ pynacl==1.3.0 pyopenssl==19.0.0 # via paypalrestsdk, snowflake-connector-python python-cjson==1.1.0 python-daemon==2.1.2 -python-dateutil==2.6.1 +python-dateutil==2.7.5 python-gnupg==0.3.9 pytz==2017.3 pyyaml==5.1 requests==2.18.4 rsa==4.0 # via google-auth -s3transfer==0.1.13 # via boto3 +s3transfer==0.2.0 # via boto3 singledispatch==3.4.0.3 # via tornado six==1.10.0 snowflake-connector-python==1.7.9 stevedore==1.19.1 -tornado==4.5.3 +tornado==4.5.3 # via luigi ua-parser==0.3.6 uritemplate==3.0.0 # via google-api-python-client urllib3==1.22 user-agents==0.3.2 vertica-python==0.6.11 wheel==0.30.0 -yarn-api-client==0.2.3 \ No newline at end of file +yarn-api-client==0.2.3 diff --git a/requirements/docs.txt b/requirements/docs.txt index baca202202..9577bdb3fb 100644 --- a/requirements/docs.txt +++ b/requirements/docs.txt @@ -6,13 +6,12 @@ # --no-binary ansible --e git+https://github.com/edx/luigi.git@eb45bcc52243de11b2b16a81229ac584fe1e601b#egg=luigi -e git+https://github.com/edx/pyinstrument.git@a35ff76df4c3d5ff9a2876d859303e33d895e78f#egg=pyinstrument alabaster==0.7.12 # via sphinx ansible==1.4.5 argparse==1.2.1 asn1crypto==0.24.0 -azure-common==1.1.18 +azure-common==1.1.19 azure-nspkg==3.0.2 azure-storage-blob==1.5.0 azure-storage-common==1.4.0 @@ -20,16 +19,15 @@ azure-storage-nspkg==3.1.0 babel==2.6.0 # via sphinx backports-abc==0.5 bcrypt==3.1.6 -boto3==1.4.8 +boto3==1.9.131 boto==2.48.0 -botocore==1.8.50 +botocore==1.12.133 cachetools==3.1.0 certifi==2019.3.9 -cffi==1.12.2 +cffi==1.12.3 chardet==3.0.4 ciso8601==1.0.3 cryptography==2.6.1 -distlib==0.2.2 docutils==0.14 ecdsa==0.13 edx-ccx-keys==0.2.1 @@ -39,11 +37,12 @@ enum34==1.1.6 filechunkio==1.8 future==0.17.1 futures==3.2.0 -google-api-python-client==1.7.7 +google-api-core==1.9.0 +google-api-python-client==1.7.8 google-auth-httplib2==0.0.3 google-auth==1.6.3 -google-cloud-bigquery==0.27.0 -google-cloud-core==0.27.1 +google-cloud-bigquery==1.11.2 +google-cloud-core==0.29.1 google-resumable-media==0.3.2 googleapis-common-protos==1.5.9 graphitesend==0.10.0 @@ -57,18 +56,19 @@ isoweek==1.3.3 jinja2==2.8.1 jmespath==0.9.4 lockfile==0.12.2 +luigi==2.8.3 markupsafe==1.1.1 numpy==1.11.3 paramiko==2.4.2 paypalrestsdk==1.9.0 pbr==5.1.3 protobuf==3.7.1 -psycopg2==2.6.2 +psycopg2==2.8.1 pyasn1-modules==0.2.4 pyasn1==0.4.5 pycparser==2.19 pycrypto==2.6.1 -pycryptodomex==3.8.0 +pycryptodomex==3.8.1 pygeoip==0.3.2 pygments==2.3.1 # via sphinx pyjwt==1.7.1 @@ -77,13 +77,13 @@ pynacl==1.3.0 pyopenssl==19.0.0 python-cjson==1.1.0 python-daemon==2.1.2 -python-dateutil==2.6.1 +python-dateutil==2.7.5 python-gnupg==0.3.9 pytz==2017.3 pyyaml==5.1 requests==2.18.4 rsa==4.0 -s3transfer==0.1.13 +s3transfer==0.2.0 singledispatch==3.4.0.3 six==1.10.0 snowballstemmer==1.2.1 # via sphinx @@ -97,4 +97,4 @@ urllib3==1.22 user-agents==0.3.2 vertica-python==0.6.11 wheel==0.30.0 -yarn-api-client==0.2.3 \ No newline at end of file +yarn-api-client==0.2.3 diff --git a/requirements/pip-tools.txt b/requirements/pip-tools.txt index edfb214852..1f571e5d97 100644 --- a/requirements/pip-tools.txt +++ b/requirements/pip-tools.txt @@ -5,5 +5,5 @@ # make upgrade # click==7.0 # via pip-tools -pip-tools==3.5.0 -six==1.10.0 \ No newline at end of file +pip-tools==3.6.0 +six==1.10.0 diff --git a/requirements/test.txt b/requirements/test.txt index 269583e50b..cb60654816 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -6,13 +6,12 @@ # --no-binary 
ansible --e git+https://github.com/edx/luigi.git@eb45bcc52243de11b2b16a81229ac584fe1e601b#egg=luigi -e git+https://github.com/edx/pyinstrument.git@a35ff76df4c3d5ff9a2876d859303e33d895e78f#egg=pyinstrument ansible==1.4.5 argparse==1.2.1 asn1crypto==0.24.0 astroid==1.4.9 # via pylint -azure-common==1.1.18 +azure-common==1.1.19 azure-nspkg==3.0.2 azure-storage-blob==1.5.0 azure-storage-common==1.4.0 @@ -20,20 +19,19 @@ azure-storage-nspkg==3.1.0 backports-abc==0.5 backports.functools-lru-cache==1.5 # via pylint bcrypt==3.1.6 -boto3==1.4.8 +boto3==1.9.131 boto==2.48.0 -botocore==1.8.50 +botocore==1.12.133 cachetools==3.1.0 certifi==2019.3.9 -cffi==1.12.2 +cffi==1.12.3 chardet==3.0.4 ciso8601==1.0.3 configparser==3.7.4 # via pylint coverage==4.3.1 cryptography==2.6.1 ddt==1.1.1 -diff-cover==1.0.7 -distlib==0.2.2 +diff-cover==2.0.0 docutils==0.14 ecdsa==0.13 edx-ccx-keys==0.2.1 @@ -45,11 +43,12 @@ freezegun==0.3.9 funcsigs==1.0.2 # via mock future==0.17.1 futures==3.2.0 -google-api-python-client==1.7.7 +google-api-core==1.9.0 +google-api-python-client==1.7.8 google-auth-httplib2==0.0.3 google-auth==1.6.3 -google-cloud-bigquery==0.27.0 -google-cloud-core==0.27.1 +google-cloud-bigquery==1.11.2 +google-cloud-core==0.29.1 google-resumable-media==0.3.2 googleapis-common-protos==1.5.9 graphitesend==0.10.0 @@ -67,6 +66,7 @@ jinja2==2.8.1 jmespath==0.9.4 lazy-object-proxy==1.3.1 # via astroid lockfile==0.12.2 +luigi==2.8.3 markupsafe==1.1.1 mccabe==0.6.1 # via pylint mock==2.0.0 @@ -78,13 +78,13 @@ paramiko==2.4.2 paypalrestsdk==1.9.0 pbr==5.1.3 protobuf==3.7.1 -psycopg2==2.6.2 +psycopg2==2.8.1 pyasn1-modules==0.2.4 pyasn1==0.4.5 pycodestyle==2.3.1 pycparser==2.19 pycrypto==2.6.1 -pycryptodomex==3.8.0 +pycryptodomex==3.8.1 pygeoip==0.3.2 pygments==2.3.1 # via diff-cover pyjwt==1.7.1 @@ -94,13 +94,13 @@ pynacl==1.3.0 pyopenssl==19.0.0 python-cjson==1.1.0 python-daemon==2.1.2 -python-dateutil==2.6.1 +python-dateutil==2.7.5 python-gnupg==0.3.9 pytz==2017.3 pyyaml==5.1 requests==2.18.4 rsa==4.0 -s3transfer==0.1.13 +s3transfer==0.2.0 singledispatch==3.4.0.3 six==1.10.0 snowflake-connector-python==1.7.9 @@ -113,4 +113,4 @@ user-agents==0.3.2 vertica-python==0.6.11 wheel==0.30.0 wrapt==1.11.1 # via astroid -yarn-api-client==0.2.3 \ No newline at end of file +yarn-api-client==0.2.3 diff --git a/setup.cfg b/setup.cfg index 253767e203..be0437228e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -75,6 +75,7 @@ edx.analytics.tasks = run-vertica-sql-scripts = edx.analytics.tasks.warehouse.run_vertica_sql_scripts:RunVerticaSqlScriptTask test-vertica-sqoop = edx.analytics.tasks.common.vertica_export:VerticaSchemaToBigQueryTask load-ga-permissions = edx.analytics.tasks.warehouse.load_ga_permissions:LoadGoogleAnalyticsPermissionsWorkflow + ga-imports = edx.analytics.tasks.warehouse.ga_imports:SnowflakeLoadGATask # financial: cybersource = edx.analytics.tasks.warehouse.financial.cybersource:DailyPullFromCybersourceTask
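The abstract tasks introduced in bigquery_extract.py can be reused for other BigQuery tables by pairing a BigQueryExtractShardedTask subclass with a CopyGCSToS3Task subclass, exactly as ga_imports.py does for ga_sessions_. The sketch below illustrates that pattern; the 'hits_' table, the task names, bucket names, project, dataset, and credential path are hypothetical placeholders and not part of this change.

# Illustrative sketch only: 'hits_', the buckets, dataset, and credential path are assumptions;
# the abstract classes come from edx.analytics.tasks.common.bigquery_extract in this patch.
import datetime

import luigi

from edx.analytics.tasks.common.bigquery_extract import BigQueryExtractShardedTask, CopyGCSToS3Task
from edx.analytics.tasks.util.url import url_path_join


class BigQueryExtractHitsTask(BigQueryExtractShardedTask):
    """Dump one day of a hypothetical date-sharded 'hits_YYYYMMDD' table into GCS as gzipped JSON."""

    @property
    def table(self):
        # Sharded tables keep the trailing underscore; the base class appends the date suffix.
        return 'hits_'


class CopyHitsFromGCSToS3Task(CopyGCSToS3Task):
    """Mirror the extracted files from GCS into S3 with `gsutil -m rsync`."""

    date = luigi.DateParameter()
    credentials = luigi.Parameter(description='Path to a GCP service account JSON file.')
    project = luigi.Parameter()
    dataset = luigi.Parameter()
    gcs_url_prefix = luigi.Parameter(description='e.g. gs://some-bucket/hits')

    @property
    def insert_source_task(self):
        # The extract task's output_url doubles as the rsync source prefix.
        return BigQueryExtractHitsTask(
            date=self.date,
            credentials=self.credentials,
            project=self.project,
            dataset=self.dataset,
            output_url=url_path_join(self.gcs_url_prefix, self.date.isoformat()),
        )


if __name__ == '__main__':
    # Local smoke test; assumes gsutil plus valid GCP and AWS credentials on this machine.
    luigi.build(
        [CopyHitsFromGCSToS3Task(
            date=datetime.date(2019, 4, 8),
            credentials='/edx/etc/gcp-service-account.json',
            project='some-gcp-project',
            dataset='some_dataset',
            gcs_url_prefix='gs://some-bucket/hits',
            output_url='s3://some-bucket/hits/2019-04-08',
        )],
        local_scheduler=True,
    )

As with the GA pipeline, only the last task in the chain needs to be scheduled: luigi builds the upstream extract via insert_source_task, and the _SUCCESS markers written by MarkerMixin.touch_marker() record each completed step so finished steps are not re-run.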