Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
VanyaBelyaev committed May 2, 2024
1 parent ee23338 commit b572d38
Showing 1 changed file with 43 additions and 34 deletions.
77 changes: 43 additions & 34 deletions ostap/parallel/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
__all__ = (
'Task' , ## the base class for task
'TaskManager' , ## the base class for task-manager
'GenericTask' , ## the generic ``templated'' task
'FuncTask' , ## the simple ``function'' task
'GenericTask' , ## the generic `templated' task
'FuncTask' , ## the simple `function' task
'Statistics' , ## helper class to collect statistics
'StatMerger' , ## helper class to merge statistics
'TaskMerger' , ## simple merger for task results
Expand All @@ -64,8 +64,8 @@
# One can specify following attributes:
# - <code>directory</code>: the working directory for the job
# - <code>environment</code>: additional environmental variables
# - <code>append_to</code>: append some path-like enviroment varibales
# - <code>prepend_to</code>: prepend some path-like enviroment varibales
# - <code>append_to</code>: append some path-like environment varibales
# - <code>prepend_to</code>: prepend some path-like environment varibales
# - <code>dot_in_path</code>: shoud the '.' be added to sys.path?
# @author Pere MATO [email protected]
class Task(object) :
Expand All @@ -77,10 +77,10 @@ class Task(object) :
- process
- finalize.
One can specify following attributes:
- <directory : the working directory for the job
- directory : the working directory for the job
- environment : additional environmental variables
- append_to : append some path-like enviroment varibales
- prepend_to : prepend some path-like enviroment varibales
- append_to : append some path-like environment varibales
- prepend_to : prepend some path-like environment varibales
- dot_in_path : shoud the '.' be added to sys.path?
"""
__metaclass__ = abc.ABCMeta
Expand Down Expand Up @@ -155,12 +155,12 @@ def __call__ ( self , jobid , *params ) :

@property
def output ( self ) :
"""`output' : get a task output (it invokes the method `results` """
"""`output' : get a task output (internally it invokes the method `results` """
return self.results()

@property
def directory ( self ) :
"""``Directory'' : directory where job starts"""
"""`Directory' : directory where job starts"""
return self.__directory

@directory.setter
Expand All @@ -169,12 +169,12 @@ def directory ( self , value ) :

@property
def environment ( self ) :
"""``environment'' : additional environment for the job"""
"""`environment' : additional environment for the job"""
return self.__environment

@environment.setter
def environment ( self , value ) :
self.__environment.update (value )
self.__environment.update ( value )

@property
def append_to ( self ) :
Expand Down Expand Up @@ -234,7 +234,7 @@ def cleanup ( self , value ) :

# =============================================================================
## @class GenericTask
# Generic ``templated'' task for Parallel processing
# Generic `templated' task for Parallel processing
# One needs to define three functions/functors:
# - processor :<code> output = processor ( jobid , item ) </code>
# - merger :<code>updated_output = merger ( old_output , new_output ) </code>
Expand All @@ -243,7 +243,7 @@ def cleanup ( self , value ) :
# - append_to : additional variables to be ''appended''
# - prepend_to : additional variables to be ''prepended''
class GenericTask(Task) :
"""Generic ``templated'' task for parallel processing.
"""Generic `templated' task for parallel processing.
One needs to define three functions/functors:
- processor : output = processor ( jobid , item )
- merger : updated_output = merger ( old_output , new_output )
Expand Down Expand Up @@ -280,6 +280,12 @@ def __init__ ( self ,
import operator
merger = operator.add

assert processor , "GenericTask: `processor' it not defiend!"

assert merger or collector , \
"GenericTask: neither `merger' nor `collector' are defined!"


self.__processor = processor
self.__merger = merger
self.__collector = collector
Expand All @@ -292,7 +298,7 @@ def __init__ ( self ,
self.prepend_to . update ( prepend_to )

self.cleanup = cleanup


# =========================================================================
## local initialization (executed once in parent process)
Expand Down Expand Up @@ -324,25 +330,27 @@ def results ( self ) :
# =========================================================================
@property
def processor ( self ) :
"""``processor'' : the actual function for each subprocess
"""`processor' : the actual function for each subprocess
- Signature: output = processor ( item )
"""
return self.__processor

@property
def merger ( self ) :
"""``merger'' : the actual fuction to merge results
"""`merger' : the actual function to merge results
- Signature: updated_output = merger ( old_output , new_output )
"""
return self.__merger

@property
def collector ( self ) :
"""``collector'' : the actual fuction to merge/collect results
"""`collector' : the actual function to merge/collect results
- Signature: updated_output = collector ( old_output , new_output , jobid )
"""
return self.__collector
@property
def initializer ( self ) :
"""``initializer'' : the actual fuction to initialize local output
"""`initializer' : the actual function to initialize local output
- Signature: output = initializer()
"""
return self.__initializer
Expand Down Expand Up @@ -411,13 +419,14 @@ def results ( self ) :

@property
def merger ( self ) :
"""``merger'' : the actual fuction to merge results
"""`merger' : the actual fuction to merge results
- Signature: updated_output = merger ( old_output , new_output )
"""
return self.__merger

@property
def initializer ( self ) :
"""``initializer'' : the actual fuction to initialize local output
"""`initializer' : the actual fuction to initialize local output
- Signature: output = initializer()
"""
return self.__initializer
Expand Down Expand Up @@ -450,7 +459,7 @@ def __exit__ ( self , *_ ) : self.stop()

@property
def host ( self ) :
"""``host'' : the host where the job executed"""
"""`host' : the host where the job executed"""
return self.__host

def __repr__ ( self ) :
Expand Down Expand Up @@ -503,7 +512,7 @@ def __iter__ ( self ) :

@property
def merged ( self ) :
"""``merged'' : get the full merged statistic"""
"""`merged' : get the full merged statistic"""
return self.__merged

# =========================================================================
Expand Down Expand Up @@ -593,7 +602,7 @@ def __str__ ( self ) :

@property
def njobs ( self ) :
"""``njobs'' : total number of jobs"""
"""`njobs' : total number of jobs"""
return sum ( s.njobs for s in self.__merged.values() )

__repr__ = __str__
Expand Down Expand Up @@ -679,12 +688,12 @@ def merge ( self , result ) :

@property
def result ( self ) :
"""``result'' : the merged results"""
"""`result' : the merged results"""
return self.__result

@property
def nmerged ( self ) :
"""``nmerged'' : number of merged results"""
"""`nmerged' : number of merged results"""
return self.__nmerged

def __nonzero__ ( self ) : return 0 < self.__nmerged
Expand All @@ -698,7 +707,7 @@ def __len__ ( self ) : return self.__nmerged
# @see Task
def task_executor ( item ) :
"""Helper function to execute the task and collect job execution statistic
- unfortunately due to limitation of ``parallel python'' one cannot
- unfortunately due to limitation of `parallel python' one cannot
use python decorators here :-(
- see Task
"""
Expand Down Expand Up @@ -788,7 +797,7 @@ def task_executor ( item ) :
# use decorators here :-(
def func_executor ( item ) :
"""Helper function to execute the task and collect job execution statistic
- unfornately due to limitation of ``parallel python'' one cannot
- unfornately due to limitation of `parallel python' one cannot
use python decorators here :-(
"""
## unpack
Expand Down Expand Up @@ -893,9 +902,9 @@ def __process_func ( self , task , chunks , **kwargs ) :

## mergers for statistics & results
if not merger and not collector :
logger.warning ( "Neither ``merger'' nor ``collector'' are specified for merging!")
logger.warning ( "Neither `merger' nor `collector' are specified for merging!")
elif merger and collector :
logger.warning ( "Both ``merger'' and ``collector'' are specified for merging!")
logger.warning ( "Both `merger' and `collector' are specified for merging!")

## mergers for statistics
merged_stat = StatMerger ()
Expand Down Expand Up @@ -996,17 +1005,17 @@ def __process_task ( self , task , chunks , **kwargs ) :

@property
def silent ( self ) :
"""``silent'' : silent processing?"""
"""`silent' : silent processing?"""
return self.__silent

@property
def progress ( self ) :
"""``progress'' : show progress bar?"""
"""`progress' : show progress bar?"""
return self.__progress

@property
def ncpus ( self ) :
"""``ncpus'' : number of CPUs"""
"""`ncpus' : number of CPUs"""
return self.__ncpus

# ===========================================================================
Expand Down Expand Up @@ -1043,9 +1052,9 @@ def iexecute ( self , job , jobs_args , progress = False ) :
>>> for result in mgr.iexecute ( job , args ) :
...
...
It is a ``minimal'' interface
It is a `bare minimal' interface
- no statistics
- no summary prin
- no summary printout
- no merging of results
"""
return None
Expand Down

0 comments on commit b572d38

Please sign in to comment.