Skip to content

Commit

Permalink
pull indexing job outputs into workspace tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
mrhanlon committed Mar 8, 2016
1 parent 0db9e59 commit 8cb86a2
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 127 deletions.
111 changes: 56 additions & 55 deletions designsafe/apps/data/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,61 +17,62 @@
logger = logging.getLogger(__name__)


@shared_task
def index_job_outputs(data):
    """Index the archived outputs of a finished Agave job into Elasticsearch.

    Expects a job-event payload ``data`` containing ``job_owner``,
    ``job_id``, ``status`` and ``html`` keys. When ``status`` is
    ``'INDEXING'``, walks the job's archive path on the archive system,
    indexing every file/folder and each ancestor of the archive path, then
    mutates ``data`` into a ``FINISHED`` event and broadcasts it via
    ``generic_event`` to the job owner.

    :param dict data: job event payload (mutated in place).
    """
    logger.debug('Preparing to index Job outputs: %s' % data)

    job_owner = data['job_owner']
    job_id = data['job_id']

    if data['status'] == 'INDEXING':
        try:
            user = get_user_model().objects.get(username=job_owner)
            if user.agave_oauth.expired:
                user.agave_oauth.refresh()
            ag = Agave(api_server=settings.AGAVE_TENANT_BASEURL,
                       token=user.agave_oauth.token['access_token'])
            job = ag.jobs.get(jobId=job_id)
            system_id = job['archiveSystem']
            archive_path = job['archivePath']

            # Index every file/folder beneath the archive path.
            for f in agave_utils.fs_walk(ag, system_id, archive_path):
                fo = agave_utils.get_folder_obj(agave_client=ag, file_obj=f)
                logger.debug('Indexing: {}'.format(fo.full_path))
                o = Object(**fo.to_dict())
                o.save()

            # Index the archive path itself plus each ancestor folder so the
            # archived tree is browsable from the root.
            paths = archive_path.split('/')
            while paths:
                path = '/'.join(paths)
                fo = AgaveFolderFile.from_path(ag, system_id, path)
                logger.debug('Indexing: {}'.format(fo.full_path))
                o = Object(**fo.to_dict())
                o.save()
                paths.pop()

            data['status'] = 'FINISHED'
            data['event'] = 'FINISHED'

            # FIX: next() with a default can return None when no 'Status'
            # entry exists in data['html']; guard before using it as an
            # index (the original would raise TypeError on data['html'][None]).
            index = next((i for i, x in enumerate(data['html'])
                          if 'Status' in x), None)
            if index is not None:
                data['html'][index] = {'Status': 'FINISHED'}

            db_hash = archive_path.replace(job_owner, '')
            data['action_link'] = {
                'label': 'View Output',
                'value': '%s#%s' % (reverse('designsafe_data:my_data'),
                                    db_hash)
            }

            # FIX: pop with a default instead of del so a payload missing
            # either key does not abort the task with KeyError.
            data.pop('event_type', None)
            data.pop('users', None)
            generic_event.send_robust(None, event_type='job', event_data=data,
                                      event_users=[job_owner])
            logger.info('data with action link: {}'.format(str(data)))

        except ObjectDoesNotExist:
            logger.exception('Unable to locate local user=%s' % job_owner)
# move to workspace.tasks
# @shared_task
# def index_job_outputs(data):
# logger.debug('Preparing to index Job outputs: %s' % data)
#
# job_owner = data['job_owner']
# job_id = data['job_id']
#
# if data['status'] == 'INDEXING':
# try:
# user = get_user_model().objects.get(username=job_owner)
# if user.agave_oauth.expired:
# user.agave_oauth.refresh()
# ag = Agave(api_server=settings.AGAVE_TENANT_BASEURL,
# token=user.agave_oauth.token['access_token'])
# job = ag.jobs.get(jobId=job_id)
# system_id = job['archiveSystem']
# archive_path = job['archivePath']
# #base_file = ag.files.list(systemId=system_id, filePath=archive_path)
# # the listing returns "name"="." for the first folder in the list
# # we need the actual folder name
# #base_file[0]['name'] = base_file[0]['path'].split('/')[-1]
# #get_or_create_from_file(ag, base_file[0])
# for f in agave_utils.fs_walk(ag, system_id, archive_path):
# fo = agave_utils.get_folder_obj(agave_client = ag, file_obj = f)
# logger.debug('Indexing: {}'.format(fo.full_path))
# o = Object(**fo.to_dict())
# o.save()
#
# paths = archive_path.split('/')
# for i in range(len(paths)):
# path = '/'.join(paths)
# fo = AgaveFolderFile.from_path(ag, system_id, path)
# logger.debug('Indexing: {}'.format(fo.full_path))
# o = Object(**fo.to_dict())
# o.save()
# paths.pop()
#
# data['status']='FINISHED'
# data['event']='FINISHED'
#
# index=next((i for i,x in enumerate(data['html']) if 'Status' in x),None)
# data['html'][index]={'Status': 'FINISHED'}
#
#
# db_hash = archive_path.replace(job_owner, '')
# data['action_link']={'label': 'View Output', 'value': '%s#%s' % (reverse('designsafe_data:my_data'), db_hash)}
#
# del data['event_type']
# del data['users']
# generic_event.send_robust(None, event_type='job', event_data=data,
# event_users=[job_owner])
# logger.info('data with action link: {}'.format(str(data)))
#
# except ObjectDoesNotExist:
# logger.exception('Unable to locate local user=%s' % job_owner)

@app.task
def share(system_id, path, username, permission, me):
Expand Down
5 changes: 3 additions & 2 deletions designsafe/apps/signals/receivers.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ def ds_event_callback(sender, **kwargs):
logger.info('Event Type: {0}'.format(event_type))
logger.info('Event kwargs: {0}'.format(kwargs))

if event_type == 'job' and data['status'] == 'INDEXING':
tasks.index_job_outputs.delay(data)
# do this directly in workspace.tasks.watch_job_status
# if event_type == 'job' and data['status'] == 'INDEXING':
# tasks.index_job_outputs.delay(data)

if users:
rp = RedisPublisher(facility = WEBSOCKETS_FACILITY, users=users)
Expand Down
189 changes: 119 additions & 70 deletions designsafe/apps/workspace/tasks.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
from __future__ import absolute_import

from celery import shared_task, task
from designsafe.celery import app
from designsafe.apps.signals.signals import generic_event
from agavepy.agave import Agave, AgaveException

import logging
import requests
from requests import ConnectionError, HTTPError
import json

from django.contrib.auth import get_user_model
from django.core.exceptions import ObjectDoesNotExist
from django.core.urlresolvers import reverse
from django.conf import settings
from designsafe.apps.signals.signals import generic_event
from designsafe.libs.elasticsearch.api import Object
from dsapi.agave import utils as agave_utils
from dsapi.agave.daos import AgaveFolderFile
from agavepy.agave import Agave, AgaveException
from celery import shared_task
from requests import ConnectionError, HTTPError
import logging

logger = logging.getLogger(__name__)

Expand All @@ -32,7 +30,7 @@ def json(self):
}


@app.task
@shared_task
def submit_job(request, username, job_post, retry=1):
logger.info('Submitting job for user=%s: %s' % (username, job_post))

Expand All @@ -44,12 +42,9 @@ def submit_job(request, username, job_post, retry=1):
agave = Agave(api_server=settings.AGAVE_TENANT_BASEURL, token=token.access_token)
response = agave.jobs.submit(body=job_post)
logger.debug('Job Submission Response: {}'.format(response))

# watch job status
task_data = {
'username': username,
'job_id': response['id']
}
watch_job_status.apply_async(args=[task_data], countdown=10)
watch_job_status.apply_async(args=[username, response['id']], countdown=10)
return response

except ConnectionError as e:
Expand Down Expand Up @@ -85,10 +80,7 @@ def submit_job(request, username, job_post, retry=1):


@shared_task
def watch_job_status(data):
username = data['username']
job_id = data['job_id']
current_status = data.get('current_status', None)
def watch_job_status(username, job_id, current_status=None, retry=0):
try:
user = get_user_model().objects.get(username=username)
if user.agave_oauth.expired:
Expand Down Expand Up @@ -116,23 +108,58 @@ def watch_job_status(data):
]
}

if 'retry' in data:
# clear out any past retries
del data['retry']

if job_status in ['INDEXING', 'FINISHED', 'FAILED']:
# job finished, no additional tasks; notify
if job_status == 'FAILED':
# end state, no additional tasks; notify
logger.debug('JOB FINALIZED: id=%s status=%s' % (job_id, job_status))
generic_event.send_robust(None, event_type='job', event_data=event_data,
event_users=[username])

elif job_status == 'INDEXING':
# end state, start indexing outputs
logger.debug('JOB STATUS CHANGE: id=%s status=%s' % (job_id, job_status))

# notify
generic_event.send_robust(None, event_type='job', event_data=event_data,
event_users=[username])

try:
logger.debug('Preparing to Index Job Output job=%s' % job)
index_job_outputs(user, job)
logger.debug('Finished Indexing Job Output job=%s' % job)

event_data['status'] = 'FINISHED'
event_data['event'] = 'FINISHED'
event_data['html'][1] = {'Status': 'FINISHED'}

db_hash = job['archivePath'].replace(job['owner'], '')
event_data['action_link'] = {
'label': 'View Output',
'value': '%s#%s' % (reverse('designsafe_data:my_data'), db_hash)
}
logger.debug('Event data with action link %s' % event_data)

# notify
generic_event.send_robust(None, event_type='job', event_data=event_data,
event_users=[job['owner']])
except:
logger.exception('Error indexing job output; scheduling retry')
retry += 1
watch_job_status.apply_async(args=[username, job_id],
kwargs={'retry': retry,
'current_status': 'FINISHED'},
countdown=2**retry)

elif current_status and current_status == job_status:
# DO NOT notify, but still queue another watch task
watch_job_status.apply_async(args=[data], countdown=10)
watch_job_status.apply_async(args=[username, job_id],
kwargs={'current_status': job_status},
countdown=10)
else:
# queue another watch task
data['current_status'] = job_status
watch_job_status.apply_async(args=[data], countdown=10)
watch_job_status.apply_async(args=[username, job_id],
kwargs={'current_status': job_status},
countdown=10)

# notify
logger.debug('JOB STATUS CHANGE: id=%s status=%s' % (job_id, job_status))
generic_event.send_robust(None, event_type='job', event_data=event_data,
Expand All @@ -145,48 +172,70 @@ def watch_job_status(data):
logger.warning('Job not found. Cancelling job watch.',
extra={'job_id': job_id})
else:
retries = data.get('retry', 0) + 1
data['retry'] = retries
if retries > 10:
logger.error('Agave Job Status max retries exceeded',
extra={'job_id': job_id})
retry += 1
if retry > 10:
logger.error('Agave Job Status max retries exceeded for job=%s' % job)
else:
logger.warning('Agave API error. Retry number %s...' % retries)
watch_job_status.apply_async(args=[data], countdown=2**retries)
logger.warning('Agave API error. Retry number %s...' % retry)
watch_job_status.apply_async(args=[username, job_id],
kwargs={'retry': retry},
countdown=2**retry)

except AgaveException:
retries = data.get('retry', 0) + 1
data['retry'] = retries
if retries > 10:
logger.error('Agave Job Status max retries exceeded',
extra={'job_id': job_id})
retry += 1
if retry > 10:
logger.error('Agave Job Status max retries exceeded for job=%s' % job)
else:
logger.warning('Agave API error. Retry number %s...' % retries)
watch_job_status.apply_async(args=[data], countdown=2**retries)


@app.task
def subscribe_job_notification(request, agave, job_id):
url=request.build_absolute_uri(reverse('jobs_webhook'))+'?uuid=${UUID}&status=${STATUS}&job_id=${JOB_ID}&event=${EVENT}&system=${JOB_SYSTEM}&job_name=${JOB_NAME}&job_owner=${JOB_OWNER}'
logger.info('job notification url: {}'.format(url))

d = {
"url" : url,
"event" : "*",
"associatedUuid" : job_id,
"persistent": True
}

try:
subscribe = agave.notifications.add(body=json.dumps(d))
except (requests.exceptions.ConnectionError, requests.exceptions.HTTPError) as e:
logger.debug('Job Notification Subscription Task HTTPError {0}: {1}'.format(e.response.status_code, e.__class__))
submit_job.retry(exc=e("Agave is currently down. Your notification will be created when it returns."), max_retries=None)

logger.info('agave subs: {}'.format(subscribe))


#just for testing
def mock_agave_notification():
    """Post a fabricated Agave job webhook to the local dev server.

    Dev-only helper for exercising the jobs webhook endpoint without a
    real Agave notification subscription.
    """
    import requests
    payload = {
        "job_id": '1234512345',
        "event": "test",
        "job_name": 'test name',
        "job_owner": 'mlm55',
        "status": "test status",
        "archivePath": "test/path",
    }
    requests.post('http://192.168.99.100:8000/webhooks/jobs/', data=payload)
logger.warning('Agave API error. Retry number %s...' % retry)
watch_job_status.apply_async(args=[username, job_id],
kwargs={'retry': retry},
countdown=2**retry)


def index_job_outputs(user, job):
    """Index a job's archived outputs into Elasticsearch.

    Refreshes the user's Agave OAuth token if needed, then indexes the
    job's archive path and each of its ancestor folders, followed by
    every file/folder found beneath the archive path.

    :param user: Django user owning the job; must have ``agave_oauth``.
    :param job: Agave job record with ``archiveSystem`` / ``archivePath``.
    """
    oauth = user.agave_oauth
    if oauth.expired:
        oauth.refresh()
    agave_client = Agave(api_server=settings.AGAVE_TENANT_BASEURL,
                         token=oauth.access_token)
    system_id = job['archiveSystem']
    archive_path = job['archivePath']

    # Index the archive path and every ancestor folder, deepest first.
    segments = archive_path.split('/')
    while segments:
        folder = AgaveFolderFile.from_path(agave_client, system_id,
                                           '/'.join(segments))
        logger.debug('Indexing: {}'.format(folder.full_path))
        Object(**folder.to_dict()).save()
        segments.pop()

    # Index everything beneath the archive path.
    for listing in agave_utils.fs_walk(agave_client, system_id, archive_path):
        folder = agave_utils.get_folder_obj(agave_client=agave_client,
                                            file_obj=listing)
        logger.debug('Indexing: {}'.format(folder.full_path))
        Object(**folder.to_dict()).save()

# @shared_task
# def subscribe_job_notification(request, agave, job_id):
# url=request.build_absolute_uri(reverse('jobs_webhook'))+'?uuid=${UUID}&status=${STATUS}&job_id=${JOB_ID}&event=${EVENT}&system=${JOB_SYSTEM}&job_name=${JOB_NAME}&job_owner=${JOB_OWNER}'
# logger.info('job notification url: {}'.format(url))
#
# d = {
# "url" : url,
# "event" : "*",
# "associatedUuid" : job_id,
# "persistent": True
# }
#
# try:
# subscribe = agave.notifications.add(body=json.dumps(d))
# except (requests.exceptions.ConnectionError, requests.exceptions.HTTPError) as e:
# logger.debug('Job Notification Subscription Task HTTPError {0}: {1}'.format(e.response.status_code, e.__class__))
# submit_job.retry(exc=e("Agave is currently down. Your notification will be created when it returns."), max_retries=None)
#
# logger.info('agave subs: {}'.format(subscribe))
#
#
# #just for testing
# def mock_agave_notification():
# import requests
# r = requests.post('http://192.168.99.100:8000/webhooks/jobs/', data={"job_id":'1234512345', "event":"test", "job_name":'test name', "job_owner": 'mlm55', "status":"test status", "archivePath":"test/path"})

0 comments on commit 8cb86a2

Please sign in to comment.