From ba650ede4b8f0f1e677c43f4d354839a07ef418f Mon Sep 17 00:00:00 2001 From: Vanya Belyaev Date: Wed, 24 Apr 2024 14:56:46 +0200 Subject: [PATCH] reshuffle pickling --- ostap/io/pickling.py | 46 +++++++- ostap/io/sqlitedict.py | 7 +- ostap/io/zipshelve.py | 15 --- ostap/parallel/parallel.py | 21 +++- .../parallel/tests/test_parallel_addbranch.py | 108 ++++++++++-------- ostap/plotting/canvas.py | 33 +++++- 6 files changed, 153 insertions(+), 77 deletions(-) diff --git a/ostap/io/pickling.py b/ostap/io/pickling.py index 7e1e3f78..37f8e3ed 100644 --- a/ostap/io/pickling.py +++ b/ostap/io/pickling.py @@ -20,6 +20,8 @@ 'Pickler' , 'Unpickler' , 'BytesIO' , + 'dumps' , + 'loads' , ) # ============================================================================= import os, sys @@ -28,15 +30,23 @@ if '__main__' == __name__ : logger = getLogger ( 'ostap.io.pickling' ) else : logger = getLogger ( __name__ ) # ============================================================================= -if sys.version_info < ( 3 , 0 ) : +if (3, 0 ) <= sys.version_info : + from pickle import ( Pickler, Unpickler, + DEFAULT_PROTOCOL, HIGHEST_PROTOCOL, + dumps, loads, + PicklingError, UnpicklingError ) +else : DEFAULT_PROTOCOL = 2 try: - from cPickle import Pickler, Unpickler, HIGHEST_PROTOCOL + from cPickle import ( Pickler, Unpickler, HIGHEST_PROTOCOL, + dumps, loads, + PicklingError, UnpicklingError ) except ImportError: - from pickle import Pickler, Unpickler, HIGHEST_PROTOCOL + from pickle import ( Pickler, Unpickler, HIGHEST_PROTOCOL, + dumps, loads, + PicklingError, UnpicklingError ) DEFAULT_PROTOCOL = min ( DEFAULT_PROTOCOL , HIGHEST_PROTOCOL ) -else : - from pickle import Pickler, Unpickler, DEFAULT_PROTOCOL, HIGHEST_PROTOCOL + # ============================================================================= try : from io import BytesIO @@ -46,6 +56,32 @@ except ImportError: from StringIO import StringIO as BytesIO # ============================================================================= +## Primitive check if the object casn be pickled and unpickled +def pickles ( obj ) : + """Primitive check if the object casn be pickled and unpickled + """ + try: + pkl = loads ( dumps ( obj ) ) + return pkl == obj + except ( PicklingError, UnpicklingError ) : + return False + +# ============================================================================= +## Check pickling of an object across another process +def check ( obj ): + """Check pickling of an object across another process + """ + import subprocess + fail = True + try: + _obj = dumps(obj,) + except PicklingError : + return None + ## + msg = "%s -c import pickle; print(pickle.loads(%s))" % ( python , repr ( _obj ) ) + return subprocess.call ( msg.split ( None , 2 ) ) + +# ============================================================================= ## helper function to get the protocol def get_protocol ( p ) : """helper function to get the protocol""" diff --git a/ostap/io/sqlitedict.py b/ostap/io/sqlitedict.py index 9abd001c..6d7f32b7 100755 --- a/ostap/io/sqlitedict.py +++ b/ostap/io/sqlitedict.py @@ -80,10 +80,6 @@ def reraise(tp, value, tb=None): raise value.with_traceback(tb) raise value -try: - from cPickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL -except ImportError: - from pickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL # some Python 3 vs 2 imports try: @@ -96,6 +92,9 @@ def reraise(tp, value, tb=None): except ImportError: from Queue import Queue + +# ============================================================================= +from ostap.io.pickling import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL logger = logging.getLogger(__name__) diff --git a/ostap/io/zipshelve.py b/ostap/io/zipshelve.py index 311e2170..3bbf32cb 100755 --- a/ostap/io/zipshelve.py +++ b/ostap/io/zipshelve.py @@ -143,21 +143,6 @@ # ============================================================================= logger.debug ( "Simple generic (c)Pickle-based ``zipped''-database" ) # ============================================================================= -from sys import version_info as python_version -# ============================================================================= -## try: -## from cPickle import Pickler, Unpickler -## except ImportError: -## from pickle import Pickler, Unpickler -## # ============================================================================= -## if 2 < python_version.major : -## from io import BytesIO -## else : -## try: -## from cStringIO import StringIO as BytesIO -## except ImportError: -## from StringIO import StringIO as BytesIO -# ============================================================================== import os, sys, shelve, shutil import zlib ## use zlib to compress DB-content from ostap.io.compress_shelve import CompressShelf, ENCODING, PROTOCOL, HIGHEST_PROTOCOL diff --git a/ostap/parallel/parallel.py b/ostap/parallel/parallel.py index c6633ae1..03f55546 100644 --- a/ostap/parallel/parallel.py +++ b/ostap/parallel/parallel.py @@ -22,11 +22,11 @@ 'GenericTask' , ## very generic "template" tasl ) # ============================================================================= -import sys, os, warnings from ostap.parallel.task import Task, GenericTask from ostap.utils.basic import has_env as ostap_hasenv from ostap.utils.basic import get_env as ostap_getenv from ostap.logger.logger import getLogger +import sys, os, warnings if '__main__' == __name__ : logger = getLogger ( 'ostap.parallel.parallel') else : logger = getLogger ( __name__ ) # ============================================================================= @@ -125,6 +125,25 @@ from ostap.parallel.parallel_gaudi import WorkManager logger.debug ('Use TaskManager from GaudiMP.Parallel' ) + worker = 'GAUDI' + +# ============================================================================= +## check if object can be pickled +def pickles ( obj ) : + if dill and WorkManager and worker == 'PATHOS' : + return dill.pickles ( obj ) + from ostap.io.pickling import pickles as _pickles + return _pickles ( obj ) + +# ============================================================================= +## Check pickling of an object across another process +def check ( obj ): + """Check pickling of an object across another process + """ + if dill and WorkManager and worker == 'PATHOS' : + return dill.check ( obj ) + from ostap.io.pickling import check as _check + return _check ( obj ) # ============================================================================= if '__main__' == __name__ : diff --git a/ostap/parallel/tests/test_parallel_addbranch.py b/ostap/parallel/tests/test_parallel_addbranch.py index 16f2af6c..b19d36ee 100755 --- a/ostap/parallel/tests/test_parallel_addbranch.py +++ b/ostap/parallel/tests/test_parallel_addbranch.py @@ -30,7 +30,7 @@ from ostap.math.make_fun import make_fun1, make_fun2, make_fun3 from ostap.utils.timing import timing from ostap.utils.progress_bar import progress_bar -from ostap.parallel.parallel import DILL_PY3_issue +from ostap.parallel.parallel import pickles import ostap.parallel.parallel_add_branch import ROOT, math, random, array # ============================================================================= @@ -95,9 +95,6 @@ def prepare_data ( nfiles = 50 , nentries = 500 ) : ## top-level function def fun_ftwo ( x ) : return 2 * x -## top-level callabke -class CALL(object): - def __call__ ( self , x ) : return 2.0 * x # ============================================================================= # Many ways to add branch into TTree/Tchain @@ -110,7 +107,7 @@ def test_addbranch() : - using string formula (TTreeFormula-based) - using pure python function - using histogram/function - - using histogram sampling + - using histogram sampling """ ## files = prepare_data ( 100 , 1000 ) @@ -136,8 +133,8 @@ def test_addbranch() : with timing ('simultaneous' , logger = logger ) : chain = data.chain chain.padd_new_branch ( { 'Et1' : 'sqrt(pt*pt+mass*mass)' , - 'Et2' : 'sqrt(pt*pt+mass*mass)*2' , - 'Et3' : 'sqrt(pt*pt+mass*mass)*3' } , None ) + 'Et2' : 'sqrt(pt*pt+mass*mass)*2' , + 'Et3' : 'sqrt(pt*pt+mass*mass)*3' } , None ) ## reload the chain and check: logger.info ( 'With formula:\n%s' % data.chain.table ( prefix = '# ' ) ) assert 'Et1' in data.chain , "Branch `Et1' is not here!" @@ -148,15 +145,15 @@ def test_addbranch() : ## 2) add new branch as pure python function # ========================================================================= with timing ( 'pyfunc' , logger = logger ) : - if DILL_PY3_issue : - logger.info ( 'The test is disabled (lambda cannot be pickled)' ) - else : - et2 = lambda tree : tree.pt**2 + tree.mass**2 + et2 = lambda tree : tree.pt**2 + tree.mass**2 + if pickles ( et2 ) : chain = data.chain chain.padd_new_branch ( 'et2', et2 ) ## reload the chain and check: logger.info ( 'With python:\n%s' % data.chain.table ( prefix = '# ' ) ) assert 'et2' in data.chain , "Branch `et2' is not here!" + else : + logger.warning ( "The test 'pyfunc' is disabled (lambda cannot be pickled)" ) # ========================================================================= ## 3) add new branch as histogram-function @@ -165,17 +162,20 @@ def test_addbranch() : h1 = ROOT.TH1D ( hID () , 'some pt-correction' , 100 , 0 , 10 ) h1 += lambda x : 1.0 + math.tanh( 0.2* ( x - 5 ) ) from ostap.trees.funcs import FuncTH1 - ptw = FuncTH1 ( h1 , 'pt' ) - chain = data.chain - chain.padd_new_branch ( 'ptw', ptw ) - ## reload the chain and check: - logger.info ( 'With histogram:\n%s' % data.chain.table ( prefix = '# ' ) ) - assert 'ptw' in data.chain , "Branch `ptw' is not here!" - + ptw = FuncTH1 ( h1 , 'pt' ) + if pickles ( ptw ) : + chain = data.chain + chain.padd_new_branch ( 'ptw', ptw ) + ## reload the chain and check: + logger.info ( 'With histogram:\n%s' % data.chain.table ( prefix = '# ' ) ) + assert 'ptw' in data.chain , "Branch `ptw' is not here!" + else : + logger.warning ( "The test 'histo-1' is disabled (object cannot be pickled)" ) + # ========================================================================= ## 4) add several functions simultanepusly # ========================================================================= - with timing ('histo-1' , logger = logger ) : + with timing ('histo-3' , logger = logger ) : from ostap.trees.funcs import FuncTH1 hh = ROOT.TH1D ( hID() , 'some pt-correction' , 100 , 0 , 10 ) h1 = hh + ( lambda x : 1.0 + math.tanh ( 0.1 * ( x - 5 ) ) ) @@ -185,37 +185,46 @@ def test_addbranch() : h3 = h1 + ( lambda x : 1.0 + math.tanh ( 0.3 * ( x - 5 ) ) ) ptw3 = FuncTH1 ( h3 , 'pt' ) chain = data.chain - brs = { 'ptw1' : ptw1 , 'ptw2' : ptw2 , 'ptw3' : ptw1 } - chain.padd_new_branch ( None , brs ) - ## reload the chain and check: - logger.info ( 'With histogram:\n%s' % data.chain.table ( prefix = '# ' ) ) - assert 'ptw' in data.chain , "Branch `ptw1' is not here!" - assert 'ptw' in data.chain , "Branch `ptw2' is not here!" - assert 'ptw' in data.chain , "Branch `ptw3' is not here!" - + brs = { 'ptw1' : ptw1 , 'ptw2' : ptw2 , 'ptw3' : ptw1 } + if pickles ( brs ) : + chain.padd_new_branch ( None , brs ) + ## reload the chain and check: + logger.info ( 'With histogram:\n%s' % data.chain.table ( prefix = '# ' ) ) + assert 'ptw' in data.chain , "Branch `ptw1' is not here!" + assert 'ptw' in data.chain , "Branch `ptw2' is not here!" + assert 'ptw' in data.chain , "Branch `ptw3' is not here!" + else : + logger.warning ( "The test 'histo-2' is disabled (object cannot be pickled)" ) + # ========================================================================= ## 5) add the variable sampled from the histogram # ========================================================================= with timing ('histo-2' , logger = logger ) : h2 = ROOT.TH1D( hID() , 'Gauss' , 120 , -6 , 6 ) - for i in range ( 100000 ) : h2.Fill ( random.gauss ( 0 , 1 ) ) - chain = data.chain - chain.padd_new_branch ( 'hg', h2 ) - ## reload the chain and check: - logger.info ( 'With sampled:\n%s' % data.chain.table ( prefix = '# ' ) ) - assert 'hg' in data.chain , "Branch `hg' is not here!" - + for i in range ( 100000 ) : h2.Fill ( random.gauss ( 0 , 1 ) ) + if pickles ( h2 ) : + chain = data.chain + chain.padd_new_branch ( 'hg', h2 ) + ## reload the chain and check: + logger.info ( 'With sampled:\n%s' % data.chain.table ( prefix = '# ' ) ) + assert 'hg' in data.chain , "Branch `hg' is not here!" + else : + logger.warning ( "The test 'histo-2' is disabled (object cannot be pickled)" ) + # ========================================================================= ## 6) python function again # ========================================================================= with timing ('gauss' , logger = logger ) : def gauss ( *_ ) : return random.gauss(0,1) - chain = data.chain - chain.padd_new_branch ( 'gauss', gauss ) - ## reload the chain and check: - logger.info ( 'With gauss:\n%s' % data.chain.table ( prefix = '# ' ) ) - assert 'gauss' in data.chain , "Branch `gauss' is not here!" - + chain = data.chain + if pickles ( gauss ) : + chain.padd_new_branch ( 'gauss', gauss ) + ## reload the chain and check: + logger.info ( 'With gauss:\n%s' % data.chain.table ( prefix = '# ' ) ) + assert 'gauss' in data.chain , "Branch `gauss' is not here!" + else : + logger.warning ( "The test 'gauss' is disabled (object cannot be pickled)" ) + # ========================================================================= ## 7) add numpy array # ========================================================================= @@ -224,7 +233,6 @@ def gauss ( *_ ) : return random.gauss(0,1) except ImportError : numpy = None - if numpy : ## ATTETNION! with timing ('numpy float16' , logger = logger ) : @@ -255,7 +263,7 @@ def gauss ( *_ ) : return random.gauss(0,1) ## adata = numpy.full ( 10000 , -1 , dtype = numpy.int8 ) ## chain = data.chain ## chain.padd_new_branch ( 'np_i8' , adata ) - ## ## reload the chain and check: + ## ## reload the chain and pickles: ## logger.info ( 'With numpy.int8:\n%s' % data.chain.table ( prefix = '# ' ) ) ## assert 'np_i8' in data.chain , "Branch `np_i8' is not here!" @@ -263,7 +271,7 @@ def gauss ( *_ ) : return random.gauss(0,1) ## adata = numpy.full ( 10000 , +2 , dtype = numpy.uint8 ) ## chain = data.chain ## chain.padd_new_branch ( 'np_ui8' , adata ) - ## ## reload the chain and check: + ## ## reload the chain and pickles: ## logger.info ( 'With numpy.uint8:\n%s' % data.chain.table ( prefix = '# ' ) ) ## assert 'np_ui8' in data.chain , "Branch `np_ui8' is not here!" @@ -287,7 +295,7 @@ def gauss ( *_ ) : return random.gauss(0,1) adata = numpy.full( 10000 , -5 , dtype = numpy.int32 ) chain = data.chain chain.padd_new_branch ( 'np_i32' , adata ) - ## reload the chain and check: + ## reload the chain and pickles: logger.info ( 'With numpy.int32:\n%s' % data.chain.table ( prefix = '# ' ) ) assert 'np_i32' in data.chain , "Branch `np_i32' is not here!" @@ -327,9 +335,9 @@ def gauss ( *_ ) : return random.gauss(0,1) chain = data.chain vname = 'arr_%s' % l chain.padd_new_branch ( vname , adata ) - ## reload the chain and check: - logger.info ( "With array '%s':\n%s" % ( l , data.chain.table ( prefix = '# ' ) ) ) - assert vname in data.chain , "Branch `%s' is not here!" % vname + ## reload the chain : + logger.info ( "With array '%s':\n%s" % ( l , data.chain.table ( prefix = '# ' ) ) ) + assert vname in data.chain , "Branch `%s' is not here!" % vname ## add function @@ -367,7 +375,11 @@ def gauss ( *_ ) : return random.gauss(0,1) with timing ('1D-callable' , logger = logger ) : logger.warning ('1D-callable is not allowed for parallel add_branch' ) - + + ## top-level callabke + ## class CALL(object): + ## def __call__ ( self , x ) : return 2.0 * x + ## ftwo = CALL() ## fun = ( make_fun1 ( ftwo , forcepc = True ) , 'pt' ) diff --git a/ostap/plotting/canvas.py b/ostap/plotting/canvas.py index 327f2833..b30990fe 100755 --- a/ostap/plotting/canvas.py +++ b/ostap/plotting/canvas.py @@ -54,7 +54,8 @@ def getCanvas ( name = 'glCanvas' , ## canvas name title = 'Ostap' , ## canvas title width = canvas_width , ## canvas width - height = canvas_height ) : ## canvas height + height = canvas_height , ## canvas height + **kwargs ) : ## other properties """Get create canvas/create new canvas >>> cnv = getCanvas ( 'glnewCanvas' , width = 1200 , height = 1000 ) @@ -64,7 +65,8 @@ def getCanvas ( name = 'glCanvas' , ## canvas name cnvlst = ROOT.gROOT.GetListOfCanvases() cnv = cnvlst.get ( name , None ) if cnv and isinstance ( cnv , ROOT.TCanvas ) : - _canvases.append ( cnv ) + _canvases.append ( cnv ) + set_pad ( cnv , **kwargs ) return cnv ## create new canvas @@ -83,7 +85,9 @@ def getCanvas ( name = 'glCanvas' , ## canvas name dw = width - cnv.GetWw() dh = height - cnv.GetWh() cnv.SetWindowSize ( width + dw , height + dh ) - + + set_pad ( cnv , **kwargs ) + _canvases.append ( cnv ) return cnv @@ -887,7 +891,6 @@ def set_pad ( pad , **config ) : if 'y_stat' in conf : changed[ 'y_stat' ] = pad.GetYstat () pad.SetYstat ( conf.pop ('y_stat') ) - if 'top_margin' in conf or 'margin_top' in conf : changed ['margin_top'] = pad.GetTopMargin() @@ -908,6 +911,28 @@ def set_pad ( pad , **config ) : changed ['margin_right'] = pad.GetRightMargin() if 'right_margin' in conf : pad.SetRightMargin ( conf.pop ( 'right_margin' ) ) else : pad.SetRightMargin ( conf.pop ( 'margin_right' ) ) + + if 'grid_x' in conf or 'x_grid' in conf : + changed ['grid_x'] = pad.GetGridx () + if 'grid_x' in conf : pad.SetGridx ( conf.pop ( 'grid_x' ) ) + else : pad.SetGridx ( conf.pop ( 'x_grid' ) ) + + if 'grid_y' in conf or 'y_grid' in conf : + changed ['grid_y'] = pad.GetGridy () + if 'grid_y' in conf : pas.SetGridy ( conf.pop ( 'grid_y' ) ) + else : pas.SetGridy ( conf.pop ( 'y_grid' ) ) + + if 'log_x' in conf or 'x_log' in conf : + changed ['log_x'] = pad.GetLogx () + if 'log_x' in conf : pad.SetLogx ( conf.pop ( 'log_x' ) ) + else : pad.SetLogx ( conf.pop ( 'x_log' ) ) + + if 'log_y' in conf or 'y_log' in conf : + changed ['log_y'] = pad.GetLogy () + if 'log_y' in conf : pad.SetLogy ( conf.pop ( 'log_y' ) ) + else : pad.SetLogy ( conf.pop ( 'y_log' ) ) + + if conf : logger.warning ("set_pad: unprocessed items: %s" % conf )