From 173dc6ebeaf82922b9d26a4e3b3cb66d9d8e7a14 Mon Sep 17 00:00:00 2001 From: Maud Ehrmann Date: Tue, 27 Aug 2024 09:41:53 +0200 Subject: [PATCH] format adjustments; docstrings; more helpers --- impresso_commons/utils/__init__.py | 21 ++++++++----- impresso_commons/utils/daskutils.py | 46 +++++++++++++++++++---------- impresso_commons/utils/s3.py | 5 ++-- 3 files changed, 46 insertions(+), 26 deletions(-) diff --git a/impresso_commons/utils/__init__.py b/impresso_commons/utils/__init__.py index 6c03d10..7b85070 100644 --- a/impresso_commons/utils/__init__.py +++ b/impresso_commons/utils/__init__.py @@ -3,17 +3,17 @@ # created on 2018.03.27 using PyCharm # project impresso-image-acquisition +import datetime import logging +import multiprocessing import sys import time -import datetime from datetime import timedelta import dask -from dask import compute, delayed +from dask import compute from dask.diagnostics import ProgressBar from dask.multiprocessing import get as mp_get -import multiprocessing logger = logging.getLogger(__name__) @@ -51,7 +51,7 @@ def init_logger(logger, log_level, log_file): ch.setFormatter(formatter) logger.addHandler(ch) - logger.info("Logger successfully initialised") + logger.info("LOGGER - Logger successfully initialised") return logger @@ -87,14 +87,18 @@ def user_confirmation(question, default=None): sys.stdout.write("Please respond with 'yes' or 'no' (or 'y' or 'n').\n") - def user_question(variable_to_confirm): - answer = user_confirmation(f"Is [{variable_to_confirm}] the correct one to work with?", None) + answer = user_confirmation( + f"\tIs the following the correct item to work with?\n" + f"{variable_to_confirm}", + None + ) + if not answer: - logger.info(f"Variable {variable_to_confirm} not confirmed, exiting.") + logger.info("Variable not confirmed, exiting.") sys.exit() else: - logger.info(f"Variable {variable_to_confirm} confirmed.") + logger.info("Variable confirmed.") def timestamp(): @@ -111,6 +115,7 @@ def timestamp(): class Timer: """ Basic timer""" + def __init__(self): self.start = time.time() self.intermediate = time.time() diff --git a/impresso_commons/utils/daskutils.py b/impresso_commons/utils/daskutils.py index a5697d5..d04a006 100644 --- a/impresso_commons/utils/daskutils.py +++ b/impresso_commons/utils/daskutils.py @@ -11,29 +11,43 @@ --config-file= json configuration dict specifying various arguments """ -import os import logging -import docopt +import os -from dask.diagnostics import ProgressBar import dask.bag as db +import docopt import numpy as np +from dask.bag import Bag +from dask.diagnostics import ProgressBar -from impresso_commons.utils import init_logger -from impresso_commons.utils import Timer from impresso_commons.path.path_s3 import s3_filter_archives -from impresso_commons.utils.s3 import get_bucket, read_jsonlines, readtext_jsonlines -from impresso_commons.utils.s3 import IMPRESSO_STORAGEOPT +from impresso_commons.utils import Timer +from impresso_commons.utils import init_logger from impresso_commons.utils.config_loader import PartitionerConfig +from impresso_commons.utils.s3 import IMPRESSO_STORAGEOPT +from impresso_commons.utils.s3 import get_bucket, read_jsonlines, readtext_jsonlines __author__ = "maudehrmann" logger = logging.getLogger(__name__) -def partitioner(bag, path, nbpart): - """Partition a bag into n partitions and write each partition in a file""" - grouped_items = bag.groupby(lambda x: np.random.randint(500), npartitions=nbpart) +def partitioner(bag: Bag, + path: str, + nb_partitions: int) -> None: + """ + Partition a Dask bag into n partitions and write each to a separate file. + + Args: + bag (dask.bag.Bag): The Dask bag to be partitioned. + path (str): Directory path where partitioned files will be saved. + nb_partitions (int): Number of partitions to create. + + Returns: + None: The function writes partitioned files to the specified path. + """ + grouped_items = bag.groupby(lambda x: np.random.randint(500), + npartitions=nb_partitions) items = grouped_items.map(lambda x: x[1]).flatten() path = os.path.join(path, "*.jsonl.bz2") with ProgressBar(): @@ -41,12 +55,12 @@ def partitioner(bag, path, nbpart): def create_even_partitions( - bucket, - config_newspapers, - output_dir, - local_fs=False, - keep_full=False, - nb_partition=500, + bucket, + config_newspapers, + output_dir, + local_fs=False, + keep_full=False, + nb_partition=500, ): """Convert yearly bz2 archives to even bz2 archives, i.e. partitions. diff --git a/impresso_commons/utils/s3.py b/impresso_commons/utils/s3.py index 8678405..150fe31 100644 --- a/impresso_commons/utils/s3.py +++ b/impresso_commons/utils/s3.py @@ -17,7 +17,7 @@ from smart_open.s3 import iter_bucket from impresso_commons.utils import _get_cores -from utils.utils import bytes_to +from impresso_commons.utils.utils import bytes_to logger = logging.getLogger(__name__) @@ -557,6 +557,7 @@ def list_s3_directories(bucket_name, prefix=''): list: A list of 'directory' names found in the specified bucket and prefix. """ + logger.info(f"Listing 'folders'' of '{bucket_name}' under prefix '{prefix}'") s3 = get_s3_client() result = s3.list_objects_v2( Bucket=bucket_name, Prefix=prefix, Delimiter='/' @@ -568,7 +569,7 @@ def list_s3_directories(bucket_name, prefix=''): prefix['Prefix'][:-1].split("/")[-1] for prefix in result['CommonPrefixes'] ] - + logger.info(f"Returning {len(directories)} directories.") return directories