From 35e3d2a91328cdd1e96246d2a66f8c3e824cc0b0 Mon Sep 17 00:00:00 2001 From: marcel Date: Tue, 13 Nov 2012 13:02:37 -0200 Subject: [PATCH] Implementing aggregate and group commands #21 db.articles.aggregate(pipeline, callback=callback) db.articles.group(callback=callback, **group) --- mongotor/client.py | 98 +++++++++++++++++++++++++++++- mongotor/cursor.py | 17 ++---- mongotor/helpers.py | 18 ++++++ mongotor/orm/manager.py | 19 ------ tests/test_client.py | 131 +++++++++++++++++++++++++++++++++++++++- 5 files changed, 250 insertions(+), 33 deletions(-) diff --git a/mongotor/client.py b/mongotor/client.py index b22515f..38b173e 100644 --- a/mongotor/client.py +++ b/mongotor/client.py @@ -15,10 +15,12 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . import logging +from bson.code import Code from tornado import gen from mongotor.node import ReadPreference from mongotor.cursor import Cursor from mongotor import message +from mongotor import helpers log = logging.getLogger(__name__) @@ -190,7 +192,7 @@ def find(self, *args, **kwargs): log.debug("mongo: db.{0}.find({spec}).limit({limit}).sort({sort})".format( self._collection_name, - spec=args[0] or {}, + spec=args[0] if args else {}, sort=kwargs.get('sort', {}), limit=kwargs.get('limit', '') )) @@ -201,3 +203,97 @@ def find(self, *args, **kwargs): cursor.find(callback=kwargs['callback']) else: return cursor + + def distinct(self, key, callback): + """Get a list of distinct values for `key` among all documents + in this collection. + + Raises :class:`TypeError` if `key` is not an instance of + :class:`basestring` (:class:`str` in python 3). + + To get the distinct values for a key in the result set of a + query use :meth:`~mongotor.cursor.Cursor.distinct`. + + :Parameters: + - `key`: name of key for which we want to get the distinct values + + """ + self.find().distinct(key, callback=callback) + + def count(self, callback): + """Get the size of the results among all documents. + + Returns the number of documents in the results set + """ + self.find().count(callback=callback) + + @gen.engine + def aggregate(self, pipeline, read_preference=None, callback=None): + """Perform an aggregation using the aggregation framework on this + collection. + + :Parameters: + - `pipeline`: a single command or list of aggregation commands + - `read_preference` + + .. note:: Requires server version **>= 2.1.0** + + .. _aggregate command: + http://docs.mongodb.org/manual/applications/aggregation + """ + if not isinstance(pipeline, (dict, list, tuple)): + raise TypeError("pipeline must be a dict, list or tuple") + + if isinstance(pipeline, dict): + pipeline = [pipeline] + + response, error = yield gen.Task(self._database.command, + "aggregate", self._collection, pipeline=pipeline, + read_preference=read_preference) + + callback(response) + + @gen.engine + def group(self, key, condition, initial, reduce, finalize=None, + read_preference=None, callback=None): + """Perform a query similar to an SQL *group by* operation. + + Returns an array of grouped items. + + The `key` parameter can be: + + - ``None`` to use the entire document as a key. + - A :class:`list` of keys (each a :class:`basestring` + (:class:`str` in python 3)) to group by. + - A :class:`basestring` (:class:`str` in python 3), or + :class:`~bson.code.Code` instance containing a JavaScript + function to be applied to each document, returning the key + to group by. + + :Parameters: + - `key`: fields to group by (see above description) + - `condition`: specification of rows to be + considered (as a :meth:`find` query specification) + - `initial`: initial value of the aggregation counter object + - `reduce`: aggregation function as a JavaScript string + - `finalize`: function to be called on each object in output list. + + """ + + group = {} + if isinstance(key, basestring): + group["$keyf"] = Code(key) + elif key is not None: + group = {"key": helpers._fields_list_to_dict(key)} + + group["ns"] = self._collection + group["$reduce"] = Code(reduce) + group["cond"] = condition + group["initial"] = initial + if finalize is not None: + group["finalize"] = Code(finalize) + + response, error = yield gen.Task(self._database.command, + "group", group, read_preference=read_preference) + + callback(response) diff --git a/mongotor/cursor.py b/mongotor/cursor.py index fdbffe9..a81327b 100644 --- a/mongotor/cursor.py +++ b/mongotor/cursor.py @@ -100,12 +100,10 @@ def count(self, callback): Returns the number of documents in the results set for this query. Does """ - command = SON({ - "count": self._collection, - }) - command.update({"query": self._spec}) + command = {"query": self._spec} - response, error = yield gen.Task(self._database.command, command) + response, error = yield gen.Task(self._database.command, + 'count', self._collection, **command) total = 0 if response and len(response) > 0 and 'n' in response: @@ -125,15 +123,12 @@ def distinct(self, key, callback): raise TypeError("key must be an instance " "of %s" % (basestring.__name__,)) - command = SON({ - "distinct": self._collection, - }) - - command.update({"key": key}) + command = {"key": key} if self._spec: command.update({"query": self._spec}) - response, error = yield gen.Task(self._database.command, command) + response, error = yield gen.Task(self._database.command, + 'distinct', self._collection, **command) callback(response['values']) diff --git a/mongotor/helpers.py b/mongotor/helpers.py index f7f5f4b..8be483e 100644 --- a/mongotor/helpers.py +++ b/mongotor/helpers.py @@ -51,3 +51,21 @@ def _unpack_response(response, cursor_id=None, as_class=dict, tz_aware=False): result["data"] = bson.decode_all(response[20:], as_class, tz_aware) assert len(result["data"]) == result["number_returned"] return result + + +def _fields_list_to_dict(fields): + """Takes a list of field names and returns a matching dictionary. + + ["a", "b"] becomes {"a": 1, "b": 1} + + and + + ["a.b.c", "d", "a.c"] becomes {"a.b.c": 1, "d": 1, "a.c": 1} + """ + as_dict = {} + for field in fields: + if not isinstance(field, basestring): + raise TypeError("fields must be a list of key names, " + "each an instance of %s" % (basestring.__name__,)) + as_dict[field] = 1 + return as_dict diff --git a/mongotor/orm/manager.py b/mongotor/orm/manager.py index e2e4936..d5a4def 100644 --- a/mongotor/orm/manager.py +++ b/mongotor/orm/manager.py @@ -59,25 +59,6 @@ def distinct(self, key, callback, query=None): client = Client(Database(), self.collection.__collection__) client.find(query).distinct(key, callback=callback) - @gen.engine - def sum(self, query, field, callback): - command = { - "group": { - 'ns': self.collection.__collection__, - 'cond': query, - 'initial': {'csum': 0}, - '$reduce': 'function(obj,prev){prev.csum+=obj.' + field + ';}' - } - } - - result, error = yield gen.Task(Database().command, command) - total = 0 - - if result and result['retval']: - total = result['retval'][0]['csum'] - - callback(total) - @gen.engine def geo_near(self, near, max_distance=None, num=None, spherical=None, unique_docs=None, query=None, callback=None, **kw): diff --git a/tests/test_client.py b/tests/test_client.py index 036a8df..9b312a0 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -3,6 +3,7 @@ from tornado import testing from mongotor.database import Database from bson import ObjectId +from datetime import datetime import sure @@ -155,7 +156,7 @@ def test_find_one_document_by_id(self): error.should.be.none def test_count_documents_in_find(self): - """[ClientTestCase] - counting documents""" + """[ClientTestCase] - counting documents in query""" db = Database.connect(["localhost:27027", "localhost:27028"], dbname='test') @@ -171,8 +172,25 @@ def test_count_documents_in_find(self): total.should.be.equal(2) + def test_count_all_documents(self): + """[ClientTestCase] - counting among all documents""" + db = Database.connect(["localhost:27027", "localhost:27028"], + dbname='test') + + documents = [{'_id': ObjectId(), 'param': 'shouldbeparam1'}, + {'_id': ObjectId(), 'param': 'shouldbeparam1'}, + {'_id': ObjectId(), 'param': 'shouldbeparam2'}] + + db.collection_test.insert(documents, callback=self.stop) + response, error = self.wait() + + db.collection_test.count(callback=self.stop) + total = self.wait() + + total.should.be.equal(3) + def test_distinct_documents_in_find(self): - """[ClientTestCase] - distinct documents""" + """[ClientTestCase] - distinct documents in query""" db = Database.connect(["localhost:27027", "localhost:27028"], dbname='test') @@ -188,3 +206,112 @@ def test_distinct_documents_in_find(self): distincts.should.have.length_of(1) distincts[0].should.be.equal(100) + + def test_distinct_all_documents(self): + """[ClientTestCase] - distinct among all documents""" + db = Database.connect(["localhost:27027", "localhost:27028"], + dbname='test') + + documents = [{'_id': ObjectId(), 'param': 'shouldbeparam1', 'uuid': 100}, + {'_id': ObjectId(), 'param': 'shouldbeparam1', 'uuid': 100}, + {'_id': ObjectId(), 'param': 'shouldbeparam2', 'uuid': 200}] + + db.collection_test.insert(documents, callback=self.stop) + response, error = self.wait() + + db.collection_test.distinct('uuid', callback=self.stop) + distincts = self.wait() + + distincts.should.have.length_of(2) + distincts[0].should.be.equal(100) + distincts[1].should.be.equal(200) + + def test_aggregate_collection(self): + """[ClientTestCase] - aggregate command""" + db = Database.connect(["localhost:27027", "localhost:27028"], + dbname='test') + + documents = [{ + "title": "this is my title", + "author": "bob", + "posted": datetime.now(), + "pageViews": 5, + "tags": ["good", "fun"], + }, { + "title": "this is my title", + "author": "joe", + "posted": datetime.now(), + "pageViews": 5, + "tags": ["good"], + }] + + db.articles.insert(documents, callback=self.stop) + response, error = self.wait() + + try: + pipeline = { + "$project": { + "author": 1, + "tags": 1, + } + }, { + "$unwind": "$tags" + }, { + "$group": { + "_id": {"tags": "$tags"}, + "authors": {"$addToSet": "$author"} + } + } + db.articles.aggregate(pipeline, callback=self.stop) + + response = self.wait() + + response['result'][0]['_id'].should.be.equal({u'tags': u'fun'}) + response['result'][0]['authors'].should.be.equal([u'bob']) + + response['result'][1]['_id'].should.be.equal({u'tags': u'good'}) + response['result'][1]['authors'].should.be.equal([u'joe', u'bob']) + finally: + db.articles.remove({}, callback=self.stop) + self.wait() + + def test_group(self): + """[ClientTestCase] - group command""" + db = Database.connect(["localhost:27027", "localhost:27028"], + dbname='test') + group = { + 'key': None, + 'condition': {'author': 'joe'}, + 'initial': {'csum': 0}, + 'reduce': 'function(obj,prev){prev.csum+=obj.pageViews;}' + } + + documents = [{ + "title": "this is my title", + "author": "bob", + "posted": datetime.now(), + "pageViews": 5, + "tags": ["good", "fun"], + }, { + "title": "this is my title", + "author": "joe", + "posted": datetime.now(), + "pageViews": 6, + "tags": ["good"], + }, { + "title": "this is my title", + "author": "joe", + "posted": datetime.now(), + "pageViews": 10, + "tags": ["good"], + }] + db.articles.insert(documents, callback=self.stop) + response, error = self.wait() + + try: + db.articles.group(callback=self.stop, **group) + result = self.wait() + result['retval'][0]['csum'].should.be.equal(16) + finally: + db.articles.remove({}, callback=self.stop) + self.wait()