From e745151dc780bc97d1dc61494bb618b73127ad24 Mon Sep 17 00:00:00 2001
From: Greg Werbin
Date: Tue, 18 Apr 2023 13:49:30 -0400
Subject: [PATCH] Add initializer and initargs

Initial draft implementation of https://github.com/nalepae/pandarallel/issues/231

Add optional `initializer` and `initargs` parameters (defaults `None` and
`()`) to `initialize()`, `parallelize_with_memory_file_system()` and
`parallelize_with_pipe()`. Both parallelize variants pass them positionally
to `CONTEXT.Pool(nb_workers, initializer, initargs)`, and `initialize()`
forwards them by keyword to every `parallelize(...)` registration
(`parallel_apply`, `parallel_applymap`, `parallel_map`).
---
Review notes (not part of the commit message):

* The new parameters in the two parallelize_* signatures are annotated with
  `Optional[Callable[..., object]]` / `Tuple[object, ...]`, the same spelling
  this patch uses in `initialize()` and that the existing
  `use_memory_fs: Optional[bool]` parameter uses. The previous spelling
  (`Callable[..., object] | None`, `tuple[object, ...]`) is evaluated when
  the function is defined and needs Python >= 3.10 / >= 3.9 unless the
  module defers annotation evaluation. TODO confirm the supported versions.
* Only the text of already-added lines changed, so hunk headers and the
  diffstat are unaffected. The post-image blob id on the `index` line
  predates this respelling.

 pandarallel/core.py | 56 +++++++++++++++++++++++++++++++++++++--------
 1 file changed, 47 insertions(+), 9 deletions(-)

diff --git a/pandarallel/core.py b/pandarallel/core.py
index c54ec14..98c7a90 100644
--- a/pandarallel/core.py
+++ b/pandarallel/core.py
@@ -205,6 +205,8 @@ def parallelize_with_memory_file_system(
     nb_requested_workers: int,
     data_type: Type[DataType],
     progress_bars_type: ProgressBarsType,
+    initializer: Optional[Callable[..., object]] = None,
+    initargs: Tuple[object, ...] = (),
 ):
     def closure(
         data: Any,
@@ -291,7 +293,7 @@ def closure(
             ) in enumerate(zip(input_files, output_files))
         ]
 
-        pool = CONTEXT.Pool(nb_workers)
+        pool = CONTEXT.Pool(nb_workers, initializer, initargs)
         results_promise = pool.starmap_async(wrapped_work_function, work_args_list)
         pool.close()
 
@@ -355,6 +357,8 @@ def parallelize_with_pipe(
     nb_requested_workers: int,
     data_type: Type[DataType],
     progress_bars_type: ProgressBarsType,
+    initializer: Optional[Callable[..., object]] = None,
+    initargs: Tuple[object, ...] = (),
 ):
     def closure(
         data: Any,
@@ -416,7 +420,7 @@ def closure(
             for worker_index, chunk in enumerate(chunks)
         ]
 
-        pool = CONTEXT.Pool(nb_workers)
+        pool = CONTEXT.Pool(nb_workers, initializer, initargs)
         results_promise = pool.starmap_async(wrapped_work_function, work_args_list)
         pool.close()
 
@@ -457,6 +461,8 @@ def initialize(
     progress_bar=False,
     verbose=2,
     use_memory_fs: Optional[bool] = None,
+    initializer: Optional[Callable[..., object]] = None,
+    initargs: Tuple[object, ...] = (),
 ) -> None:
     show_progress_bars = progress_bar
     is_memory_fs_available = Path(MEMORY_FS_ROOT).exists()
@@ -521,36 +527,68 @@ def initialize(
 
     # DataFrame
     pd.DataFrame.parallel_apply = parallelize(
-        nb_workers, DataFrame.Apply, progress_bars_in_user_defined_function
+        nb_workers,
+        DataFrame.Apply,
+        progress_bars_in_user_defined_function,
+        initializer=initializer,
+        initargs=initargs,
     )
     pd.DataFrame.parallel_applymap = parallelize(
         nb_workers,
         DataFrame.ApplyMap,
         progress_bars_in_user_defined_function_multiply_by_number_of_columns,
+        initializer=initializer,
+        initargs=initargs,
     )
 
     # DataFrame GroupBy
     PandaDataFrameGroupBy.parallel_apply = parallelize(
-        nb_workers, DataFrameGroupBy.Apply, progress_bars_in_user_defined_function
+        nb_workers,
+        DataFrameGroupBy.Apply,
+        progress_bars_in_user_defined_function,
+        initializer=initializer,
+        initargs=initargs,
     )
 
     # Expanding GroupBy
     PandasExpandingGroupby.parallel_apply = parallelize(
-        nb_workers, ExpandingGroupBy.Apply, progress_bars_in_work_function
+        nb_workers,
+        ExpandingGroupBy.Apply,
+        progress_bars_in_work_function,
+        initializer=initializer,
+        initargs=initargs,
     )
 
     # Rolling GroupBy
     PandasRollingGroupby.parallel_apply = parallelize(
-        nb_workers, RollingGroupBy.Apply, progress_bars_in_work_function
+        nb_workers,
+        RollingGroupBy.Apply,
+        progress_bars_in_work_function,
+        initializer=initializer,
+        initargs=initargs,
     )
 
     # Series
     pd.Series.parallel_apply = parallelize(
-        nb_workers, Series.Apply, progress_bars_in_user_defined_function
+        nb_workers,
+        Series.Apply,
+        progress_bars_in_user_defined_function,
+        initializer=initializer,
+        initargs=initargs,
+    )
+    pd.Series.parallel_map = parallelize(
+        nb_workers,
+        Series.Map,
+        show_progress_bars,
+        initializer=initializer,
+        initargs=initargs,
     )
-    pd.Series.parallel_map = parallelize(nb_workers, Series.Map, show_progress_bars)
 
     # Series Rolling
     pd.core.window.Rolling.parallel_apply = parallelize(
-        nb_workers, SeriesRolling.Apply, progress_bars_in_user_defined_function
+        nb_workers,
+        SeriesRolling.Apply,
+        progress_bars_in_user_defined_function,
+        initializer=initializer,
+        initargs=initargs,
     )