diff --git a/.aux/test_with_lcg b/.aux/test_with_lcg index 8923cec5..3949cb55 100755 --- a/.aux/test_with_lcg +++ b/.aux/test_with_lcg @@ -4,4 +4,4 @@ CMTCONFIG=$2 source /cvmfs/sft.cern.ch/lcg/views/${LCG}/${CMTCONFIG}/setup.sh source build/INSTALL/thisostap.sh cd build -ctest -N && cmake .. -DCMAKE_INSTALL_PREFIX=./INSTALL/ && ctest -j4 --output-on-failure --test-output-size-failed=5000000 +ctest -N && cmake .. -DCMAKE_INSTALL_PREFIX=./INSTALL/ && ctest -j4 -R _io_ --output-on-failure --test-output-size-failed=5000000 diff --git a/ostap/io/bz2shelve.py b/ostap/io/bz2shelve.py index 93d835ad..77548e0c 100644 --- a/ostap/io/bz2shelve.py +++ b/ostap/io/bz2shelve.py @@ -60,9 +60,6 @@ # # @endcode # -# @attention: In case DB-name has extension ``.bz2'', the whole data base -# will be ``bzip2''-ed ". -# # @attention: When one tries to read the database with pickled ROOT object using newer # version of ROOT, one could get a ROOT read error, # in case of evoltuion in ROOT streamers for some classes, e.g. ROOT.TH1D> @@ -121,8 +118,6 @@ ... >>> abcd = db['some_key'] - In case DB-name has extension ``.bz2'', the whole data base will be ``bzip2''-ed - Attention: When one tries to read the database with pickled ROOT object using newer version of ROOT, one could get a ROOT read error, in case of evoltuion in ROOT streamers for some classes, e.g. ROOT.TH1D @@ -173,8 +168,6 @@ class Bz2Shelf(CompressShelf): - 'c' Open database for reading and writing, creating if it does not exist - 'n' Always create a new, empty database, open for reading and writing """ - ## the known "standard" extensions: - extensions = '.tbz' , '.tbz2' , '.bz2' ## def __init__( self , dbname , @@ -184,67 +177,14 @@ def __init__( self , assert 1 <= compress <= 9 , 'Invalid `compress` for `bz2`-compression: %s' % compress ## initialize the base class - CompressShelf.__init__ ( self , - dbname , - mode = mode , - compress = compress , **kwargs ) - - ## needed for proper (un)pickling - def __getinitargs__ ( self ) : - """for proper (un_pickling""" - return self.__init_args - - ## needed for proper (un)pickling - def __getstate__ ( self ) : - """for proper (un)pickling""" - self.sync() - return {} - - ## needed for proper (un)pickling - def __setstate__ ( self , dct ) : - """for proper (un)pickling""" - pass - - # ========================================================================= - ## compress (bz2) the file into temporary location, keep original - def compress_files ( self , files ) : - """Compress (bz2) the file into temporary location, keep original - """ - output = self.tempfile() - - import tarfile - with tarfile.open ( output , 'w:bz2' ) as tfile : - for file in files : - _ , name = os.path.split ( file ) - tfile.add ( file , name ) - ## - return output + CompressShelf.__init__ ( self , + dbname , + mode = mode , + compress = compress , + compresstype = 'bz2' , **kwargs ) - # ========================================================================= - ## uncompress (bz2) the file into temporary location, keep original - def uncompress_file ( self , filein ) : - """Uncompress (bz2) the file into temporary location, keep original - """ - items = [] - tmpdir = self.tempdir () + self.taropts = "x:bz2" - ## 2) try compressed-tarfile - import tarfile - if tarfile.is_tarfile ( filein ) : - with tarfile.open ( filein , 'r:*' ) as tfile : - for item in tfile : - tfile.extract ( item , path = tmpdir ) - items.append ( os.path.join ( tmpdir , item.name ) ) - items.sort() - return tuple ( items ) - - import tempfile , io - fd , fileout 
= tempfile.mkstemp ( prefix = 'ostap-tmp-' , suffix = '-bz2db' )
-        with bz2.open ( filein  , 'rb' ) as fin :
-            with io.open ( fileout , 'wb' ) as fout :
-                shutil.copyfileobj ( fin , fout )
-        return fileout ,
-
     # ==========================================================================
     ## compress (bzip2) the item using bz2.compress
     def compress_item ( self , value ) :
@@ -260,7 +200,6 @@ def uncompress_item ( self , value ) :
        - see bz2.decompress
        """
         return self.unpickle ( bz2.decompress ( value ) )
-
 # =============================================================================
 ## helper function to access Bz2Shelve data base
diff --git a/ostap/io/compress_shelve.py b/ostap/io/compress_shelve.py
index acd88617..bcccda38 100755
--- a/ostap/io/compress_shelve.py
+++ b/ostap/io/compress_shelve.py
@@ -14,8 +14,8 @@
 #  However is contains several new features:
 #
 #  - Optionally it is possible to perform the compression
-#    of the whole data base, that can be rather useful for data base
-#    with large amout of keys
+#    of the whole data base, that can be useful for data base
+#    with large amount of keys
 #
 #  The module has been developed and used with great success in
 #  `Kali, framework for fine calibration of LHCb Electormagnetic Calorimeter'
@@ -38,7 +38,7 @@
     with large amout of keys

     The module has been developed and used with great success in
-   `Kali, framework for fine calibration of LHCb Electormagnetic Calorimeter'
+   `Kali, framework for fine calibration of LHCb Electromagnetic Calorimeter'

 """
 # =============================================================================
@@ -56,11 +56,14 @@
 # =============================================================================
 import os, abc, shelve, shutil, glob, time, datetime, zipfile, tarfile
 from   sys import version_info as python_version
-from   ostap.io.dbase       import dbopen , whichdb, Item, ordered_dict
+from   ostap.io.dbase       import dbopen , whichdb, Item, ordered_dict, dbfiles
 from   ostap.core.meta_info import meta_info
 from   ostap.io.pickling    import ( Pickler , Unpickler, BytesIO,
                                      PROTOCOL, HIGHEST_PROTOCOL, DEFAULT_PROTOCOL )
+from   ostap.utils.cleanup  import CUBase
+from   ostap.utils.utils    import file_size
+from   ostap.utils.basic    import writeable
 # =============================================================================
 from   ostap.logger.logger import getLogger
 if '__main__' == __name__ : logger = getLogger ( 'ostap.io.compress_shelve' )
@@ -104,8 +107,8 @@
 #  - uncompress_file
 #  @author Vanya BELYAEV Ivan.Belyaev@cern.ch
 #  @date   2010-04-30
-class CompressShelf(shelve.Shelf,object):
-    """ `Compressed'-version of `shelve'-database
+class CompressShelf (shelve.Shelf,CUBase) :
+    """ `Compressed'-version of `shelve'-database
     It has four abstract methods:
     - compress_item
     - uncompress_item
@@ -113,8 +116,9 @@ class CompressShelf(shelve.Shelf,object):
     - uncompress_file
     """
     __metaclass__ = abc.ABCMeta
-    extensions    = ()
-
+    ZIP_EXTS = ( '.zip' , '.zipdb' , '.dbzip' , '.zdb' , '.dbz' ) ## whole DB is in zip-archive
+    TAR_EXTS = ( '.tar' , '.tardb' , '.dbtar' , '.tdb' , '.dbt' ) ## whole DB is in tar-archive
+
     def __init__(
        self                                   ,
        dbname                                 ,
@@ -135,13 +139,14 @@ def __init__(
        if not 0 <= protocol <= HIGHEST_PROTOCOL :
            logger.warning ("Invalid pickle protocol:%s, replace with:%s" % ( protocol , PROTOCOL ) )
            protocol = PROTOCOL
-
-        ## expand the actual file name
-        dbname  = os.path.expandvars ( dbname )
-        dbname  = os.path.expanduser ( dbname )
-        dbname  = os.path.expandvars ( dbname )
-        dbname  = os.path.expandvars ( dbname )
+
+        self.__opened = False
+        ## 
expand the actual file name + dbname_ = dbname + dbname = self.name_expand ( dbname ) ## from CUBase + + self.__compresstype = kwargs.pop ( 'compresstype' , '???' ) self.__compresslevel = compress self.__silent = silent self.__protocol = protocol @@ -149,142 +154,154 @@ def __init__( ## all arguments for constructor self.__kwargs = { - 'dbname' : dbname , - 'mode' : mode , - 'compress' : self.compresslevel , - 'protocol' : protocol , ## from shelve.Shelf - 'writeback' : writeback , ## from shelf.Shelf - 'dbtype' : self.dbtype , ## preferred dbtype - 'silent' : self.silent , - 'keyencoding' : keyencoding , + 'dbname' : dbname , + 'mode' : mode , + 'compress' : self.compresslevel , + 'protocol' : protocol , ## from shelve.Shelf + 'writeback' : writeback , ## from shelf.Shelf + 'dbtype' : self.dbtype , ## preferred dbtype + 'silent' : self.silent , + 'keyencoding' : keyencoding , } self.__kwargs.update ( kwargs ) - - - self.__nominal_dbname = dbname - self.__actual_dbname = dbname - + self.__mode = mode + self.__nominal_dbname = dbname + + ## generic case + self.__actual_dbname = dbname self.__compress = () self.__remove = () - - self.__opened = False self.__files = () - + if not self.__silent : logger.info ( 'Open DB: %s' % dbname ) - ## filename without extension and the extension itself + ## filename without extension and the extension fname , ext = os.path.splitext ( dbname ) - self.__extension = ext - - ## predefined extension? - if ext.lower() in self.extensions : - - fexists = os.path.exists ( dbname ) - - if fexists and 'r' == mode : - - ## uncompress into the temporary location - tfiles = self.uncompress_file ( dbname ) - filename = self.dbase_name ( tfiles ) - - self.__remove = tfiles - self.__compress = () - self.__actual_dbname = filename - self.__files = tfiles - - elif fexists and 'r' != mode : + extlow = ext.lower () + + exists = os.path.exists ( dbname ) + fexist = exists and os.path.isfile ( dbname ) + + ## use zip-compression of whole db + zip = extlow in self.ZIP_EXTS + + ## use tar-compression of whole db + tar = extlow in self.TAR_EXTS + + zipf = zip and fexist and zipfile.is_zipfile ( dbname ) + tarf = tar and fexist and tarfile.is_tarfile ( dbname ) - ## uncompress locally - tfiles = self.__in_place_uncompress ( dbname ) - filename = self.dbase_name ( tfiles ) + self.__zip = zip + self.__tar = tar + + if 'r' == mode and ( tarf or zipf ) : + + ## uncompress into the temporary location + tmpdir = self.tempdir () + tfiles = self.uncompress_file ( dbname , where = tmpdir ) - self.__compress = tfiles - self.__remove = tfiles - self.__files = tfiles - self.__actual_dbname = filename + self.__compress = () + self.__remove = tfiles + self.__files = tfiles + self.__actual_dbname = os.path.join ( tmpdir , os.path.basename ( fname ) ) - else : - - ## - filename = fname - self.__compress = True - self.__remove = () - self.__actual_dbname = filename - - afiles = tuple ( [ self.dbname + suffix for suffix in ( '' , ',db' , '.dir' , '.pag' , '.dat' ) ] ) - ofiles = set ( [ i for i in glob.iglob ( self.dbname + '*' ) if i in afiles ] ) - - self.__opened = False + elif 'r' != mode and ( tarf or zipf ) : - ## actual database - dbase = dbopen ( self.dbname , flag = mode , dbtype = dbtype , **kwargs ) - conf = { 'protocol' : protocol , 'writeback' : writeback } + ## uncompress locally + outdir , _ = os.path.split ( os.path.abspath ( dbname ) ) + tfiles = self.uncompress_file ( dbname , where = outdir ) - if 3 <= python_version.major : conf [ 'keyencoding'] = keyencoding - else : 
self.keyencoding = keyencoding - - shelve.Shelf.__init__ ( self , dbase , **conf ) + self.__compress = tuple ( sorted ( tfiles ) ) + self.__remove = tfiles + self.__files = tfiles + self.__actual_dbname = os.path.join ( outdir , os.path.basename ( fname ) ) + elif ( zip or tar ) : + + filename = fname + self.__compress = True + self.__remove = () + self.__actual_dbname = filename + else : + + filename = dbname + self.__compress = False + self.__remove = () + self.__actual_dbname = filename + + # ===================================================================== + the_path = lambda s : os.path.normpath ( os.path.abspath ( s ) ) + ## all files before dbopen + ofiles = set ( [ the_path ( i ) for i in glob.iglob ( self.dbname + '*' ) ] ) + ofiles |= set ( [ the_path ( i ) for i in glob.iglob ( self.dbname + '/*' ) ] ) + + self.__ofiles = tuple ( sorted ( ofiles ) ) + + # ===================================================================== + ## open/create the actual underlying database + # ===================================================================== + self.__opened = False + dbase = dbopen ( self.dbname , flag = mode , dbtype = dbtype , **kwargs ) self.__opened = True - self.__mode = mode - ### self.sync () + # ======================================================================= + ## all files after dbopen + pfiles = set ( [ the_path ( i ) for i in glob.iglob ( self.dbname + '*' ) ] ) + pfiles |= set ( [ the_path ( i ) for i in glob.iglob ( self.dbname + '/*' ) ] ) + ## new files + nfiles = pfiles - ofiles + + self.__pfiles = tuple ( sorted ( pfiles ) ) + self.__nfiles = tuple ( sorted ( nfiles ) ) + + # ===================================================================== + ## initialize the base class + # ===================================================================== + conf = { 'protocol' : protocol , 'writeback' : writeback } + if 3 <= python_version.major : conf [ 'keyencoding'] = keyencoding + else : self.keyencoding = keyencoding + shelve.Shelf.__init__ ( self , dbase , **conf ) + # ====================================================================== + ## actual type of underlying database self.__dbtype = whichdb ( self.dbname ) ## actual dbtype - ## if hasattr ( self.dict , 'reopen' ) : self.dict.reopen() - - nfiles = set ( [ i for i in glob.iglob ( self.dbname + '*' ) if i in afiles ] ) - ofiles + + # ====================================================================== + ## expected files for the given DB type + efiles = set ( the_path ( f ) for f in dbfiles ( self.dbtype , self.dbname ) ) - if not self.__files : + if ( ofiles | efiles ) != pfiles : + logger.warning ( 'Some missing or unexpected files' ) - files = [] - f = self.dbname - db = self.dbtype - if os.path.exists ( f ) and os.path.isfile ( f ) and \ - db in ( 'dbm.gnu' , 'gdbm' , 'dbhash' , 'bsddb185' , 'bsddb' , 'bsddb3' , 'sqlite3' , 'berkeleydb') : - files.append ( f ) - elif f + '.db' in nfiles and db in ( 'dbm.ndmb' , 'dbm' ) : - files.append ( f + '.db' ) - elif f + '.pag' in nfiles and f + '.dir' in nfiles and db in ( 'dbm.ndbm' , 'dbm' ) : - files.append ( f + '.pag' ) - files.append ( f + '.dir' ) - elif f + '.dat' in nfiles and f + '.dir' in nfiles and db in ( 'dbm.dumb' , 'dumbdbm' ) : - files.append ( f + '.dat' ) - files.append ( f + '.dir' ) - elif f + '.pag' in nfiles and db in ( 'dbm.ndbm' , 'dbm' ) : - files.append ( f + '.pag' ) - elif f + '.db' in ofiles and db in ( 'dbm.ndmb' , 'dbm' ) : - files.append ( f + '.db' ) - elif f + '.pag' in ofiles and f + '.dir' in ofiles and db in ( 
'dbm.ndbm' , 'dbm' ) : - files.append ( f + '.pag' ) - files.append ( f + '.dir' ) - elif f + '.dat' in ofiles and f + '.dir' in ofiles and db in ( 'dbm.dumb' , 'dumbdbm' ) : - files.append ( f + '.dat' ) - files.append ( f + '.dir' ) - elif f + '.pag' in ofiles and db in ( 'dbm.dumb' , 'dumbdbm' ) : - files.append ( f + '.pag' ) - ## else : - ## logger.error ( 'Cannot find DB for %s|%s' % ( self.dbname , self.dbtype ) ) - - files.sort () - self.__files = tuple ( files ) + files1 = pfiles & efiles ## expected and found + files2 = efiles - pfiles ## expected but not found + files3 = nfiles - efiles + + if files2 : logger.warning ( 'Expected but not found %s/%d: %s' % ( self.dbtype , len ( files2 ) , list ( files2 ) ) ) + if files3 : logger.warning ( 'New but not expected %s/%d: %s' % ( self.dbtype , len ( files3 ) , list ( files3 ) ) ) + + # ===================================================================== + ## list of files (expected) + self.__files = tuple ( sorted ( efiles ) ) if self.__compress is True : self.__compress = self.files self.__remove = self.files - if 'n' == self.mode : + if 'n' == self.mode or ( 'c' == mode and not '__metainfo__' in self ) : dct = ordered_dict() dct [ 'Created by' ] = meta_info.User dct [ 'Created at' ] = datetime.datetime.now ().strftime( '%Y-%m-%d %H:%M:%S' ) dct [ 'Created with Ostap version' ] = meta_info.Ostap dct [ 'Created with Python version' ] = meta_info.Python dct [ 'Created with ROOT version' ] = meta_info.ROOT - dct [ 'Pickle protocol' ] = protocol - dct [ 'Compress level' ] = self.__compresslevel + dct [ 'Pickle protocol' ] = protocol + dct [ 'Compress level' ] = self.compresslevel + dct [ 'Compress type' ] = self.compresstype + dct [ 'Underlying dbase type' ] = self.dbtype self [ '__metainfo__' ] = dct if not self.silent : @@ -295,7 +312,8 @@ def __init__( self.sync () - + self.__taropts = 'x:gz' + @property def dbtype ( self ) : """`dbtype' : the underlying type of database""" @@ -308,37 +326,47 @@ def protocol ( self ) : @property def compression ( self ) : - "`compression' : compression level" + """`compression' : compression level""" return self.__compresslevel @property def compresslevel ( self ) : - "`compress level' : compression level" + """`compress level' : compression level""" return self.__compresslevel + @property + def compresstype ( self ) : + """`compresstype' : type of compression""" + return self.__compresstype + @property def dbname ( self ) : - "`dbname' : the actual name for the database" + """`dbname' : the actual name for the database""" return self.__actual_dbname @property def nominal_dbname ( self ) : - "`nominal_dbname' : the actual name for the database" + """`nominal_dbname' : the actual name for the database""" return self.__nominal_dbname @property def opened ( self ) : - "`open' : is data base opened?" + """`opened' : is data base opened?""" return self.__opened + + @property + def closed ( self ) : + """`closed` : is database closed (==not-opened)? """ + return not self.opened @property def mode ( self ) : - "`mode' : the actual open-mode for the database" + """`mode' : the actual open-mode for the database""" return self.__mode @property def silent ( self ) : - "`silent' : silent actions?" 
+ """`silent' : silent actions?""" return self.__silent @property @@ -356,6 +384,24 @@ def kwargs ( self ) : """`kwargs` : all constructor arguments""" return self.__kwargs + @property + def zip ( self ) : + """`zip` : use zip-archive """ + return self.__zip + + @property + def tar ( self ) : + """`tar` : use tar-archive """ + return self.__tar + + @property + def taropts ( self ) : + """`taropts` : options for tar-archive compression""" + return self.__taropts + @taropts.setter + def taropts ( self , value ) : + self.__taropts = value + # ========================================================================= ## valid, opened DB def __nonzero__ ( self ) : @@ -499,7 +545,7 @@ def table ( self , pattern = '' , load = True , prefix = '' ) : # ========================================================================= ## List the available keys (patterns included). - # Pattern matching is performed according to + # Pattern matching is performed according to # fnmatch/glob/shell rules [it is not regex!] # @code # db = ... @@ -544,17 +590,31 @@ def ls ( self , pattern = '' , load = True , prefix = '# ' , logger = None ) ## close and compress (if needed) def close ( self ) : """ Close the file (and compress it if required) - """ - + """ if not self.opened : return ## + if not self.silent : logger.info ( 'Closing database %s' % self.dbname ) + else : logger.debug ( 'Closing database %s' % self.dbname ) + # if 'r' != self.mode and 'n' != self.mode : dct = self.get ( '__metainfo__' , ordered_dict () ) dct [ 'Updated at' ] = datetime.datetime.now().strftime( '%Y-%m-%d %H:%M:%S' ) - dct [ 'Updated by' ] = meta_info.User + dct [ 'Updated by' ] = meta_info.User dct [ 'Updated with Ostap version' ] = meta_info.Ostap dct [ 'Updated with Python version' ] = meta_info.Python dct [ 'Updated with ROOT version' ] = meta_info.ROOT + ## + if self.protocol != dct.get ( 'Pickle protocol' , 0 ) : + dct [ 'Pickle protocol' ] = self.protocol + if self.compresslevel != dct.get ( 'Compress level' , 0 ) : + dct [ 'Compress level' ] = self.compresslevel + ## + if self.compresstype != dct.get ( 'Compress type' , '' ) : + dct [ 'Compress type' ] = self.compresstype + ## + if self.dbtype != dct.get ( 'Underlying dbase type' , '' ) : + dct [ 'Underlying dbase type' ] = self.dbtype + ## self [ '__metainfo__' ] = dct if not self.silent : self.ls () @@ -564,69 +624,11 @@ def close ( self ) : ## if self.__compress : - self.__in_place_compress ( self.__compress ) - - ## remove the intermediate files - for f in self.__remove : - if os.path.exists ( f ) : - try : - os.remove ( f ) - except OSError : - pass - - # ========================================================================= - ## compress the files (`in-place') - def __in_place_compress ( self , files ) : - """ Compress the file `in-place' - - It is better to use here `os.system' or `popen'-family, - but it does not work properly for multiprocessing environemnt - """ - output = self.nominal_dbname - out , _ = os.path.split ( output ) - outdir = out if out else '.' - assert os.access ( outdir , os.W_OK ) ,\ - 'The directory "%s" is not writeable!' 
% os.abspath ( outdir ) - # - # compress the file - compressed = self.compress_files ( files ) - # remove input files - for f in files : - try : - os.remove ( f ) - except OSError : - pass - - shutil.move ( compressed , output ) + self.compress_files ( self.__compress , self.nominal_dbname ) - # ========================================================================= - ## uncompress the file (`in-place') - def __in_place_uncompress ( self , filein ) : - """ Uncompress the file `in-place' - - It is better to use here `os.system' or `popen'-family, - but unfortunately it does not work properly for multithreaded environemnt - """ - _ , ext = os.path.splitext ( filein ) - if ( not ext ) or ( ext not in self.extensions ) : - logger.error ( 'Unknown extension for %s' % filein ) - ## - out , _ = os.path.split ( filein ) - outdir = out if out else '.' - assert os.access ( outdir , os.W_OK ) ,\ - 'The directory "%s" is not writeable!' % os.abspath ( outdir ) - - ## uncompress the file - tfiles = self.uncompress_file ( filein ) - # remove the original - os.remove ( filein ) - ofiles = [] - for f in tfiles : - _ , ff = os.path.split ( f ) - ff = os.path.join ( out , ff ) - shutil.move ( f , ff ) - ofiles.append ( ff ) - - return tuple ( ofiles ) - + ## remove the intermediate files + for f in self.__remove : self.remove_file ( f ) + # ========================================================================= ## Context manager functionality : enter def __enter__ ( self ) : return self @@ -725,59 +727,8 @@ def disk_size ( self ) : >>> db = ... >>> ds = db.disk_size() """ - size = 0 - for f in self.files : - if os.path.exists ( f ) and os.path.isfile ( f ) : - size += os.path.getsize ( f ) - return size - - # ========================================================================= - ## Create the temporary directory - # The directory will be cleaned-up and deleted at-exit. - @classmethod - def tempdir ( cls , suffix = '-db-dir' , prefix = 'ostap-compress-shelve-dir-' , date = True ) : - """ Create the temporary directory - The directory will be cleaned-up and deleted at-exit. 
- """ - from ostap.utils.cleanup import CleanUp as CU - return CU.tempdir ( suffix = suffix , prefix = prefix, date = date ) - - # ========================================================================= - ## Ccreate the name for the temproary file - # The file will be deleted at-axit - @classmethod - def tempfile ( cls , suffix = '-db' , prefix = 'ostap-compress-shelve-' , dir = None , date = True ) : - """ Create the name for the temporary file - The file will be deleted at-axit - """ - from ostap.utils.cleanup import CleanUp as CU - return CU.tempfile ( suffix = suffix , prefix = prefix, dir = dir , date = date ) - - # ======================================================================== - ## guess the name of the database from the list of (uncompressed files) - @classmethod - def dbase_name ( cls , files ) : - """ Guess the name of the database from the list of (uncompressed files) - """ - - exts = set ( [ os.path.splitext ( f )[1] for f in files ] ) + return file_size ( *self.files ) - if 1 == len ( files ) and whichdb ( files [ 0 ] ) : return files [ 0 ] - elif 1 == len ( files ) and '.db' in exts : - f , _ = os.path.splitext ( files [0] ) - if whichdb ( f ) : return f - elif 2 <= len ( files ) and '.dir' in exts and '.pag' in exts : - f , _ = os.path.splitext ( files [0] ) - if whichdb ( f ) : return f - elif 1 <= len ( files ) and '.pag' in exts : - f , _ = os.path.splitext ( files [0] ) - if whichdb ( f ) : return f - elif 2 <= len ( files ) and '.dir' in exts and '.dat' in exts : - f , _ = os.path.splitext ( files [0] ) - if whichdb ( f ) : return f - - raise TypeErrro ('Cannot identify the database name: %s' % str ( files ) ) - # ========================================================================= ## Pickle/serialize compressed data def pickle ( self , value ) : @@ -803,47 +754,94 @@ def compress_item ( self , value ) : # ========================================================================= @abc.abstractmethod def uncompress_item ( self , value ) : - """Uncompress the value using the certain compressing engine""" + """ Uncompress the value using the certain compressing engine""" return NotImplemented # ========================================================================= - ## Compress the files into temporary location, keep original - @abc.abstractmethod - def compress_files ( self , files ) : - """Compress the files into temporary location, keep the original """ - return NotImplemented - + ## Compress the files into specified location + def compress_files ( self , files , output ) : + """ Compress the files into the specified location + """ + if not self.silent : logger.info ( 'Compress %s into %s' % ( files , output ) ) + + fdir = '' + fdirs = [ f for f in files if os.path.isdir ( f ) ] + if fdirs : + a , b = os.path.split ( fdirs [ 0 ] ) + if not b : + a , b = os.path.split ( a ) + fdir = os.path.join ( b , '' ) + + if self.zip : + with zipfile.ZipFile ( output , 'w' , allowZip64 = True ) as zfile : + for f in sorted ( files ) : + _ , name = os.path.split ( f ) + if fdir : name = os.path.join ( fdir , name ) + zfile.write ( f , name ) + if not self.silent : + logger.info ( "Zip-file `%s` content:" % output ) + for f in zfile.infolist() : + logger.info ( '%s' % f ) + return output + + elif self.tar : + + with tarfile.open ( output , self.taropts ) as tfile : + for f in sorted ( files ) : + _ , name = os.path.split ( f ) + if fdir : name = os.path.join ( fdir , name ) + tfile.add ( f , name ) + if not self.silent : + logger.info ( "Tar-file `%s` content:" % 
output )
+                tfile.list()
+            return output
+
+        return None
+
     # =========================================================================
-    ## Uncompress the file into temporary location, keep original
-    def uncompress_file ( self , filein ) :
-        """ Uncompress the file into temporary location, keep the original """
+    ## Uncompress the file into specified location, keep original
+    def uncompress_file ( self , filein , where ) :
+        """ Uncompress the file into specified location, keep the original """
+
+        if not self.silent : logger.info ( 'Uncompress %s into %s' % ( filein , where ) )

         assert os.path.exists ( filein ) and os.path.isfile ( filein ) , \
-               "Non existing/invalid file: %s" % filein
-
+               "Non existing/invalid file:`%s'" % filein
+
+        assert os.path.exists ( where ) and os.path.isdir ( where ) and writeable ( where ) ,\
+               "Invalid/nonwriteable directory:`%s'" % where
+
         items  = []
-        tmpdir = self.tempdir ()

         ## 1) zip-archive ?
         if zipfile.is_zipfile ( filein ) :
             with zipfile.ZipFile ( filein , 'r' , allowZip64 = True ) as zfile :
+                if not self.silent :
+                    logger.info ( "Zip-file `%s` content:" % filein )
+                    for f in zfile.infolist() :
+                        logger.info ( '%s' % f )
                 for item in zfile.filelist :
-                    zfile.extract ( item , path = tmpdir )
-                    items.append ( os.path.join ( tmpdir , item.filename ) )
-            items.sort()
-            return tuple ( items )
+                    zfile.extract ( item , path = where )
+                    name = ( os.path.join ( where , item.filename ) )
+                    name = os.path.normpath ( name )
+                    items.append ( name )
+            return tuple ( sorted ( items ) )

         ## 2) compressed-tar archive ?
         if tarfile.is_tarfile ( filein ) :
             with tarfile.open ( filein , 'r:*' ) as tfile :
+                if not self.silent :
+                    logger.info ( "Tar-file `%s` content:" % filein )
+                    tfile.list()
                 for item in tfile :
-                    tfile.extract ( item , path = tmpdir )
-                    items.append ( os.path.join ( tmpdir , item.name ) )
-            items.sort()
-            return tuple ( items )
+                    tfile.extract ( item , path = where )
+                    name = ( os.path.join ( where , item.name ) )
+                    name = os.path.normpath ( name )
+                    items.append ( name )
+            return tuple ( sorted ( items ) )
+
+        return ()

-        return None
-
     # =========================================================================
     ## copy the database into new one
     # @code
@@ -885,6 +883,7 @@ def clone ( self , dbname , **kwargs ) :
         """
         return self.copy ( dbname , copykeys = self.keys () , **kwargs )

+
 # ============================================================================
 ## a bit more decorations for shelve (optional)
 import ostap.io.shelve_ext
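For reference, the whole-DB archiving that `compress_files`/`uncompress_file` now implement reduces to a zip/tar round-trip with format auto-detection. A minimal standalone sketch, assuming only the Python standard library and hypothetical file names (the real methods additionally route through `taropts`, logging and the shelf's temporary-file machinery):

```python
# Standalone sketch of the archive round-trip behind compress_files /
# uncompress_file; 'pack' chooses the format from the output extension,
# 'unpack' auto-detects it, mirroring the logic in the patch above.
import os, zipfile, tarfile

def pack ( files , output ) :
    """ Pack `files` into `output`: a zip archive or a gzipped tar """
    if output.endswith ( '.zip' ) :
        with zipfile.ZipFile ( output , 'w' , allowZip64 = True ) as zf :
            for f in sorted ( files ) :
                zf.write ( f , os.path.basename ( f ) )   ## store the basename only
    else :
        with tarfile.open ( output , 'w:gz' ) as tf :
            for f in sorted ( files ) :
                tf.add ( f , os.path.basename ( f ) )
    return output

def unpack ( archive , where ) :
    """ Unpack `archive` into the directory `where`, auto-detecting the format """
    items = []
    if zipfile.is_zipfile ( archive ) :
        with zipfile.ZipFile ( archive , 'r' , allowZip64 = True ) as zf :
            for item in zf.filelist :
                zf.extract ( item , path = where )
                items.append ( os.path.normpath ( os.path.join ( where , item.filename ) ) )
    elif tarfile.is_tarfile ( archive ) :
        with tarfile.open ( archive , 'r:*' ) as tf :
            for item in tf :
                tf.extract ( item , path = where )
                items.append ( os.path.normpath ( os.path.join ( where , item.name ) ) )
    return tuple ( sorted ( items ) )
```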
diff --git a/ostap/io/dbase.py b/ostap/io/dbase.py
index 179b67ad..7afbd4fb 100644
--- a/ostap/io/dbase.py
+++ b/ostap/io/dbase.py
@@ -108,7 +108,6 @@ def bsddb3_open ( filelame ,
     bsddb3 = None
     use_bsddb3 = False

-
 # =============================================================================
 ## make a try to us eLMDB
 use_lmdb = False
@@ -156,7 +155,7 @@ def whichdb ( filename ) :
     tst = std_whichdb ( filename )

     ## dbase is identified
-    if tst : return txt
+    if tst : return tst

     ## make a try woth LMDB
     if use_lmdb and os.path.exists ( filename ) and os.path.isdir ( filename ) :
@@ -245,7 +244,7 @@ def dbopen ( file ,
     Note: 'r' and 'w' fail if the database doesn't exist; 'c' creates it
     only if it doesn't exist; and 'n' always creates a new database.
-    - Actually it is a bit extended form of `dbm.open` that accounts for `bsddb3`,`sqlite3` and `lmdb`
+    - Actually it is a bit extended form of `dbm.open` that accounts for `bsddb3`, `sqlite3`, `berkeleydb` and `lmdb`
     """
     if 'n' in flag and os.path.exists ( file ) and os.path.isfile ( file ) :
@@ -256,31 +255,28 @@
         if 'c' in flag and '' == check : check = None
         if os.path.exists ( file ) and os.path.isfile ( file ) : os.unlink ( file )
-
-    if 'n' == flag and file and os.path.exists ( file ) :
-        if   os.path.isfile ( file ) :
-            try    : os.unlink ( file )
-            except : pass
-        elif os.path.isdir  ( file ) :
-            try    : shutil.rmtree ( file )
-            except : pass
-
+
     # 'n' flag is specified or dbase does not exist and c flag is specified
     if 'n' in flag or ( check is None and 'c' in flag ) :

         if isinstance ( dbtype , str ) : db_types = [ dbtype.lower() ]
         else                           : db_types = [ db.lower() for db in dbtype ]
+
+        ## check the preferred database type:
         for db in db_types :
             if   use_berkeleydb and db in ( 'berkeleydb' , 'berkeley' ) :
                 return berkeleydb_open ( file , flag , mode , **kwargs )
-            if use_bsddb3 and 'bdsdb3' == db :
+            elif use_bsddb3 and 'bsddb3' == db :
                 return bsddb3_open     ( file , flag , mode , **kwargs )
-            if use_lmdb and 'lmdb' == db :
+            elif use_lmdb   and 'lmdb'   == db :
                 return LmdbDict   ( path     = file , flag = flag , **kwargs )
-            if db in ( 'sqlite' , 'sqlite3' ) :
+            elif db in ( 'sqlite' , 'sqlite3' ) :
                 return SqliteDict ( filename = file , flag = flag , **kwargs )
+            elif 'std' == db or db.startswith ( 'dbm.' ) or db.endswith ( 'dbm' ) :
+                if kwargs : logger.warning ( 'Ignore extra %d arguments:%s' % ( len ( kwargs ) , [ k for k in kwargs ] ) )
+                return std_db.open ( file , flag , mode )

         if concurrent and use_berkeleydb :
             return berkeleydb_open ( file , flag , mode , **kwargs )
@@ -288,13 +284,10 @@
         if concurrent and use_bsddb3 :
             return bsddb3_open ( file , flag , mode , **kwargs )

-        ## temporarily disabled
-        ## if concurrent and use_lmdb :
-        ##    return LmdbDict ( path = file , flag = flag , **kwargs )
-
         if concurrent :
             return SqliteDict ( filename = file , flag = flag , **kwargs )

+        if kwargs : logger.warning ( 'Ignore extra %d arguments: %s' % ( len ( kwargs ) , [ k for k in kwargs ] ) )
         return std_db.open ( file , flag , mode )

     if use_berkeleydb and check in ( 'berkeleydb' , 'bsddb3' , 'dbhash' ) :
@@ -309,6 +302,7 @@
     if check == 'sqlite3' :
         return SqliteDict ( filename = file , flag = flag , **kwargs )

+    if kwargs : logger.warning ( 'Ignore extra %d arguments:%s' % ( len ( kwargs ) , [ k for k in kwargs ] ) )
     return std_db.open ( file , flag , mode )

 # =============================================================================
@@ -317,7 +311,7 @@ def dbopen ( file ,
 #  num, size = dbsize ( 'mydb' )
 #  @endcode
 def dbsize ( filename ) :
-    """Get disk size of data-base=like object
+    """ Get disk size of data-base-like object
    >>> num, size = dbsize ( 'mydb' )
    """
    size = 0
@@ -337,13 +331,32 @@
    return num, size

+# ============================================================================
+## Expected DB file names for the given basename
+def dbfiles ( dbtype , basename ) :
+    """ Expected DB file names for the given basename
+    """
+    if   dbtype in ( 'dbm.ndbm' , 'dbm'     ) :
+        return '%s.pag' % basename , '%s.dir' % basename ,
+    elif dbtype in ( 'dbm.dumb' , 'dumbdbm' ) :
+        return '%s.dat' % basename , '%s.dir' % basename ,
+    elif dbtype in ( 'lmdb', ) :
+        return ( os.path.join ( basename , '' ) , ## directory
+                 os.path.join ( 
basename , 'data.mdb' ) , + os.path.join ( basename , 'lock.mdb' ) ) + else : + return basename , + # ============================================================================ ## @class TmpDB -# Mixin class fo rtemporary databases +# Mixin class for temporary databases +# - remove : remove the temporary file immediately (just after `close') +# - keep : keep the file and do not delete it class TmpDB(object) : - """Mixin class for temporary databases + """ Mixin class for temporary databases + - remove : remove the temporary file immediately (just after `close') + - keep : keep the file and do not delete it """ - def __init__ ( self , suffix , remove = True , @@ -362,26 +375,26 @@ def __init__ ( self , @property def tmp_name ( self ) : - """``tmp_name'' : get the generated temporary file name + """`tmp_name' : get the generated temporary file name """ return self.__tmp_name @property def remove ( self ) : - """``remove'': remove the temporary file immediately (just after``close''), + """`remove': remove the temporary file immediately (just after `close'), otherwise remove it at the shutdown """ return self.__remove @property def keep ( self ) : - """``keep'' keep the file and do not delete it + """`keep': keep the file and do not delete it """ return self.__keep ## remove the file def clean ( self ) : - """remove the file + """ remove the file """ fname = self.nominal_dbname if self.remove and os.path.exists ( fname ) : diff --git a/ostap/io/lmdbdict.py b/ostap/io/lmdbdict.py index 1205ce91..888a40d4 100644 --- a/ostap/io/lmdbdict.py +++ b/ostap/io/lmdbdict.py @@ -22,7 +22,7 @@ # # @endcode # -# @attention both keys and valeus are bytestrings! +# @attention both keys and values are bytestrings! # # @author Vanya BELYAEV Ivan.Belyaev@cern.ch # @date 2024-08-19 diff --git a/ostap/io/lzshelve.py b/ostap/io/lzshelve.py index d27ebfe4..0a4bf964 100644 --- a/ostap/io/lzshelve.py +++ b/ostap/io/lzshelve.py @@ -60,9 +60,6 @@ # # @endcode # -# @attention: In case DB-name has extensions ``.lz'', ``.xz'', the whole data base -# will be ``LZMA''-ed ". -# # @attention: When one tries to read the database with pickled ROOT object using newer # version of ROOT, one could get a ROOT read error, # in case of evoltuion in ROOT streamers for some classes, e.g. ROOT.TH1D> @@ -121,8 +118,6 @@ ... >>> abcd = db['some_key'] - In case DB-name has extension ``.lz'', ``.xz'', the whole data base will be ``LZMA''-ed - Attention: When one tries to read the database with pickled ROOT object using newer version of ROOT, one could get a ROOT read error, in case of evoltuion in ROOT streamers for some classes, e.g. ROOT.TH1D @@ -184,8 +179,6 @@ class LzShelf(CompressShelf): - 'c' Open database for reading and writing, creating if it does not exist - 'n' Always create a new, empty database, open for reading and writing """ - ## the known "standard" extensions: - extensions = '.txz', '.tlz' , '.xz' , '.lz' , '.lzma' ## def __init__( self , dbname , @@ -195,51 +188,13 @@ def __init__( self , assert lzma, "`lzma` module is not available!" 
## initialize the base class
-        CompressShelf.__init__ ( self                ,
-                                 dbname              ,
-                                 mode     = mode     ,
-                                 compress = compress , **kwargs )
-
-    # =========================================================================
-    ## compress (LZMA) the file into temporary location, keep original
-    def compress_files ( self , files ) :
-        """ Compress (LZMA) the file into temporary location, keep original
-        """
-        output = self.tempfile()
-
-        import tarfile
-        with tarfile.open ( output , 'x:xz' ) as tfile :
-            for file in files :
-                _ , name = os.path.split ( file )
-                tfile.add ( file , name )
-        return output
-
-    # =========================================================================
-    ## uncompress (LZMA) the file into temporary location, keep original
-    def uncompress_file ( self , filein ) :
-        """ Uncompress (LZMA) the file into temporary location, keep original
-        """
-
-        items  = []
-        tmpdir = self.tempdir ()
-
-        ## 1) try compressed-tarfile
-        import tarfile
-        if tarfile.is_tarfile ( filein ) :
-            with tarfile.open ( filein , 'r:*' ) as tfile :
-                for item in tfile :
-                    tfile.extract ( item , path = tmpdir )
-                    items.append ( os.path.join ( tmpdir , item.name ) )
-            items.sort()
-            return tuple ( items )
-
-        ## 2) try compressed file
-        import tempfile , io
-        fd , fileout = tempfile.mkstemp ( prefix = 'ostap-tmp-' , suffix = '-lzdb' )
-        with lzma.open ( filein , 'rb' ) as fin :
-            with io.open ( fileout , 'wb' ) as fout :
-                shutil.copyfileobj ( fin , fout )
-        return fileout ,
+        CompressShelf.__init__ ( self                    ,
+                                 dbname                  ,
+                                 mode         = mode     ,
+                                 compress     = compress ,
+                                 compresstype = 'lzma'   , **kwargs )
+
+        self.taropts = 'x:xz'

     # ==========================================================================
     ## compress (LZMA) the item using lzma.compress
@@ -255,8 +210,8 @@ def uncompress_item ( self , value ) :
         """ Uncompress (LZMA) the item using ``lzma.decompress''
        - see lzma.decompress
        """
-        return self.unpickle ( lzma.decompress ( value ) )
-
+        return self.unpickle ( lzma.decompress ( value ) )
+
 # =============================================================================
 ## helper function to access LzShelve data base
 #  @author Vanya BELYAEV Ivan.Belyaev@cern.ch
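The per-item path is unchanged by this patch: values are pickled, then compressed with the engine the subclass plugs in via `compress_item`/`uncompress_item`. A minimal sketch of that round-trip with `lzma`, assuming plain `pickle` defaults instead of the shelf's configurable `pickle`/`unpickle` helpers:

```python
# Sketch of the per-item round-trip behind compress_item / uncompress_item:
# pickle the value, compress the bytes, and invert both steps on the way back.
import pickle, lzma

def compress_item ( value , preset = 4 ) :
    ## serialize, then LZMA-compress the byte string
    return lzma.compress ( pickle.dumps ( value ) , preset = preset )

def uncompress_item ( blob ) :
    ## decompress, then unpickle
    return pickle.loads ( lzma.decompress ( blob ) )

item = { 'numbers' : list ( range ( 10 ) ) }
assert uncompress_item ( compress_item ( item ) ) == item
```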
diff --git a/ostap/io/rootshelve.py b/ostap/io/rootshelve.py
index fe702acc..296e4d3f 100755
--- a/ostap/io/rootshelve.py
+++ b/ostap/io/rootshelve.py
@@ -118,7 +118,7 @@
 from ostap.io.dbase    import TmpDB
 from ostap.io.pickling import ( Pickler, Unpickler, BytesIO, PROTOCOL,
-                                HIGHEST_PROTOCOL, DEFAULT_PROTOCOL )
+                                HIGHEST_PROTOCOL, DEFAULT_PROTOCOL )
 import ROOT, shelve, zlib, os
 # =============================================================================
 from ostap.logger.logger import getLogger
@@ -212,7 +212,6 @@ def ikeys ( self , pattern = '' , regex = False ) :
        >>> for k in db.ikeys('*MC*') : print(k)
        """
-        keys_ = self.keys()

         if not pattern :
             good = lambda k : True
@@ -223,10 +222,9 @@
         else :
             import fnmatch
             good = lambda s : fnmatch.fnmatchcase ( k , pattern )
-
-        keys_ = self.keys()
-        for k in sorted ( keys_ ) :
-            if good ( k ) : yield k
+
+        for k in self.keys() :
+            if good ( k ) : yield k

     @property
     def filename ( self ) :
@@ -239,10 +237,10 @@
     def __exit__ ( self , *_ ) : self.close ()
     # =============================================================================
     ## get the object from the ROOT file
-    def get ( self , key , defval = None ) :
-        """Get the object from the ROOT file
+    def get ( self , key , default = None ) :
+        """ Get the object from the ROOT file
        """
-        return self.dict.get ( key , defval )
+        return self.dict.get ( key , default )

     # =============================================================================
     ## get item from ROOT-file
@@ -294,7 +292,11 @@ def ls ( self ) :
        """
        return self.dict.ls_table( prefix = "# ")

-
+    @property
+    def dbtype ( self ) :
+        """`dbtype` : the actual type of DB: `root`"""
+        return 'root'
+
 # =============================================================================
 ## need to disable endcode/decode for the keys
 if python_version.major > 2 :
diff --git a/ostap/io/tests/test_io_shelves.py b/ostap/io/tests/test_io_shelves.py
index 719d6ae2..c27085a0 100755
--- a/ostap/io/tests/test_io_shelves.py
+++ b/ostap/io/tests/test_io_shelves.py
@@ -25,14 +25,15 @@
 ## ## sys.modules['dbm' ] = None
 # =============================================================================
 from   ostap.math.base     import iszero
-from   ostap.core.pyrouts  import VE
-from   ostap.utils.timing  import timing
+from   ostap.core.pyrouts  import VE, hID
+from   ostap.utils.timing  import timing
+from   ostap.utils.utils   import random_name
 from   sys                 import version_info as python_version
-from   ostap.io.dbase      import dbsize
 import ostap.utils.cleanup as     CU
 import ostap.io.zipshelve  as     zipshelve
 import ostap.io.bz2shelve  as     bz2shelve
 import ostap.io.rootshelve as     rootshelve
+import ostap.logger.table  as     T
 import ROOT, os, random
 # =============================================================================
 # logging
@@ -60,7 +61,7 @@
 bins = 1000
 data = {}

-h1 = ROOT.TH1D('h1','1D-histogram',bins,-5,5) ; h1.Sumw2()
+h1 = ROOT.TH1D( 'h1' ,'1D-histogram',bins,-5,5) ; h1.Sumw2()
 m1 = VE(1,2)
 for i in range ( 0, 100000) : h1.Fill( m1.gauss() )
@@ -71,220 +72,145 @@
 data [ 'histo-1D' ] = h1
 data [ 'histo-2D' ] = h2
 data [ 'both'     ] = (123 , h1 , {'a':2}, h2,'comment',())
-data [ 'histos'   ] = {}
-for i in range ( 5000 ) :
-    ht = 'histo#%d' % i
-    hh = ROOT.TH1D ( ht , '' , 500 , 0 , 100 )
-    for j in range ( 200 ) :
-        hh.Fill ( random.gauss ( 50 , 10) )
-data['histos'][ht] = hh
+data [ 'histos'   ] = {}
+for i in range ( 1000 ) :
+    hn = hID()
+    hh = ROOT.TH1D ( hn ,'1D-histogram' , 100 , -5, 5 )
+    for j in range ( 5000 ) : hh.Fill ( random.gauss ( 0 , 1 ) )
+    if i < 100 : data [ hn ] = hh
+    data [ 'histos' ] [ hn ] = hh
+
 # =============================================================================
 def test_shelves1():

     logger = getLogger ('Test shelves')
-    logger.info ( 'Test variosu shelves' )
-
-    db_sql_name  = CU.CleanUp.tempfile ( suffix = '.sqldb'  )
-    db_zip_name  = CU.CleanUp.tempfile ( suffix = '.zipdb'  )
-    db_bz2_name  = CU.CleanUp.tempfile ( suffix = '.bz2db'  )
-    db_root_name = CU.CleanUp.tempfile ( suffix = '.root'   )
-    db_lz_name   = CU.CleanUp.tempfile ( suffix = '.lzmadb' )
-    db_zst_name  = CU.CleanUp.tempfile ( suffix = '.zstdb'  )
-
-    dbases = ( db_sql_name , db_zip_name , db_bz2_name , db_root_name )
-
-    from ostap.io.dbase import whichdb
+    logger.info ( 'Test various shelves' )

-    db_sql  = zipshelve.open  ( db_sql_name  , 'c' , dbtype = 'sqlite3' )
-    db_zip  = zipshelve.open  ( db_zip_name  , 'c' )
-    db_bz2  = bz2shelve.open  ( db_bz2_name  , 'c' )
-    db_root = rootshelve.open ( db_root_name , 'c' )
+    names  = {}
+    dbases = {}
+
+    names [ 'db_sql'  ] = CU.CleanUp.tempfile ( suffix = '.sqlsh' )
+    names [ 'db_zip'  ] = CU.CleanUp.tempfile ( suffix = '.zipsh' )
+    names [ 'db_bz2'  ] = CU.CleanUp.tempfile ( suffix = '.bz2sh' )
+    names [ 'db_root' ] = CU.CleanUp.tempfile ( suffix = '.root'  )

-    if lzshelve : db_lz = lzshelve.open ( db_lz_name , 'c' )
-    else        : db_lz = None
+    dbases [ 'db_zip'  ] = 
zipshelve .open ( names [ 'db_zip' ] , 'c' ) + dbases [ 'db_sql' ] = zipshelve .open ( names [ 'db_sql' ] , 'c' , dbtype = 'sqlite3' ) + dbases [ 'db_bz2' ] = bz2shelve .open ( names [ 'db_bz2' ] , 'c' ) + dbases [ 'db_root' ] = rootshelve.open ( names [ 'db_root' ] , 'c' ) - if zstshelve : db_zst = zstshelve.open ( db_zst_name , 'c' ) - else : db_zst = None - - - for k in data : - db_sql [ k ] = data[k] - db_zip [ k ] = data[k] - db_bz2 [ k ] = data[k] - if lzshelve : - db_lz [ k ] = data[k] - if zstshelve : - db_zst [ k ] = data[k] - db_root [ k ] = data[k] - - - logger.info('SQLiteShelve #keys: %s' % len ( db_sql ) ) - logger.info('ZipShelve #keys: %s' % len ( db_zip ) ) - logger.info('Bz2Shelve #keys: %s' % len ( db_bz2 ) ) - logger.info('RootShelve #keys: %s' % len ( db_root ) ) if lzshelve : - logger.info('LzShelve #keys: %s' % len ( db_lz ) ) - if zstshelve : - logger.info('ZstShelve #keys: %s' % len ( db_zst ) ) - - db_sql .close() - db_zip .close() - db_bz2 .close() - db_root .close() - if lzshelve : db_lz .close() - if zstshelve : db_zst.close() - - logger.info('SQLiteShelve size: %d|%d ' % dbsize ( db_sql_name ) ) - logger.info('ZipShelve size: %d|%d ' % dbsize ( db_zip_name ) ) - logger.info('Bz2Shelve size: %d|%d ' % dbsize ( db_bz2_name ) ) - logger.info('RootShelve size: %d|%d' % dbsize ( db_root_name ) ) - if lzshelve : - logger.info('LzShelve size: %d|%d ' % dbsize ( db_lz_name ) ) - if zstshelve : - logger.info('ZstShelve size: %d|%d ' % dbsize ( db_zst_name ) ) - - db_sql = zipshelve.open ( db_sql_name , 'r' , dbtype = 'sqlite3') - db_zip = zipshelve.open ( db_zip_name , 'r' ) - db_bz2 = bz2shelve.open ( db_bz2_name , 'r' ) - if lzshelve : - db_lz = lzshelve.open ( db_lz_name , 'r' ) - if zstshelve : - db_zst = zstshelve.open ( db_zst_name , 'r' ) - db_root = rootshelve.open ( db_root_name , 'r' ) - - logger.info('SQLiteShelve #keys: %s' % len ( db_sql ) ) - logger.info('ZipShelve #keys: %s' % len ( db_zip ) ) - logger.info('Bz2Shelve #keys: %s' % len ( db_bz2 ) ) - if lzshelve : - logger.info('LzShelve #keys: %s' % len ( db_lz ) ) + names [ 'db_lzma' ] = CU.CleanUp.tempfile ( suffix = '.lzsh' ) + dbases [ 'db_lzma' ] = lzshelve .open ( names ['db_lzma'] , 'c' ) + if zstshelve : - logger.info('ZstShelve #keys: %s' % len ( db_zst ) ) - logger.info('RootShelve #keys: %s' % len ( db_root ) ) + names [ 'db_zstd' ] = CU.CleanUp.tempfile ( suffix = '.zstsh' ) + dbases [ 'db_zstd' ] = zstshelve .open ( names ['db_zstd'] , 'c' ) + + # =================================================================================== + ## test writing + # =================================================================================== + + rows = [ ( 'DBASE' , 'dbtype' , 'CPU [ms]' ) ] + for dbname in dbases : + db = dbases [ dbname ] + with timing ( 'Write %10s' % dbname ) as tm : + for key in data : + db [ key ] = data [ key ] + logger.info ( 'DB %-25s #keys: %d' % ( dbname , len ( db ) ) ) + row = dbname , db.dbtype , '%.1f' % ( tm.delta * 1000 ) + rows.append ( row ) + db.close() + + title = 'Write DB' + table = T.table ( rows , title = title , prefix = '# ' , alignment = 'llr' ) + logger.info ( '%s:\n%s' % ( title , table ) ) - with timing ( 'h2-read/SQL' ) : h2_sql = db_sql [ 'histo-2D'] - with timing ( 'h2_read/ZIP' ) : h2_zip = db_zip [ 'histo-2D'] - with timing ( 'h2_read/BZ2' ) : h2_bz2 = db_bz2 [ 'histo-2D'] - if lzshelve : - with timing ( 'h2_read/LZ' ) : - h2_lz = db_lz [ 'histo-2D'] - if zstshelve : - with timing ( 'h2_read/Zst' ) : - h2_zst = db_zst [ 'histo-2D'] - with timing ( 
'h2_read/ROOT' ) : h2_root = db_root [ 'histo-2D'] + # =================================================================================== + ## test reading + # =================================================================================== + + dbases = {} - with timing ( 'tu-read/SQL' ) : tu_sql = db_sql [ 'both' ] - with timing ( 'tu_read/ZIP' ) : tu_zip = db_zip [ 'both' ] - with timing ( 'tu_read/BZ2' ) : tu_bz2 = db_bz2 [ 'both' ] + ## open for reading + logger.info ( 'Reopen databases as read-only' ) + + dbases [ 'db_zip' ] = zipshelve .open ( names [ 'db_zip' ] , 'r' ) + dbases [ 'db_sql' ] = zipshelve .open ( names [ 'db_sql' ] , 'r' , dbtype = 'sqlite3' ) + dbases [ 'db_bz2' ] = bz2shelve .open ( names [ 'db_bz2' ] , 'r' ) + dbases [ 'db_root' ] = rootshelve.open ( names [ 'db_root' ] , 'r' ) + if lzshelve : - with timing ( 'tu_read/LZ' ) : - tu_lz = db_lz [ 'both' ] + dbases [ 'db_lzma' ] = lzshelve .open ( names ['db_lzma'] , 'r' ) if zstshelve : - with timing ( 'tu_read/Zst' ) : - tu_zst = db_zst [ 'both' ] - with timing ( 'tu_read/ROOT' ) : tu_root = db_root [ 'both' ] - - with timing ( 'h1-read/SQL' ) : h1_sql = db_sql [ 'histo-1D'] - with timing ( 'h1-read/ZIP' ) : h1_zip = db_zip [ 'histo-1D'] - with timing ( 'h1-read/BZ2' ) : h1_bz2 = db_bz2 [ 'histo-1D'] - if lzshelve : - with timing ( 'h1-read/LZ' ) : - h1_lz = db_lz [ 'histo-1D'] - if zstshelve : - with timing ( 'h1-read/Zst' ) : - h1_zst = db_zst [ 'histo-1D'] - with timing ( 'h1-read/ROOT' ) : h1_root = db_root [ 'histo-1D'] - - for i in h1_sql : - v = h1_sql [i] - h1_zip [i] - if not iszero ( v.value() ) : - logger.error('Large difference for 1D histogram(1)!') - v = h1_sql [i] - h1 [i] - if not iszero ( v.value() ) : - logger.error('Large difference for 1D histogram(2)!') - v = h1_root [i] - h1 [i] - if not iszero ( v.value() ) : - logger.error('Large difference for 1D histogram(3)!') - v = h1_bz2 [i] - h1 [i] - if not iszero ( v.value() ) : - logger.error('Large difference for 1D histogram(4)!') - if lzshelve : - v = h1_lz [i] - h1 [i] - if not iszero ( v.value() ) : - logger.error('Large difference for 1D histogram(5)!') - if zstshelve : - v = h1_zst [i] - h1 [i] - if not iszero ( v.value() ) : - logger.error('Large difference for 1D histogram(6)!') - - for i in h2_sql : - v = h2_sql [i] - h2_zip[i] - if not iszero ( v.value() ) : - logger.error('Large difference for 2D histogram(1)!') - v = h2_sql [i] - h2 [i] - if not iszero ( v.value() ) : - logger.error('Large difference for 2D histogram(2)!') - v = h2_root [i] - h2 [i] - if not iszero ( v.value() ) : - logger.error('Large difference for 2D histogram(3)!') - v = h2_bz2 [i] - h2 [i] - if not iszero ( v.value() ) : - logger.error('Large difference for 2D histogram(4)!') - if lzshelve : - v = h2_lz [i] - h2 [i] - if not iszero ( v.value() ) : - logger.error('Large difference for 2D histogram(5)!') - if zstshelve : - v = h2_zst [i] - h2 [i] - if not iszero ( v.value() ) : - logger.error('Large difference for 2D histogram(6)!') - - h1tq = tu_sql [1] - h1tz = tu_zip [1] - h1tr = tu_root[1] - - ## clone them - dbs = [ db_sql , db_zip , db_bz2 , db_root ] - if lzshelve : dbs.append ( db_lz ) - if zstshelve : dbs.append ( db_zst ) + dbases [ 'db_zstd' ] = zstshelve .open ( names ['db_zstd'] , 'r' ) + + rows = [ ( 'DBASE' , 'dbtype' , 'CPU [ms]' ) ] + for dbname in dbases : + db = dbases [ dbname ] + with timing ( 'Read %10s' % dbname ) as tm : + for key in db : value = db [ key ] + logger.info ( 'DB %-25s #keys: %d' % ( dbname , len ( db ) ) ) + row = dbname , 
db.dbtype , '%.1f' % ( tm.delta * 1000 )
+        rows.append ( row )
+        db.close()
+
+    title = 'Read DB'
+    table = T.table ( rows , title = title , prefix = '# ' , alignment = 'llr' )
+    logger.info ( '%s:\n%s' % ( title , table ) )

-    for db in dbs :
-        cdb = db.clone ( CU.CleanUp.tempfile ( suffix = '.db' ) )
-        logger.info('Cloned:')
-        cdb.ls()
-    del dbs
-
-    with timing('Close SQL'  ) : db_sql .close()
-    with timing('Close ZIP'  ) : db_zip .close()
-    with timing('Close BZ2'  ) : db_bz2 .close()
-    if lzshelve :
-        with timing('Close LZ'   ) : db_lz .close()
-    if zstshelve :
-        with timing('Close ZST'  ) : db_zst.close()
-    with timing('Close ROOT' ) : db_root.close()
-
-    dbases = ( zipshelve  . tmpdb ( dbtype = 'sqlite3' ) ,
-               zipshelve  . tmpdb () ,
-               bz2shelve  . tmpdb () ,
-               rootshelve . tmpdb () )
+    # ===================================================================================
+    ## test cloning
+    # ===================================================================================

-    if lzshelve  : dbases = dbases + ( lzshelve  . tmpdb () , )
-    if zstshelve : dbases = dbases + ( zstshelve . tmpdb () , )
-
+    backends = [
+        'lmdb'       ,
+        'berkeleydb' ,
+        'berkeley'   ,
+        'bsddb3'     ,
+        'sqlite'     ,
+        'sqlite3'    ,
+        'dbm.gnu'    ,
+        'dumbdbm'    ,
+        'std'        ,
+    ]
+
+    clones = {}
-    for dbase in dbases :
+    with zipshelve .open ( names [ 'db_zip' ] , 'r' ) as original :

-        with timing () :
+        for dbtype in backends :

-            with dbase as db :
-
-                db [ 'h1'    ] = h1
-                db [ 'h2'    ] = h2
-                db [ 'data'  ] = data
-                db [ 'histos'] = data['histos']
-                db.ls()
-
+            tag1 = 'db_zip/%s' % dbtype
+            nn   = CU.CleanUp.tempfile ( suffix = '.zipsh' )
+            clones [ tag1 ] = original.clone ( dbname = nn , dbtype = dbtype )
+
+            tagz = 'db_zip/%s/zip' % dbtype
+            nz   = CU.CleanUp.tempfile ( suffix = '.zip' )
+            clones [ tagz ] = original.clone ( dbname = nz , dbtype = dbtype )
+
+            tagt = 'db_zip/%s/tar' % dbtype
+            nt   = CU.CleanUp.tempfile ( suffix = '.tar' )
+            clones [ tagt ] = original.clone ( dbname = nt , dbtype = dbtype )
+
+    rows = [ ( 'DBASE' , 'dbtype' , 'CPU [ms]' ) ]
+    for dbname in clones :
+        db = clones [ dbname ]
+        with timing ( 'Read %10s' % dbname ) as tm :
+            for key in db : value = db [ key ]
+        logger.info ( 'DB %-25s #keys: %d' % ( dbname , len ( db ) ) )
+        row = dbname , db.dbtype , '%.1f' % ( tm.delta * 1000 )
+        rows.append ( row )
+        db.close()
+
+    title = 'Read clones'
+    table = T.table ( rows , title = title , prefix = '# ' , alignment = 'llr' )
+    logger.info ( '%s:\n%s' % ( title , table ) )

 # =============================================================================
 def test_shelves2 () :
diff --git a/ostap/io/zipshelve.py b/ostap/io/zipshelve.py
index c252911e..428f17be 100755
--- a/ostap/io/zipshelve.py
+++ b/ostap/io/zipshelve.py
@@ -14,7 +14,7 @@
 #  However is contains several new features:
 #
 #  - Optionally it is possible to perform the compression
-#    of the whole data base, that can be rathe useful fo data base
+#    of the whole data base, that can be useful for data base
 #    with large amout of keys
 #
 #  The module has been developed and used with great success in
@@ -54,9 +54,6 @@
 #  >>> abcd = db['some_key']
 #  @endcode
 #
-#  @attention: In case DB-name has extension ``.gz'', the whole data base
-#              will be ``gzip''-ed.
-#
 #  @attention: When one tries to read the database with pickled ROOT object using newer
 #  version of ROOT, one could get a ROOT read error,
 #  in case of evoltuion in ROOT streamers for some classes, e.g. 
ROOT.TH1D> @@ -116,13 +113,12 @@ ... >>> abcd = db['some_key'] - In case DB-name has extension ``.gz'', the whole data base will be ``gzip''-ed - Attention: When one tries to read the database with pickled ROOT object using newer version of ROOT, one could get a ROOT read error, in case of evoltuion in ROOT streamers for some classes, e.g. ROOT.TH1D > Error in : Could not find the StreamerInfo for version 2 of the class TH1D, object skipped at offset 19 > Error in : object of class TH1D read too few bytes: 2 instead of 878 + The solution is simple and described in file ostap.io.dump_root - see ostap.io.dump_root """ @@ -165,8 +161,6 @@ class ZipShelf(CompressShelf): - 'c' Open database for reading and writing, creating if it does not exist - 'n' Always create a new, empty database, open for reading and writing """ - ## the known "standard" extensions: - extensions = '.zip' , '.tgz' , '.gz' ## def __init__( self , @@ -174,31 +168,18 @@ def __init__( mode = 'c' , compress = zlib.Z_DEFAULT_COMPRESSION , **kwargs ) : + ## check the compress level assert 0 <= compress <= zlib.Z_BEST_COMPRESSION or \ compress in ( -1 , zlib.Z_DEFAULT_COMPRESSION ) ,\ "Invalid `compress` for `zlib`: %s" % compress ## initialize the base class - CompressShelf.__init__ ( self , - dbname , - mode = mode , - compress = compress , **kwargs ) - - # ========================================================================= - ## compress the file into temporary location, keep original - def compress_files ( self , files ) : - """ Compress the files into the temporary location, keep original - """ - output = self.tempfile () + CompressShelf.__init__ ( self , + dbname , + mode = mode , + compress = compress , + compresstype = 'zip' , **kwargs ) - import zipfile - with zipfile.ZipFile ( output , 'w' , allowZip64 = True ) as zfile : - for file in files : - _ , name = os.path.split ( file ) - zfile.write ( file , name ) - - return output - # ========================================================================== ## compress (zip) the item using zlib.compress def compress_item ( self , value ) : @@ -215,6 +196,7 @@ def uncompress_item ( self , value ) : """ return self.unpickle ( zlib.decompress ( value ) ) + # ============================================================================= ## helper function to access ZipShelve data base # @author Vanya BELYAEV Ivan.Belyaev@cern.ch diff --git a/ostap/io/zstshelve.py b/ostap/io/zstshelve.py index ca5ddf28..ba25d27d 100644 --- a/ostap/io/zstshelve.py +++ b/ostap/io/zstshelve.py @@ -121,8 +121,6 @@ ... >>> abcd = db['some_key'] - In case DB-name has extension `.zst' the whole data base will be `ZST'-ed - Attention: When one tries to read the database with pickled ROOT object using newer version of ROOT, one could get a ROOT read error, in case of evolution in ROOT streamers for some classes, e.g. 
ROOT.TH1D @@ -183,8 +181,6 @@ class ZstShelf(CompressShelf): - 'c' Open database for reading and writing, creating if it does not exist - 'n' Always create a new, empty database, open for reading and writing """ - - extensions = '.zst', '.zstd' ## def __init__( self , dbname , @@ -203,10 +199,11 @@ def __init__( self , self.__decompressor = zst.ZstdDecompressor ( ) ## initialize the base class - CompressShelf.__init__ ( self , - dbname , - mode = mode , - compress = compress , **kwargs ) + CompressShelf.__init__ ( self , + dbname , + mode = mode , + compress = compress , + compresstype = 'zstd' , **kwargs ) conf = { 'threads' : threads } self.kwargs.update ( conf ) @@ -224,47 +221,6 @@ def decompressor ( self ) : """'decompressor' : get the actual decompressor object""" return self.__decompressor - # ========================================================================= - ## compress (zstandard) the file into temporary location, keep original - def compress_files ( self , files ) : - """ Compress (zstandard) the file into temporary location, keep original - """ - output = self.tempfile() - import tarfile - with tarfile.open ( output , 'x:gz' ) as tfile : - for file in files : - _ , name = os.path.split ( file ) - tfile.add ( file , name ) - return output - - # ========================================================================= - ## uncompress (zstandard) the file into temporary location, keep original - def uncompress_file ( self , filein ) : - """ Uncompress (zstandard) the file into temporary location, keep original - """ - - items = [] - tmpdir = self.tempdir () - - ## 1) try compressed-tarfile - import tarfile - if tarfile.is_tarfile ( filein ) : - with tarfile.open ( filein , 'r:*' ) as tfile : - for item in tfile : - tfile.extract ( item , path = tmpdir ) - items.append ( os.path.join ( tmpdir , item.name ) ) - items.sort() - return tuple ( items ) - - ## 2) single zst-file - import tempfile , io - fd , fileout = tempfile.mkstemp ( prefix = 'ostap-tmp-' , suffix = '-zstdb' ) - - with io.open ( filein , 'rb' ) as fin : - with io.open ( fileout , 'wb' ) as fout : - self.decompressor.copy_stream ( fin , fout ) - return fileout , - # ========================================================================== ## compress (ZST) the item using compressor def compress_item ( self , value ) : diff --git a/ostap/utils/cleanup.py b/ostap/utils/cleanup.py index 9d11349a..84716e57 100644 --- a/ostap/utils/cleanup.py +++ b/ostap/utils/cleanup.py @@ -213,6 +213,7 @@ def __del__ ( self ) : if ( not self.__cleaner ) or self.__cleaner.detach () : self._clean_trash_ ( self.__trash ) + # ========================================================================= @staticmethod def tempdir ( suffix = '' , prefix = 'ostap-tmp-dir-' , date = True ) : """Get the name of the newly created temporary directory. @@ -235,9 +236,10 @@ def tempdir ( suffix = '' , prefix = 'ostap-tmp-dir-' , date = True ) : logger.verbose ( 'temporary directory requested %s' % tmp ) return tmp + # ========================================================================= @staticmethod def get_temp_file ( suffix = '' , prefix = 'ostap-tmp-' , dir = None , date = True ) : - """Generate the name for the temporary file. + """ Generate the name for the temporary file. 
diff --git a/ostap/utils/cleanup.py b/ostap/utils/cleanup.py
index 9d11349a..84716e57 100644
--- a/ostap/utils/cleanup.py
+++ b/ostap/utils/cleanup.py
@@ -213,6 +213,7 @@ def __del__ ( self ) :
         if ( not self.__cleaner ) or self.__cleaner.detach () :
             self._clean_trash_ ( self.__trash )
 
+    # =========================================================================
     @staticmethod
     def tempdir ( suffix = '' , prefix = 'ostap-tmp-dir-' , date = True ) :
         """Get the name of the newly created temporary directory.
@@ -235,9 +236,10 @@ def tempdir ( suffix = '' , prefix = 'ostap-tmp-dir-' , date = True ) :
         logger.verbose ( 'temporary directory requested %s' % tmp )
         return tmp
 
+    # =========================================================================
     @staticmethod
     def get_temp_file ( suffix = '' , prefix = 'ostap-tmp-' , dir = None , date = True ) :
-        """Generate the name for the temporary file.
+        """ Generate the name for the temporary file.
         - the method should be avoided in favour of `CleanUp.tempfile`
         >>> fname = CleanUp.get_temp_file ()
         """
@@ -262,30 +264,32 @@ def get_temp_file ( suffix = '' , prefix = 'ostap-tmp-' , dir = None , date = Tr
         logger.verbose ( 'temporary file requested %s' % fname )
         return fname
 
+    # =========================================================================
     @staticmethod
     def tempfile ( suffix = '' , prefix = 'ostap-tmp-' , dir = None , date = True , keep = False ) :
-        """Get the name of the temporary file.
+        """ Get the name of the temporary file.
         - The file will be deleted at-exit
         >>> fname = CleanUp.tempfile()
         """
         fname = CleanUp.get_temp_file ( suffix = suffix , prefix = prefix , dir = dir , date = date )
-        assert not os.path.exists ( fname )
+        assert not os.path.exists ( fname )
         CleanUp._tmpfiles.add ( fname )
-
         if keep : CleanUp._protected.add ( fname )
         return fname
 
+    # =========================================================================
     @staticmethod
     def protect_file ( fname ) :
-        """Protect the temporary from removal"""
+        """ Protect the temporary file from removal"""
         if os.path.exists ( fname ) and os.path.isfile ( fname ) :
             CleanUp._protected.add ( fname )
             logger.verbose ( 'the file is protected: %s ' % fname )
 
+    # =========================================================================
     @staticmethod
     def remove_file ( fname ) :
-        """Remove the (temporary) file
+        """ Remove the (temporary) file
         """
         if os.path.exists ( fname ) and os.path.isfile ( fname ) :
@@ -293,7 +297,7 @@ def remove_file ( fname ) :
             logger.verbose ( 'do not remove the protected file : %s ' % fname )
             return False
 
-        logger.verbose ( 'remove temporary file : %s' % fname )
+        logger.verbose ( 'remove the file : %s' % fname )
         try    : os.remove ( fname )
         except : pass
@@ -303,12 +307,13 @@ def remove_file ( fname ) :
             return False
         return True
 
+    # =========================================================================
     @staticmethod
     def remove_dir ( fdir ) :
         """Remove the (temporary) directory
         """
         if os.path.exists ( fdir ) and os.path.isdir ( fdir ) :
-            logger.verbose ( 'remove temporary dir : %s' % fdir )
+            logger.verbose ( 'remove the dir : %s' % fdir )
             ## 1: collect all files & subdirectories
             for root, subdirs, files in os.walk ( fdir , topdown = False ):
                 ## 2: remove all files
@@ -316,27 +321,87 @@ def remove_dir ( fdir ) :
                 ## 3: remove all directories
                 for dd in subdirs :
                     dd = os.path.join ( root , dd )
-                    logger.verbose ( 'remove subdirectory %s in temporary directory %s ' % ( dd , fdir ) )
+                    logger.verbose ( 'remove subdirectory %s in the directory %s ' % ( dd , fdir ) )
                     try    : os.rmdir ( dd )
                     except : pass
                     if os.path.exists ( dd ) and os.path.isdir ( dd ) :
                         CleanUp._failed.add ( dd )
-                        logger.error ( 'failed to remove %s in temporary directory %s ' % ( dd , fdir ) )
+                        logger.error ( 'failed to remove %s in the directory %s ' % ( dd , fdir ) )
             ## 4: finally remove the root
-            try    : os.rmdir ( fdir )
-            except : pass
+            try :
+                os.rmdir ( fdir )
+                return True
+            except :
+                pass
             if os.path.exists ( fdir ) and os.path.isdir ( fdir ) :
                 CleanUp._failed.add ( fdir )
                 logger.error ( 'failed to remove : %s' % fdir )
-
+                return False
+        return True
+
+    # =========================================================================
     @staticmethod
     def remove ( fname ) :
-        """Remove temporary object (if any)
+        """ Remove the temporary object (file or directory), if any
         """
-        if   os.path.exists ( fname ) and os.path.isdir  ( fname ) :
-            return CleanUp.remove_dir  ( fname )
-        elif os.path.exists ( fname ) and os.path.isfile ( fname ) :
-            return CleanUp.remove_file ( fname )
+        if os.path.exists ( fname ) :
+            if   os.path.isdir  ( fname ) : return CleanUp.remove_dir  ( fname )
+            elif os.path.isfile ( fname ) : return CleanUp.remove_file ( fname )
+
+# ============================================================================
+## @class CUBase
+#  A light version of CleanUp to be used as a helper base class
+class CUBase(object) :
+    """ A light version of CleanUp to be used as a helper base class
+    """
+    # ==========================================================================
+    ## expand the (file/dir)name
+    @classmethod
+    def name_expand ( cls , name ) :
+        """ Expand the (file/dir)name """
+        thename = os.path.expandvars ( name    )
+        thename = os.path.expanduser ( thename )
+        thename = os.path.expandvars ( thename )
+        thename = os.path.expanduser ( thename )
+        return os.path.expandvars ( thename )
+    # =========================================================================
+    ## remove the file
+    @classmethod
+    def remove_file ( cls , filename ) :
+        """ Remove the file """
+        return CleanUp.remove_file ( filename )
+    # =========================================================================
+    ## remove the directory
+    @classmethod
+    def remove_dir ( cls , dirname ) :
+        """ Remove the directory """
+        return CleanUp.remove_dir ( dirname )
+    # =========================================================================
+    ## remove the file or directory
+    @classmethod
+    def remove ( cls , name ) :
+        """ Remove the file or directory """
+        return CleanUp.remove ( name )
+
+    # =========================================================================
+    ## Create the temporary directory
+    #  The directory will be cleaned-up and deleted at-exit.
+    @classmethod
+    def tempdir ( cls , suffix = '-TMP-dir' , prefix = 'ostap-TMP-dir-' , date = True ) :
+        """ Create the temporary directory
+        The directory will be cleaned-up and deleted at-exit.
+        """
+        return CleanUp.tempdir ( suffix = suffix , prefix = prefix , date = date )
+
+    # =========================================================================
+    ## Create the name for the temporary file
+    #  The file will be deleted at-exit
+    @classmethod
+    def tempfile ( cls , suffix = '-tmp' , prefix = 'ostap-TMP-' , dir = None , date = True ) :
+        """ Create the name for the temporary file
+        The file will be deleted at-exit
+        """
+        return CleanUp.tempfile ( suffix = suffix , prefix = prefix , dir = dir , date = date )
 
 # ============================================================================
 ## Context manager to cleanup PID-dependent directories
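A quick sketch of how the new CUBase mix-in is meant to be used: a class inherits the CleanUp-backed helpers by delegation. MyHelper below is hypothetical, purely for illustration:

class MyHelper(CUBase) :
    """ Hypothetical helper: temporary-file handling comes entirely from CUBase """
    def __init__ ( self ) :
        ## the name is registered with CleanUp and the file is deleted at-exit
        self.tmpname = self.tempfile ( suffix = '-helper' )

h = MyHelper ()
h.remove ( h.tmpname )   ## dispatches to remove_file/remove_dir as needed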
@@ -522,35 +587,37 @@ def __init__ ( self , suffix = '' , prefix = '' , dir = None ) :
 
     @property
     def filename ( self ) :
-        """`filename': the actual name of temporary file"""
+        """`filename' : the actual name of temporary file"""
        return self.__filename
 
+    # =========================================================================
     ## context manager enter: no action
     def __enter__ ( self ) :
-        """Context manager enter: no action"""
+        """ Context manager enter: no action"""
         return self
 
+    # =========================================================================
     ## Context manager exit: delete the file
     def __exit__ ( self , *_ ) :
-        """Context manager exit: delete the file
+        """ Context manager exit: delete the file
         """
         self._remove_the_file_ ( self.__filename )
         self.__filename = ''
 
     ## delete the file
     def __del__ ( self ) :
-        """Delete the temporary file
+        """ Delete the temporary file
         """
         if not self.__filename : return
         if ( not self.__finalizer ) or self.__finalizer.detach () :
             self._remove_the_file_ ( self.__filename )
             self.__filename = ''
 
+    # =========================================================================
     @classmethod
     def _remove_the_file_ ( cls , name ) :
         if name and os.path.exists ( name ) :
             CleanUp.remove_file ( name )
-
 # =============================================================================
 if '__main__' == __name__ :
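The hunk above touches the context-manager protocol of the temporary-file holder in ostap/utils/cleanup.py; the enclosing class name is not visible in this diff, so TempFile below is an assumption. A usage sketch:

import os
from ostap.utils.cleanup import TempFile   ## assumed class name and location

with TempFile ( suffix = '.tmp' , prefix = 'demo-' ) as tmp :
    with open ( tmp.filename , 'w' ) as f : f.write ( 'hello' )
    assert os.path.exists ( tmp.filename )
## on exit the file is removed and `filename` is reset to ''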
+ """ + return CleanUp.tempdir ( suffix = suffix , prefix = prefix, date = date ) + + # ========================================================================= + ## Ccreate the name for the temporary file + # The file will be deleted at-axit + @classmethod + def tempfile ( cls , suffix = '-tmp' , prefix = 'ostap-TMP-' , dir = None , date = True ) : + """ Create the name for the temporary file + The file will be deleted at-axit + """ + return CleanUp.tempfile ( suffix = suffix , prefix = prefix, dir = dir , date = date ) # ============================================================================ ## Context manager to cleanup PID-dependent directories @@ -522,35 +587,37 @@ def __init__ ( self , suffix = '' , prefix = '' , dir = None ) : @property def filename ( self ) : - """`filename': the actual name of temporary file""" + """`filename' : the actual name of temporary file""" return self.__filename + # ========================================================================= ## context manager enter: no action def __enter__ ( self ) : - """Context manager enter: no action""" + """ Context manager enter: no action""" return self + # ========================================================================= ## Context manager exit: delete the file def __exit__ ( self , *_ ) : - """Context manager exit: delete the file + """ Context manager exit: delete the file """ self._remove_the_file_ ( self.__filename ) self.__filename = '' ## delete the file def __del__ ( self ) : - """Delete the temporary file + """ Delete the temporary file """ if not self.__filename : return if ( not self.__finalizer ) or self.__finalizer.detach () : self._remove_the_file_ ( self.__filename ) self.__filename = '' + # ========================================================================= @classmethod def _remove_the_file_ ( cls , name ) : if name and os.path.exists ( name ) : CleanUp.remove_file ( name ) - # ============================================================================= if '__main__' == __name__ : diff --git a/ostap/utils/utils.py b/ostap/utils/utils.py index 88722478..096fd26b 100644 --- a/ostap/utils/utils.py +++ b/ostap/utils/utils.py @@ -101,7 +101,9 @@ 'memoize' , ## Simple lightweight unbounded cache 'absproperty' , ## abstract property decorator 'classprop' , ## class property decorator - 'numcalls' , ## decoratro for #ncalls + 'numcalls' , ## decorator for #ncalls + ## + 'file_size' , ## get cumulative size of files/directories ## 'hadd' , ## merge ROOT files using command `hadd` 'num_fds' , ## get number of opened file descriptors @@ -1922,6 +1924,28 @@ def seed ( self ) : return self.__seed +# ============================================================================== +## get the total size of files/directories +# @code +# size = file_size ( 'a.f' , 'b.f' 'c.dir' ) +# @endfcode +def file_size ( *files ) : + """ Get the total size of files/directories + >>> size = file_size ( 'a.f' , 'b.f' 'c.dir' ) + """ + size = 0 + for name in files : + if not os.path.exists ( name ) : continue + elif os.path.islink ( name ) : continue + elif os.path.isfile ( name ) : size += os.path.getsize ( name ) + elif os.path.isdir ( name ) : + for dirpath , dirnames , filenames in os.walk ( name ) : + for f in filenames: + fp = os.path.join ( dirpath , f ) + if not os.path.islink ( fp ): + size += os.path.getsize ( fp ) + return size + # ============================================================================== ## Context manager to set/keep seed/state of ROOT random generator # @code