From 8cb86a259096b1a34177ed10dd572cc803a0f4ff Mon Sep 17 00:00:00 2001 From: Matthew R Hanlon Date: Mon, 7 Mar 2016 22:01:22 -0600 Subject: [PATCH] pull indexing job outputs into workspace tasks --- designsafe/apps/data/tasks.py | 111 ++++++++-------- designsafe/apps/signals/receivers.py | 5 +- designsafe/apps/workspace/tasks.py | 189 +++++++++++++++++---------- 3 files changed, 178 insertions(+), 127 deletions(-) diff --git a/designsafe/apps/data/tasks.py b/designsafe/apps/data/tasks.py index a17487f1e1..871e2f9f74 100644 --- a/designsafe/apps/data/tasks.py +++ b/designsafe/apps/data/tasks.py @@ -17,61 +17,62 @@ logger = logging.getLogger(__name__) -@shared_task -def index_job_outputs(data): - logger.debug('Preparing to index Job outputs: %s' % data) - - job_owner = data['job_owner'] - job_id = data['job_id'] - - if data['status'] == 'INDEXING': - try: - user = get_user_model().objects.get(username=job_owner) - if user.agave_oauth.expired: - user.agave_oauth.refresh() - ag = Agave(api_server=settings.AGAVE_TENANT_BASEURL, - token=user.agave_oauth.token['access_token']) - job = ag.jobs.get(jobId=job_id) - system_id = job['archiveSystem'] - archive_path = job['archivePath'] - #base_file = ag.files.list(systemId=system_id, filePath=archive_path) - # the listing returns "name"="." for the first folder in the list - # we need the actual folder name - #base_file[0]['name'] = base_file[0]['path'].split('/')[-1] - #get_or_create_from_file(ag, base_file[0]) - for f in agave_utils.fs_walk(ag, system_id, archive_path): - fo = agave_utils.get_folder_obj(agave_client = ag, file_obj = f) - logger.debug('Indexing: {}'.format(fo.full_path)) - o = Object(**fo.to_dict()) - o.save() - - paths = archive_path.split('/') - for i in range(len(paths)): - path = '/'.join(paths) - fo = AgaveFolderFile.from_path(ag, system_id, path) - logger.debug('Indexing: {}'.format(fo.full_path)) - o = Object(**fo.to_dict()) - o.save() - paths.pop() - - data['status']='FINISHED' - data['event']='FINISHED' - - index=next((i for i,x in enumerate(data['html']) if 'Status' in x),None) - data['html'][index]={'Status': 'FINISHED'} - - - db_hash = archive_path.replace(job_owner, '') - data['action_link']={'label': 'View Output', 'value': '%s#%s' % (reverse('designsafe_data:my_data'), db_hash)} - - del data['event_type'] - del data['users'] - generic_event.send_robust(None, event_type='job', event_data=data, - event_users=[job_owner]) - logger.info('data with action link: {}'.format(str(data))) - - except ObjectDoesNotExist: - logger.exception('Unable to locate local user=%s' % job_owner) +# move to workspace.tasks +# @shared_task +# def index_job_outputs(data): +# logger.debug('Preparing to index Job outputs: %s' % data) +# +# job_owner = data['job_owner'] +# job_id = data['job_id'] +# +# if data['status'] == 'INDEXING': +# try: +# user = get_user_model().objects.get(username=job_owner) +# if user.agave_oauth.expired: +# user.agave_oauth.refresh() +# ag = Agave(api_server=settings.AGAVE_TENANT_BASEURL, +# token=user.agave_oauth.token['access_token']) +# job = ag.jobs.get(jobId=job_id) +# system_id = job['archiveSystem'] +# archive_path = job['archivePath'] +# #base_file = ag.files.list(systemId=system_id, filePath=archive_path) +# # the listing returns "name"="." for the first folder in the list +# # we need the actual folder name +# #base_file[0]['name'] = base_file[0]['path'].split('/')[-1] +# #get_or_create_from_file(ag, base_file[0]) +# for f in agave_utils.fs_walk(ag, system_id, archive_path): +# fo = agave_utils.get_folder_obj(agave_client = ag, file_obj = f) +# logger.debug('Indexing: {}'.format(fo.full_path)) +# o = Object(**fo.to_dict()) +# o.save() +# +# paths = archive_path.split('/') +# for i in range(len(paths)): +# path = '/'.join(paths) +# fo = AgaveFolderFile.from_path(ag, system_id, path) +# logger.debug('Indexing: {}'.format(fo.full_path)) +# o = Object(**fo.to_dict()) +# o.save() +# paths.pop() +# +# data['status']='FINISHED' +# data['event']='FINISHED' +# +# index=next((i for i,x in enumerate(data['html']) if 'Status' in x),None) +# data['html'][index]={'Status': 'FINISHED'} +# +# +# db_hash = archive_path.replace(job_owner, '') +# data['action_link']={'label': 'View Output', 'value': '%s#%s' % (reverse('designsafe_data:my_data'), db_hash)} +# +# del data['event_type'] +# del data['users'] +# generic_event.send_robust(None, event_type='job', event_data=data, +# event_users=[job_owner]) +# logger.info('data with action link: {}'.format(str(data))) +# +# except ObjectDoesNotExist: +# logger.exception('Unable to locate local user=%s' % job_owner) @app.task def share(system_id, path, username, permission, me): diff --git a/designsafe/apps/signals/receivers.py b/designsafe/apps/signals/receivers.py index 12228900aa..abfc7f95d1 100644 --- a/designsafe/apps/signals/receivers.py +++ b/designsafe/apps/signals/receivers.py @@ -25,8 +25,9 @@ def ds_event_callback(sender, **kwargs): logger.info('Event Type: {0}'.format(event_type)) logger.info('Event kwargs: {0}'.format(kwargs)) - if event_type == 'job' and data['status'] == 'INDEXING': - tasks.index_job_outputs.delay(data) + # do this directly in workspace.tasks.watch_job_status + # if event_type == 'job' and data['status'] == 'INDEXING': + # tasks.index_job_outputs.delay(data) if users: rp = RedisPublisher(facility = WEBSOCKETS_FACILITY, users=users) diff --git a/designsafe/apps/workspace/tasks.py b/designsafe/apps/workspace/tasks.py index 384897d68f..4670fb634f 100644 --- a/designsafe/apps/workspace/tasks.py +++ b/designsafe/apps/workspace/tasks.py @@ -1,19 +1,17 @@ from __future__ import absolute_import -from celery import shared_task, task -from designsafe.celery import app -from designsafe.apps.signals.signals import generic_event -from agavepy.agave import Agave, AgaveException - -import logging -import requests -from requests import ConnectionError, HTTPError -import json - from django.contrib.auth import get_user_model from django.core.exceptions import ObjectDoesNotExist from django.core.urlresolvers import reverse from django.conf import settings +from designsafe.apps.signals.signals import generic_event +from designsafe.libs.elasticsearch.api import Object +from dsapi.agave import utils as agave_utils +from dsapi.agave.daos import AgaveFolderFile +from agavepy.agave import Agave, AgaveException +from celery import shared_task +from requests import ConnectionError, HTTPError +import logging logger = logging.getLogger(__name__) @@ -32,7 +30,7 @@ def json(self): } -@app.task +@shared_task def submit_job(request, username, job_post, retry=1): logger.info('Submitting job for user=%s: %s' % (username, job_post)) @@ -44,12 +42,9 @@ def submit_job(request, username, job_post, retry=1): agave = Agave(api_server=settings.AGAVE_TENANT_BASEURL, token=token.access_token) response = agave.jobs.submit(body=job_post) logger.debug('Job Submission Response: {}'.format(response)) + # watch job status - task_data = { - 'username': username, - 'job_id': response['id'] - } - watch_job_status.apply_async(args=[task_data], countdown=10) + watch_job_status.apply_async(args=[username, response['id']], countdown=10) return response except ConnectionError as e: @@ -85,10 +80,7 @@ def submit_job(request, username, job_post, retry=1): @shared_task -def watch_job_status(data): - username = data['username'] - job_id = data['job_id'] - current_status = data.get('current_status', None) +def watch_job_status(username, job_id, current_status=None, retry=0): try: user = get_user_model().objects.get(username=username) if user.agave_oauth.expired: @@ -116,23 +108,58 @@ def watch_job_status(data): ] } - if 'retry' in data: - # clear out any past retries - del data['retry'] - - if job_status in ['INDEXING', 'FINISHED', 'FAILED']: - # job finished, no additional tasks; notify + if job_status == 'FAILED': + # end state, no additional tasks; notify logger.debug('JOB FINALIZED: id=%s status=%s' % (job_id, job_status)) + generic_event.send_robust(None, event_type='job', event_data=event_data, + event_users=[username]) + elif job_status == 'INDEXING': + # end state, start indexing outputs + logger.debug('JOB STATUS CHANGE: id=%s status=%s' % (job_id, job_status)) + + # notify generic_event.send_robust(None, event_type='job', event_data=event_data, event_users=[username]) + + try: + logger.debug('Preparing to Index Job Output job=%s' % job) + index_job_outputs(user, job) + logger.debug('Finished Indexing Job Output job=%s' % job) + + event_data['status'] = 'FINISHED' + event_data['event'] = 'FINISHED' + event_data['html'][1] = {'Status': 'FINISHED'} + + db_hash = job['archivePath'].replace(job['owner'], '') + event_data['action_link'] = { + 'label': 'View Output', + 'value': '%s#%s' % (reverse('designsafe_data:my_data'), db_hash) + } + logger.debug('Event data with action link %s' % event_data) + + # notify + generic_event.send_robust(None, event_type='job', event_data=event_data, + event_users=[job['owner']]) + except: + logger.exception('Error indexing job output; scheduling retry') + retry += 1 + watch_job_status.apply_async(args=[username, job_id], + kwargs={'retry': retry, + 'current_status': 'FINISHED'}, + countdown=2**retry) + elif current_status and current_status == job_status: # DO NOT notify, but still queue another watch task - watch_job_status.apply_async(args=[data], countdown=10) + watch_job_status.apply_async(args=[username, job_id], + kwargs={'current_status': job_status}, + countdown=10) else: # queue another watch task - data['current_status'] = job_status - watch_job_status.apply_async(args=[data], countdown=10) + watch_job_status.apply_async(args=[username, job_id], + kwargs={'current_status': job_status}, + countdown=10) + # notify logger.debug('JOB STATUS CHANGE: id=%s status=%s' % (job_id, job_status)) generic_event.send_robust(None, event_type='job', event_data=event_data, @@ -145,48 +172,70 @@ def watch_job_status(data): logger.warning('Job not found. Cancelling job watch.', extra={'job_id': job_id}) else: - retries = data.get('retry', 0) + 1 - data['retry'] = retries - if retries > 10: - logger.error('Agave Job Status max retries exceeded', - extra={'job_id': job_id}) + retry += 1 + if retry > 10: + logger.error('Agave Job Status max retries exceeded for job=%s' % job) else: - logger.warning('Agave API error. Retry number %s...' % retries) - watch_job_status.apply_async(args=[data], countdown=2**retries) + logger.warning('Agave API error. Retry number %s...' % retry) + watch_job_status.apply_async(args=[username, job_id], + kwargs={'retry': retry}, + countdown=2**retry) except AgaveException: - retries = data.get('retry', 0) + 1 - data['retry'] = retries - if retries > 10: - logger.error('Agave Job Status max retries exceeded', - extra={'job_id': job_id}) + retry += 1 + if retry > 10: + logger.error('Agave Job Status max retries exceeded for job=%s' % job) else: - logger.warning('Agave API error. Retry number %s...' % retries) - watch_job_status.apply_async(args=[data], countdown=2**retries) - - -@app.task -def subscribe_job_notification(request, agave, job_id): - url=request.build_absolute_uri(reverse('jobs_webhook'))+'?uuid=${UUID}&status=${STATUS}&job_id=${JOB_ID}&event=${EVENT}&system=${JOB_SYSTEM}&job_name=${JOB_NAME}&job_owner=${JOB_OWNER}' - logger.info('job notification url: {}'.format(url)) - - d = { - "url" : url, - "event" : "*", - "associatedUuid" : job_id, - "persistent": True - } - - try: - subscribe = agave.notifications.add(body=json.dumps(d)) - except (requests.exceptions.ConnectionError, requests.exceptions.HTTPError) as e: - logger.debug('Job Notification Subscription Task HTTPError {0}: {1}'.format(e.response.status_code, e.__class__)) - submit_job.retry(exc=e("Agave is currently down. Your notification will be created when it returns."), max_retries=None) - - logger.info('agave subs: {}'.format(subscribe)) - - -#just for testing -def mock_agave_notification(): - import requests - r = requests.post('http://192.168.99.100:8000/webhooks/jobs/', data={"job_id":'1234512345', "event":"test", "job_name":'test name', "job_owner": 'mlm55', "status":"test status", "archivePath":"test/path"}) + logger.warning('Agave API error. Retry number %s...' % retry) + watch_job_status.apply_async(args=[username, job_id], + kwargs={'retry': retry}, + countdown=2**retry) + + +def index_job_outputs(user, job): + if user.agave_oauth.expired: + user.agave_oauth.refresh() + ag = Agave(api_server=settings.AGAVE_TENANT_BASEURL, + token=user.agave_oauth.access_token) + system_id = job['archiveSystem'] + archive_path = job['archivePath'] + paths = archive_path.split('/') + for i in range(len(paths)): + path = '/'.join(paths) + fo = AgaveFolderFile.from_path(ag, system_id, path) + logger.debug('Indexing: {}'.format(fo.full_path)) + o = Object(**fo.to_dict()) + o.save() + paths.pop() + + for f in agave_utils.fs_walk(ag, system_id, archive_path): + fo = agave_utils.get_folder_obj(agave_client = ag, file_obj = f) + logger.debug('Indexing: {}'.format(fo.full_path)) + o = Object(**fo.to_dict()) + o.save() + +# @shared_task +# def subscribe_job_notification(request, agave, job_id): +# url=request.build_absolute_uri(reverse('jobs_webhook'))+'?uuid=${UUID}&status=${STATUS}&job_id=${JOB_ID}&event=${EVENT}&system=${JOB_SYSTEM}&job_name=${JOB_NAME}&job_owner=${JOB_OWNER}' +# logger.info('job notification url: {}'.format(url)) +# +# d = { +# "url" : url, +# "event" : "*", +# "associatedUuid" : job_id, +# "persistent": True +# } +# +# try: +# subscribe = agave.notifications.add(body=json.dumps(d)) +# except (requests.exceptions.ConnectionError, requests.exceptions.HTTPError) as e: +# logger.debug('Job Notification Subscription Task HTTPError {0}: {1}'.format(e.response.status_code, e.__class__)) +# submit_job.retry(exc=e("Agave is currently down. Your notification will be created when it returns."), max_retries=None) +# +# logger.info('agave subs: {}'.format(subscribe)) +# +# +# #just for testing +# def mock_agave_notification(): +# import requests +# r = requests.post('http://192.168.99.100:8000/webhooks/jobs/', data={"job_id":'1234512345', "event":"test", "job_name":'test name', "job_owner": 'mlm55', "status":"test status", "archivePath":"test/path"})