Skip to content

Commit

Permalink
Add initializer and initargs
Browse files Browse the repository at this point in the history
Initial draft implementation of nalepae#231
Authored by gwerbin on Apr 18, 2023
1 parent b78c63c commit e745151
Showing 1 changed file with 47 additions and 9 deletions.
56 changes: 47 additions & 9 deletions pandarallel/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ def parallelize_with_memory_file_system(
nb_requested_workers: int,
data_type: Type[DataType],
progress_bars_type: ProgressBarsType,
initializer: Callable[..., object] | None = None,
initargs: tuple[object, ...] = (),
):
def closure(
data: Any,
Expand Down Expand Up @@ -291,7 +293,7 @@ def closure(
) in enumerate(zip(input_files, output_files))
]

pool = CONTEXT.Pool(nb_workers)
pool = CONTEXT.Pool(nb_workers, initializer, initargs)
results_promise = pool.starmap_async(wrapped_work_function, work_args_list)

pool.close()
Expand Down Expand Up @@ -355,6 +357,8 @@ def parallelize_with_pipe(
nb_requested_workers: int,
data_type: Type[DataType],
progress_bars_type: ProgressBarsType,
initializer: Callable[..., object] | None = None,
initargs: tuple[object, ...] = (),
):
def closure(
data: Any,
Expand Down Expand Up @@ -416,7 +420,7 @@ def closure(
for worker_index, chunk in enumerate(chunks)
]

pool = CONTEXT.Pool(nb_workers)
pool = CONTEXT.Pool(nb_workers, initializer, initargs)
results_promise = pool.starmap_async(wrapped_work_function, work_args_list)
pool.close()

Expand Down Expand Up @@ -457,6 +461,8 @@ def initialize(
progress_bar=False,
verbose=2,
use_memory_fs: Optional[bool] = None,
initializer: Optional[Callable[..., object]] = None,
initargs: Tuple[object, ...] = (),
) -> None:
show_progress_bars = progress_bar
is_memory_fs_available = Path(MEMORY_FS_ROOT).exists()
Expand Down Expand Up @@ -521,36 +527,68 @@ def initialize(

# DataFrame
pd.DataFrame.parallel_apply = parallelize(
nb_workers, DataFrame.Apply, progress_bars_in_user_defined_function
nb_workers,
DataFrame.Apply,
progress_bars_in_user_defined_function,
initializer=initializer,
initargs=initargs,
)
pd.DataFrame.parallel_applymap = parallelize(
nb_workers,
DataFrame.ApplyMap,
progress_bars_in_user_defined_function_multiply_by_number_of_columns,
initializer=initializer,
initargs=initargs,
)

# DataFrame GroupBy
PandaDataFrameGroupBy.parallel_apply = parallelize(
nb_workers, DataFrameGroupBy.Apply, progress_bars_in_user_defined_function
nb_workers,
DataFrameGroupBy.Apply,
progress_bars_in_user_defined_function,
initializer=initializer,
initargs=initargs,
)

# Expanding GroupBy
PandasExpandingGroupby.parallel_apply = parallelize(
nb_workers, ExpandingGroupBy.Apply, progress_bars_in_work_function
nb_workers,
ExpandingGroupBy.Apply,
progress_bars_in_work_function,
initializer=initializer,
initargs=initargs,
)

# Rolling GroupBy
PandasRollingGroupby.parallel_apply = parallelize(
nb_workers, RollingGroupBy.Apply, progress_bars_in_work_function
nb_workers,
RollingGroupBy.Apply,
progress_bars_in_work_function,
initializer=initializer,
initargs=initargs,
)

# Series
pd.Series.parallel_apply = parallelize(
nb_workers, Series.Apply, progress_bars_in_user_defined_function
nb_workers,
Series.Apply,
progress_bars_in_user_defined_function,
initializer=initializer,
initargs=initargs,
)
pd.Series.parallel_map = parallelize(
nb_workers,
Series.Map,
show_progress_bars,
initializer=initializer,
initargs=initargs,
)
pd.Series.parallel_map = parallelize(nb_workers, Series.Map, show_progress_bars)

# Series Rolling
pd.core.window.Rolling.parallel_apply = parallelize(
nb_workers, SeriesRolling.Apply, progress_bars_in_user_defined_function
nb_workers,
SeriesRolling.Apply,
progress_bars_in_user_defined_function,
initializer=initializer,
initargs=initargs,
)

0 comments on commit e745151

Please sign in to comment.