From 7d6b0c0b562e115c5b17b7d1adba5dbbb831c1a1 Mon Sep 17 00:00:00 2001
From: rao-abdul-mannan
Date: Wed, 28 Feb 2018 14:49:46 +0500
Subject: [PATCH] Spark SQL join performs better than broadcast join

---
 .../load_internal_reporting_user_activity.py | 55 ++++++++++---------
 1 file changed, 28 insertions(+), 27 deletions(-)

diff --git a/edx/analytics/tasks/warehouse/load_internal_reporting_user_activity.py b/edx/analytics/tasks/warehouse/load_internal_reporting_user_activity.py
index bfa2dce584..e5a20d854b 100644
--- a/edx/analytics/tasks/warehouse/load_internal_reporting_user_activity.py
+++ b/edx/analytics/tasks/warehouse/load_internal_reporting_user_activity.py
@@ -124,33 +124,34 @@ def spark_job(self, *args):
             sep='\t',
             schema=self._get_user_activity_table_schema()
         )
-        # self._sql_context.registerDataFrameAsTable(auth_user_df, 'auth_user')
-        # self._sql_context.registerDataFrameAsTable(user_activity_df, 'user_activity')
-        # query = """
-        #     SELECT
-        #         au.id,
-        #         ua.course_id,
-        #         ua.date,
-        #         ua.category,
-        #         ua.count
-        #     FROM auth_user au
-        #     JOIN user_activity ua
-        #     ON au.username = ua.username
-        # """
-        # result = self._sql_context.sql(query)
-        # result.coalesce(2).write.csv(self.output().path, mode='overwrite', sep='\t')
-        from pyspark.sql.functions import broadcast
-        auth_df = broadcast(auth_user_df)
-        user_activity_df.join(
-            auth_df,
-            auth_df.username == user_activity_df.username
-        ).select(
-            auth_df['id'],
-            user_activity_df['course_id'],
-            user_activity_df['date'],
-            user_activity_df['category'],
-            user_activity_df['count'],
-        ).coalesce(1).write.csv(self.output().path, mode='overwrite', sep='\t')
+        self._sql_context.registerDataFrameAsTable(auth_user_df, 'auth_user')
+        self._sql_context.registerDataFrameAsTable(user_activity_df, 'user_activity')
+        query = """
+            SELECT
+                au.id,
+                ua.course_id,
+                ua.date,
+                ua.category,
+                ua.count
+            FROM auth_user au
+            JOIN user_activity ua
+            ON au.username = ua.username
+        """
+        result = self._sql_context.sql(query)
+        result.coalesce(1).write.csv(self.output().path, mode='overwrite', sep='\t')
+        # using broadcast join
+        # from pyspark.sql.functions import broadcast
+        # auth_df = broadcast(auth_user_df)
+        # user_activity_df.join(
+        #     auth_df,
+        #     auth_df.username == user_activity_df.username
+        # ).select(
+        #     auth_df['id'],
+        #     user_activity_df['course_id'],
+        #     user_activity_df['date'],
+        #     user_activity_df['category'],
+        #     user_activity_df['count'],
+        # ).coalesce(1).write.csv(self.output().path, mode='overwrite', sep='\t')
 
     def output(self):
         return get_target_from_url(
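
For reference, the two join strategies this patch swaps can be reproduced in a standalone script. The sketch below is illustrative and not part of the patch: it assumes a modern SparkSession (the patch itself runs inside a Luigi task and uses the older SQLContext API via registerDataFrameAsTable), and the app name, sample rows, and course id are invented for illustration.

# Standalone sketch of the two join strategies compared by this patch.
# Sample data is invented; only the column names mirror the patched code.
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName('join_strategy_sketch').getOrCreate()

auth_user_df = spark.createDataFrame(
    [(1, 'alice'), (2, 'bob')],
    ['id', 'username'])
user_activity_df = spark.createDataFrame(
    [('alice', 'course-v1:edX+DemoX+1T2018', '2018-02-28', 'ACTIVE', 3)],
    ['username', 'course_id', 'date', 'category', 'count'])

# Strategy the patch enables: register temp views and let the SQL planner
# choose the join (a sort-merge shuffle join for large inputs, an automatic
# broadcast when one side is under spark.sql.autoBroadcastJoinThreshold).
auth_user_df.createOrReplaceTempView('auth_user')
user_activity_df.createOrReplaceTempView('user_activity')
sql_result = spark.sql("""
    SELECT au.id, ua.course_id, ua.date, ua.category, ua.count
    FROM auth_user au
    JOIN user_activity ua ON au.username = ua.username
""")
sql_result.explain()  # shows whichever plan the optimizer picked

# Strategy the patch disables: force a broadcast of auth_user so every
# executor holds a full copy and user_activity is joined without a shuffle.
auth_df = broadcast(auth_user_df)
hinted_result = user_activity_df.join(
    auth_df, auth_df.username == user_activity_df.username
).select(
    auth_df['id'],
    user_activity_df['course_id'],
    user_activity_df['date'],
    user_activity_df['category'],
    user_activity_df['count'])
hinted_result.explain()  # shows a BroadcastHashJoin produced by the hint

Which strategy wins depends on table sizes: on a large installation, auth_user can be big enough that shipping a full copy to every executor costs more than the shuffle, which is consistent with the commit message's claim that the planner-chosen SQL join performed better here.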