Skip to content

Commit

Permalink
Correctly preserve timestamps on directories by setting them after the
Browse files Browse the repository at this point in the history
file copies have been done.
  • Loading branch information
Guy Coates committed Jun 27, 2014
1 parent 2e47b27 commit 3e344c6
Showing 1 changed file with 53 additions and 48 deletions.
101 changes: 53 additions & 48 deletions pcp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@ ATTEMPTS INTEGER DEFAULT 0,
LASTRANK INTEGER DEFAULT 0)""")
filedb.execute("""CREATE INDEX COPY_IDX ON FILECPY(STATE, LASTRANK)""")
filedb.execute("""CREATE INDEX MD5_IDX ON FILECPY(STATE, SIZE, LASTRANK)""")
filedb.execute("""CREATE TABLE DIRLIST (
ID INTEGER PRIMARY KEY AUTOINCREMENT,
DIRNAME TEXT,
MTIME REAL,
ATIME REAL)""")

# Table to hold program arguments
filedb.execute("""CREATE TABLE ARGUMENTS(
ID INTEGER PRIMARY KEY AUTOINCREMENT,
Expand Down Expand Up @@ -295,44 +301,38 @@ def sanitycheck(sourcedir, destdir):
" %s is not a lustre directory.") % sourcedir
print "Exiting."
Abort()
try:
createDir(sourcedir, destdir)
except IOError, error:
if error.errno == 25:
print ("ERROR: %s is not a lustre directory but lustre stripe"
" options are set.") % destdir
print "Exiting."
Abort()



def scantree(sourcedir, statedb):
"""Scans sourcedir recursively and returns a list of filestate objects and
a list of directories."""
dirlist = []

print "R%i: Scanning list of files to copy..." % (rank)
if not os.path.isdir(sourcedir):
print "R%i: Error: %s not a directory" % (rank, sourcedir)
Abort()
startime = time.time()
statedb.execute("""INSERT INTO DIRLIST (DIRNAME) VALUES(?)""",
(sourcedir,))
for source, dirs, files in fastwalk(sourcedir):
dirlist.extend([(os.path.join(source, d)) for d in dirs ])
fullpath = [ (os.path.join(source,f),) for f in files ]
dirlist = [(os.path.join(source, d),) for d in dirs ]
statedb.executemany("""INSERT INTO DIRLIST (DIRNAME) VALUES (?)""",
dirlist)
fullpath = [ (os.path.join(source, f),) for f in files ]
statedb.executemany("""INSERT INTO FILECPY (FILENAME) VALUES (?)""",
fullpath)

endtime = time.time()
walltime = endtime - startime
totalfiles = statedb.execute("SELECT COUNT(*) FROM FILECPY").fetchone()[0]
totaldirs = len(dirlist)
totaldirs = statedb.execute("SELECT COUNT(*) FROM DIRLIST").fetchone()[0]
rate = (totalfiles + totaldirs) / walltime
walltime = time.strftime("%H hrs %M mins %S secs",
time.gmtime(walltime))
print ("R%i: Scan Done. Did %i files, %i dirs in %s"
" (%.0f items/sec)."
% (rank, totalfiles, totaldirs, walltime, rate))
return(dirlist)
return()

def fastwalk (sourcedir):
"""Improved version of os.walk: generates a tuple of (sourcedir,[dirs],
Expand Down Expand Up @@ -510,11 +510,29 @@ def ConsumeWork(sourcedir, destdir):
copytimer.read(), md5timer.read()), root=0)
return(0)

def copyDirectories(sourcedir, destdir, dirlist):
"""Copy a list of directories (dirlist) from sourcedir to destdir"""
for d in dirlist:
def copyDirectories(sourcedir, destdir, statedb):
"""Copy a list of directories in the statedb from sourcedir to destdir"""
global WARNINGS
for idx, d in statedb.execute("SELECT ID, DIRNAME FROM DIRLIST"):
destination = mungePath(sourcedir, destdir, d)
attemptCreateDir(d, destination)
if DRYRUN:
if VERBOSE:
print "R%i: %s mkdir %s" % (rank, timestamp(), destination)
else:
i = 0
while i < MAXTRIES:
try:
createDir(sourcedir, destination, statedb, idx)
break
except IOError, error:
i += 1
print "R%i %s WARNING: mkdir error on %s attempt %i" \
% (rank, timestamp(), destdir, i)
WARNINGS += 1
if i == MAXTRIES:
print "R%i %s ERROR: Max number of retries on %s exceeded" \
% (rank, timestamp(), sourcedir)
raise


def checkAlive(rank, workers, timeout):
Expand Down Expand Up @@ -832,9 +850,9 @@ def ShutdownWorkers(starttime):
time.gmtime(totalelapsedtime)))
print "Warnings %i" % WARNINGS

def createDir(sourcedir, destdir):
"""Create destdir, setting permissions and stripe attributes to be the
same as sourcedir"""
def createDir(sourcedir, destdir, statedb, idx):
"""Create destdir, setting stripe attributes to be the
same as sourcedir. Timestamps are save in the statedb for use later."""
global WARNINGS

# Don't worry is the destination directory already exists
Expand All @@ -850,9 +868,10 @@ def createDir(sourcedir, destdir):
if PRESERVE:
try:
stat = safestat(sourcedir)
statedb.execute("UPDATE DIRLIST SET ATIME = ? , MTIME = ? WHERE ID =?",
(stat.st_atime, stat.st_mtime, idx))
os.chmod(destdir, stat.st_mode)
os.chown(destdir, stat.st_uid, stat.st_gid)
os.utime(destdir, (stat.st_atime, stat.st_mtime))
# Don't worry if we can't set the permissions/uid to be the same
# as the previous side; we might be copying someone else's data.
except OSError, error:
Expand Down Expand Up @@ -885,29 +904,11 @@ def createDir(sourcedir, destdir):
print "R%i WARNING: Unable to set striping on %s" \
% (rank, destdir)

def attemptCreateDir(sourcedir, destdir):
"""Attempt to copy a directory. Retry the copy MAXTRIES in case of
IO error."""
global WARNINGS
if DRYRUN:
if VERBOSE:
print "R%i: %s mkdir %s" % (rank, timestamp(), destdir)
else:
i = 0
while i < MAXTRIES:
try:
createDir(sourcedir, destdir)
break
except IOError, error:
i += 1
print "R%i %s WARNING: mkdir error on %s attempt %i" \
% (rank, timestamp(), destdir, i)
WARNINGS += 1
if i == MAXTRIES:
print "R%i %s ERROR: Max number of retries on %s exceeded" \
% (rank, timestamp(), sourcedir)
raise
return

def fixupDirTimeStamp(sourcedir, destdir, statedb):
for dirname, atime, mtime in statedb.execute("""SELECT DIRNAME, ATIME, MTIME FROM DIRLIST"""):
destination = mungePath(sourcedir, destdir, dirname)
os.utime(destination, (atime, mtime))

def mungePath(src, dst, f):
"""Convert the sourcepath to the desinationpath"""
Expand Down Expand Up @@ -1198,7 +1199,7 @@ try:
starttime = time.time()

if not resumed:
dirlist = scantree(sourcedir, statedb)
scantree(sourcedir, statedb)
if glob:
totalfiles = statedb.execute("SELECT COUNT(*) FROM FILECPY").fetchone()[0]
results = statedb.execute("DELETE FROM FILECPY WHERE NOT FILENAME GLOB ?",
Expand All @@ -1207,12 +1208,16 @@ try:

print "Will only copy files matching %s (%i of %i)" \
% (glob, matchingfiles, totalfiles)
print "Copying directories..."
copyDirectories(sourcedir, destdir, dirlist)
print "R0: Copying directories..."
copyDirectories(sourcedir, destdir, statedb)

STARTEDCOPY = True
print "Copying files..."
DispatchWork(statedb)
if PRESERVE:
print "R0: %s Setting directory timestamps..." %timestamp()
fixupDirTimeStamp(sourcedir, destdir, statedb)
print "RO: %s Done." %timestamp()
ShutdownWorkers(starttime)
print "Master process done."
exit(0)
Expand Down

0 comments on commit 3e344c6

Please sign in to comment.