Skip to content

Commit

Permalink
Add copy/update functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
Guy Coates committed Nov 26, 2014
1 parent 37ca298 commit 142f045
Showing 1 changed file with 62 additions and 31 deletions.
93 changes: 62 additions & 31 deletions pcp
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ machines as possible to prevent local network bottlenecks.
parser.add_argument("-ld",
help="Do not stripe diretories.", default=False,
action="store_true")
parser.add_argument("-u",
help="Copy only when the source file is newer than the destination file, or the destination file is missing.", default=False, action="store_true")

parser.add_argument("-R",
help=("Restart a copy from a checkpoint file DUMPFILE."),
Expand Down Expand Up @@ -322,7 +324,7 @@ def scantree(sourcedir, destdir, statedb):
rate = (totalfiles + totaldirs) / walltime
walltime = time.strftime("%H hrs %M mins %S secs",
time.gmtime(walltime))
print ("Scan Done. Did %i files, %i dirs in %s"
print ("Phase I done: Did %i files, %i dirs in %s"
" (%.0f items/sec)."
% (totalfiles, totaldirs, walltime, rate))
# Shuffle rows. If we don't do this, chunks of files tend to be copied at
Expand Down Expand Up @@ -892,8 +894,7 @@ def ShutdownWorkers(starttime):
totalelapsedtime = endtime - starttime

print ""
print "Transfer Statisics:"
print ""
print "Copy Statisics:"

for r in range(1, workers):
filescopied, md5done, bytescopied, byteschksummed, copytime, \
Expand Down Expand Up @@ -1156,7 +1157,22 @@ class copydirtree(parallelwalk.ParallelWalk):
"""Walk the source directory tree in parallel, creating the destination tree
as we go. Return the list of files we encountered."""
def ProcessFile(self, filename):
self.results[1].append(filename)
if UPDATE:
# check to see if the destination exists
destination = mungePath(sourcedir, destdir, filename)
try:
dststat = safestat(destination)
# We can't access the file at the destination, so copy it.
except OSError, error:
self.results[1].append(filename)
return()
srcstat = safestat(filename)
# source is newer, so copy the file
if srcstat.st_mtime > dststat.st_mtime:
self.results[1].append(filename)
else:
self.results[1].append(filename)
return()

def ProcessDir(self, directoryname):
newdir = mungePath(sourcedir, destdir, directoryname)
Expand All @@ -1172,7 +1188,6 @@ class fixtimestamp(parallelwalk.ParallelWalk):
stat = safestat(directoryname)
newdir = mungePath(sourcedir, destdir, directoryname)
os.utime(newdir, (stat.st_atime, stat.st_mtime))


class MPIargparse(argparse.ArgumentParser):
"""Subclass argparse so we can add a call to Abort, to tidy up MPI bits and pieces."""
Expand All @@ -1197,25 +1212,29 @@ workers = comm.size
hostname = os.uname()[1]
INFINITY = float("inf")
STARTEDCOPY = False # flag to see whether we can start checkpointing.
resumed = False
# Signal handler to checkpoint on SIGUSR1
signal.signal(signal.SIGUSR1, handler)

try:
args = parseargs()
# Basic sanity checks for MPI.
if rank == 0:

args = parseargs()
timeout = args.d # dead worker timeout
# If we are a restored job, read in our arguments
# from the restoredb instead.

# Older openMPIs are buggy
checkVersion()

if workers < 2:
print ("ERROR: Only %i processes running. Did you invoke me via"
" mpirun?") % workers
print ("This program requires at least 2 processes to run"
" correctly.")
exit(0)

# Check that we are actually alive
timeout = args.d
checkAlive(rank, workers, timeout)

# Check to see if we are resumed from checkpoint, and restore the runtime
# options to all ranks if we are.
if rank == 0:
if args.R:
statedb, args = restoreDB(args.R)
resumed = True
Expand All @@ -1225,15 +1244,10 @@ try:
pargs = pickle.dumps(args)
statedb.execute("""INSERT OR REPLACE INTO ARGUMENTS (ARGS, ID)
VALUES(?,1)""", (pargs,))
else:
timeout = 0
args = 0
args, resumed = distribArgs((args, resumed))
if rank > 0:
statedb = None

# Check the workers are alive and send them the runtime arguments.
checkAlive(rank, workers, timeout)
args = distribArgs(args)

MD5SUM = args.c # checksum copy
DRYRUN = args.dry_run # Dry run
MAXTRIES = args.t # number of retries on IO error
Expand All @@ -1253,7 +1267,7 @@ try:
sourcedir = args.SOURCE.rstrip(os.path.sep) # source
destdir = args.DEST.rstrip(os.path.sep) # destination
glob = args.g # only copy files matching glob

UPDATE = args.u # Are we doing an update copy?
CHUNKSIZE = 1024 * 1024 * args.b

# Set the final state of process
Expand All @@ -1272,8 +1286,12 @@ try:
print "SOURCE %s" %sourcedir
print "DESTINATION %s" %destdir

if UPDATE:
print "Will only copy files if source is newer than destination"
print " or destination does not exist."

if DUMPDB:
print "Will checkpoint every %i minutes to %s" %(DUMPINTERVAL, DUMPDB)
print "Will checkpoint every %i minutes to %s" %(args.Km, DUMPDB)
if LSTRIPE:
print "Will copy lustre stripe information."

Expand All @@ -1294,10 +1312,12 @@ try:

sanitycheck(sourcedir, destdir)
starttime = time.time()
print "Scanning directory structure..."

# All ranks take part in the scan
if not resumed:
if rank == 0:
print ""
print "Starting phase I: Scanning and copying directory structure..."
scantree(sourcedir, destdir, statedb)

if rank == 0:
Expand All @@ -1310,25 +1330,36 @@ try:

print "Will only copy files matching %s (%i of %i)" \
% (glob, matchingfiles, totalfiles)

STARTEDCOPY = True
print "Copying files..."
print ""
if resumed:
print "Resuming phase II: Copying files..."
else:
print "Starting phase II: Copying files..."

DispatchWork(statedb)
print "Phase II done."

STARTEDCOPY = False
ShutdownWorkers(starttime)

if PRESERVE:
print "R0: %s Setting directory timestamps..." %timestamp()
print
print "Starting phase III: Setting directory timestamps..."
starttime = time.time()
fixupDirTimeStamp(sourcedir)
print "RO: %s Done." %timestamp()
print "Master process done."
endtime = time.time()
walltime = time.strftime("%H hrs %M mins %S secs",
time.gmtime(endtime-starttime))
print "Phase III Done. %s" %walltime
exit(0)

else:
# file copy workers
ConsumeWork(sourcedir, destdir)
if PRESERVE:
fixupDirTimeStamp(sourcedir)
exit(0)
if PRESERVE:
fixupDirTimeStamp(sourcedir)
exit(0)

# We need to call MPI ABORT in our exception handler,
# otherwise the other MPI processes spin forever.
Expand Down

0 comments on commit 142f045

Please sign in to comment.