diff --git a/pcp b/pcp index dfa975f..80cba35 100755 --- a/pcp +++ b/pcp @@ -123,6 +123,12 @@ ATTEMPTS INTEGER DEFAULT 0, LASTRANK INTEGER DEFAULT 0)""") filedb.execute("""CREATE INDEX COPY_IDX ON FILECPY(STATE, LASTRANK)""") filedb.execute("""CREATE INDEX MD5_IDX ON FILECPY(STATE, SIZE, LASTRANK)""") + filedb.execute("""CREATE TABLE DIRLIST ( +ID INTEGER PRIMARY KEY AUTOINCREMENT, +DIRNAME TEXT, +MTIME REAL, +ATIME REAL)""") + # Table to hold program arguments filedb.execute("""CREATE TABLE ARGUMENTS( ID INTEGER PRIMARY KEY AUTOINCREMENT, @@ -295,44 +301,38 @@ def sanitycheck(sourcedir, destdir): " %s is not a lustre directory.") % sourcedir print "Exiting." Abort() - try: - createDir(sourcedir, destdir) - except IOError, error: - if error.errno == 25: - print ("ERROR: %s is not a lustre directory but lustre stripe" - " options are set.") % destdir - print "Exiting." - Abort() - def scantree(sourcedir, statedb): """Scans sourcedir recursively and returns a list of filestate objects and a list of directories.""" - dirlist = [] print "R%i: Scanning list of files to copy..." 
% (rank) if not os.path.isdir(sourcedir): print "R%i: Error: %s not a directory" % (rank, sourcedir) Abort() startime = time.time() + statedb.execute("""INSERT INTO DIRLIST (DIRNAME) VALUES(?)""", + (sourcedir,)) for source, dirs, files in fastwalk(sourcedir): - dirlist.extend([(os.path.join(source, d)) for d in dirs ]) - fullpath = [ (os.path.join(source,f),) for f in files ] + dirlist = [(os.path.join(source, d),) for d in dirs ] + statedb.executemany("""INSERT INTO DIRLIST (DIRNAME) VALUES (?)""", + dirlist) + fullpath = [ (os.path.join(source, f),) for f in files ] statedb.executemany("""INSERT INTO FILECPY (FILENAME) VALUES (?)""", fullpath) endtime = time.time() walltime = endtime - startime totalfiles = statedb.execute("SELECT COUNT(*) FROM FILECPY").fetchone()[0] - totaldirs = len(dirlist) + totaldirs = statedb.execute("SELECT COUNT(*) FROM DIRLIST").fetchone()[0] rate = (totalfiles + totaldirs) / walltime walltime = time.strftime("%H hrs %M mins %S secs", time.gmtime(walltime)) print ("R%i: Scan Done. Did %i files, %i dirs in %s" " (%.0f items/sec)." 
% (rank, totalfiles, totaldirs, walltime, rate)) - return(dirlist) + return() def fastwalk (sourcedir): """Improved version of os.walk: generates a tuple of (sourcedir,[dirs], @@ -510,11 +510,29 @@ def ConsumeWork(sourcedir, destdir): copytimer.read(), md5timer.read()), root=0) return(0) -def copyDirectories(sourcedir, destdir, dirlist): - """Copy a list of directories (dirlist) from sourcedir to destdir""" - for d in dirlist: +def copyDirectories(sourcedir, destdir, statedb): + """Copy a list of directories in the statedb from sourcedir to destdir""" + global WARNINGS + for idx, d in statedb.execute("SELECT ID, DIRNAME FROM DIRLIST"): destination = mungePath(sourcedir, destdir, d) - attemptCreateDir(d, destination) + if DRYRUN: + if VERBOSE: + print "R%i: %s mkdir %s" % (rank, timestamp(), destination) + else: + i = 0 + while i < MAXTRIES: + try: + createDir(d, destination, statedb, idx) + break + except IOError, error: + i += 1 + print "R%i %s WARNING: mkdir error on %s attempt %i" \ + % (rank, timestamp(), destination, i) + WARNINGS += 1 + if i == MAXTRIES: + print "R%i %s ERROR: Max number of retries on %s exceeded" \ + % (rank, timestamp(), d) + raise def checkAlive(rank, workers, timeout): @@ -832,9 +850,9 @@ def ShutdownWorkers(starttime): time.gmtime(totalelapsedtime))) print "Warnings %i" % WARNINGS -def createDir(sourcedir, destdir): - """Create destdir, setting permissions and stripe attributes to be the - same as sourcedir""" +def createDir(sourcedir, destdir, statedb, idx): + """Create destdir, setting stripe attributes to be the + same as sourcedir. Timestamps are saved in the statedb for use later.""" global WARNINGS # Don't worry is the destination directory already exists @@ -850,9 +868,10 @@ def createDir(sourcedir, destdir): if PRESERVE: try: stat = safestat(sourcedir) + statedb.execute("UPDATE DIRLIST SET ATIME = ? , MTIME = ? 
WHERE ID =?", + (stat.st_atime, stat.st_mtime, idx)) os.chmod(destdir, stat.st_mode) os.chown(destdir, stat.st_uid, stat.st_gid) - os.utime(destdir, (stat.st_atime, stat.st_mtime)) # Don't worry if we can't set the permissions/uid to be the same # as the previous side; we might be copying someone else's data. except OSError, error: @@ -885,29 +904,11 @@ def createDir(sourcedir, destdir): print "R%i WARNING: Unable to set striping on %s" \ % (rank, destdir) -def attemptCreateDir(sourcedir, destdir): - """Attempt to copy a directory. Retry the copy MAXTRIES in case of - IO error.""" - global WARNINGS - if DRYRUN: - if VERBOSE: - print "R%i: %s mkdir %s" % (rank, timestamp(), destdir) - else: - i = 0 - while i < MAXTRIES: - try: - createDir(sourcedir, destdir) - break - except IOError, error: - i += 1 - print "R%i %s WARNING: mkdir error on %s attempt %i" \ - % (rank, timestamp(), destdir, i) - WARNINGS += 1 - if i == MAXTRIES: - print "R%i %s ERROR: Max number of retries on %s exceeded" \ - % (rank, timestamp(), sourcedir) - raise - return + +def fixupDirTimeStamp(sourcedir, destdir, statedb): + for dirname, atime, mtime in statedb.execute("""SELECT DIRNAME, ATIME, MTIME FROM DIRLIST WHERE ATIME IS NOT NULL AND MTIME IS NOT NULL"""): + destination = mungePath(sourcedir, destdir, dirname) + os.utime(destination, (atime, mtime)) def mungePath(src, dst, f): """Convert the sourcepath to the desinationpath""" @@ -1198,7 +1199,7 @@ try: starttime = time.time() if not resumed: - dirlist = scantree(sourcedir, statedb) + scantree(sourcedir, statedb) if glob: totalfiles = statedb.execute("SELECT COUNT(*) FROM FILECPY").fetchone()[0] results = statedb.execute("DELETE FROM FILECPY WHERE NOT FILENAME GLOB ?", @@ -1207,12 +1208,16 @@ try: print "Will only copy files matching %s (%i of %i)" \ % (glob, matchingfiles, totalfiles) - print "Copying directories..." - copyDirectories(sourcedir, destdir, dirlist) + print "R0: Copying directories..." 
+ copyDirectories(sourcedir, destdir, statedb) STARTEDCOPY = True print "Copying files..." DispatchWork(statedb) + if PRESERVE: + print "R0: %s Setting directory timestamps..." %timestamp() + fixupDirTimeStamp(sourcedir, destdir, statedb) + print "R0: %s Done." %timestamp() ShutdownWorkers(starttime) print "Master process done." exit(0)