diff --git a/.circleci/config.yml b/.circleci/config.yml new file mode 100644 index 0000000..cbc9015 --- /dev/null +++ b/.circleci/config.yml @@ -0,0 +1,101 @@ +version: 2 + + +defaults: &defaults + machine: + image: circleci/classic:latest + working_directory: ~/autouri + + +machine_defaults: &machine_defaults + machine: + image: ubuntu-1604:201903-01 + working_directory: ~/autouri + + +install_python3: &install_python3 + name: Install python3, pip3 + command: | + sudo apt-get update && sudo apt-get install software-properties-common git wget curl -y + sudo add-apt-repository ppa:deadsnakes/ppa -y + sudo apt-get update && sudo apt-get install python3.6 -y + sudo wget https://bootstrap.pypa.io/get-pip.py + sudo python3.6 get-pip.py + sudo ln -s /usr/bin/python3.6 /usr/local/bin/python3 + + +install_py3_packages: &install_py3_packages + name: Install Python packages + command: | + sudo pip3 install pytest requests dateparser filelock + sudo pip3 install --upgrade pyasn1-modules + + +install_gcs_lib: &install_gcs_lib + name: Install Google Cloud SDK (gcloud and gsutil) and Python API (google-cloud-storage) + command: | + echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] http://packages.cloud.google.com/apt cloud-sdk main" | sudo tee -a /etc/apt/sources.list.d/google-cloud-sdk.list + curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key --keyring /usr/share/keyrings/cloud.google.gpg add - + sudo apt-get update && sudo apt-get install google-cloud-sdk -y + sudo pip3 install google-cloud-storage + + +install_aws_lib: &install_aws_lib + name: Install AWS Python API (boto3) and CLI (awscli) + command: | + sudo pip3 install boto3 awscli + + +make_root_only_dir: &make_root_only_dir + name: Create a directory accessible by root only (to test permission-denied cases) + command: | + sudo mkdir /test-permission-denied + sudo chmod -w /test-permission-denied + + +jobs: + pytest: + <<: *machine_defaults + steps: + - checkout + - run: *install_python3 + - run: *install_py3_packages + - run: *install_gcs_lib + - run: *install_aws_lib + - run: *make_root_only_dir + - run: + no_output_timeout: 60m + command: | + cd tests/ + # sign in + echo ${GCLOUD_SERVICE_ACCOUNT_SECRET_JSON} > tmp_key.json + gcloud auth activate-service-account --project=${GOOGLE_PROJECT_ID} --key-file=tmp_key.json + gcloud config set project ${GOOGLE_PROJECT_ID} + export GOOGLE_APPLICATION_CREDENTIALS="${PWD}/tmp_key.json" + aws configure set aws_access_key_id "${AWS_ACCESS_KEY_ID}" + aws configure set aws_secret_access_key "${AWS_SECRET_ACCESS_KEY}" + + # run pytest + pytest --ci-prefix ${CIRCLE_WORKFLOW_ID} \ + --gcp-private-key-file tmp_key.json \ + --s3-root ${S3_ROOT} \ + --gcs-root ${GCS_ROOT} \ + --gcs-root-url ${GCS_ROOT_URL} + + # to use gsutil + export BOTO_CONFIG=/dev/null + + # clean up + rm -f tmp_key.json + gsutil -m rm -rf ${S3_ROOT}/${CIRCLE_WORKFLOW_ID} + gsutil -m rm -rf ${GCS_ROOT}/${CIRCLE_WORKFLOW_ID} + gsutil -m rm -rf ${GCS_ROOT_URL}/${CIRCLE_WORKFLOW_ID} + + +# Define workflow here +workflows: + version: 2 + build_workflow: + jobs: + - pytest + diff --git a/README.md b/README.md index c0d67df..c066ef2 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,10 @@ +[![CircleCI](https://circleci.com/gh/ENCODE-DCC/autouri.svg?style=svg)](https://circleci.com/gh/ENCODE-DCC/autouri) + +> **IMPORTANT**: If you use `--use-gsutil-for-s3` or `GCSURI.USE_GSUTIL_FOR_S3` then you need to update your `gsutil`. This flag allows a direct transfer between `gs://` and `s3://`. This requires `gsutil` >= 4.47. 
See this [issue](https://github.com/GoogleCloudPlatform/gsutil/issues/935) for details.
+```bash
+$ pip install gsutil --upgrade
+```
+
 # Autouri
 
 ## Introduction
@@ -110,9 +117,9 @@ optional arguments:
 ## Requirements
 
   - Python >= 3.6
-  - Packages: `requests` and `filelock`
+  - Packages: `requests`, `dateparser` and `filelock`
    ```bash
-    $ pip3 install requests filelock
+    $ pip3 install requests dateparser filelock
    ```
  - Install [Google Cloud SDK](https://cloud.google.com/sdk/docs/quickstarts) to get CLIs (`gcloud` and `gsutil`).
@@ -130,34 +137,42 @@ optional arguments:
 
 ## Authentication
 
-GCS: Use `gcloud` CLI. You will be asked to enter credential information of your Google account or redirected to authenticate on a web browser.
-```
-$ gcloud init
-```
+- GCS: Use `gcloud` CLI.
+  - Using end-user credentials: You will be asked to enter credentials of your Google account.
+    ```
+    $ gcloud init
+    ```
+  - Using service account credentials: Use these if you have a service account and a JSON key file associated with it.
+    ```
+    $ gcloud auth activate-service-account --key-file=[YOUR_JSON_KEY.json]
+    $ export GOOGLE_APPLICATION_CREDENTIALS="PATH/FOR/YOUR_JSON_KEY.json"
+    ```
+    Then set your default project.
+    ```
+    $ gcloud config set project [YOUR_GCP_PROJECT_ID]
+    ```
 
-S3: Use `aws` CLI. You will be asked to enter credential information of your AWS account.
-```
-$ aws configure
-```
+- S3: Use `aws` CLI. You will be asked to enter credentials of your AWS account.
+  ```
+  $ aws configure
+  ```
+
+- URL: Use a `~/.netrc` file to get access to private URLs. Example `.netrc` file (you can define credentials per site):
+  ```
+  machine www.encodeproject.org
+  login XXXXXXXX
+  password abcdefghijklmnop
+  ```
 
-URL: Use `~/.netrc` file to get access to private URLs. Example `.netrc` file. You can define credential per site.
-```
-machine www.encodeproject.org
-login XXXXXXXX
-password abcdefghijklmnop
-```
 
 ## Using `gsutil` for direct transfer between GCS and S3
 
 Autouri can use the `gsutil` CLI for a direct file transfer between S3 and GCS. Define `--use-gsutil-for-s3` in command line arguments or use `GCSURI.init_gcsuri(use_gsutil_for_s3=True)` in Python. Otherwise, file transfer between GCS and S3 will be streamed through your local machine.
 
-`gsutil` must be configured correctly to obtain AWS credentials.
-```
-$ aws configure # make sure that you already authenticated for AWS
-$ gsutil config # write auth info on ~/.boto
-```
+`gsutil` will take AWS credentials from the `~/.aws/credentials` file, which was already generated in [Authentication](#authentication).
 
-## GCS/S3 bucket policies
+## GCS/S3 bucket configuration
 
 Autouri works best with the default bucket configuration for both cloud storages.
 
@@ -169,6 +184,3 @@ S3 (`s3://bucket-name`)
   - Object versioning must be turned off.
 
-## Known issues
-
-Race condition is tested with multiple threads trying to write on the same file. File locking mechanism is based on [filelock](https://github.com/benediktschmitt/py-filelock). Such file locking is stable on local/GCS files but rather unstable on S3 (tested with 5 threads).
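The direct-transfer flag described in the README above can also be enabled programmatically. A minimal sketch, assuming authentication is already set up as described; the bucket paths are placeholders:

```python
from autouri import AutoURI, GCSURI

# Enable direct gs:// <-> s3:// transfer via gsutil (requires gsutil >= 4.47).
GCSURI.init_gcsuri(use_gsutil_for_s3=True)

# Copy without streaming file contents through the local machine.
# A trailing path separator marks an explicit directory, so the source
# basename is appended to build the destination file URI.
AutoURI('s3://my-bucket/data/v6.txt').cp('gs://my-bucket/data/')
```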
diff --git a/autouri/__init__.py b/autouri/__init__.py
index fcb40a9..2cf401a 100644
--- a/autouri/__init__.py
+++ b/autouri/__init__.py
@@ -10,7 +10,7 @@ from .gcsuri import GCSURI
 
 
-__version__ = '0.1.0'
+__version__ = '0.1.1'
 
 
 def parse_args():
@@ -71,10 +71,11 @@ def parse_args():
 
     p_loc = subparser.add_parser(
         'loc',
-        help='type(target_dir).localize(src): Localize source on target directory (class)',
+        help='AutoURI(src).localize_on(target): Localize source on target directory. '
+             'Target directory must end with a directory separator.',
         parents=[parent_src, parent_target, parent_cp])
     p_loc.add_argument('--recursive', action='store_true',
-        help='Recursively localize source into target class.')
+        help='Recursively localize source into target directory.')
 
     p_presign = subparser.add_parser(
         'presign',
@@ -119,27 +120,21 @@ def main():
 
     elif args.action == 'cp':
         u_src = AutoURI(src)
-        sep = AutoURI(target).__class__.get_path_sep()
-        if target.endswith(sep):
-            type_ = 'dir'
-            target = sep.join([target.rstrip(sep), u_src.basename])
-            print(target)
-        else:
-            type_ = 'file'
-        _, flag = u_src.cp(target, make_md5_file=args.make_md5_file)
+        _, flag = u_src.cp(
+            target, make_md5_file=args.make_md5_file, return_flag=True)
         if flag == 0:
-            logger.info('Copying from file {s} to {type} {t} done'.format(
-                s=src, type=type_, t=target))
+            logger.info('Copying from file {s} to {t} done'.format(
+                s=src, t=target))
         elif flag:
             if flag == 1:
                 reason = 'skipped due to md5 hash match'
             elif flag == 2:
-                reason = 'skipped due to filename/size/mtime match'
+                reason = 'skipped due to filename/size match and mtime test'
             else:
                 raise NotImplementedError
-            logger.info('Copying from file {s} to {type} {t} {reason}'.format(
-                s=src, type=type_, t=target, reason=reason))
+            logger.info('Copying from file {s} to {t} {reason}'.format(
+                s=src, t=target, reason=reason))
 
     elif args.action == 'read':
         s = AutoURI(src).read()
@@ -157,11 +152,10 @@ def main():
         logger.info('Deleted {s}'.format(s=src))
 
     elif args.action == 'loc':
-        _, localized = AutoURI(target).__class__.localize(
-            src,
+        _, localized = AutoURI(src).localize_on(
+            target,
             recursive=args.recursive,
-            make_md5_file=args.make_md5_file,
-            loc_prefix=target)
+            make_md5_file=args.make_md5_file)
         if localized:
             logger.info('Localized {s} on {t}'.format(s=src, t=target))
         else:
diff --git a/autouri/abspath.py b/autouri/abspath.py
index 2bfb786..5051a6a 100644
--- a/autouri/abspath.py
+++ b/autouri/abspath.py
@@ -1,8 +1,10 @@
 import hashlib
+import errno
 import os
 import shutil
 from filelock import SoftFileLock
 from typing import Dict, Optional, Union
+from shutil import copyfile, SameFileError
 
 from .autouri import URIBase, AutoURI, logger
 from .metadata import URIMetadata, get_seconds_from_epoch
@@ -26,7 +28,8 @@ class AbsPath(URIBase):
     _PATH_SEP = os.sep
 
     def __init__(self, uri, thread_id=-1):
-        uri = os.path.expanduser(uri)
+        if isinstance(uri, str):
+            uri = os.path.expanduser(uri)
         super().__init__(uri, thread_id=thread_id)
 
     @property
@@ -99,15 +102,34 @@ def _cp(self, dest_uri):
 
         if isinstance(dest_uri, AbsPath):
             dest_uri.mkdir_dirname()
-            shutil.copyfile(self._uri, dest_uri._uri, follow_symlinks=True)
+            try:
+                copyfile(self._uri, dest_uri._uri, follow_symlinks=True)
+            except SameFileError:
+                logger.debug(
+                    'cp: ignored SameFileError. '
+                    'src={src}, dest={dest}'.format(
+                        src=self._uri,
+                        dest=dest_uri._uri))
+                if os.path.islink(dest_uri._uri):
+                    dest_uri._rm()
+                    copyfile(self._uri, dest_uri._uri, follow_symlinks=True)
+            return True
         return False
 
     def _cp_from(self, src_uri):
         return False
 
-    def get_mapped_url(self) -> Optional[str]:
-        for k, v in AbsPath.MAP_PATH_TO_URL.items():
+    def get_mapped_url(self, map_path_to_url=None) -> Optional[str]:
+        """
+        Args:
+            map_path_to_url:
+                dict of (k, v) where k is a path prefix and v is a URL prefix.
+                k will be replaced with v.
+                If not given, defaults to the class constant AbsPath.MAP_PATH_TO_URL.
+        """
+        if map_path_to_url is None:
+            map_path_to_url = AbsPath.MAP_PATH_TO_URL
+        for k, v in map_path_to_url.items():
             if k and self._uri.startswith(k):
                 return self._uri.replace(k, v, 1)
         return None
@@ -122,6 +144,30 @@ def mkdir_dirname(self):
                     d=self.dirname))
         return
 
+    def soft_link(self, target, force=False):
+        """Make a soft link of self on a target absolute path.
+        If the target already exists, it is deleted and re-linked
+        only when force is True.
+
+        Args:
+            target:
+                Target file's absolute path or URI object.
+            force:
+                Delete the target file (or link) if it exists.
+        """
+        target = AbsPath(target)
+        if not target.is_valid:
+            raise ValueError('Target path is not a valid abs path: {t}.'.format(
+                t=target.uri))
+        try:
+            target.mkdir_dirname()
+            os.symlink(self._uri, target._uri)
+        except OSError as e:
+            if e.errno == errno.EEXIST and force:
+                target.rm()
+                os.symlink(self._uri, target._uri)
+            else:
+                raise
+
     def __calc_md5sum(self):
         """Expensive md5 calculation
         """
@@ -131,6 +177,15 @@ def __calc_md5sum(self):
                 hash_md5.update(chunk)
         return hash_md5.hexdigest()
 
+    @staticmethod
+    def get_abspath_if_exists(path):
+        if isinstance(path, URIBase):
+            path = path._uri
+        if isinstance(path, str):
+            if os.path.exists(os.path.expanduser(path)):
+                return os.path.abspath(os.path.expanduser(path))
+        return path
+
     @staticmethod
     def init_abspath(
             loc_prefix: Optional[str]=None,
diff --git a/autouri/autouri.py b/autouri/autouri.py
index 3270a0b..ddd266a 100644
--- a/autouri/autouri.py
+++ b/autouri/autouri.py
@@ -10,7 +10,7 @@ from .metadata import URIMetadata
 
 
-logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
+logging.basicConfig(level=logging.INFO, format='%(asctime)s|%(name)s|%(levelname)s| %(message)s')
 logger = logging.getLogger('autouri')
 
 logger_filelock().setLevel(logging.CRITICAL)
@@ -188,6 +188,7 @@ def md5(self) -> str:
     @property
     def md5_from_file(self) -> str:
         """Get md5 from a md5 file (.md5) if it exists.
+        Also checks that the md5 file is newer than the file it is associated with.
         """
         u_md5 = self.md5_file_uri
         if u_md5.is_valid:
@@ -195,13 +196,18 @@ def md5_from_file(self) -> str:
                 m_md5 = u_md5.get_metadata(skip_md5=True)
                 if m_md5.exists:
                     self_mtime = self.mtime
+                    logger.debug('md5 file exists. mt={mt}, md5_mt={md5_mt}, uri={u}'.format(
+                        mt=self_mtime,
+                        md5_mt=m_md5.mtime,
+                        u=self._uri))
                     if m_md5.mtime is not None and self_mtime is not None \
                         and m_md5.mtime >= self_mtime:
                         return u_md5.read()
         except Exception as e:
             pass
-        logger.debug('Failed to get md5 hash from md5 file.')
+        logger.debug('Failed to get md5 hash from md5 file. '
+                     'uri={u}'.format(
+            u=self._uri))
         return None
 
     @property
@@ -210,14 +216,18 @@ def md5_file_uri(self) -> 'AutoURI':
         """
         return AutoURI(str(self._uri) + AutoURI.MD5_FILE_EXT)
 
-    def cp(self, dest_uri: Union[str, 'AutoURI'], no_lock=False, no_checksum=False, make_md5_file=False) -> 'AutoURI':
+    def cp(self, dest_uri: Union[str, 'AutoURI'],
+           no_lock=False, no_checksum=False, make_md5_file=False,
+           return_flag=False) -> 'AutoURI':
         """Makes a copy on destination. It is protected by a locking mechanism.
         Checks md5 hash, file name/size and last-modified date if possible to
         prevent unnecessary re-uploading.
 
         Args:
             dest_uri:
-                Target URI
+                Target URI.
+                If it is an explicit directory with a trailing slash (or os.sep)
+                then it is made a file URI by appending self.basename.
             no_lock:
                 Do not use a locking mechanism
             no_checksum:
@@ -231,7 +241,9 @@ def cp(self, dest_uri: Union[str, 'AutoURI'], no_lock=False, no_checksum=False,
             hash then md5 file will not be created.
 
         Returns:
-            Tuple of (copy on destination, rc)
+            Tuple of (s, rc) if return_flag is True, otherwise s only.
+            s:
+                URI string of the copy on destination.
             rc:
 
                 0: made a copy
@@ -241,18 +253,37 @@ def cp(self, dest_uri: Union[str, 'AutoURI'], no_lock=False, no_checksum=False,
                 md5 not found but matched file size and mtime is not newer
         """
         d = AutoURI(dest_uri)
+        sep = d.__class__.get_path_sep()
+        if d._uri.endswith(sep):
+            d = AutoURI(sep.join([d._uri.rstrip(sep), self.basename]))
         with d.get_lock(no_lock=no_lock) as lock:
             if not no_checksum:
                 # checksum (by md5, size, mdate)
                 m_dest = d.get_metadata(make_md5_file=make_md5_file)
+                logger.debug(
+                    'cp: dest metadata={m}, dest={dest}'.format(
+                        m=m_dest,
+                        dest=d.uri))
+
                 if m_dest.exists:
                     m_src = self.get_metadata()
+                    logger.debug(
+                        'cp: src metadata={m}, src={src}'.format(
+                            m=m_src,
+                            src=self._uri))
 
                     md5_matched = m_src.md5 is not None and m_dest.md5 is not None and \
                         m_src.md5 == m_dest.md5
                     if md5_matched:
-                        return d, 1
+                        logger.info(
+                            'cp: skipped due to md5_match, '
+                            'md5={md5}, src={src}, dest={dest}'.format(
+                                md5=m_src.md5,
+                                src=self._uri,
+                                dest=d.uri))
+                        return (d._uri, 1) if return_flag else d._uri
 
                     name_matched = self.basename == d.basename
                     size_matched = m_src.size is not None and m_dest.size is not None and \
                         m_src.size == m_dest.size
                     src_is_not_newer = m_src.mtime is not None and m_dest.mtime is not None and \
                         m_src.mtime <= m_dest.mtime
                     if name_matched and size_matched and src_is_not_newer:
-                        return d, 2
+                        logger.info(
+                            'cp: skipped due to name_size_match, '
+                            'size={sz}, mt={mt}, src={src}, dest={dest}'.format(
+                                sz=m_src.size,
+                                mt=m_src.mtime,
+                                src=self._uri,
+                                dest=d.uri))
+                        return (d._uri, 2) if return_flag else d._uri
 
             if not self._cp(dest_uri=d):
                 if not d._cp_from(src_uri=self):
                     raise Exception(
                         'cp failed. src: {s} dest: {d}'.format(
                             s=str(self), d=str(d)))
-        return d, 0
+
+        logger.info(
+            'cp: copied, src={src}, dest={dest}'.format(
+                src=self._uri,
+                dest=d.uri))
+        return (d._uri, 0) if return_flag else d._uri
 
     def write(self, s, no_lock=False):
         """Write string/bytes to file. It is protected by a locking mechanism.
@@ -293,6 +336,14 @@ def get_lock(self, no_lock=False, timeout=None, poll_interval=None) -> BaseFileL
         else:
             return self._get_lock(timeout=timeout, poll_interval=poll_interval)
 
+    def localize_on(self, loc_prefix, recursive=False, make_md5_file=False, depth=0) -> Tuple[str, bool]:
+        """Wrapper for the classmethod localize().
+        Localizes self on the target directory loc_prefix and
+        returns a tuple of (loc_uri, flag).
+        """
+        return AutoURI.localize(
+            src_uri=self, recursive=recursive, make_md5_file=make_md5_file,
+            loc_prefix=loc_prefix, return_flag=True, depth=depth)
+
     @abstractmethod
     def _get_lock(self, timeout=None, poll_interval=None) -> BaseFileLock:
         """Locking mechanism with "with" context.
@@ -390,7 +441,8 @@ def get_loc_prefix(cls) -> str:
         return cls.LOC_PREFIX.rstrip(cls.get_path_sep())
 
     @classmethod
-    def localize(cls, src_uri, recursive=False, make_md5_file=False, loc_prefix=None, depth=0) -> Tuple[str, bool]:
+    def localize(cls, src_uri, recursive=False, make_md5_file=False, loc_prefix=None,
+                 return_flag=False, depth=0) -> Tuple[str, bool]:
         """Localize a source URI on this URI class (cls).
 
         Recursive localization is supported for the following file extensions:
@@ -413,29 +465,37 @@ def localize(cls, src_uri, recursive=False, make_md5_file=False, loc_prefix=None
             assuming that you have write permission on target's directory
             and its subdirectories recursively.
         loc_prefix:
-            If defined, use it instead of cls.get_loc_prefix()
+            If defined, use it instead of cls.get_loc_prefix().
+        return_flag:
+            Return a tuple of (localized URI string, flag)
+            instead of the localized URI string only.
+            See the "Returns" section for details about the flag.
         depth:
             To count recursion depth.
 
         Returns:
             loc_uri:
                 Localized URI STRING (not an AutoURI instance) since it may be
                 used as a callback function by external code.
-            localized:
-                Whether file is ACTUALLY localized on this cls's storage.
-                ACTUALLY means making a (possibly modified) copy of the original file
-                on this cls' storage (on loc_prefix).
-                This flag includes the following two cases:
-                modified:
-                    file contents are modified during recursive localization
-                    so localized file is suffixed with
-                    source's storage type. e.g. .s3, .gcs, and .local
-                but not modified:
-                    file contents are not modified so localized file is not suffixed
-                    and hence will keep the original file basename
+            flag: (only if return_flag is on)
+                Whether the file was modified or localized on a different storage.
+                "modified" means:
+                    file contents were modified during recursive localization,
+                    so the localized file is suffixed with the
+                    source's storage type, e.g. .s3, .gcs, and .local.
+                "localized on a different storage" means:
+                    file contents were NOT modified, so the localized file is not
+                    suffixed and hence keeps the original file basename,
+                    but localization actually happened.
+                Otherwise, this flag will be False, which means that localization
+                didn't happen because the file's contents didn't change and it
+                already exists on the same storage, so there is no need for
+                localization. In this case, loc_uri will be identical to self._uri.
         """
         src_uri = AutoURI(src_uri)
         if not src_uri.is_valid:
-            return src_uri._uri, False
+            return (src_uri._uri, False) if return_flag else src_uri._uri
+
+        if depth >= AutoURI.LOC_RECURSION_DEPTH_LIMIT:
+            raise AutoURIRecursionError(
+                'Maximum recursion depth {m} exceeded. 
' @@ -462,7 +522,7 @@ def localize(cls, src_uri, recursive=False, make_md5_file=False, loc_prefix=None # use cls.localize() itself as a callback fnc in recursion fnc_loc = lambda x: cls.localize( x, recursive=recursive, make_md5_file=make_md5_file, loc_prefix=loc_prefix, - depth=depth + 1) + return_flag=True, depth=depth + 1) for ext, fnc_recurse in AutoURI.LOC_RECURSE_EXT_AND_FNC.items(): if src_uri.ext == ext: # read source contents for recursive localization @@ -486,7 +546,7 @@ def localize(cls, src_uri, recursive=False, make_md5_file=False, loc_prefix=None else: loc_uri = src_uri._uri - return loc_uri, modified or on_different_storage + return (loc_uri, modified or on_different_storage) if return_flag else loc_uri @staticmethod def init_uribase( @@ -534,22 +594,26 @@ def __init__(self, uri, thread_id=-1): return def _get_lock(self, timeout=None, poll_interval=None): - raise NotImplementedError('Not a valid URI?. {f}'.format(f=self._uri)) + self.__raise_value_error() def get_metadata(self, skip_md5=False, make_md5_file=False): - raise NotImplementedError('Not a valid URI?. {f}'.format(f=self._uri)) + self.__raise_value_error() def read(self, byte=False): - raise NotImplementedError('Not a valid URI?. {f}'.format(f=self._uri)) + self.__raise_value_error() def _write(self, s): - raise NotImplementedError('Not a valid URI?. {f}'.format(f=self._uri)) + self.__raise_value_error() def _rm(self): - raise NotImplementedError('Not a valid URI?. {f}'.format(f=self._uri)) + self.__raise_value_error() def _cp(self, dest_uri): - raise NotImplementedError('Not a valid URI?. {f}'.format(f=self._uri)) + self.__raise_value_error() def _cp_from(self, src_uri): - raise NotImplementedError('Not a valid URI?. {f}'.format(f=self._uri)) + self.__raise_value_error() + + def __raise_value_error(self): + raise ValueError('Not a valid URI?. {f}'.format(f=self._uri)) + diff --git a/autouri/gcsuri.py b/autouri/gcsuri.py index 39aa43d..240d1db 100644 --- a/autouri/gcsuri.py +++ b/autouri/gcsuri.py @@ -89,6 +89,8 @@ class GCSURI(URIBase): GCS client is not thread-safe. _CACHED_PRESIGNED_URLS: Can use cached presigned URLs. 
+        _GCS_PUBLIC_URL_FORMAT:
+            Endpoint URL format for a bucket with public access (bucket + key path).
     """
     PRIVATE_KEY_FILE: str = ''
     DURATION_PRESIGNED_URL: int = 4233600
@@ -99,6 +101,7 @@ class GCSURI(URIBase):
 
     _CACHED_GCS_CLIENT_PER_THREAD = {}
     _CACHED_PRESIGNED_URLS = {}
+    _GCS_PUBLIC_URL_FORMAT = 'http://storage.googleapis.com/{bucket}/{path}'
 
     _LOC_SUFFIX = '.gcs'
     _SCHEMES = ('gs://',)
@@ -328,6 +331,10 @@ def get_presigned_url(self, duration=None, private_key_file=None, use_cached=Fal
             cache[self._uri] = url
         return url
 
+    def get_public_url(self) -> str:
+        bucket, path = self.get_bucket_path()
+        return GCSURI._GCS_PUBLIC_URL_FORMAT.format(bucket=bucket, path=path)
+
     @staticmethod
     def get_gcs_client(thread_id) -> storage.Client:
         if thread_id in GCSURI._CACHED_GCS_CLIENT_PER_THREAD:
diff --git a/autouri/loc_aux.py b/autouri/loc_aux.py
index f0995a2..2edd114 100644
--- a/autouri/loc_aux.py
+++ b/autouri/loc_aux.py
@@ -34,7 +34,7 @@ def recurse_dict(d, fnc, d_parent=None, d_parent_key=None,
     elif isinstance(d, list):
         for i, v in enumerate(d):
             modified |= recurse_dict(v, fnc, lst=d,
-                slst_idx=i, modified=modified)
+                lst_idx=i, modified=modified)
     elif isinstance(d, str):
         assert(d_parent is not None or lst is not None)
         new_val, modified_ = fnc(d)
diff --git a/autouri/metadata.py b/autouri/metadata.py
index 13b1665..2fa70b0 100644
--- a/autouri/metadata.py
+++ b/autouri/metadata.py
@@ -1,19 +1,33 @@
 """URIMetadata and helper functions for metadata
 """
+import warnings
 from binascii import hexlify
 from base64 import b64decode
 from collections import namedtuple
-from datetime import datetime
-from dateutil.parser import parse as parse_timestamp
-from dateutil.tz import tzutc
+from datetime import datetime, timezone
+from dateparser import parse as dateparser_parse
+from dateutil.parser import parse as dateutil_parse
+from dateutil.tz import tzlocal, tzutc
 
 
 URIMetadata = namedtuple('URIMetadata', ('exists', 'mtime', 'size', 'md5'))
 
 
 def get_seconds_from_epoch(timestamp: str) -> float:
-    utc_t = parse_timestamp(timestamp)
-    utc_epoch = datetime(1970, 1, 1, tzinfo=tzutc())
+    """If dateutil.parser.parse cannot parse DST timezones
+    (e.g. PDT, EDT) correctly, then use dateparser.parse instead.
+    """
+    utc_epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
+    utc_t = None
+    try:
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore")
+            utc_t = dateutil_parse(timestamp)
+    except Exception:
+        pass
+    if utc_t is None or utc_t.tzname() not in ('UTC', 'Z'):
+        utc_t = dateparser_parse(timestamp)
+    utc_t = utc_t.astimezone(timezone.utc)
     return (utc_t - utc_epoch).total_seconds()
diff --git a/autouri/s3uri.py b/autouri/s3uri.py
index 4e5c1f9..1fa1b04 100644
--- a/autouri/s3uri.py
+++ b/autouri/s3uri.py
@@ -2,6 +2,7 @@
 S3 Object versioning must be turned off
 """
 import requests
+import time
 from boto3 import client
 from botocore.exceptions import ClientError
 from filelock import BaseFileLock
@@ -13,12 +14,22 @@
 
 
 class S3URILock(BaseFileLock):
-    """Unstable locking without using S3 Object Lock.
+    """Locking without using S3 Object Lock.
+    Without S3 Object Lock, boto3's put_object(), which is used for _write() and
+    _cp() in this module, does not guarantee consistency across concurrent write
+    operations: all write requests succeed but only the last object written survives.
+    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.put_object
+
+    To make this lock as stable as possible, this module uses a .lock file with
+    id(self) written to it. 
+    It first checks that the .lock file does not exist, then tries to write the
+    .lock file with id(self) in it. It waits for a short time
+    (self._lock_read_delay) and then checks that the written .lock file still
+    contains the same id(self). self._lock_read_delay is set to poll_interval/10.
     """
     def __init__(
         self, lock_file, timeout=900, poll_interval=10.0, no_lock=False):
         super().__init__(lock_file, timeout=timeout)
         self._poll_interval = poll_interval
+        self._lock_read_delay = self._poll_interval/10.0
 
     def acquire(self, timeout=None, poll_intervall=5.0):
         """To use self._poll_interval instead of poll_intervall in args.
@@ -26,10 +37,16 @@ def acquire(self, timeout=None, poll_intervall=5.0):
         super().acquire(timeout=timeout, poll_intervall=self._poll_interval)
 
     def _acquire(self):
+        """Unlike GCSURI, this module does not use S3 Object Lock.
+        This writes id(self) to a .lock file.
+        """
         u = S3URI(self._lock_file)
+        str_id = str(id(self))
         try:
             if not u.exists:
-                u.write('', no_lock=True)
+                u.write(str_id, no_lock=True)
+                time.sleep(self._lock_read_delay)
+                if u.read() == str_id:
                     self._lock_file_fd = id(self)
         except ClientError as e:
             status = e.response["ResponseMetadata"]["HTTPStatusCode"]
@@ -58,11 +75,14 @@ class S3URI(URIBase):
     Protected class constants:
         _CACHED_BOTO3_CLIENT_PER_THREAD:
         _CACHED_PRESIGNED_URLS:
+        _S3_PUBLIC_URL_FORMAT:
+            Endpoint URL format for a bucket with public access (bucket + key path).
     """
     DURATION_PRESIGNED_URL: int = 4233600
 
     _CACHED_BOTO3_CLIENT_PER_THREAD = {}
     _CACHED_PRESIGNED_URLS = {}
+    _S3_PUBLIC_URL_FORMAT = 'http://{bucket}.s3.amazonaws.com/{path}'
 
     _LOC_SUFFIX = '.s3'
     _SCHEMES = ('s3://',)
@@ -232,6 +252,10 @@ def get_presigned_url(self, duration=None, use_cached=False) -> str:
             cache[self._uri] = url
         return url
 
+    def get_public_url(self) -> str:
+        bucket, path = self.get_bucket_path()
+        return S3URI._S3_PUBLIC_URL_FORMAT.format(bucket=bucket, path=path)
+
     @staticmethod
     def get_boto3_client(thread_id=-1) -> client:
         if thread_id in S3URI._CACHED_BOTO3_CLIENT_PER_THREAD:
diff --git a/setup.py b/setup.py
index 124d1ef..3eacbcf 100644
--- a/setup.py
+++ b/setup.py
@@ -25,6 +25,6 @@
         'requests', 'pyopenssl',
         'google-cloud-storage',
         'boto3', 'awscli',
-        'filelock'
+        'dateparser', 'filelock'
     ]
 )
diff --git a/tests/conftest.py b/tests/conftest.py
index 01ab485..9f9b76e 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -31,11 +31,24 @@ def pytest_addoption(parser):
     parser.addoption(
         '--s3-root', default='s3://encode-test-autouri/tmp',
         help='S3 root path for CI test. '
-             'This S3 bucket must be configured without versioning.'
+             'This S3 bucket must be configured without versioning. '
+             'Make it publicly accessible: '
+             'read access for everyone is enough for testing. '
+    )
+    parser.addoption(
+        '--s3-public-url-test-v6-file', default='s3://encode-test-autouri/tmp/v6.txt',
+        help='Write "v6: Hello World" to this file named "v6.txt" '
+             'and grant "Read object" permission on it. '
+             'An S3 object does not inherit the ACL from its bucket/parent '
+             'and S3URI does not have methods to control the ACL of an object, '
+             'so this is the only way to test the get_public_url() method of '
+             'S3URI.'
     )
     parser.addoption(
         '--gcs-root', default='gs://encode-test-autouri/tmp',
         help='GCS root path for CI test. '
+             'This GCS bucket must be publicly accessible '
+             '(read access for everyone is enough for testing).'
) parser.addoption( '--gcs-root-url', default='gs://encode-test-autouri/tmp_url', @@ -64,6 +77,11 @@ def s3_root(request): return request.config.getoption("--s3-root").rstrip('/') +@pytest.fixture(scope="session") +def s3_public_url_test_v6_file(request): + return request.config.getoption("--s3-public-url-test-v6-file") + + @pytest.fixture(scope="session") def gcs_root(request): """GCS root to generate test GCS URIs on. diff --git a/tests/test_abspath.py b/tests/test_abspath.py index 73e04fa..6e7828a 100644 --- a/tests/test_abspath.py +++ b/tests/test_abspath.py @@ -121,7 +121,6 @@ def test_abspath_md5_file_uri(local_v6_txt): assert AbsPath(local_v6_txt + URIBase.MD5_FILE_EXT).uri == local_v6_txt + URIBase.MD5_FILE_EXT -@pytest.mark.xfail(raises=ReadOnlyStorageError) def test_abspath_cp_url( local_v6_txt, url_test_path) -> 'AutoURI': @@ -134,7 +133,8 @@ def test_abspath_cp_url( for test_path in (url_test_path, ): u_dest = AutoURI(os.path.join(test_path, 'test_abspath_cp', basename)) - _, ret = u.cp(u_dest) + with pytest.raises(ReadOnlyStorageError): + _, ret = u.cp(u_dest, return_flag=True) def test_abspath_cp( @@ -167,26 +167,26 @@ def test_abspath_cp( u_dest.rm() assert not u_dest.exists - _, ret = u.cp(u_dest) + _, ret = u.cp(u_dest, return_flag=True) assert u_dest.exists and u.read() == u_dest.read() and ret == 0 u_dest.rm() assert not u_dest.exists # cp without lock will be tested throughly in test_race_cond.py - _, ret = u.cp(u_dest, no_lock=True) + _, ret = u.cp(u_dest, no_lock=True, return_flag=True) assert u_dest.exists and u.read() == u_dest.read() and ret == 0 u_dest.rm() # trivial: copy without checksum when target doesn't exists assert not u_dest.exists - _, ret = u.cp(u_dest, no_checksum=True) + _, ret = u.cp(u_dest, no_checksum=True, return_flag=True) assert u_dest.exists and u.read() == u_dest.read() and ret == 0 # copy without checksum when target exists m_dest = u_dest.get_metadata() assert m_dest.exists time.sleep(1) - _, ret = u.cp(u_dest, no_checksum=True) + _, ret = u.cp(u_dest, no_checksum=True, return_flag=True) # compare new mtime vs old mtime # new time should be larger if it's overwritten as intended assert u_dest.mtime > m_dest.mtime and u.read() == u_dest.read() and ret == 0 @@ -194,7 +194,7 @@ def test_abspath_cp( # copy with checksum when target exists m_dest = u_dest.get_metadata() assert m_dest.exists - _, ret = u.cp(u_dest) + _, ret = u.cp(u_dest, return_flag=True) # compare new mtime vs old mtime # new time should be the same as old time assert u_dest.mtime == m_dest.mtime and u.read() == u_dest.read() and ret == 1 @@ -207,7 +207,7 @@ def test_abspath_cp( u_dest_md5_file = AutoURI(u_dest.uri + URIBase.MD5_FILE_EXT) if u_dest_md5_file.exists: u_dest_md5_file.rm() - _, ret = u.cp(u_dest, make_md5_file=True) + _, ret = u.cp(u_dest, make_md5_file=True, return_flag=True) assert u_dest.exists and u.read() == u_dest.read() and ret == 1 u_dest.rm() @@ -229,19 +229,9 @@ def test_abspath_write(local_test_path): def test_abspath_write_no_permission(): - try: - u = AbsPath('/x.tmp') + u = AbsPath('/test-permission-denied/x.tmp') + with pytest.raises(PermissionError): u.write('test') - assert False - except PermissionError: - pass - - try: - u = AbsPath('/x.tmp') - u.write('test') - assert False - except PermissionError: - pass def test_abspath_rm(local_test_path): @@ -308,6 +298,49 @@ def test_abspath_mkdirname(local_test_path): assert os.path.exists(os.path.dirname(f)) +def test_abspath_soft_link(local_test_path, local_v6_txt): + u_src = AbsPath(local_v6_txt) + f = 
os.path.join(local_test_path, 'test_abspath_soft_link', 'v6.txt') + u_target = AbsPath(f) + u_target.mkdir_dirname() + u_src.soft_link(u_target) + assert u_target.exists and u_target.read() == v6_txt_contents() + assert u_src.uri == os.path.realpath(u_target.uri) + + with pytest.raises(OSError): + # file already exists + u_src.soft_link(u_target) + # no error if force + u_src.soft_link(u_target, force=True) + u_target.rm() + + +# staticmethods +def test_abspath_get_abspath_if_exists(): + # write a local file on CWD. + test_local_file_abspath = os.path.join(os.getcwd(), 'test.txt') + u = AbsPath(test_local_file_abspath) + if u.exists: + u.rm() + + # if it doesn't exist + assert AbsPath.get_abspath_if_exists('test.txt') == 'test.txt' + assert AbsPath.get_abspath_if_exists(AutoURI('test.txt')) == 'test.txt' + + u.write('hello-world') + + # if it exists + assert AbsPath.get_abspath_if_exists('test.txt') == test_local_file_abspath + assert AbsPath.get_abspath_if_exists(AutoURI('test.txt')) == test_local_file_abspath + + assert AbsPath.get_abspath_if_exists('tttttttttest.txt') == 'tttttttttest.txt' + assert AbsPath.get_abspath_if_exists(AutoURI('tttttttttest.txt')) == 'tttttttttest.txt' + assert AbsPath.get_abspath_if_exists('~/if-it-does-not-exist') == '~/if-it-does-not-exist' + assert AbsPath.get_abspath_if_exists('non-existing-file') == 'non-existing-file' + + u.rm() + + # classmethods def test_abspath_get_path_sep() -> str: assert AbsPath.get_path_sep() == os.path.sep @@ -394,6 +427,7 @@ def test_abspath_localize( loc_uri, localized = AbsPath.localize( u_j1_json, recursive=False, + return_flag=True, loc_prefix=loc_prefix_) assert loc_uri == u_j1_json.uri and not localized assert not os.path.exists(loc_prefix) @@ -401,6 +435,7 @@ def test_abspath_localize( loc_uri, localized = AbsPath.localize( u_j1_json, recursive=True, + return_flag=True, loc_prefix=loc_prefix_) assert loc_uri == u_j1_json.uri and not localized assert not os.path.exists(loc_prefix) @@ -415,6 +450,7 @@ def test_abspath_localize( loc_uri, localized = AbsPath.localize( u_j1_json, recursive=False, + return_flag=True, loc_prefix=loc_prefix_) assert loc_uri == os.path.join( loc_prefix_, u_j1_json.loc_dirname, @@ -424,6 +460,7 @@ def test_abspath_localize( loc_uri, localized = AbsPath.localize( u_j1_json, recursive=True, + return_flag=True, loc_prefix=loc_prefix_) assert loc_uri == os.path.join( loc_prefix_, u_j1_json.loc_dirname, diff --git a/tests/test_autouri_localize.py b/tests/test_autouri_localize.py index 7184afe..8cac0db 100644 --- a/tests/test_autouri_localize.py +++ b/tests/test_autouri_localize.py @@ -19,7 +19,6 @@ recurse_raise_if_uri_not_exist) -@pytest.mark.xfail(raises=AutoURIRecursionError) def test_localize_self_ref( local_test_path, gcs_j1_json_self_ref, @@ -43,10 +42,11 @@ def test_localize_self_ref( u_j1_json = AutoURI(j1_json) loc_prefix_ = loc_prefix + u_j1_json.__class__.get_loc_suffix() - loc_uri, localized = AbsPath.localize( - u_j1_json, - recursive=True, - loc_prefix=loc_prefix_) + with pytest.raises(AutoURIRecursionError): + loc_uri, localized = AbsPath.localize( + u_j1_json, + recursive=True, + loc_prefix=loc_prefix_) def test_localize_mixed( @@ -70,6 +70,7 @@ def test_localize_mixed( loc_uri, localized = AbsPath.localize( u_j1_json, recursive=True, + return_flag=True, loc_prefix=loc_prefix_) # check if all URIs defeind in localized JSON file exist recurse_raise_if_uri_not_exist(loc_uri) diff --git a/tests/test_gcsuri.py b/tests/test_gcsuri.py index afb2c52..0c31d26 100644 --- a/tests/test_gcsuri.py 
+++ b/tests/test_gcsuri.py @@ -126,7 +126,6 @@ def test_gcsuri_md5_file_uri(gcs_v6_txt): assert GCSURI(gcs_v6_txt + URIBase.MD5_FILE_EXT).uri == gcs_v6_txt + URIBase.MD5_FILE_EXT -@pytest.mark.xfail(raises=ReadOnlyStorageError) def test_gcsuri_cp_url( gcs_v6_txt, url_test_path) -> 'AutoURI': @@ -139,7 +138,8 @@ def test_gcsuri_cp_url( for test_path in (url_test_path,): u_dest = AutoURI(os.path.join(test_path, 'test_gcsuri_cp', basename)) - _, ret = u.cp(u_dest) + with pytest.raises(ReadOnlyStorageError): + _, ret = u.cp(u_dest, return_flag=True) def test_gcsuri_cp( @@ -171,26 +171,26 @@ def test_gcsuri_cp( u_dest.rm() assert not u_dest.exists - _, ret = u.cp(u_dest) + _, ret = u.cp(u_dest, return_flag=True) assert u_dest.exists and u.read() == u_dest.read() and ret == 0 u_dest.rm() assert not u_dest.exists # cp without lock will be tested throughly in test_race_cond.py - _, ret = u.cp(u_dest, no_lock=True) + _, ret = u.cp(u_dest, no_lock=True, return_flag=True) assert u_dest.exists and u.read() == u_dest.read() and ret == 0 u_dest.rm() # trivial: copy without checksum when target doesn't exists assert not u_dest.exists - _, ret = u.cp(u_dest, no_checksum=True) + _, ret = u.cp(u_dest, no_checksum=True, return_flag=True) assert u_dest.exists and u.read() == u_dest.read() and ret == 0 # copy without checksum when target exists m_dest = u_dest.get_metadata() assert m_dest.exists time.sleep(1) - _, ret = u.cp(u_dest, no_checksum=True) + _, ret = u.cp(u_dest, no_checksum=True, return_flag=True) # compare new mtime vs old mtime # new time should be larger if it's overwritten as intended assert u_dest.mtime > m_dest.mtime and u.read() == u_dest.read() and ret == 0 @@ -198,7 +198,7 @@ def test_gcsuri_cp( # copy with checksum when target exists m_dest = u_dest.get_metadata() assert m_dest.exists - _, ret = u.cp(u_dest) + _, ret = u.cp(u_dest, return_flag=True) # compare new mtime vs old mtime # new time should be the same as old time assert u_dest.mtime == m_dest.mtime and u.read() == u_dest.read() and ret == 1 @@ -211,7 +211,7 @@ def test_gcsuri_cp( u_dest_md5_file = AutoURI(u_dest.uri + URIBase.MD5_FILE_EXT) if u_dest_md5_file.exists: u_dest_md5_file.rm() - _, ret = u.cp(u_dest, make_md5_file=True) + _, ret = u.cp(u_dest, make_md5_file=True, return_flag=True) assert u_dest.exists and u.read() == u_dest.read() and ret == 1 u_dest.rm() @@ -312,6 +312,12 @@ def test_gcsuri_get_presigned_url(gcs_v6_txt, gcp_private_key_file): # assert u_url.read() != v6_txt_contents() +def test_gcsuri_get_public_url(gcs_v6_txt): + url = GCSURI(gcs_v6_txt).get_public_url() + u_url = HTTPURL(url) + assert u_url.is_valid and u_url.read() == v6_txt_contents() + + # classmethods def test_gcsuri_get_path_sep() -> str: assert GCSURI.get_path_sep() == os.path.sep @@ -400,6 +406,7 @@ def test_gcsuri_localize( loc_uri, localized = GCSURI.localize( u_j1_json, recursive=False, + return_flag=True, loc_prefix=loc_prefix_) assert loc_uri == u_j1_json.uri and not localized assert not AutoURI(os.path.join(loc_prefix_, basename)).exists @@ -407,6 +414,7 @@ def test_gcsuri_localize( loc_uri, localized = GCSURI.localize( u_j1_json, recursive=True, + return_flag=True, loc_prefix=loc_prefix_) assert loc_uri == u_j1_json.uri and not localized assert not AutoURI(os.path.join(loc_prefix_, basename)).exists @@ -422,6 +430,7 @@ def test_gcsuri_localize( loc_uri, localized = GCSURI.localize( u_j1_json, recursive=False, + return_flag=True, loc_prefix=loc_prefix_) expected = os.path.join( loc_prefix_, u_j1_json.loc_dirname, @@ -432,6 +441,7 @@ def 
test_gcsuri_localize( loc_uri, localized = GCSURI.localize( u_j1_json, recursive=True, + return_flag=True, loc_prefix=loc_prefix_) expected = os.path.join( loc_prefix_, u_j1_json.loc_dirname, diff --git a/tests/test_httpurl.py b/tests/test_httpurl.py index 016d46b..1fb8e36 100644 --- a/tests/test_httpurl.py +++ b/tests/test_httpurl.py @@ -145,7 +145,6 @@ def test_httpurl_md5_file_uri(url_v6_txt): assert HTTPURL(url_v6_txt + URIBase.MD5_FILE_EXT).uri == url_v6_txt + URIBase.MD5_FILE_EXT -@pytest.mark.xfail(raises=ReadOnlyStorageError) def test_httpurl_cp_url( url_v6_txt, url_test_path) -> 'AutoURI': @@ -158,7 +157,9 @@ def test_httpurl_cp_url( for test_path in (url_test_path,): u_dest = AutoURI(os.path.join(test_path, 'test_httpurl_cp', basename)) - _, ret = u.cp(u_dest) + + with pytest.raises(ReadOnlyStorageError): + _, ret = u.cp(u_dest, return_flag=True) def test_httpurl_cp( @@ -190,26 +191,26 @@ def test_httpurl_cp( u_dest.rm() assert not u_dest.exists - _, ret = u.cp(u_dest) + _, ret = u.cp(u_dest, return_flag=True) assert u_dest.exists and u.read() == u_dest.read() and ret == 0 u_dest.rm() assert not u_dest.exists # cp without lock will be tested throughly in test_race_cond.py - _, ret = u.cp(u_dest, no_lock=True) + _, ret = u.cp(u_dest, no_lock=True, return_flag=True) assert u_dest.exists and u.read() == u_dest.read() and ret == 0 u_dest.rm() # trivial: copy without checksum when target doesn't exists assert not u_dest.exists - _, ret = u.cp(u_dest, no_checksum=True) + _, ret = u.cp(u_dest, no_checksum=True, return_flag=True) assert u_dest.exists and u.read() == u_dest.read() and ret == 0 # copy without checksum when target exists m_dest = u_dest.get_metadata() assert m_dest.exists time.sleep(1) - _, ret = u.cp(u_dest, no_checksum=True) + _, ret = u.cp(u_dest, no_checksum=True, return_flag=True) # compare new mtime vs old mtime # new time should be larger if it's overwritten as intended assert u_dest.mtime > m_dest.mtime and u.read() == u_dest.read() and ret == 0 @@ -217,7 +218,7 @@ def test_httpurl_cp( # copy with checksum when target exists m_dest = u_dest.get_metadata() assert m_dest.exists - _, ret = u.cp(u_dest) + _, ret = u.cp(u_dest, return_flag=True) # compare new mtime vs old mtime # new time should be the same as old time assert u_dest.mtime == m_dest.mtime and u.read() == u_dest.read() and ret == 1 @@ -230,21 +231,21 @@ def test_httpurl_cp( u_dest_md5_file = AutoURI(u_dest.uri + URIBase.MD5_FILE_EXT) if u_dest_md5_file.exists: u_dest_md5_file.rm() - _, ret = u.cp(u_dest, make_md5_file=True) + _, ret = u.cp(u_dest, make_md5_file=True, return_flag=True) assert u_dest.exists and u.read() == u_dest.read() and ret == 1 u_dest.rm() -@pytest.mark.xfail(raises=ReadOnlyStorageError) def test_httpurl_write(url_test_path): u = HTTPURL(url_test_path + '/test_httpurl_write.tmp') - u.write('test') + with pytest.raises(ReadOnlyStorageError): + u.write('test') -@pytest.mark.xfail(raises=ReadOnlyStorageError) def test_httpurl_rm(url_test_path): u = HTTPURL(url_test_path + '/test_httpurl_rm.tmp') - u.rm() + with pytest.raises(ReadOnlyStorageError): + u.rm() def test_httpurl_get_metadata(url_v6_txt, v6_txt_size, v6_txt_md5_hash): @@ -296,7 +297,6 @@ def test_httpurl_get_loc_prefix() -> str: assert HTTPURL.get_loc_prefix() == '' -@pytest.mark.xfail(raises=ReadOnlyStorageError) def test_httpurl_localize( url_test_path, gcs_j1_json, gcs_v41_json, gcs_v421_tsv, gcs_v5_csv, gcs_v6_txt, @@ -315,7 +315,8 @@ def test_httpurl_localize( # nothing should be localized actually # since they are already 
on a local storage # so loc_prefix directory itself shouldn't be created - loc_uri, localized = HTTPURL.localize( - u_j1_json, - recursive=False, - loc_prefix=loc_prefix_) + with pytest.raises(ReadOnlyStorageError): + loc_uri, localized = HTTPURL.localize( + u_j1_json, + recursive=False, + loc_prefix=loc_prefix_) diff --git a/tests/test_race_cond.py b/tests/test_race_cond.py index 5a1ae5a..a1c300d 100644 --- a/tests/test_race_cond.py +++ b/tests/test_race_cond.py @@ -17,6 +17,16 @@ HTTPURL: Read-only storage so don't need to test +Test nth threads competing to write on the same file v6.txt. +Compare written string vs read string. + +Important notes: + Python API for GCS client() is not thread-safe. + So we need to specify thread_id here. + URIBase (and its child GCSURI) has a thread_id + This will make a new GCS client instance for each thread. + S3 Object Lock is based on Versioning. + We don't allow versioning so keep using unstable soft file lock. """ import os import pytest @@ -43,34 +53,30 @@ def write_v6_txt(x): assert u.read() == s -def test_race_cond_autouri_write( - local_test_path, - gcs_test_path, - s3_test_path): - """Test nth threads competing to write on the same file v6.txt. - Compare written string vs read string. - - Important notes: - Python API for GCS client() is not thread-safe. - So we need to specify thread_id here. - URIBase (and its child GCSURI) has a thread_id - This will make a new GCS client instance for each thread. - S3 Object Lock is based on Versioning. - We don't allow versioning so keep using unstable soft file lock. - """ - tests = ( - (local_test_path, 50), - (gcs_test_path, 10), - (s3_test_path, 5), - ) - # (local_test_path, gcs_test_path, s3_test_path,): - for (test_path, nth) in tests: - prefix = os.path.join(test_path, 'test_race_cond_autouri_write') - s = os.path.join(prefix, 'v6.txt') - u = AutoURI(s) - if u.exists: - u.rm() - p = Pool(nth) - p.map(write_v6_txt, list(zip([s]*nth, range(nth)))) - p.close() - p.join() +def run_write_v6_txt(prefix, nth): + s = os.path.join(prefix, 'v6.txt') + u = AutoURI(s) + if u.exists: + u.rm() + p = Pool(nth) + p.map(write_v6_txt, list(zip([s]*nth, range(nth)))) + p.close() + p.join() + + +def test_race_cond_autouri_write_local(local_test_path): + prefix = os.path.join(local_test_path, 'test_race_cond_autouri_write_local') + nth = 50 + run_write_v6_txt(prefix, nth) + + +def test_race_cond_autouri_write_gcs(gcs_test_path): + prefix = os.path.join(gcs_test_path, 'test_race_cond_autouri_write_gcs') + nth = 10 + run_write_v6_txt(prefix, nth) + + +def test_race_cond_autouri_write_s3(s3_test_path): + nth = 5 + prefix = os.path.join(s3_test_path, 'test_race_cond_autouri_write_s3') + run_write_v6_txt(prefix, nth) diff --git a/tests/test_s3uri.py b/tests/test_s3uri.py index 414fe48..7641b66 100644 --- a/tests/test_s3uri.py +++ b/tests/test_s3uri.py @@ -127,7 +127,6 @@ def test_s3uri_md5_file_uri(s3_v6_txt): assert S3URI(s3_v6_txt + URIBase.MD5_FILE_EXT).uri == s3_v6_txt + URIBase.MD5_FILE_EXT -@pytest.mark.xfail(raises=ReadOnlyStorageError) def test_s3uri_cp_url( s3_v6_txt, url_test_path) -> 'AutoURI': @@ -140,7 +139,9 @@ def test_s3uri_cp_url( for test_path in (url_test_path,): u_dest = AutoURI(os.path.join(test_path, 'test_s3uri_cp', basename)) - _, ret = u.cp(u_dest) + + with pytest.raises(ReadOnlyStorageError): + _, ret = u.cp(u_dest, return_flag=True) def test_s3uri_cp( @@ -172,26 +173,26 @@ def test_s3uri_cp( u_dest.rm() assert not u_dest.exists - _, ret = u.cp(u_dest) + _, ret = u.cp(u_dest, return_flag=True) 
assert u_dest.exists and u.read() == u_dest.read() and ret == 0 u_dest.rm() assert not u_dest.exists # cp without lock will be tested throughly in test_race_cond.py - _, ret = u.cp(u_dest, no_lock=True) + _, ret = u.cp(u_dest, no_lock=True, return_flag=True) assert u_dest.exists and u.read() == u_dest.read() and ret == 0 u_dest.rm() # trivial: copy without checksum when target doesn't exists assert not u_dest.exists - _, ret = u.cp(u_dest, no_checksum=True) + _, ret = u.cp(u_dest, no_checksum=True, return_flag=True) assert u_dest.exists and u.read() == u_dest.read() and ret == 0 # copy without checksum when target exists m_dest = u_dest.get_metadata() assert m_dest.exists time.sleep(1) - _, ret = u.cp(u_dest, no_checksum=True) + _, ret = u.cp(u_dest, no_checksum=True, return_flag=True) # compare new mtime vs old mtime # new time should be larger if it's overwritten as intended assert u_dest.mtime > m_dest.mtime and u.read() == u_dest.read() and ret == 0 @@ -199,7 +200,7 @@ def test_s3uri_cp( # copy with checksum when target exists m_dest = u_dest.get_metadata() assert m_dest.exists - _, ret = u.cp(u_dest) + _, ret = u.cp(u_dest, return_flag=True) # compare new mtime vs old mtime # new time should be the same as old time assert u_dest.mtime == m_dest.mtime and u.read() == u_dest.read() and ret == 1 @@ -212,7 +213,7 @@ def test_s3uri_cp( u_dest_md5_file = AutoURI(u_dest.uri + URIBase.MD5_FILE_EXT) if u_dest_md5_file.exists: u_dest_md5_file.rm() - _, ret = u.cp(u_dest, make_md5_file=True) + _, ret = u.cp(u_dest, make_md5_file=True, return_flag=True) assert u_dest.exists and u.read() == u_dest.read() and ret == 1 u_dest.rm() @@ -293,12 +294,16 @@ def test_s3uri_get_presigned_url(s3_v6_txt): assert u_url.is_valid and u_url.read() == v6_txt_contents() time.sleep(5) # should expire in 2 seconds - try: - s = u_url.read() - assert False - except HTTPError: + with pytest.raises(HTTPError): # forbidden since it's already expired - pass + s = u_url.read() + + +def test_s3uri_get_public_url(s3_public_url_test_v6_file): + url = S3URI(s3_public_url_test_v6_file).get_public_url() + u_url = HTTPURL(url) + assert u_url.is_valid + assert u_url.read() == v6_txt_contents() # classmethods @@ -388,6 +393,7 @@ def test_s3uri_localize( loc_uri, localized = S3URI.localize( u_j1_json, recursive=False, + return_flag=True, loc_prefix=loc_prefix_) assert loc_uri == u_j1_json.uri and not localized assert not AutoURI(os.path.join(loc_prefix_, basename)).exists @@ -395,6 +401,7 @@ def test_s3uri_localize( loc_uri, localized = S3URI.localize( u_j1_json, recursive=True, + return_flag=True, loc_prefix=loc_prefix_) assert loc_uri == u_j1_json.uri and not localized assert not AutoURI(os.path.join(loc_prefix_, basename)).exists @@ -410,6 +417,7 @@ def test_s3uri_localize( loc_uri, localized = S3URI.localize( u_j1_json, recursive=False, + return_flag=True, loc_prefix=loc_prefix_) expected = os.path.join( loc_prefix_, u_j1_json.loc_dirname, @@ -420,6 +428,7 @@ def test_s3uri_localize( loc_uri, localized = S3URI.localize( u_j1_json, recursive=True, + return_flag=True, loc_prefix=loc_prefix_) expected = os.path.join( loc_prefix_, u_j1_json.loc_dirname,
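The `return_flag` changes in this patch alter the public return types of `cp()` and `localize()`. A minimal sketch of the new calling conventions, assuming a local file; the paths are placeholders:

```python
from autouri import AutoURI

u = AutoURI('/tmp/v6.txt')  # hypothetical source file

# By default, cp() now returns only the destination URI string.
# A trailing path separator marks an explicit directory, so u.basename is appended.
dest = u.cp('/tmp/copies/')

# With return_flag=True, cp() also returns the skip-reason code:
# 0 = copied, 1 = skipped (md5 match),
# 2 = skipped (name/size match and source mtime not newer).
dest, flag = u.cp('/tmp/copies/', return_flag=True)

# localize_on() wraps the classmethod localize() and returns (loc_uri, flag).
loc_uri, localized = u.localize_on('/tmp/loc/')
```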