diff --git a/ostap/parallel/task.py b/ostap/parallel/task.py index adfef39c..a213a435 100644 --- a/ostap/parallel/task.py +++ b/ostap/parallel/task.py @@ -36,8 +36,8 @@ __all__ = ( 'Task' , ## the base class for task 'TaskManager' , ## the base class for task-manager - 'GenericTask' , ## the generic ``templated'' task - 'FuncTask' , ## the simple ``function'' task + 'GenericTask' , ## the generic `templated' task + 'FuncTask' , ## the simple `function' task 'Statistics' , ## helper class to collect statistics 'StatMerger' , ## helper class to merge statistics 'TaskMerger' , ## simple merger for task results @@ -64,8 +64,8 @@ # One can specify following attributes: # - directory: the working directory for the job # - environment: additional environmental variables -# - append_to: append some path-like enviroment varibales -# - prepend_to: prepend some path-like enviroment varibales +# - append_to: append some path-like environment variables +# - prepend_to: prepend some path-like environment variables # - dot_in_path: shoud the '.' be added to sys.path? # @author Pere MATO Pere.Meto@cern.ch class Task(object) : @@ -77,10 +77,10 @@ class Task(object) : - process - finalize. One can specify following attributes: - - output = processor ( jobid , item ) # - merger :updated_output = merger ( old_output , new_output ) @@ -243,7 +243,7 @@ def cleanup ( self , value ) : # - append_to : additional variables to be ''appended'' # - prepend_to : additional variables to be ''prepended'' class GenericTask(Task) : - """Generic ``templated'' task for parallel processing. + """Generic `templated' task for parallel processing. One needs to define three functions/functors: - processor : output = processor ( jobid , item ) - merger : updated_output = merger ( old_output , new_output ) @@ -280,6 +280,12 @@ def __init__ ( self , import operator merger = operator.add + assert processor , "GenericTask: `processor' it not defiend!" 
+ + assert merger or collector , \ + "GenericTask: neither `merger' nor `collector' are defined!" + + self.__processor = processor self.__merger = merger self.__collector = collector @@ -292,7 +298,7 @@ def __init__ ( self , self.prepend_to . update ( prepend_to ) self.cleanup = cleanup - + # ========================================================================= ## local initialization (executed once in parent process) @@ -324,25 +330,27 @@ def results ( self ) : # ========================================================================= @property def processor ( self ) : - """``processor'' : the actual function for each subprocess + """`processor' : the actual function for each subprocess - Signature: output = processor ( item ) """ return self.__processor + @property def merger ( self ) : - """``merger'' : the actual fuction to merge results + """`merger' : the actual function to merge results - Signature: updated_output = merger ( old_output , new_output ) """ return self.__merger + @property def collector ( self ) : - """``collector'' : the actual fuction to merge/collect results + """`collector' : the actual function to merge/collect results - Signature: updated_output = collector ( old_output , new_output , jobid ) """ return self.__collector @property def initializer ( self ) : - """``initializer'' : the actual fuction to initialize local output + """`initializer' : the actual function to initialize local output - Signature: output = initializer() """ return self.__initializer @@ -411,13 +419,14 @@ def results ( self ) : @property def merger ( self ) : - """``merger'' : the actual fuction to merge results + """`merger' : the actual function to merge results - Signature: updated_output = merger ( old_output , new_output ) """ return self.__merger + @property def initializer ( self ) : - """``initializer'' : the actual fuction to initialize local output + """`initializer' : the actual function to initialize local output - Signature: output = initializer() """ 
return self.__initializer @@ -450,7 +459,7 @@ def __exit__ ( self , *_ ) : self.stop() @property def host ( self ) : - """``host'' : the host where the job executed""" + """`host' : the host where the job executed""" return self.__host def __repr__ ( self ) : @@ -503,7 +512,7 @@ def __iter__ ( self ) : @property def merged ( self ) : - """``merged'' : get the full merged statistic""" + """`merged' : get the full merged statistic""" return self.__merged # ========================================================================= @@ -593,7 +602,7 @@ def __str__ ( self ) : @property def njobs ( self ) : - """``njobs'' : total number of jobs""" + """`njobs' : total number of jobs""" return sum ( s.njobs for s in self.__merged.values() ) __repr__ = __str__ @@ -679,12 +688,12 @@ def merge ( self , result ) : @property def result ( self ) : - """``result'' : the merged results""" + """`result' : the merged results""" return self.__result @property def nmerged ( self ) : - """``nmerged'' : number of merged results""" + """`nmerged' : number of merged results""" return self.__nmerged def __nonzero__ ( self ) : return 0 < self.__nmerged @@ -698,7 +707,7 @@ def __len__ ( self ) : return self.__nmerged # @see Task def task_executor ( item ) : """Helper function to execute the task and collect job execution statistic - - unfortunately due to limitation of ``parallel python'' one cannot + - unfortunately due to limitation of `parallel python' one cannot use python decorators here :-( - see Task """ @@ -788,7 +797,7 @@ def task_executor ( item ) : # use decorators here :-( def func_executor ( item ) : """Helper function to execute the task and collect job execution statistic - - unfornately due to limitation of ``parallel python'' one cannot + - unfortunately due to limitation of `parallel python' one cannot use python decorators here :-( """ ## unpack @@ -893,9 +902,9 @@ def __process_func ( self , task , chunks , **kwargs ) : ## mergers for statistics & results if not merger and 
not collector : - logger.warning ( "Neither ``merger'' nor ``collector'' are specified for merging!") + logger.warning ( "Neither `merger' nor `collector' are specified for merging!") elif merger and collector : - logger.warning ( "Both ``merger'' and ``collector'' are specified for merging!") + logger.warning ( "Both `merger' and `collector' are specified for merging!") ## mergers for statistics merged_stat = StatMerger () @@ -996,17 +1005,17 @@ def __process_task ( self , task , chunks , **kwargs ) : @property def silent ( self ) : - """``silent'' : silent processing?""" + """`silent' : silent processing?""" return self.__silent @property def progress ( self ) : - """``progress'' : show progress bar?""" + """`progress' : show progress bar?""" return self.__progress @property def ncpus ( self ) : - """``ncpus'' : number of CPUs""" + """`ncpus' : number of CPUs""" return self.__ncpus # =========================================================================== @@ -1043,9 +1052,9 @@ def iexecute ( self , job , jobs_args , progress = False ) : >>> for result in mgr.iexecute ( job , args ) : ... ... - It is a ``minimal'' interface + It is a `bare minimal' interface - no statistics - - no summary prin + - no summary printout - no merging of results """ return None