This repository has been archived by the owner on May 1, 2024. It is now read-only.

Commit 9dba8a9

quick test spark internal reporting user activity

rao-abdul-mannan committed Feb 14, 2018
1 parent 30ad82b

Showing 1 changed file with 24 additions and 24 deletions.
@@ -58,40 +58,40 @@ def run(self):
         self.remove_output_on_overwrite()
         super(InternalReportingUserActivityPartitionTaskSpark, self).run()

-    def requires(self):
-        required_tasks = [
-            ImportAuthUserTask(overwrite=False, destination=self.warehouse_path)
-        ]
-        if self.overwrite_n_days > 0:
-            overwrite_from_date = self.date - datetime.timedelta(days=self.overwrite_n_days)
-            overwrite_interval = luigi.date_interval.Custom(overwrite_from_date, self.date)
-            required_tasks.append(
-                UserActivityTaskSpark(
-                    interval=overwrite_interval,
-                    warehouse_path=self.warehouse_path,
-                    output_root=self._get_user_activity_hive_table_path(),
-                    overwrite=True,
-                )
-            )
-        yield required_tasks
+    # def requires(self):
+    #     required_tasks = [
+    #         ImportAuthUserTask(overwrite=False, destination=self.warehouse_path)
+    #     ]
+    #     if self.overwrite_n_days > 0:
+    #         overwrite_from_date = self.date - datetime.timedelta(days=self.overwrite_n_days)
+    #         overwrite_interval = luigi.date_interval.Custom(overwrite_from_date, self.date)
+    #         required_tasks.append(
+    #             UserActivityTaskSpark(
+    #                 interval=overwrite_interval,
+    #                 warehouse_path=self.warehouse_path,
+    #                 output_root=self._get_user_activity_hive_table_path(),
+    #                 overwrite=True,
+    #             )
+    #         )
+    #     yield required_tasks

     def _get_auth_user_hive_table_path(self):
-        todays_date = datetime.datetime.utcnow().date()
+        import_date = datetime.datetime.utcnow().date()
         return url_path_join(
             self.warehouse_path,
             'auth_user',
-            'dt={}'.format(todays_date.isoformat())
+            'dt=2018-02-13'
         )

     def _get_auth_user_table_schema(self):
-        from pyspark.sql.types import StructType, StringType, IntegerType, BooleanType
-        schema = StructType().add("id", IntegerType(), True) \
+        from pyspark.sql.types import StructType, StringType
+        schema = StructType().add("id", StringType(), True) \
             .add("username", StringType(), True) \
             .add("last_login", StringType(), True) \
             .add("date_joined", StringType(), True) \
-            .add("is_active", BooleanType(), True) \
-            .add("is_superuser", BooleanType(), True) \
-            .add("is_staff", BooleanType(), True) \
+            .add("is_active", StringType(), True) \
+            .add("is_superuser", StringType(), True) \
+            .add("is_staff", StringType(), True) \
             .add("email", StringType(), True) \
             .add("dt", StringType(), True)
         return schema
@@ -138,7 +138,7 @@ def spark_job(self, *args):
                 ON au.username = ua.username
             """
         result = self._sql_context.sql(query)
-        result.coalesce(10).write.csv(self.output().path, mode='overwrite', sep='\t')
+        result.coalesce(1).write.csv(self.output().path, mode='overwrite', sep='\t')

     def output(self):
         return get_target_from_url(
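The schema change above swaps the typed id/boolean columns for an all-string layout, and _get_auth_user_hive_table_path() is pinned to the dt=2018-02-13 partition for this quick test. As a rough illustration of how such a StructType is typically applied when reading a tab-separated, Hive-partitioned auth_user export with PySpark (this is a sketch, not code from this repository; the session setup, bucket path, and view name are placeholders):

# Hypothetical sketch, not from this commit: reading an auth_user export with
# the all-string schema that _get_auth_user_table_schema() now builds.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType

spark = SparkSession.builder.appName('auth_user_read_sketch').getOrCreate()

schema = StructType() \
    .add("id", StringType(), True) \
    .add("username", StringType(), True) \
    .add("last_login", StringType(), True) \
    .add("date_joined", StringType(), True) \
    .add("is_active", StringType(), True) \
    .add("is_superuser", StringType(), True) \
    .add("is_staff", StringType(), True) \
    .add("email", StringType(), True) \
    .add("dt", StringType(), True)

# Placeholder path; the task derives the real location from warehouse_path,
# e.g. <warehouse_path>/auth_user/dt=2018-02-13/.
auth_user = spark.read.csv(
    's3://example-warehouse/auth_user/dt=2018-02-13/',
    schema=schema,
    sep='\t',
)

# A temp view lets a SQL join like the one in spark_job() refer to the data by name.
auth_user.createOrReplaceTempView('auth_user')

On the write side, the coalesce(1) introduced in spark_job() caps the result at a single partition, so the job emits one tab-separated part file instead of up to ten.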
