From 963a3a670f3f49d39b40ace3a46d13a8c21b40ac Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Micka=C3=ABl=20Schoentgen?=
Date: Mon, 6 Nov 2017 15:54:58 +0100
Subject: [PATCH] NXDRIVE-1026: Retry in case of connection timeout

Also:
- Bump version to 2.5.7
- Several reference leak fixes

---
 docs/changes.md                                 |   3 +
 docs/technical_changes.md                       |   4 +-
 nuxeo-drive-client/nxdrive/__init__.py          |   2 +-
 .../nxdrive/client/base_automation_client.py    |  23 +-
 .../nxdrive/engine/processor.py                 |  37 +--
 .../nxdrive/engine/watcher/remote_watcher.py    | 231 +++++++++---------
 6 files changed, 157 insertions(+), 143 deletions(-)

diff --git a/docs/changes.md b/docs/changes.md
index e5be84267f..212da985ee 100644
--- a/docs/changes.md
+++ b/docs/changes.md
@@ -1,6 +1,9 @@
 # dev
 Release date: `2017-??-??`
 
+### Core
+- [NXDRIVE-1026](https://jira.nuxeo.com/browse/NXDRIVE-1026): Retry in case of connection timeout
+
 # 2.5.6
 Release date: `2017-11-02`
 
diff --git a/docs/technical_changes.md b/docs/technical_changes.md
index 2ae35f518a..88e826ded0 100644
--- a/docs/technical_changes.md
+++ b/docs/technical_changes.md
@@ -3,14 +3,14 @@
 [//]: # (Note 3: keywords ordered [Added, Changed, Moved, Removed])
 
 # dev
-
+- Removed `BaseAutomationClient.get_download_buffer()`. Use `FILE_BUFFER_SIZE` attribute instead.
 
 # 2.5.6
 - Added `BaseAutomationClient.check_access()`
 - Added `BaseAutomationClient.server_reachable()`
-- Added `Manager.open_metadata_window()`
 - Removed `LocalWatcher.get_windows_queue_threshold()`
 - Removed `LocalWatcher.set_windows_queue_threshold()`
+- Added `Manager.open_metadata_window()`
 - Removed `WindowsIntegration.register_desktop_link()`
 - Removed `WindowsIntegration.unregister_desktop_link()`
 - Added utils.py::`get_device()`
diff --git a/nuxeo-drive-client/nxdrive/__init__.py b/nuxeo-drive-client/nxdrive/__init__.py
index cb1a1339ca..1be52d46dd 100644
--- a/nuxeo-drive-client/nxdrive/__init__.py
+++ b/nuxeo-drive-client/nxdrive/__init__.py
@@ -25,4 +25,4 @@
 """
 
 __author__ = 'Nuxeo'
-__version__ = '2.5.6'
+__version__ = '2.5.7'
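Note on the client changes below: the `get_download_buffer()` static method, which only returned `FILE_BUFFER_SIZE`, is dropped in favour of using the module-level constant directly in both download loops. A minimal stand-alone sketch of that chunked-read pattern, with assumed names and an assumed buffer size (the real constant lives in this module):

```python
FILE_BUFFER_SIZE = 1024 ** 2  # assumed value for this sketch

def stream_to_file(resp, file_out):
    # Read the urllib2 response in fixed-size chunks so a large
    # download never has to fit in memory at once.
    with open(file_out, 'wb') as f:
        while True:
            buffer_ = resp.read(FILE_BUFFER_SIZE)
            if buffer_ == '':  # EOF: Python 2 file-like objects return ''
                break
            f.write(buffer_)
```
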
diff --git a/nuxeo-drive-client/nxdrive/client/base_automation_client.py b/nuxeo-drive-client/nxdrive/client/base_automation_client.py
index 8859d5f558..d42d479154 100644
--- a/nuxeo-drive-client/nxdrive/client/base_automation_client.py
+++ b/nuxeo-drive-client/nxdrive/client/base_automation_client.py
@@ -336,7 +336,6 @@ def fetch_api(self):
         self.is_event_log_id = 'lowerBound' in [
             param['name'] for param in change_summary_op['params']]
 
-
     def execute(self, command, url=None, op_input=None, timeout=-1,
                 check_params=False, void_op=False, extra_headers=None,
                 enrichers=None, file_out=None, **params):
@@ -407,12 +406,11 @@ def execute(self, command, url=None, op_input=None, timeout=-1,
                         if self.check_suspended is not None:
                             self.check_suspended('File download: %s'
                                                  % file_out)
-                        buffer_ = resp.read(self.get_download_buffer())
+                        buffer_ = resp.read(FILE_BUFFER_SIZE)
                         if buffer_ == '':
                             break
                         if current_action:
-                            current_action.progress += (
-                                self.get_download_buffer())
+                            current_action.progress += FILE_BUFFER_SIZE
                         f.write(buffer_)
                 return None, file_out
             finally:
@@ -758,8 +756,8 @@ def _check_params(self, command, params):
 
     # TODO: add typechecking
     def _read_response(self, response, url):
-        info = response.info()
-        s = response.read()
+        info, s = response.info(), response.read()
+        del response  # Fix reference leak
         content_type = info.get('content-type', '')
         cookies = self._get_cookies()
         if content_type.startswith("application/json"):
@@ -797,7 +795,10 @@ def _log_details(e):
                     message = detail
                 log.error(message)
         if isinstance(e, urllib2.HTTPError):
-            return e.code, None, message, None
+            code = e.code
+            del e  # Fix reference leak
+            return code, None, message, None
+        del e  # Fix reference leak
         return None
 
     @staticmethod
@@ -857,12 +858,11 @@ def do_get(self, url, file_out=None, digest=None, digest_algorithm=None):
                         if self.check_suspended is not None:
                             self.check_suspended('File download: %s'
                                                  % file_out)
-                        buffer_ = response.read(self.get_download_buffer())
+                        buffer_ = response.read(FILE_BUFFER_SIZE)
                         if buffer_ == '':
                             break
                         if current_action:
-                            current_action.progress += (
-                                self.get_download_buffer())
+                            current_action.progress += FILE_BUFFER_SIZE
                         f.write(buffer_)
                         if h is not None:
                             h.update(buffer_)
@@ -897,6 +897,3 @@ def do_get(self, url, file_out=None, digest=None, digest_algorithm=None):
                 e.msg = base_error_message + ": " + e.msg
             raise
 
-    @staticmethod
-    def get_download_buffer():
-        return FILE_BUFFER_SIZE
diff --git a/nuxeo-drive-client/nxdrive/engine/processor.py b/nuxeo-drive-client/nxdrive/engine/processor.py
index a0a1ce474f..79e412d15d 100644
--- a/nuxeo-drive-client/nxdrive/engine/processor.py
+++ b/nuxeo-drive-client/nxdrive/engine/processor.py
@@ -2,9 +2,10 @@
 import os
 import shutil
 import sqlite3
+import socket
 from threading import Lock
 from time import sleep
-from urllib2 import HTTPError
+from urllib2 import HTTPError, URLError
 
 from PyQt4.QtCore import pyqtSignal
 
@@ -125,11 +126,13 @@ def _unlock_path(self, path):
     def get_current_pair(self):
         return self._current_doc_pair
 
+    """
     def _clean(self, reason, e=None):
         super(Processor, self)._clean(reason, e)
         if reason == 'exception' and self._current_doc_pair is not None:
             # Add it back to the queue ? Add the error delay
             self.increase_error(self._current_doc_pair, 'EXCEPTION', exception=e)
+    """
 
     @staticmethod
     def check_pair_state(doc_pair):
@@ -268,19 +271,6 @@ def _execute(self):
                         self.pairSync.emit(doc_pair, self._current_metrics)
                 except ThreadInterrupt:
                     raise
-                except PairInterrupt:
-                    # Wait one second to avoid retrying to quickly
-                    self._current_doc_pair = None
-                    log.debug('PairInterrupt wait 1s and requeue on %r',
-                              doc_pair)
-                    sleep(1)
-                    self._engine.get_queue_manager().push(doc_pair)
-                    continue
-                except DuplicationDisabledError:
-                    self.giveup_error(doc_pair, 'DEDUP')
-                    log.trace('Removing local_path on %r', doc_pair)
-                    self._dao.remove_local_path(doc_pair.id)
-                    continue
                 except HTTPError as exc:
                     if exc.code == 409:  # Conflict
                         # It could happen on multiple files drag'n drop
@@ -290,10 +280,25 @@ def _execute(self):
                     else:
                         self._handle_pair_handler_exception(
                             doc_pair, handler_name, exc)
+                    del exc  # Fix reference leak
+                    continue
+                except (URLError, socket.error, PairInterrupt) as exc:
+                    # socket.error for SSLError
+                    log.debug('%s on %r, wait 1s and requeue',
+                              type(exc).__name__, doc_pair)
+                    sleep(1)
+                    self._engine.get_queue_manager().push(doc_pair)
+                    del exc  # Fix reference leak
+                    continue
+                except DuplicationDisabledError:
+                    self.giveup_error(doc_pair, 'DEDUP')
+                    log.trace('Removing local_path on %r', doc_pair)
+                    self._dao.remove_local_path(doc_pair.id)
                     continue
                 except Exception as exc:
                     self._handle_pair_handler_exception(
                         doc_pair, handler_name, exc)
+                    del exc  # Fix reference leak
                     continue
                 except ThreadInterrupt:
                     self._engine.get_queue_manager().push(doc_pair)
@@ -303,6 +308,7 @@ def _execute(self):
                     self.increase_error(doc_pair, 'EXCEPTION', exception=e)
                     raise e
                 finally:
+                    self._current_doc_pair = None  # Fix reference leak
                     if soft_lock is not None:
                         self._unlock_soft_path(soft_lock)
                     self._dao.release_state(self._thread_id)
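The new `except (URLError, socket.error, PairInterrupt)` branch above is the heart of NXDRIVE-1026: a connection timeout (or an SSL error, which urllib2 surfaces as `socket.error`) no longer counts against the pair's error budget; the pair is simply pushed back onto the queue after a one-second pause. A minimal sketch of that flow, with a hypothetical `queue`/`handler` pair standing in for the real `Processor` machinery:

```python
import socket
from time import sleep
from urllib2 import URLError  # Python 2, as in the module above


def process_one(queue, doc_pair, handler):
    # Hypothetical helper: transient network failures are retried by
    # requeueing instead of being recorded as synchronization errors.
    try:
        handler(doc_pair)
    except (URLError, socket.error) as exc:
        sleep(1)  # avoid hammering an unreachable server
        queue.push(doc_pair)
        del exc  # drop the frame's reference, as the patch does
```
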
@@ -311,7 +317,8 @@ def _execute(self):
     def _handle_pair_handler_exception(self, doc_pair, handler_name, e):
         if isinstance(e, IOError) and e.errno == 28:
             self._engine.noSpaceLeftOnDevice.emit()
-        log.exception(repr(e))
+            self._engine.suspend()
+        log.exception('Unknown error')
         self.increase_error(doc_pair, "SYNC_HANDLER_%s" % handler_name, exception=e)
 
     def _synchronize_conflicted(self, doc_pair, local_client, remote_client):
diff --git a/nuxeo-drive-client/nxdrive/engine/watcher/remote_watcher.py b/nuxeo-drive-client/nxdrive/engine/watcher/remote_watcher.py
index b9176c0f1c..0eb1401822 100644
--- a/nuxeo-drive-client/nxdrive/engine/watcher/remote_watcher.py
+++ b/nuxeo-drive-client/nxdrive/engine/watcher/remote_watcher.py
@@ -220,6 +220,8 @@ def _scan_remote_scroll(self, doc_pair, remote_info, moved=False):
                       remote_info.name, remote_info.uid, elapsed)
             scroll_id = scroll_res['scroll_id']
 
+            del scroll_res['descendants']  # Fix reference leak
+
             # Results are not necessarily sorted
             descendants_info = sorted(descendants_info,
                                       key=lambda x: x.path)
@@ -473,13 +475,13 @@ def _handle_changes(self, first_pass=False):
             if full_scan is not None:
                 self._partial_full_scan(full_scan)
                 return None
-            else:
+
+            paths = self._dao.get_paths_to_scan()
+            while paths:
+                remote_ref = paths[0].path
+                self._dao.update_config('remote_need_full_scan', remote_ref)
+                self._partial_full_scan(remote_ref)
                 paths = self._dao.get_paths_to_scan()
-                while len(paths) > 0:
-                    remote_ref = paths[0].path
-                    self._dao.update_config('remote_need_full_scan', remote_ref)
-                    self._partial_full_scan(remote_ref)
-                    paths = self._dao.get_paths_to_scan()
 
             self._action = Action("Handle remote changes")
             self._update_remote_states()
@@ -544,17 +546,20 @@ def _update_remote_states(self):
                 remote_path = '/'
                 self._dao.add_path_to_scan(remote_path)
                 self._dao.update_config('remote_need_full_scan', remote_path)
+            del summary['fileSystemChanges']  # Fix reference leak
             return
 
         if not summary['fileSystemChanges']:
             self._metrics['empty_polls'] += 1
             self.noChangesFound.emit()
+            del summary['fileSystemChanges']  # Fix reference leak
             return
 
         # Fetch all events and consider the most recent folder first
         sorted_changes = sorted(summary['fileSystemChanges'],
                                 key=lambda x: x['eventDate'], reverse=True)
         n_changes = len(sorted_changes)
+        del summary['fileSystemChanges']  # Fix reference leak
         self._metrics['last_changes'] = n_changes
         self._metrics['empty_polls'] = 0
         self.changesFound.emit(n_changes)
@@ -563,7 +568,6 @@ def _update_remote_states(self):
         refreshed = set()
         delete_queue = []
         for change in sorted_changes:
-
             # Check if synchronization thread was suspended
             # TODO In case of pause or stop: save the last event id
             self._interact()
@@ -575,9 +579,11 @@ def _update_remote_states(self):
                 if refreshed_ref.endswith(remote_ref):
                     processed = True
                     break
+
             if processed:
                 # A more recent version was already processed
                 continue
+
             fs_item = change.get('fileSystemItem')
             new_info = self._client.file_to_info(fs_item) if fs_item else None
 
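`_update_remote_states()` keeps the change summary alive for the whole per-change loop below, so the hunks above `del` the potentially large `fileSystemChanges` list on every early `return` and right after it has been sorted into a new list. A toy illustration of that early-release pattern (hypothetical `handle_summary`, not the real method):

```python
def handle_summary(summary):
    # Sort into a fresh list, then drop the dict's reference so the large
    # original list can be garbage collected during the long loop below.
    sorted_changes = sorted(summary['fileSystemChanges'],
                            key=lambda x: x['eventDate'], reverse=True)
    del summary['fileSystemChanges']  # Fix reference leak
    for change in sorted_changes:
        pass  # per-change work runs here, possibly for a long time
```
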
@@ -599,110 +605,111 @@ def _update_remote_states(self):
                     doc_pairs = [doc_pair]
 
             updated = False
-            if doc_pairs:
-                for doc_pair in doc_pairs:
-                    doc_pair_repr = doc_pair.local_path if doc_pair.local_path is not None else doc_pair.remote_name
-                    if event_id == 'deleted':
-                        if fs_item is None:
-                            if doc_pair.local_path == '':
-                                log.debug("Delete pair from duplicate: %r", doc_pair)
-                                self._dao.remove_state(doc_pair, remote_recursion=True)
-                                continue
-                            log.debug('Push doc_pair %r in delete queue', doc_pair_repr)
-                            delete_queue.append(doc_pair)
-                        else:
-                            log.debug('Ignore delete on doc_pair %r as a fsItem is attached', doc_pair_repr)
-                            # To ignore completely put updated to true
-                            updated = True
-                            break
-                    elif fs_item is None:
-                        if event_id == 'securityUpdated':
-                            log.debug('Security has been updated for'
-                                      ' doc_pair %r denying Read access,'
-                                      ' marking it as deleted',
-                                      doc_pair_repr)
-                            self._dao.delete_remote_state(doc_pair)
-                        else:
-                            log.debug('Unknown event: %r', event_id)
-                    else:
-                        remote_parent_factory = doc_pair.remote_parent_ref.split('#', 1)[0]
-                        new_info_parent_factory = new_info.parent_uid.split('#', 1)[0]
-                        # Specific cases of a move on a locally edited doc
-                        if event_id == 'documentMoved' and remote_parent_factory == COLLECTION_SYNC_ROOT_FACTORY_NAME:
-                            # If moved from a non sync root to a sync root,
-                            # break to creation case (updated is False).
-                            # If moved from a sync root to a non sync root,
-                            # break to noop (updated is True).
-                            break
-                        elif (event_id == 'documentMoved'
-                              and new_info_parent_factory == COLLECTION_SYNC_ROOT_FACTORY_NAME):
-                            # If moved from a sync root to a non sync root, delete from local sync root
-                            log.debug('Marking doc_pair %r as deleted', doc_pair_repr)
-                            self._dao.delete_remote_state(doc_pair)
-                        else:
-                            # Make new_info consistent with actual doc pair parent path for a doc member of a
-                            # collection (typically the Locally Edited one) that is also under a sync root.
-                            # Indeed, in this case, when adapted as a FileSystemItem, its parent path will be the one
-                            # of the sync root because it takes precedence over the collection,
-                            # see AbstractDocumentBackedFileSystemItem constructor.
-                            consistent_new_info = new_info
-                            if remote_parent_factory == COLLECTION_SYNC_ROOT_FACTORY_NAME:
-                                new_info_parent_uid = doc_pair.remote_parent_ref
-                                new_info_path = doc_pair.remote_parent_path + '/' + remote_ref
-                                consistent_new_info = RemoteFileInfo(
-                                    new_info.name,
-                                    new_info.uid,
-                                    new_info_parent_uid,
-                                    new_info_path,
-                                    new_info.folderish,
-                                    new_info.last_modification_time,
-                                    new_info.last_contributor,
-                                    new_info.digest,
-                                    new_info.digest_algorithm,
-                                    new_info.download_url,
-                                    new_info.can_rename,
-                                    new_info.can_delete,
-                                    new_info.can_update,
-                                    new_info.can_create_child,
-                                    new_info.lock_owner,
-                                    new_info.lock_created,
-                                    new_info.can_scroll_descendants,
-                                )
-                            # Perform a regular document update on a document
-                            # that has been updated, renamed or moved
-                            log.debug('Refreshing remote state info for '
-                                      'doc_pair=%r, event_id=%r '
-                                      '(force_recursion=%d)', doc_pair_repr,
-                                      event_id, event_id == 'securityUpdated')
-
-                            # Force remote state update in case of a locked / unlocked event since lock info is not
-                            # persisted, so not part of the dirty check
-                            lock_update = event_id in ('documentLocked',
-                                                       'documentUnlocked')
-                            if doc_pair.remote_state != 'created':
-                                if (new_info.digest != doc_pair.remote_digest
-                                        or safe_filename(new_info.name) != doc_pair.remote_name
-                                        or new_info.parent_uid != doc_pair.remote_parent_ref
-                                        or event_id == 'securityUpdated'
-                                        or lock_update):
-                                    doc_pair.remote_state = 'modified'
-                                remote_parent_path = os.path.dirname(new_info.path)
-                                # TODO Add modify local_path and local_parent_path if needed
-                                self._dao.update_remote_state(doc_pair, new_info, remote_parent_path=remote_parent_path,
-                                                              force_update=lock_update)
-                            if doc_pair.folderish:
-                                log.trace('Force scan recursive on %r : %d', doc_pair, event_id == 'securityUpdated')
-                                self._force_remote_scan(doc_pair, consistent_new_info, remote_path=new_info.path,
-                                                        force_recursion=event_id == 'securityUpdated',
-                                                        moved=event_id == 'documentMoved')
-                            if lock_update:
-                                doc_pair = self._dao.get_state_from_id(doc_pair.id)
-                                try:
-                                    self._handle_readonly(self._local_client, doc_pair)
-                                except (OSError, IOError) as ex:
-                                    log.trace('Cannot handle readonly for %r (%r)', doc_pair, ex)
-                    updated = True
-                    refreshed.add(remote_ref)
+            doc_pairs = doc_pairs or []
+            for doc_pair in doc_pairs:
+                doc_pair_repr = doc_pair.local_path if doc_pair.local_path is not None else doc_pair.remote_name
+                if event_id == 'deleted':
+                    if fs_item is None:
+                        if doc_pair.local_path == '':
+                            log.debug("Delete pair from duplicate: %r", doc_pair)
+                            self._dao.remove_state(doc_pair, remote_recursion=True)
+                            continue
+                        log.debug('Push doc_pair %r in delete queue', doc_pair_repr)
+                        delete_queue.append(doc_pair)
+                    else:
+                        log.debug('Ignore delete on doc_pair %r as a fsItem is attached', doc_pair_repr)
+                        # To ignore completely put updated to true
+                        updated = True
+                        break
+                elif fs_item is None:
+                    if event_id == 'securityUpdated':
+                        log.debug('Security has been updated for'
+                                  ' doc_pair %r denying Read access,'
+                                  ' marking it as deleted',
+                                  doc_pair_repr)
+                        self._dao.delete_remote_state(doc_pair)
+                    else:
+                        log.debug('Unknown event: %r', event_id)
+                else:
+                    remote_parent_factory = doc_pair.remote_parent_ref.split('#', 1)[0]
+                    new_info_parent_factory = new_info.parent_uid.split('#', 1)[0]
+                    # Specific cases of a move on a locally edited doc
+                    if event_id == 'documentMoved' and remote_parent_factory == COLLECTION_SYNC_ROOT_FACTORY_NAME:
+                        # If moved from a non sync root to a sync root,
+                        # break to creation case (updated is False).
+                        # If moved from a sync root to a non sync root,
+                        # break to noop (updated is True).
+                        break
+                    elif (event_id == 'documentMoved'
+                          and new_info_parent_factory == COLLECTION_SYNC_ROOT_FACTORY_NAME):
+                        # If moved from a sync root to a non sync root, delete from local sync root
+                        log.debug('Marking doc_pair %r as deleted', doc_pair_repr)
+                        self._dao.delete_remote_state(doc_pair)
+                    else:
+                        # Make new_info consistent with actual doc pair parent path for a doc member of a
+                        # collection (typically the Locally Edited one) that is also under a sync root.
+                        # Indeed, in this case, when adapted as a FileSystemItem, its parent path will be the one
+                        # of the sync root because it takes precedence over the collection,
+                        # see AbstractDocumentBackedFileSystemItem constructor.
+                        consistent_new_info = new_info
+                        if remote_parent_factory == COLLECTION_SYNC_ROOT_FACTORY_NAME:
+                            new_info_parent_uid = doc_pair.remote_parent_ref
+                            new_info_path = doc_pair.remote_parent_path + '/' + remote_ref
+                            consistent_new_info = RemoteFileInfo(
+                                new_info.name,
+                                new_info.uid,
+                                new_info_parent_uid,
+                                new_info_path,
+                                new_info.folderish,
+                                new_info.last_modification_time,
+                                new_info.last_contributor,
+                                new_info.digest,
+                                new_info.digest_algorithm,
+                                new_info.download_url,
+                                new_info.can_rename,
+                                new_info.can_delete,
+                                new_info.can_update,
+                                new_info.can_create_child,
+                                new_info.lock_owner,
+                                new_info.lock_created,
+                                new_info.can_scroll_descendants,
+                            )
+                        # Perform a regular document update on a document
+                        # that has been updated, renamed or moved
+                        log.debug('Refreshing remote state info for '
+                                  'doc_pair=%r, event_id=%r '
+                                  '(force_recursion=%d)', doc_pair_repr,
+                                  event_id, event_id == 'securityUpdated')
+
+                        # Force remote state update in case of a locked / unlocked event since lock info is not
+                        # persisted, so not part of the dirty check
+                        lock_update = event_id in ('documentLocked',
+                                                   'documentUnlocked')
+                        if doc_pair.remote_state != 'created':
+                            if (new_info.digest != doc_pair.remote_digest
+                                    or safe_filename(new_info.name) != doc_pair.remote_name
+                                    or new_info.parent_uid != doc_pair.remote_parent_ref
+                                    or event_id == 'securityUpdated'
+                                    or lock_update):
+                                doc_pair.remote_state = 'modified'
+                            remote_parent_path = os.path.dirname(new_info.path)
+                            # TODO Add modify local_path and local_parent_path if needed
+                            self._dao.update_remote_state(doc_pair, new_info, remote_parent_path=remote_parent_path,
+                                                          force_update=lock_update)
+                        if doc_pair.folderish:
+                            log.trace('Force scan recursive on %r : %d', doc_pair, event_id == 'securityUpdated')
+                            self._force_remote_scan(doc_pair, consistent_new_info, remote_path=new_info.path,
+                                                    force_recursion=event_id == 'securityUpdated',
+                                                    moved=event_id == 'documentMoved')
+                        if lock_update:
+                            doc_pair = self._dao.get_state_from_id(doc_pair.id)
+                            try:
+                                self._handle_readonly(self._local_client, doc_pair)
+                            except (OSError, IOError) as exc:
+                                log.trace('Cannot handle readonly for %r (%r)', doc_pair, exc)
+                                del exc  # Fix reference leak
+                updated = True
+                refreshed.add(remote_ref)
 
             if new_info and not updated:
                 # Handle new document creations
@@ -736,8 +743,8 @@ def _update_remote_states(self):
             skip = False
             for processed in delete_processed:
                 path = processed.local_path
-                if path[-1] != "/":
-                    path = path + "/"
+                if path[-1] != '/':
+                    path += '/'
                 if delete_pair.local_path.startswith(path):
                     skip = True
                     break
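Most of the churn in the last big hunk is a mechanical dedent: replacing the `if doc_pairs:` guard with `doc_pairs = doc_pairs or []` lets the `for` loop run unconditionally, since iterating an empty list is a no-op. A sketch of the equivalence, with hypothetical names:

```python
def sync_all(doc_pairs, handle):
    # Same behaviour as `if doc_pairs: for ...`, one indentation level
    # less: a for-loop over an empty list simply does nothing.
    doc_pairs = doc_pairs or []
    for doc_pair in doc_pairs:
        handle(doc_pair)
```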