From 59f5ae766645101850126fe5b065232ad7a4079c Mon Sep 17 00:00:00 2001 From: Guy Coates Date: Fri, 14 Nov 2014 16:19:28 +0000 Subject: [PATCH 1/7] Switch the directory walk code to the parallel version. --- pcp | 160 +++++++++++++++++++++++++++++------------------------------- 1 file changed, 77 insertions(+), 83 deletions(-) diff --git a/pcp b/pcp index 59342df..0b7adda 100755 --- a/pcp +++ b/pcp @@ -31,7 +31,7 @@ import math import signal import gzip -from pcplib import fastwalk +from pcplib import parallelwalk from pcplib import lustreapi from collections import deque from mpi4py import MPI @@ -101,11 +101,6 @@ ATTEMPTS INTEGER DEFAULT 0, LASTRANK INTEGER DEFAULT 0)""") filedb.execute("""CREATE INDEX COPY_IDX ON FILECPY(STATE, LASTRANK)""") filedb.execute("""CREATE INDEX MD5_IDX ON FILECPY(STATE, SIZE, LASTRANK)""") - filedb.execute("""CREATE TABLE DIRLIST ( -ID INTEGER PRIMARY KEY AUTOINCREMENT, -DIRNAME TEXT, -MTIME INTEGER, -ATIME INTEGER)""") # Table to hold program arguments filedb.execute("""CREATE TABLE ARGUMENTS( @@ -288,35 +283,39 @@ def sanitycheck(sourcedir, destdir): print "Exiting." Abort() -def scantree(sourcedir, statedb): - """Scans sourcedir recursively and returns a list of filestate objects and - a list of directories.""" +def scantree(sourcedir, destdir, statedb): + """walk the src file tree, create the destination directories and put the + files to be copied into the database.""" + totaldirs = 0 - print "R%i: Scanning list of files to copy..." % (rank) - if not os.path.isdir(sourcedir): - print "R%i: Error: %s not a directory" % (rank, sourcedir) - Abort() - startime = time.time() - statedb.execute("""INSERT INTO DIRLIST (DIRNAME) VALUES(?)""", - (sourcedir,)) - for source, dirs, files in fastwalk.fastwalk(sourcedir): - dirlist = [(os.path.join(source, d),) for d in dirs ] - statedb.executemany("""INSERT INTO DIRLIST (DIRNAME) VALUES (?)""", - dirlist) - fullpath = [ (os.path.join(source, f),) for f in files ] - statedb.executemany("""INSERT INTO FILECPY (FILENAME) VALUES (?)""", - fullpath) + if rank == 0: + startime = time.time() + if not os.path.isdir(sourcedir): + print "R%i: Error: %s not a directory" % (rank, sourcedir) + Abort() - endtime = time.time() - walltime = endtime - startime - totalfiles = statedb.execute("SELECT COUNT(*) FROM FILECPY").fetchone()[0] - totaldirs = statedb.execute("SELECT COUNT(*) FROM DIRLIST").fetchone()[0] - rate = (totalfiles + totaldirs) / walltime - walltime = time.strftime("%H hrs %M mins %S secs", - time.gmtime(walltime)) - print ("R%i: Scan Done. Did %i files, %i dirs in %s" - " (%.0f items/sec)." - % (rank, totalfiles, totaldirs, walltime, rate)) + # results are ([directories][files]) + walker = copydirtree(comm, results=([],[])) + listofpaths = walker.Execute(sourcedir) + + if rank == 0: + for l in listofpaths: + for f in l[1]: + statedb.execute("""INSERT INTO FILECPY (FILENAME) VALUES (?)""", + (f,)) + + totaldirs += len(l[0]) + + endtime = time.time() + walltime = endtime - startime + totalfiles = statedb.execute("SELECT COUNT(*) FROM FILECPY").fetchone()[0] + + rate = (totalfiles + totaldirs) / walltime + walltime = time.strftime("%H hrs %M mins %S secs", + time.gmtime(walltime)) + print ("Scan Done. Did %i files, %i dirs in %s" + " (%.0f items/sec)." + % (totalfiles, totaldirs, walltime, rate)) return() def fadviseSeqNoCache(fileD): @@ -528,31 +527,6 @@ def ConsumeWork(sourcedir, destdir): return(0) -def copyDirectories(sourcedir, destdir, statedb): - """Copy a list of directories in the statedb from sourcedir to destdir""" - global WARNINGS - for idx, d in statedb.execute("SELECT ID, DIRNAME FROM DIRLIST"): - destination = mungePath(sourcedir, destdir, d) - if DRYRUN: - if VERBOSE: - print "R%i: %s mkdir %s" % (rank, timestamp(), destination) - else: - i = 0 - while i < MAXTRIES: - try: - createDir(d, destination, statedb, idx) - break - except IOError, error: - i += 1 - print "R%i %s WARNING: mkdir error on %s attempt %i" \ - % (rank, timestamp(), destdir, i) - WARNINGS += 1 - if i == MAXTRIES: - print "R%i %s ERROR: Max number of retries on %s exceeded" \ - % (rank, timestamp(), sourcedir) - raise - - def checkAlive(rank, workers, timeout): """Quirky farm nodes can cause the MPI runtime to lock up during the task spawn. This routine checks whether nodes can exchange messages. If a node @@ -934,14 +908,12 @@ def ShutdownWorkers(starttime): time.gmtime(totalelapsedtime))) print "Warnings %i" % WARNINGS -def createDir(sourcedir, destdir, statedb, idx): +def copyDir(sourcedir, destdir): """Create destdir, setting stripe attributes to be the - same as sourcedir. Timestamps are save in the statedb for use later.""" + same as sourcedir.""" global WARNINGS # Don't worry is the destination directory already exists - if VERBOSE: - print "R%i: %s mkdir %s" % (rank, timestamp(), destdir), try: os.mkdir(destdir) @@ -952,8 +924,6 @@ def createDir(sourcedir, destdir, statedb, idx): if PRESERVE: try: stat = safestat(sourcedir) - statedb.execute("UPDATE DIRLIST SET ATIME = ? , MTIME = ? WHERE ID =?", - (stat.st_atime, stat.st_mtime, idx)) os.chmod(destdir, stat.st_mode) os.chown(destdir, stat.st_uid, stat.st_gid) # Don't worry if we can't set the permissions/uid to be the same @@ -971,16 +941,10 @@ def createDir(sourcedir, destdir, statedb, idx): if LSTRIPE: layout = lustreapi.getstripe(sourcedir) if (LSTRIPE and layout.isstriped()) or ( FORCESTRIPE and not NODIRSTRIPE ): - if VERBOSE: - print "(striped)" lustreapi.setstripe(destdir, stripecount=-1) else: - if VERBOSE: - print "(non striped)" lustreapi.setstripe(destdir, stripecount=1) - else: - if VERBOSE: - print "" + except IOError, error: if error.errno != 13: raise @@ -989,10 +953,10 @@ def createDir(sourcedir, destdir, statedb, idx): % (rank, destdir) -def fixupDirTimeStamp(sourcedir, destdir, statedb): - for dirname, atime, mtime in statedb.execute("""SELECT DIRNAME, ATIME, MTIME FROM DIRLIST"""): - destination = mungePath(sourcedir, destdir, dirname) - os.utime(destination, (atime, mtime)) +def fixupDirTimeStamp(sourcedir): + walker = fixtimestamp(comm) + walker.Execute(sourcedir) + def mungePath(src, dst, f): """Convert the sourcepath to the desinationpath""" @@ -1042,9 +1006,12 @@ def copyFile (src, dst, chunk): if LSTRIPE or FORCESTRIPE: stripestatus = createstripefile(src, dst, size) # Create a spare file to fill in later. - outfile = open(dst, "wb") - outfile.truncate(size) - outfile.close() + + if not DRYRUN: + outfile = open(dst, "wb") + outfile.truncate(size) + outfile.close() + return(size, 0, 0, stripestatus, 6) else: @@ -1169,6 +1136,28 @@ def distribArgs(args): args = comm.bcast(args, root=0) return(args) +class copydirtree(parallelwalk.ParallelWalk): + """Walk the source directory tree in parallel, creating the destination tree + as we go. Return the list of files we encountered.""" + def ProcessFile(self, filename): + self.results[1].append(filename) + + def ProcessDir(self, directoryname): + newdir = mungePath(sourcedir, destdir, directoryname) + self.results[0].append(newdir) + if not DRYRUN: + copyDir(directoryname, newdir) + + +class fixtimestamp(parallelwalk.ParallelWalk): + """Walk the source directory tree and copy the timestamps to the + destination tree.""" + def ProcessDir(self, directoryname): + stat = safestat(directoryname) + newdir = mungePath(sourcedir, destdir, directoryname) + os.utime(newdir, (stat.st_atime, stat.st_mtime)) + + class MPIargparse(argparse.ArgumentParser): """Subclass argparse so we can add a call to Abort, to tidy up MPI bits and pieces.""" def error(self,message): @@ -1223,6 +1212,7 @@ try: else: timeout = 0 args = 0 + statedb = None # Check the workers are alive and send them the runtime arguments. checkAlive(rank, workers, timeout) @@ -1287,9 +1277,12 @@ try: sanitycheck(sourcedir, destdir) starttime = time.time() + print "Scanning directory structure..." + # All ranks take part in the scan + scantree(sourcedir, destdir, statedb) + if rank == 0: if not resumed: - scantree(sourcedir, statedb) if glob: totalfiles = statedb.execute("SELECT COUNT(*) FROM FILECPY").fetchone()[0] results = statedb.execute("DELETE FROM FILECPY WHERE NOT FILENAME GLOB ?", @@ -1298,23 +1291,24 @@ try: print "Will only copy files matching %s (%i of %i)" \ % (glob, matchingfiles, totalfiles) - print "R0: Copying directories..." - copyDirectories(sourcedir, destdir, statedb) STARTEDCOPY = True print "Copying files..." DispatchWork(statedb) + ShutdownWorkers(starttime) + if PRESERVE: print "R0: %s Setting directory timestamps..." %timestamp() - fixupDirTimeStamp(sourcedir, destdir, statedb) + fixupDirTimeStamp(sourcedir) print "RO: %s Done." %timestamp() - ShutdownWorkers(starttime) print "Master process done." exit(0) else: # file copy workers ConsumeWork(sourcedir, destdir) + if PRESERVE: + fixupDirTimeStamp(sourcedir) exit(0) # We need to call MPI ABORT in our exception handler, From 2591c267ac3cd8031883f6da42fc46a35f9af3c2 Mon Sep 17 00:00:00 2001 From: Guy Coates Date: Fri, 14 Nov 2014 18:06:36 +0000 Subject: [PATCH 2/7] Set default chunk size to 500MB --- README | 2 +- pcp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README b/README index 60a2489..9d6d4f5 100644 --- a/README +++ b/README @@ -75,7 +75,7 @@ pcp has a number of useful options; use pcp -h to see a description. Chunking -------- -pcp will split files up into 100MB chunks, which can then be copied in +pcp will split files up into 500MB chunks, which can then be copied in parallel. Files smaller than the chunk size will be copied in one go. You can alter the chunk size with the -b flag. The size is in megabytes. For optimal performance the chunk size should be a whole multiple of the lustre stripe size diff --git a/pcp b/pcp index 0b7adda..34255db 100755 --- a/pcp +++ b/pcp @@ -170,7 +170,7 @@ machines as possible to prevent local network bottlenecks. parser.add_argument("-b", help="Copy files larger than C Mbytes in C Mbyte chunks", - default=50, type=int, metavar="C") + default=500, type=int, metavar="C") parser.add_argument("-c", help="verify copy with checksum", default=False, action="store_true") From 07286f74bda3fafcba54e1fa3ec60a89e92b7112 Mon Sep 17 00:00:00 2001 From: Guy Coates Date: Mon, 17 Nov 2014 14:15:41 +0000 Subject: [PATCH 3/7] Shuffle the files to stop chunk-copying un-striped files from creating hot OSTs. --- pcp | 46 ++++++++++++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/pcp b/pcp index 34255db..24da87e 100755 --- a/pcp +++ b/pcp @@ -28,6 +28,7 @@ import ctypes import sqlite3 import pickle import math +import random import signal import gzip @@ -92,6 +93,7 @@ def createDB(): filedb.text_factory = str filedb.execute("""CREATE TABLE FILECPY( ID INTEGER PRIMARY KEY AUTOINCREMENT, +SORTORDER INTEGER DEFAULT -1, FILENAME TEXT, STATE INTEGER DEFAULT 0, SRCMD5 TEXT, @@ -99,9 +101,7 @@ SIZE INTEGER, CHUNKS INTEGER DEFAULT -1, ATTEMPTS INTEGER DEFAULT 0, LASTRANK INTEGER DEFAULT 0)""") - filedb.execute("""CREATE INDEX COPY_IDX ON FILECPY(STATE, LASTRANK)""") - filedb.execute("""CREATE INDEX MD5_IDX ON FILECPY(STATE, SIZE, LASTRANK)""") - + filedb.execute("""CREATE INDEX COPY_IDX ON FILECPY(STATE, LASTRANK, SORTORDER)""") # Table to hold program arguments filedb.execute("""CREATE TABLE ARGUMENTS( ID INTEGER PRIMARY KEY AUTOINCREMENT, @@ -316,6 +316,10 @@ def scantree(sourcedir, destdir, statedb): print ("Scan Done. Did %i files, %i dirs in %s" " (%.0f items/sec)." % (totalfiles, totaldirs, walltime, rate)) + # Shuffle rows. If we don't do this, chunks of files tend to be copied at + # the same time, causing hot OSTs in the case of unstriped files. + statedb.execute("""UPDATE FILECPY SET SORTORDER = ABS(RANDOM() % ?)""", + (totalfiles,)) return() def fadviseSeqNoCache(fileD): @@ -564,6 +568,7 @@ def DispatchWork(statedb): global CHECKPOINTNOW global COPYREMAINS global MD5REMAINS + global TOTALROWS # Queue containing worker who are ready for work. idleworkers = deque() @@ -572,6 +577,8 @@ def DispatchWork(statedb): if DUMPDB: cptimer = Timer() cptimer.start() + TOTALROWS = statedb.execute \ + ("""SELECT COUNT(*) FROM FILECPY""").fetchone()[0] COPYREMAINS = statedb.execute \ ("""SELECT COUNT(*) FROM FILECPY WHERE STATE == 0""").fetchone()[0] @@ -628,23 +635,20 @@ def DispatchWork(statedb): lastrank = -1 else: lastrank = worker - - task = statedb.execute("""SELECT * FROM FILECPY WHERE STATE == 0 AND - LASTRANK <> ? LIMIT 1""",(lastrank, )).fetchone() + task = statedb.execute("""SELECT FILENAME, ID, CHUNKS FROM FILECPY WHERE STATE == 0 AND + LASTRANK <> ? ORDER BY SORTORDER LIMIT 1""",(lastrank, )).fetchone() if task: - statedb.execute("""UPDATE FILECPY SET STATE = 1 WHERE ID = ?""",(task[0],)) - msg = ("COPY", (task[1], task[0], task[5])) + statedb.execute("""UPDATE FILECPY SET STATE = 1 WHERE ID = ?""",(task[1],)) + msg = ("COPY", (task[0], task[1], task[2])) comm.send(msg, dest=worker, tag=1) continue if MD5SUM: - # We now know the size, and can improve load balancing between workers by - # selecting the largest files first. - task = statedb.execute("""SELECT * FROM FILECPY WHERE STATE == 2 AND - LASTRANK <> ? ORDER BY SIZE DESC LIMIT 1""",(lastrank, )).fetchone() + task = statedb.execute("""SELECT FILENAME, ID, CHUNKS FROM FILECPY WHERE STATE == 2 AND + LASTRANK <> ? ORDER BY SORTORDER LIMIT 1""",(lastrank, )).fetchone() if task: - statedb.execute("""UPDATE FILECPY SET STATE = 3 WHERE ID = ?""",(task[0],)) - msg = ("MD5", (task[1], task[0], task[5])) + statedb.execute("""UPDATE FILECPY SET STATE = 3 WHERE ID = ?""",(task[1],)) + msg = ("MD5", (task[0], task[1], task[2])) comm.send(msg, dest=worker, tag=1) continue # There is work, but not for this worker. Send to the back of the queue @@ -726,7 +730,8 @@ def processCopy(statedb, payload): global WARNINGS global COPYREMAINS global MD5REMAINS - + global TOTALROWS + md5sum = payload[0] idx = payload[1] workerrank = payload[2] @@ -834,10 +839,12 @@ def processCopy(statedb, payload): chunks = int(math.ceil(size / float(CHUNKSIZE))) with statedb: for i in range(chunks): - statedb.execute("INSERT INTO FILECPY (FILENAME, CHUNKS) VALUES (?,?)", - (filename, i)) + sortid = random.randint(0, TOTALROWS + chunks) + statedb.execute("INSERT INTO FILECPY (FILENAME, SORTORDER, CHUNKS) VALUES (?,?,?)", + (filename, sortid, i)) statedb.execute("DELETE FROM FILECPY WHERE ID = ?", (idx,)) COPYREMAINS += chunks-1 + TOTALROWS += chunks if MD5SUM: MD5REMAINS += chunks-1 @@ -1231,6 +1238,7 @@ try: DUMPDB = args.K # Checkpoint to this directory. DUMPINTERVAL = args.Km * 60 # Checkpoint period CHECKPOINTNOW = False + TOTALROWS = 0 # The total number of files and chunks to be copied. COPYREMAINS = 0 # remaining number of items to copy. MD5REMAINS = 0 # remaining number of items to md5. sourcedir = args.SOURCE.rstrip(os.path.sep) # source @@ -1278,8 +1286,10 @@ try: sanitycheck(sourcedir, destdir) starttime = time.time() print "Scanning directory structure..." + # All ranks take part in the scan - scantree(sourcedir, destdir, statedb) + if not resumed: + scantree(sourcedir, destdir, statedb) if rank == 0: if not resumed: From 9d853851a1633c91f760dbf81c300abd203730fe Mon Sep 17 00:00:00 2001 From: Guy Coates Date: Mon, 17 Nov 2014 16:19:02 +0000 Subject: [PATCH 4/7] Fix the index --- pcp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pcp b/pcp index 24da87e..0639f8a 100755 --- a/pcp +++ b/pcp @@ -101,7 +101,7 @@ SIZE INTEGER, CHUNKS INTEGER DEFAULT -1, ATTEMPTS INTEGER DEFAULT 0, LASTRANK INTEGER DEFAULT 0)""") - filedb.execute("""CREATE INDEX COPY_IDX ON FILECPY(STATE, LASTRANK, SORTORDER)""") + filedb.execute("""CREATE INDEX COPY_IDX ON FILECPY(STATE, SORTORDER, LASTRANK)""") # Table to hold program arguments filedb.execute("""CREATE TABLE ARGUMENTS( ID INTEGER PRIMARY KEY AUTOINCREMENT, From 37ca2985900f06e381066a872d02e281b0cd27ea Mon Sep 17 00:00:00 2001 From: Guy Coates Date: Mon, 17 Nov 2014 17:05:56 +0000 Subject: [PATCH 5/7] Update the algorithm description. --- pcp | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/pcp b/pcp index 0639f8a..d7ecfba 100755 --- a/pcp +++ b/pcp @@ -1,17 +1,26 @@ #!/usr/bin/env python -""" This program copies a directory tree in parallel. - -Algorithm: - -pcp implements a master-slave pattern. R0 is the master and R1...RX are the -slaves. R0 scan the source directory tree and put files to be copied on a -queue on the master. The master dispatches files in the queue to ranks R1..RX, -which do the copy.""" # Parallel cp program # Copyright (c) Genome Research Ltd 2012 # Author Guy Coates # This program is released under GNU Public License V2 (GPLv2) +""" This program copies a directory tree in parallel. + +Algorithm: + +pcp runs in two phases. Phase I is a parallel walk of the file tree, involving all +MPI ranks in a peer-to-peer algorithm. The walk constructs the list of files to be +copied and creates the destination directory higherarcy. + +In phase II, the actual files are copied. Phase II uses a master-slave algorithm. +R0 is the master and dispatches file copy instructions to the slaves (R1...Rn). +Although slightly less efficient than the peer-to-peer algorithm in phase I, +using master-slave simplifies the checkpoint/restore implementation. + +As stat is a relatively slow operation on lustre, the code jumps through some +hoops to avoid doing stats during Phase I. + +""" #import rpdb2 #rpdb2.start_embedded_debugger("XXXX", fAllowRemote=True,timeout=10) From 142f04593942a6d45b7568ecca5f2682e0c4361b Mon Sep 17 00:00:00 2001 From: Guy Coates Date: Wed, 26 Nov 2014 10:35:11 +0000 Subject: [PATCH 6/7] Add copy/update functionality --- pcp | 93 ++++++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 62 insertions(+), 31 deletions(-) diff --git a/pcp b/pcp index d7ecfba..71323f2 100755 --- a/pcp +++ b/pcp @@ -214,6 +214,8 @@ machines as possible to prevent local network bottlenecks. parser.add_argument("-ld", help="Do not stripe diretories.", default=False, action="store_true") + parser.add_argument("-u", + help="Copy only when the source file is newer than the destination file, or the destination file is missing.", default=False, action="store_true") parser.add_argument("-R", help=("Restart a copy from a checkpoint file DUMPFILE."), @@ -322,7 +324,7 @@ def scantree(sourcedir, destdir, statedb): rate = (totalfiles + totaldirs) / walltime walltime = time.strftime("%H hrs %M mins %S secs", time.gmtime(walltime)) - print ("Scan Done. Did %i files, %i dirs in %s" + print ("Phase I done: Did %i files, %i dirs in %s" " (%.0f items/sec)." % (totalfiles, totaldirs, walltime, rate)) # Shuffle rows. If we don't do this, chunks of files tend to be copied at @@ -892,8 +894,7 @@ def ShutdownWorkers(starttime): totalelapsedtime = endtime - starttime print "" - print "Transfer Statisics:" - print "" + print "Copy Statisics:" for r in range(1, workers): filescopied, md5done, bytescopied, byteschksummed, copytime, \ @@ -1156,7 +1157,22 @@ class copydirtree(parallelwalk.ParallelWalk): """Walk the source directory tree in parallel, creating the destination tree as we go. Return the list of files we encountered.""" def ProcessFile(self, filename): - self.results[1].append(filename) + if UPDATE: + # check to see if the destination exists + destination = mungePath(sourcedir, destdir, filename) + try: + dststat = safestat(destination) + # We can't access the file at the destination, so copy it. + except OSError, error: + self.results[1].append(filename) + return() + srcstat = safestat(filename) + # source is newer, so copy the file + if srcstat.st_mtime > dststat.st_mtime: + self.results[1].append(filename) + else: + self.results[1].append(filename) + return() def ProcessDir(self, directoryname): newdir = mungePath(sourcedir, destdir, directoryname) @@ -1172,7 +1188,6 @@ class fixtimestamp(parallelwalk.ParallelWalk): stat = safestat(directoryname) newdir = mungePath(sourcedir, destdir, directoryname) os.utime(newdir, (stat.st_atime, stat.st_mtime)) - class MPIargparse(argparse.ArgumentParser): """Subclass argparse so we can add a call to Abort, to tidy up MPI bits and pieces.""" @@ -1197,18 +1212,15 @@ workers = comm.size hostname = os.uname()[1] INFINITY = float("inf") STARTEDCOPY = False # flag to see whether we can start checkpointing. +resumed = False +# Signal handler to checkpoint on SIGUSR1 signal.signal(signal.SIGUSR1, handler) + try: + args = parseargs() + # Basic sanity checks for MPI. if rank == 0: - - args = parseargs() - timeout = args.d # dead worker timeout - # If we are a restored job, read in our arguments - # from the restoredb instead. - - # Older openMPIs are buggy checkVersion() - if workers < 2: print ("ERROR: Only %i processes running. Did you invoke me via" " mpirun?") % workers @@ -1216,6 +1228,13 @@ try: " correctly.") exit(0) + # Check that we are actually alive + timeout = args.d + checkAlive(rank, workers, timeout) + + # Check to see if we are resumed from checkpoint, and restore the runtime + # options to all ranks if we are. + if rank == 0: if args.R: statedb, args = restoreDB(args.R) resumed = True @@ -1225,15 +1244,10 @@ try: pargs = pickle.dumps(args) statedb.execute("""INSERT OR REPLACE INTO ARGUMENTS (ARGS, ID) VALUES(?,1)""", (pargs,)) - else: - timeout = 0 - args = 0 + args, resumed = distribArgs((args, resumed)) + if rank > 0: statedb = None - # Check the workers are alive and send them the runtime arguments. - checkAlive(rank, workers, timeout) - args = distribArgs(args) - MD5SUM = args.c # checksum copy DRYRUN = args.dry_run # Dry run MAXTRIES = args.t # number of retries on IO error @@ -1253,7 +1267,7 @@ try: sourcedir = args.SOURCE.rstrip(os.path.sep) # source destdir = args.DEST.rstrip(os.path.sep) # destination glob = args.g # only copy files matching glob - + UPDATE = args.u # Are we doing an update copy? CHUNKSIZE = 1024 * 1024 * args.b # Set the final state of process @@ -1272,8 +1286,12 @@ try: print "SOURCE %s" %sourcedir print "DESTINATION %s" %destdir + if UPDATE: + print "Will only copy files if source is newer than destination" + print " or destination does not exist." + if DUMPDB: - print "Will checkpoint every %i minutes to %s" %(DUMPINTERVAL, DUMPDB) + print "Will checkpoint every %i minutes to %s" %(args.Km, DUMPDB) if LSTRIPE: print "Will copy lustre stripe information." @@ -1294,10 +1312,12 @@ try: sanitycheck(sourcedir, destdir) starttime = time.time() - print "Scanning directory structure..." # All ranks take part in the scan if not resumed: + if rank == 0: + print "" + print "Starting phase I: Scanning and copying directory structure..." scantree(sourcedir, destdir, statedb) if rank == 0: @@ -1310,25 +1330,36 @@ try: print "Will only copy files matching %s (%i of %i)" \ % (glob, matchingfiles, totalfiles) - STARTEDCOPY = True - print "Copying files..." + print "" + if resumed: + print "Resuming phase II: Copying files..." + else: + print "Starting phase II: Copying files..." + DispatchWork(statedb) + print "Phase II done." + + STARTEDCOPY = False ShutdownWorkers(starttime) if PRESERVE: - print "R0: %s Setting directory timestamps..." %timestamp() + print + print "Starting phase III: Setting directory timestamps..." + starttime = time.time() fixupDirTimeStamp(sourcedir) - print "RO: %s Done." %timestamp() - print "Master process done." + endtime = time.time() + walltime = time.strftime("%H hrs %M mins %S secs", + time.gmtime(endtime-starttime)) + print "Phase III Done. %s" %walltime exit(0) else: # file copy workers ConsumeWork(sourcedir, destdir) - if PRESERVE: - fixupDirTimeStamp(sourcedir) - exit(0) + if PRESERVE: + fixupDirTimeStamp(sourcedir) + exit(0) # We need to call MPI ABORT in our exception handler, # otherwise the other MPI processes spin forever. From 5ab4eaf97df5911464e96f776f65f1f2b6138e30 Mon Sep 17 00:00:00 2001 From: Guy Coates Date: Wed, 26 Nov 2014 13:46:38 +0000 Subject: [PATCH 7/7] Update readme file --- README | 33 +++++++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/README b/README index 9d6d4f5..ab0cf00 100644 --- a/README +++ b/README @@ -71,6 +71,17 @@ parallel. pcp has a number of useful options; use pcp -h to see a description. +The program runs in three phases: + +In phase I, the source directory tree is crawled, the destination source +tree is created and files are marked for copying depending on the runtime +parameters (see below). + +In phase II, the files themselves are copied, and optionally checksummed. + +If the "preserve" option has been selected, phase III runs and directory +timestamps are copied. + Chunking -------- @@ -119,12 +130,22 @@ If -ld is specified, destination directories will not be striped. (The contents themselves may still be striped). +Update copy +----------- + +If run with the -u flag, pcp will only copy a file if the source file is +newer than the destination file (mtime) or if the destination files does not +exist. + + Checkpointing ------------- pcp support serveral checkpointing methods. The checkpoint allows a copy, -interrupted for any reason, to be restarted. Files which were in the process of -being copied will be recopied in their entirety. +interrupted for any reason, to be restarted. + +Note that checkpoints will only be taken once the program has entered phase II +(copy) stage. If you specify a dumpfile with -K, a checkpoint will be written every 60 minutes. The checkpoint period can be varied with the -Km option. If the @@ -140,8 +161,8 @@ To resume from checkpoint, start pcp with the -R option. All pcp command line parameters will be taken from the dumpfile; any other command line arguments you give to pcp will be ignored. -Note that you can safely resume a checkpoint with a different number of -processes than the original. +Note that you can safely resume a checkpoint using a different number of +MPI processes than the original. Other Useful Options @@ -150,8 +171,8 @@ Other Useful Options A dead worker timeout can be specified with -d; if workers do not respond within timeout seconds of the job starting, the job will terminate. -If run with -p, pcp will attempt to preserve the ownership, permissions and -timestamps of the copied files. +If run with -p (preserve), pcp will attempt to preserve the ownership, +permissions and timestamps of the copied files. Invocation