From c585c8a35578bf56dbd3516e9b9e3ffaf7531283 Mon Sep 17 00:00:00 2001 From: Lucy Ge Date: Fri, 26 Apr 2024 14:19:20 -0700 Subject: [PATCH] cleanup fixes --- alluxiofs/client/core.py | 46 +++++++++++++++++--------- benchmark/AbstractBench.py | 3 +- rust/README.md | 3 +- rust/alluxiocommon/Cargo.toml | 1 - rust/alluxiocommon/setup.py | 3 +- tests/client/test_read_docker.py | 13 +++++--- tests/client/test_read_range.py | 10 +++--- tests/conftest.py | 18 ++++++---- tests/fs/test_docker_alluxio_fsspec.py | 2 ++ 9 files changed, 62 insertions(+), 37 deletions(-) diff --git a/alluxiofs/client/core.py b/alluxiofs/client/core.py index fbf7c4b..26b6b12 100644 --- a/alluxiofs/client/core.py +++ b/alluxiofs/client/core.py @@ -14,10 +14,13 @@ import humanfriendly import requests from requests.adapters import HTTPAdapter + try: from alluxiocommon import _DataManager except ModuleNotFoundError: - print("[WARNING]pkg 'alluxiocommon' not installed, relative modules unable to invoke.") + print( + "[WARNING]pkg 'alluxiocommon' not installed, relative modules unable to invoke." + ) from .const import ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE from .const import ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE @@ -202,16 +205,20 @@ def __init__( self.logger.debug( f"Hash node per worker is set to {hash_node_per_worker}" ) - if ALLUXIO_COMMON_EXTENSION_ENABLE in options \ - and options[ALLUXIO_COMMON_EXTENSION_ENABLE].lower() == "true": - print(f"Using alluxiocommon extension..") - self.logger.debug( - "alluxiocommon extension enabled." + if ( + ALLUXIO_COMMON_EXTENSION_ENABLE in options + and options[ALLUXIO_COMMON_EXTENSION_ENABLE].lower() == "true" + ): + print("Using alluxiocommon extension..") + self.logger.debug("alluxiocommon extension enabled.") + ondemand_pool_disabled = ( + ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE in options + and options[ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE].lower() + == "true" + ) + self.data_manager = _DataManager( + concurrency, ondemand_pool_disabled=ondemand_pool_disabled ) - ondemand_pool_disabled = ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE in options \ - and options[ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE].lower() == "true" - self.data_manager = _DataManager(concurrency, - ondemand_pool_disabled=ondemand_pool_disabled) if ( not isinstance(hash_node_per_worker, int) or hash_node_per_worker <= 0 @@ -604,7 +611,9 @@ def write_page(self, file_path, page_index, page_bytes): f"Error writing to file {file_path} at page {page_index}: {e}" ) - def _all_page_generator_alluxiocommon(self, worker_host, worker_http_port, path_id): + def _all_page_generator_alluxiocommon( + self, worker_host, worker_http_port, path_id + ): page_index = 0 fetching_pages_num_each_round = 4 while True: @@ -619,9 +628,14 @@ def _all_page_generator_alluxiocommon(self, worker_host, worker_http_port, path_ ) read_urls.append(page_url) page_index += 1 - pages_content = self.data_manager.make_multi_http_req(read_urls) + pages_content = self.data_manager.make_multi_http_req( + read_urls + ) yield pages_content - if len(pages_content) < fetching_pages_num_each_round * self.page_size: + if ( + len(pages_content) + < fetching_pages_num_each_round * self.page_size + ): break except Exception as e: # data_manager won't throw exception if there are any first few content retrieved @@ -653,14 +667,16 @@ def _all_page_generator(self, worker_host, worker_http_port, path_id): page_index += 1 def _range_page_generator_alluxiocommon( - self, worker_host, worker_http_port, path_id, offset, length + self, worker_host, worker_http_port, path_id, offset, length ): read_urls = [] start = offset while start < offset + length: page_index = start // self.page_size inpage_off = start % self.page_size - inpage_read_len = min(self.page_size - inpage_off, offset + length - start) + inpage_read_len = min( + self.page_size - inpage_off, offset + length - start + ) page_url = None if inpage_off == 0 and inpage_read_len == self.page_size: page_url = FULL_PAGE_URL_FORMAT.format( diff --git a/benchmark/AbstractBench.py b/benchmark/AbstractBench.py index 76246b4..99f3ae3 100644 --- a/benchmark/AbstractBench.py +++ b/benchmark/AbstractBench.py @@ -81,7 +81,8 @@ def init(self): # protocol = self.get_protocol(self.args.path) alluxio_options = { # "alluxio.common.extension.enable": "True", - "alluxio.worker.page.store.page.size" : "1MB"} + "alluxio.worker.page.store.page.size": "1MB" + } self.alluxio_fs = AlluxioFileSystem( etcd_hosts=self.args.etcd_hosts, worker_hosts=self.args.worker_hosts, diff --git a/rust/README.md b/rust/README.md index 2bf7a04..1cab1f6 100644 --- a/rust/README.md +++ b/rust/README.md @@ -1,7 +1,7 @@ ## A common native lib for alluxio python client to build and play: -create virtualenv, (a tool used to create isolated Python environments): +create virtualenv, (a tool used to create isolated Python environments): python3 -m venv .env source .env/bin/activate @@ -12,4 +12,3 @@ then can do: python3 >>> import alluxiocommon >>> alluxiocommon.multi_http_requests(["http://google.com"],[(0,0)]) - diff --git a/rust/alluxiocommon/Cargo.toml b/rust/alluxiocommon/Cargo.toml index d20d22e..ce4baad 100644 --- a/rust/alluxiocommon/Cargo.toml +++ b/rust/alluxiocommon/Cargo.toml @@ -23,4 +23,3 @@ log = "0.4.21" [dependencies.pyo3] version = "0.21.1" features = ["extension-module"] - diff --git a/rust/alluxiocommon/setup.py b/rust/alluxiocommon/setup.py index 9d8c5d5..2585310 100644 --- a/rust/alluxiocommon/setup.py +++ b/rust/alluxiocommon/setup.py @@ -1,5 +1,6 @@ from setuptools import setup -from setuptools_rust import Binding, RustExtension +from setuptools_rust import Binding +from setuptools_rust import RustExtension setup( diff --git a/tests/client/test_read_docker.py b/tests/client/test_read_docker.py index 6b7becb..1e4bf25 100644 --- a/tests/client/test_read_docker.py +++ b/tests/client/test_read_docker.py @@ -1,10 +1,10 @@ import os import random +from hashlib import md5 from alluxiofs import AlluxioClient from tests.conftest import ALLUXIO_FILE_PATH from tests.conftest import LOCAL_FILE_PATH -from hashlib import md5 NUM_TESTS = 10 @@ -19,6 +19,7 @@ def _get_md5(payload): m.update(payload) return m.hexdigest() + def validate_read_range( alluxio_client: AlluxioClient, alluxio_file_path, @@ -70,6 +71,7 @@ def validate_invalid_read_range( "Expected an exception from local file read but none occurred." ) + def validate_full_read( alluxio_client: AlluxioClient, alluxio_file_path, @@ -144,13 +146,14 @@ def test_alluxio_client(alluxio_client: AlluxioClient): # test full data read - validate_full_read(alluxio_client, - ALLUXIO_FILE_PATH, - LOCAL_FILE_PATH) + validate_full_read(alluxio_client, ALLUXIO_FILE_PATH, LOCAL_FILE_PATH) def test_etcd_alluxio_client(etcd_alluxio_client: AlluxioClient): test_alluxio_client(etcd_alluxio_client) -def test_alluxio_client_alluxiocommon(alluxio_client_alluxiocommon: AlluxioClient): + +def test_alluxio_client_alluxiocommon( + alluxio_client_alluxiocommon: AlluxioClient, +): test_alluxio_client(alluxio_client_alluxiocommon) diff --git a/tests/client/test_read_range.py b/tests/client/test_read_range.py index e149811..55c6e8c 100644 --- a/tests/client/test_read_range.py +++ b/tests/client/test_read_range.py @@ -6,6 +6,7 @@ from alluxiofs.client.const import ALLUXIO_COMMON_EXTENSION_ENABLE from alluxiofs.client.const import ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE + def parse_args(): parser = argparse.ArgumentParser( description="Validate Alluxio read_range with local file." @@ -40,14 +41,14 @@ def parse_args(): "--enable-alluxiocommon", type=bool, default=False, - help="To enable alluxiocommon extension" + help="To enable alluxiocommon extension", ) parser.add_argument( "--disable-alluxiocommon-ondemandpool", type=bool, default=False, help="To disable alluxiocommon ondemand pool, " - "effective when --enable-alluxiocommon is enabeld" + "effective when --enable-alluxiocommon is enabeld", ) return parser.parse_args() @@ -101,14 +102,13 @@ def manual_test_invalid_read_range( def main(args): - options={} + options = {} if args.enable_alluxiocommon: options[ALLUXIO_COMMON_EXTENSION_ENABLE] = "True" if args.disable_alluxiocommon_ondemandpool: options[ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE] = "True" - alluxio_client = AlluxioClient(etcd_hosts=args.etcd_hosts, - options=options) + alluxio_client = AlluxioClient(etcd_hosts=args.etcd_hosts, options=options) file_size = os.path.getsize(args.local_file_path) invalid_test_cases = [(-1, 100), (file_size - 1, -2)] diff --git a/tests/conftest.py b/tests/conftest.py index 8777961..a34b32e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -152,18 +152,22 @@ def alluxio_client(docker_alluxio): alluxio_client = AlluxioClient(worker_hosts=host, worker_http_port=port) yield alluxio_client + @pytest.fixture def alluxio_client_alluxiocommon(docker_alluxio): - LOGGER.debug(f"get AlluxioClient with alluxiocommon connect to {docker_alluxio}") + LOGGER.debug( + f"get AlluxioClient with alluxiocommon connect to {docker_alluxio}" + ) parsed_url = urlparse(docker_alluxio) host = parsed_url.hostname port = parsed_url.port - alluxio_options = { - "alluxio.common.extension.enable": "True"} - alluxio_client = AlluxioClient(worker_hosts=host, worker_http_port=port, - options=alluxio_options) + alluxio_options = {"alluxio.common.extension.enable": "True"} + alluxio_client = AlluxioClient( + worker_hosts=host, worker_http_port=port, options=alluxio_options + ) yield alluxio_client + @pytest.fixture def etcd_alluxio_client(docker_alluxio_with_etcd): LOGGER.debug( @@ -189,6 +193,7 @@ def alluxio_file_system(docker_alluxio): ) yield alluxio_file_system + @pytest.fixture def alluxio_file_system_alluxiocommon(docker_alluxio): LOGGER.debug( @@ -198,8 +203,7 @@ def alluxio_file_system_alluxiocommon(docker_alluxio): parsed_url = urlparse(docker_alluxio) host = parsed_url.hostname fsspec.register_implementation("alluxio", AlluxioFileSystem, clobber=True) - alluxio_options = { - "alluxio.common.extension.enable": "True"} + alluxio_options = {"alluxio.common.extension.enable": "True"} alluxio_file_system = fsspec.filesystem( "alluxio", worker_hosts=host, diff --git a/tests/fs/test_docker_alluxio_fsspec.py b/tests/fs/test_docker_alluxio_fsspec.py index 163aa8f..b1c27e5 100644 --- a/tests/fs/test_docker_alluxio_fsspec.py +++ b/tests/fs/test_docker_alluxio_fsspec.py @@ -4,8 +4,10 @@ def test_simple_fsspec(alluxio_file_system): alluxio_file_system.ls(TEST_ROOT) # no error + def test_simple_fsspec_alluxiocommon(alluxio_file_system_alluxiocommon): alluxio_file_system_alluxiocommon.ls(TEST_ROOT) + def test_simple_etcd_fsspec(etcd_alluxio_file_system): etcd_alluxio_file_system.ls(TEST_ROOT) # no error