Skip to content
This repository has been archived by the owner on Mar 28, 2019. It is now read-only.

Commit

Permalink
Notifications are now asynchronous
Browse files Browse the repository at this point in the history
  • Loading branch information
tarekziade committed Feb 11, 2016
1 parent df939be commit 06f88ea
Show file tree
Hide file tree
Showing 25 changed files with 389 additions and 107 deletions.
3 changes: 2 additions & 1 deletion cliquet/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@
'cliquet.initialization.setup_authentication',
'cliquet.initialization.setup_backoff',
'cliquet.initialization.setup_statsd',
'cliquet.initialization.setup_listeners'
'cliquet.initialization.setup_listeners',
'cliquet.initialization.setup_workers'
),
'event_listeners': '',
'logging_renderer': 'cliquet.logs.ClassicLogRenderer',
Expand Down
55 changes: 30 additions & 25 deletions cliquet/errors.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,33 @@
import six
from pyramid import httpexceptions
from enum import Enum

from cliquet.logs import logger
from cliquet.utils import Enum, json, reapply_cors, encode_header


ERRORS = Enum(
MISSING_AUTH_TOKEN=104,
INVALID_AUTH_TOKEN=105,
BADJSON=106,
INVALID_PARAMETERS=107,
MISSING_PARAMETERS=108,
INVALID_POSTED_DATA=109,
INVALID_RESOURCE_ID=110,
MISSING_RESOURCE=111,
MISSING_CONTENT_LENGTH=112,
REQUEST_TOO_LARGE=113,
MODIFIED_MEANWHILE=114,
METHOD_NOT_ALLOWED=115,
VERSION_NOT_AVAILABLE=116,
CLIENT_REACHED_CAPACITY=117,
FORBIDDEN=121,
CONSTRAINT_VIOLATED=122,
UNDEFINED=999,
BACKEND=201,
SERVICE_DEPRECATED=202
)
from cliquet.utils import json, reapply_cors, encode_header


class ERRORS(Enum):
MISSING_AUTH_TOKEN = 104
INVALID_AUTH_TOKEN = 105
BADJSON = 106
INVALID_PARAMETERS = 107
MISSING_PARAMETERS = 108
INVALID_POSTED_DATA = 109
INVALID_RESOURCE_ID = 110
MISSING_RESOURCE = 111
MISSING_CONTENT_LENGTH = 112
REQUEST_TOO_LARGE = 113
MODIFIED_MEANWHILE = 114
METHOD_NOT_ALLOWED = 115
VERSION_NOT_AVAILABLE = 116
CLIENT_REACHED_CAPACITY = 117
FORBIDDEN = 121
CONSTRAINT_VIOLATED = 122
UNDEFINED = 999
BACKEND = 201
SERVICE_DEPRECATED = 202


"""Predefined errors as specified by the protocol.
+-------------+-------+------------------------------------------------+
Expand Down Expand Up @@ -88,6 +90,9 @@ def http_error(httpexception, errno=None,
"""
errno = errno or ERRORS.UNDEFINED

if isinstance(errno, Enum):
errno = errno.value

# Track error number for request summary
logger.bind(errno=errno)

Expand Down Expand Up @@ -144,7 +149,7 @@ def json_error_handler(errors):
message = '%(location)s: %(description)s' % error

response = http_error(httpexceptions.HTTPBadRequest(),
errno=ERRORS.INVALID_PARAMETERS,
errno=ERRORS.INVALID_PARAMETERS.value,
error='Invalid parameters',
message=message,
details=errors)
Expand Down
19 changes: 13 additions & 6 deletions cliquet/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,21 @@

import transaction
from pyramid.events import NewRequest
from enum import Enum

from cliquet.logs import logger
from cliquet.utils import strip_uri_prefix, Enum
from cliquet.utils import strip_uri_prefix


ACTIONS = Enum(CREATE='create',
DELETE='delete',
READ='read',
UPDATE='update')
class ACTIONS(Enum):
CREATE = 'create'
DELETE = 'delete'
READ = 'read'
UPDATE = 'update'

@staticmethod
def from_string_list(elements):
return tuple(ACTIONS(el) for el in elements)


class _ResourceEvent(object):
Expand Down Expand Up @@ -115,7 +121,8 @@ def notify_resource_event(request, timestamp, data, action, old=None):
resource_name = request.current_resource_name

# Add to impacted records or create new event.
group_by = resource_name + action
group_by = resource_name + action.value

if group_by in events:
if action == ACTIONS.READ:
events[group_by].read_records.extend(impacted)
Expand Down
16 changes: 14 additions & 2 deletions cliquet/initialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from cliquet import permission
from cliquet.logs import logger
from cliquet.events import ResourceRead, ResourceChanged, ACTIONS
from cliquet.workers import get_memory_workers

from pyramid.events import NewRequest, NewResponse
from pyramid.exceptions import ConfigurationError
Expand Down Expand Up @@ -397,7 +398,7 @@ def on_new_response(event):

class EventActionFilter(object):
def __init__(self, actions, config):
self.actions = actions
self.actions = [action.value for action in actions]

def phash(self):
return 'for_actions = %s' % (','.join(self.actions))
Expand Down Expand Up @@ -446,7 +447,12 @@ def setup_listeners(config):
key = 'listeners.%s' % name
listener = statsd_client.timer(key)(listener.__call__)

actions = aslist(settings.get(prefix + 'actions', '')) or write_actions
actions = aslist(settings.get(prefix + 'actions', ''))
if len(actions) > 0:
actions = ACTIONS.from_string_list(actions)
else:
actions = write_actions

resource_names = aslist(settings.get(prefix + 'resources', ''))
options = dict(for_actions=actions, for_resources=resource_names)

Expand All @@ -458,6 +464,12 @@ def setup_listeners(config):
config.add_subscriber(listener, ResourceChanged, **options)


def setup_workers(config):
settings = config.get_settings()
num_workers = int(settings.get('background.processes', 1))
config.registry.workers = get_memory_workers(num_workers)


def load_default_settings(config, default_settings):
"""Read settings provided in Paste ini file, set default values and
replace if defined as environment variable.
Expand Down
20 changes: 17 additions & 3 deletions cliquet/listeners/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,24 @@
from pyramid.threadlocal import get_current_registry


class ListenerBase(object):
def __init__(self, *args, **kwargs):
def _done(self, name, res_id, success, result):
pass

def __call__(self, event):
def _async_run(self, event):
workers = get_current_registry().workers
workers.apply_async('event', self._run, (event,), self._done)

def _run(self, event):
raise NotImplementedError()

def __call__(self, event, async=True):
"""
:param event: Incoming event
:param async: Run asynchronously, default: True
"""
raise NotImplementedError()
if async:
return self._async_run(event)
else:
# not used yet
return self._run(event) # pragma: no cover
2 changes: 1 addition & 1 deletion cliquet/listeners/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def __init__(self, client, listname, *args, **kwargs):
self._client = client
self.listname = listname

def __call__(self, event):
def _run(self, event): # pragma: no cover
try:
payload = json.dumps(event.payload)
except TypeError:
Expand Down
25 changes: 15 additions & 10 deletions cliquet/resource/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,8 @@ def put(self):
existing = self._get_record_or_404(self.record_id)
except HTTPNotFound:
# Look if this record used to exist (for preconditions check).
filter_by_id = Filter(id_field, self.record_id, COMPARISON.EQ)
filter_by_id = Filter(id_field, self.record_id,
COMPARISON.EQ.value)
tombstones, _ = self.model.get_records(filters=[filter_by_id],
include_deleted=True)
if len(tombstones) > 0:
Expand Down Expand Up @@ -882,15 +883,15 @@ def _extract_filters(self, queryparams=None):
raise_invalid(self.request, **error_details)

if param == '_since':
operator = COMPARISON.GT
operator = COMPARISON.GT.value
else:
if param == '_to':
message = ('_to is now deprecated, '
'you should use _before instead')
url = ('http://cliquet.rtfd.org/en/2.4.0/api/resource'
'.html#list-of-available-url-parameters')
send_alert(self.request, message, url)
operator = COMPARISON.LT
operator = COMPARISON.LT.value
filters.append(
Filter(self.model.modified_field, value, operator)
)
Expand All @@ -899,9 +900,9 @@ def _extract_filters(self, queryparams=None):
m = re.match(r'^(min|max|not|lt|gt|in|exclude)_(\w+)$', param)
if m:
keyword, field = m.groups()
operator = getattr(COMPARISON, keyword.upper())
operator = getattr(COMPARISON, keyword.upper()).value
else:
operator, field = COMPARISON.EQ, param
operator, field = COMPARISON.EQ.value, param

if not self.is_known_field(field):
error_details = {
Expand All @@ -911,7 +912,7 @@ def _extract_filters(self, queryparams=None):
raise_invalid(self.request, **error_details)

value = native_value(paramvalue)
if operator in (COMPARISON.IN, COMPARISON.EXCLUDE):
if operator in (COMPARISON.IN.value, COMPARISON.EXCLUDE.value):
value = set([native_value(v) for v in paramvalue.split(',')])

filters.append(Filter(field, value, operator))
Expand Down Expand Up @@ -957,14 +958,17 @@ def _build_pagination_rules(self, sorting, last_record, rules=None):
next_sorting = sorting[:-1]

for field, _ in next_sorting:
rule.append(Filter(field, last_record.get(field), COMPARISON.EQ))
rule.append(Filter(field, last_record.get(field),
COMPARISON.EQ.value))

field, direction = sorting[-1]

if direction == -1:
rule.append(Filter(field, last_record.get(field), COMPARISON.LT))
rule.append(Filter(field, last_record.get(field),
COMPARISON.LT.value))
else:
rule.append(Filter(field, last_record.get(field), COMPARISON.GT))
rule.append(Filter(field, last_record.get(field),
COMPARISON.GT.value))

rules.append(rule)

Expand Down Expand Up @@ -1077,7 +1081,8 @@ def _extract_filters(self, queryparams=None):

ids = self.context.shared_ids
if ids:
filter_by_id = Filter(self.model.id_field, ids, COMPARISON.IN)
filter_by_id = Filter(self.model.id_field, ids,
COMPARISON.IN.value)
filters.insert(0, filter_by_id)

return filters
Expand Down
23 changes: 12 additions & 11 deletions cliquet/storage/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,22 +91,23 @@ def apply_filters(self, records, filters):
"""Filter the specified records, using basic iteration.
"""
operators = {
COMPARISON.LT: operator.lt,
COMPARISON.MAX: operator.le,
COMPARISON.EQ: operator.eq,
COMPARISON.NOT: operator.ne,
COMPARISON.MIN: operator.ge,
COMPARISON.GT: operator.gt,
COMPARISON.IN: operator.contains,
COMPARISON.EXCLUDE: lambda x, y: not operator.contains(x, y),
COMPARISON.LT.value: operator.lt,
COMPARISON.MAX.value: operator.le,
COMPARISON.EQ.value: operator.eq,
COMPARISON.NOT.value: operator.ne,
COMPARISON.MIN.value: operator.ge,
COMPARISON.GT.value: operator.gt,
COMPARISON.IN.value: operator.contains,
COMPARISON.EXCLUDE.value: lambda x, y: not operator.contains(x, y),
}

for record in records:
matches = True
for f in filters:
left = record.get(f.field)
right = f.value
if f.operator in (COMPARISON.IN, COMPARISON.EXCLUDE):
if f.operator in (COMPARISON.IN.value,
COMPARISON.EXCLUDE.value):
right = left
left = f.value
matches = matches and operators[f.operator](left, right)
Expand Down Expand Up @@ -330,11 +331,11 @@ def get_unicity_rules(collection_id, parent_id, record, unique_fields,
if value is None:
continue

filters = [Filter(field, value, COMPARISON.EQ)]
filters = [Filter(field, value, COMPARISON.EQ.value)]

if not for_creation:
object_id = record[id_field]
exclude = Filter(id_field, object_id, COMPARISON.NOT)
exclude = Filter(id_field, object_id, COMPARISON.NOT.value)
filters.append(exclude)

rules.append(filters)
Expand Down
15 changes: 8 additions & 7 deletions cliquet/storage/postgresql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,10 +566,10 @@ def _format_conditions(self, filters, id_field, modified_field,
:rtype: tuple
"""
operators = {
COMPARISON.EQ: '=',
COMPARISON.NOT: '<>',
COMPARISON.IN: 'IN',
COMPARISON.EXCLUDE: 'NOT IN',
COMPARISON.EQ.value: '=',
COMPARISON.NOT.value: '<>',
COMPARISON.IN.value: 'IN',
COMPARISON.EXCLUDE.value: 'NOT IN',
}

conditions = []
Expand All @@ -589,7 +589,8 @@ def _format_conditions(self, filters, id_field, modified_field,
# If field is missing, we default to ''.
sql_field = "coalesce(data->>:%s, '')" % field_holder

if filtr.operator not in (COMPARISON.IN, COMPARISON.EXCLUDE):
if filtr.operator not in (COMPARISON.IN.value,
COMPARISON.EXCLUDE.value):
# For the IN operator, let psycopg escape the values list.
# Otherwise JSON-ify the native value (e.g. True -> 'true')
if not isinstance(filtr.value, six.string_types):
Expand Down Expand Up @@ -703,7 +704,7 @@ def _check_unicity(self, conn, collection_id, parent_id, record,
if value is None:
continue
sql, holders = self._format_conditions(
[Filter(field, value, COMPARISON.EQ)],
[Filter(field, value, COMPARISON.EQ.value)],
id_field,
modified_field,
prefix=field)
Expand All @@ -720,7 +721,7 @@ def _check_unicity(self, conn, collection_id, parent_id, record,
if not for_creation:
object_id = record[id_field]
sql, holders = self._format_conditions(
[Filter(id_field, object_id, COMPARISON.NOT)],
[Filter(id_field, object_id, COMPARISON.NOT.value)],
id_field,
modified_field)
safeholders['condition_record'] = sql
Expand Down
6 changes: 3 additions & 3 deletions cliquet/tests/resource/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,19 +277,19 @@ def test_impacted_records_are_merged(self):
self.assertEqual(len(self.events), 3)

create_event = self.events[0]
self.assertEqual(create_event.payload['action'], 'create')
self.assertEqual(create_event.payload['action'], ACTIONS.CREATE)
self.assertEqual(len(create_event.impacted_records), 1)
self.assertNotIn('old', create_event.impacted_records[0])
update_event = self.events[1]
self.assertEqual(update_event.payload['action'], 'update')
self.assertEqual(update_event.payload['action'], ACTIONS.UPDATE)
impacted = update_event.impacted_records
self.assertEqual(len(impacted), 2)
self.assertEqual(impacted[0]['old']['name'], 'foo')
self.assertEqual(impacted[0]['new']['name'], 'bar')
self.assertEqual(impacted[1]['old']['name'], 'bar')
self.assertEqual(impacted[1]['new']['name'], 'baz')
delete_event = self.events[2]
self.assertEqual(delete_event.payload['action'], 'delete')
self.assertEqual(delete_event.payload['action'], ACTIONS.DELETE)
self.assertEqual(len(delete_event.impacted_records), 1)
self.assertNotIn('new', delete_event.impacted_records[0])

Expand Down
Loading

0 comments on commit 06f88ea

Please sign in to comment.