Skip to content

Commit

Permalink
Add exponential backoff retry to transaction retry decorator (#37)
Browse files Browse the repository at this point in the history
* Add exponential backoff retry to transaction retry decorator

* Added arguments `retries` and `backoff_factor` to `retry_transaction`
  decorator.
* Update pylint dependency to resolve crash during ci

* Use namekos retry decorator for transaction retry

* Revert back to own implementation of retry

- Use behavior and api (backoff argument names) from urllib3 retry
- Add explanation of backoff algorithm to README
- Fix incorrect test comments
  • Loading branch information
daviskirk authored and mattbennett committed Feb 26, 2019
1 parent 9871e33 commit da82b64
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 15 deletions.
8 changes: 6 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
Release Notes
=============

Unreleased
----------

* Add retry arguments to `retry_transaction` decorator.

Version 1.4.0
-------------
Expand Down Expand Up @@ -29,7 +33,7 @@ Released 2018-03-15
(fixes #25)
* Change default behaviour of context manager so sessions are closed
on worker teardown rather than context manager exit (closes #24)

Version 1.1.0
-------------

Expand Down Expand Up @@ -68,7 +72,7 @@ Version 0.0.3
Released 2016-08-26

* Added `pytest` fixtures for managing session in tests.

Version 0.0.2
-------------

Expand Down
23 changes: 22 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,28 @@ or using with the ``Database`` dependency provider
self.db.session.add(ExampleModel(data='hello'))
self.db.session.commit()
Optionally, the transaction may be retried multiple times with an exponential backoff delay.
If given, the backoff factor is applied between attempts after the second try
(most errors are resolved immediately by a second try without a delay).
The applied delay will then be::

{backoff_factor} * (2 ** ({number of total retries} - 1))

seconds.
The delay will never be longer than `backoff_max` seconds.
By default, backoff is disabled (`backoff_factor` set to 0).
Finally, if the connection has still not recovered after `total` tries, the error is reraised.
The following code will thus wait for 0.0s, 0.2s, 0.4s, 0.8s, 1.0s before raising an error:

.. code-block:: python
@transaction_retry(total=5, backoff_factor=0.1, backoff_max=1.0)
def get_example_data():
db_session.query(ExampleModel).all()
example_data = get_example_data()
.. caution::

Using the decorator may cause unanticipated consequences when the decorated function uses more than one transaction.
Expand Down Expand Up @@ -320,4 +342,3 @@ If ``toxiproxy-api-url`` and ``toxiproxy-db-url`` parameters are provided the te
--toxiproxy-db-url="http://toxiproxy_server:3306"
if no ``toxiproxy-api-url`` and ``toxiproxy-db-url`` parameter was provided the tests that require toxiproxy will be skipped.

59 changes: 48 additions & 11 deletions nameko_sqlalchemy/transaction_retry.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,65 @@
from time import sleep
import functools
import operator

import wrapt
from sqlalchemy import exc


def transaction_retry(wrapped=None, session=None):
def transaction_retry(wrapped=None, session=None, total=1,
backoff_factor=0, backoff_max=None):

if wrapped is None:
return functools.partial(transaction_retry, session=session)
return functools.partial(
transaction_retry, session=session,
total=total,
backoff_factor=backoff_factor,
backoff_max=backoff_max)

@wrapt.decorator
def wrapper(wrapped, instance, args, kwargs):

try:
return wrapped(*args, **kwargs)
except exc.OperationalError as exception:
if exception.connection_invalidated:
if isinstance(session, operator.attrgetter):
session(instance).rollback()
elif session:
session.rollback()
@retry(total, backoff_factor, backoff_max, exc.OperationalError)
def run_or_rollback():
try:
return wrapped(*args, **kwargs)
else:
except exc.OperationalError as exception:
if exception.connection_invalidated:
if isinstance(session, operator.attrgetter):
session(instance).rollback()
elif session:
session.rollback()
raise

return run_or_rollback()

return wrapper(wrapped) # pylint: disable=E1120


def retry(total, backoff_factor, backoff_max, exceptions):

total = max(total, 1)
backoff_factor = max(backoff_factor, 0)
backoff_max = float('inf') if backoff_max is None else max(0, backoff_max)

@wrapt.decorator
def wrapper(wrapped, instance, args, kwargs):
errors = 0
while True:
try:
return wrapped(*args, **kwargs)
except exceptions:
errors += 1

if errors > total:
raise

if errors >= 2:
backoff_value = backoff_factor * (2 ** (errors - 1))
else:
backoff_value = 0
backoff_value = min(backoff_value, backoff_max)

sleep(backoff_value)

return wrapper
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
'dev': [
"coverage==4.0.3",
"flake8==2.5.4",
"pylint==1.7.5",
"pylint==1.9.4",
"pytest==2.9.1",
"requests==2.18.4",
"PyMySQL",
Expand Down
116 changes: 116 additions & 0 deletions test/test_transaction_retry.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import operator
import sys

import pytest
from mock import Mock
from nameko.exceptions import ExtensionNotFound
from nameko.testing.services import entrypoint_hook
from nameko.testing.services import dummy
Expand Down Expand Up @@ -299,3 +301,117 @@ def test_create_with_worker_scoped_session(
pass # not implemented in ExampleServiceWithDatabaseSession
else:
assert db_session.query(ExampleModel).count() == 2


def _op_exc(connection_invalidated=False):
return OperationalError(
None, None, None, connection_invalidated=connection_invalidated)


@pytest.mark.parametrize(
'retry_kwargs,'
'call_results,'
'expected_result,expected_sleeps',
[
# success on first try
({'total': 0, 'backoff_factor': 0.5}, [1], 1, []),
# success on first try
({'total': 1, 'backoff_factor': 0.5}, [1], 1, []),
# single retry + success
({'total': 1, 'backoff_factor': 0.5}, [_op_exc(), 2], 2, [0]),
# single retry + success, same even if more retries would have been
# possible
({'total': 2, 'backoff_factor': 0.5}, [_op_exc(), 2], 2, [0]),
# Unspecified exception
(
{'total': 1, 'backoff_factor': 0.5},
[ValueError()],
ValueError,
[]
),
# Specified exception, then unspecified exception
(
{'total': 10, 'backoff_factor': 0.5},
[_op_exc(), ValueError(), 3],
ValueError,
[0]
),
# Multiple specified exception, then success
(
{'total': 10, 'backoff_factor': 0.5},
[_op_exc() for _ in range(4)] + [5],
5,
[0.0, 1.0, 2.0, 4.0]
),
# Multiple specified exception with max backoff, then success
(
{'total': 10, 'backoff_factor': 0.5, 'backoff_max': 1.2},
[_op_exc() for _ in range(4)] + [5],
5,
[0.0, 1.0, 1.2, 1.2]
),
# Multiple specified exception without delay, then success
(
{'total': 10, 'backoff_factor': 0},
[_op_exc() for _ in range(4)] + [5],
5,
[0, 0, 0, 0]
),
# Max retries exceeded
(
{'total': 3, 'backoff_factor': 0.5},
[_op_exc() for _ in range(4)],
OperationalError, [0, 1.0, 2.0]
),
])
def test_retry_configuration(retry_kwargs, call_results,
expected_result, expected_sleeps,
monkeypatch):
sleeps = []

mocked_sleep = Mock(side_effect=lambda delay: sleeps.append(delay))
monkeypatch.setattr(sys.modules['nameko_sqlalchemy.transaction_retry'],
'sleep', mocked_sleep)

mocked_fcn = Mock()
mocked_fcn.side_effect = call_results

decorator = transaction_retry(**retry_kwargs)
decorated = decorator(mocked_fcn)

if (
isinstance(expected_result, type) and
issubclass(expected_result, Exception)
):
with pytest.raises(expected_result):
decorated()

else:
result = decorated()
assert expected_result == result

assert expected_sleeps == sleeps


def test_multiple_retries_with_disabled_connection(
toxiproxy_db_session, toxiproxy, disconnect
):
if not toxiproxy:
pytest.skip('Toxiproxy not installed')

state = {'calls': 0}

@transaction_retry(session=toxiproxy_db_session, total=3)
def get_model_count():
state['calls'] += 1
if state['calls'] >= 3:
toxiproxy.enable()
return toxiproxy_db_session.query(ExampleModel).count()

toxiproxy_db_session.add(ExampleModel(data='hello1'))
toxiproxy_db_session.add(ExampleModel(data='hello2'))
toxiproxy_db_session.commit()

toxiproxy.disable()
assert get_model_count() == 2
assert state['calls'] == 3

0 comments on commit da82b64

Please sign in to comment.