From 7612ca75b3b1737a1af29e6a89148fdfbd6e6d23 Mon Sep 17 00:00:00 2001 From: andamian Date: Wed, 6 Jun 2018 10:41:27 -0700 Subject: [PATCH] S1990 (#108) * Added support for streaming vls results --- vos/test/scripts/vospace-token-atest.tcsh | 7 +- vos/vos/commands/vls.py | 65 ++++++------ vos/vos/commonparser.py | 8 +- vos/vos/tests/test_vos.py | 34 ++++++- vos/vos/vos.py | 118 ++++++++++++++++++++-- 5 files changed, 181 insertions(+), 51 deletions(-) diff --git a/vos/test/scripts/vospace-token-atest.tcsh b/vos/test/scripts/vospace-token-atest.tcsh index 781c1cb42..b3c609962 100755 --- a/vos/test/scripts/vospace-token-atest.tcsh +++ b/vos/test/scripts/vospace-token-atest.tcsh @@ -1,7 +1,7 @@ #!/bin/tcsh -f if (! ${?VOSPACE_WEBSERVICE} ) then - setenv VOSPACE_WEBSERVICE 'www.canfar.phys.uvic.ca' + setenv VOSPACE_WEBSERVICE 'www.cadc-ccda.hia-iha.nrc-cnrc.gc.ca' echo "VOSPACE_WEBSERVICE env variable not set, use default WebService URL $VOSPACE_WEBSERVICE" else echo "WebService URL (VOSPACE_WEBSERVICE env variable): $VOSPACE_WEBSERVICE" @@ -48,9 +48,8 @@ set TOKEN = "`curl -s -d username=$username -d password=$password ${ACCESS_PAGE} echo -n "create containers" -echo $MKDIRCMD --token="$TOKEN" -p $CONTAINER/A -$MKDIRCMD --token="$TOKEN" -p $CONTAINER/A > /dev/null || echo " [FAIL]" && exit -1 -$MKDIRCMD --token="$TOKEN" $CONTAINER/B > /dev/null || echo " [FAIL]" && exit -1 +$MKDIRCMD --token "$TOKEN" -p $CONTAINER/A > /dev/null || echo " [FAIL]" && exit -1 +$MKDIRCMD --token "$TOKEN" $CONTAINER/B > /dev/null || echo " [FAIL]" && exit -1 echo " [OK]" echo -n "set permissions" diff --git a/vos/vos/commands/vls.py b/vos/vos/commands/vls.py index 77f5e0dd1..ca9768186 100755 --- a/vos/vos/commands/vls.py +++ b/vos/vos/commands/vls.py @@ -101,9 +101,6 @@ def vls(): columns.extend( ['readGroup', 'writeGroup', 'isLocked', 'size', 'date']) - # determine if their is a sorting order - sort_key = (opt.time and "date") or (opt.Size and "size") or False - # create a client to send VOSpace command client = vos.Client(vospace_certfile=opt.certfile, vospace_token=opt.token) @@ -124,51 +121,49 @@ def vls(): else: files.append(target) - info_list = [] - for f in files: - client.get_node(f) - info_list = info_list + client.get_info_list(f) + # determine if their is a sorting order + if opt.Size: + sort = vos.SortNodeProperty.LENGTH + elif opt.time: + sort = vos.SortNodeProperty.DATE + else: + sort = None + + if opt.reverse: + order = 'desc' + else: + order = 'asc' - _display_target(columns, info_list, opt, sort_key) + for f in files: + for row in client.get_children_info(f, sort, order): + _display_target(columns, row) for d in dirs: - n = client.get_node(d, limit=None, force=True) + n = client.get_node(d, limit=0, force=True) if (len(dirs) + len(files)) > 1: sys.stdout.write('\n{}:\n'.format(n.name)) if opt.long: sys.stdout.write('total: {}\n'.format( int(n.get_info()['size']))) - info_list = client.get_info_list(d) - _display_target(columns, info_list, opt, sort_key) + for row in client.get_children_info(d, sort, order): + _display_target(columns, row) except Exception as ex: exit_on_exception(ex) -def _display_target(columns, info_list, opt, sort_key): - if sort_key: - # noinspection PyBroadException - try: - sorted_list = \ - sorted(info_list, - key=lambda name: name.get_info()[sort_key], - reverse=not opt.reverse) - except Exception: - sorted_list = info_list - finally: - info_list = sorted_list - for n in info_list: - name_string = n.name - info = n.get_info() - for col in columns: - value = info.get(col, None) - value = value is not None and value or "" - if col in __LIST_FORMATS__: - sys.stdout.write(__LIST_FORMATS__[col](value)) - if info["permissions"][0] == 'l': - name_string = "%s -> %s" % ( - name_string, info['target']) - sys.stdout.write("%s\n" % name_string) +def _display_target(columns, row): + name_string = row.name + info = row.get_info() + for col in columns: + value = info.get(col, None) + value = value is not None and value or "" + if col in __LIST_FORMATS__: + sys.stdout.write(__LIST_FORMATS__[col](value)) + if info["permissions"][0] == 'l': + name_string = "%s -> %s" % ( + row.name, info['target']) + sys.stdout.write("%s\n" % name_string) vls.__doc__ = DESCRIPTION diff --git a/vos/vos/commonparser.py b/vos/vos/commonparser.py index db6d14173..dc188a9bc 100644 --- a/vos/vos/commonparser.py +++ b/vos/vos/commonparser.py @@ -9,13 +9,13 @@ from .version import version -# handle interrupts nicely def signal_handler(signum, frame): - raise KeyboardInterrupt( - "SIGINT signal handler. {0} {1}".format(signum, frame)) + """Exit without calling cleanup handlers, flushing stdio buffers, etc. """ + os._exit(signum) -signal.signal(signal.SIGINT, signal_handler) +signal.signal(signal.SIGINT, signal_handler) # Ctrl-C +signal.signal(signal.SIGPIPE, signal_handler) # Pipe gone (head, more etc) def exit_on_exception(ex): diff --git a/vos/vos/tests/test_vos.py b/vos/vos/tests/test_vos.py index 241c5febc..88724da6f 100644 --- a/vos/vos/tests/test_vos.py +++ b/vos/vos/tests/test_vos.py @@ -2,11 +2,15 @@ import os import unittest +import pytest import requests from xml.etree import ElementTree from mock import Mock, patch, MagicMock, call from vos import Client, Connection, Node, VOFile from vos import vos as vos +from six.moves.urllib.parse import urlparse +from six.moves import urllib + # The following is a temporary workaround for Python issue 25532 # (https://bugs.python.org/issue25532) @@ -34,6 +38,33 @@ class Object(object): pass +def test_get_node_url(): + client = Client() + with pytest.raises(TypeError): + client.get_node_url('vos://cadc.nrc.ca!vospace/auser', sort='Blah') + with pytest.raises(ValueError): + client.get_node_url('vos://cadc.nrc.ca!vospace/auser', order='Blah') + + response = Mock() + response.status_code = 303 + client.conn.session.get = Mock(return_value=response) + equery = urlparse(client.get_node_url('vos://cadc.nrc.ca!vospace/auser', + sort=vos.SortNodeProperty.DATE)).query + assert(urllib.parse.unquote(equery) == + 'sort={}'.format(vos.SortNodeProperty.DATE.value)) + + equery = urlparse(client.get_node_url('vos://cadc.nrc.ca!vospace/auser', + sort=vos.SortNodeProperty.LENGTH, order='asc')).query + args = urllib.parse.unquote(equery).split('&') + assert(2 == len(args)) + assert('order=asc' in args) + assert('sort={}'.format(vos.SortNodeProperty.LENGTH.value) in args) + + equery = urlparse(client.get_node_url('vos://cadc.nrc.ca!vospace/auser', + order='desc')).query + assert('order=desc' == urllib.parse.unquote(equery)) + + class TestClient(unittest.TestCase): """Test the vos Client class. """ @@ -113,7 +144,8 @@ def test_get_info_list(self): mock_link_node.target = 'vos:/somefile' client = Client() client.get_node = MagicMock(side_effect=[mock_link_node, mock_node]) - self.assertEquals([mock_node], client.get_info_list('vos:/somenode')) + self.assertEquals([mock_node], + client.get_children_info('vos:/somenode')) def test_nodetype(self): mock_node = MagicMock(id=333) diff --git a/vos/vos/vos.py b/vos/vos/vos.py index 3ad8637a5..bba131c9c 100644 --- a/vos/vos/vos.py +++ b/vos/vos/vos.py @@ -12,6 +12,7 @@ import errno from datetime import datetime import fnmatch +from enum import Enum try: from cStringIO import StringIO @@ -76,6 +77,14 @@ # CADC specific views VO_CADC_VIEW_URI = 'ivo://cadc.nrc.ca/vospace/view' + +# sorting-related uris +class SortNodeProperty(Enum): + """ URIs of node properties used for sorting""" + LENGTH = 'ivo://ivoa.net/vospace/core#length' + DATE = 'ivo://ivoa.net/vospace/core#date' + + CADC_VO_VIEWS = {'data': '{}#data'.format(VO_CADC_VIEW_URI), 'manifest': '{}#manifest'.format(VO_CADC_VIEW_URI), 'rss': '{}#rss'.format(VO_CADC_VIEW_URI), @@ -761,6 +770,56 @@ def node_list(self): self.add_child(nodeNode) return self._node_list + def get_children(self, client, sort, order, limit=500): + """ Gets an iterator over the nodes held to by a ContainerNode""" + # IF THE CALLER KNOWS THEY DON'T NEED THE CHILDREN THEY + # CAN SET LIMIT=0 IN THE CALL Also, if the number of nodes + # on the firt call was less than 500, we likely got them + # all during the init + if not self.isdir(): + return + + if self.node_list is not None: + # children already downloaded + for i in self.node_list: + yield i.uri + + # stream children + xml_file = StringIO( + client.open(self.uri, os.O_RDONLY, + limit=limit, sort=sort, + order=order).read().decode('UTF-8')) + xml_file.seek(0) + page = Node(ElementTree.parse(xml_file).getroot()) + nl = page.node_list + current_index = 0 + run = True + while run and nl: + yield_node = nl[current_index] + current_index = current_index + 1 + if current_index == len(nl): + if len(nl) == limit: + # do another page read + xml_file = StringIO( + client.open(self.uri, os.O_RDONLY, next_uri=nl[-1].uri, + sort=sort, order=order, + limit=limit).read().decode('UTF-8')) + xml_file.seek(0) + page = Node(ElementTree.parse(xml_file).getroot()) + nl = page.node_list + if len(nl) == 1 and nl[0].uri == yield_node.uri: + # that must be the last node + run = False + else: + # skip first returned entry as it is the same with + # the last one from the previous batch + current_index = 1 + else: + run = False + with client.nodeCache.watch(yield_node.uri) as childWatch: + childWatch.insert(yield_node) + yield yield_node + def add_child(self, child_element_tree): """ Add a child node to a node list. @@ -1812,7 +1871,8 @@ def get_node(self, uri, limit=0, force=False): return node def get_node_url(self, uri, method='GET', view=None, limit=None, - next_uri=None, cutout=None, full_negotiation=None): + next_uri=None, cutout=None, sort=None, order=None, + full_negotiation=None): """Split apart the node string into parts and return the correct URL for this node. @@ -1834,6 +1894,11 @@ def get_node_url(self, uri, method='GET', view=None, limit=None, :param cutout: The cutout pattern to apply to the file at the service end: applies to view='cutout' only. :type cutout: str, None + :param sort: node property to sort on + :type sort: vos.NodeProperty, None + :param order: Order of sorting, Ascending ('asc' - default) or + Descending ('desc') + :type order: unicode, None :param full_negotiation: Should we use the transfer UWS or do a GET and follow the redirect. :type full_negotiation: bool @@ -1844,6 +1909,10 @@ def get_node_url(self, uri, method='GET', view=None, limit=None, """ uri = self.fix_uri(uri) + if sort is not None and not isinstance(sort, SortNodeProperty): + raise TypeError('sort must be an instace of vos.NodeProperty Enum') + if order not in [None, 'asc', 'desc']: + raise ValueError('order must be either "asc" or "desc"') if view in ['data', 'cutout'] and method == 'GET': node = self.get_node(uri, limit=0) if node.islink(): @@ -1864,7 +1933,7 @@ def get_node_url(self, uri, method='GET', view=None, limit=None, logger.debug("Getting URLs for: {0}".format(target)) return self.get_node_url(target, method=method, view=view, limit=limit, next_uri=next_uri, - cutout=cutout, + cutout=cutout, sort=sort, order=order, full_negotiation=full_negotiation) logger.debug("Getting URL for: " + str(uri)) @@ -1900,6 +1969,10 @@ def get_node_url(self, uri, method='GET', view=None, limit=None, fields = {} if limit is not None: fields['limit'] = limit + if sort is not None: + fields['sort'] = sort.value + if order is not None: + fields['order'] = order if view is not None: fields['view'] = view if next_uri is not None: @@ -1965,7 +2038,9 @@ def get_node_url(self, uri, method='GET', view=None, limit=None, full_negotiation=True, limit=limit, next_uri=next_uri, - cutout=cutout) + cutout=cutout, + sort=sort, + order=order) logger.debug("Sending short cut url: {0}".format(url)) return [url] @@ -2204,7 +2279,7 @@ def get_transfer_error(self, url, uri): def open(self, uri, mode=os.O_RDONLY, view=None, head=False, url=None, limit=None, next_uri=None, size=None, cutout=None, - byte_range=None, + byte_range=None, sort=None, order=None, full_negotiation=False, possible_partial_read=False): """Create a VOFile connection to the specified uri or url. @@ -2236,6 +2311,11 @@ def open(self, uri, mode=os.O_RDONLY, view=None, head=False, url=None, :param byte_range: The range of bytes to request, rather than getting the entire file. :type byte_range: unicode, None + :param sort: node property to sort on + :type sort: vos.NodeProperty, None + :param order: Sorting order. Values: asc for ascending (default), desc + for descending + :type order: unicode, None :param full_negotiation: force this interaction to use the full UWS interaction to get the url for the resource :type full_negotiation: bool @@ -2282,7 +2362,7 @@ def open(self, uri, mode=os.O_RDONLY, view=None, head=False, url=None, return self.open(target, mode, view, head, url, limit, next_uri, size, cutout, - byte_range) + byte_range, sort, order) else: # A target external link # TODO Need a way of passing along authentication. @@ -2305,7 +2385,7 @@ def open(self, uri, mode=os.O_RDONLY, view=None, head=False, url=None, if url is None: url = self.get_node_url(uri, method=method, view=view, limit=limit, next_uri=next_uri, - cutout=cutout, + cutout=cutout, sort=sort, order=order, full_negotiation=full_negotiation) if url is None: raise OSError(errno.EREMOTE) @@ -2451,8 +2531,32 @@ def delete(self, uri): response = self.conn.session.delete(url) response.raise_for_status() + def get_children_info(self, uri, sort=None, order=None): + """Returns an iterator over tuples of (NodeName, Info dict) + :param uri: the Node to get info about. + :param sort: node property to sort on (vos.NodeProperty) + :param order: order of sorting: 'asc' - default or 'desc' + """ + uri = self.fix_uri(uri) + logger.debug(str(uri)) + node = self.get_node(uri, limit=0, force=True) + logger.debug(str(node)) + while node.type == "vos:LinkNode": + uri = node.target + try: + node = self.get_node(uri, limit=0, force=True) + except Exception as exception: + logger.error(str(exception)) + break + if node.type in ["vos:DataNode", "vos:LinkNode"]: + return [node] + else: + return node.get_children(self, sort, order, 500) + def get_info_list(self, uri): - """Retrieve a list of tuples of (NodeName, Info dict) + """Retrieve a list of tuples of (NodeName, Info dict). + Similar to the method above except that information is loaded + directly into memory. :param uri: the Node to get info about. """ info_list = []