This repository has been archived by the owner on May 1, 2024. It is now read-only.

replace username with user_id
rao-abdul-mannan committed May 5, 2018
1 parent 7f6f2a3 commit 9def6c5
Showing 3 changed files with 34 additions and 392 deletions.
42 changes: 22 additions & 20 deletions edx/analytics/tasks/insights/user_activity.py
@@ -45,16 +45,18 @@ def mapper(self, line):
             return
         event, date_string = value

-        username = event.get('username', '').strip()
-        if not username:
+        user_id = event.get('context', {}).get('user_id')
+        if not user_id:
+            self.incr_counter('UserActivity', 'Discard Missing User ID', 1)
+            log.error("User-Activity: event without user_id in context: %s", event)
             return

         course_id = eventlog.get_course_id(event)
         if not course_id:
             return

         for label in self.get_predicate_labels(event):
-            yield date_string, self._encode_tuple((course_id, username, date_string, label))
+            yield date_string, self._encode_tuple((str(user_id), course_id, date_string, label))

     def get_predicate_labels(self, event):
         """Creates labels by applying hardcoded predicates to a single event."""
@@ -111,15 +113,15 @@ def multi_output_reducer(self, _date_string, values, output_file):
         counter = Counter(values)

         for key, num_events in counter.iteritems():
-            course_id, username, date_string, label = key
-            value = (course_id, username, date_string, label, num_events)
+            user_id, course_id, date_string, label = key
+            value = (user_id, course_id, date_string, label, num_events)
             output_file.write('\t'.join([str(field) for field in value]))
             output_file.write('\n')

     def output_path_for_key(self, key):
         date_string = key
         return url_path_join(
-            self.hive_partition_path('user_activity', date_string),
+            self.hive_partition_path('user_activity_by_user', date_string),
             'user_activity_{date}'.format(
                 date=date_string,
             )
@@ -203,17 +205,17 @@ def spark_job(self, *args):
         df = df.filter(
             (df['event_source'] != 'task') &
             ~ df['event_type'].startswith('edx.course.enrollment.') &
-            (df['username'] != '')
+            (df['context.user_id'] != '')
         )
         # passing complete row to UDF
         df = df.withColumn('all_labels', get_labels(df['event_type'], df['event_source'])) \
             .withColumn('course_id', get_courseid(df['context']))
         df = df.filter(df['course_id'] != '')  # remove rows with empty course_id
         df = df.withColumn('label', explode(split(df['all_labels'], ',')))
-        result = df.select('course_id', 'username', 'event_date', 'label') \
-            .groupBy('course_id', 'username', 'event_date', 'label').count()
+        result = df.select('context.user_id', 'course_id', 'event_date', 'label') \
+            .groupBy('user_id', 'course_id', 'event_date', 'label').count()
         result = result.withColumn('dt', lit(result['event_date']))  # generate extra column for partitioning
-        result.coalesce(1).write.partitionBy('dt').csv(self.output_dir().path, mode='append', sep='\t')
+        result.coalesce(4).write.partitionBy('dt').csv(self.output_dir().path, mode='append', sep='\t')


 class UserActivityDownstreamMixin(WarehouseMixin, EventLogSelectionDownstreamMixin, MapReduceJobTaskMixin):
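
As an aside, the dotted 'context.user_id' strings in the hunk above select a nested struct field in PySpark. Below is a small, self-contained sketch of that access pattern; the local SparkSession, the sample rows, and the isNotNull() check are assumptions made for the toy data rather than code from this repository.

    from pyspark.sql import Row, SparkSession

    spark = SparkSession.builder.master('local[1]').appName('user_id_sketch').getOrCreate()

    # Two hypothetical events with a nested 'context' struct.
    events = spark.createDataFrame([
        Row(event_type='play_video', event_source='browser',
            context=Row(user_id=42, course_id='course-v1:edX+DemoX+Demo_Course')),
        Row(event_type='edx.course.enrollment.activated', event_source='server',
            context=Row(user_id=7, course_id='course-v1:edX+DemoX+Demo_Course')),
    ])

    kept = events.filter(
        (events['event_source'] != 'task') &
        ~events['event_type'].startswith('edx.course.enrollment.') &
        events['context.user_id'].isNotNull()
    )

    # Selecting 'context.user_id' yields a column named plain 'user_id',
    # which is why a later groupBy can refer to it without the prefix.
    kept.select('context.user_id', 'event_type').groupBy('user_id').count().show()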
@@ -256,7 +258,7 @@ def query(self):

     @property
     def table(self):
-        return 'user_activity'
+        return 'user_activity_by_user'

     @property
     def partition_by(self):
@@ -265,8 +267,8 @@ def partition_by(self):
     @property
     def columns(self):
         return [
+            ('user_id', 'INT'),
             ('course_id', 'STRING'),
-            ('username', 'STRING'),
             ('date', 'STRING'),
             ('category', 'STRING'),
             ('count', 'INT'),
@@ -314,7 +316,7 @@ def requires(self):
     def user_activity_hive_table_path(self, *args):
         return url_path_join(
             self.warehouse_path,
-            'user_activity'
+            'user_activity_by_user'
         )

     def calendar_hive_table_path(self, *args):
@@ -329,8 +331,8 @@ def calendar_hive_table_path(self, *args):

     def get_user_activity_table_schema(self):
         from pyspark.sql.types import StructType, StringType
-        schema = StructType().add("course_id", StringType(), True) \
-            .add("username", StringType(), True) \
+        schema = StructType().add("user_id", StringType(), True) \
+            .add("course_id", StringType(), True) \
             .add("date", StringType(), True) \
             .add("category", StringType(), True) \
             .add("count", StringType(), True) \
@@ -361,16 +363,16 @@ def spark_job(self, *args):
             sep='\t',
             schema=self.get_calendar_table_schema()
         )
-        user_activity_df.createOrReplaceTempView('user_activity')
+        user_activity_df.createOrReplaceTempView('user_activity_by_user')
         calendar_df.createOrReplaceTempView('calendar')
         query = """
             SELECT
                 act.course_id as course_id,
                 CONCAT(cal.iso_week_start, " 00:00:00") as interval_start,
                 CONCAT(cal.iso_week_end, " 00:00:00") as interval_end,
                 act.category as label,
-                COUNT (DISTINCT username) as count
-            FROM user_activity act
+                COUNT (DISTINCT user_id) as count
+            FROM user_activity_by_user act
             JOIN calendar cal
                 ON act.date = cal.date AND act.dt >= "{interval_start}" AND act.dt < "{interval_end}"
             WHERE
@@ -445,8 +447,8 @@ def query(self):
                 CONCAT(cal.iso_week_start, ' 00:00:00') as interval_start,
                 CONCAT(cal.iso_week_end, ' 00:00:00') as interval_end,
                 act.category as label,
-                COUNT(DISTINCT username) as count
-            FROM user_activity act
+                COUNT(DISTINCT user_id) as count
+            FROM user_activity_by_user act
             JOIN calendar cal
                 ON act.`date` = cal.`date` AND act.dt >= "{interval_start}" AND act.dt < "{interval_end}"
             WHERE
