diff --git a/ostap/parallel/parallel_ipyparallel.py b/ostap/parallel/parallel_ipyparallel.py index afd21133..618a48b2 100644 --- a/ostap/parallel/parallel_ipyparallel.py +++ b/ostap/parallel/parallel_ipyparallel.py @@ -111,7 +111,7 @@ def __init__( self , # - no summary printout # - no merging of results def iexecute ( self , job , jobs_args , progress = False ) : - """Process the bare `executor` function + """ Process the bare `executor` function >>> mgr = WorkManager ( .... ) >>> job = ... >>> args = ... @@ -127,23 +127,22 @@ def iexecute ( self , job , jobs_args , progress = False ) : silent = self.silent or not progress with warnings.catch_warnings() : warnings.simplefilter('ignore', category=UserWarning) - cluster = ipp.Cluster ( **self.__kwargs ) - with cluster : + with ipp.Cluster ( **self.__kwargs ) as cluster : - if self.__use_dill : - view = cluster[:] - view.use_dill() - elif self.__balanced : - view = cluster.load_balanced_view() - else : - view = cluster[:] - - results = view.map_async ( job , jobs_args ) - - for result in progress_bar ( results , - description = "# Jobs execution" , - silent = silent ) : - yield result + if self.__use_dill : + view = cluster[:] + view.use_dill () + elif self.__balanced : + view = cluster.load_balanced_view() + else : + view = cluster[:] + + results = view.map_async ( job , jobs_args ) + + for result in progress_bar ( results , + description = "# Jobs execution" , + silent = silent ) : + yield result # ========================================================================- ## get PP-statistics if/when possible diff --git a/ostap/parallel/tests/test_parallel_ipyparallel.py b/ostap/parallel/tests/test_parallel_ipyparallel.py index 6ca0e43f..86b07740 100644 --- a/ostap/parallel/tests/test_parallel_ipyparallel.py +++ b/ostap/parallel/tests/test_parallel_ipyparallel.py @@ -91,15 +91,14 @@ def test_ipyparallel_function () : result = None with warnings.catch_warnings() : warnings.simplefilter('ignore', category=UserWarning) - cluster = ipp.Cluster () - with cluster : + with ipp.Cluster () as cluster : - view = cluster.load_balanced_view() - results = view.map_async ( make_histos , zip ( count () , inputs ) ) - - for r in progress_bar ( results ) : - if not result : result = r - else : result.Add ( r ) + view = cluster.load_balanced_view() + results = view.map_async ( make_histos , zip ( count () , inputs ) ) + + for r in progress_bar ( results ) : + if not result : result = r + else : result.Add ( r ) with use_canvas ( 'test_ipyparallel_function' , wait = 2 ) : logger.info ( "Histogram is %s" % result.dump ( 80 , 20 ) ) @@ -135,18 +134,17 @@ def test_ipyparallel_callable () : result = None with warnings.catch_warnings() : warnings.simplefilter('ignore', category=UserWarning) - cluster = ipp.Cluster () - with cluster : - - ##view = cluster.load_balanced_view() - view = cluster[:] - view.use_dill() + with ipp.Cluster () as cluster : - results = view.map_async ( mh , zip ( count () , inputs ) ) - - for r in progress_bar ( results ) : - if not result : result = r - else : result.Add ( r ) + ##view = cluster.load_balanced_view() + view = cluster[:] + view.use_dill() + + results = view.map_async ( mh , zip ( count () , inputs ) ) + + for r in progress_bar ( results ) : + if not result : result = r + else : result.Add ( r ) with use_canvas ( 'test_ipyparallel_function' , wait = 2 ) : logger.info ( "Histogram is %s" % result.dump ( 80 , 20 ) )