Skip to content

Commit

Permalink
Add Elasticsearch storage module
Browse files Browse the repository at this point in the history
Co-authored-by: Jakub Schier <[email protected]>
Co-authored-by: Ales Teska <[email protected]>
  • Loading branch information
3 people authored Jun 3, 2020
1 parent 14ccad6 commit bb218e5
Show file tree
Hide file tree
Showing 3 changed files with 247 additions and 0 deletions.
4 changes: 4 additions & 0 deletions asab/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,9 @@ def __init__(self, app):
from .mongodb import StorageService
self.Service = StorageService(app, "asab.StorageService")

elif sttype == "elasticsearch":
from .elasticsearch import StorageService
self.Service = StorageService(app, "asab.StorageService")

else:
L.error("Unknown asab:storage type '{}'".format(sttype))
177 changes: 177 additions & 0 deletions asab/storage/elasticsearch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
import aiohttp
import logging
import datetime
import json

import asab
from .service import StorageServiceABC
from .upsertor import UpsertorABC

#
L = logging.getLogger(__name__)
#

asab.Config.add_defaults(
{
'asab:storage': {
'elasticsearch_url': 'http://localhost:9200/',
'elasticsearch_username': '',
'elasticsearch_password': '',
}
}
)


class StorageService(StorageServiceABC):
"""
Depends on `aiohttp`.
"""

def __init__(self, app, service_name, config_section_name='asab:storage'):
super().__init__(app, service_name)
self.Loop = app.Loop

self.ESURL = asab.Config.get(config_section_name, 'elasticsearch_url')
# self._timeout = asab.Config.get(config_section_name, 'elasticsearch_timeout')

username = asab.Config.get(config_section_name, 'elasticsearch_username')
password = asab.Config.get(config_section_name, 'elasticsearch_password')

if username == '':
self._auth = None
else:
self._auth = aiohttp.BasicAuth(login=username, password=password)

self._ClientSession = None

async def finalize(self, app):
if self._ClientSession is not None and not self._ClientSession.closed:
await self._ClientSession.close()
self._ClientSession = None

def session(self):
if self._ClientSession is None:
self._ClientSession = aiohttp.ClientSession(auth=self._auth, loop=self.Loop)
elif self._ClientSession.closed:
self._ClientSession = aiohttp.ClientSession(auth=self._auth, loop=self.Loop)
return self._ClientSession

async def delete(self, index, _id=None):
if _id:
url = "{}{}/_doc/{}".format(self.ESURL, index, _id)
else:
url = "{}{}".format(self.ESURL, index)
async with self.session().request(method="DELETE", url=url) as resp:
assert resp.status == 200, "Unexpected response code: {}".format(resp.status)
resp = await resp.json()
if resp.get("acknowledged", False):
return resp
assert resp["result"] == "deleted", "Document was not deleted"
return resp

async def get_by(self, collection: str, key: str, value):
raise NotImplementedError("get_by")

async def get(self, index: str, obj_id) -> dict:
url = "{}{}/_doc/{}".format(self.ESURL, index, obj_id)
async with self.session().request(method="GET", url=url) as resp:
obj = await resp.json()

# Manipulate the output so that it is a requested object
ret = obj['_source']
ret['_v'] = obj['_version']
ret['_id'] = obj['_id']

return ret

def upsertor(self, index: str, obj_id=None, version: int = 0):
return ElasicSearchUpsertor(self, index, obj_id, version)

async def list(self, index, size=10000):
'''
Custom ElasticSearch method
'''
url = "{}{}/_search?size={}".format(self.ESURL, index, size)
async with self.session().request(method="GET", url=url) as resp:
assert resp.status == 200, "Unexpected response code: {}".format(resp.status)
return await resp.json()

async def indices(self, search_string=None):
'''
Custom ElasticSearch method
'''
url = "{}_cat/indices/{}?format=json".format(self.ESURL, search_string)
async with self.session().request(method="GET", url=url) as resp:
assert resp.status == 200, "Unexpected response code: {}".format(resp.status)
return await resp.json()

async def empty_index(self, index):
'''
Custom ElasticSearch method
'''
# TODO: There is an option here to specify settings (e.g. shard number, replica number etc) and mappings here
url = "{}{}".format(self.ESURL, index)
async with self.session().request(method="PUT", url=url) as resp:
assert resp.status == 200, "Unexpected response code: {}".format(resp.status)
return await resp.json()


class ElasicSearchUpsertor(UpsertorABC):

@classmethod
def generate_id(cls):
raise NotImplementedError("generate_id")

async def execute(self):
if self.ObjId is None:
return await self._insert_noobjid()
else:
return await self._upsert()

async def _insert_noobjid(self):
setobj = {}

if len(self.ModSet) > 0:
for k, v in self.ModSet.items():
setobj[k] = serialize(self.ModSet[k])

if len(self.ModInc) > 0:
# addobj['$inc'] = self.ModInc
# raise NotImplementedError("yet")
pass

if len(self.ModPush) > 0:
# addobj['$push'] = {k: {'$each': v} for k, v in self.ModPush.items()}
raise NotImplementedError("yet")

# This is insert of the new document, the ObjId is to be generated by the ElasicSearch
url = "{}{}/_doc".format(self.Storage.ESURL, self.Collection)
async with self.Storage.session().request(method="POST", url=url, json=setobj) as resp:
assert resp.status == 201, "Unexpected response code: {}".format(resp.status)
resp_json = await resp.json()

self.ObjId = resp_json['_id']

return self.ObjId

async def _upsert(self):
upsertobj = {"doc": {}, "doc_as_upsert": True}

if len(self.ModSet) > 0:
for k, v in self.ModSet.items():
upsertobj["doc"][k] = serialize(self.ModSet[k])
url = "{}{}/_update/{}".format(self.Storage.ESURL, self.Collection, self.ObjId)
async with self.Storage.session().request(method="POST", url=url, data=json.dumps(upsertobj), headers={'Content-Type': 'application/json'}) as resp:
assert resp.status == 200 or resp.status == 201, "Unexpected response code: {}".format(resp.status)
resp_json = await resp.json()
assert resp_json["result"] == "updated" or resp_json[
"result"] == "created", "Creating/updating was unsuccessful"

return self.ObjId


def serialize(v):
if isinstance(v, datetime.datetime):
return v.timestamp()
else:
return v
66 changes: 66 additions & 0 deletions examples/elasticsearch_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import pprint

import asab
import asab.storage


asab.Config.add_defaults(
{
'asab:storage': {
'type': 'elasticsearch',
'elasticsearch_url': 'http://10.17.174.124:9200/',
}
}
)


class MyApplication(asab.Application):

async def initialize(self):

# Loading the web service module
self.add_module(asab.storage.Module)


async def main(self):
storage = self.get_service("asab.StorageService")

# Obtain upsertor object which is associated with given "test-collection"
# To create new object we keep default `version` to zero
print("Creating default id and version")
u = storage.upsertor("test-collection")
u.set("foo", "bar")
objid = await u.execute()

obj = await storage.get("test-collection", objid)
# Obtain upsertor object for update - specify existing `version` number
print("Specify version when updating")
u = storage.upsertor("test-collection", obj_id=objid, version=obj['_v'])
u.set("foo", "buzz")
objid = await u.execute()

obj = await storage.get("test-collection", objid)
print("Result of get by id '{}'".format(objid))
pprint.pprint(obj)

await storage.delete("test-collection", objid)


# Insert the document with provided ObjId
print("Insert the document with provided ObjId")
u = storage.upsertor("test-collection", "test")
u.set("foo", "bar")
objid = await u.execute()

obj = await storage.get("test-collection", objid)
print("Result of get by id '{}'".format(objid))
pprint.pprint(obj)
print("Delete the document with provided ObjId")
await storage.delete("test-collection", objid)

self.stop()


if __name__ == '__main__':
app = MyApplication()
app.run()

0 comments on commit bb218e5

Please sign in to comment.