Skip to content

Commit

Permalink
NXDRIVE-1026: Retry in case of connection timeout
Browse files Browse the repository at this point in the history
Also:
  - Bump version to 2.5.7
  - Several references leak fixes
  • Loading branch information
BoboTiG authored Nov 6, 2017
1 parent 60fd99b commit 963a3a6
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 143 deletions.
3 changes: 3 additions & 0 deletions docs/changes.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# dev
Release date: `2017-??-??`

### Core
- [NXDRIVE-1026](https://jira.nuxeo.com/browse/NXDRIVE-1026): Retry in case of connection timeout


# 2.5.6
Release date: `2017-11-02`
Expand Down
4 changes: 2 additions & 2 deletions docs/technical_changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
[//]: # (Note 3: keywords ordered [Added, Changed, Moved, Removed])

# dev

- Removed `BaseAutomationClient.get_download_buffer()`. Use `FILE_BUFFER_SIZE` attribute instead.

# 2.5.6
- Added `BaseAutomationClient.check_access()`
- Added `BaseAutomationClient.server_reachable()`
- Added `Manager.open_metadata_window()`
- Removed `LocalWatcher.get_windows_queue_threshold()`
- Removed `LocalWatcher.set_windows_queue_threshold()`
- Added `Manager.open_metadata_window()`
- Removed `WindowsIntegration.register_desktop_link()`
- Removed `WindowsIntegration.unregister_desktop_link()`
- Added utils.py::`get_device()`
Expand Down
2 changes: 1 addition & 1 deletion nuxeo-drive-client/nxdrive/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@
"""

__author__ = 'Nuxeo'
__version__ = '2.5.6'
__version__ = '2.5.7'
23 changes: 10 additions & 13 deletions nuxeo-drive-client/nxdrive/client/base_automation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,6 @@ def fetch_api(self):
self.is_event_log_id = 'lowerBound' in [
param['name'] for param in change_summary_op['params']]


def execute(self, command, url=None, op_input=None, timeout=-1,
check_params=False, void_op=False, extra_headers=None,
enrichers=None, file_out=None, **params):
Expand Down Expand Up @@ -407,12 +406,11 @@ def execute(self, command, url=None, op_input=None, timeout=-1,
if self.check_suspended is not None:
self.check_suspended('File download: %s'
% file_out)
buffer_ = resp.read(self.get_download_buffer())
buffer_ = resp.read(FILE_BUFFER_SIZE)
if buffer_ == '':
break
if current_action:
current_action.progress += (
self.get_download_buffer())
current_action.progress += FILE_BUFFER_SIZE
f.write(buffer_)
return None, file_out
finally:
Expand Down Expand Up @@ -758,8 +756,8 @@ def _check_params(self, command, params):
# TODO: add typechecking

def _read_response(self, response, url):
info = response.info()
s = response.read()
info, s = response.info(), response.read()
del response # Fix reference leak
content_type = info.get('content-type', '')
cookies = self._get_cookies()
if content_type.startswith("application/json"):
Expand Down Expand Up @@ -797,7 +795,10 @@ def _log_details(e):
message = detail
log.error(message)
if isinstance(e, urllib2.HTTPError):
return e.code, None, message, None
code = e.code
del e # Fix reference leak
return code, None, message, None
del e # Fix reference leak
return None

@staticmethod
Expand Down Expand Up @@ -857,12 +858,11 @@ def do_get(self, url, file_out=None, digest=None, digest_algorithm=None):
if self.check_suspended is not None:
self.check_suspended('File download: %s'
% file_out)
buffer_ = response.read(self.get_download_buffer())
buffer_ = response.read(FILE_BUFFER_SIZE)
if buffer_ == '':
break
if current_action:
current_action.progress += (
self.get_download_buffer())
current_action.progress += FILE_BUFFER_SIZE
f.write(buffer_)
if h is not None:
h.update(buffer_)
Expand Down Expand Up @@ -897,6 +897,3 @@ def do_get(self, url, file_out=None, digest=None, digest_algorithm=None):
e.msg = base_error_message + ": " + e.msg
raise

@staticmethod
def get_download_buffer():
return FILE_BUFFER_SIZE
37 changes: 22 additions & 15 deletions nuxeo-drive-client/nxdrive/engine/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
import os
import shutil
import sqlite3
import socket
from threading import Lock
from time import sleep
from urllib2 import HTTPError
from urllib2 import HTTPError, URLError

from PyQt4.QtCore import pyqtSignal

Expand Down Expand Up @@ -125,11 +126,13 @@ def _unlock_path(self, path):
def get_current_pair(self):
return self._current_doc_pair

"""
def _clean(self, reason, e=None):
super(Processor, self)._clean(reason, e)
if reason == 'exception' and self._current_doc_pair is not None:
# Add it back to the queue ? Add the error delay
self.increase_error(self._current_doc_pair, 'EXCEPTION', exception=e)
"""

@staticmethod
def check_pair_state(doc_pair):
Expand Down Expand Up @@ -268,19 +271,6 @@ def _execute(self):
self.pairSync.emit(doc_pair, self._current_metrics)
except ThreadInterrupt:
raise
except PairInterrupt:
# Wait one second to avoid retrying to quickly
self._current_doc_pair = None
log.debug('PairInterrupt wait 1s and requeue on %r',
doc_pair)
sleep(1)
self._engine.get_queue_manager().push(doc_pair)
continue
except DuplicationDisabledError:
self.giveup_error(doc_pair, 'DEDUP')
log.trace('Removing local_path on %r', doc_pair)
self._dao.remove_local_path(doc_pair.id)
continue
except HTTPError as exc:
if exc.code == 409: # Conflict
# It could happen on multiple files drag'n drop
Expand All @@ -290,10 +280,25 @@ def _execute(self):
else:
self._handle_pair_handler_exception(
doc_pair, handler_name, exc)
del exc # Fix reference leak
continue
except (URLError, socket.error, PairInterrupt) as exc:
# socket.error for SSLError
log.debug('%s on %r, wait 1s and requeue',
type(exc).__name__, doc_pair)
sleep(1)
self._engine.get_queue_manager().push(doc_pair)
del exc # Fix reference leak
continue
except DuplicationDisabledError:
self.giveup_error(doc_pair, 'DEDUP')
log.trace('Removing local_path on %r', doc_pair)
self._dao.remove_local_path(doc_pair.id)
continue
except Exception as exc:
self._handle_pair_handler_exception(
doc_pair, handler_name, exc)
del exc # Fix reference leak
continue
except ThreadInterrupt:
self._engine.get_queue_manager().push(doc_pair)
Expand All @@ -303,6 +308,7 @@ def _execute(self):
self.increase_error(doc_pair, 'EXCEPTION', exception=e)
raise e
finally:
self._current_doc_pair = None # Fix reference leak
if soft_lock is not None:
self._unlock_soft_path(soft_lock)
self._dao.release_state(self._thread_id)
Expand All @@ -311,7 +317,8 @@ def _execute(self):
def _handle_pair_handler_exception(self, doc_pair, handler_name, e):
if isinstance(e, IOError) and e.errno == 28:
self._engine.noSpaceLeftOnDevice.emit()
log.exception(repr(e))
self._engine.suspend()
log.exception('Unknown error')
self.increase_error(doc_pair, "SYNC_HANDLER_%s" % handler_name, exception=e)

def _synchronize_conflicted(self, doc_pair, local_client, remote_client):
Expand Down
Loading

0 comments on commit 963a3a6

Please sign in to comment.