diff --git a/README b/README index 9d6d4f5..ab0cf00 100644 --- a/README +++ b/README @@ -71,6 +71,17 @@ parallel. pcp has a number of useful options; use pcp -h to see a description. +The program runs in three phases: + +In phase I, the source directory tree is crawled, the destination directory +tree is created and files are marked for copying depending on the runtime +parameters (see below). + +In phase II, the files themselves are copied, and optionally checksummed. + +If the "preserve" option has been selected, phase III runs and directory +timestamps are copied. + Chunking -------- @@ -119,12 +130,22 @@ If -ld is specified, destination directories will not be striped. (The contents themselves may still be striped). +Update copy +----------- + +If run with the -u flag, pcp will only copy a file if the source file is +newer than the destination file (mtime) or if the destination file does not +exist. + + Checkpointing ------------- pcp support serveral checkpointing methods. The checkpoint allows a copy, -interrupted for any reason, to be restarted. Files which were in the process of -being copied will be recopied in their entirety. +interrupted for any reason, to be restarted. + +Note that checkpoints will only be taken once the program has entered the phase II +(copy) stage. If you specify a dumpfile with -K, a checkpoint will be written every 60 minutes. The checkpoint period can be varied with the -Km option. If the @@ -140,8 +161,8 @@ To resume from checkpoint, start pcp with the -R option. All pcp command line parameters will be taken from the dumpfile; any other command line arguments you give to pcp will be ignored. -Note that you can safely resume a checkpoint with a different number of -processes than the original. +Note that you can safely resume a checkpoint using a different number of +MPI processes than the original. 
Other Useful Options @@ -150,8 +171,8 @@ Other Useful Options A dead worker timeout can be specified with -d; if workers do not respond within timeout seconds of the job starting, the job will terminate. -If run with -p, pcp will attempt to preserve the ownership, permissions and -timestamps of the copied files. +If run with -p (preserve), pcp will attempt to preserve the ownership, +permissions and timestamps of the copied files. Invocation diff --git a/pcp b/pcp index 0f9864d..71323f2 100755 --- a/pcp +++ b/pcp @@ -1,17 +1,26 @@ #!/usr/bin/env python -""" This program copies a directory tree in parallel. - -Algorithm: - -pcp implements a master-slave pattern. R0 is the master and R1...RX are the -slaves. R0 scan the source directory tree and put files to be copied on a -queue on the master. The master dispatches files in the queue to ranks R1..RX, -which do the copy.""" # Parallel cp program # Copyright (c) Genome Research Ltd 2012 # Author Guy Coates # This program is released under GNU Public License V2 (GPLv2) +""" This program copies a directory tree in parallel. + +Algorithm: + +pcp runs in two phases. Phase I is a parallel walk of the file tree, involving all +MPI ranks in a peer-to-peer algorithm. The walk constructs the list of files to be +copied and creates the destination directory hierarchy. + +In phase II, the actual files are copied. Phase II uses a master-slave algorithm. +R0 is the master and dispatches file copy instructions to the slaves (R1...Rn). +Although slightly less efficient than the peer-to-peer algorithm in phase I, +using master-slave simplifies the checkpoint/restore implementation. + +As stat is a relatively slow operation on lustre, the code jumps through some +hoops to avoid doing stats during Phase I. 
+ +""" #import rpdb2 #rpdb2.start_embedded_debugger("XXXX", fAllowRemote=True,timeout=10) @@ -28,10 +37,11 @@ import ctypes import sqlite3 import pickle import math +import random import signal import gzip -from pcplib import fastwalk +from pcplib import parallelwalk from pcplib import lustreapi from collections import deque from mpi4py import MPI @@ -92,6 +102,7 @@ def createDB(): filedb.text_factory = str filedb.execute("""CREATE TABLE FILECPY( ID INTEGER PRIMARY KEY AUTOINCREMENT, +SORTORDER INTEGER DEFAULT -1, FILENAME TEXT, STATE INTEGER DEFAULT 0, SRCMD5 TEXT, @@ -99,14 +110,7 @@ SIZE INTEGER, CHUNKS INTEGER DEFAULT -1, ATTEMPTS INTEGER DEFAULT 0, LASTRANK INTEGER DEFAULT 0)""") - filedb.execute("""CREATE INDEX COPY_IDX ON FILECPY(STATE, LASTRANK)""") - filedb.execute("""CREATE INDEX MD5_IDX ON FILECPY(STATE, SIZE, LASTRANK)""") - filedb.execute("""CREATE TABLE DIRLIST ( -ID INTEGER PRIMARY KEY AUTOINCREMENT, -DIRNAME TEXT, -MTIME INTEGER, -ATIME INTEGER)""") - + filedb.execute("""CREATE INDEX COPY_IDX ON FILECPY(STATE, SORTORDER, LASTRANK)""") # Table to hold program arguments filedb.execute("""CREATE TABLE ARGUMENTS( ID INTEGER PRIMARY KEY AUTOINCREMENT, @@ -210,6 +214,8 @@ machines as possible to prevent local network bottlenecks. parser.add_argument("-ld", help="Do not stripe diretories.", default=False, action="store_true") + parser.add_argument("-u", + help="Copy only when the source file is newer than the destination file, or the destination file is missing.", default=False, action="store_true") parser.add_argument("-R", help=("Restart a copy from a checkpoint file DUMPFILE."), @@ -288,35 +294,43 @@ def sanitycheck(sourcedir, destdir): print "Exiting." 
Abort() -def scantree(sourcedir, statedb): - """Scans sourcedir recursively and returns a list of filestate objects and - a list of directories.""" +def scantree(sourcedir, destdir, statedb): + """walk the src file tree, create the destination directories and put the + files to be copied into the database.""" + totaldirs = 0 - print "R%i: Scanning list of files to copy..." % (rank) - if not os.path.isdir(sourcedir): - print "R%i: Error: %s not a directory" % (rank, sourcedir) - Abort() - startime = time.time() - statedb.execute("""INSERT INTO DIRLIST (DIRNAME) VALUES(?)""", - (sourcedir,)) - for source, dirs, files in fastwalk.fastwalk(sourcedir): - dirlist = [(os.path.join(source, d),) for d in dirs ] - statedb.executemany("""INSERT INTO DIRLIST (DIRNAME) VALUES (?)""", - dirlist) - fullpath = [ (os.path.join(source, f),) for f in files ] - statedb.executemany("""INSERT INTO FILECPY (FILENAME) VALUES (?)""", - fullpath) + if rank == 0: + startime = time.time() + if not os.path.isdir(sourcedir): + print "R%i: Error: %s not a directory" % (rank, sourcedir) + Abort() - endtime = time.time() - walltime = endtime - startime - totalfiles = statedb.execute("SELECT COUNT(*) FROM FILECPY").fetchone()[0] - totaldirs = statedb.execute("SELECT COUNT(*) FROM DIRLIST").fetchone()[0] - rate = (totalfiles + totaldirs) / walltime - walltime = time.strftime("%H hrs %M mins %S secs", - time.gmtime(walltime)) - print ("R%i: Scan Done. Did %i files, %i dirs in %s" - " (%.0f items/sec)." 
- % (rank, totalfiles, totaldirs, walltime, rate)) + # results are ([directories][files]) + walker = copydirtree(comm, results=([],[])) + listofpaths = walker.Execute(sourcedir) + + if rank == 0: + for l in listofpaths: + for f in l[1]: + statedb.execute("""INSERT INTO FILECPY (FILENAME) VALUES (?)""", + (f,)) + + totaldirs += len(l[0]) + + endtime = time.time() + walltime = endtime - startime + totalfiles = statedb.execute("SELECT COUNT(*) FROM FILECPY").fetchone()[0] + + rate = (totalfiles + totaldirs) / walltime + walltime = time.strftime("%H hrs %M mins %S secs", + time.gmtime(walltime)) + print ("Phase I done: Did %i files, %i dirs in %s" + " (%.0f items/sec)." + % (totalfiles, totaldirs, walltime, rate)) + # Shuffle rows. If we don't do this, chunks of files tend to be copied at + # the same time, causing hot OSTs in the case of unstriped files. + statedb.execute("""UPDATE FILECPY SET SORTORDER = ABS(RANDOM() % ?)""", + (totalfiles,)) return() def fadviseSeqNoCache(fileD): @@ -528,31 +542,6 @@ def ConsumeWork(sourcedir, destdir): return(0) -def copyDirectories(sourcedir, destdir, statedb): - """Copy a list of directories in the statedb from sourcedir to destdir""" - global WARNINGS - for idx, d in statedb.execute("SELECT ID, DIRNAME FROM DIRLIST"): - destination = mungePath(sourcedir, destdir, d) - if DRYRUN: - if VERBOSE: - print "R%i: %s mkdir %s" % (rank, timestamp(), destination) - else: - i = 0 - while i < MAXTRIES: - try: - createDir(d, destination, statedb, idx) - break - except IOError, error: - i += 1 - print "R%i %s WARNING: mkdir error on %s attempt %i" \ - % (rank, timestamp(), destdir, i) - WARNINGS += 1 - if i == MAXTRIES: - print "R%i %s ERROR: Max number of retries on %s exceeded" \ - % (rank, timestamp(), sourcedir) - raise - - def checkAlive(rank, workers, timeout): """Quirky farm nodes can cause the MPI runtime to lock up during the task spawn. This routine checks whether nodes can exchange messages. 
If a node @@ -590,6 +579,7 @@ def DispatchWork(statedb): global CHECKPOINTNOW global COPYREMAINS global MD5REMAINS + global TOTALROWS # Queue containing worker who are ready for work. idleworkers = deque() @@ -598,6 +588,8 @@ def DispatchWork(statedb): if DUMPDB: cptimer = Timer() cptimer.start() + TOTALROWS = statedb.execute \ + ("""SELECT COUNT(*) FROM FILECPY""").fetchone()[0] COPYREMAINS = statedb.execute \ ("""SELECT COUNT(*) FROM FILECPY WHERE STATE == 0""").fetchone()[0] @@ -654,23 +646,20 @@ def DispatchWork(statedb): lastrank = -1 else: lastrank = worker - - task = statedb.execute("""SELECT * FROM FILECPY WHERE STATE == 0 AND - LASTRANK <> ? LIMIT 1""",(lastrank, )).fetchone() + task = statedb.execute("""SELECT FILENAME, ID, CHUNKS FROM FILECPY WHERE STATE == 0 AND + LASTRANK <> ? ORDER BY SORTORDER LIMIT 1""",(lastrank, )).fetchone() if task: - statedb.execute("""UPDATE FILECPY SET STATE = 1 WHERE ID = ?""",(task[0],)) - msg = ("COPY", (task[1], task[0], task[5])) + statedb.execute("""UPDATE FILECPY SET STATE = 1 WHERE ID = ?""",(task[1],)) + msg = ("COPY", (task[0], task[1], task[2])) comm.send(msg, dest=worker, tag=1) continue if MD5SUM: - # We now know the size, and can improve load balancing between workers by - # selecting the largest files first. - task = statedb.execute("""SELECT * FROM FILECPY WHERE STATE == 2 AND - LASTRANK <> ? ORDER BY SIZE DESC LIMIT 1""",(lastrank, )).fetchone() + task = statedb.execute("""SELECT FILENAME, ID, CHUNKS FROM FILECPY WHERE STATE == 2 AND + LASTRANK <> ? ORDER BY SORTORDER LIMIT 1""",(lastrank, )).fetchone() if task: - statedb.execute("""UPDATE FILECPY SET STATE = 3 WHERE ID = ?""",(task[0],)) - msg = ("MD5", (task[1], task[0], task[5])) + statedb.execute("""UPDATE FILECPY SET STATE = 3 WHERE ID = ?""",(task[1],)) + msg = ("MD5", (task[0], task[1], task[2])) comm.send(msg, dest=worker, tag=1) continue # There is work, but not for this worker. 
Send to the back of the queue @@ -752,7 +741,8 @@ def processCopy(statedb, payload): global WARNINGS global COPYREMAINS global MD5REMAINS - + global TOTALROWS + md5sum = payload[0] idx = payload[1] workerrank = payload[2] @@ -860,10 +850,12 @@ def processCopy(statedb, payload): chunks = int(math.ceil(size / float(CHUNKSIZE))) with statedb: for i in range(chunks): - statedb.execute("INSERT INTO FILECPY (FILENAME, CHUNKS) VALUES (?,?)", - (filename, i)) + sortid = random.randint(0, TOTALROWS + chunks) + statedb.execute("INSERT INTO FILECPY (FILENAME, SORTORDER, CHUNKS) VALUES (?,?,?)", + (filename, sortid, i)) statedb.execute("DELETE FROM FILECPY WHERE ID = ?", (idx,)) COPYREMAINS += chunks-1 + TOTALROWS += chunks if MD5SUM: MD5REMAINS += chunks-1 @@ -902,8 +894,7 @@ def ShutdownWorkers(starttime): totalelapsedtime = endtime - starttime print "" - print "Transfer Statisics:" - print "" + print "Copy Statisics:" for r in range(1, workers): filescopied, md5done, bytescopied, byteschksummed, copytime, \ @@ -934,14 +925,12 @@ def ShutdownWorkers(starttime): time.gmtime(totalelapsedtime))) print "Warnings %i" % WARNINGS -def createDir(sourcedir, destdir, statedb, idx): +def copyDir(sourcedir, destdir): """Create destdir, setting stripe attributes to be the - same as sourcedir. Timestamps are save in the statedb for use later.""" + same as sourcedir.""" global WARNINGS # Don't worry is the destination directory already exists - if VERBOSE: - print "R%i: %s mkdir %s" % (rank, timestamp(), destdir), try: os.mkdir(destdir) @@ -952,8 +941,6 @@ def createDir(sourcedir, destdir, statedb, idx): if PRESERVE: try: stat = safestat(sourcedir) - statedb.execute("UPDATE DIRLIST SET ATIME = ? , MTIME = ? 
WHERE ID =?", - (stat.st_atime, stat.st_mtime, idx)) os.chmod(destdir, stat.st_mode) os.chown(destdir, stat.st_uid, stat.st_gid) # Don't worry if we can't set the permissions/uid to be the same @@ -971,16 +958,10 @@ def createDir(sourcedir, destdir, statedb, idx): if LSTRIPE: layout = lustreapi.getstripe(sourcedir) if (LSTRIPE and layout.isstriped()) or ( FORCESTRIPE and not NODIRSTRIPE ): - if VERBOSE: - print "(striped)" lustreapi.setstripe(destdir, stripecount=-1) else: - if VERBOSE: - print "(non striped)" lustreapi.setstripe(destdir, stripecount=1) - else: - if VERBOSE: - print "" + except IOError, error: if error.errno != 13: raise @@ -989,10 +970,10 @@ def createDir(sourcedir, destdir, statedb, idx): % (rank, destdir) -def fixupDirTimeStamp(sourcedir, destdir, statedb): - for dirname, atime, mtime in statedb.execute("""SELECT DIRNAME, ATIME, MTIME FROM DIRLIST"""): - destination = mungePath(sourcedir, destdir, dirname) - os.utime(destination, (atime, mtime)) +def fixupDirTimeStamp(sourcedir): + walker = fixtimestamp(comm) + walker.Execute(sourcedir) + def mungePath(src, dst, f): """Convert the sourcepath to the desinationpath""" @@ -1042,9 +1023,12 @@ def copyFile (src, dst, chunk): if LSTRIPE or FORCESTRIPE: stripestatus = createstripefile(src, dst, size) # Create a spare file to fill in later. - outfile = open(dst, "wb") - outfile.truncate(size) - outfile.close() + + if not DRYRUN: + outfile = open(dst, "wb") + outfile.truncate(size) + outfile.close() + return(size, 0, 0, stripestatus, 6) else: @@ -1169,6 +1153,42 @@ def distribArgs(args): args = comm.bcast(args, root=0) return(args) +class copydirtree(parallelwalk.ParallelWalk): + """Walk the source directory tree in parallel, creating the destination tree + as we go. 
Return the list of files we encountered.""" + def ProcessFile(self, filename): + if UPDATE: + # check to see if the destination exists + destination = mungePath(sourcedir, destdir, filename) + try: + dststat = safestat(destination) + # We can't access the file at the destination, so copy it. + except OSError, error: + self.results[1].append(filename) + return() + srcstat = safestat(filename) + # source is newer, so copy the file + if srcstat.st_mtime > dststat.st_mtime: + self.results[1].append(filename) + else: + self.results[1].append(filename) + return() + + def ProcessDir(self, directoryname): + newdir = mungePath(sourcedir, destdir, directoryname) + self.results[0].append(newdir) + if not DRYRUN: + copyDir(directoryname, newdir) + + +class fixtimestamp(parallelwalk.ParallelWalk): + """Walk the source directory tree and copy the timestamps to the + destination tree.""" + def ProcessDir(self, directoryname): + stat = safestat(directoryname) + newdir = mungePath(sourcedir, destdir, directoryname) + os.utime(newdir, (stat.st_atime, stat.st_mtime)) + class MPIargparse(argparse.ArgumentParser): """Subclass argparse so we can add a call to Abort, to tidy up MPI bits and pieces.""" def error(self,message): @@ -1192,18 +1212,15 @@ workers = comm.size hostname = os.uname()[1] INFINITY = float("inf") STARTEDCOPY = False # flag to see whether we can start checkpointing. +resumed = False +# Signal handler to checkpoint on SIGUSR1 signal.signal(signal.SIGUSR1, handler) + try: + args = parseargs() + # Basic sanity checks for MPI. if rank == 0: - - args = parseargs() - timeout = args.d # dead worker timeout - # If we are a restored job, read in our arguments - # from the restoredb instead. - - # Older openMPIs are buggy checkVersion() - if workers < 2: print ("ERROR: Only %i processes running. 
Did you invoke me via" " mpirun?") % workers @@ -1211,6 +1228,13 @@ try: " correctly.") exit(0) + # Check that we are actually alive + timeout = args.d + checkAlive(rank, workers, timeout) + + # Check to see if we are resumed from checkpoint, and restore the runtime + # options to all ranks if we are. + if rank == 0: if args.R: statedb, args = restoreDB(args.R) resumed = True @@ -1220,13 +1244,9 @@ try: pargs = pickle.dumps(args) statedb.execute("""INSERT OR REPLACE INTO ARGUMENTS (ARGS, ID) VALUES(?,1)""", (pargs,)) - else: - timeout = 0 - args = 0 - - # Check the workers are alive and send them the runtime arguments. - checkAlive(rank, workers, timeout) - args = distribArgs(args) + args, resumed = distribArgs((args, resumed)) + if rank > 0: + statedb = None MD5SUM = args.c # checksum copy DRYRUN = args.dry_run # Dry run @@ -1241,12 +1261,13 @@ try: DUMPDB = args.K # Checkpoint to this directory. DUMPINTERVAL = args.Km * 60 # Checkpoint period CHECKPOINTNOW = False + TOTALROWS = 0 # The total number of files and chunks to be copied. COPYREMAINS = 0 # remaining number of items to copy. MD5REMAINS = 0 # remaining number of items to md5. sourcedir = args.SOURCE.rstrip(os.path.sep) # source destdir = args.DEST.rstrip(os.path.sep) # destination glob = args.g # only copy files matching glob - + UPDATE = args.u # Are we doing an update copy? CHUNKSIZE = 1024 * 1024 * args.b # Set the final state of process @@ -1265,8 +1286,12 @@ try: print "SOURCE %s" %sourcedir print "DESTINATION %s" %destdir + if UPDATE: + print "Will only copy files if source is newer than destination" + print " or destination does not exist." + if DUMPDB: - print "Will checkpoint every %i minutes to %s" %(DUMPINTERVAL, DUMPDB) + print "Will checkpoint every %i minutes to %s" %(args.Km, DUMPDB) if LSTRIPE: print "Will copy lustre stripe information." 
@@ -1288,8 +1313,15 @@ try: sanitycheck(sourcedir, destdir) starttime = time.time() + # All ranks take part in the scan + if not resumed: + if rank == 0: + print "" + print "Starting phase I: Scanning and copying directory structure..." + scantree(sourcedir, destdir, statedb) + + if rank == 0: if not resumed: - scantree(sourcedir, statedb) if glob: totalfiles = statedb.execute("SELECT COUNT(*) FROM FILECPY").fetchone()[0] results = statedb.execute("DELETE FROM FILECPY WHERE NOT FILENAME GLOB ?", @@ -1298,24 +1330,36 @@ try: print "Will only copy files matching %s (%i of %i)" \ % (glob, matchingfiles, totalfiles) - print "R0: Copying directories..." - copyDirectories(sourcedir, destdir, statedb) - STARTEDCOPY = True - print "Copying files..." + print "" + if resumed: + print "Resuming phase II: Copying files..." + else: + print "Starting phase II: Copying files..." + DispatchWork(statedb) - if PRESERVE: - print "R0: %s Setting directory timestamps..." %timestamp() - fixupDirTimeStamp(sourcedir, destdir, statedb) - print "RO: %s Done." %timestamp() + print "Phase II done." + + STARTEDCOPY = False ShutdownWorkers(starttime) - print "Master process done." + + if PRESERVE: + print + print "Starting phase III: Setting directory timestamps..." + starttime = time.time() + fixupDirTimeStamp(sourcedir) + endtime = time.time() + walltime = time.strftime("%H hrs %M mins %S secs", + time.gmtime(endtime-starttime)) + print "Phase III Done. %s" %walltime exit(0) else: # file copy workers ConsumeWork(sourcedir, destdir) - exit(0) + if PRESERVE: + fixupDirTimeStamp(sourcedir) + exit(0) # We need to call MPI ABORT in our exception handler, # otherwise the other MPI processes spin forever.