From 3a8cd1fa6a2f795bd3fb262e21d16ec3a8ea281a Mon Sep 17 00:00:00 2001 From: aldraco Date: Fri, 7 Sep 2018 11:46:06 -0600 Subject: [PATCH 01/24] Log how long the penalty box waits for the client to reconnect. --- fluster/penalty_box.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/fluster/penalty_box.py b/fluster/penalty_box.py index a606960..b3f9da8 100644 --- a/fluster/penalty_box.py +++ b/fluster/penalty_box.py @@ -35,12 +35,14 @@ def get(self): now = time.time() while self._clients and self._clients[0][0] < now: _, (client, last_wait) = heapq.heappop(self._clients) + s = time.time() try: client.echo('test') # reconnected if this succeeds. self._client_ids.remove(client.pool_id) yield client except (ConnectionError, TimeoutError): + timer = time.time() - s wait = min(int(last_wait * self._multiplier), self._max_wait) heapq.heappush(self._clients, (time.time() + wait, (client, wait))) - log.info('%r is still down. Retrying in %ss.', client, wait) + log.info('%r is still down after %s seconds. Retrying in %ss.', client, timer, wait) From 9466c25239ba8e3cfb46b4de70fa3389df910ea2 Mon Sep 17 00:00:00 2001 From: aldraco Date: Mon, 10 Sep 2018 11:43:34 -0600 Subject: [PATCH 02/24] Add method for manually penalizing a client. --- fluster/cluster.py | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/fluster/cluster.py b/fluster/cluster.py index 819b35a..bdd1629 100644 --- a/fluster/cluster.py +++ b/fluster/cluster.py @@ -73,10 +73,7 @@ def wrapper(*args, **kwargs): try: return fn(*args, **kwargs) except (ConnectionError, TimeoutError): # TO THE PENALTY BOX! 
- if client in self.active_clients: # hasn't been removed yet - log.warning('%r marked down.', client) - self.active_clients.remove(client) - self.penalty_box.add(client) + self.penalize_client(client) raise return functools.update_wrapper(wrapper, fn) @@ -125,6 +122,26 @@ def get_client(self, shard_key): pos = hashed % len(self.active_clients) return self.active_clients[pos] + def penalize_client(self, client): + """Place client in the penalty box manually. + + Useful for situations where clients are used via dependency injection, and + the methods cannot be wrapped safely to penalize automatically. + + n.b. the `get_client` method is the mechanism used to periodically check + the penalized clients to see if any are ready again. If your code is manually + penalizing clients, make sure it utilizes the `get_client` method to + refresh the penalty box as well. + + :param client: Client object + """ + if client in self.active_clients: # hasn't been removed yet + log.warning('%r marked down.', client) + self.active_clients.remove(client) + self.penalty_box.add(client) + else: + log.info("%r not in active client list.", client) + def zrevrange_with_int_score(self, key, max_score, min_score): """Get the zrevrangebyscore across the cluster. Highest score for duplicate element is returned. From 92a96d4f34e7a097ca556201da788f9be2fb6ca6 Mon Sep 17 00:00:00 2001 From: aldraco Date: Mon, 17 Sep 2018 15:08:25 -0600 Subject: [PATCH 03/24] Add cycle_clients functionality for even client distribution and automatic penalty box handling. 
--- fluster/cluster.py | 48 ++++++++++++++++++++++++++ tests/fluster/test_cluster.py | 65 +++++++++++++++++++++++++++++++++++ 2 files changed, 113 insertions(+) diff --git a/fluster/cluster.py b/fluster/cluster.py index bdd1629..b6f3023 100644 --- a/fluster/cluster.py +++ b/fluster/cluster.py @@ -1,4 +1,5 @@ from collections import defaultdict +from itertools import cycle import functools import logging @@ -39,6 +40,8 @@ def __init__(self, self.active_clients = self._prep_clients(clients) self.initial_clients = {c.pool_id: c for c in clients} self._sort_clients() + # maintain cycle trackers + self._requesters = {} def _sort_clients(self): """Make sure clients are sorted consistently for consistent results.""" @@ -122,6 +125,51 @@ def get_client(self, shard_key): pos = hashed % len(self.active_clients) return self.active_clients[pos] + def get_client_cycle(self, requester, cycles=1): + """Yield clients, maintaining a pointer to the last used client for each requester. + + Will not generate the same client more than `cycles` times per use of the function. + + Also handles down nodes. + + :param requester: Hashable key for each requesting object. + :param cycles: Max times to return each client per call. 
+ """ + if requester not in self._requesters: + # create a cycle for it + print("Making a cycle for this requester") + self._requesters[requester] = cycle(self.initial_clients.values()) + + # penalty box maintenance + added = False + for client in self.penalty_box.get(): + log.info('Client %r is back up.', client) + self.active_clients.append(client) + added = True + if added: + self._sort_clients() + + if len(self.active_clients) == 0: + raise ClusterEmptyError('All clients are down.') + + # yield clients if they are in the active clients list + conn_cycle = self._requesters[requester] + placemarker = None + rounds = 0 + for next_client in conn_cycle: + # either we've just started, or we completed a cycle + if not placemarker: + placemarker = next_client + elif next_client == placemarker: + rounds += 1 + + if rounds >= cycles: + raise StopIteration + + if next_client in self.active_clients: + yield next_client + + def penalize_client(self, client): """Place client in the penalty box manually. 
diff --git a/tests/fluster/test_cluster.py b/tests/fluster/test_cluster.py index 9b9cf34..a973e12 100644 --- a/tests/fluster/test_cluster.py +++ b/tests/fluster/test_cluster.py @@ -96,6 +96,71 @@ def test_consistent_hashing(self): # Bring it back up self.instances[0] = RedisInstance(10101) + def test_cycle_clients(self): + requester1 = "hashablething" + requester2 = "anotherhashablething" + # should cycle through clients the given number of times + desired_cycles = 2 + counter = 0 + returned_clients = set() + for client in self.cluster.get_client_cycle(requester1, cycles=desired_cycles): + counter += 1 + returned_clients.update([client]) + + assert counter == (desired_cycles * len(self.cluster.active_clients)) + assert len(returned_clients) == len(self.cluster.active_clients) + + # should not include inactive nodes + self.instances[0].terminate() + + counter = 0 + key = 'silly' + for client in self.cluster.get_client_cycle(requester1): + try: + client.incr(key, 1) + counter += 1 + except: + continue # handled by the cluster + + # Restart instance + self.instances[0] = RedisInstance(10101) + time.sleep(0.5) + + assert counter == 2 + assert counter == len(self.cluster.active_clients) + assert counter == len(self.cluster.initial_clients.values()) - 1 + + # should add restarted nodes back to the list after reported failure + counter = 0 + for client in self.cluster.get_client_cycle(requester1): + client.incr(key, 1) + counter += 1 + + assert counter == len(self.cluster.active_clients) + assert counter == 3 # to verify it added the node back + + # should track separate cycle entry points for each requester + client1 = client2 = None + for counter, client in enumerate(self.cluster.get_client_cycle(requester1)): + if counter == 0: + client1 = client + break + + for counter, client in enumerate(self.cluster.get_client_cycle(requester2)): + if counter == 0: + # make sure it counts from the beginning + assert client == client1 + + elif counter == 1: + client2 = client + + # 
requester1 should pick up where it left off + for counter, client in enumerate(self.cluster.get_client_cycle(requester1)): + if counter == 0: + assert client == client2 + break + + def test_zrevrange(self): """Add a sorted set, turn off the client, add to the set, turn the client back on, check results From bd3015053844af5864c27ff4d34bcfd8d1954d89 Mon Sep 17 00:00:00 2001 From: aldraco Date: Mon, 17 Sep 2018 15:17:47 -0600 Subject: [PATCH 04/24] Extract method for clearing penalty box periodically. --- fluster/cluster.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/fluster/cluster.py b/fluster/cluster.py index b6f3023..98bde65 100644 --- a/fluster/cluster.py +++ b/fluster/cluster.py @@ -92,11 +92,10 @@ def wrapper(*args, **kwargs): log.debug('Wrapping %s', name) setattr(client, name, wrap(obj)) - def get_client(self, shard_key): - """Get the client for a given shard, based on what's available. + def _prune_penalty_box(self): + """Restores clients that have reconnected. - If the proper client isn't available, the next available client - is returned. If no clients are available, an exception is raised. + This function should be called first for every public method. """ added = False for client in self.penalty_box.get(): @@ -106,6 +105,14 @@ def get_client(self, shard_key): if added: self._sort_clients() + def get_client(self, shard_key): + """Get the client for a given shard, based on what's available. + + If the proper client isn't available, the next available client + is returned. If no clients are available, an exception is raised. 
+ """ + self._prune_penalty_box() + if len(self.active_clients) == 0: raise ClusterEmptyError('All clients are down.') @@ -140,14 +147,7 @@ def get_client_cycle(self, requester, cycles=1): print("Making a cycle for this requester") self._requesters[requester] = cycle(self.initial_clients.values()) - # penalty box maintenance - added = False - for client in self.penalty_box.get(): - log.info('Client %r is back up.', client) - self.active_clients.append(client) - added = True - if added: - self._sort_clients() + self._prune_penalty_box() if len(self.active_clients) == 0: raise ClusterEmptyError('All clients are down.') From 13f5d1b8d66266e5931914cf4fabd4146b0d02b6 Mon Sep 17 00:00:00 2001 From: aldraco Date: Mon, 17 Sep 2018 15:23:07 -0600 Subject: [PATCH 05/24] Better names, cleanup. --- fluster/cluster.py | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/fluster/cluster.py b/fluster/cluster.py index 98bde65..7e223e9 100644 --- a/fluster/cluster.py +++ b/fluster/cluster.py @@ -40,8 +40,8 @@ def __init__(self, self.active_clients = self._prep_clients(clients) self.initial_clients = {c.pool_id: c for c in clients} self._sort_clients() - # maintain cycle trackers - self._requesters = {} + # maintain separate cycle trackers for each requester + self._requester_cycles = {} def _sort_clients(self): """Make sure clients are sorted consistently for consistent results.""" @@ -133,27 +133,23 @@ def get_client(self, shard_key): return self.active_clients[pos] def get_client_cycle(self, requester, cycles=1): - """Yield clients, maintaining a pointer to the last used client for each requester. + """Yield clients, maintaining separate cycles for each requester. - Will not generate the same client more than `cycles` times per use of the function. - - Also handles down nodes. + Will not generate the same client more than `cycles` times, + per use of the function. Also handles down nodes. 
:param requester: Hashable key for each requesting object. :param cycles: Max times to return each client per call. """ - if requester not in self._requesters: - # create a cycle for it - print("Making a cycle for this requester") - self._requesters[requester] = cycle(self.initial_clients.values()) + if requester not in self._requester_cycles: + self._requester_cycles[requester] = cycle(self.initial_clients.values()) self._prune_penalty_box() if len(self.active_clients) == 0: raise ClusterEmptyError('All clients are down.') - # yield clients if they are in the active clients list - conn_cycle = self._requesters[requester] + conn_cycle = self._requester_cycles[requester] placemarker = None rounds = 0 for next_client in conn_cycle: @@ -169,7 +165,6 @@ def get_client_cycle(self, requester, cycles=1): if next_client in self.active_clients: yield next_client - def penalize_client(self, client): """Place client in the penalty box manually. From c6ef7c1c3bc7a65ea844bf2de8f8c6ddc86b9cf4 Mon Sep 17 00:00:00 2001 From: aldraco Date: Mon, 17 Sep 2018 15:33:11 -0600 Subject: [PATCH 06/24] Separate tests into units to make them easier to understand (and possibly fix). 
--- tests/fluster/test_cluster.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/tests/fluster/test_cluster.py b/tests/fluster/test_cluster.py index a973e12..68a17a8 100644 --- a/tests/fluster/test_cluster.py +++ b/tests/fluster/test_cluster.py @@ -33,6 +33,8 @@ def setUp(self): self.cluster = FlusterCluster([i.conn for i in self.instances], penalty_box_min_wait=0.5) self.keys = ['hi', 'redis', 'test'] # hashes to 3 separate values + self.requester1 = "hashablething" + self.requester2 = "anotherhashablething" def tearDown(self): for instance in self.instances: @@ -97,30 +99,28 @@ def test_consistent_hashing(self): self.instances[0] = RedisInstance(10101) def test_cycle_clients(self): - requester1 = "hashablething" - requester2 = "anotherhashablething" # should cycle through clients the given number of times desired_cycles = 2 counter = 0 returned_clients = set() - for client in self.cluster.get_client_cycle(requester1, cycles=desired_cycles): + for client in self.cluster.get_client_cycle(self.requester1, cycles=desired_cycles): counter += 1 returned_clients.update([client]) assert counter == (desired_cycles * len(self.cluster.active_clients)) assert len(returned_clients) == len(self.cluster.active_clients) + def test_cycle_clients_with_failures(self): # should not include inactive nodes self.instances[0].terminate() counter = 0 - key = 'silly' - for client in self.cluster.get_client_cycle(requester1): + for client in self.cluster.get_client_cycle(self.requester1): try: - client.incr(key, 1) + client.incr('key', 1) counter += 1 except: - continue # handled by the cluster + continue # exception handled by the cluster # Restart instance self.instances[0] = RedisInstance(10101) @@ -132,21 +132,22 @@ def test_cycle_clients(self): # should add restarted nodes back to the list after reported failure counter = 0 - for client in self.cluster.get_client_cycle(requester1): - client.incr(key, 1) + for client in 
self.cluster.get_client_cycle(self.requester1): + client.incr('key', 1) counter += 1 assert counter == len(self.cluster.active_clients) assert counter == 3 # to verify it added the node back + def test_cycle_clients_tracking(self): # should track separate cycle entry points for each requester client1 = client2 = None - for counter, client in enumerate(self.cluster.get_client_cycle(requester1)): + for counter, client in enumerate(self.cluster.get_client_cycle(self.requester1)): if counter == 0: client1 = client break - for counter, client in enumerate(self.cluster.get_client_cycle(requester2)): + for counter, client in enumerate(self.cluster.get_client_cycle(self.requester2)): if counter == 0: # make sure it counts from the beginning assert client == client1 @@ -154,13 +155,12 @@ def test_cycle_clients(self): elif counter == 1: client2 = client - # requester1 should pick up where it left off - for counter, client in enumerate(self.cluster.get_client_cycle(requester1)): + # self.requester1 should pick up where it left off + for counter, client in enumerate(self.cluster.get_client_cycle(self.requester1)): if counter == 0: assert client == client2 break - def test_zrevrange(self): """Add a sorted set, turn off the client, add to the set, turn the client back on, check results From 19b018686a3f4253c3f1c7ab5f2786beae0ccac3 Mon Sep 17 00:00:00 2001 From: aldraco Date: Mon, 17 Sep 2018 17:27:34 -0600 Subject: [PATCH 07/24] Penalize should not be public. Prune penalty box for all public functions. --- fluster/cluster.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/fluster/cluster.py b/fluster/cluster.py index 7e223e9..81b18e8 100644 --- a/fluster/cluster.py +++ b/fluster/cluster.py @@ -76,7 +76,7 @@ def wrapper(*args, **kwargs): try: return fn(*args, **kwargs) except (ConnectionError, TimeoutError): # TO THE PENALTY BOX! 
- self.penalize_client(client) + self._penalize_client(client) raise return functools.update_wrapper(wrapper, fn) @@ -165,16 +165,8 @@ def get_client_cycle(self, requester, cycles=1): if next_client in self.active_clients: yield next_client - def penalize_client(self, client): - """Place client in the penalty box manually. - - Useful for situations where clients are used via dependency injection, and - the methods cannot be wrapped safely to penalize automatically. - - n.b. the `get_client` method is the mechanism used to periodically check - the penalized clients to see if any are ready again. If your code is manually - penalizing clients, make sure it utilizes the `get_client` method to - refresh the penalty box as well. + def _penalize_client(self, client): + """Place client in the penalty box. :param client: Client object """ @@ -190,6 +182,8 @@ def zrevrange_with_int_score(self, key, max_score, min_score): Highest score for duplicate element is returned. A faster method should be written if scores are not needed. """ + self._prune_penalty_box() + if len(self.active_clients) == 0: raise ClusterEmptyError('All clients are down.') From ea95b458dac2d1e7602a6b422922e029782a4d5e Mon Sep 17 00:00:00 2001 From: aldraco Date: Mon, 17 Sep 2018 17:29:17 -0600 Subject: [PATCH 08/24] Clarify purpose of timing in penalty box function. --- fluster/penalty_box.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/fluster/penalty_box.py b/fluster/penalty_box.py index b3f9da8..db4e142 100644 --- a/fluster/penalty_box.py +++ b/fluster/penalty_box.py @@ -35,14 +35,14 @@ def get(self): now = time.time() while self._clients and self._clients[0][0] < now: _, (client, last_wait) = heapq.heappop(self._clients) - s = time.time() + connect_start = time.time() try: client.echo('test') # reconnected if this succeeds. 
self._client_ids.remove(client.pool_id) yield client except (ConnectionError, TimeoutError): - timer = time.time() - s + timer = time.time() - connect_start wait = min(int(last_wait * self._multiplier), self._max_wait) heapq.heappush(self._clients, (time.time() + wait, (client, wait))) - log.info('%r is still down after %s seconds. Retrying in %ss.', client, timer, wait) + log.info('%r is still down after a %s second attempt to connect. Retrying in %ss.', client, timer, wait) From 2c37bd4cdb4a32ea3d49d98630800c8aea89ca98 Mon Sep 17 00:00:00 2001 From: aldraco Date: Mon, 1 Oct 2018 15:14:09 -0600 Subject: [PATCH 09/24] Move active client cycle to class and manage state there. --- fluster/cluster.py | 28 ++-------- fluster/utils.py | 61 +++++++++++++++++++++ tests/fluster/test_cluster.py | 65 ----------------------- tests/fluster/test_utils.py | 99 +++++++++++++++++++++++++++++++++++ 4 files changed, 164 insertions(+), 89 deletions(-) create mode 100644 fluster/utils.py create mode 100644 tests/fluster/test_utils.py diff --git a/fluster/cluster.py b/fluster/cluster.py index 81b18e8..57e7653 100644 --- a/fluster/cluster.py +++ b/fluster/cluster.py @@ -9,6 +9,7 @@ from .exceptions import ClusterEmptyError from .penalty_box import PenaltyBox +from .utils import ActiveClientCycle log = logging.getLogger(__name__) @@ -41,7 +42,7 @@ def __init__(self, self.initial_clients = {c.pool_id: c for c in clients} self._sort_clients() # maintain separate cycle trackers for each requester - self._requester_cycles = {} + #self._requester_cycles = {} def _sort_clients(self): """Make sure clients are sorted consistently for consistent results.""" @@ -132,7 +133,7 @@ def get_client(self, shard_key): pos = hashed % len(self.active_clients) return self.active_clients[pos] - def get_client_cycle(self, requester, cycles=1): + def get_active_client_cycle(self, rounds=1): """Yield clients, maintaining separate cycles for each requester. 
Will not generate the same client more than `cycles` times, @@ -141,29 +142,8 @@ def get_client_cycle(self, requester, cycles=1): :param requester: Hashable key for each requesting object. :param cycles: Max times to return each client per call. """ - if requester not in self._requester_cycles: - self._requester_cycles[requester] = cycle(self.initial_clients.values()) + return ActiveClientCycle(self, rounds=rounds) - self._prune_penalty_box() - - if len(self.active_clients) == 0: - raise ClusterEmptyError('All clients are down.') - - conn_cycle = self._requester_cycles[requester] - placemarker = None - rounds = 0 - for next_client in conn_cycle: - # either we've just started, or we completed a cycle - if not placemarker: - placemarker = next_client - elif next_client == placemarker: - rounds += 1 - - if rounds >= cycles: - raise StopIteration - - if next_client in self.active_clients: - yield next_client def _penalize_client(self, client): """Place client in the penalty box. diff --git a/fluster/utils.py b/fluster/utils.py new file mode 100644 index 0000000..f6f62fe --- /dev/null +++ b/fluster/utils.py @@ -0,0 +1,61 @@ +from itertools import cycle + +from .exceptions import ClusterEmptyError + + +class ActiveClientCycle(object): + """Tracks last returned client, will not iterate more than `rounds` times. + + Useful when you need to evenly cycle through active connections to check + each once for work. + + Each user of the class should instantiate a separate object to correctly track + the last requested client for that requester. 
+ """ + def __init__(self, cluster, rounds=1): + self.cluster = cluster + self.round_limit = rounds + self.clients = cycle(cluster.initial_clients.values()) + + # initialize pointers and trackers + self.current_client = None + self.round_start = None + self.rounds_completed = 0 + + def __iter__(self): + """Restarts the `rounds` tracker, and updates active clients.""" + self.round_start = None + self.rounds_completed = 0 + + self.cluster._prune_penalty_box() # XX TODO make public + + if len(self.cluster.active_clients) == 0: + raise ClusterEmptyError('All clients are down.') + + return self + + def __next__(self): + nxt = None + while nxt is None: + nxt = self._next_helper() + + return nxt + + def _next_helper(self): + """Returns an active connection, unless this iterable has already cycled + through too many times. + """ + curr = next(self.clients) + + # manage round counting + if not self.round_start: + self.round_start = curr + elif self.round_start == curr: + self.rounds_completed += 1 + + if self.rounds_completed >= self.round_limit: + raise StopIteration + + # check active clients + if curr in self.cluster.active_clients: + return curr diff --git a/tests/fluster/test_cluster.py b/tests/fluster/test_cluster.py index 68a17a8..9b9cf34 100644 --- a/tests/fluster/test_cluster.py +++ b/tests/fluster/test_cluster.py @@ -33,8 +33,6 @@ def setUp(self): self.cluster = FlusterCluster([i.conn for i in self.instances], penalty_box_min_wait=0.5) self.keys = ['hi', 'redis', 'test'] # hashes to 3 separate values - self.requester1 = "hashablething" - self.requester2 = "anotherhashablething" def tearDown(self): for instance in self.instances: @@ -98,69 +96,6 @@ def test_consistent_hashing(self): # Bring it back up self.instances[0] = RedisInstance(10101) - def test_cycle_clients(self): - # should cycle through clients the given number of times - desired_cycles = 2 - counter = 0 - returned_clients = set() - for client in self.cluster.get_client_cycle(self.requester1, 
cycles=desired_cycles): - counter += 1 - returned_clients.update([client]) - - assert counter == (desired_cycles * len(self.cluster.active_clients)) - assert len(returned_clients) == len(self.cluster.active_clients) - - def test_cycle_clients_with_failures(self): - # should not include inactive nodes - self.instances[0].terminate() - - counter = 0 - for client in self.cluster.get_client_cycle(self.requester1): - try: - client.incr('key', 1) - counter += 1 - except: - continue # exception handled by the cluster - - # Restart instance - self.instances[0] = RedisInstance(10101) - time.sleep(0.5) - - assert counter == 2 - assert counter == len(self.cluster.active_clients) - assert counter == len(self.cluster.initial_clients.values()) - 1 - - # should add restarted nodes back to the list after reported failure - counter = 0 - for client in self.cluster.get_client_cycle(self.requester1): - client.incr('key', 1) - counter += 1 - - assert counter == len(self.cluster.active_clients) - assert counter == 3 # to verify it added the node back - - def test_cycle_clients_tracking(self): - # should track separate cycle entry points for each requester - client1 = client2 = None - for counter, client in enumerate(self.cluster.get_client_cycle(self.requester1)): - if counter == 0: - client1 = client - break - - for counter, client in enumerate(self.cluster.get_client_cycle(self.requester2)): - if counter == 0: - # make sure it counts from the beginning - assert client == client1 - - elif counter == 1: - client2 = client - - # self.requester1 should pick up where it left off - for counter, client in enumerate(self.cluster.get_client_cycle(self.requester1)): - if counter == 0: - assert client == client2 - break - def test_zrevrange(self): """Add a sorted set, turn off the client, add to the set, turn the client back on, check results diff --git a/tests/fluster/test_utils.py b/tests/fluster/test_utils.py new file mode 100644 index 0000000..83c1775 --- /dev/null +++ 
b/tests/fluster/test_utils.py @@ -0,0 +1,99 @@ +from __future__ import absolute_import, print_function +import time +import unittest +import sys + +from testinstances import RedisInstance + +from fluster import FlusterCluster + + +class FlusterClusterTests(unittest.TestCase): + + def assertCountEqual(self, a, b): + if sys.version_info > (3, 0): + super(FlusterClusterTests, self).assertCountEqual(a, b) + else: + self.assertItemsEqual(a, b) + + @classmethod + def setUpClass(cls): + cls.instances = [RedisInstance(10101), + RedisInstance(10102), + RedisInstance(10103)] + + @classmethod + def tearDownClass(cls): + for instance in cls.instances: + instance.terminate() + + def setUp(self): + self.cluster = FlusterCluster([i.conn for i in self.instances], + penalty_box_min_wait=0.5) + self.keys = ['hi', 'redis', 'test'] # hashes to 3 separate values + + def tearDown(self): + for instance in self.instances: + if hasattr(instance.conn, 'pool_id'): + delattr(instance.conn, 'pool_id') + + def test_cycle_clients(self): + # should cycle through clients the given number of times + desired_rounds = 2 + returned_clients = set() + active_clients_cycle = self.cluster.get_active_client_cycle(rounds=desired_rounds) + + assert True + + for idx, client in enumerate(active_clients_cycle): + returned_clients.update([client]) + assert client is not None + + assert (idx + 1) == (desired_rounds * len(self.cluster.active_clients)) + assert len(returned_clients) == len(self.cluster.active_clients) + + def test_cycle_clients_with_failures(self): + # should not include inactive nodes + self.instances[0].terminate() + + desired_rounds = 2 + counter = 0 + active_clients_cycle = self.cluster.get_active_client_cycle(rounds=desired_rounds) + + for client in active_clients_cycle: + assert client is not None + try: + client.incr('key', 1) + counter += 1 + except Exception as e: + print("oops", client, e) + continue # exception handled by the cluster + + # Restart instance + self.instances[0] = 
RedisInstance(10101) + time.sleep(0.5) + + assert counter == 4 # 2 rounds, 2 working clients each round + assert 2 == len(self.cluster.active_clients) + assert 2 == len(self.cluster.initial_clients.values()) - 1 + + # should add restarted nodes back to the list after reported failure + # calling __iter__ again checks the penalty box + counter = 0 + for client in active_clients_cycle: + client.incr('key', 1) + counter += 1 + + assert counter == len(self.cluster.active_clients) * desired_rounds + assert len(self.cluster.active_clients) == 3 # to verify it added the node back + + def test_cycle_clients_tracking(self): + # should track separate cycle entry points for each instance + active_cycle_1 = self.cluster.get_active_client_cycle(rounds=1) + active_cycle_2 = self.cluster.get_active_client_cycle(rounds=1) + + # advance cycle 1 + next(active_cycle_1) + + # should not start at the same point + assert next(active_cycle_1) != next(active_cycle_2) From dc69e5d43ffbf64dedfe3a859d328fe28d597fca Mon Sep 17 00:00:00 2001 From: aldraco Date: Mon, 1 Oct 2018 15:48:28 -0600 Subject: [PATCH 10/24] Check for dead clients earlier to avoid infinite loops or silent errors. 
--- fluster/utils.py | 7 ++++--- tests/fluster/test_utils.py | 29 ++++++++++++++++++++++++++++- 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/fluster/utils.py b/fluster/utils.py index f6f62fe..9234628 100644 --- a/fluster/utils.py +++ b/fluster/utils.py @@ -29,12 +29,13 @@ def __iter__(self): self.cluster._prune_penalty_box() # XX TODO make public - if len(self.cluster.active_clients) == 0: - raise ClusterEmptyError('All clients are down.') - return self def __next__(self): + """Always returns a client, or raises an Exception if none are available.""" + if len(self.cluster.active_clients) == 0: + raise ClusterEmptyError('All clients are down.') + nxt = None while nxt is None: nxt = self._next_helper() diff --git a/tests/fluster/test_utils.py b/tests/fluster/test_utils.py index 83c1775..c74b293 100644 --- a/tests/fluster/test_utils.py +++ b/tests/fluster/test_utils.py @@ -5,7 +5,7 @@ from testinstances import RedisInstance -from fluster import FlusterCluster +from fluster import FlusterCluster, ClusterEmptyError class FlusterClusterTests(unittest.TestCase): @@ -97,3 +97,30 @@ def test_cycle_clients_tracking(self): # should not start at the same point assert next(active_cycle_1) != next(active_cycle_2) + + def test_dropped_connections_while_iterating(self): + # dropped connections in the middle of an iteration should not cause an infinite loop + # and should raise an exception + active_cycle = self.cluster.get_active_client_cycle(rounds=7) + + assert len(self.cluster.active_clients) == 3 + + drop_at_idx = (5, 6, 7) # at these points, kill a connection + killed = 0 + with self.assertRaises(ClusterEmptyError) as context: + for idx, client in enumerate(active_cycle): + if idx in drop_at_idx: + self.instances[killed].terminate() + killed += 1 + print('killed ', idx, killed) + try: + client.incr('key', 1) + except: + pass # mimic err handling + self.assertTrue('All clients are down.' 
in str(context.exception)) + + assert idx == 8 # the next iteration after the last client was killed + + # restart all the instances + for instance, port in enumerate(range(10101, 10104)): + self.instances[instance] = RedisInstance(port) From 3925354740e0f584d3bec8e381780af1354c17f3 Mon Sep 17 00:00:00 2001 From: aldraco Date: Mon, 1 Oct 2018 15:53:50 -0600 Subject: [PATCH 11/24] Cleanup, more docs. --- fluster/cluster.py | 11 ++--------- fluster/utils.py | 13 +++++++------ 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/fluster/cluster.py b/fluster/cluster.py index 57e7653..a05e6b9 100644 --- a/fluster/cluster.py +++ b/fluster/cluster.py @@ -41,8 +41,6 @@ def __init__(self, self.active_clients = self._prep_clients(clients) self.initial_clients = {c.pool_id: c for c in clients} self._sort_clients() - # maintain separate cycle trackers for each requester - #self._requester_cycles = {} def _sort_clients(self): """Make sure clients are sorted consistently for consistent results.""" @@ -134,17 +132,12 @@ def get_client(self, shard_key): return self.active_clients[pos] def get_active_client_cycle(self, rounds=1): - """Yield clients, maintaining separate cycles for each requester. + """Create an ActiveClientCycle using this cluster. - Will not generate the same client more than `cycles` times, - per use of the function. Also handles down nodes. - - :param requester: Hashable key for each requesting object. - :param cycles: Max times to return each client per call. + :param rounds: max number of times to see each client per call to __iter__ """ return ActiveClientCycle(self, rounds=rounds) - def _penalize_client(self, client): """Place client in the penalty box. diff --git a/fluster/utils.py b/fluster/utils.py index 9234628..03ec36c 100644 --- a/fluster/utils.py +++ b/fluster/utils.py @@ -6,11 +6,11 @@ class ActiveClientCycle(object): """Tracks last returned client, will not iterate more than `rounds` times. 
- Useful when you need to evenly cycle through active connections to check - each once for work. + Useful when you need to evenly cycle through active connections, skipping + dead ones. Each user of the class should instantiate a separate object to correctly track - the last requested client for that requester. + the last requested client for that user. """ def __init__(self, cluster, rounds=1): self.cluster = cluster @@ -26,16 +26,17 @@ def __iter__(self): """Restarts the `rounds` tracker, and updates active clients.""" self.round_start = None self.rounds_completed = 0 - - self.cluster._prune_penalty_box() # XX TODO make public + self.cluster._prune_penalty_box() return self def __next__(self): """Always returns a client, or raises an Exception if none are available.""" + # raise Exception if no clients are available if len(self.cluster.active_clients) == 0: raise ClusterEmptyError('All clients are down.') + # always return something nxt = None while nxt is None: nxt = self._next_helper() @@ -57,6 +58,6 @@ def _next_helper(self): if self.rounds_completed >= self.round_limit: raise StopIteration - # check active clients + # only return active connections if curr in self.cluster.active_clients: return curr From d8dadce111eb925931fc24c286b1e8b3da49d180 Mon Sep 17 00:00:00 2001 From: aldraco Date: Mon, 1 Oct 2018 16:00:34 -0600 Subject: [PATCH 12/24] Py2 is not dead yet. --- fluster/utils.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/fluster/utils.py b/fluster/utils.py index 03ec36c..51b8205 100644 --- a/fluster/utils.py +++ b/fluster/utils.py @@ -43,6 +43,9 @@ def __next__(self): return nxt + def next(self): + return self.__next__() + def _next_helper(self): """Returns an active connection, unless this iterable has already cycled through too many times. From 874444a5fe2240e55b3e4914caff3346ec49479f Mon Sep 17 00:00:00 2001 From: aldraco Date: Mon, 1 Oct 2018 16:06:59 -0600 Subject: [PATCH 13/24] Bit more cleanup, delete unused attrs. 
--- fluster/utils.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/fluster/utils.py b/fluster/utils.py index 51b8205..1946ecf 100644 --- a/fluster/utils.py +++ b/fluster/utils.py @@ -17,15 +17,15 @@ def __init__(self, cluster, rounds=1): self.round_limit = rounds self.clients = cycle(cluster.initial_clients.values()) - # initialize pointers and trackers - self.current_client = None + self._init_round_trackers() + + def _init_round_trackers(self): self.round_start = None self.rounds_completed = 0 def __iter__(self): """Restarts the `rounds` tracker, and updates active clients.""" - self.round_start = None - self.rounds_completed = 0 + self._init_round_trackers() self.cluster._prune_penalty_box() return self From d99a9b9a3c203dc9cf13d4e35dbdef9eec838031 Mon Sep 17 00:00:00 2001 From: aldraco Date: Tue, 20 Nov 2018 16:25:21 -0700 Subject: [PATCH 14/24] Remove concept of round counting from cycle. --- fluster/cluster.py | 6 ++---- fluster/utils.py | 19 +------------------ tests/fluster/test_utils.py | 36 ++++++++++++++++++++++-------------- 3 files changed, 25 insertions(+), 36 deletions(-) diff --git a/fluster/cluster.py b/fluster/cluster.py index a05e6b9..65b72fe 100644 --- a/fluster/cluster.py +++ b/fluster/cluster.py @@ -131,12 +131,10 @@ def get_client(self, shard_key): pos = hashed % len(self.active_clients) return self.active_clients[pos] - def get_active_client_cycle(self, rounds=1): + def get_active_client_cycle(self): """Create an ActiveClientCycle using this cluster. - - :param rounds: max number of times to see each client per call to __iter__ """ - return ActiveClientCycle(self, rounds=rounds) + return ActiveClientCycle(self) def _penalize_client(self, client): """Place client in the penalty box. 
diff --git a/fluster/utils.py b/fluster/utils.py index 1946ecf..e524b74 100644 --- a/fluster/utils.py +++ b/fluster/utils.py @@ -12,20 +12,12 @@ class ActiveClientCycle(object): Each user of the class should instantiate a separate object to correctly track the last requested client for that user. """ - def __init__(self, cluster, rounds=1): + def __init__(self, cluster): self.cluster = cluster - self.round_limit = rounds self.clients = cycle(cluster.initial_clients.values()) - self._init_round_trackers() - - def _init_round_trackers(self): - self.round_start = None - self.rounds_completed = 0 - def __iter__(self): """Restarts the `rounds` tracker, and updates active clients.""" - self._init_round_trackers() self.cluster._prune_penalty_box() return self @@ -52,15 +44,6 @@ def _next_helper(self): """ curr = next(self.clients) - # manage round counting - if not self.round_start: - self.round_start = curr - elif self.round_start == curr: - self.rounds_completed += 1 - - if self.rounds_completed >= self.round_limit: - raise StopIteration - # only return active connections if curr in self.cluster.active_clients: return curr diff --git a/tests/fluster/test_utils.py b/tests/fluster/test_utils.py index c74b293..563f660 100644 --- a/tests/fluster/test_utils.py +++ b/tests/fluster/test_utils.py @@ -38,29 +38,30 @@ def tearDown(self): delattr(instance.conn, 'pool_id') def test_cycle_clients(self): - # should cycle through clients the given number of times - desired_rounds = 2 + # should cycle through clients indefinately returned_clients = set() - active_clients_cycle = self.cluster.get_active_client_cycle(rounds=desired_rounds) + limit = 15 + active_clients_cycle = self.cluster.get_active_client_cycle() assert True for idx, client in enumerate(active_clients_cycle): returned_clients.update([client]) assert client is not None + if idx >= limit: + break - assert (idx + 1) == (desired_rounds * len(self.cluster.active_clients)) + assert idx == 15 assert len(returned_clients) == 
len(self.cluster.active_clients) def test_cycle_clients_with_failures(self): # should not include inactive nodes self.instances[0].terminate() - - desired_rounds = 2 + limit = 6 # normally two rounds, but with 2 nodes, 3 rounds counter = 0 - active_clients_cycle = self.cluster.get_active_client_cycle(rounds=desired_rounds) + active_clients_cycle = self.cluster.get_active_client_cycle() - for client in active_clients_cycle: + for idx, client in enumerate(active_clients_cycle): assert client is not None try: client.incr('key', 1) @@ -68,29 +69,33 @@ def test_cycle_clients_with_failures(self): except Exception as e: print("oops", client, e) continue # exception handled by the cluster + if idx >= limit: + break # Restart instance self.instances[0] = RedisInstance(10101) time.sleep(0.5) - assert counter == 4 # 2 rounds, 2 working clients each round + assert counter == 6 # able to continue even when node is down assert 2 == len(self.cluster.active_clients) assert 2 == len(self.cluster.initial_clients.values()) - 1 # should add restarted nodes back to the list after reported failure # calling __iter__ again checks the penalty box counter = 0 - for client in active_clients_cycle: + for idx, client in enumerate(active_clients_cycle): + if idx >= limit: + break client.incr('key', 1) counter += 1 - assert counter == len(self.cluster.active_clients) * desired_rounds + assert counter == limit assert len(self.cluster.active_clients) == 3 # to verify it added the node back def test_cycle_clients_tracking(self): # should track separate cycle entry points for each instance - active_cycle_1 = self.cluster.get_active_client_cycle(rounds=1) - active_cycle_2 = self.cluster.get_active_client_cycle(rounds=1) + active_cycle_1 = self.cluster.get_active_client_cycle() + active_cycle_2 = self.cluster.get_active_client_cycle() # advance cycle 1 next(active_cycle_1) @@ -101,7 +106,8 @@ def test_cycle_clients_tracking(self): def test_dropped_connections_while_iterating(self): # dropped 
connections in the middle of an iteration should not cause an infinite loop # and should raise an exception - active_cycle = self.cluster.get_active_client_cycle(rounds=7) + limit = 21 + active_cycle = self.cluster.get_active_client_cycle() assert len(self.cluster.active_clients) == 3 @@ -109,6 +115,8 @@ def test_dropped_connections_while_iterating(self): killed = 0 with self.assertRaises(ClusterEmptyError) as context: for idx, client in enumerate(active_cycle): + if idx >= limit: + break # in case the test fails to stop if idx in drop_at_idx: self.instances[killed].terminate() killed += 1 From e235a5f1035336fd535fa1de6d771a88d5ff65bf Mon Sep 17 00:00:00 2001 From: aldraco Date: Tue, 20 Nov 2018 16:47:17 -0700 Subject: [PATCH 15/24] Add round controller function and tests. --- fluster/__init__.py | 1 + fluster/utils.py | 17 +++++++++++++++++ tests/fluster/test_utils.py | 24 ++++++++++++++++++++++-- 3 files changed, 40 insertions(+), 2 deletions(-) diff --git a/fluster/__init__.py b/fluster/__init__.py index 0320303..dd17984 100644 --- a/fluster/__init__.py +++ b/fluster/__init__.py @@ -1,5 +1,6 @@ __version__ = '0.0.5' +from .utils import round_controlled from .cluster import FlusterCluster from .exceptions import ClusterEmptyError diff --git a/fluster/utils.py b/fluster/utils.py index e524b74..c3204a6 100644 --- a/fluster/utils.py +++ b/fluster/utils.py @@ -3,6 +3,23 @@ from .exceptions import ClusterEmptyError +def round_controlled(iterable, rounds=1): + """Raise StopIteration after passes through iterable.""" + round_start = None + rounds_completed = 0 + + for item in iterable: + if not round_start: + round_start = item + elif item == round_start: + rounds_completed += 1 + + if rounds_completed == rounds: + raise StopIteration + + yield item + + class ActiveClientCycle(object): """Tracks last returned client, will not iterate more than `rounds` times. 
diff --git a/tests/fluster/test_utils.py b/tests/fluster/test_utils.py index 563f660..12bf470 100644 --- a/tests/fluster/test_utils.py +++ b/tests/fluster/test_utils.py @@ -5,7 +5,7 @@ from testinstances import RedisInstance -from fluster import FlusterCluster, ClusterEmptyError +from fluster import FlusterCluster, ClusterEmptyError, round_controlled class FlusterClusterTests(unittest.TestCase): @@ -57,7 +57,7 @@ def test_cycle_clients(self): def test_cycle_clients_with_failures(self): # should not include inactive nodes self.instances[0].terminate() - limit = 6 # normally two rounds, but with 2 nodes, 3 rounds + limit = 6 counter = 0 active_clients_cycle = self.cluster.get_active_client_cycle() @@ -132,3 +132,23 @@ def test_dropped_connections_while_iterating(self): # restart all the instances for instance, port in enumerate(range(10101, 10104)): self.instances[instance] = RedisInstance(port) + + def test_round_controller(self): + # the round controller should track rounds and limit iterations + repeated_sublist = list(range(0, 3)) + lis = repeated_sublist * 5 + desired_rounds = 4 # don't iterate through the whole list + for idx, item in enumerate(round_controlled(lis, rounds=desired_rounds)): + pass + + assert idx == desired_rounds * len(repeated_sublist) + + # more specific application + active_cycle = self.cluster.get_active_client_cycle() + desired_rounds = 3 + + for idx, conn in enumerate(round_controlled(active_cycle, rounds=desired_rounds)): + pass + + # should raise stopiteration at appropriate time + assert idx == (desired_rounds * len(self.cluster.active_clients)) - 1 From 4b842aa01f22255aa8081538b24df303a3315152 Mon Sep 17 00:00:00 2001 From: aldraco Date: Tue, 20 Nov 2018 16:55:16 -0700 Subject: [PATCH 16/24] Docs and notes. 
--- fluster/utils.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/fluster/utils.py b/fluster/utils.py index c3204a6..5856f44 100644 --- a/fluster/utils.py +++ b/fluster/utils.py @@ -3,13 +3,13 @@ from .exceptions import ClusterEmptyError -def round_controlled(iterable, rounds=1): - """Raise StopIteration after passes through iterable.""" +def round_controlled(cycled_iterable, rounds=1): + """Raise StopIteration after passes through a cycled iterable.""" round_start = None rounds_completed = 0 - for item in iterable: - if not round_start: + for item in cycled_iterable: + if round_start is None: round_start = item elif item == round_start: rounds_completed += 1 @@ -21,7 +21,7 @@ def round_controlled(iterable, rounds=1): class ActiveClientCycle(object): - """Tracks last returned client, will not iterate more than `rounds` times. + """Only returns active clients. Useful when you need to evenly cycle through active connections, skipping dead ones. @@ -53,11 +53,11 @@ def __next__(self): return nxt def next(self): + """Python 2/3 compatibility.""" return self.__next__() def _next_helper(self): - """Returns an active connection, unless this iterable has already cycled - through too many times. + """Helper that only returns an active connection. """ curr = next(self.clients) From 1ea0b6765f2b61f29ba2d599b951746497e504cb Mon Sep 17 00:00:00 2001 From: aldraco Date: Wed, 21 Nov 2018 09:20:44 -0700 Subject: [PATCH 17/24] Fix off by one error in tests. 
--- tests/fluster/test_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/fluster/test_utils.py b/tests/fluster/test_utils.py index 12bf470..a9c4ce4 100644 --- a/tests/fluster/test_utils.py +++ b/tests/fluster/test_utils.py @@ -141,7 +141,7 @@ def test_round_controller(self): for idx, item in enumerate(round_controlled(lis, rounds=desired_rounds)): pass - assert idx == desired_rounds * len(repeated_sublist) + assert idx == desired_rounds * len(repeated_sublist) - 1 # more specific application active_cycle = self.cluster.get_active_client_cycle() @@ -151,4 +151,4 @@ def test_round_controller(self): pass # should raise stopiteration at appropriate time - assert idx == (desired_rounds * len(self.cluster.active_clients)) - 1 + assert idx == (desired_rounds * len(self.cluster.active_clients) - 1) From 0667fb7876a2c5c0a1856a1b347854b9eb720b61 Mon Sep 17 00:00:00 2001 From: aldraco Date: Wed, 21 Nov 2018 09:45:48 -0700 Subject: [PATCH 18/24] Make cluster itself iterable, instead of managing cycles via a class. Update tests. 
--- fluster/cluster.py | 39 +++++++++++++++++++++++----- fluster/utils.py | 51 ------------------------------------- tests/fluster/test_utils.py | 35 +++++++++++++------------ 3 files changed, 52 insertions(+), 73 deletions(-) diff --git a/fluster/cluster.py b/fluster/cluster.py index 65b72fe..8dc8196 100644 --- a/fluster/cluster.py +++ b/fluster/cluster.py @@ -9,7 +9,6 @@ from .exceptions import ClusterEmptyError from .penalty_box import PenaltyBox -from .utils import ActiveClientCycle log = logging.getLogger(__name__) @@ -40,8 +39,41 @@ def __init__(self, multiplier=penalty_box_wait_multiplier) self.active_clients = self._prep_clients(clients) self.initial_clients = {c.pool_id: c for c in clients} + self.clients = cycle(self.initial_clients.values()) self._sort_clients() + def __iter__(self): + """Restarts the `rounds` tracker, and updates active clients.""" + self._prune_penalty_box() + + return self + + def __next__(self): + """Always returns a client, or raises an Exception if none are available.""" + # raise Exception if no clients are available + if len(self.active_clients) == 0: + raise ClusterEmptyError('All clients are down.') + + # always return something + nxt = None + while nxt is None: + nxt = self._next_helper() + + return nxt + + def next(self): + """Python 2/3 compatibility.""" + return self.__next__() + + def _next_helper(self): + """Helper that only returns an active connection. + """ + curr = next(self.clients) + + # only return active connections + if curr in self.active_clients: + return curr + def _sort_clients(self): """Make sure clients are sorted consistently for consistent results.""" self.active_clients.sort(key=lambda c: c.pool_id) @@ -131,11 +163,6 @@ def get_client(self, shard_key): pos = hashed % len(self.active_clients) return self.active_clients[pos] - def get_active_client_cycle(self): - """Create an ActiveClientCycle using this cluster. 
- """ - return ActiveClientCycle(self) - def _penalize_client(self, client): """Place client in the penalty box. diff --git a/fluster/utils.py b/fluster/utils.py index 5856f44..a4a547e 100644 --- a/fluster/utils.py +++ b/fluster/utils.py @@ -1,8 +1,3 @@ -from itertools import cycle - -from .exceptions import ClusterEmptyError - - def round_controlled(cycled_iterable, rounds=1): """Raise StopIteration after passes through a cycled iterable.""" round_start = None @@ -18,49 +13,3 @@ def round_controlled(cycled_iterable, rounds=1): raise StopIteration yield item - - -class ActiveClientCycle(object): - """Only returns active clients. - - Useful when you need to evenly cycle through active connections, skipping - dead ones. - - Each user of the class should instantiate a separate object to correctly track - the last requested client for that user. - """ - def __init__(self, cluster): - self.cluster = cluster - self.clients = cycle(cluster.initial_clients.values()) - - def __iter__(self): - """Restarts the `rounds` tracker, and updates active clients.""" - self.cluster._prune_penalty_box() - - return self - - def __next__(self): - """Always returns a client, or raises an Exception if none are available.""" - # raise Exception if no clients are available - if len(self.cluster.active_clients) == 0: - raise ClusterEmptyError('All clients are down.') - - # always return something - nxt = None - while nxt is None: - nxt = self._next_helper() - - return nxt - - def next(self): - """Python 2/3 compatibility.""" - return self.__next__() - - def _next_helper(self): - """Helper that only returns an active connection. 
- """ - curr = next(self.clients) - - # only return active connections - if curr in self.cluster.active_clients: - return curr diff --git a/tests/fluster/test_utils.py b/tests/fluster/test_utils.py index a9c4ce4..bb7f469 100644 --- a/tests/fluster/test_utils.py +++ b/tests/fluster/test_utils.py @@ -4,12 +4,11 @@ import sys from testinstances import RedisInstance - from fluster import FlusterCluster, ClusterEmptyError, round_controlled +import redis class FlusterClusterTests(unittest.TestCase): - def assertCountEqual(self, a, b): if sys.version_info > (3, 0): super(FlusterClusterTests, self).assertCountEqual(a, b) @@ -41,11 +40,10 @@ def test_cycle_clients(self): # should cycle through clients indefinately returned_clients = set() limit = 15 - active_clients_cycle = self.cluster.get_active_client_cycle() assert True - for idx, client in enumerate(active_clients_cycle): + for idx, client in enumerate(self.cluster): returned_clients.update([client]) assert client is not None if idx >= limit: @@ -59,9 +57,8 @@ def test_cycle_clients_with_failures(self): self.instances[0].terminate() limit = 6 counter = 0 - active_clients_cycle = self.cluster.get_active_client_cycle() - for idx, client in enumerate(active_clients_cycle): + for idx, client in enumerate(self.cluster): assert client is not None try: client.incr('key', 1) @@ -83,7 +80,7 @@ def test_cycle_clients_with_failures(self): # should add restarted nodes back to the list after reported failure # calling __iter__ again checks the penalty box counter = 0 - for idx, client in enumerate(active_clients_cycle): + for idx, client in enumerate(self.cluster): if idx >= limit: break client.incr('key', 1) @@ -94,27 +91,34 @@ def test_cycle_clients_with_failures(self): def test_cycle_clients_tracking(self): # should track separate cycle entry points for each instance - active_cycle_1 = self.cluster.get_active_client_cycle() - active_cycle_2 = self.cluster.get_active_client_cycle() + cluster_instance_1 = self.cluster + # connect 
to already-running testinstances, instead of making more, + # to mimic two FlusterCluster instances + redis_clients = [redis.StrictRedis(port=conn.port) + for conn in self.instances] + cluster_instance_2 = FlusterCluster([i for i in redis_clients], + penalty_box_min_wait=0.5) - # advance cycle 1 - next(active_cycle_1) + # advance cluster instance one + next(cluster_instance_1) # should not start at the same point - assert next(active_cycle_1) != next(active_cycle_2) + assert next(cluster_instance_1) != next(cluster_instance_2) + + for temp_conn in redis_clients: + del temp_conn def test_dropped_connections_while_iterating(self): # dropped connections in the middle of an iteration should not cause an infinite loop # and should raise an exception limit = 21 - active_cycle = self.cluster.get_active_client_cycle() assert len(self.cluster.active_clients) == 3 drop_at_idx = (5, 6, 7) # at these points, kill a connection killed = 0 with self.assertRaises(ClusterEmptyError) as context: - for idx, client in enumerate(active_cycle): + for idx, client in enumerate(self.cluster): if idx >= limit: break # in case the test fails to stop if idx in drop_at_idx: @@ -144,10 +148,9 @@ def test_round_controller(self): assert idx == desired_rounds * len(repeated_sublist) - 1 # more specific application - active_cycle = self.cluster.get_active_client_cycle() desired_rounds = 3 - for idx, conn in enumerate(round_controlled(active_cycle, rounds=desired_rounds)): + for idx, conn in enumerate(round_controlled(self.cluster, rounds=desired_rounds)): pass # should raise stopiteration at appropriate time From 042300adae800521b2eba7d2f9e2198a4f3d5422 Mon Sep 17 00:00:00 2001 From: aldraco Date: Wed, 21 Nov 2018 10:21:24 -0700 Subject: [PATCH 19/24] Manage penalty box even for long running iterations. 
--- fluster/cluster.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/fluster/cluster.py b/fluster/cluster.py index 8dc8196..d5e1e30 100644 --- a/fluster/cluster.py +++ b/fluster/cluster.py @@ -23,6 +23,9 @@ class FlusterCluster(object): Ideal cases for this are things like caches, where another copy of data isn't a huge problem (provided expiries are respected). + + The FlusterCluster instance can be iterated through, and only active + connections will be returned. """ @classmethod @@ -41,11 +44,13 @@ def __init__(self, self.initial_clients = {c.pool_id: c for c in clients} self.clients = cycle(self.initial_clients.values()) self._sort_clients() + # maintenance work is done each "tick" when iterating + self._tick_interval = 2 * len(clients) + self._ticks = 0 def __iter__(self): - """Restarts the `rounds` tracker, and updates active clients.""" + """Updates active clients each time it's iterated through.""" self._prune_penalty_box() - return self def __next__(self): @@ -59,8 +64,16 @@ def __next__(self): while nxt is None: nxt = self._next_helper() + self._ticks += 1 + if self._ticks == self._tick_interval: + self._tick() return nxt + def _tick(self): + """Called every self._tick_interval iterations to do maintenance work.""" + self._prune_penalty_box() + self._ticks = 0 + def next(self): """Python 2/3 compatibility.""" return self.__next__() From d039c109975cb9f89962089efc96d6527c935c65 Mon Sep 17 00:00:00 2001 From: aldraco Date: Wed, 21 Nov 2018 10:22:00 -0700 Subject: [PATCH 20/24] Add test for long running iterations, and move cluster tests to test_cluster to match implementation. 
--- tests/fluster/test_cluster.py | 126 +++++++++++++++++++++++++++++++++- tests/fluster/test_utils.py | 105 +--------------------------- 2 files changed, 126 insertions(+), 105 deletions(-) diff --git a/tests/fluster/test_cluster.py b/tests/fluster/test_cluster.py index 9b9cf34..7821be0 100644 --- a/tests/fluster/test_cluster.py +++ b/tests/fluster/test_cluster.py @@ -7,7 +7,8 @@ from redis.exceptions import ConnectionError from testinstances import RedisInstance -from fluster import FlusterCluster +from fluster import FlusterCluster, ClusterEmptyError +import redis class FlusterClusterTests(unittest.TestCase): @@ -96,6 +97,129 @@ def test_consistent_hashing(self): # Bring it back up self.instances[0] = RedisInstance(10101) + def test_cycle_clients(self): + # should cycle through clients indefinately + returned_clients = set() + limit = 15 + + assert True + + for idx, client in enumerate(self.cluster): + returned_clients.update([client]) + assert client is not None + if idx >= limit: + break + + assert idx == 15 + assert len(returned_clients) == len(self.cluster.active_clients) + + def test_cycle_clients_with_failures(self): + # should not include inactive nodes + self.instances[0].terminate() + limit = 6 + counter = 0 + + for idx, client in enumerate(self.cluster): + assert client is not None + try: + client.incr('key', 1) + counter += 1 + except Exception as e: + print("oops", client, e) + continue # exception handled by the cluster + if idx >= limit: + break + + # Restart instance + self.instances[0] = RedisInstance(10101) + time.sleep(0.5) + + assert counter == 6 # able to continue even when node is down + assert 2 == len(self.cluster.active_clients) + assert 2 == len(self.cluster.initial_clients.values()) - 1 + + # should add restarted nodes back to the list after reported failure + # calling __iter__ again checks the penalty box + counter = 0 + for idx, client in enumerate(self.cluster): + if idx >= limit: + break + client.incr('key', 1) + counter += 1 + + 
assert counter == limit + assert len(self.cluster.active_clients) == 3 # to verify it added the node back + + def test_long_running_iterations(self): + # long-running iterations should still add connections back to the cluster + drop_client = 3 + restart_client = 10 + client_available = restart_client + (2 * 3) + + for idx, client in enumerate(self.cluster): + # attempt to use each client + try: + client.incr('key', 1) + except Exception: + continue # exception handled by the cluster + # mimic connection dropping out and returning + if idx == drop_client: + self.instances[0].terminate() + elif idx == restart_client: + self.instances[0] = RedisInstance(10101) + # client should be visible after the next _tick + elif idx == client_available: + assert len(self.cluster.active_clients) == 3 + break + + def test_cycle_clients_tracking(self): + # should track separate cycle entry points for each instance + cluster_instance_1 = self.cluster + # connect to already-running testinstances, instead of making more, + # to mimic two FlusterCluster instances + redis_clients = [redis.StrictRedis(port=conn.port) + for conn in self.instances] + cluster_instance_2 = FlusterCluster([i for i in redis_clients], + penalty_box_min_wait=0.5) + + # advance cluster instance one + next(cluster_instance_1) + + # should not start at the same point + assert next(cluster_instance_1) != next(cluster_instance_2) + + for temp_conn in redis_clients: + del temp_conn + + def test_dropped_connections_while_iterating(self): + # dropped connections in the middle of an iteration should not cause an infinite loop + # and should raise an exception + limit = 21 + + assert len(self.cluster.active_clients) == 3 + + drop_at_idx = (5, 6, 7) # at these points, kill a connection + killed = 0 + with self.assertRaises(ClusterEmptyError) as context: + for idx, client in enumerate(self.cluster): + if idx >= limit: + break # in case the test fails to stop + if idx in drop_at_idx: + self.instances[killed].terminate() + 
killed += 1 + print('killed ', idx, killed) + try: + client.incr('key', 1) + except: + pass # mimic err handling + self.assertTrue('All clients are down.' in str(context.exception)) + + assert idx == 8 # the next iteration after the last client was killed + + # restart all the instances + for instance, port in enumerate(range(10101, 10104)): + self.instances[instance] = RedisInstance(port) + def test_zrevrange(self): """Add a sorted set, turn off the client, add to the set, turn the client back on, check results diff --git a/tests/fluster/test_utils.py b/tests/fluster/test_utils.py index bb7f469..4face1a 100644 --- a/tests/fluster/test_utils.py +++ b/tests/fluster/test_utils.py @@ -1,11 +1,9 @@ from __future__ import absolute_import, print_function -import time import unittest import sys from testinstances import RedisInstance -from fluster import FlusterCluster, ClusterEmptyError, round_controlled -import redis +from fluster import FlusterCluster, round_controlled class FlusterClusterTests(unittest.TestCase): @@ -36,107 +34,6 @@ def tearDown(self): if hasattr(instance.conn, 'pool_id'): delattr(instance.conn, 'pool_id') - def test_cycle_clients(self): - # should cycle through clients indefinately - returned_clients = set() - limit = 15 - - assert True - - for idx, client in enumerate(self.cluster): - returned_clients.update([client]) - assert client is not None - if idx >= limit: - break - - assert idx == 15 - assert len(returned_clients) == len(self.cluster.active_clients) - - def test_cycle_clients_with_failures(self): - # should not include inactive nodes - self.instances[0].terminate() - limit = 6 - counter = 0 - - for idx, client in enumerate(self.cluster): - assert client is not None - try: - client.incr('key', 1) - counter += 1 - except Exception as e: - print("oops", client, e) - continue # exception handled by the cluster - if idx >= limit: - break - - # Restart instance - self.instances[0] = RedisInstance(10101) - time.sleep(0.5) - - assert counter == 6 # 
able to continue even when node is down - assert 2 == len(self.cluster.active_clients) - assert 2 == len(self.cluster.initial_clients.values()) - 1 - - # should add restarted nodes back to the list after reported failure - # calling __iter__ again checks the penalty box - counter = 0 - for idx, client in enumerate(self.cluster): - if idx >= limit: - break - client.incr('key', 1) - counter += 1 - - assert counter == limit - assert len(self.cluster.active_clients) == 3 # to verify it added the node back - - def test_cycle_clients_tracking(self): - # should track separate cycle entry points for each instance - cluster_instance_1 = self.cluster - # connect to already-running testinstances, instead of making more, - # to mimic two FlusterCluster instances - redis_clients = [redis.StrictRedis(port=conn.port) - for conn in self.instances] - cluster_instance_2 = FlusterCluster([i for i in redis_clients], - penalty_box_min_wait=0.5) - - # advance cluster instance one - next(cluster_instance_1) - - # should not start at the same point - assert next(cluster_instance_1) != next(cluster_instance_2) - - for temp_conn in redis_clients: - del temp_conn - - def test_dropped_connections_while_iterating(self): - # dropped connections in the middle of an iteration should not cause an infinite loop - # and should raise an exception - limit = 21 - - assert len(self.cluster.active_clients) == 3 - - drop_at_idx = (5, 6, 7) # at these points, kill a connection - killed = 0 - with self.assertRaises(ClusterEmptyError) as context: - for idx, client in enumerate(self.cluster): - if idx >= limit: - break # in case the test fails to stop - if idx in drop_at_idx: - self.instances[killed].terminate() - killed += 1 - print('killed ', idx, killed) - try: - client.incr('key', 1) - except: - pass # mimic err handling - self.assertTrue('All clients are down.' 
in str(context.exception)) - - assert idx == 8 # the next iteration after the last client was killed - - # restart all the instances - for instance, port in enumerate(range(10101, 10104)): - self.instances[instance] = RedisInstance(port) - def test_round_controller(self): # the round controller should track rounds and limit iterations repeated_sublist = list(range(0, 3)) From 7f190fb0e81afa87447e00e63710420376c8a613 Mon Sep 17 00:00:00 2001 From: aldraco Date: Wed, 21 Nov 2018 10:33:21 -0700 Subject: [PATCH 21/24] Update test as 3.x.x interface to zadd has changed. --- tests/fluster/test_cluster.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/fluster/test_cluster.py b/tests/fluster/test_cluster.py index 7821be0..e8aef00 100644 --- a/tests/fluster/test_cluster.py +++ b/tests/fluster/test_cluster.py @@ -227,7 +227,7 @@ def test_zrevrange(self): key = 'foo' for element, count in zip(self.keys, (1.0, 2.0, 3.0)): client = self.cluster.get_client(element) - client.zadd(key, count, element) + client.zadd(key, {element: count}) revrange = self.cluster.zrevrange_with_int_score(key, '+inf', 2) self.assertEqual(set([3, 2]), set(revrange.values())) @@ -238,12 +238,12 @@ def test_zrevrange(self): new_count = 5 client = self.cluster.get_client(dropped_element) try: - client.zadd(key, new_count, dropped_element) + client.zadd(key, {dropped_element: new_count}) raise Exception("Should not get here, client was terminated") except ConnectionError: client = self.cluster.get_client(dropped_element) print('replaced client', client) - client.zadd(key, new_count, dropped_element) + client.zadd(key, {dropped_element: new_count}) revrange = self.cluster.zrevrange_with_int_score(key, '+inf', 2) self.assertEqual(set([new_count, 2]), set(revrange.values())) @@ -255,6 +255,6 @@ def test_zrevrange(self): self.assertEqual(set([new_count, 2]), set(revrange.values())) #restarted instance is empty in this case client = self.cluster.get_client(dropped_element) - 
client.zadd(key, 3, dropped_element) #put original value back in + client.zadd(key, {dropped_element: 3}) #put original value back in revrange = self.cluster.zrevrange_with_int_score(key, '+inf', 2) self.assertEqual(set([new_count, 2]), set(revrange.values())) #max value found for duplicates is returned From 7e8e23cd43e4467fdd90a26f302918acd642701d Mon Sep 17 00:00:00 2001 From: aldraco Date: Wed, 16 Jan 2019 16:33:11 -0700 Subject: [PATCH 22/24] Don't need the next helper. --- fluster/cluster.py | 23 ++++++----------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/fluster/cluster.py b/fluster/cluster.py index d5e1e30..ec1e044 100644 --- a/fluster/cluster.py +++ b/fluster/cluster.py @@ -59,15 +59,13 @@ def __next__(self): if len(self.active_clients) == 0: raise ClusterEmptyError('All clients are down.') - # always return something - nxt = None - while nxt is None: - nxt = self._next_helper() + # refresh connections if they're back up + self._prune_penalty_box() - self._ticks += 1 - if self._ticks == self._tick_interval: - self._tick() - return nxt + # return the first client that's active + for client in self.clients: + if client in self.active_clients: + return client def _tick(self): """Called every self._tick_interval iterations to do maintenance work.""" @@ -78,15 +76,6 @@ def next(self): """Python 2/3 compatibility.""" return self.__next__() - def _next_helper(self): - """Helper that only returns an active connection. - """ - curr = next(self.clients) - - # only return active connections - if curr in self.active_clients: - return curr - def _sort_clients(self): """Make sure clients are sorted consistently for consistent results.""" self.active_clients.sort(key=lambda c: c.pool_id) From eee3a7ce5601fb9363cb59191e49b173ae8689f7 Mon Sep 17 00:00:00 2001 From: aldraco Date: Wed, 16 Jan 2019 16:40:37 -0700 Subject: [PATCH 23/24] Remove concept of ticks() and update tests accordingly. 
--- fluster/cluster.py | 8 -------- tests/fluster/test_cluster.py | 4 ++-- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/fluster/cluster.py b/fluster/cluster.py index ec1e044..59f6449 100644 --- a/fluster/cluster.py +++ b/fluster/cluster.py @@ -44,9 +44,6 @@ def __init__(self, self.initial_clients = {c.pool_id: c for c in clients} self.clients = cycle(self.initial_clients.values()) self._sort_clients() - # maintenance work is done each "tick" when iterating - self._tick_interval = 2 * len(clients) - self._ticks = 0 def __iter__(self): """Updates active clients each time it's iterated through.""" @@ -67,11 +64,6 @@ def __next__(self): if client in self.active_clients: return client - def _tick(self): - """Called every self._tick_interval iterations to do maintenance work.""" - self._prune_penalty_box() - self._ticks = 0 - def next(self): """Python 2/3 compatibility.""" return self.__next__() diff --git a/tests/fluster/test_cluster.py b/tests/fluster/test_cluster.py index e8aef00..5998e26 100644 --- a/tests/fluster/test_cluster.py +++ b/tests/fluster/test_cluster.py @@ -154,7 +154,7 @@ def test_long_running_iterations(self): # long-running iterations should still add connections back to the cluster drop_client = 3 restart_client = 10 - client_available = restart_client + (2 * 3) + client_available = restart_client + 1 for idx, client in enumerate(self.cluster): # attempt to use each client @@ -167,7 +167,7 @@ def test_long_running_iterations(self): self.instances[0].terminate() elif idx == restart_client: self.instances[0] = RedisInstance(10101) - # client should be visible after the next _tick + # client should be visible after calling next() again elif idx == client_available: assert len(self.cluster.active_clients) == 3 break From fd13c431026100486ddb55dc0a1dfc91e45addb0 Mon Sep 17 00:00:00 2001 From: aldraco Date: Wed, 16 Jan 2019 16:47:19 -0700 Subject: [PATCH 24/24] Bump minor version with new, backwards compatible functionality. 
--- fluster/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fluster/__init__.py b/fluster/__init__.py index dd17984..2a0c816 100644 --- a/fluster/__init__.py +++ b/fluster/__init__.py @@ -1,4 +1,4 @@ -__version__ = '0.0.5' +__version__ = '0.1.0' from .utils import round_controlled from .cluster import FlusterCluster