From f0e06381c8f0319f610a3a5ed77a92aa0bee7d99 Mon Sep 17 00:00:00 2001 From: Norman Hill Date: Thu, 12 Jun 2014 11:38:34 -0700 Subject: [PATCH] Fixed some node caching bugs. Removed the second layer of Node caching from VOFS. --- runtest | 20 +- scripts/vlock | 2 +- test/scripts/vospace-lock-atest.tcsh | 12 +- test/scripts/vospace-mountvospace-atest.tcsh | 15 +- vos/NodeCache.py | 124 +++++ vos/__version__.py | 2 +- vos/test/TestAll.py | 26 + vos/test/TestCacheMetaData.py | 8 +- vos/test/TestCadcCache.py | 41 +- vos/test/TestNodeCache.py | 167 +++++++ vos/test/Test_vofile.py | 8 +- vos/test/Test_vofs.py | 81 ++-- vos/test/Test_vos.py | 64 +++ vos/vofs.py | 48 +- vos/vos.py | 486 +++++++++++-------- 15 files changed, 795 insertions(+), 309 deletions(-) create mode 100644 vos/NodeCache.py create mode 100644 vos/test/TestAll.py create mode 100644 vos/test/TestNodeCache.py create mode 100644 vos/test/Test_vos.py diff --git a/runtest b/runtest index 89d11b388..8ee1f04f7 100755 --- a/runtest +++ b/runtest @@ -16,7 +16,22 @@ else endif python2.7 -m trace --count -s -m -C cover --ignore-dir=/usr/lib64:/usr/lib \ - --ignore-module=test_cadcCache vos/test/TestCadcCache.py vos/test/Test_vofs.py + --ignore-module=BitVector --ignore-module=fuse --ignore-module=html2text \ + --ignore-module=TestAll --ignore-module=Test_vos \ + --ignore-module=TestNodeCache --ignore-module=TestCadcCache \ + --ignore-module=Test_vofs --ignore-module=Test_vofile \ + --ignore-module=TestCacheMetaData \ + vos/test/TestAll.py + +#python2.7 -m trace --count -s -m -C cover --ignore-dir=/usr/lib64:/usr/lib \ + #--ignore-module=Test_vofs vos/test/TestCadcCache.py vos/test/Test_vofs.py + +#python2.7 -m trace --count -s -m -C cover --ignore-dir=/usr/lib64:/usr/lib \ +# --ignore-module=Test_vos vos/test/Test_vos.py + +#python2.7 -m trace --count -s -m -C cover --ignore-dir=/usr/lib64:/usr/lib \ + #--ignore-module=Test_vos vos/test/Test_vos.py vos/test/Test_vos.py + #python2.7 -m trace --count -s -m -C cover --ignore-dir=/usr/lib64:/usr/lib \ #--ignore-module=Test_vofs vos/test/Test_vofs.py @@ -25,3 +40,6 @@ python2.7 -m trace --count -s -m -C cover --ignore-dir=/usr/lib64:/usr/lib \ #python2.7 -m trace --count -s -m -C cover --ignore-dir=/usr/lib64:/usr/lib \ # --ignore-module=TestCacheMetaData vos/test/TestCacheMetaData.py + +#python2.7 -m trace --count -s -m -C cover --ignore-dir=/usr/lib64:/usr/lib \ + #--ignore-module=TestNodeCache vos/test/TestNodeCache.py diff --git a/scripts/vlock b/scripts/vlock index 2f58a5b39..c0fcc0252 100755 --- a/scripts/vlock +++ b/scripts/vlock @@ -6,11 +6,11 @@ A node is locked by setting the islocked property. When a node is locked, it cannot be copied to, moved or deleted. """ -from vos import Node import logging import optparse import sys import vos +from vos.vos import Node import os import signal diff --git a/test/scripts/vospace-lock-atest.tcsh b/test/scripts/vospace-lock-atest.tcsh index 6cb1decab..de25eb0e6 100755 --- a/test/scripts/vospace-lock-atest.tcsh +++ b/test/scripts/vospace-lock-atest.tcsh @@ -28,12 +28,12 @@ foreach pythonVersion ($CADC_PYTHON_TEST_TARGETS) set RMCMD = "$pythonVersion $CADC_ROOT/scripts/vrm" set CPCMD = "$pythonVersion $CADC_ROOT/scripts/vcp" - set MVCMD = "python $CADC_ROOT/scripts/vmv" - set RMDIRCMD = "python $CADC_ROOT/scripts/vrmdir" - set CHMODCMD = "python $CADC_ROOT/scripts/vchmod" - set TAGCMD = "python $CADC_ROOT/scripts/vtag" - set LNCMD = "python $CADC_ROOT/scripts/vln" - set LOCKCMD = "python $CADC_ROOT/scripts/vlock" + set MVCMD = "$pythonVersion $CADC_ROOT/scripts/vmv" + set RMDIRCMD = "$pythonVersion $CADC_ROOT/scripts/vrmdir" + set CHMODCMD = "$pythonVersion $CADC_ROOT/scripts/vchmod" + set TAGCMD = "$pythonVersion $CADC_ROOT/scripts/vtag" + set LNCMD = "$pythonVersion $CADC_ROOT/scripts/vln" + set LOCKCMD = "$pythonVersion $CADC_ROOT/scripts/vlock" set CERT = " --cert=$A/test-certificates/x509_CADCRegtest1.pem" diff --git a/test/scripts/vospace-mountvospace-atest.tcsh b/test/scripts/vospace-mountvospace-atest.tcsh index eb45b87cc..c75ce8238 100755 --- a/test/scripts/vospace-mountvospace-atest.tcsh +++ b/test/scripts/vospace-mountvospace-atest.tcsh @@ -43,7 +43,7 @@ foreach pythonVersion ($CADC_PYTHON_TEST_TARGETS) ## we cannot feasibly test the --xsv option, but it is here to fiddle with in development set LSCMD = "$pythonVersion $CADC_ROOT/scripts/vls" - set MOUNTCMD = "$pythonVersion $CADC_ROOT/scripts/mountvofs --cache_limit=$CACHETEST_LIMIT" + set MOUNTCMD = "$pythonVersion $CADC_ROOT/scripts/mountvofs --cache_limit=$CACHETEST_LIMIT --cache_nodes" set MKDIRCMD = "$pythonVersion $CADC_ROOT/scripts/vmkdir" set RMCMD = "$pythonVersion $CADC_ROOT/scripts/vrm" set CPCMD = "$pythonVersion $CADC_ROOT/scripts/vcp" @@ -143,7 +143,7 @@ foreach pythonVersion ($CADC_PYTHON_TEST_TARGETS) # --- test exceeding the local cache --- echo -n "copy cache test data to container" rm foo.dat >& /dev/null - truncate -s $CACHETEST_FSIZE foo.dat >& /dev/null + cat /dev/zero | head -c $CACHETEST_FSIZE_BYTES /dev/zero > foo.dat foreach i ( `seq $CACHETEST_NFILES` ) echo -n "." $CPCMD $CERTFILE foo.dat $CONTAINER/foo$i.dat >& /dev/null || echo " [FAIL]" && exit -1 @@ -176,14 +176,15 @@ foreach pythonVersion ($CADC_PYTHON_TEST_TARGETS) echo -n "delete non-empty container " - rm $MCONTAINER >& /dev/null && echo " [FAIL]" && exit -1 - ls $MCONTAINER/foo >& /dev/null || echo " [FAIL]" && exit -1 - ls $MCONTAINER >& /dev/null || echo " [FAIL]" && exit -1 + ls $MCONTAINER + rm -rf $MCONTAINER || echo " [FAIL]" && exit -1 + ls $MCONTAINER/foo >& /dev/null && echo " [FAIL]" && exit -1 + ls $MCONTAINER >& /dev/null && echo " [FAIL]" && exit -1 + $LSCMD $CERT $CONTAINER >& /dev/null && echo " [FAIL]" && exit -1 echo " [OK]" echo -n "delete empty container " - rmdir $MCONTAINER/* >& /dev/null || echo " [FAIL]" && exit -1 - #rm $MCONTAINER/* >& /dev/null || echo " [FAIL]" && exit -1 #TODO activate when cp succeeds + mkdir $MCONTAINER rmdir $MCONTAINER > /dev/null || echo " [FAIL]" && exit -1 $LSCMD $CERT $CONTAINER >& /dev/null && echo " [FAIL]" && exit -1 echo " [OK]" diff --git a/vos/NodeCache.py b/vos/NodeCache.py new file mode 100644 index 000000000..a88c94eb0 --- /dev/null +++ b/vos/NodeCache.py @@ -0,0 +1,124 @@ +# A node cache class, extended from dict. + +import threading +import vos + + +class NodeCache(dict): + """ usage: + # Create a node cache: + nodeCache = NodeCache() + + with nodeCache.volatile(nodeURI): + # Do things which make the nodes cached under nodeURI unreliable. + # The cache will be cleared on entry + + with nodeCache.watch(nodeURI) as watch: + # Do things which shouldn't be cached when the node is + # volatile. + watch.insert(node) + # The node will not be cached if the tree became volatile + # at any point while the nodeURI was being watched. + """ + + def __init__(self, *args): + """ Initialize the node cache.""" + dict.__init__(self, args) + self.lock = threading.Lock() + self.watchedNodes = [] + self.volatileNodes = [] + + def watch(self, uri): + """Factory for watch objects""" + return self.Watch(self, uri.rstrip('/')) + + def volatile(self, uri): + """Factory for volatile objects.""" + return self.Volatile(self, uri.rstrip('/')) + + def __missing__(self, key): + """Attempting to access a non-cached node returns None rather than + raising an exception.""" + return None + + def __setitem__(self, key, object): + """If an node is directly inserted into the cache, automatically create + a watch.""" + with self.watch(key) as w: + w.insert(object) + + def __getitem__(self, key): + return dict.__getitem__(self, key.rstrip('/')) + + def __contains__(self, key): + return dict.__contains__(self, key.rstrip('/')) + + class Volatile(object): + """ Objects that mark a code segment where a uri is volatile and + shouldn't be used from the cache.""" + + def __init__(self, nodeCache, uri): + self.nodeCache = nodeCache + self.uri = uri.rstrip('/') + + def __enter__(self): + """ Mark any sub-trees being watched as being dirty + add to self.nodeCache.volatileNodes. + Remove any cached nodes in the volatile subtree. + """ + + with self.nodeCache.lock: + # Add this volatile objecty to a list of all active volatile + # objects. + self.nodeCache.volatileNodes.append(self) + + # Remove any cached nodes in the volatile sub-tree. + for uri in self.nodeCache.keys(): + if uri.startswith(self.uri): + del self.nodeCache[uri] + + # Mark any watched nodes in the volatile sub-tree dirty + for watchedNode in self.nodeCache.watchedNodes: + if watchedNode.uri.startswith(self.uri): + watchedNode.dirty = True + + return self + + def __exit__(self, exc_type, exc_value, traceback): + """ Remove this volitile object from the list of active volatiles. + """ + with self.nodeCache.lock: + self.nodeCache.volatileNodes.remove(self) + + class Watch(object): + """ Objects that mark a code segment where a node has been read from + vospace, and is intended to be cached. + """ + + def __init__(self, nodeCache, uri): + self.nodeCache = nodeCache + self.uri = uri + self.dirty = False + + def __enter__(self): + with self.nodeCache.lock: + # Add this watch object to the list of active watch objects. + self.nodeCache.watchedNodes.append(self) + + # Check to see if this watch object is in an existing volatile + # tree. If it is, mark this watch object as dirty. + for thisVolatile in self.nodeCache.volatileNodes: + if self.uri.startswith(thisVolatile.uri): + self.dirty = True + return self + return self + + def __exit__(self, exc_type, exc_value, traceback): + with self.nodeCache.lock: + self.nodeCache.watchedNodes.remove(self) + + def insert(self, object): + """ Insert an object in the cache, but only if the watch is not + dirty.""" + if not self.dirty: + dict.__setitem__(self.nodeCache, self.uri, object) diff --git a/vos/__version__.py b/vos/__version__.py index 93b8af562..ff0b160d2 100644 --- a/vos/__version__.py +++ b/vos/__version__.py @@ -1 +1 @@ -version = "1.10.0-c" +version = "1.10.0-d" diff --git a/vos/test/TestAll.py b/vos/test/TestAll.py new file mode 100644 index 000000000..148e18a5b --- /dev/null +++ b/vos/test/TestAll.py @@ -0,0 +1,26 @@ +import Test_vos +import TestNodeCache +import TestCadcCache +import Test_vofs +import Test_vofile +import TestCacheMetaData + +if not Test_vos.run().wasSuccessful(): + print "FAIL" + exit() +if not TestNodeCache.run().wasSuccessful(): + print "FAIL" + exit() +if not TestCadcCache.run().wasSuccessful(): + print "FAIL" + exit() +if not Test_vofs.run().wasSuccessful(): + print "FAIL" + exit() +if not Test_vofile.run().wasSuccessful(): + print "FAIL" + exit() +if not TestCacheMetaData.run().wasSuccessful(): + print "FAIL" + exit() +print "SUCCESS" diff --git a/vos/test/TestCacheMetaData.py b/vos/test/TestCacheMetaData.py index 73fdc70c2..a611c00b7 100644 --- a/vos/test/TestCacheMetaData.py +++ b/vos/test/TestCacheMetaData.py @@ -142,9 +142,13 @@ def testRange(self): +def run(): + suite = unittest.TestLoader().loadTestsFromTestCase(TestCacheMetaData) + return unittest.TextTestRunner(verbosity=2).run(suite) + +if __name__=='__main__': + run() -suite = unittest.TestLoader().loadTestsFromTestCase(TestCacheMetaData) -unittest.TextTestRunner(verbosity=2).run(suite) if __name__ == '__main__': diff --git a/vos/test/TestCadcCache.py b/vos/test/TestCadcCache.py index e712abd95..ed3abbf43 100644 --- a/vos/test/TestCadcCache.py +++ b/vos/test/TestCadcCache.py @@ -1272,7 +1272,7 @@ def test_03_flushNode2(self): fh.flushNode() self.assertTrue(fh.flushException[0] is IOError) - @unittest.skipIf(skipTests, "Individual tests") + #@unittest.skipIf(skipTests, "Individual tests") def test_03_flushNode3(self): """flush node with an exception raised by checkCacheSpace The test is that no exception is raised""" @@ -1281,10 +1281,10 @@ def test_03_flushNode3(self): ioObject = IOProxyForTest() fh = testObject.open("/dir1/dir2/file", False, False, ioObject, False) + fh.writerLock.acquire(shared=False) testObject.checkCacheSpace = Mock(side_effect=OSError(errno.ENOENT, "checkCacheSpaceError *EXPECTED*")) fh.flushNode() - self.assertTrue(fh.flushException[0] is IOError) @unittest.skipIf(skipTests, "Individual tests") @@ -1585,7 +1585,7 @@ def notifyAfter1S(self,cond,fh): cond.notify_all() - #@unittest.skipIf(skipTests, "Individual tests") + @unittest.skipIf(skipTests, "Individual tests") def test_00_determineCacheSize(self): """ Test checking the cache space """ if os.path.exists(testDir): @@ -1687,7 +1687,7 @@ def test_04_removeEmptyDirs(self): with self.assertRaises(OSError): testCache.removeEmptyDirs(testDir + "/dir1/dir2/dir4") - #@unittest.skipIf(skipTests, "Individual tests") + @unittest.skipIf(skipTests, "Individual tests") def test_04_truncate(self): """ Test file truncate""" testIOProxy = IOProxyForTest() @@ -1976,18 +1976,21 @@ def test_isNewReadBest(self): self.assertEquals(mandatoryEnd, crt.mandatoryEnd) -logging.getLogger('CadcCache').setLevel(logging.DEBUG) -logging.getLogger('CadcCache').addHandler(logging.StreamHandler()) - -suite1 = unittest.TestLoader().loadTestsFromTestCase(TestCacheCondtion) -suite2 = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock) -suite3 = unittest.TestLoader().loadTestsFromTestCase(TestCacheError) -suite4 = unittest.TestLoader().loadTestsFromTestCase(TestCacheRetry) -suite5 = unittest.TestLoader().loadTestsFromTestCase(TestCacheAborted) -suite6 = unittest.TestLoader().loadTestsFromTestCase(TestIOProxy) -suite7 = unittest.TestLoader().loadTestsFromTestCase(TestCadcCacheReadThread) -suite8 = unittest.TestLoader().loadTestsFromTestCase(TestCadcCache) -alltests = unittest.TestSuite([suite1, suite2, suite3, suite4, suite5, - suite6, suite7, suite8]) -unittest.TextTestRunner(verbosity=2).run(alltests) - +def run(): + logging.getLogger('CadcCache').setLevel(logging.DEBUG) + logging.getLogger('CadcCache').addHandler(logging.StreamHandler()) + + suite1 = unittest.TestLoader().loadTestsFromTestCase(TestCacheCondtion) + suite2 = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock) + suite3 = unittest.TestLoader().loadTestsFromTestCase(TestCacheError) + suite4 = unittest.TestLoader().loadTestsFromTestCase(TestCacheRetry) + suite5 = unittest.TestLoader().loadTestsFromTestCase(TestCacheAborted) + suite6 = unittest.TestLoader().loadTestsFromTestCase(TestIOProxy) + suite7 = unittest.TestLoader().loadTestsFromTestCase(TestCadcCacheReadThread) + suite8 = unittest.TestLoader().loadTestsFromTestCase(TestCadcCache) + alltests = unittest.TestSuite([suite1, suite2, suite3, suite4, suite5, + suite6, suite7, suite8]) + return(unittest.TextTestRunner(verbosity=2).run(alltests)) + +if __name__ == "__main__": + run() diff --git a/vos/test/TestNodeCache.py b/vos/test/TestNodeCache.py new file mode 100644 index 000000000..572b7b467 --- /dev/null +++ b/vos/test/TestNodeCache.py @@ -0,0 +1,167 @@ +# Test the NodeCache class + +import unittest +from mock import Mock, MagicMock, patch +from vos.NodeCache import NodeCache + +class TestNodeCache(unittest.TestCase): + """Test the NodeCache class. + """ + + def test_00_constructor(self): + """Test basic operation of the NodeCache as a dict.""" + nodeCache = NodeCache() + self.assertEqual(len(nodeCache.watchedNodes), 0) + self.assertEqual(len(nodeCache.volatileNodes), 0) + + nodeCache['a'] = 'b' + nodeCache['b'] = 'c' + self.assertEqual(nodeCache['a'], 'b') + self.assertEqual(len(nodeCache), 2) + self.assertTrue('a' in nodeCache) + del nodeCache['a'] + self.assertFalse('a' in nodeCache) + self.assertEqual(len(nodeCache), 1) + self.assertEqual(nodeCache['a'], None) + + + def test_01_volatile(self): + """ test marking part of the tree as volatile""" + + nodeCache = NodeCache() + + nodeCache['/a/b'] = 'a' + nodeCache['/a/b/c'] = 'b' + nodeCache['/a/b/c/'] = 'c' + nodeCache['/a/b/c/d'] = 'd' + self.assertTrue('/a/b' in nodeCache) + self.assertTrue('/a/b/c' in nodeCache) + self.assertTrue('/a/b/c/' in nodeCache) + self.assertTrue('/a/b/c/d' in nodeCache) + + with nodeCache.volatile('/a/b/c/') as v: + self.assertTrue('/a/b' in nodeCache) + self.assertFalse('/a/b/c' in nodeCache) + self.assertFalse('/a/b/c/' in nodeCache) + self.assertFalse('/a/b/c/d' in nodeCache) + self.assertTrue(v in nodeCache.volatileNodes) + # Nested with the same path + with nodeCache.volatile('/a/b/c') as v2: + self.assertTrue(v in nodeCache.volatileNodes) + self.assertTrue(v2 in nodeCache.volatileNodes) + self.assertTrue(v in nodeCache.volatileNodes) + self.assertFalse(v2 in nodeCache.volatileNodes) + + self.assertFalse(v in nodeCache.volatileNodes) + self.assertEqual(len(nodeCache.volatileNodes), 0) + + with self.assertRaises(IOError): + with nodeCache.volatile('/a/b/c') as v: + self.assertTrue(v in nodeCache.volatileNodes) + raise IOError('atest') + self.assertTrue(False) + self.assertFalse(v in nodeCache.volatileNodes) + self.assertEqual(len(nodeCache.volatileNodes), 0) + + def test_01_watch(self): + """Test creating a watch on a node.""" + + nodeCache = NodeCache() + + with nodeCache.watch('/a/b/c') as w: + self.assertTrue(w in nodeCache.watchedNodes) + self.assertFalse(w.dirty) + with nodeCache.watch('/a/b/c') as w2: + self.assertFalse(w2.dirty) + self.assertTrue(w in nodeCache.watchedNodes) + self.assertTrue(w2 in nodeCache.watchedNodes) + self.assertTrue(w in nodeCache.watchedNodes) + self.assertFalse(w2 in nodeCache.watchedNodes) + self.assertFalse(w in nodeCache.watchedNodes) + self.assertFalse(w2 in nodeCache.watchedNodes) + self.assertEqual(len(nodeCache.watchedNodes), 0) + + with self.assertRaises(IOError): + with nodeCache.watch('/a/b/c') as w: + self.assertTrue(w in nodeCache.watchedNodes) + raise IOError('atest') + self.assertTrue(False) + self.assertFalse(w in nodeCache.watchedNodes) + self.assertEqual(len(nodeCache.watchedNodes), 0) + + with nodeCache.watch('/a/b/c') as w: + w.insert('d') + self.assertEqual( nodeCache['/a/b/c'], 'd') + self.assertEqual(len(nodeCache.watchedNodes), 0) + + + def test_02_watchnvolatile(self): + """test watch and volitile working together.""" + + nodeCache = NodeCache() + + with nodeCache.watch('/a/b/c/') as w: + w.insert('d') + self.assertEqual( nodeCache['/a/b/c'], 'd') + + # Make a sub-tree volatile. This should not effect the watched + # directory. + with nodeCache.volatile('/a/b/c/d'): + self.assertEqual( nodeCache['/a/b/c'], 'd') + + self.assertTrue('/a/b/c' in nodeCache) + with nodeCache.volatile('/a/b/c'): + self.assertFalse('/a/b/c' in nodeCache) + w.insert('d') + self.assertFalse('/a/b/c' in nodeCache) + w.insert('d') + self.assertFalse('/a/b/c' in nodeCache) + + # Set up a watch and then make a parent node volatile. Caching should be + # disabled on the watched tree. + with nodeCache.watch('/a/b/c') as w: + self.assertFalse('/a/b/c' in nodeCache) + w.insert('d') + + self.assertTrue('/a/b/c' in nodeCache) + with nodeCache.volatile('/a/b/'): + pass + self.assertFalse('/a/b/c' in nodeCache) + w.insert('d') + self.assertFalse('/a/b/c' in nodeCache) + + # Watches are gone, it should now be possible to cache nodes again. + with nodeCache.watch('/a/b/c') as w: + self.assertFalse('/a/b/c' in nodeCache) + w.insert('d') + + self.assertTrue('/a/b/c' in nodeCache) + + # Set up a volatile block first and ensure the cache is disabled. + + with nodeCache.volatile('/a/b/c'): + self.assertFalse('/a/b/c' in nodeCache) + with nodeCache.watch('/a/b/c') as w: + w.insert('d') + self.assertFalse('/a/b/c' in nodeCache) + + with nodeCache.watch('/a/b/c/d') as w: + w.insert('d') + self.assertFalse('/a/b/c/d' in nodeCache) + + with nodeCache.watch('/a/e/f/g') as w: + w.insert('d') + self.assertTrue('/a/e/f/g' in nodeCache) + + + +def run(): + suite1 = unittest.TestLoader().loadTestsFromTestCase(TestNodeCache) + allTests = unittest.TestSuite([suite1]) + return unittest.TextTestRunner(verbosity=2).run(allTests) + +if __name__ == "__main__": + run() + + + diff --git a/vos/test/Test_vofile.py b/vos/test/Test_vofile.py index 7b4501df0..221d69be7 100644 --- a/vos/test/Test_vofile.py +++ b/vos/test/Test_vofile.py @@ -222,5 +222,9 @@ def __call__(self, *args, **keywords): str(self.controller) + "***") -suite = unittest2.TestLoader().loadTestsFromTestCase(TestVOFile) -unittest2.TextTestRunner(verbosity=2).run(suite) +def run(): + suite = unittest2.TestLoader().loadTestsFromTestCase(TestVOFile) + return unittest2.TextTestRunner(verbosity=2).run(suite) + +if __name__ == "__main__": + run() diff --git a/vos/test/Test_vofs.py b/vos/test/Test_vofs.py index 9478fb2fc..5650c5e3e 100644 --- a/vos/test/Test_vofs.py +++ b/vos/test/Test_vofs.py @@ -8,8 +8,10 @@ from vos import vofs, vos from mock import Mock, MagicMock, patch from vos.fuse import FuseOSError -from vos.CadcCache import Cache, CacheRetry, CacheAborted, FileHandle, IOProxy +from vos.CadcCache import Cache, CacheRetry, CacheAborted, FileHandle, \ + IOProxy, FlushNodeQueue from vos.vofs import HandleWrapper +from vos.NodeCache import NodeCache from errno import EIO, EAGAIN, EPERM, ENOENT skipTests = False @@ -279,6 +281,10 @@ def test_create(self): , name="testfs.getNode")) with self.assertRaises(FuseOSError) as e: testfs.create(file, os.O_RDWR) + + testfs.getNode = Mock(side_effect=FuseOSError) + with self.assertRaises(FuseOSError) as e: + testfs.create(file, os.O_RDWR) node.props.get = Mock(return_value=False) @@ -322,18 +328,6 @@ def mockRelease(): myVofs.release(file, fh.getId()) self.assertEqual(e.exception.errno, EIO) - # when file modified locally, release should remove it from the list of - # accessed files in VOFS - basefh = Object() - basefh.path = file - basefh.fileModified = True - basefh.release = Mock(return_value = True) - fh = HandleWrapper(basefh, False) - myVofs.node[file] = Mock(name="fileInfo") - self.assertEqual(1, len(myVofs.node)) - myVofs.release(file, fh.getId()) - self.assertEqual(0, len(myVofs.node)) - # Release an invalid file descriptor with self.assertRaises(FuseOSError) as e: myVofs.release(file, -1) @@ -372,14 +366,11 @@ def test_unlink(self): testfs.getNode = Mock(return_value = None) testfs.cache.unlinkFile = Mock() testfs.client.delete = Mock() - testfs.delNode = Mock() - mocks = (testfs.getNode, testfs.cache.unlinkFile, testfs.client.delete, - testfs.delNode) + mocks = (testfs.getNode, testfs.cache.unlinkFile, testfs.client.delete) testfs.unlink(path) testfs.getNode.assert_called_once_with(path, force=False, limit=1) testfs.cache.unlinkFile.assert_called_once_with(path) self.assertFalse(testfs.client.delete.called) - testfs.delNode.assert_called_once_with(path, force=True) for mock in mocks: mock.reset_mock() @@ -392,7 +383,6 @@ def test_unlink(self): testfs.getNode.assert_called_once_with(path, force=False, limit=1) testfs.cache.unlinkFile.assert_called_once_with(path) testfs.client.delete.assert_called_once_with(path) - testfs.delNode.assert_called_once_with(path, force=True) for mock in mocks: mock.reset_mock() @@ -407,13 +397,6 @@ def test_unlink(self): testfs.getNode.assert_called_once_with(path, force=False, limit=1) self.assertFalse(testfs.cache.unlinkFile.called) self.assertFalse(testfs.client.delete.called) - self.assertFalse(testfs.delNode.called) - - @unittest.skipIf(skipTests, "Individual tests") - def test_delNode(self): - testfs = vofs.VOFS(self.testMountPoint, self.testCacheDir, opt) - path = "/a/file/path" - testfs.delNode(path, force=True) @unittest.skipIf(skipTests, "Individual tests") def test_mkdir(self): @@ -457,7 +440,6 @@ def test_mkdir(self): def test_rmdir(self): path="/a/file/path" testfs = vofs.VOFS(self.testMountPoint, self.testCacheDir, opt) - testfs.delNode = Mock(wraps=testfs.delNode) testfs.client = Object() testfs.client.delete = Mock() node = Object() @@ -471,20 +453,17 @@ def test_rmdir(self): testfs.client.getNode = Mock(return_value = node) testfs.rmdir(path) testfs.client.delete.assert_called_once_with(path) - testfs.delNode.assert_called_once_with(path,force=True) # Try deleting a node which is locked. node.props.get = Mock(side_effect=SideEffect({ ('islocked', False): True, }, name="node.props.get") ) testfs.client.delete.reset_mock() - testfs.delNode.reset_mock() with self.assertRaises(FuseOSError) as e: testfs.rmdir(path) self.assertEqual(e.exception.errno, EPERM) self.assertFalse(testfs.client.delete.called) - self.assertFalse(testfs.delNode.called) testfs.client = Object() @@ -562,8 +541,7 @@ def test_access(self): # File exists. testfs = vofs.VOFS(self.testMountPoint, self.testCacheDir, opt) node = Object - node.isdir = Mock(return_value=False) - testfs.node[file] = node + testfs.getNode = Mock(return_value=node) self.assertEqual(testfs.access(file, stat.S_IRUSR), 0) @@ -728,6 +706,8 @@ def mock_read(block_size): vos_VOFILE.close = Mock() vos_VOFILE.read = Mock(side_effect=mock_read) testfs.client.open = Mock(return_value = vos_VOFILE) + testfs.client.nodeCache = Object() + testfs.client.nodeCache = NodeCache() # Truncate a non-open file to 0 bytes testfs.cache.open = Mock(wraps=testfs.cache.open) @@ -844,6 +824,30 @@ def mock_read(block_size): testfs2.truncate(file, 20, -1) self.assertEqual(e.exception.errno, EIO) + def test_getNode(self): + file = "/dir1/dir2/file" + testfs = vofs.VOFS(self.testMountPoint, self.testCacheDir, opt) + node = Mock(spec=vos.Node) + testfs.client = Object() + testfs.client.getNode = Mock(return_value = node) + node = testfs.getNode(file, force=True, limit=10) + testfs.client.getNode.assert_called_once_with(file, force=True, limit=10) + + err = IOError() + err.errno = 1 + testfs.client.getNode = Mock(side_effect=err) + with self.assertRaises(FuseOSError): + node = testfs.getNode(file, force=True, limit=10) + + def test_init(self): + testfs = vofs.VOFS(self.testMountPoint, self.testCacheDir, opt) + testfs.init("/") + self.assertTrue(isinstance(testfs.cache.flushNodeQueue, FlushNodeQueue)) + testfs.destroy("/") + self.assertEqual(testfs.cache.flushNodeQueue, None) + + + class SideEffect(object): """ The controller is a dictionary with a list as a key and a value. When @@ -1074,11 +1078,12 @@ def testAll(self): self.assertTrue( handle is vofs.HandleWrapper. findHandle(handle.getId())) +def run(): + suite1 = unittest.TestLoader().loadTestsFromTestCase(TestVOFS) + suite2 = unittest.TestLoader().loadTestsFromTestCase(TestMyIOProxy) + suite3 = unittest.TestLoader().loadTestsFromTestCase(TestHandleWrapper) + alltests = unittest.TestSuite([suite1, suite2, suite3]) + return unittest.TextTestRunner(verbosity=2).run(alltests) -suite1 = unittest.TestLoader().loadTestsFromTestCase(TestVOFS) -suite2 = unittest.TestLoader().loadTestsFromTestCase(TestMyIOProxy) -suite3 = unittest.TestLoader().loadTestsFromTestCase(TestHandleWrapper) -alltests = unittest.TestSuite([suite1, suite2, suite3]) -unittest.TextTestRunner(verbosity=2).run(alltests) - - +if __name__ == "__main__": + run() diff --git a/vos/test/Test_vos.py b/vos/test/Test_vos.py new file mode 100644 index 000000000..8f09f7015 --- /dev/null +++ b/vos/test/Test_vos.py @@ -0,0 +1,64 @@ +# Test the NodeCache class + +import os +import unittest +from mock import Mock, MagicMock, patch +import vos +from vos.vos import Client + +class Object(object): + pass + +class TestVos(unittest.TestCase): + """Test the NodeCache class. + """ + + def test_getNode(self): + client = Client() + uri = 'vos://cadc.nrc.ca!vospace' + myNode = client.getNode(uri, limit=0, force=False) + self.assertEqual(uri, myNode.uri) + self.assertEqual(len(myNode.getNodeList()), 0) + + myNode = client.getNode(uri, limit=10, force=True) + self.assertEqual(uri, myNode.uri) + self.assertEqual(len(myNode.getNodeList()), 10) + + myNode = client.getNode(uri, limit=10, force=False) + self.assertEqual(uri, myNode.uri) + self.assertEqual(len(myNode.getNodeList()), 10) + + def test_move(self): + client = Client() + uri1 = 'notvos://cadc.nrc.ca!vospace/nosuchfile1' + uri2 = 'notvos://cadc.nrc.ca!vospace/nosuchfile2' + + with patch('vos.vos.VOFile') as mockVOFile: + mockVOFile.write=Mock() + mockVOFile.read=Mock() + client.getTransferError=Mock(return_value=False) + self.assertTrue(client.move(uri1, uri2)) + client.getTransferError=Mock(return_value=True) + self.assertFalse(client.move(uri1, uri2)) + + def test_delete(self): + client = Client() + uri1 = 'notvos://cadc.nrc.ca!vospace/nosuchfile1' + + myObject=Object() + myObject.close = Mock() + client.open = Mock(return_value=myObject) + client.delete(uri1) + client.open.assert_called_once_with(uri1, mode=os.O_TRUNC) + myObject.close.assert_called_once_with() + + + + +def run(): + suite1 = unittest.TestLoader().loadTestsFromTestCase(TestVos) + allTests = unittest.TestSuite([suite1]) + return(unittest.TextTestRunner(verbosity=2).run(allTests)) + +if __name__ == "__main__": + run() diff --git a/vos/vofs.py b/vos/vofs.py index 89e87f15b..0b20b04da 100755 --- a/vos/vofs.py +++ b/vos/vofs.py @@ -183,6 +183,7 @@ def findHandle(id): theHandle = HandleWrapper.handleList[id] return theHandle + @logExceptions() def release(self): with HandleWrapper.myLock: del HandleWrapper.handleList[id(self)] @@ -219,9 +220,6 @@ def __init__(self, root, cache_dir, options, conn=None, self.cache_nodes = cache_nodes - # This dictionary contains the Node data about the VOSpace node in - # question - self.node = {} # Standard attribtutes of the Node # Where in the file system this Node is currently located self.loading_dir = {} @@ -253,14 +251,6 @@ def __init__(self, root, cache_dir, options, conn=None, def __call__(self, op, path, *args): return super(VOFS, self).__call__(op, path, *args) - def __del__(self): - self.node = None - - def delNode(self, path, force=False): - """Delete the references associated with this Node""" - if not self.cache_nodes or force: - self.node.pop(path, None) - #@logExceptions() def access(self, path, mode): """Check if path is accessible. @@ -369,6 +359,7 @@ def destroy(self, path): if self.cache.flushNodeQueue is None: raise CacheError("flushNodeQueue has not been initialized") self.cache.flushNodeQueue.join() + self.cache.flushNodeQueue = None #@logExceptions() def fsync(self, path, datasync, id): @@ -396,22 +387,11 @@ def getNode(self, path, force=False, limit=0): """ vos.logger.debug("force? -> %s path -> %s" % (force, path)) - ### force if this is a container we've not looked in before - if path in self.node and not force: - node = self.node[path] - if (node.isdir() and limit != 0 and - len(node.getNodeList()) == 0): - force = True - if not force: - vos.logger.debug("Sending back cached metadata for %s" % - (path)) - return node ## Pull the node meta data from VOSpace. try: vos.logger.debug("requesting node %s from VOSpace" % (path)) - self.node[path] = self.client.getNode(path, force=True, - limit=limit) + node = self.client.getNode(path, force=force, limit=limit) except Exception as e: vos.logger.debug(str(e)) vos.logger.debug(type(e)) @@ -421,11 +401,7 @@ def getNode(self, path, force=False, limit=0): vos.logger.debug("failing with errno = %d" % ex.errno) raise ex - if self.node[path].isdir() and self.node[path]._nodeList is not None: - for node in self.node[path]._nodeList: - subPath = os.path.join(path, node.name) - self.node[subPath] = node - return self.node[path] + return node #@logExceptions() def getattr(self, path, id=None): @@ -451,7 +427,6 @@ def init(self, path): self.cache.flushNodeQueue = \ FlushNodeQueue(maxFlushThreads=self.cache.maxFlushThreads) - #@logExceptions() def mkdir(self, path, mode): """Create a container node in the VOSpace at the correct location. @@ -561,7 +536,7 @@ def open(self, path, flags, *mode): # new file in cache library or if no node information (node not in # vospace). - handle = self.cache.open(path, flags & os.O_WRONLY != 0, mustExist, + handle = self.cache.open(path, flags & os.O_WRONLY != 0, mustExist, myProxy, self.cache_nodes) if flags & os.O_TRUNC != 0: handle.truncate(0) @@ -640,7 +615,7 @@ def load_dir(self, path): This should always be run in a thread.""" try: vos.logger.debug("Starting getNodeList thread") - self.getNode(path, force=not self.opt.cache_nodes, + self.getNode(path, force=True, limit=None).getNodeList() vos.logger.debug("Got listing for %s" % (path)) finally: @@ -666,12 +641,11 @@ def release(self, path, id): except CacheRetry: vos.logger.debug("Timeout Waiting for file release: %s", fh.cacheFileHandle.path) + if fh.cacheFileHandle.fileModified: + # This makes the node disapear from the nodeCache. + with self.client.nodeCache.volatile(path): + pass fh.release() - if ((fh.cacheFileHandle.fileModified) and - (fh.cacheFileHandle.path in self.node)): - #local copy modified. remove from list - vos.logger.debug("deleting old node %s " % path) - del self.node[fh.cacheFileHandle.path] except Exception, e: #unexpected problem raise FuseOSError(EIO) @@ -711,7 +685,6 @@ def rmdir(self, path): vos.logger.info("%s is locked." % path) raise FuseOSError(EPERM) self.client.delete(path) - self.delNode(path, force=True) #@logExceptions() def statfs(self, path): @@ -777,7 +750,6 @@ def unlink(self, path): self.cache.unlinkFile(path) if node: self.client.delete(path) - self.delNode(path, force=True) @logExceptions() def write(self, path, data, size, offset, id=None): diff --git a/vos/vos.py b/vos/vos.py index f3aaa6bda..8f44826d6 100644 --- a/vos/vos.py +++ b/vos/vos.py @@ -1,13 +1,15 @@ -"""A set of Python Classes for connecting to and interacting with a VOSpace service. - - Connections to VOSpace are made using a SSL X509 certificat which is stored in a .pem file. - The certificate is supplied by the user or by the CADC credential server +"""A set of Python Classes for connecting to and interacting with a VOSpace + service. + Connections to VOSpace are made using a SSL X509 certificat which is + stored in a .pem file. The certificate is supplied by the user or by the + CADC credential server """ import copy import errno import hashlib +from contextlib import nested from cStringIO import StringIO import html2text import httplib @@ -25,6 +27,7 @@ from xml.etree import ElementTree from logExceptions import logExceptions from copy import deepcopy +from NodeCache import NodeCache from __version__ import version @@ -33,29 +36,34 @@ if sys.version_info[1] > 6: logger.addHandler(logging.NullHandler()) + def get_logger(verbose=False, debug=False, quiet=False): """Sets up the logging for vos library. - vos clients should call get_logger with an OptionParser object to set the reporting level.""" + vos clients should call get_logger with an OptionParser object to set the + reporting level.""" log_format = "%(module)s: %(levelname)s: %(message)s" log_level = logging.ERROR - log_level = (debug and logging.DEBUG) or (verbose and logging.INFO) or ( quiet and logging.FATAL) or logging.ERROR + log_level = ((debug and logging.DEBUG) or (verbose and logging.INFO) or + (quiet and logging.FATAL) or logging.ERROR) if log_level == logging.DEBUG: - log_format = "%(levelname)s: @(%(asctime)s) thread:%(thread)d - %(module)s.%(funcName)s %(lineno)d: %(message)s" + log_format = "%(levelname)s: @(%(asctime)s) thread:%(thread)d - " \ + "%(module)s.%(funcName)s %(lineno)d: %(message)s" logger.setLevel(log_level) stream_handler = logging.StreamHandler() stream_handler.setFormatter(logging.Formatter(fmt=log_format)) logger.addHandler(stream_handler) return logger - + BUFSIZE = 8388608 # Size of read/write buffer MAX_RETRY_DELAY = 128 # maximum delay between retries -DEFAULT_RETRY_DELAY = 30 # start delay between retries when Try_After not specified by server +DEFAULT_RETRY_DELAY = 30 # start delay between retries when Try_After not + # specified by server MAX_RETRY_TIME = 900 # maximum time for retries before giving up... CONNECTION_TIMEOUT = 600 # seconds before HTTP connection should drop. SERVER = os.getenv('VOSPACE_WEBSERVICE', 'www.canfar.phys.uvic.ca') @@ -65,7 +73,8 @@ def get_logger(verbose=False, debug=False, quiet=False): class URLparse: """ Parse out the structure of a URL. - There is a difference between the 2.5 and 2.7 version of the urlparse.urlparse command, so here I roll my own... + There is a difference between the 2.5 and 2.7 version of the + urlparse.urlparse command, so here I roll my own... """ def __init__(self, url): @@ -73,14 +82,16 @@ def __init__(self, url): self.netloc = None self.args = None self.path = None - m = re.match("(^(?P[a-zA-Z]*):)?(//(?P[^/]*))?(?P/?[^?]*)?(?P\?.*)?", url) + m = re.match("(^(?P[a-zA-Z]*):)?(//(?P[^/]*))?" + "(?P/?[^?]*)?(?P\?.*)?", url) self.scheme = m.group('scheme') self.netloc = m.group('netloc') self.path = (m.group('path') is not None and m.group('path')) or '' self.args = (m.group('args') is not None and m.group('args')) or '' def __str__(self): - return "[scheme: %s, netloc: %s, path: %s]" % (self.scheme, self.netloc, self.path) + return "[scheme: %s, netloc: %s, path: %s]" % (self.scheme, + self.netloc, self.path) class Connection: @@ -90,7 +101,8 @@ def __init__(self, certfile=None, http_debug=False): """Setup the Certificate for later usage cerdServerURL -- the location of the cadc proxy certificate server - certfile -- where to store the certificate, if None then ${HOME}/.ssl or a temporary filename + certfile -- where to store the certificate, if None then + ${HOME}/.ssl or a temporary filename http_debug -- set True to generate httplib debug statements The user must supply a valid certificate. @@ -99,12 +111,14 @@ def __init__(self, certfile=None, http_debug=False): ## allow anonymous access if no certfile is specified... if certfile is not None and not os.access(certfile, os.F_OK): raise EnvironmentError(errno.EACCES, - "No certificate file found at %s\n (Perhaps use getCert to pull one)" % certfile) + "No certificate file found at %s\n" + " (Perhaps use getCert to pull one)" % certfile) self.certfile = certfile self.http_debug = http_debug def get_connection(self, url): - """Create an HTTPSConnection object and return. Uses the client certificate if None given. + """Create an HTTPSConnection object and return. Uses the client + certificate if None given. uri -- a VOSpace uri (vos://cadc.nrc.ca~vospace/path) certFilename -- the name of the certificate pem file. @@ -113,18 +127,22 @@ def get_connection(self, url): parts = URLparse(url) certfile = self.certfile - logger.debug("Trying to connect to %s://%s using %s" % (parts.scheme, parts.netloc, certfile)) + logger.debug("Trying to connect to %s://%s using %s" % + (parts.scheme, parts.netloc, certfile)) try: if parts.scheme == "https": - connection = httplib.HTTPSConnection(parts.netloc, key_file=certfile, cert_file=certfile, - timeout=CONNECTION_TIMEOUT) + connection = httplib.HTTPSConnection(parts.netloc, + key_file=certfile, cert_file=certfile, + timeout=CONNECTION_TIMEOUT) else: - connection = httplib.HTTPConnection(parts.netloc, timeout=CONNECTION_TIMEOUT) + connection = httplib.HTTPConnection(parts.netloc, + timeout=CONNECTION_TIMEOUT) except httplib.NotConnected as e: logger.error("HTTP connection to %s failed \n" % parts.netloc) logger.error("%s \n" % (str(e))) - raise OSError(errno.ECONNREFUSED, "VOSpace connection failed", parts.netloc) + raise OSError(errno.ECONNREFUSED, "VOSpace connection failed", + parts.netloc) if self.http_debug: connection.set_debuglevel(1) @@ -172,7 +190,8 @@ class Node: def __init__(self, node, node_type=None, properties=None, subnodes=None): """Create a Node object based on the DOM passed to the init method - if node is a string then create a node named node of nodeType with properties + if node is a string then create a node named node of nodeType with + properties """ if not subnodes: subnodes = [] @@ -236,7 +255,7 @@ def set_property(self, key, value): properties = self.node.find(Node.PROPERTIES) uri = "%s#%s" % (Node.IVOAURL, key) ElementTree.SubElement(properties, Node.PROPERTY, - attrib={'uri': uri, 'readOnly': 'false'}).text = value + attrib={'uri': uri, 'readOnly': 'false'}).text = value def __str__(self): class dummy: @@ -249,7 +268,8 @@ class dummy: return "".join(data) def setattr(self, attr={}): - """return a dictionary of attributes associated with the file stored at node + """return a dictionary of attributes associated with the file stored + at node These attributes are determind from the node on VOSpace. """ @@ -258,15 +278,19 @@ def setattr(self, attr={}): self.attr = {} node = self - ## Only one date provided by VOSpace, so use this as all possible dates. + ## Only one date provided by VOSpace, so use this as all possible + ## dates. sdate = node.props.get('date', None) atime = time.time() if not sdate: mtime = atime else: - ### mktime is expecting a localtime but we're sending a UT date, so some correction will be needed - mtime = time.mktime(time.strptime(sdate[0:-4], '%Y-%m-%dT%H:%M:%S')) - mtime = mtime - time.mktime(time.gmtime()) + time.mktime(time.localtime()) + ### mktime is expecting a localtime but we're sending a UT date, so + ### some correction will be needed + mtime = time.mktime(time.strptime(sdate[0:-4], + '%Y-%m-%dT%H:%M:%S')) + mtime = mtime - time.mktime(time.gmtime()) + \ + time.mktime(time.localtime()) self.attr['st_ctime'] = attr.get('st_ctime', mtime) self.attr['st_mtime'] = attr.get('st_mtime', mtime) self.attr['st_atime'] = atime @@ -283,7 +307,6 @@ def setattr(self, attr={}): else: st_mode |= stat.S_IFREG - ## Set the OWNER permissions ## All files are read/write/execute by owner... st_mode |= stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR @@ -303,11 +326,14 @@ def setattr(self, attr={}): self.attr['st_mode'] = attr.get('st_mode', st_mode) - ## We set the owner and group bits to be those of the currently running process. - ## This is a hack since we don't have an easy way to figure these out. TBD! + ## We set the owner and group bits to be those of the currently + ## running process. + ## This is a hack since we don't have an easy way to figure these out. + ## TBD! self.attr['st_uid'] = attr.get('st_uid', os.getuid()) self.attr['st_gid'] = attr.get('st_uid', os.getgid()) - self.attr['st_size'] = attr.get('st_size', int(node.props.get('length', 0))) + self.attr['st_size'] = attr.get('st_size', + int(node.props.get('length', 0))) self.attr['st_blocks'] = self.attr['st_size'] / 512 def setxattr(self, attrs={}): @@ -372,7 +398,7 @@ def fix_prop(self, prop): parts = URLparse(url) if parts.path is None or tag is None: - raise ValueError("Invalid VOSpace property uri: %s" % ( prop)) + raise ValueError("Invalid VOSpace property uri: %s" % (prop)) return prop @@ -386,7 +412,7 @@ def changeProp(self, key, value): This function should be split into 'set' and 'delete' """ - #logger.debug("Before change node XML\n %s" % ( self)) + #logger.debug("Before change node XML\n %s" % (self)) uri = self.fix_prop(key) changed = 0 found = False @@ -417,10 +443,9 @@ def changeProp(self, key, value): propertyNode.attrib['uri'] = uri propertyNode.text = value self.props[self.getPropName(uri)] = value - #logger.debug("After change node XML\n %s" %( self)) + #logger.debug("After change node XML\n %s" %(self)) return 1 - def chmod(self, mode): """Set the MODE of this Node... @@ -432,7 +457,7 @@ def chmod(self, mode): changed = 0 - #logger.debug("Changing mode to %d" % ( mode)) + #logger.debug("Changing mode to %d" % (mode)) if mode & (stat.S_IROTH): changed += self.setPublic('true') else: @@ -449,18 +474,19 @@ def chmod(self, mode): else: changed += self.chwgrp('') - #logger.debug("%d -> %s" % ( changed, changed>0)) + #logger.debug("%d -> %s" % (changed, changed>0)) return changed > 0 - def create(self, uri, nodeType="vos:DataNode", properties={}, subnodes=[]): - """Build the XML needed to represent a VOSpace node returns an ElementTree represenation of the XML + """Build the XML needed to represent a VOSpace node returns an + ElementTree represenation of the XML - nodeType -- the VOSpace node type, likely one of vos:DataNode or vos:ContainerNode - properties -- a dictionary of the node properties, all assumed to be single words from the IVOA list + nodeType -- the VOSpace node type, likely one of vos:DataNode or + vos:ContainerNode + properties -- a dictionary of the node properties, all assumed to be + single words from the IVOA list """ - ### Build the root node called 'node' node = ElementTree.Element("node") node.attrib["xmlns"] = Node.VOSNS @@ -469,12 +495,13 @@ def create(self, uri, nodeType="vos:DataNode", properties={}, subnodes=[]): node.attrib["uri"] = uri ### create a properties section - if not properties.has_key('type'): + if 'type' not in properties: properties['type'] = mimetypes.guess_type(uri)[0] #logger.debug("set type to %s" % (properties['type'])) propertiesNode = ElementTree.SubElement(node, Node.PROPERTIES) for property in properties.keys(): - propertyNode = ElementTree.SubElement(propertiesNode, Node.PROPERTY) + propertyNode = ElementTree.SubElement(propertiesNode, + Node.PROPERTY) propertyNode.attrib['readOnly'] = "false" ### There should be a '#' in there someplace... propertyNode.attrib["uri"] = "%s" % self.fix_prop(property) @@ -486,7 +513,6 @@ def create(self, uri, nodeType="vos:DataNode", properties={}, subnodes=[]): elif len(properties[property]) > 0: propertyNode.text = properties[property] - ## That's it for link nodes... if nodeType == "vos:LinkNode": return node @@ -494,18 +520,22 @@ def create(self, uri, nodeType="vos:DataNode", properties={}, subnodes=[]): ### create accepts accepts = ElementTree.SubElement(node, Node.ACCEPTS) - ElementTree.SubElement(accepts, "view").attrib['uri'] = "%s#%s" % (Node.IVOAURL, "defaultview") + ElementTree.SubElement(accepts, "view").attrib['uri'] = \ + "%s#%s" % (Node.IVOAURL, "defaultview") - ### create provides section provides = ElementTree.SubElement(node, Node.PROVIDES) - ElementTree.SubElement(provides, "view").attrib['uri'] = "%s#%s" % (Node.IVOAURL, 'defaultview') - ElementTree.SubElement(provides, "view").attrib['uri'] = "%s#%s" % (Node.CADCURL, 'rssview') + ElementTree.SubElement(provides, "view").attrib['uri'] = \ + "%s#%s" % (Node.IVOAURL, 'defaultview') + ElementTree.SubElement(provides, "view").attrib['uri'] = \ + "%s#%s" % (Node.CADCURL, 'rssview') ### Only DataNode can have a dataview... if nodeType == "vos:DataNode": - ElementTree.SubElement(provides, "view").attrib['uri'] = "%s#%s" % (Node.CADCURL, 'dataview') + ElementTree.SubElement(provides, "view").attrib['uri'] = \ + "%s#%s" % (Node.CADCURL, 'dataview') - ### if this is a container node then we need to add an empy directory contents area... + ### if this is a container node then we need to add an empy directory + ### contents area... if nodeType == "vos:ContainerNode": nodeList = ElementTree.SubElement(node, Node.NODES) for subnode in subnodes: @@ -533,13 +563,15 @@ def islocked(self): def getInfo(self): """Organize some information about a node and return as dictionary""" - date = time.mktime(time.strptime(self.props['date'][0:-4], '%Y-%m-%dT%H:%M:%S')) + date = time.mktime(time.strptime(self.props['date'][0:-4], + '%Y-%m-%dT%H:%M:%S')) #if date.tm_year==time.localtime().tm_year: # dateString=time.strftime('%d %b %H:%S',date) #else: # dateString=time.strftime('%d %b %Y',date) - creator = string.lower( - re.search('CN=([^,]*)', self.props.get('creator', 'CN=unknown_000,')).groups()[0].replace(' ', '_')) + creator = string.lower(re.search('CN=([^,]*)', + self.props.get('creator', 'CN=unknown_000,')) + .groups()[0].replace(' ', '_')) perm = [] writeGroup = "" readGroup = "" @@ -561,7 +593,7 @@ def getInfo(self): if readGroup != 'NONE': perm[4] = 'r' isLocked = self.props.get(Node.ISLOCKED, "false") - #logger.debug("%s: %s" %( self.name,self.props)) + #logger.debug("%s: %s" %(self.name,self.props)) return {"permissions": string.join(perm, ''), "creator": creator, "readGroup": readGroup, @@ -572,7 +604,8 @@ def getInfo(self): "target": self.target} def getNodeList(self): - """Get a list of all the nodes held to by a ContainerNode return a list of Node objects""" + """Get a list of all the nodes held to by a ContainerNode return a + list of Node objects""" if (self._nodeList is None): self._nodeList = [] for nodesNode in self.node.findall(Node.NODES): @@ -607,12 +640,13 @@ def getInfoList(self): return infoList.items() def setProps(self, props): - """Set the properties of node, given the properties element of that node""" + """Set the properties of node, given the properties element of that + node""" for propertyNode in props.findall(Node.PROPERTY): - self.props[self.getPropName(propertyNode.get('uri'))] = self.getPropValue(propertyNode) + self.props[self.getPropName(propertyNode.get('uri'))] = \ + self.getPropValue(propertyNode) return - def getPropName(self, prop): """parse the property uri and get the name of the property""" (url, propName) = urllib.splittag(prop) @@ -630,9 +664,11 @@ class VOFile: A class for managing http connections Attributes: - maxRetries - maximum number of retries when transient errors encountered. When set - too high (as the default value is) the number of retries are time limited (max 15min) - maxRetryTime - maximum time to retry for when transient errors are encountered + maxRetries - maximum number of retries when transient errors encountered. + When set too high (as the default value is) the number of + retries are time limited (max 15min) + maxRetryTime - maximum time to retry for when transient errors are + encountered """ errnos = {404: errno.ENOENT, @@ -642,8 +678,8 @@ class VOFile: ### if we get one of these codes, retry the command... ;-( retryCodes = (503, 408, 504, 412) - def __init__(self, url_list, connector, method, size=None, followRedirect=True, - range=None): + def __init__(self, url_list, connector, method, size=None, + followRedirect=True, range=None): self.closed = True self.connector = connector self.httpCon = None @@ -654,8 +690,9 @@ def __init__(self, url_list, connector, method, size=None, followRedirect=True, self.maxRetryTime = MAX_RETRY_TIME # TODO # Make all the calls to open send a list of URLs - # this should be redone during a cleanup. Basically, a GET might result in multiple - # URLs (list of URLs) but VOFile is also used to retrieve schema files and other info. + # this should be redone during a cleanup. Basically, a GET might + # result in multiple URLs (list of URLs) but VOFile is also used to + # retrieve schema files and other info. # All the calls should pass a list of URLs. Make sure that we # make a deep copy of the input list so that we don't @@ -675,7 +712,8 @@ def __init__(self, url_list, connector, method, size=None, followRedirect=True, self.retries = 0 self.fileSize = None - #logger.debug("Sending back VOFile object for file of size %s" % (str(self.size))) + #logger.debug("Sending back VOFile object for file of size %s" % + #(str(self.size))) def tell(self): return self._fpos @@ -689,7 +727,8 @@ def seek(self, offset, loc=os.SEEK_SET): self._fpos = int(self.size) - offset return - def close(self, code=(200, 201, 202, 206, 302, 303, 503, 416, 402, 408, 412, 504)): + def close(self, code=(200, 201, 202, 206, 302, 303, 503, 416, 402, 408, + 412, 504)): """close the connection""" if self.closed: return self.closed @@ -708,8 +747,8 @@ def close(self, code=(200, 201, 202, 206, 302, 303, 503, 416, 402, 408, 412, 504 self.httpCon.close() return self.closed - - def checkstatus(self, codes=(200, 201, 202, 206, 302, 303, 503, 416, 416, 402, 408, 412, 504)): + def checkstatus(self, codes=(200, 201, 202, 206, 302, 303, 503, 416, + 416, 402, 408, 412, 504)): """check the response status""" msgs = {404: "Node Not Found", 401: "Not Authorized", @@ -717,7 +756,8 @@ def checkstatus(self, codes=(200, 201, 202, 206, 302, 303, 503, 416, 416, 402, 4 408: "Connection Timeout"} logger.debug("status %d for URL %s" % (self.resp.status, self.url)) if self.resp.status not in codes: - logger.debug("Got status code: %s for %s" % (self.resp.status, self.url)) + logger.debug("Got status code: %s for %s" % + (self.resp.status, self.url)) msg = self.resp.read() if msg is not None: msg = html2text.html2text(msg, self.url).strip() @@ -727,7 +767,8 @@ def checkstatus(self, codes=(200, 201, 202, 206, 302, 303, 503, 416, 416, 402, 4 msg = msgs[self.resp.status] if self.resp.status == 401 and self.connector.certfile is None: msg += " using anonymous access " - raise IOError(VOFile.errnos.get(self.resp.status,self.resp.status), msg, self.url) + raise IOError(VOFile.errnos.get(self.resp.status, + self.resp.status), msg, self.url) self.size = self.resp.getheader("Content-Length", 0) if self.resp.status == 200: self.md5sum = self.resp.getheader("Content-MD5", None) @@ -751,7 +792,8 @@ def open(self, URL, method="GET", bytes=None): userAgent = 'vofs ' + version self.httpCon.putheader("User-Agent", userAgent) self.transEncode = None - #logger.debug("sending headers for file of size: %s " % (str(self.size))) + #logger.debug("sending headers for file of size: %s " % + #(str(self.size))) if method in ["PUT"]: try: self.size = int(self.size) @@ -788,12 +830,11 @@ def getFileInfo(self): """Return information harvested from the HTTP header""" return (self.totalFileSize, self.md5sum) - def read(self, size=None): """return size bytes from the connection response""" #logger.debug("Starting to read file by closing http(s) connection") - + read_error = None if not self.closed: try: @@ -819,29 +860,33 @@ def read(self, size=None): self.url = URL if not URL: #logger.debug("Raising error?") - raise IOError(errno.ENOENT, "No Location on redirect", self.url) + raise IOError(errno.ENOENT, "No Location on redirect", + self.url) if self.followRedirect: self.open(URL, "GET") #logger.debug("Following redirected URL: %s" % (URL)) return self.read(size) else: - #logger.debug("Got url:%s from redirect but not following" % (self.url)) + #logger.debug("Got url:%s from redirect but not following" % + #(self.url)) return self.url elif self.resp.status in VOFile.retryCodes: # Note: 404 (File Not Found) might be returned when: # 1. file deleted or replaced # 2. file migrated from cache # 3. hardware failure on storage node - # For 3. it is necessary to try the other URLs in the list otherwise this the - # failed URL might show up even after the caller tries to re-negotiate the transfer. - # For 1. and 2., calls to the other URLs in the list might or might not succeed. + # For 3. it is necessary to try the other URLs in the list + # otherwise this the failed URL might show up even after the + # caller tries to re-negotiate the transfer. + # For 1. and 2., calls to the other URLs in the list might or + # might not succeed. if self.urlIndex < len(self.URLs) - 1: # go to the next URL self.urlIndex += 1 self.open(self.URLs[self.urlIndex], "GET") return self.read(size) else: - self.URLs.pop(self.urlIndex) #remove url from list + self.URLs.pop(self.urlIndex) # remove url from list if len(self.URLs) == 0: # no more URLs to try... if read_error is not None: @@ -850,15 +895,16 @@ def read(self, size=None): raise IOError(errno.ENOENT, self.resp.read()) else: raise IOError(errno.EIO, - "unexpected server response %s (%d)" % - (self.resp.reason, self.resp.status), self.url) + "unexpected server response %s (%d)" % + (self.resp.reason, self.resp.status), self.url) if self.urlIndex < len(self.URLs): self.open(self.URLs[self.urlIndex], "GET") return self.read(size) ## start from top of URLs with a delay self.urlIndex = 0 - logger.error("Got %d: servers busy on %s" % (self.resp.status, self.URLs)) + logger.error("Got %d: servers busy on %s" % + (self.resp.status, self.URLs)) msg = self.resp.read() if msg is not None: msg = html2text.html2text(msg, self.url).strip() @@ -875,7 +921,8 @@ def read(self, size=None): else: self.currentRetryDelay = MAX_RETRY_DELAY - if (self.retries < self.maxRetries) and (self.totalRetryDelay < self.maxRetryTime): + if ((self.retries < self.maxRetries) and + (self.totalRetryDelay < self.maxRetryTime)): logger.error("retrying in %d seconds" % (ras)) self.totalRetryDelay = self.totalRetryDelay + ras self.retries = self.retries + 1 @@ -883,8 +930,10 @@ def read(self, size=None): self.open(self.URLs[self.urlIndex], "GET") return self.read(size) else: - raise IOError(self.resp.status, "failed to connect to server after multiple attempts %s (%d)" % ( - self.resp.reason, self.resp.status), self.url) + raise IOError(self.resp.status, + "failed to connect to server after multiple attempts" + " %s (%d)" % (self.resp.reason, self.resp.status), + self.url) def write(self, buf): """write buffer to the connection""" @@ -913,15 +962,15 @@ class Client: VO_HTTPSPUT_PROTOCOL = 'ivo://ivoa.net/vospace/core#httpsput' DWS = '/data/pub/' - ### reservered vospace properties, not to be used for extended property setting - vosProperties = ["description", "type", "encoding", "MD5", "length", "creator", "date", - "groupread", "groupwrite", "ispublic"] - + ### reservered vospace properties, not to be used for extended property + ### setting + vosProperties = ["description", "type", "encoding", "MD5", "length", + "creator", "date", "groupread", "groupwrite", "ispublic"] - def __init__(self, - certFile=os.path.join(os.getenv('HOME'), '.ssl/cadcproxy.pem'), - rootNode=None, conn=None, archive='vospace', - cadc_short_cut=False, http_debug=False, secure_get=False): + def __init__(self, certFile=os.path.join( + os.getenv('HOME'), '.ssl/cadcproxy.pem'), rootNode=None, + conn=None, archive='vospace', cadc_short_cut=False, + http_debug=False, secure_get=False): """This could/should be expanded to set various defaults certFile: CADC proxy certficate location. @@ -935,7 +984,8 @@ def __init__(self, if certFile is not None and not os.access(certFile, os.F_OK): ### can't get this certfile #logger.debug("Failed to access certfile %s " % (certFile)) - #logger.debug("Using anonymous mode, try getCert if you want to use authentication") + #logger.debug("Using anonymous mode, try getCert if you want to + # use authentication") certFile = None if certFile is None: self.protocol = "http" @@ -947,7 +997,7 @@ def __init__(self, self.VOSpaceServer = "cadc.nrc.ca!vospace" self.rootNode = rootNode self.archive = archive - self.nodeCache = {} + self.nodeCache = NodeCache() self.cadc_short_cut = cadc_short_cut self.secure_get = secure_get @@ -1014,8 +1064,9 @@ def copy(self, src, dest, sendMD5=False): raise OSError(errno.EIO, "MD5s don't match", src) return md5.hexdigest() - if destSize != srcSize and (srcNode is not None) and (srcNode.type != 'vos:LinkNode'): - logger.error("sizes don't match ( %s (%i) -> %s (%i)) " % + if (destSize != srcSize and (srcNode is not None) and + (srcNode.type != 'vos:LinkNode')): + logger.error("sizes don't match (%s (%i) -> %s (%i)) " % (src, srcSize, dest, destSize)) raise IOError(errno.EIO, "sizes don't match", src) return destSize @@ -1039,10 +1090,12 @@ def fixURI(self, uri): ## Check that path name compiles with the standard # Check for 'cutout' syntax values. - path = re.match("(?P[^\[]*)(?P(\[\d*\:?\d*\])?(\[\d*\:?\d*,?\d*\:?\d*\])?)", parts.path) + path = re.match("(?P[^\[]*)(?P(\[\d*\:?\d*\])?" + "(\[\d*\:?\d*,?\d*\:?\d*\])?)", parts.path) filename = os.path.basename(path.group('fname')) if not re.match("^[\_\-\(\)\=\+\!\,\;\:\@\&\*\$\.\w\~]*$", filename): - raise IOError(errno.EINVAL, "Illegal vospace container name", filename) + raise IOError(errno.EINVAL, "Illegal vospace container name", + filename) path = path.group('fname') ## insert the default VOSpace server if none given host = parts.netloc @@ -1051,49 +1104,61 @@ def fixURI(self, uri): path = os.path.normpath(path).strip('/') return "%s://%s/%s" % (parts.scheme, host, path) - def getNode(self, uri, limit=0, force=False): """connect to VOSpace and download the definition of vospace node uri -- a voSpace node in the format vos:/vospaceName/nodeName limit -- load children nodes in batches of limit """ - #logger.debug("Limit: %s " % ( str(limit))) - #logger.debug("Getting node %s" % ( uri)) + #logger.debug("Limit: %s " % (str(limit))) + logger.debug("Getting node %s" % (uri)) uri = self.fixURI(uri) - if force or uri not in self.nodeCache: - xml_file = StringIO(self.open(uri, os.O_RDONLY, limit=limit).read()) - xml_file.seek(0) - dom = ElementTree.parse(xml_file) - node = Node(dom.getroot()) - # IF THE CALLER KNOWS THEY DON'T NEED THE CHILDREN THEY - # CAN SET LIMIT=0 IN THE CALL Also, if the number of nodes - # on the firt call was less than 500, we likely got them - # all during the init - if limit != 0 and node.isdir() and len(node.getNodeList()) > 500: - nextURI = None - while nextURI != node.getNodeList()[-1].uri: - nextURI = node.getNodeList()[-1].uri - xml_file = StringIO(self.open(uri, os.O_RDONLY, nextURI=nextURI, limit=limit).read()) - xml_file.seek(0) - next_page = Node(ElementTree.parse(xml_file).getroot()) - if len(next_page.getNodeList()) > 0 and nextURI == next_page.getNodeList()[0].uri: - next_page.getNodeList().pop(0) - node.getNodeList().extend(next_page.getNodeList()) - logger.debug("Next URI currently %s" % ( nextURI)) - logger.debug("Last URI currently %s" % ( node.getNodeList()[-1].uri )) - self.nodeCache[uri] = node - for node in self.nodeCache[uri].getNodeList(): - self.nodeCache[node.uri] = node - return self.nodeCache[uri] - - - def getNodeURL(self, uri, method='GET', view=None, limit=0, nextURI=None, cutout=None, + node = None + if not force and uri in self.nodeCache: + node = self.nodeCache[uri] + if node is None: + logger.debug("Getting node from ws %s" % (uri)) + with self.nodeCache.watch(uri) as watch: + xml_file = StringIO(self.open(uri, os.O_RDONLY, + limit=limit).read()) + xml_file.seek(0) + dom = ElementTree.parse(xml_file) + node = Node(dom.getroot()) + watch.insert(node) + # IF THE CALLER KNOWS THEY DON'T NEED THE CHILDREN THEY + # CAN SET LIMIT=0 IN THE CALL Also, if the number of nodes + # on the firt call was less than 500, we likely got them + # all during the init + if (limit != 0 and node.isdir() and + len(node.getNodeList()) > 500): + nextURI = None + while nextURI != node.getNodeList()[-1].uri: + nextURI = node.getNodeList()[-1].uri + xml_file = StringIO(self.open(uri, os.O_RDONLY, + nextURI=nextURI, limit=limit).read()) + xml_file.seek(0) + next_page = Node(ElementTree.parse(xml_file).getroot()) + if (len(next_page.getNodeList()) > 0 and + nextURI == next_page.getNodeList()[0].uri): + next_page.getNodeList().pop(0) + node.getNodeList().extend(next_page.getNodeList()) + logger.debug("Next URI currently %s" % (nextURI)) + logger.debug("Last URI currently %s" % ( + node.getNodeList()[-1].uri)) + for childNode in node.getNodeList(): + logger.debug("child URI %s" % (childNode.uri)) + with self.nodeCache.watch(childNode.uri) as childWatch: + childWatch.insert(childNode) + return node + + def getNodeURL(self, uri, method='GET', view=None, limit=0, nextURI=None, + cutout=None, full_negotiation=None): - """Split apart the node string into parts and return the correct URL for this node""" + """Split apart the node string into parts and return the correct + URL for this node""" uri = self.fixURI(uri) - logger.debug("Getting URL for: "+str(uri)) + logger.debug("Getting URL for: " + str(uri)) # full_negotiation is an override, so it can be used to # force either shortcut (false) or full negotiation (true) @@ -1102,7 +1167,8 @@ def getNodeURL(self, uri, method='GET', view=None, limit=0, nextURI=None, cutout else: do_shortcut = self.cadc_short_cut - logger.debug("do_shortcut=%i method=%s view=%s" % (do_shortcut, method, view)) + logger.debug("do_shortcut=%i method=%s view=%s" % (do_shortcut, + method, view)) if not do_shortcut and method == 'GET' and view in ['data', 'cutout']: return self._get(uri, view=view, cutout=cutout) @@ -1117,15 +1183,17 @@ def getNodeURL(self, uri, method='GET', view=None, limit=0, nextURI=None, cutout "[extension number][x1:x2,y1:y2]") parts = URLparse(uri) - logger.debug("parts: "+str(parts)) + logger.debug("parts: " + str(parts)) - # see if we have a VOSpace server that goes with this URI in our look up list + # see if we have a VOSpace server that goes with this URI in our + # look up list server = Client.VOServers.get(parts.netloc, None) if server is None: return uri URL = None - if do_shortcut and ((method == 'GET' and view in ['data', 'cutout']) or method == "PUT"): + if (do_shortcut and ((method == 'GET' and + view in ['data', 'cutout']) or method == "PUT")): ## only get here if do_shortcut == True # find out the URL to the CADC data server direction = {'GET': 'pullFromVoSpace', 'PUT': 'pushToVoSpace'} @@ -1155,7 +1223,8 @@ def getNodeURL(self, uri, method='GET', view=None, limit=0, nextURI=None, cutout if cutout is not None: args['cutout'] = cutout form = urllib.urlencode(args) - headers = {"Content-type": "application/x-www-form-urlencoded", "Accept": "text/plain"} + headers = {"Content-type": "application/x-www-form-urlencoded", + "Accept": "text/plain"} httpCon = self.conn.get_connection(url) httpCon.request("POST", Client.VOTransfer, form, headers) try: @@ -1168,18 +1237,19 @@ def getNodeURL(self, uri, method='GET', view=None, limit=0, nextURI=None, cutout # The file doesn't exist raise IOError(errno.ENOENT, "No location on redirect", url) else: - logger.error("GET/PUT shortcut not working. POST to %s returns: %s" % \ - (Client.VOTransfer, response.status)) - return self.getNodeURL(uri, method=method, view=view, limit=limit, nextURI=nextURI, cutout=cutout) + logger.error("GET/PUT shortcut not working. POST to %s" + " returns: %s" % + (Client.VOTransfer, response.status)) + return self.getNodeURL(uri, method=method, view=view, + limit=limit, nextURI=nextURI, cutout=cutout) except Exception as e: logger.error(str(e)) finally: httpCon.close() - logger.debug("Sending short cut url: %s" % ( URL)) + logger.debug("Sending short cut url: %s" % (URL)) return URL - ### this is a GET so we might have to stick some data onto the URL... fields = {} if limit is not None: @@ -1191,7 +1261,8 @@ def getNodeURL(self, uri, method='GET', view=None, limit=0, nextURI=None, cutout data = "" if len(fields) > 0: data = "?" + urllib.urlencode(fields) - URL = "%s://%s/vospace/nodes/%s%s" % (self.protocol, server, parts.path.strip('/'), data) + URL = "%s://%s/vospace/nodes/%s%s" % (self.protocol, server, + parts.path.strip('/'), data) logger.debug("URL: %s (%s)" % (URL, method)) return URL @@ -1200,13 +1271,13 @@ def link(self, srcURI, linkURI): if (self.isdir(linkURI)): linkURI = os.path.join(linkURI, os.path.basename(srcURI)) linkNode = Node(self.fixURI(linkURI), node_type="vos:LinkNode") - ElementTree.SubElement(linkNode.node, "target").text = self.fixURI(srcURI) + ElementTree.SubElement(linkNode.node, "target").text = \ + self.fixURI(srcURI) URL = self.getNodeURL(linkURI) f = VOFile(URL, self.conn, method="PUT", size=len(str(linkNode))) f.write(str(linkNode)) return f.close() - def move(self, srcURI, destURI): """Move srcUri to targetUri""" logger.debug("Moving %s to %s" % (srcURI, destURI)) @@ -1214,15 +1285,18 @@ def move(self, srcURI, destURI): transfer.attrib['xmlns'] = Node.VOSNS transfer.attrib['xmlns:vos'] = Node.VOSNS ElementTree.SubElement(transfer, "target").text = self.fixURI(srcURI) - ElementTree.SubElement(transfer, "direction").text = self.fixURI(destURI) + ElementTree.SubElement(transfer, "direction").text = \ + self.fixURI(destURI) ElementTree.SubElement(transfer, "keepBytes").text = "false" url = "%s://%s%s" % (self.protocol, SERVER, Client.VOTransfer) con = VOFile(url, self.conn, method="POST", followRedirect=False) - con.write(ElementTree.tostring(transfer)) - transURL = con.read() - if not self.getTransferError(transURL, srcURI): - return True + with nested(self.nodeCache.volatile(self.fixURI(srcURI)), + self.nodeCache.volatile(self.fixURI(destURI))): + con.write(ElementTree.tostring(transfer)) + transURL = con.read() + if not self.getTransferError(transURL, srcURI): + return True return False def _get(self, uri, view="defaultview", cutout=None): @@ -1237,7 +1311,7 @@ def transfer(self, uri, direction, view="defaultview", cutout=None): """Build the transfer XML document""" protocol = {"pullFromVoSpace": "%sget" % (self.protocol), "pushToVoSpace": "%sput" % (self.protocol)} - views = {"defaultview": "%s#%s" % ( Node.IVOAURL, "defaultview"), + views = {"defaultview": "%s#%s" % (Node.IVOAURL, "defaultview"), "data": "ivo://cadc.nrc.ca/vospace/view#data", "cutout": "ivo://cadc.nrc.ca/vospace/view#cutout" } @@ -1246,17 +1320,21 @@ def transfer(self, uri, direction, view="defaultview", cutout=None): transfer_xml.attrib['xmlns:vos'] = Node.VOSNS ElementTree.SubElement(transfer_xml, "target").text = uri ElementTree.SubElement(transfer_xml, "direction").text = direction - ElementTree.SubElement(transfer_xml, "view").attrib['uri'] = views.get(view, views["defaultview"]) + ElementTree.SubElement(transfer_xml, "view").attrib['uri'] = \ + views.get(view, views["defaultview"]) if cutout is not None: - ElementTree.SubElement(transfer_xml, "cutout").attrib['uri'] = cutout - ElementTree.SubElement(transfer_xml, "protocol").attrib['uri'] = "%s#%s" % (Node.IVOAURL, protocol[direction]) + ElementTree.SubElement(transfer_xml, "cutout").attrib['uri'] = \ + cutout + ElementTree.SubElement(transfer_xml, "protocol").attrib['uri'] = \ + "%s#%s" % (Node.IVOAURL, protocol[direction]) logger.debug(ElementTree.tostring(transfer_xml)) url = "%s://%s%s" % (self.protocol, SERVER, Client.VOTransfer) con = VOFile(url, self.conn, method="POST", followRedirect=False) con.write(ElementTree.tostring(transfer_xml)) transURL = con.read() logger.debug("Got back %s from trasnfer " % (transURL)) - con = StringIO(VOFile(transURL, self.conn, method="GET", followRedirect=True).read()) + con = StringIO(VOFile(transURL, self.conn, method="GET", + followRedirect=True).read()) con.seek(0) logger.debug(con.read()) con.seek(0) @@ -1289,20 +1367,22 @@ def getTransferError(self, url, uri): try: phaseURL = jobURL + "/phase" sleepTime = 1 - roller = ( '\\', '-', '/', '|', '\\', '-', '/', '|' ) - phase = VOFile(phaseURL, self.conn, method="GET", followRedirect=False).read() + roller = ('\\', '-', '/', '|', '\\', '-', '/', '|') + phase = VOFile(phaseURL, self.conn, method="GET", + followRedirect=False).read() # do not remove the line below. It is used for testing logger.debug("Job URL: " + jobURL + "/phase") while phase in ['PENDING', 'QUEUED', 'EXECUTING', 'UNKNOWN']: - # poll the job. Sleeping time in between polls is doubling each time - # until it gets to 32sec + # poll the job. Sleeping time in between polls is doubling + # each time until it gets to 32sec totalSlept = 0 if (sleepTime <= 32): sleepTime = 2 * sleepTime slept = 0 if logger.getEffectiveLevel() == logging.INFO: while slept < sleepTime: - sys.stdout.write("\r%s %s" % (phase, roller[totalSlept % len(roller)])) + sys.stdout.write("\r%s %s" % (phase, + roller[totalSlept % len(roller)])) sys.stdout.flush() slept += 1 totalSlept += 1 @@ -1310,16 +1390,20 @@ def getTransferError(self, url, uri): sys.stdout.write("\r \n") else: time.sleep(sleepTime) - phase = VOFile(phaseURL, self.conn, method="GET", followRedirect=False).read() - logger.debug("Async transfer Phase for url %s: %s " % (url, phase)) + phase = VOFile(phaseURL, self.conn, method="GET", + followRedirect=False).read() + logger.debug("Async transfer Phase for url %s: %s " % + (url, phase)) except KeyboardInterrupt: # abort the job when receiving a Ctrl-C/Interrupt from the client logger.error("Received keyboard interrupt") - con = VOFile(jobURL + "/phase", self.conn, method="POST", followRedirect=False) + con = VOFile(jobURL + "/phase", self.conn, method="POST", + followRedirect=False) con.write("PHASE=ABORT") con.read() raise KeyboardInterrupt - status = VOFile(phaseURL, self.conn, method="GET", followRedirect=False).read() + status = VOFile(phaseURL, self.conn, method="GET", + followRedirect=False).read() logger.debug("Phase: %s" % (status)) if status in ['COMPLETED']: return False @@ -1330,11 +1414,12 @@ def getTransferError(self, url, uri): con = VOFile(errorURL, self.conn, method="GET") errorMessage = con.read() logger.debug("Got transfer error %s on URI %s" % (errorMessage, uri)) - target = re.search("Unsupported link target:(?P .*)$", errorMessage) + target = re.search("Unsupported link target:(?P .*)$", + errorMessage) if target is not None: return target.group('target').strip() - raise OSError(errorCodes.get(errorMessage, errno.ENOENT), "%s: %s" % ( uri, errorMessage )) - + raise OSError(errorCodes.get(errorMessage, errno.ENOENT), + "%s: %s" % (uri, errorMessage)) def open(self, uri, mode=os.O_RDONLY, view=None, head=False, URL=None, limit=None, nextURI=None, size=None, cutout=None, range=None): @@ -1347,7 +1432,8 @@ def open(self, uri, mode=os.O_RDONLY, view=None, head=False, URL=None, if type(mode) == str: mode = os.O_RDONLY - # the URL of the connection depends if we are 'getting', 'putting' or 'posting' data + # the URL of the connection depends if we are 'getting', 'putting' or + # 'posting' data method = None if mode == os.O_RDONLY: method = "GET" @@ -1377,37 +1463,43 @@ def open(self, uri, mode=os.O_RDONLY, view=None, head=False, URL=None, raise e if URL is None: if target is not None: - logger.debug("%s is a link to %s" % ( node.uri, target)) - if re.search("^vos\://cadc\.nrc\.ca[!~]vospace", target) is not None: + logger.debug("%s is a link to %s" % (node.uri, target)) + if (re.search("^vos\://cadc\.nrc\.ca[!~]vospace", target) + is not None): # TODO - # the above re.search should use generic VOSpace uri search, not CADC specific. + # the above re.search should use generic VOSpace uri + # search, not CADC specific. i ## Since this is an CADC vospace link, just follow it. - return self.open(target, mode, view, head, URL, limit, nextURI, size, cutout, range) + return self.open(target, mode, view, head, URL, limit, + nextURI, size, cutout, range) else: # A target external to VOSpace, open the target directly # TODO # Need a way of passing along authentication. if cutout is not None: target = "{}?cutout={}".format(target, cutout) - return VOFile([target], self.conn, method=method, size=size, range=range) + return VOFile([target], self.conn, method=method, + size=size, range=range) else: - URL = self.getNodeURL(uri, method=method, view=view, limit=limit, nextURI=nextURI, - cutout=cutout) + URL = self.getNodeURL(uri, method=method, view=view, + limit=limit, nextURI=nextURI, cutout=cutout) return VOFile(URL, self.conn, method=method, size=size, range=range) - def addProps(self, node): - """Given a node structure do a POST of the XML to the VOSpace to update the node properties""" - #logger.debug("Updating %s" % ( node.name)) + """Given a node structure do a POST of the XML to the VOSpace to + update the node properties""" + #logger.debug("Updating %s" % (node.name)) #logger.debug(str(node.props)) ## Get a copy of what's on the server new_props = copy.deepcopy(node.props) old_props = self.getNode(node.uri, force=True).props for prop in old_props: - if prop in new_props and old_props[prop] == new_props[prop] and old_props[prop] is not None: + if (prop in new_props and old_props[prop] == new_props[prop] and + old_props[prop] is not None): del (new_props[prop]) - node.node = node.create(node.uri, nodeType=node.type, properties=new_props) + node.node = node.create(node.uri, nodeType=node.type, + properties=new_props) logger.debug(str(node)) f = self.open(node.uri, mode=os.O_APPEND, size=len(str(node))) f.write(str(node)) @@ -1420,20 +1512,23 @@ def create(self, node): return f.close() def update(self, node, recursive=False): - """Updates the node properties on the server. For non-recursive updates, node's - properties are updated on the server. For recursive updates, node should - only contain the properties to be changed in the node itself as well as - all its children. """ + """Updates the node properties on the server. For non-recursive + updates, node's properties are updated on the server. For + recursive updates, node should only contain the properties to + be changed in the node itself as well as all its children. """ ## Let's do this update using the async tansfer method URL = self.getNodeURL(node.uri) if recursive: - propURL = "%s://%s%s" % (self.protocol, SERVER, Client.VOProperties) - con = VOFile(propURL, self.conn, method="POST", followRedirect=False) + propURL = "%s://%s%s" % (self.protocol, SERVER, + Client.VOProperties) + con = VOFile(propURL, self.conn, method="POST", + followRedirect=False) con.write(str(node)) transURL = con.read() # logger.debug("Got back %s from $Client.VOProperties " % (con)) # Start the job - con = VOFile(transURL + "/phase", self.conn, method="POST", followRedirect=False) + con = VOFile(transURL + "/phase", self.conn, method="POST", + followRedirect=False) con.write("PHASE=RUN") con.close() self.getTransferError(transURL, node.uri) @@ -1455,8 +1550,9 @@ def mkdir(self, uri): def delete(self, uri): """Delete the node""" - # logger.debug("%s" % (uri)) - return self.open(uri, mode=os.O_TRUNC).close() + logger.debug("delete %s" % (uri)) + with self.nodeCache.volatile(self.fixURI(uri)): + return self.open(uri, mode=os.O_TRUNC).close() def getInfoList(self, uri): """Retrieve a list of tupples of (NodeName, Info dict)""" @@ -1483,7 +1579,7 @@ def listdir(self, uri, force=False): Walk through the directory structure a al os.walk. Setting force=True will make sure no caching of results are used. """ - #logger.debug("getting a listing of %s " % ( uri)) + #logger.debug("getting a listing of %s " % (uri)) names = [] logger.debug(str(uri)) node = self.getNode(uri, limit=None, force=force) @@ -1496,7 +1592,8 @@ def listdir(self, uri, force=False): return names def isdir(self, uri): - """Check to see if this given uri points at a containerNode or is a link to one.""" + """Check to see if this given uri points at a containerNode or is + a link to one.""" try: node = self.getNode(uri, limit=0) # logger.debug(node.type) @@ -1532,7 +1629,8 @@ def access(self, uri, mode=os.O_RDONLY): def status(self, uri, code=[200, 303, 302]): """Check to see if this given uri points at a containerNode. - This is done by checking the view=data header and seeing if you get an error. + This is done by checking the view=data header and seeing if you + get an error. """ return self.open(uri, view='data', head=True).close(code=code)