diff --git a/asab/storage/__init__.py b/asab/storage/__init__.py
index fe06ecd47..1c7082c7d 100644
--- a/asab/storage/__init__.py
+++ b/asab/storage/__init__.py
@@ -22,5 +22,9 @@ def __init__(self, app):
 			from .mongodb import StorageService
 			self.Service = StorageService(app, "asab.StorageService")
 
+		elif sttype == "elasticsearch":
+			from .elasticsearch import StorageService
+			self.Service = StorageService(app, "asab.StorageService")
+
 		else:
 			L.error("Unknown asab:storage type '{}'".format(sttype))
diff --git a/asab/storage/elasticsearch.py b/asab/storage/elasticsearch.py
new file mode 100644
index 000000000..c1c813c00
--- /dev/null
+++ b/asab/storage/elasticsearch.py
@@ -0,0 +1,177 @@
+import aiohttp
+import logging
+import datetime
+import json
+
+import asab
+from .service import StorageServiceABC
+from .upsertor import UpsertorABC
+
+#
+L = logging.getLogger(__name__)
+#
+
+asab.Config.add_defaults(
+	{
+		'asab:storage': {
+			'elasticsearch_url': 'http://localhost:9200/',
+			'elasticsearch_username': '',
+			'elasticsearch_password': '',
+		}
+	}
+)
+
+
+class StorageService(StorageServiceABC):
+	"""
+	Storage service for ElasticSearch. Depends on `aiohttp`.
+	"""
+
+	def __init__(self, app, service_name, config_section_name='asab:storage'):
+		super().__init__(app, service_name)
+		self.Loop = app.Loop
+
+		self.ESURL = asab.Config.get(config_section_name, 'elasticsearch_url')
+		# self._timeout = asab.Config.get(config_section_name, 'elasticsearch_timeout')
+
+		username = asab.Config.get(config_section_name, 'elasticsearch_username')
+		password = asab.Config.get(config_section_name, 'elasticsearch_password')
+
+		if username == '':
+			self._auth = None
+		else:
+			self._auth = aiohttp.BasicAuth(login=username, password=password)
+
+		self._ClientSession = None
+
+	async def finalize(self, app):
+		if self._ClientSession is not None and not self._ClientSession.closed:
+			await self._ClientSession.close()
+			self._ClientSession = None
+
+	def session(self):
+		# Create the client session lazily and recreate it if it has been closed
+		if self._ClientSession is None or self._ClientSession.closed:
+			self._ClientSession = aiohttp.ClientSession(auth=self._auth, loop=self.Loop)
+		return self._ClientSession
+
+	async def delete(self, index, _id=None):
+		if _id:
+			url = "{}{}/_doc/{}".format(self.ESURL, index, _id)
+		else:
+			url = "{}{}".format(self.ESURL, index)
+		async with self.session().request(method="DELETE", url=url) as resp:
+			assert resp.status == 200, "Unexpected response code: {}".format(resp.status)
+			resp = await resp.json()
+			# Deleting a whole index is "acknowledged"; deleting a single document reports a "deleted" result
+			if resp.get("acknowledged", False):
+				return resp
+			assert resp["result"] == "deleted", "Document was not deleted"
+			return resp
+
+	async def get_by(self, collection: str, key: str, value):
+		raise NotImplementedError("get_by")
+
+	async def get(self, index: str, obj_id) -> dict:
+		url = "{}{}/_doc/{}".format(self.ESURL, index, obj_id)
+		async with self.session().request(method="GET", url=url) as resp:
+			obj = await resp.json()
+
+		# Reshape the response so that it is the stored document with its version and id attached
+		ret = obj['_source']
+		ret['_v'] = obj['_version']
+		ret['_id'] = obj['_id']
+
+		return ret
+
+	def upsertor(self, index: str, obj_id=None, version: int = 0):
+		return ElasicSearchUpsertor(self, index, obj_id, version)
+
+	async def list(self, index, size=10000):
+		'''
+		Custom ElasticSearch method
+		'''
+		url = "{}{}/_search?size={}".format(self.ESURL, index, size)
+		async with self.session().request(method="GET", url=url) as resp:
+			assert resp.status == 200, "Unexpected response code: {}".format(resp.status)
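+			# The raw _search response is returned as-is; the matching documents are under ["hits"]["hits"]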
+			return await resp.json()
+
+	async def indices(self, search_string=None):
+		'''
+		Custom ElasticSearch method
+		'''
+		url = "{}_cat/indices/{}?format=json".format(self.ESURL, search_string if search_string is not None else "*")
+		async with self.session().request(method="GET", url=url) as resp:
+			assert resp.status == 200, "Unexpected response code: {}".format(resp.status)
+			return await resp.json()
+
+	async def empty_index(self, index):
+		'''
+		Custom ElasticSearch method
+		'''
+		# TODO: There is an option to specify settings (e.g. shard number, replica number etc.) and mappings here
+		url = "{}{}".format(self.ESURL, index)
+		async with self.session().request(method="PUT", url=url) as resp:
+			assert resp.status == 200, "Unexpected response code: {}".format(resp.status)
+			return await resp.json()
+
+
+class ElasicSearchUpsertor(UpsertorABC):
+
+	@classmethod
+	def generate_id(cls):
+		raise NotImplementedError("generate_id")
+
+	async def execute(self):
+		if self.ObjId is None:
+			return await self._insert_noobjid()
+		else:
+			return await self._upsert()
+
+	async def _insert_noobjid(self):
+		setobj = {}
+
+		if len(self.ModSet) > 0:
+			for k, v in self.ModSet.items():
+				setobj[k] = serialize(v)
+
+		if len(self.ModInc) > 0:
+			# addobj['$inc'] = self.ModInc
+			# raise NotImplementedError("yet")
+			pass
+
+		if len(self.ModPush) > 0:
+			# addobj['$push'] = {k: {'$each': v} for k, v in self.ModPush.items()}
+			raise NotImplementedError("yet")
+
+		# This is an insert of a new document; the ObjId is generated by ElasticSearch
+		url = "{}{}/_doc".format(self.Storage.ESURL, self.Collection)
+		async with self.Storage.session().request(method="POST", url=url, json=setobj) as resp:
+			assert resp.status == 201, "Unexpected response code: {}".format(resp.status)
+			resp_json = await resp.json()
+
+		self.ObjId = resp_json['_id']
+
+		return self.ObjId
+
+	async def _upsert(self):
+		upsertobj = {"doc": {}, "doc_as_upsert": True}
+
+		if len(self.ModSet) > 0:
+			for k, v in self.ModSet.items():
+				upsertobj["doc"][k] = serialize(v)
+			url = "{}{}/_update/{}".format(self.Storage.ESURL, self.Collection, self.ObjId)
+			async with self.Storage.session().request(method="POST", url=url, data=json.dumps(upsertobj), headers={'Content-Type': 'application/json'}) as resp:
+				assert resp.status == 200 or resp.status == 201, "Unexpected response code: {}".format(resp.status)
+				resp_json = await resp.json()
+				assert resp_json["result"] == "updated" or resp_json["result"] == "created", "Creating/updating was unsuccessful"
+
+		return self.ObjId
+
+
+def serialize(v):
+	if isinstance(v, datetime.datetime):
+		return v.timestamp()
+	else:
+		return v
diff --git a/examples/elasticsearch_storage.py b/examples/elasticsearch_storage.py
new file mode 100644
index 000000000..9f44c8b23
--- /dev/null
+++ b/examples/elasticsearch_storage.py
@@ -0,0 +1,66 @@
+import pprint
+
+import asab
+import asab.storage
+
+
+asab.Config.add_defaults(
+	{
+		'asab:storage': {
+			'type': 'elasticsearch',
+			'elasticsearch_url': 'http://10.17.174.124:9200/',
+		}
+	}
+)
+
+
+class MyApplication(asab.Application):
+
+	async def initialize(self):
+
+		# Loading the storage service module
+		self.add_module(asab.storage.Module)
+
+
+	async def main(self):
+		storage = self.get_service("asab.StorageService")
+
+		# Obtain an upsertor object associated with the "test-collection" index
+		# To create a new object, keep the default `version` of zero
+		print("Creating default id and version")
+		u = storage.upsertor("test-collection")
+		u.set("foo", "bar")
+		objid = await u.execute()
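+		# `objid` now holds the document id; ElasticSearch generated it because no ObjId was supplied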
+
+		obj = await storage.get("test-collection", objid)
+		# Obtain an upsertor object for an update - specify the existing `version` number
+		print("Specify version when updating")
+		u = storage.upsertor("test-collection", obj_id=objid, version=obj['_v'])
+		u.set("foo", "buzz")
+		objid = await u.execute()
+
+		obj = await storage.get("test-collection", objid)
+		print("Result of get by id '{}'".format(objid))
+		pprint.pprint(obj)
+
+		await storage.delete("test-collection", objid)
+
+
+		# Insert a document with a provided ObjId
+		print("Insert the document with provided ObjId")
+		u = storage.upsertor("test-collection", "test")
+		u.set("foo", "bar")
+		objid = await u.execute()
+
+		obj = await storage.get("test-collection", objid)
+		print("Result of get by id '{}'".format(objid))
+		pprint.pprint(obj)
+		print("Delete the document with provided ObjId")
+		await storage.delete("test-collection", objid)
+
+		self.stop()
+
+
+if __name__ == '__main__':
+	app = MyApplication()
+	app.run()
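
A possible usage sketch, not part of the diff: the custom list(), indices() and empty_index() helpers added in asab/storage/elasticsearch.py could be exercised from the same example application, for instance inside MyApplication.main(). The "test-*" pattern and the "another-test-collection" index name below are illustrative only.

		# Sketch only: list() returns the raw _search response; documents are under ["hits"]["hits"]
		found = await storage.list("test-collection", size=100)
		print("Documents in index:", len(found["hits"]["hits"]))

		# indices() wraps _cat/indices?format=json; each entry is a dict that includes an "index" key
		for idx in await storage.indices("test-*"):
			print("Index:", idx["index"])

		# empty_index() creates an index with default settings and mappings
		await storage.empty_index("another-test-collection")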