-
Notifications
You must be signed in to change notification settings - Fork 18
Commit
…ttempt to delete the same file - causing one of the threads to raise an unhandled exception. Fixed a bug where an exception received while writing a modified file to vospace would cause a thread to be lost in the write thread pool.
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -250,7 +250,10 @@ def open(self, path, isNew, mustExist, ioObject, trustMetaData): | |
fileHandle.gotHeader = True | ||
fileHandle.fileSize = 0 | ||
|
||
self.checkCacheSpace() | ||
try: | ||
self.checkCacheSpace() | ||
except: | ||
pass | ||
|
||
return fileHandle | ||
|
||
|
@@ -288,6 +291,7 @@ def getFileHandle(self, path, createFile, ioObject): | |
newFileHandle.refCount += 1 | ||
return newFileHandle | ||
|
||
@logExceptions() | ||
This comment has been minimized.
Sorry, something went wrong.
This comment has been minimized.
Sorry, something went wrong.
NormanHill
Author
Contributor
|
||
def checkCacheSpace(self): | ||
"""Clear the oldest files until cache_size < cache_limit""" | ||
|
||
|
@@ -350,10 +354,14 @@ def determineCacheSize(self): | |
with self.cacheLock: | ||
inFileHandleDict = (fp[len(self.dataDir):] not in | ||
self.fileHandleDict) | ||
if (inFileHandleDict and oldest_time > os.stat(fp).st_atime): | ||
oldest_time = os.stat(fp).st_atime | ||
try: | ||
osStat = os.stat(fp) | ||
except: | ||
continue | ||
This comment has been minimized.
Sorry, something went wrong.
ijiraq
Collaborator
|
||
if (inFileHandleDict and oldest_time > osStat.st_atime): | ||
oldest_time = osStat.st_atime | ||
oldest_file = fp | ||
total_size += os.path.getsize(fp) | ||
total_size += osStat.st_size | ||
return (oldest_file, total_size) | ||
|
||
def unlinkFile(self, path): | ||
|
@@ -890,26 +898,27 @@ def getFileInfo(self): | |
info = os.fstat(self.ioObject.cacheFileDescriptor) | ||
return info.st_size, info.st_mtime | ||
|
||
@logExceptions() | ||
def flushNode(self): | ||
"""Flush the file to the backing store. | ||
""" | ||
|
||
global _flush_thread_count | ||
|
||
if self.ioObject.exception is not None: | ||
raise self.ioObject.exception | ||
try: | ||
if self.ioObject.exception is not None: | ||
raise self.ioObject.exception | ||
|
||
_flush_thread_count = _flush_thread_count + 1 | ||
_flush_thread_count = _flush_thread_count + 1 | ||
|
||
vos.logger.debug("flushing node %s, working thread count is %i " \ | ||
% (self.path,_flush_thread_count)) | ||
self.flushException = None | ||
vos.logger.debug("flushing node %s, working thread count is %i " \ | ||
% (self.path,_flush_thread_count)) | ||
self.flushException = None | ||
|
||
# Now that the flush has started we want this thread to own | ||
# the lock | ||
self.writerLock.steal() | ||
# Now that the flush has started we want this thread to own | ||
# the lock | ||
self.writerLock.steal() | ||
|
||
try: | ||
# Get the md5sum of the cached file | ||
size, mtime = self.getFileInfo() | ||
|
||
|
@@ -933,7 +942,10 @@ def flushNode(self): | |
self.flushException = sys.exc_info() | ||
finally: | ||
self.flushQueued = None | ||
self.writerLock.release() | ||
try: | ||
self.writerLock.release() | ||
except: | ||
pass | ||
self.fileModified = False | ||
self.deref() | ||
_flush_thread_count = _flush_thread_count - 1 | ||
|
@@ -943,7 +955,10 @@ def flushNode(self): | |
with self.fileCondition: | ||
self.fileCondition.notify_all() | ||
|
||
self.cache.checkCacheSpace() | ||
try: | ||
self.cache.checkCacheSpace() | ||
except: | ||
pass | ||
|
||
return | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
version = "1.10.0-b" | ||
version = "1.10.0-c" |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1059,6 +1059,17 @@ def test_02_open2(self): | |
self.assertTrue(fh.gotHeader) | ||
testObject.flushNodeQueue.join() | ||
|
||
# checkCacheSpace throws an exception. The test is that open does | ||
# not throw an exception even though checkCacheSpace does. | ||
testObject.checkCacheSpace = Mock(side_effect=OSError(errno.ENOENT, | ||
"checkCacheSpaceError *EXPECTED*")) | ||
fh = testObject.open("/dir1/dir2/file3", False, False, | ||
ioObject, False) | ||
self.assertTrue(fh.fullyCached) | ||
This comment has been minimized.
Sorry, something went wrong.
ijiraq
Collaborator
|
||
self.assertEqual(fh.fileSize, 0) | ||
self.assertTrue(fh.gotHeader) | ||
testObject.flushNodeQueue.join() | ||
|
||
|
||
@unittest.skipIf(skipTests, "Individual tests") | ||
def test_03_release1(self): | ||
|
@@ -1241,9 +1252,9 @@ def test_03_flushNode1(self): | |
ioObject = IOProxyForTest() | ||
fh = testObject.open("/dir1/dir2/file", False, False, ioObject, | ||
False) | ||
ioObject.exception = IOError() | ||
with self.assertRaises(IOError): | ||
fh.flushNode() | ||
ioObject.exception = IOError("Test Exception") | ||
fh.flushNode() | ||
self.assertEqual(fh.flushException[1], ioObject.exception) | ||
ioObject.exception = None | ||
|
||
@unittest.skipIf(skipTests, "Individual tests") | ||
|
@@ -1261,6 +1272,20 @@ def test_03_flushNode2(self): | |
fh.flushNode() | ||
self.assertTrue(fh.flushException[0] is IOError) | ||
|
||
@unittest.skipIf(skipTests, "Individual tests") | ||
def test_03_flushNode3(self): | ||
"""flush node with an exception raised by checkCacheSpace | ||
The test is that no exception is raised""" | ||
|
||
with CadcCache.Cache(testDir, 100) as testObject: | ||
ioObject = IOProxyForTest() | ||
fh = testObject.open("/dir1/dir2/file", False, False, ioObject, | ||
False) | ||
testObject.checkCacheSpace = Mock(side_effect=OSError(errno.ENOENT, | ||
"checkCacheSpaceError *EXPECTED*")) | ||
fh.flushNode() | ||
self.assertTrue(fh.flushException[0] is IOError) | ||
|
||
|
||
@unittest.skipIf(skipTests, "Individual tests") | ||
def test_04_read1(self): | ||
|
@@ -1560,7 +1585,7 @@ def notifyAfter1S(self,cond,fh): | |
cond.notify_all() | ||
|
||
|
||
@unittest.skipIf(skipTests, "Individual tests") | ||
#@unittest.skipIf(skipTests, "Individual tests") | ||
def test_00_determineCacheSize(self): | ||
""" Test checking the cache space """ | ||
if os.path.exists(testDir): | ||
|
@@ -1586,6 +1611,14 @@ def test_00_determineCacheSize(self): | |
cache.fileHandleDict[testVospaceFile2] = None | ||
#get the total size (5M) and but no files not in use | ||
self.assertEquals((None, 5*1024*1024), cache.determineCacheSize()) | ||
|
||
# os.stat returns errors. | ||
with patch('os.stat') as mockedStat: | ||
mockedStat.side_effect = OSError(-1,-1) | ||
|
||
self.assertEquals((None, 0), cache.determineCacheSize()) | ||
|
||
|
||
|
||
|
||
@unittest.skipIf(skipTests, "Individual tests") | ||
|
3 comments
on commit c63d1e6
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@NormanHill Code review:
The solution given here appears to be to wrap various parts of the file assessment in 'try/except' blocks and to pass on exception. This avoids the error where one process is trying to determine the size of the cache while another thread is deleting the cache.
What is the exception is legitimate? For example there is a real file error trying to get the size of the file to asses the size of the cache?
See also some in-line comments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I accept that we can merge this into the main branch as is. Perhaps re-work the error handling as part of story, later.
@NormanHill I'm not familiar with this decorator? and I see that sometimes its commented out in our released code?