From 6cbfbc840b68d0d5c76bc32c163ce01855f1fc5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C5=99emysl=20=C4=8Cern=C3=BD?= Date: Tue, 16 Jun 2020 10:08:50 +0200 Subject: [PATCH] Introduce ZooKeeper containers. --- asab/application.py | 4 +- asab/zookeeper/__init__.py | 5 +- asab/zookeeper/container.py | 61 ++++++++++++++++++++++++ asab/zookeeper/service.py | 92 +++++++++++++++++++++---------------- 4 files changed, 119 insertions(+), 43 deletions(-) create mode 100644 asab/zookeeper/container.py diff --git a/asab/application.py b/asab/application.py index aadebd56b..f30deb965 100644 --- a/asab/application.py +++ b/asab/application.py @@ -336,7 +336,7 @@ def _hup(self): # Modules - def add_module(self, module_class, *args, **kwargs): + def add_module(self, module_class): """ Load a new module. """ for module in self.Modules: @@ -344,7 +344,7 @@ def add_module(self, module_class, *args, **kwargs): # Already loaded and registered return - module = module_class(self, *args, **kwargs) + module = module_class(self) self.Modules.append(module) asyncio.ensure_future(module.initialize(self), loop=self.Loop) diff --git a/asab/zookeeper/__init__.py b/asab/zookeeper/__init__.py index 27c951983..4a7800f6c 100644 --- a/asab/zookeeper/__init__.py +++ b/asab/zookeeper/__init__.py @@ -1,6 +1,7 @@ import logging from .service import ZooKeeperService +from .container import ZooKeeperContainer from ..abc.module import Module @@ -13,6 +14,6 @@ class Module(Module): - def __init__(self, app, config_section="zookeeper"): + def __init__(self, app): super().__init__(app) - self.Service = ZooKeeperService(app, "asab.ZooKeeperService", config_section) + self.Service = ZooKeeperService(app, "asab.ZooKeeperService") diff --git a/asab/zookeeper/container.py b/asab/zookeeper/container.py new file mode 100644 index 000000000..29e6ec8c9 --- /dev/null +++ b/asab/zookeeper/container.py @@ -0,0 +1,61 @@ +import aiozk +import asyncio +import json + +from ..config import ConfigObject + + +class ZooKeeperContainer(ConfigObject): + """ + ZooKeeperContainer connects to Zookeeper via aiozk client: + https://zookeeper.apache.org/ + https://pypi.org/project/aiozk/ + """ + + ConfigDefaults = { + "urls": "zookeeper:12181", + "path": "/asab", + } + + def __init__(self, app, config_section_name, config=None): + super().__init__(config_section_name=config_section_name, config=config) + self.App = app + self.ConfigSectionName = config_section_name + self.ZooKeeper = aiozk.ZKClient(self.Config["urls"]) + self.ZooKeeperPath = self.Config["path"] + + async def initialize(self, app): + await self.ZooKeeper.start() + await self.ZooKeeper.ensure_path(self.ZooKeeperPath) + + async def finalize(self, app): + await self.ZooKeeper.close() + + async def advertise(self, data, encoding="utf-8"): + if isinstance(data, dict): + data = json.dumps(data).encode(encoding) + elif isinstance(data, str): + data = data.encode(encoding) + elif asyncio.iscoroutinefunction(data): + data = await data + elif callable(data): + data = data() + + return await self.ZooKeeper.create( + "{}/i".format(self.ZooKeeperPath), + data=data, + sequential=True, + ephemeral=True + ) + + async def get_children(self): + return await self.ZooKeeper.get_children(self.ZooKeeperPath) + + async def get_data(self, child, encoding="utf-8"): + raw_data = await self.get_raw_data(child) + if raw_data is None: + return {} + return json.loads(raw_data.decode(encoding)) + + async def get_raw_data(self, child): + return await self.ZooKeeper.get_data("{}/{}".format(self.ZooKeeperPath, child)) diff --git a/asab/zookeeper/service.py b/asab/zookeeper/service.py index cc8021a7d..818c4f29d 100644 --- a/asab/zookeeper/service.py +++ b/asab/zookeeper/service.py @@ -1,10 +1,10 @@ -import aiozk import asyncio -import json from ..abc.service import Service from ..config import Config +from .container import ZooKeeperContainer + class ZooKeeperService(Service): """ @@ -14,49 +14,63 @@ class ZooKeeperService(Service): """ Config.add_defaults({ - "zookeeper": { - "urls": "zookeeper:12181", - "path": "/asab", + "zookeeper": { # create a default container, also ensures backward compatibility + "urls": "", # zookeeper:12181 + "path": "", # /asab } }) def __init__(self, app, service_name, config_section="zookeeper"): super().__init__(app, service_name) - self.ZooKeeper = aiozk.ZKClient(Config[config_section]["urls"]) - self.ZooKeeperPath = Config[config_section]["path"] + self.App = app + self.DefaultContainer = None + self.Containers = {} + self.Futures = [] async def initialize(self, app): - await self.ZooKeeper.start() - await self.ZooKeeper.ensure_path(self.ZooKeeperPath) + # Create a default container + # Default container ensures backward compatibility + urls = Config["zookeeper"]["urls"] + if len(urls) > 0: + self.DefaultContainer = ZooKeeperContainer(app, "zookeeper") + await self.DefaultContainer.initialize(app) async def finalize(self, app): - await self.ZooKeeper.close() - - async def advertise(self, data, encoding="utf-8"): - if isinstance(data, dict): - data = json.dumps(data).encode(encoding) - elif isinstance(data, str): - data = data.encode(encoding) - elif asyncio.iscoroutinefunction(data): - data = await data - elif callable(data): - data = data() - - return await self.ZooKeeper.create( - "{}/i".format(self.ZooKeeperPath), - data=data, - sequential=True, - ephemeral=True - ) - - async def get_children(self): - return await self.ZooKeeper.get_children(self.ZooKeeperPath) - - async def get_data(self, child, encoding="utf-8"): - raw_data = await self.get_raw_data(child) - if raw_data is None: - return {} - return json.loads(raw_data.decode(encoding)) - - async def get_raw_data(self, child): - return await self.ZooKeeper.get_data("{}/{}".format(self.ZooKeeperPath, child)) + if len(self.Futures) > 0: + await asyncio.wait(self.Futures) + if self.DefaultContainer is not None: + await self.DefaultContainer.finalize(app) + for containers in self.Containers.values(): + await containers.finalize(app) + + def register_container(self, container): + self.Containers[container.ConfigSectionName] = container + self.Futures.append(asyncio.ensure_future(container.initialize(self.App))) + + async def advertise(self, data, encoding="utf-8", container=None): + if container is None: + container = self.DefaultContainer + if container is None: + raise RuntimeError("The container must be specified.") + return await container.advertise(data, encoding) + + async def get_children(self, container=None): + if container is None: + container = self.DefaultContainer + if container is None: + raise RuntimeError("The container must be specified.") + return await container.get_children() + + async def get_data(self, child, encoding="utf-8", container=None): + if container is None: + container = self.DefaultContainer + if container is None: + raise RuntimeError("The container must be specified.") + return await container.get_data(child, encoding) + + async def get_raw_data(self, child, container=None): + if container is None: + container = self.DefaultContainer + if container is None: + raise RuntimeError("The container must be specified.") + return await container.get_raw_data(child)