diff --git a/vos/setup.cfg b/vos/setup.cfg index 658b6ef4..e30b73d5 100644 --- a/vos/setup.cfg +++ b/vos/setup.cfg @@ -48,9 +48,9 @@ license = AGPLv3 url = https://www.canfar.net/en/docs/storage edit_on_github = False github_project = opencadc/vostools -install_requires = html2text>=2016.5.29 cadcutils>=1.2.1 future aenum +install_requires = html2text>=2016.5.29 cadcutils>=1.2.6 future aenum # version should be PEP440 compatible (http://www.python.org/dev/peps/pep-0440) -version = 3.3.4 +version = 3.3.5 [entry_points] @@ -67,3 +67,4 @@ vrmdir = vos.commands.vrmdir:vrmdir vsync = vos.commands.vsync:vsync vtag = vos.commands.vtag:vtag vos-config = vos.vosconfig:vos_config_main + diff --git a/vos/vos/commands/tests/test_vsync.py b/vos/vos/commands/tests/test_vsync.py new file mode 100644 index 00000000..4d422cba --- /dev/null +++ b/vos/vos/commands/tests/test_vsync.py @@ -0,0 +1,379 @@ +# # -*- coding: utf-8 -*- +# *********************************************************************** +# ****************** CANADIAN ASTRONOMY DATA CENTRE ******************* +# ************* CENTRE CANADIEN DE DONNÉES ASTRONOMIQUES ************** +# +# (c) 2021. (c) 2021. +# Government of Canada Gouvernement du Canada +# National Research Council Conseil national de recherches +# Ottawa, Canada, K1A 0R6 Ottawa, Canada, K1A 0R6 +# All rights reserved Tous droits réservés +# +# NRC disclaims any warranties, Le CNRC dénie toute garantie +# expressed, implied, or énoncée, implicite ou légale, +# statutory, of any kind with de quelque nature que ce +# respect to the software, soit, concernant le logiciel, +# including without limitation y compris sans restriction +# any warranty of merchantability toute garantie de valeur +# or fitness for a particular marchande ou de pertinence +# purpose. NRC shall not be pour un usage particulier. 
+# liable in any event for any Le CNRC ne pourra en aucun cas +# damages, whether direct or être tenu responsable de tout +# indirect, special or general, dommage, direct ou indirect, +# consequential or incidental, particulier ou général, +# arising from the use of the accessoire ou fortuit, résultant +# software. Neither the name de l'utilisation du logiciel. Ni +# of the National Research le nom du Conseil National de +# Council of Canada nor the Recherches du Canada ni les noms +# names of its contributors may de ses participants ne peuvent +# be used to endorse or promote être utilisés pour approuver ou +# products derived from this promouvoir les produits dérivés +# software without specific prior de ce logiciel sans autorisation +# written permission. préalable et particulière +# par écrit. +# +# This file is part of the Ce fichier fait partie du projet +# OpenCADC project. OpenCADC. +# +# OpenCADC is free software: OpenCADC est un logiciel libre ; +# you can redistribute it and/or vous pouvez le redistribuer ou le +# modify it under the terms of modifier suivant les termes de +# the GNU Affero General Public la “GNU Affero General Public +# License as published by the License” telle que publiée +# Free Software Foundation, par la Free Software Foundation +# either version 3 of the : soit la version 3 de cette +# License, or (at your option) licence, soit (à votre gré) +# any later version. toute version ultérieure. +# +# OpenCADC is distributed in the OpenCADC est distribué +# hope that it will be useful, dans l’espoir qu’il vous +# but WITHOUT ANY WARRANTY; sera utile, mais SANS AUCUNE +# without even the implied GARANTIE : sans même la garantie +# warranty of MERCHANTABILITY implicite de COMMERCIALISABILITÉ +# or FITNESS FOR A PARTICULAR ni d’ADÉQUATION À UN OBJECTIF +# PURPOSE. See the GNU Affero PARTICULIER. Consultez la Licence +# General Public License for Générale Publique GNU Affero +# more details. pour plus de détails. 
def module_patch(*args):
    """
    Build a ``mock.patch`` patcher for an object that lives in a module whose
    name is shadowed by a function of the same name.

    Need to use this instead of mock.patch because vsync module has a function
    vsync defined.
    Credit: https://stackoverflow.com/questions/52324568/how-to-mock-a-
    function-called-in-a-function-inside-a-module-with-the-same-name
    :param args: positional arguments for mock.patch; args[0] is the dotted
    path of the object to patch
    :return: the patcher object returned by mock.patch
    """
    target = args[0]
    components = target.split('.')
    # try the longest dotted prefix first and shorten it until one imports
    for i in range(len(components), 0, -1):
        try:
            imported = importlib.import_module('.'.join(components[:i]))
        except ImportError:
            # prefix is not an importable module, try a shorter one
            continue

        # module was imported, let's use it in the patch. Only the import is
        # guarded so that errors raised by mock.patch itself are not hidden.
        result = mock.patch(*args)
        result.getter = lambda: imported
        result.attribute = '.'.join(components[i:])
        return result

    # did not find a module, just return the default mock
    return mock.patch(*args)


def _write_file(path, content, mode='w'):
    """
    Writes content to path and closes the file so that the data is on disk
    before the code under test reads it.
    """
    with open(path, mode) as handle:
        handle.write(content)


def test_compute_md5():
    with tempfile.NamedTemporaryFile() as tmp_file:
        # empty file
        assert compute_md5(tmp_file.name) == ZERO_MD5

        content = b'abc'
        _write_file(tmp_file.name, content, mode='wb')
        expected_md5 = hashlib.md5(content).hexdigest()
        assert compute_md5(tmp_file.name) == expected_md5
        # try again to use cache
        assert compute_md5(tmp_file.name) == expected_md5
        # change the content so that any cached value becomes stale
        content = b'cba'
        _write_file(tmp_file.name, content, mode='wb')
        expected_md5 = hashlib.md5(content).hexdigest()
        assert compute_md5(tmp_file.name) == expected_md5


def test_validate():
    assert validate('somepath')
    assert validate('somepath', exclude='.')
    assert not validate('.hiddenfile', exclude='.')
    assert not validate('file.fits.tmp', exclude='tmp')
    assert not validate('file.fits.tmp', exclude='exe,tmp')
    assert validate('somepath', include='.?me.?')
    assert not validate('sopath', include='.?me.?')
    # exclude wins
    assert not validate('somepath', include='.?me.?', exclude='me')
    # illegal characters
    assert not validate('ab[cd')


def test_prepare():
    client = Mock()
    vos_location = 'vos:someservice/somedir'
    with tempfile.NamedTemporaryFile() as tmp_file:
        # a regular file is returned as a (src, dest) pair to transfer.
        # NOTE: compare the tuple explicitly; "assert a, b == c" only checks
        # the truthiness of "a" and can never fail.
        assert (tmp_file.name, vos_location) == prepare(
            tmp_file.name, vos_location, client)
        assert not client.mkdir.called

        with tempfile.TemporaryDirectory() as tmp_dir:
            # a directory is created on the server, nothing to transfer
            src_dir = os.path.join(tmp_dir, 'vsyncsrc')
            os.mkdir(src_dir)
            assert prepare(src_dir, vos_location, client) is None
            client.mkdir.assert_called_with(vos_location)

            # symlinks are not synced
            client.mkdir.reset_mock()
            link_file = os.path.join(src_dir, 'filelink')
            os.symlink(tmp_file.name, link_file)
            assert prepare(link_file, vos_location, client) is None
            assert not client.mkdir.called

        # directory exists on the server
        client.mkdir.reset_mock()
        client.mkdir.side_effect = transfer_exceptions.AlreadyExistsException
        with tempfile.TemporaryDirectory() as tmp_dir:
            assert prepare(tmp_dir, vos_location, client) is None
            client.mkdir.assert_called_with(vos_location)


def test_build_file_list():
    def check_list(expected, actual):
        """
        checks lists of expected files vs actual. Order is determined by
        the os.walk function and it's not deterministic so we test just
        the existence of elements in the list
        """
        assert len(actual) == len(expected)
        for item in expected:
            assert item in actual

    # normally name of the src directory is part of this but we keep this
    # for simplicity
    vos_root = 'vos:someservice/somepath'

    def get_vos_path(path, sync_dir):
        # a trailing "/" means "sync the content": paths are relative to the
        # directory itself, otherwise they are relative to its parent
        if sync_dir.endswith('/'):
            base_dir = sync_dir
        else:
            base_dir = os.path.dirname(sync_dir)
        uri_path = os.path.relpath(path, base_dir)
        return '{}/{}'.format(vos_root, uri_path)

    with tempfile.TemporaryDirectory() as tmp_dir:
        src_dir_name = 'syncsrc'
        src_dir = os.path.join(tmp_dir, src_dir_name)
        os.mkdir(src_dir)

        check_list([(src_dir, get_vos_path(src_dir, src_dir))],
                   build_file_list([src_dir], vos_root))

        file1 = 'file1'
        file1_path = os.path.join(src_dir, file1)
        _write_file(file1_path, 'test')

        expected_list = [(src_dir, get_vos_path(src_dir, src_dir)),
                         (file1_path, get_vos_path(file1_path, src_dir))]
        check_list(expected_list, build_file_list([src_dir], vos_root))

        dir1 = 'dir1'
        dir1_path = os.path.join(src_dir, dir1)
        os.mkdir(dir1_path)
        file2 = 'file2'
        file2_path = os.path.join(dir1_path, file2)
        _write_file(file2_path, 'test')
        dir2 = 'dir2'
        dir2_path = os.path.join(src_dir, dir2)
        os.mkdir(dir2_path)

        # if not recursive we get the same result as the previous test
        check_list(expected_list, build_file_list([src_dir], vos_root))

        # now recursive
        expected_list = \
            [(src_dir, get_vos_path(src_dir, src_dir)),
             (dir1_path, get_vos_path(dir1_path, src_dir)),
             (dir2_path, get_vos_path(dir2_path, src_dir)),
             (file1_path, get_vos_path(file1_path, src_dir)),
             (file2_path, get_vos_path(file2_path, src_dir))]
        check_list(expected_list, build_file_list(
            [src_dir], vos_root, recursive=True))

        # repeat but now add "/" at the end of the source. This syncs just
        # the content of the dir and not the dir itself
        src_dir_content = src_dir + '/'
        expected_list_content = \
            [(dir1_path, get_vos_path(dir1_path, src_dir_content)),
             (dir2_path, get_vos_path(dir2_path, src_dir_content)),
             (file1_path, get_vos_path(file1_path, src_dir_content)),
             (file2_path, get_vos_path(file2_path, src_dir_content))]
        check_list(expected_list_content, build_file_list(
            [src_dir_content], vos_root, recursive=True))

        # path='syncsrc' and vos_root='ivo://someservice/somepath' should
        # generate the same list as path='syncsrc/' and
        # vos_root='ivo://someservice/somepath/syncsrc' with the exception of
        # the entry corresponding to the 'syncsrc' directory which is not
        # generated in the second case (but assumed to already exist on
        # server)
        expected_list.pop(0)
        check_list(expected_list, build_file_list(
            [src_dir_content], '{}/{}'.format(vos_root, src_dir_name),
            recursive=True))

        # filtered results
        expected_list = \
            [(src_dir, get_vos_path(src_dir, src_dir)),
             (dir1_path, get_vos_path(dir1_path, src_dir)),
             (file1_path, get_vos_path(file1_path, src_dir)),
             (file2_path, get_vos_path(file2_path, src_dir))]
        check_list(expected_list, build_file_list(
            [src_dir], vos_root, recursive=True, include="1"))

        # repeat with no recursive
        expected_list = \
            [(src_dir, get_vos_path(src_dir, src_dir)),
             (file1_path, get_vos_path(file1_path, src_dir))]
        check_list(expected_list, build_file_list(
            [src_dir], vos_root, recursive=False, include="1"))

        # filter with exclude
        expected_list = \
            [(src_dir, get_vos_path(src_dir, src_dir)),
             (dir1_path, get_vos_path(dir1_path, src_dir)),
             (file1_path, get_vos_path(file1_path, src_dir))]
        check_list(expected_list, build_file_list(
            [src_dir], vos_root, recursive=True, exclude="2"))

        # redo while doubling up the list
        check_list(expected_list, build_file_list(
            [src_dir]*2, vos_root, recursive=True, exclude="2"))

        # sync src_dir + a file
        expected_list.append((file1_path, '{}/{}'.format(vos_root, file1)))
        check_list(expected_list, build_file_list(
            [src_dir, file1_path], vos_root, recursive=True, exclude="2"))

        # error when the src file does not exist
        with pytest.raises(ValueError):
            build_file_list([src_dir, 'bogus'], vos_root)


def test_transfer_report():
    # a new report starts with all the counters at zero
    tr = TransferReport()
    assert not tr.files_erred
    assert not tr.files_sent
    assert not tr.files_skipped
    assert not tr.bytes_sent
    assert not tr.bytes_skipped


@module_patch('vos.commands.vsync.get_client')
def test_execute(get_client):
    now = datetime.datetime.now().timestamp()
    node = Mock(props={'MD5': 'beef'}, attr={'st_size': 3, 'st_ctime': now})
    client_mock = Mock()
    client_mock.get_node = Mock(return_value=node)
    get_client.return_value = client_mock

    class Options:
        """Stand-in for the parsed command line options read by execute"""
        overwrite = True
        ignore_checksum = True
        certfile = None
        token = None
        cache_nodes = False

    options = Options()
    vos_dest = 'vos:service/path'

    with tempfile.NamedTemporaryFile() as tmp_file:
        # overwrite => empty file is sent without checking the remote copy
        expected_report = TransferReport()
        expected_report.files_sent = 1
        assert expected_report == execute(tmp_file.name, vos_dest, options)

        # put some content in the file
        _write_file(tmp_file.name, 'ABC')
        expected_report.bytes_sent = 3
        assert expected_report == execute(tmp_file.name, vos_dest, options)

        # no overwrite, checksum ignored, same size and remote copy not older
        # than the local file => no update
        now = datetime.datetime.now().timestamp()
        node.attr['st_ctime'] = now
        md5 = compute_md5(tmp_file.name)
        node.props['MD5'] = md5
        options.overwrite = False
        expected_report = TransferReport()
        expected_report.files_skipped = 1
        expected_report.bytes_skipped = 3
        assert expected_report == execute(tmp_file.name, vos_dest, options)

        # mismatched md5 but ignore checksum => no update
        node.props['MD5'] = 'beef'
        assert expected_report == execute(tmp_file.name, vos_dest, options)

        # mismatched md5 and no ignore checksum => update
        options.ignore_checksum = False
        expected_report = TransferReport()
        expected_report.files_sent = 1
        expected_report.bytes_sent = 3
        assert expected_report == execute(tmp_file.name, vos_dest, options)

        # ignore checksum but mismatched size => update
        options.ignore_checksum = True
        node.props['MD5'] = md5
        node.attr['st_size'] = 7
        assert expected_report == execute(tmp_file.name, vos_dest, options)

        # stale remote copy (older than the local file) => update
        node.attr['st_size'] = 3
        node.attr['st_ctime'] = now - 10000
        assert expected_report == execute(tmp_file.name, vos_dest, options)

        # OSErrors on update
        client_mock.copy.side_effect = OSError('NodeLocked')
        expected_report = TransferReport()
        expected_report.files_erred = 1
        assert expected_report == execute(tmp_file.name, vos_dest, options)
# MD5 of zero-length content. Used when a VOSpace node has no MD5 property.
_EMPTY_MD5 = 'd41d8cd98f00b204e9800998ecf8427e'

# The only characters accepted in the name of a path to sync
_LEGAL_PATH_RE = re.compile(r'[A-Za-z0-9._\-();:&*$@!+=/]*')

# MD5Cache shared by the workers. It stays None unless vsync() enables it
# (node caching requested on the command line).
global_md5_cache = None
node_dict = {}

# placeholder for data local to a thread
thread_local = threading.local()


def compute_md5(filename):
    """
    Computes the md5 of a file and caches the value for subsequent calls

    The cache is consulted only when global_md5_cache is set; a cached entry
    is used only if its time stamp is not older than the file's mtime.
    :param filename: path of the local file
    :return: md5 of the file content as returned by MD5Cache.compute_md5
    """
    if global_md5_cache is None:
        return md5_cache.MD5Cache.compute_md5(filename, block_size=2**19)

    # cached entries are (md5, size, mtime) tuples
    cached = global_md5_cache.get(filename)
    if cached is not None and not cached[2] < os.stat(filename).st_mtime:
        return cached[0]

    md5 = md5_cache.MD5Cache.compute_md5(filename, block_size=2**19)
    stat = os.stat(filename)
    global_md5_cache.update(filename, md5, stat.st_size, stat.st_mtime)
    return md5


def get_client(certfile, token):
    """
    Returns a VOS client instance for each thread. VOS Client uses requests
    session which is not thread safe hence creating one instance of this class
    for each thread.

    The arguments are used only the first time a thread calls this function;
    later calls from the same thread return the client created then.
    :param certfile: certificate file passed to vos.Client
    :param token: token passed to vos.Client
    :return: vos.Client
    """
    if not hasattr(thread_local, "client"):
        thread_local.client = vos.Client(vospace_certfile=certfile,
                                         vospace_token=token)
    return thread_local.client


class TransferReport:
    """
    Report of a job: counters of the bytes/files sent, skipped and in error.
    """
    def __init__(self):
        self.bytes_sent = 0
        self.files_sent = 0
        self.bytes_skipped = 0
        self.files_skipped = 0
        self.files_erred = 0

    def _as_tuple(self):
        """All the counters, in a fixed order, for comparison and display"""
        return (self.bytes_sent, self.files_sent, self.bytes_skipped,
                self.files_skipped, self.files_erred)

    def __eq__(self, other):
        if not isinstance(other, TransferReport):
            # let Python fall back to its default handling instead of
            # failing with AttributeError on unrelated types
            return NotImplemented
        return self._as_tuple() == other._as_tuple()

    def __repr__(self):
        return ('TransferReport(bytes_sent={}, files_sent={}, '
                'bytes_skipped={}, files_skipped={}, '
                'files_erred={})').format(*self._as_tuple())


def _remote_file_info(client, dest, use_cache):
    """
    Gets (md5, length, ctime) of a VOSpace node from the server and, when
    use_cache is set, stores them in global_md5_cache under dest.
    """
    node = client.get_node(dest, limit=None)
    info = (node.props.get('MD5', _EMPTY_MD5),
            node.attr['st_size'],
            node.attr['st_ctime'])
    if use_cache:
        global_md5_cache.update(dest, info[0], info[1], info[2])
    return info


def execute(src, dest, opt):
    """
    Transfer a file from source to destination

    Unless opt.overwrite is set, the transfer is skipped when the remote
    copy matches the local one: same md5 or, with opt.ignore_checksum, same
    size and a remote time stamp not older than the local mtime.
    :param src: local path to file to transfer
    :param dest: vospace location
    :param opt: command line parameters (overwrite, ignore_checksum,
    cache_nodes, certfile and token are read)
    :return: TransferReport()
    """
    result = TransferReport()
    src_md5 = None
    stat = os.stat(src)
    if not opt.ignore_checksum and not opt.overwrite:
        src_md5 = compute_md5(src)
    client = get_client(opt.certfile, opt.token)
    if not opt.overwrite:
        # Check if the file is the same
        try:
            node_info = None
            if opt.cache_nodes:
                node_info = global_md5_cache.get(dest)
            if node_info is None:
                logging.debug('Getting node info from VOSpace')
                logging.debug(str(node_dict.keys()))
                logging.debug(str(dest))
                node_info = _remote_file_info(client, dest, opt.cache_nodes)
            dest_md5 = node_info[0]
            dest_length = node_info[1]
            dest_time = node_info[2]
            logging.debug('Destination MD5: {}'.format(dest_md5))
            if opt.ignore_checksum:
                unchanged = (dest_time >= stat.st_mtime and
                             dest_length == stat.st_size)
            else:
                unchanged = src_md5 == dest_md5
            if unchanged:
                logging.info('skipping: {} matches {}'.format(src, dest))
                result.files_skipped = 1
                result.bytes_skipped = dest_length
                return result
        except (transfer_exceptions.AlreadyExistsException,
                transfer_exceptions.NotFoundException):
            # no usable remote copy to compare with: go on and transfer
            pass
    logging.info('{} -> {}'.format(src, dest))
    try:
        client.copy(src, dest, send_md5=True)
        # read back the new node (and refresh the cache when enabled)
        _remote_file_info(client, dest, opt.cache_nodes)
        result.files_sent += 1
        result.bytes_sent += stat.st_size
    except (IOError, OSError) as exc:
        logging.error(
            'Error writing {} to server, skipping'.format(src))
        logging.debug(str(exc))
        if 'NodeLocked' in str(exc):
            logging.error(
                ('Use vlock to unlock the node before syncing '
                 'to {}').format(dest))
        result.files_erred += 1
    return result


def validate(path, include=None, exclude=None):
    """
    Determines whether a directory or filename should be included or not
    :param path: path to consider
    :param include: pattern (regular expression) for names to include
    :param exclude: comma separated strings; a path containing any of them
    is excluded. Empty entries (e.g. from 'a,,b' or a trailing comma) are
    ignored and white space around entries is dropped.
    :return: True if filename is to be included, False otherwise
    """
    # fullmatch so that a trailing newline is also rejected
    if _LEGAL_PATH_RE.fullmatch(path) is None:
        logging.error("filename {} contains illegal characters, "
                      "skipping".format(path))
        return False
    if include is not None and not re.search(include, path):
        logging.info("{} not included".format(path))
        return False
    if exclude:
        for token in exclude.split(','):
            token = token.strip()
            # an empty token is contained in every string and would
            # exclude everything
            if token and token in path:
                logging.info("excluding: {}".format(path))
                return False
    return True


def prepare(src, dest, client):
    """
    If src is a directory it creates the corresponding directory on the
    server, otherwise it prepares the transfer of the file
    :param src: name of local file
    :param dest: name of location on VOSpace to copy file
    :param client: vos client to use for operations on the server (mkdir)
    :return: (src, dest) tuple to be sync if required or None otherwise
    """
    if os.path.islink(src):
        logging.error("{} is a link, skipping".format(src))
        return None
    if not os.access(src, os.R_OK):
        logging.error(
            "Failed to open file {}, skipping".format(src))
        return None

    if not os.path.isdir(src):
        return src, dest

    # make directory but nothing to transfer
    try:
        client.mkdir(dest)
        logging.info("Made directory {}".format(dest))
    except transfer_exceptions.AlreadyExistsException:
        # OK, must already have existed
        pass
    return None


def build_file_list(paths, vos_root, recursive=False, include=None,
                    exclude=None):
    """
    Build a list of files that should be copied into VOSpace

    A source directory ending in '/' contributes only its content; without
    the trailing '/' the directory itself is part of the list. Source paths
    in the result are absolute in both cases.
    :param paths: source paths
    :param vos_root: directory container on vospace service to sync to
    :param recursive: True if recursive sync, False otherwise
    :param include: patterns to include
    :param exclude: comma separated strings to exclude when occuring in names
    :return: list of unique (src, dest) pairs, directories listed before
    their content
    :raises ValueError: when a source path does not exist
    """

    spinner = ['-', '\\', '|', '/', '-', '\\', '|', '/']
    count = 0
    results = []  # order is important to create the directories first
    vos_root = vos_root.strip('/')
    for path in paths:
        # vsync just the content and not the source dir?
        content = path.endswith('/')
        if content:
            path = os.path.abspath(path)
            base_path = path
        else:
            base_path = os.path.dirname(path)
            path = os.path.abspath(path)
        if not os.path.exists(path):
            raise ValueError('{} not found'.format(path))
        if os.path.isfile(path):
            # a file always lands directly under vos_root
            results.append((path, '{}/{}'.format(
                vos_root, os.path.basename(path))))
            continue
        if not content:
            results.append((path, '{}/{}'.format(
                vos_root, os.path.relpath(path, base_path))))
        for (root, dirs, filenames) in os.walk(path):
            if recursive:
                for this_dirname in dirs:
                    this_dirname = os.path.join(root, this_dirname)
                    rel_dirname = os.path.relpath(this_dirname, base_path)
                    if not validate(rel_dirname, include=include,
                                    exclude=exclude):
                        continue
                    results.append((this_dirname, '{}/{}'.format(
                        vos_root, rel_dirname)))
            for this_filename in filenames:
                srcfilename = os.path.normpath(os.path.join(root,
                                                            this_filename))
                rel_name = os.path.relpath(srcfilename, base_path)
                if not validate(rel_name, include=include, exclude=exclude):
                    continue
                count += 1
                logging.info("Building list of files to transfer {}\r".format(
                    spinner[count % len(spinner)]))
                results.append((srcfilename, '{}/{}'.format(
                    vos_root, rel_name)))
            if not recursive:
                # only the top level directory is considered
                break
    # remove duplicates while maintaining the order
    return list(dict.fromkeys(results))
- :param this_filename: name of the file on disk - :param block_size: number of bytes to read into memory, - defaults to 2**19 bytes - :return: md5 as a hexadecimal string - """ - block_size = block_size is None and 2 ** 19 or block_size - return md5_cache.MD5Cache.compute_md5(this_filename, - block_size=block_size) - - def file_md5(this_filename): - import os - md5 = None - if global_md5_cache is not None: - md5 = global_md5_cache.get(this_filename) - if md5 is None or md5[2] < os.stat(this_filename).st_mtime: - md5 = compute_md5(this_filename) - if global_md5_cache is not None: - stat = os.stat(this_filename) - global_md5_cache.update(this_filename, md5, stat.st_size, - stat.st_mtime) + try: + client = vos.Client( + vospace_certfile=opt.certfile, vospace_token=opt.token) + if not client.is_remote_file(destination): + parser.error("Only allows sync FROM local copy TO VOSpace") + # Currently we don't create nodes in sync and we don't sync onto files + logging.info("Connecting to VOSpace") + logging.info("Confirming Destination is a directory") + if client.isfile(destination): + if len(opt.files) == 1: + if os.path.isfile(opt.files): + files = [(opt.files, destination)] + else: + raise RuntimeError( + 'Cannot sync directory into a remote file') + else: + raise RuntimeError( + 'Cannot sync multiple sources into a single remote file') else: - md5 = md5[0] - return md5 - - class ThreadCopy(Process): - def __init__(self, this_queue): - super(ThreadCopy, self).__init__() - self.client = vos.Client( - vospace_certfile=opt.certfile, - vospace_token=opt.token) - self.queue = this_queue - self.filesSent = 0 - self.filesSkipped = 0 - self.bytesSent = 0 - self.bytesSkipped = 0 - self.filesErrored = 0 - - def run(self): - while True: - (current_source, current_destination) = self.queue.get() - requeue = (current_source, current_destination) - src_md5 = None - stat = os.stat(current_source) - if not opt.ignore_checksum and not opt.overwrite: - src_md5 = file_md5(current_source) 
- if not opt.overwrite: - # Check if the file is the same - try: - node_info = None - if opt.cache_nodes: - node_info = global_md5_cache.get( - current_destination) - if node_info is None: - logging.debug("Getting node info from VOSpace") - logging.debug(str(node_dict.keys())) - logging.debug(str(current_destination)) - node = self.client.get_node(current_destination, - limit=None) - current_destination_md5 = node.props.get( - 'MD5', 'd41d8cd98f00b204e9800998ecf8427e') - current_destination_length = node.attr['st_size'] - current_destination_time = node.attr['st_ctime'] - if opt.cache_nodes: - global_md5_cache.update( - current_destination, - current_destination_md5, - current_destination_length, - current_destination_time) - else: - current_destination_md5 = node_info[0] - current_destination_length = node_info[1] - current_destination_time = node_info[2] - logging.debug("Destination MD5: {}".format( - current_destination_md5)) - if ((not opt.ignore_checksum and src_md5 == - current_destination_md5) or - (opt.ignore_checksum and - current_destination_time >= stat.st_mtime and - current_destination_length == stat.st_size)): - logging.info("skipping: %s matches %s" % ( - current_source, current_destination)) - self.filesSkipped += 1 - self.bytesSkipped += current_destination_length - self.queue.task_done() - continue - except (transfer_exceptions.AlreadyExistsException, - transfer_exceptions.NotFoundException): - pass - logging.info( - "%s -> %s" % (current_source, current_destination)) - try: - self.client.copy(current_source, current_destination, - send_md5=True) - node = self.client.get_node(current_destination, - limit=None) - current_destination_md5 = node.props.get( - 'MD5', 'd41d8cd98f00b204e9800998ecf8427e') - current_destination_length = node.attr['st_size'] - current_destination_time = node.attr['st_ctime'] - if opt.cache_nodes: - global_md5_cache.update(current_destination, - current_destination_md5, - current_destination_length, - 
current_destination_time) - self.filesSent += 1 - self.bytesSent += stat.st_size - except (IOError, OSError) as exc: - logging.error( - "Error writing {} to server, skipping".format( - current_source)) - logging.error(str(exc)) - import re - if re.search('NodeLocked', str(exc)) is not None: - logging.error( - ("Use vlock to unlock the node before syncing " - "to {}").format(current_destination)) - try: - if exc.errno == 104: - self.queue.put(requeue) - except Exception as e2: - logging.error("Error during requeue") - logging.error(str(e2)) - pass - self.filesErrored += 1 - pass - self.queue.task_done() - - def mkdirs(directory): - """Recursively make all nodes in the path to directory. - - :param directory: str, vospace location of ContainerNode (directory) - to make - :return: - """ - - logging.debug("%s %s" % (directory, str(good_dirs))) - # if we've seen this before skip it. - if directory in good_dirs: - return - - # try and make a new directory and return - # failure indicates we should see if subdirectories exist - try: - client.mkdir(directory) - logging.info("Made directory {}".format(directory)) - good_dirs.append(directory) - return - except transfer_exceptions.AlreadyExistsException: - pass - - # OK, must already have existed, add to list - good_dirs.append(directory) - - return - - def copy(current_source, current_destination): - """ - Copy current_source from local file system to current_destination. - - :param current_source: name of local file - :param current_destination: name of localtion on VOSpace to copy file - to (includes filename part) - :return: None - """ - # strip down current_destination until we find a part that exists - # and then build up the path. 
- if os.path.islink(current_source): - logging.error("{} is a link, skipping".format(current_source)) - return - if not os.access(current_source, os.R_OK): - logging.error( - "Failed to open file {}, skipping".format(current_source)) - return - import re - if re.match(r'^[A-Za-z0-9._\-();:&*$@!+=/]*$', current_source) is None: - logging.error( - "filename %s contains illegal characters, skipping" % - current_source) - return - - dirname = os.path.dirname(current_destination) - mkdirs(dirname) - if opt.include is not None and not re.search(opt.include, - current_source): - return - queue.put((current_source, current_destination), timeout=3600) - - def start_streams(no_streams): - list_of_streams = [] - for i in range(no_streams): - logging.info("Launching VOSpace connection stream %d" % i) - t = ThreadCopy(queue) - t.daemon = True - t.start() - list_of_streams.append(t) - return list_of_streams - - def build_file_list(base_path, destination_root='', recursive=False, - ignore=None): - """Build a list of files that should be copied into VOSpace""" - - spinner = ['-', '\\', '|', '/', '-', '\\', '|', '/'] - count = 0 - - for (root, dirs, filenames) in os.walk(base_path): - for this_dirname in dirs: - if not recursive: - continue - this_dirname = os.path.join(root, this_dirname) - skip = False - if ignore is not None: - for thisIgnore in ignore.split(','): - if not this_dirname.find(thisIgnore) < 0: - logging.info("excluding: %s " % this_dirname) - skip = True - continue - if skip: - continue - cprefix = os.path.commonprefix((base_path, this_dirname)) - this_dirname = os.path.normpath( - destination_root + "/" + this_dirname[len(cprefix):]) - mkdirs(this_dirname) - for thisfilename in filenames: - srcfilename = os.path.normpath( - os.path.join(root, thisfilename)) - skip = False - if ignore is not None: - for thisIgnore in ignore.split(','): - if not srcfilename.find(thisIgnore) < 0: - logging.info("excluding: %s " % srcfilename) - skip = True - continue - if skip: - 
continue - cprefix = os.path.commonprefix((base_path, srcfilename)) - destfilename = os.path.normpath( - destination_root + "/" + srcfilename[len(cprefix):]) - this_dirname = os.path.dirname(destfilename) - mkdirs(this_dirname) - - count += 1 - if opt.verbose: - sys.stderr.write( - "Building list of files to transfer %s\r" % ( - spinner[count % len(spinner)])) - copy(srcfilename, destfilename) - if not recursive: - return - return - - streams = start_streams(opt.nstreams) - - # build a complete file list given all the things on the command line - for filename in opt.files: - filename = os.path.abspath(filename) - this_root = destination - if os.path.isdir(filename): - if filename[-1] != "/": - if os.path.basename(filename) != os.path.basename(destination): - this_root = os.path.join(destination, - os.path.basename(filename)) - mkdirs(this_root) - node_dict[this_root] = client.get_node(this_root, limit=None) - try: - build_file_list(filename, destination_root=this_root, - recursive=opt.recursive, ignore=opt.exclude) - except Exception as e: - logging.error(str(e)) - logging.error("ignoring error") - elif os.path.isfile(filename): - if dest_is_dir: - this_root = os.path.join(destination, - os.path.basename(filename)) - copy(filename, this_root) - else: - logging.error("%s: No such file or directory." 
% filename) - - logging.info( - ("Waiting for transfers to complete " - r"******** CTRL-\ to interrupt ********")) - - queue.join() - end_time = time.time() - bytes_sent = 0 - files_sent = 0 - bytes_skipped = 0 - files_skipped = 0 - files_erred = 0 - for stream in streams: - bytes_sent += stream.bytesSent - bytes_skipped += stream.bytesSkipped - files_sent += stream.filesSent - files_skipped += stream.filesSkipped - files_erred += stream.filesErrored - - logging.info("==== TRANSFER REPORT ====") - - if bytes_sent > 0: - rate = bytes_sent / (end_time - start_time) / 1024.0 - logging.info("Sent %d files (%8.1f kbytes @ %8.3f kBytes/s)" % ( - files_sent, bytes_sent / 1024.0, rate)) - speed_up = (bytes_skipped + bytes_sent) / bytes_sent - logging.info( - "Speedup: %f (skipped %d files)" % (speed_up, files_skipped)) - - if bytes_sent == 0: - logging.info("No files needed sending ") + files = build_file_list(paths=opt.files, + vos_root=destination, + recursive=opt.recursive, + include=opt.include, + exclude=opt.exclude) + + # build the list of transfers + transfers = [] + for src_path, vos_dest in files: + transfer = prepare(src_path, vos_dest, client) + if transfer: + transfers.append(transfer) + + # main execution loop + futures = [] + with concurrent.futures.ThreadPoolExecutor(max_workers=opt.nstreams) \ + as executor: + for file_src, vos_dest in transfers: + futures.append(executor.submit( + execute, file_src, vos_dest, opt)) - if files_erred > 0: logging.info( - "Error transferring %d files, please try again" % files_erred) + ("Waiting for transfers to complete " + r"******** CTRL-\ to interrupt ********")) + + end_time = time.time() + end_result = TransferReport() + for r in concurrent.futures.as_completed(futures): + res = r.result() + end_result.bytes_sent += res.bytes_sent + end_result.bytes_skipped += res.bytes_skipped + end_result.files_sent += res.files_sent + end_result.files_skipped += res.files_skipped + end_result.files_erred += res.files_erred + + 
logging.info("==== TRANSFER REPORT ====") + + if end_result.bytes_sent > 0: + rate = end_result.bytes_sent / (end_time - start_time) / 1024.0 + logging.info("Sent {} files ({} kbytes @ {} kBytes/s)".format( + end_result.files_sent, + round(end_result.bytes_sent / 1024.0, 2), + round(rate, 2))) + speed_up = (end_result.bytes_skipped + end_result.bytes_sent) / \ + end_result.bytes_sent + logging.info("Speedup: {} (skipped {} files)".format( + speed_up, end_result.files_skipped)) + if end_result.bytes_sent == 0: + logging.info("No files needed sending ") + + if end_result.files_erred > 0: + logging.info( + "Error transferring {} files, please try again".format( + end_result.files_erred)) + except Exception as ex: + exit_on_exception(ex) vsync.__doc__ = DESCRIPTION diff --git a/vos/vos/tests/test_vos.py b/vos/vos/tests/test_vos.py index e3796bba..97938fa4 100644 --- a/vos/vos/tests/test_vos.py +++ b/vos/vos/tests/test_vos.py @@ -790,43 +790,13 @@ def test_mkdir(self): def test_success_failure_case(self): with pytest.raises(OSError): client = Client() - ignore = client.status('vos:test/node.fits', code='abc') + client.status('vos:test/node.fits', code='abc') class TestNode(unittest.TestCase): """Test the vos Node class. 
""" - def test_compute_md5(self): - pass - # from vos import vos - # mock_file = MagicMock(spec=file, wraps=StringIO('a')) - # foo = mock_file.open() - # self.assertEqual(foo.read, 'a') - # # hash = vos.compute_md5('/dev/null') - # # self.assertEqual(hash, '098f6bcd4621d373cade4e832627b4f6') - - # import mock - # my_mock = mock.MagicMock() - # with mock.patch('__builtin__.open', my_mock): - # manager = my_mock.return_value.__enter__.return_value - # manager.read.return_value = 'some data' - # with open('foo') as h: - # data = h.read() - # print data - - # with mock.patch('__builtin__.open') as my_mock: - # my_mock.return_value.__enter__ = lambda s: s - # my_mock.return_value.__exit__ = mock.Mock() - # my_mock.return_value.read.return_value = 'some data' - # with open('foo') as h: - # data = h.read() - - # mocked_open = unittest.mock.mock_open(read_data='foo') - # with unittest.mock.patch('vos.open', mocked_open, create=True): - # hash = vos.compute_md5('foo') - # self.assertEqual(hash, '098f6bcd4621d373cade4e832627b4f6') - def test_node_eq(self): # None node raises LoookupError with self.assertRaises(LookupError): diff --git a/vos/vos/vos.py b/vos/vos/vos.py index 1d4d4ba4..a6d80775 100644 --- a/vos/vos/vos.py +++ b/vos/vos/vos.py @@ -796,7 +796,7 @@ def get_children(self, client, sort, order, limit=500): current_index = 1 else: run = False - with client.nodeCache.watch(yield_node.uri) as childWatch: + with nodeCache.watch(yield_node.uri) as childWatch: childWatch.insert(yield_node) yield yield_node @@ -1353,6 +1353,9 @@ def set_auth(self, vospace_certfile=None, vospace_token=None): resource_id=self.resource_id) +nodeCache = NodeCache() + + class Client(object): """The Client object does the work""" @@ -1426,7 +1429,7 @@ def __init__(self, vospace_certfile=None, self.protocols = Client.VO_TRANSFER_PROTOCOLS self.rootNode = root_node - self.nodeCache = NodeCache() + # self.nodeCache = NodeCache() self.transfer_shortcut = transfer_shortcut self.secure_get = 
secure_get self._endpoints = {} @@ -1434,7 +1437,6 @@ def __init__(self, vospace_certfile=None, Client.VOSPACE_CERTFILE or vospace_certfile self.vospace_token = vospace_token self.insecure = insecure - return def glob(self, pathname): """Return a list of paths matching a pathname pattern. @@ -2052,11 +2054,11 @@ def get_node(self, uri, limit=0, force=False): logger.debug("Getting node {0}".format(uri)) uri = self.fix_uri(uri) node = None - if not force and uri in self.nodeCache: - node = self.nodeCache[uri] + if not force and uri in nodeCache: + node = nodeCache[uri] if node is None: logger.debug("Getting node {0} from ws".format(uri)) - with self.nodeCache.watch(uri) as watch: + with nodeCache.watch(uri) as watch: # If this is vospace URI then we can request the node info # using the uri directly, but if this a URL then the metadata # comes from the HTTP header. @@ -2110,7 +2112,7 @@ def get_node(self, uri, limit=0, force=False): next_page.node_list.pop(0) node.node_list.extend(next_page.node_list) for childNode in node.node_list: - with self.nodeCache.watch(childNode.uri) as childWatch: + with nodeCache.watch(childNode.uri) as childWatch: childWatch.insert(childNode) return node @@ -2319,7 +2321,7 @@ def link(self, src_uri, link_uri): if self.isdir(link_uri): link_uri = os.path.join(link_uri, os.path.basename(src_uri)) - with self.nodeCache.volatile(src_uri), self.nodeCache.volatile( + with nodeCache.volatile(src_uri), nodeCache.volatile( link_uri): link_node = Node(link_uri, node_type="vos:LinkNode") ElementTree.SubElement(link_node.node, "target").text = src_uri @@ -2344,7 +2346,7 @@ def move(self, src_uri, destination_uri): """ src_uri = self.fix_uri(src_uri) destination_uri = self.fix_uri(destination_uri) - with self.nodeCache.volatile(src_uri), self.nodeCache.volatile( + with nodeCache.volatile(src_uri), nodeCache.volatile( destination_uri): job_url = self.transfer(self.get_endpoints(src_uri).async_transfer, src_uri, destination_uri, view='move') @@ -2357,12 
+2359,12 @@ def move(self, src_uri, destination_uri): return self.get_transfer_error(job_url, src_uri) def _get(self, uri, view="defaultview", cutout=None): - with self.nodeCache.volatile(uri): + with nodeCache.volatile(uri): return self.transfer(self.get_endpoints(uri).transfer, uri, "pullFromVoSpace", view, cutout) def _put(self, uri, content_length=None, md5_checksum=None): - with self.nodeCache.volatile(uri): + with nodeCache.volatile(uri): return self.transfer(self.get_endpoints(uri).transfer, uri, "pushToVoSpace", view="defaultview", content_length=content_length, @@ -2673,7 +2675,7 @@ def delete(self, uri): """ uri = self.fix_uri(uri) logger.debug("delete {0}".format(uri)) - with self.nodeCache.volatile(uri): + with nodeCache.volatile(uri): url = self.get_node_url(uri, method='GET') response = self.get_session(uri).delete(url) response.raise_for_status()