Skip to content

Commit

Permalink
1. improve parallel_copy, rely on xargs when GNU parallel is n…
Browse files Browse the repository at this point in the history
…ot available

  1. add add parallel `sync, based on `rsync -a` & `xargs/parallel`
  • Loading branch information
VanyaBelyaev committed Jun 10, 2024
1 parent 4023b13 commit d87ab20
Show file tree
Hide file tree
Showing 5 changed files with 234 additions and 32 deletions.
5 changes: 4 additions & 1 deletion ReleaseNotes/release_notes.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
## New features

1. Improve `addTMVAResponse` and `addChoppingResponse` (and their parallel analogues)

1. improve `parallel_copy`, rely on `xargs` when `GNU parallel` is not available
1. add parallel `sync`, based on `rsync -a` & `xargs/parallel`


## Backward incompatible

## Bug fixes
Expand Down
116 changes: 104 additions & 12 deletions ostap/io/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -630,10 +630,40 @@ def copy_files ( self , new_dir = None , parallel = False , also_bad = False ) :
progress = not self.silent )

if not self.silent :
logger.info ( "copy_files: #%d files are copied to '%s'" % ( len ( copied ) , nd ) )
from ostap.utils.basic import commonpath
cp = commonpath ( copied )
logger.info ( "copy_files: #%d files are copied to '%s'" % ( len ( copied ) , cp ) )

return self.clone ( files = sorted ( copied ) )


# =========================================================================
## Sync/copy all the files to new directory
# - new directory will be created (if needed)
# - common path (prefix) for all files will be replaced by new directory
def sync_files ( self , new_dir = None , parallel = False , also_bad = False ) :
    """Sync/copy all files of this collection into a new directory
    - new directory will be created (if needed)
    - common path (prefix) for all files will be replaced by new directory
    - new_dir  : destination directory, forwarded to the generic `sync_files` function
    - parallel : forwarded to the generic `sync_files` function
    - also_bad : if True, the `bad_files` are synced/copied as well
    A clone of this object, holding the sorted list of synced/copied files, is returned
    """

    ## collect unique names of the files to be processed
    sources = set ( self.__files )
    if also_bad and self.bad_files :
        sources.update ( self.bad_files )

    ## delegate the actual work to the generic (module-level) function
    synced = sync_files ( sources ,
                          new_dir  = new_dir  ,
                          parallel = parallel ,
                          progress = not self.silent )

    if not self.silent :
        from ostap.utils.basic import commonpath
        message = "sync_files: #%d files are sync/copied to '%s'" % ( len ( synced ) , commonpath ( synced ) )
        logger.info ( message )

    return self.clone ( files = sorted ( synced ) )

# =========================================================================
## Copy set of files into new directory.
# Files are copied in a way to preserve their name uniqueness:
Expand All @@ -647,11 +677,12 @@ def copy_files ( self , new_dir = None , parallel = False , also_bad = False ) :
# @param copier the low-level copy routine to be used
# @param progress show progress bar if possible
# @return list of copied files
def copy_files ( files_to_copy ,
new_dir = None ,
parallel = False ,
copier = None ,
progress = False ) :
def copy_files ( files_to_copy ,
new_dir = None ,
parallel = False ,
copier = None ,
copy_cmd = '' ,
progress = False ) :
"""Copy set of files into new directory.
Files are copied in a way to preserve they name uniqueness:
Expand Down Expand Up @@ -679,7 +710,7 @@ def copy_files ( files_to_copy ,
## create directory if needed
if not os.path.exists ( new_dir ) : os.makedirs ( new_dir )

from ostap.utils.basic import writeable, copy_file
from ostap.utils.basic import writeable
assert writeable ( new_dir ), \
"copy_files: the destination directory `%s' is not writable!" % new_dir

Expand All @@ -701,17 +732,18 @@ def copy_files ( files_to_copy ,

nfiles = len ( pairs )

if not copier : copier = copy_file
if not copier :
from ostap.utils.basic import copy_file
copier = copy_file

from ostap.utils.basic import numcpu
from ostap.utils.utils import which

if parallel and 1 < nfiles and 1 < numcpu () :

if which ( 'parallel' ) : from ostap.utils.parallel_copy import copy_files as parallel_copy
else : from ostap.parallel.parallel_copy import copy_files as parallel_copy
from ostap.utils.parallel_copy import copy_files as parallel_copy

copied = parallel_copy ( pairs , maxfiles = 1 , copier = copier , progress = progress )
copied = parallel_copy ( pairs , maxfiles = 1 , copier = copier , copy_cmd = copy_cmd , progress = progress )
copied = tuple ( f [ 1 ] for f in copied )

else :
Expand All @@ -725,7 +757,67 @@ def copy_files ( files_to_copy ,
copied.append ( result )

return tuple ( copied )


# =========================================================================
## Sync/copy set of files into new directory.
# Files are copied in a way to preserve their name uniqueness:
# - a.txt               -> NEWDIR/a.txt
# - subdir/a.txt        -> NEWDIR/subdir/a.txt
# - subdir/subdir/a.txt -> NEWDIR/subdir/subdir/a.txt
# Essentially a common prefix for all input files will be replaced
# by the destination directory
# @param files_to_copy sequence of files to be copied
# @param new_dir destination directory, for None a temporary directory will be used
# @param copier the low-level copy routine to be used
# @param progress show progress bar if possible
# @return list of copied files
def sync_files ( files_to_copy    ,
                 new_dir  = None  ,
                 parallel = False ,
                 copier   = None  ,
                 copy_cmd = ''    ,
                 progress = False ) :
    """Sync/copy set of files into new directory.
    This is a thin front-end for `copy_files` that prefers `rsync`:
    - if `rsync` is found, `copy_cmd` defaults to 'rsync -a' and
      `copier` defaults to `ostap.utils.basic.sync_file`
    - if `rsync` is not found, `ostap.utils.basic.copy_file` is used
      with empty `copy_cmd`; supplied `copier`/`copy_cmd` are ignored
    Arguments:
    - files_to_copy : sequence of files to be copied
    - new_dir       : destination directory (forwarded to `copy_files`)
    - parallel      : forwarded to `copy_files`
    - copier        : low-level copy routine to be used
    - copy_cmd      : copy command, forwarded to `copy_files`
    - progress      : show progress bar if possible
    The result of `copy_files` is returned
    """

    from ostap.utils.utils import which

    if which ( 'rsync' ) :
        ## rsync is available: fill in rsync-based defaults where nothing is given
        if not copy_cmd :
            copy_cmd = 'rsync -a'
        if not copier :
            from ostap.utils.basic import sync_file
            copier = sync_file
    else :
        ## no rsync: fall back to the plain copy
        from ostap.utils.basic import copy_file
        copier   = copy_file
        copy_cmd = ''

    return copy_files ( files_to_copy ,
                        new_dir  = new_dir  ,
                        parallel = parallel ,
                        copier   = copier   ,
                        copy_cmd = copy_cmd ,
                        progress = progress )

# =============================================================================
if '__main__' == __name__ :

Expand Down
7 changes: 6 additions & 1 deletion ostap/tools/reweighter.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,12 @@ class Reweighter(object) :
def __init__ ( self , **kwargs ) :

with warnings.catch_warnings():
warnings.simplefilter("ignore")
warnings.simplefilter("ignore")

import numpy
if not hasattr ( numpy , 'float' ) :
numpy.float = numpy.float64

from hep_ml.reweight import GBReweighter as GBRW
self.__reweighter = GBRW ( **kwargs )
self.__variables = ()
Expand Down
43 changes: 42 additions & 1 deletion ostap/utils/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,9 @@ def mtime ( path ) :
# =========================================================================
## copy source file into destination, creating intermediate directories
# @see https://stackoverflow.com/questions/2793789/create-destination-path-for-shutil-copy-files/49615070
def copy_file ( source , destination , progress = False ) :
def copy_file ( source ,
destination ,
progress = False ) :
"""Copy source file into destination, creating intermediate directories
- see https://stackoverflow.com/questions/2793789/create-destination-path-for-shutil-copy-files/49615070
"""
Expand All @@ -378,6 +380,45 @@ def copy_file ( source , destination , progress = False ) :
from ostap.utils.utils import copy_with_progress
return copy_with_progress ( source , destination )

# =========================================================================
## Sync/copy source file into destination, creating intermediate directories, using 'rsync -a'
# @see https://stackoverflow.com/questions/2793789/create-destination-path-for-shutil-copy-files/49615070
def sync_file ( source           ,
                destination      ,
                progress = False ) :
    """Sync/copy source file into destination using `rsync -a`, creating intermediate directories
    - see https://stackoverflow.com/questions/2793789/create-destination-path-for-shutil-copy-files/49615070
    - source      : existing regular file to be synced/copied
    - destination : target file name or an existing directory
                    (for a directory the base name of `source` is appended)
    - progress    : add `--progress` option to the `rsync` call
    If `rsync` is not found, the call is delegated to `copy_file`.
    The absolute path of the destination file is returned,
    or an empty string if it does not exist after the `rsync` call.
    `subprocess.CalledProcessError` is raised if `rsync` exits with non-zero status.
    """
    from ostap.utils.utils import which
    if not which ( 'rsync' ) :
        return copy_file ( source = source , destination = destination , progress = progress )

    assert os.path.exists ( source ) and os.path.isfile ( source ), \
        "sync_file: `source' %s does not exist!" % source

    destination = os.path.abspath  ( destination )
    destination = os.path.normpath ( destination )
    destination = os.path.realpath ( destination )

    ## existing directory as destination: keep the base name of the source
    if os.path.exists ( destination ) and os.path.isdir ( destination ) :
        destination = os.path.join ( destination , os.path.basename ( source ) )

    make_dirs ( os.path.dirname ( destination ) , exist_ok = True )

    import subprocess

    ## pass the arguments as a list: the paths are never re-split,
    #  so file names with blanks or quotes are handled correctly
    command = [ 'rsync' ]
    if progress : command.append ( '--progress' )
    command += [ '-a' , source , destination ]

    subprocess.check_call ( command )

    if not os.path.exists ( destination ) :
        logger.warning ( "sync_file: no expected output '%s'" % destination )
        return ''

    return destination

# =============================================================================
if ( 3 , 4 ) <= sys.version_info :
## Get number of cores/CPUs
Expand Down
Loading

0 comments on commit d87ab20

Please sign in to comment.