Skip to content

Commit

Permalink
Merge pull request #118 from TeskaLabs/zookeeper-configurable-section
Browse files Browse the repository at this point in the history
Introduce configurable config section to zookeeper service.
  • Loading branch information
PremyslCerny authored Jun 17, 2020
2 parents bb218e5 + 6cbfbc8 commit aad476d
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 40 deletions.
1 change: 1 addition & 0 deletions asab/zookeeper/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging

from .service import ZooKeeperService
from .container import ZooKeeperContainer

from ..abc.module import Module

Expand Down
61 changes: 61 additions & 0 deletions asab/zookeeper/container.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import aiozk
import asyncio
import json

from ..config import ConfigObject


class ZooKeeperContainer(ConfigObject):
"""
ZooKeeperContainer connects to Zookeeper via aiozk client:
https://zookeeper.apache.org/
https://pypi.org/project/aiozk/
"""

ConfigDefaults = {
"urls": "zookeeper:12181",
"path": "/asab",
}

def __init__(self, app, config_section_name, config=None):
super().__init__(config_section_name=config_section_name, config=config)
self.App = app
self.ConfigSectionName = config_section_name
self.ZooKeeper = aiozk.ZKClient(self.Config["urls"])
self.ZooKeeperPath = self.Config["path"]

async def initialize(self, app):
await self.ZooKeeper.start()
await self.ZooKeeper.ensure_path(self.ZooKeeperPath)

async def finalize(self, app):
await self.ZooKeeper.close()

async def advertise(self, data, encoding="utf-8"):
if isinstance(data, dict):
data = json.dumps(data).encode(encoding)
elif isinstance(data, str):
data = data.encode(encoding)
elif asyncio.iscoroutinefunction(data):
data = await data
elif callable(data):
data = data()

return await self.ZooKeeper.create(
"{}/i".format(self.ZooKeeperPath),
data=data,
sequential=True,
ephemeral=True
)

async def get_children(self):
return await self.ZooKeeper.get_children(self.ZooKeeperPath)

async def get_data(self, child, encoding="utf-8"):
raw_data = await self.get_raw_data(child)
if raw_data is None:
return {}
return json.loads(raw_data.decode(encoding))

async def get_raw_data(self, child):
return await self.ZooKeeper.get_data("{}/{}".format(self.ZooKeeperPath, child))
94 changes: 54 additions & 40 deletions asab/zookeeper/service.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import aiozk
import asyncio
import json

from ..abc.service import Service
from ..config import Config

from .container import ZooKeeperContainer


class ZooKeeperService(Service):
"""
Expand All @@ -14,49 +14,63 @@ class ZooKeeperService(Service):
"""

Config.add_defaults({
"zookeeper": {
"urls": "zookeeper:12181",
"path": "/asab",
"zookeeper": { # create a default container, also ensures backward compatibility
"urls": "", # zookeeper:12181
"path": "", # /asab
}
})

def __init__(self, app, service_name):
def __init__(self, app, service_name, config_section="zookeeper"):
super().__init__(app, service_name)
self.ZooKeeper = aiozk.ZKClient(Config["zookeeper"]["urls"])
self.ZooKeeperPath = Config["zookeeper"]["path"]
self.App = app
self.DefaultContainer = None
self.Containers = {}
self.Futures = []

async def initialize(self, app):
await self.ZooKeeper.start()
await self.ZooKeeper.ensure_path(self.ZooKeeperPath)
# Create a default container
# Default container ensures backward compatibility
urls = Config["zookeeper"]["urls"]
if len(urls) > 0:
self.DefaultContainer = ZooKeeperContainer(app, "zookeeper")
await self.DefaultContainer.initialize(app)

async def finalize(self, app):
await self.ZooKeeper.close()

async def advertise(self, data, encoding="utf-8"):
if isinstance(data, dict):
data = json.dumps(data).encode(encoding)
elif isinstance(data, str):
data = data.encode(encoding)
elif asyncio.iscoroutinefunction(data):
data = await data
elif callable(data):
data = data()

return await self.ZooKeeper.create(
"{}/i".format(self.ZooKeeperPath),
data=data,
sequential=True,
ephemeral=True
)

async def get_children(self):
return await self.ZooKeeper.get_children(self.ZooKeeperPath)

async def get_data(self, child, encoding="utf-8"):
raw_data = await self.get_raw_data(child)
if raw_data is None:
return {}
return json.loads(raw_data.decode(encoding))

async def get_raw_data(self, child):
return await self.ZooKeeper.get_data("{}/{}".format(self.ZooKeeperPath, child))
if len(self.Futures) > 0:
await asyncio.wait(self.Futures)
if self.DefaultContainer is not None:
await self.DefaultContainer.finalize(app)
for containers in self.Containers.values():
await containers.finalize(app)

def register_container(self, container):
self.Containers[container.ConfigSectionName] = container
self.Futures.append(asyncio.ensure_future(container.initialize(self.App)))

async def advertise(self, data, encoding="utf-8", container=None):
if container is None:
container = self.DefaultContainer
if container is None:
raise RuntimeError("The container must be specified.")
return await container.advertise(data, encoding)

async def get_children(self, container=None):
if container is None:
container = self.DefaultContainer
if container is None:
raise RuntimeError("The container must be specified.")
return await container.get_children()

async def get_data(self, child, encoding="utf-8", container=None):
if container is None:
container = self.DefaultContainer
if container is None:
raise RuntimeError("The container must be specified.")
return await container.get_data(child, encoding)

async def get_raw_data(self, child, container=None):
if container is None:
container = self.DefaultContainer
if container is None:
raise RuntimeError("The container must be specified.")
return await container.get_raw_data(child)

0 comments on commit aad476d

Please sign in to comment.