Skip to content

Commit

Permalink
cleanup fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
lucyge2022 committed Apr 26, 2024
1 parent 9d3c31a commit c585c8a
Show file tree
Hide file tree
Showing 9 changed files with 62 additions and 37 deletions.
46 changes: 31 additions & 15 deletions alluxiofs/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
import humanfriendly
import requests
from requests.adapters import HTTPAdapter

try:
from alluxiocommon import _DataManager
except ModuleNotFoundError:
print("[WARNING]pkg 'alluxiocommon' not installed, relative modules unable to invoke.")
print(
"[WARNING]pkg 'alluxiocommon' not installed, relative modules unable to invoke."
)

from .const import ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE
from .const import ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE
Expand Down Expand Up @@ -202,16 +205,20 @@ def __init__(
self.logger.debug(
f"Hash node per worker is set to {hash_node_per_worker}"
)
if ALLUXIO_COMMON_EXTENSION_ENABLE in options \
and options[ALLUXIO_COMMON_EXTENSION_ENABLE].lower() == "true":
print(f"Using alluxiocommon extension..")
self.logger.debug(
"alluxiocommon extension enabled."
if (
ALLUXIO_COMMON_EXTENSION_ENABLE in options
and options[ALLUXIO_COMMON_EXTENSION_ENABLE].lower() == "true"
):
print("Using alluxiocommon extension..")
self.logger.debug("alluxiocommon extension enabled.")
ondemand_pool_disabled = (
ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE in options
and options[ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE].lower()
== "true"
)
self.data_manager = _DataManager(
concurrency, ondemand_pool_disabled=ondemand_pool_disabled
)
ondemand_pool_disabled = ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE in options \
and options[ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE].lower() == "true"
self.data_manager = _DataManager(concurrency,
ondemand_pool_disabled=ondemand_pool_disabled)
if (
not isinstance(hash_node_per_worker, int)
or hash_node_per_worker <= 0
Expand Down Expand Up @@ -604,7 +611,9 @@ def write_page(self, file_path, page_index, page_bytes):
f"Error writing to file {file_path} at page {page_index}: {e}"
)

def _all_page_generator_alluxiocommon(self, worker_host, worker_http_port, path_id):
def _all_page_generator_alluxiocommon(
self, worker_host, worker_http_port, path_id
):
page_index = 0
fetching_pages_num_each_round = 4
while True:
Expand All @@ -619,9 +628,14 @@ def _all_page_generator_alluxiocommon(self, worker_host, worker_http_port, path_
)
read_urls.append(page_url)
page_index += 1
pages_content = self.data_manager.make_multi_http_req(read_urls)
pages_content = self.data_manager.make_multi_http_req(
read_urls
)
yield pages_content
if len(pages_content) < fetching_pages_num_each_round * self.page_size:
if (
len(pages_content)
< fetching_pages_num_each_round * self.page_size
):
break
except Exception as e:
# data_manager won't throw exception if there are any first few content retrieved
Expand Down Expand Up @@ -653,14 +667,16 @@ def _all_page_generator(self, worker_host, worker_http_port, path_id):
page_index += 1

def _range_page_generator_alluxiocommon(
self, worker_host, worker_http_port, path_id, offset, length
self, worker_host, worker_http_port, path_id, offset, length
):
read_urls = []
start = offset
while start < offset + length:
page_index = start // self.page_size
inpage_off = start % self.page_size
inpage_read_len = min(self.page_size - inpage_off, offset + length - start)
inpage_read_len = min(
self.page_size - inpage_off, offset + length - start
)
page_url = None
if inpage_off == 0 and inpage_read_len == self.page_size:
page_url = FULL_PAGE_URL_FORMAT.format(
Expand Down
3 changes: 2 additions & 1 deletion benchmark/AbstractBench.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ def init(self):
# protocol = self.get_protocol(self.args.path)
alluxio_options = {
# "alluxio.common.extension.enable": "True",
"alluxio.worker.page.store.page.size" : "1MB"}
"alluxio.worker.page.store.page.size": "1MB"
}
self.alluxio_fs = AlluxioFileSystem(
etcd_hosts=self.args.etcd_hosts,
worker_hosts=self.args.worker_hosts,
Expand Down
3 changes: 1 addition & 2 deletions rust/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
## A common native lib for alluxio python client
to build and play:

create virtualenv, (a tool used to create isolated Python environments):
create a virtualenv (a tool used to create isolated Python environments):

python3 -m venv .env
source .env/bin/activate
Expand All @@ -12,4 +12,3 @@ then can do:
python3
>>> import alluxiocommon
>>> alluxiocommon.multi_http_requests(["http://google.com"],[(0,0)])

1 change: 0 additions & 1 deletion rust/alluxiocommon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,3 @@ log = "0.4.21"
[dependencies.pyo3]
version = "0.21.1"
features = ["extension-module"]

3 changes: 2 additions & 1 deletion rust/alluxiocommon/setup.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from setuptools import setup
from setuptools_rust import Binding, RustExtension
from setuptools_rust import Binding
from setuptools_rust import RustExtension


setup(
Expand Down
13 changes: 8 additions & 5 deletions tests/client/test_read_docker.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import os
import random
from hashlib import md5

from alluxiofs import AlluxioClient
from tests.conftest import ALLUXIO_FILE_PATH
from tests.conftest import LOCAL_FILE_PATH
from hashlib import md5

NUM_TESTS = 10

Expand All @@ -19,6 +19,7 @@ def _get_md5(payload):
m.update(payload)
return m.hexdigest()


def validate_read_range(
alluxio_client: AlluxioClient,
alluxio_file_path,
Expand Down Expand Up @@ -70,6 +71,7 @@ def validate_invalid_read_range(
"Expected an exception from local file read but none occurred."
)


def validate_full_read(
alluxio_client: AlluxioClient,
alluxio_file_path,
Expand Down Expand Up @@ -144,13 +146,14 @@ def test_alluxio_client(alluxio_client: AlluxioClient):

# test full data read

validate_full_read(alluxio_client,
ALLUXIO_FILE_PATH,
LOCAL_FILE_PATH)
validate_full_read(alluxio_client, ALLUXIO_FILE_PATH, LOCAL_FILE_PATH)


def test_etcd_alluxio_client(etcd_alluxio_client: AlluxioClient):
    """Re-run the checks in ``test_alluxio_client`` with the ``etcd_alluxio_client`` fixture."""
    client_under_test = etcd_alluxio_client
    test_alluxio_client(client_under_test)

def test_alluxio_client_alluxiocommon(alluxio_client_alluxiocommon: AlluxioClient):

def test_alluxio_client_alluxiocommon(
    alluxio_client_alluxiocommon: AlluxioClient,
):
    """Re-run the checks in ``test_alluxio_client`` with the ``alluxio_client_alluxiocommon`` fixture."""
    client_under_test = alluxio_client_alluxiocommon
    test_alluxio_client(client_under_test)
10 changes: 5 additions & 5 deletions tests/client/test_read_range.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from alluxiofs.client.const import ALLUXIO_COMMON_EXTENSION_ENABLE
from alluxiofs.client.const import ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE


def parse_args():
parser = argparse.ArgumentParser(
description="Validate Alluxio read_range with local file."
Expand Down Expand Up @@ -40,14 +41,14 @@ def parse_args():
"--enable-alluxiocommon",
type=bool,
default=False,
help="To enable alluxiocommon extension"
help="To enable alluxiocommon extension",
)
parser.add_argument(
"--disable-alluxiocommon-ondemandpool",
type=bool,
default=False,
help="To disable alluxiocommon ondemand pool, "
"effective when --enable-alluxiocommon is enabeld"
"effective when --enable-alluxiocommon is enabeld",
)
return parser.parse_args()

Expand Down Expand Up @@ -101,14 +102,13 @@ def manual_test_invalid_read_range(


def main(args):
options={}
options = {}
if args.enable_alluxiocommon:
options[ALLUXIO_COMMON_EXTENSION_ENABLE] = "True"
if args.disable_alluxiocommon_ondemandpool:
options[ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE] = "True"

alluxio_client = AlluxioClient(etcd_hosts=args.etcd_hosts,
options=options)
alluxio_client = AlluxioClient(etcd_hosts=args.etcd_hosts, options=options)
file_size = os.path.getsize(args.local_file_path)

invalid_test_cases = [(-1, 100), (file_size - 1, -2)]
Expand Down
18 changes: 11 additions & 7 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,18 +152,22 @@ def alluxio_client(docker_alluxio):
alluxio_client = AlluxioClient(worker_hosts=host, worker_http_port=port)
yield alluxio_client


@pytest.fixture
def alluxio_client_alluxiocommon(docker_alluxio):
LOGGER.debug(f"get AlluxioClient with alluxiocommon connect to {docker_alluxio}")
LOGGER.debug(
f"get AlluxioClient with alluxiocommon connect to {docker_alluxio}"
)
parsed_url = urlparse(docker_alluxio)
host = parsed_url.hostname
port = parsed_url.port
alluxio_options = {
"alluxio.common.extension.enable": "True"}
alluxio_client = AlluxioClient(worker_hosts=host, worker_http_port=port,
options=alluxio_options)
alluxio_options = {"alluxio.common.extension.enable": "True"}
alluxio_client = AlluxioClient(
worker_hosts=host, worker_http_port=port, options=alluxio_options
)
yield alluxio_client


@pytest.fixture
def etcd_alluxio_client(docker_alluxio_with_etcd):
LOGGER.debug(
Expand All @@ -189,6 +193,7 @@ def alluxio_file_system(docker_alluxio):
)
yield alluxio_file_system


@pytest.fixture
def alluxio_file_system_alluxiocommon(docker_alluxio):
LOGGER.debug(
Expand All @@ -198,8 +203,7 @@ def alluxio_file_system_alluxiocommon(docker_alluxio):
parsed_url = urlparse(docker_alluxio)
host = parsed_url.hostname
fsspec.register_implementation("alluxio", AlluxioFileSystem, clobber=True)
alluxio_options = {
"alluxio.common.extension.enable": "True"}
alluxio_options = {"alluxio.common.extension.enable": "True"}
alluxio_file_system = fsspec.filesystem(
"alluxio",
worker_hosts=host,
Expand Down
2 changes: 2 additions & 0 deletions tests/fs/test_docker_alluxio_fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
def test_simple_fsspec(alluxio_file_system):
    """Smoke test: calling ``ls`` on ``TEST_ROOT`` must complete without raising."""
    list_dir = alluxio_file_system.ls
    list_dir(TEST_ROOT)  # passes as long as nothing is raised


def test_simple_fsspec_alluxiocommon(alluxio_file_system_alluxiocommon):
    """Smoke test: ``ls`` on ``TEST_ROOT`` via the alluxiocommon fixture must not raise."""
    list_dir = alluxio_file_system_alluxiocommon.ls
    list_dir(TEST_ROOT)


def test_simple_etcd_fsspec(etcd_alluxio_file_system):
    """Smoke test: ``ls`` on ``TEST_ROOT`` via the etcd fixture must not raise."""
    list_dir = etcd_alluxio_file_system.ls
    list_dir(TEST_ROOT)  # passes as long as nothing is raised

0 comments on commit c585c8a

Please sign in to comment.