From 1b67b92aa9f521c1ba913bb94e5841f4ae701239 Mon Sep 17 00:00:00 2001 From: Fabian Dill Date: Fri, 17 May 2024 12:21:01 +0200 Subject: [PATCH] WebHost: use a limited process pool to run Rooms (#3214) (cherry picked from commit 7900e4c9a4475d866c7df6c8d9d28bc92c842175) --- MultiServer.py | 62 +++++++-------- WebHostLib/__init__.py | 1 + WebHostLib/autolauncher.py | 107 ++++++++++---------------- WebHostLib/customserver.py | 149 ++++++++++++++++++++++++------------- 4 files changed, 173 insertions(+), 146 deletions(-) diff --git a/MultiServer.py b/MultiServer.py index bfbed4620218..1ea5c7d72e9c 100644 --- a/MultiServer.py +++ b/MultiServer.py @@ -175,11 +175,13 @@ class Context: all_item_and_group_names: typing.Dict[str, typing.Set[str]] all_location_and_group_names: typing.Dict[str, typing.Set[str]] non_hintable_names: typing.Dict[str, typing.Set[str]] + logger: logging.Logger def __init__(self, host: str, port: int, server_password: str, password: str, location_check_points: int, hint_cost: int, item_cheat: bool, release_mode: str = "disabled", collect_mode="disabled", remaining_mode: str = "disabled", auto_shutdown: typing.SupportsFloat = 0, compatibility: int = 2, - log_network: bool = False): + log_network: bool = False, logger: logging.Logger = logging.getLogger()): + self.logger = logger super(Context, self).__init__() self.slot_info = {} self.log_network = log_network @@ -287,12 +289,12 @@ async def send_msgs(self, endpoint: Endpoint, msgs: typing.Iterable[dict]) -> bo try: await endpoint.socket.send(msg) except websockets.ConnectionClosed: - logging.exception(f"Exception during send_msgs, could not send {msg}") + self.logger.exception(f"Exception during send_msgs, could not send {msg}") await self.disconnect(endpoint) return False else: if self.log_network: - logging.info(f"Outgoing message: {msg}") + self.logger.info(f"Outgoing message: {msg}") return True async def send_encoded_msgs(self, endpoint: Endpoint, msg: str) -> bool: @@ -301,12 +303,12 @@ async def send_encoded_msgs(self, endpoint: Endpoint, msg: str) -> bool: try: await endpoint.socket.send(msg) except websockets.ConnectionClosed: - logging.exception("Exception during send_encoded_msgs") + self.logger.exception("Exception during send_encoded_msgs") await self.disconnect(endpoint) return False else: if self.log_network: - logging.info(f"Outgoing message: {msg}") + self.logger.info(f"Outgoing message: {msg}") return True async def broadcast_send_encoded_msgs(self, endpoints: typing.Iterable[Endpoint], msg: str) -> bool: @@ -317,11 +319,11 @@ async def broadcast_send_encoded_msgs(self, endpoints: typing.Iterable[Endpoint] try: websockets.broadcast(sockets, msg) except RuntimeError: - logging.exception("Exception during broadcast_send_encoded_msgs") + self.logger.exception("Exception during broadcast_send_encoded_msgs") return False else: if self.log_network: - logging.info(f"Outgoing broadcast: {msg}") + self.logger.info(f"Outgoing broadcast: {msg}") return True def broadcast_all(self, msgs: typing.List[dict]): @@ -330,7 +332,7 @@ def broadcast_all(self, msgs: typing.List[dict]): async_start(self.broadcast_send_encoded_msgs(endpoints, msgs)) def broadcast_text_all(self, text: str, additional_arguments: dict = {}): - logging.info("Notice (all): %s" % text) + self.logger.info("Notice (all): %s" % text) self.broadcast_all([{**{"cmd": "PrintJSON", "data": [{ "text": text }]}, **additional_arguments}]) def broadcast_team(self, team: int, msgs: typing.List[dict]): @@ -352,7 +354,7 @@ async def disconnect(self, endpoint: Client): def notify_client(self, client: Client, text: str, additional_arguments: dict = {}): if not client.auth: return - logging.info("Notice (Player %s in team %d): %s" % (client.name, client.team + 1, text)) + self.logger.info("Notice (Player %s in team %d): %s" % (client.name, client.team + 1, text)) async_start(self.send_msgs(client, [{"cmd": "PrintJSON", "data": [{ "text": text }], **additional_arguments}])) def notify_client_multiple(self, client: Client, texts: typing.List[str], additional_arguments: dict = {}): @@ -451,7 +453,7 @@ def _load(self, decoded_obj: dict, game_data_packages: typing.Dict[str, typing.A for game_name, data in decoded_obj.get("datapackage", {}).items(): if game_name in game_data_packages: data = game_data_packages[game_name] - logging.info(f"Loading embedded data package for game {game_name}") + self.logger.info(f"Loading embedded data package for game {game_name}") self.gamespackage[game_name] = data self.item_name_groups[game_name] = data["item_name_groups"] if "location_name_groups" in data: @@ -483,7 +485,7 @@ def _save(self, exit_save: bool = False) -> bool: with open(self.save_filename, "wb") as f: f.write(zlib.compress(encoded_save)) except Exception as e: - logging.exception(e) + self.logger.exception(e) return False else: return True @@ -501,9 +503,9 @@ def init_save(self, enabled: bool = True): save_data = restricted_loads(zlib.decompress(f.read())) self.set_save(save_data) except FileNotFoundError: - logging.error('No save data found, starting a new game') + self.logger.error('No save data found, starting a new game') except Exception as e: - logging.exception(e) + self.logger.exception(e) self._start_async_saving() def _start_async_saving(self): @@ -520,11 +522,11 @@ def get_datetime_second(): next_wakeup = (second - get_datetime_second()) % self.auto_save_interval time.sleep(max(1.0, next_wakeup)) if self.save_dirty: - logging.debug("Saving via thread.") + self.logger.debug("Saving via thread.") self._save() except OperationalError as e: - logging.exception(e) - logging.info(f"Saving failed. Retry in {self.auto_save_interval} seconds.") + self.logger.exception(e) + self.logger.info(f"Saving failed. Retry in {self.auto_save_interval} seconds.") else: self.save_dirty = False self.auto_saver_thread = threading.Thread(target=save_regularly, daemon=True) @@ -598,7 +600,7 @@ def set_save(self, savedata: dict): if "stored_data" in savedata: self.stored_data = savedata["stored_data"] # count items and slots from lists for items_handling = remote - logging.info( + self.logger.info( f'Loaded save file with {sum([len(v) for k, v in self.received_items.items() if k[2]])} received items ' f'for {sum(k[2] for k in self.received_items)} players') @@ -640,13 +642,13 @@ def _set_options(self, server_options: dict): try: raise Exception(f"Could not set server option {key}, skipping.") from e except Exception as e: - logging.exception(e) - logging.debug(f"Setting server option {key} to {value} from supplied multidata") + self.logger.exception(e) + self.logger.debug(f"Setting server option {key} to {value} from supplied multidata") setattr(self, key, value) elif key == "disable_item_cheat": self.item_cheat = not bool(value) else: - logging.debug(f"Unrecognized server option {key}") + self.logger.debug(f"Unrecognized server option {key}") def get_aliased_name(self, team: int, slot: int): if (team, slot) in self.name_aliases: @@ -680,7 +682,7 @@ def notify_hints(self, team: int, hints: typing.List[NetUtils.Hint], only_new: b self.hints[team, player].add(hint) new_hint_events.add(player) - logging.info("Notice (Team #%d): %s" % (team + 1, format_hint(self, team, hint))) + self.logger.info("Notice (Team #%d): %s" % (team + 1, format_hint(self, team, hint))) for slot in new_hint_events: self.on_new_hint(team, slot) for slot, hint_data in concerns.items(): @@ -739,21 +741,21 @@ async def server(websocket, path: str = "/", ctx: Context = None): try: if ctx.log_network: - logging.info("Incoming connection") + ctx.logger.info("Incoming connection") await on_client_connected(ctx, client) if ctx.log_network: - logging.info("Sent Room Info") + ctx.logger.info("Sent Room Info") async for data in websocket: if ctx.log_network: - logging.info(f"Incoming message: {data}") + ctx.logger.info(f"Incoming message: {data}") for msg in decode(data): await process_client_cmd(ctx, client, msg) except Exception as e: if not isinstance(e, websockets.WebSocketException): - logging.exception(e) + ctx.logger.exception(e) finally: if ctx.log_network: - logging.info("Disconnected") + ctx.logger.info("Disconnected") await ctx.disconnect(client) @@ -963,7 +965,7 @@ def register_location_checks(ctx: Context, team: int, slot: int, locations: typi new_item = NetworkItem(item_id, location, slot, flags) send_items_to(ctx, team, target_player, new_item) - logging.info('(Team #%d) %s sent %s to %s (%s)' % ( + ctx.logger.info('(Team #%d) %s sent %s to %s (%s)' % ( team + 1, ctx.player_names[(team, slot)], ctx.item_names[item_id], ctx.player_names[(team, target_player)], ctx.location_names[location])) info_text = json_format_send_event(new_item, target_player) @@ -1605,7 +1607,7 @@ async def process_client_cmd(ctx: Context, client: Client, args: dict): try: cmd: str = args["cmd"] except: - logging.exception(f"Could not get command from {args}") + ctx.logger.exception(f"Could not get command from {args}") await ctx.send_msgs(client, [{'cmd': 'InvalidPacket', "type": "cmd", "original_cmd": None, "text": f"Could not get command from {args} at `cmd`"}]) raise @@ -1646,7 +1648,7 @@ async def process_client_cmd(ctx: Context, client: Client, args: dict): if ctx.compatibility == 0 and args['version'] != version_tuple: errors.add('IncompatibleVersion') if errors: - logging.info(f"A client connection was refused due to: {errors}, the sent connect information was {args}.") + ctx.logger.info(f"A client connection was refused due to: {errors}, the sent connect information was {args}.") await ctx.send_msgs(client, [{"cmd": "ConnectionRefused", "errors": list(errors)}]) else: team, slot = ctx.connect_names[args['name']] @@ -2264,7 +2266,7 @@ def inactivity_shutdown(): if to_cancel: for task in to_cancel: task.cancel() - logging.info("Shutting down due to inactivity.") + ctx.logger.info("Shutting down due to inactivity.") while not ctx.exit_event.is_set(): if not ctx.client_activity_timers.values(): diff --git a/WebHostLib/__init__.py b/WebHostLib/__init__.py index 69314c334ee5..7d2a32d362b9 100644 --- a/WebHostLib/__init__.py +++ b/WebHostLib/__init__.py @@ -23,6 +23,7 @@ app.config["SELFHOST"] = True # application process is in charge of running the websites app.config["GENERATORS"] = 8 # maximum concurrent world gens +app.config["HOSTERS"] = 8 # maximum concurrent room hosters app.config["SELFLAUNCH"] = True # application process is in charge of launching Rooms. app.config["SELFLAUNCHCERT"] = None # can point to a SSL Certificate to encrypt Room websocket connections app.config["SELFLAUNCHKEY"] = None # can point to a SSL Certificate Key to encrypt Room websocket connections diff --git a/WebHostLib/autolauncher.py b/WebHostLib/autolauncher.py index 7254dd46e136..78fff6c50991 100644 --- a/WebHostLib/autolauncher.py +++ b/WebHostLib/autolauncher.py @@ -3,7 +3,6 @@ import json import logging import multiprocessing -import threading import time import typing from uuid import UUID @@ -15,16 +14,6 @@ from .locker import Locker, AlreadyRunningException -def launch_room(room: Room, config: dict): - # requires db_session! - if room.last_activity >= datetime.utcnow() - timedelta(seconds=room.timeout): - multiworld = multiworlds.get(room.id, None) - if not multiworld: - multiworld = MultiworldInstance(room, config) - - multiworld.start() - - def handle_generation_success(seed_id): logging.info(f"Generation finished for seed {seed_id}") @@ -59,21 +48,30 @@ def init_db(pony_config: dict): db.generate_mapping() +def cleanup(): + """delete unowned user-content""" + with db_session: + # >>> bool(uuid.UUID(int=0)) + # True + rooms = Room.select(lambda room: room.owner == UUID(int=0)).delete(bulk=True) + seeds = Seed.select(lambda seed: seed.owner == UUID(int=0) and not seed.rooms).delete(bulk=True) + slots = Slot.select(lambda slot: not slot.seed).delete(bulk=True) + # Command gets deleted by ponyorm Cascade Delete, as Room is Required + if rooms or seeds or slots: + logging.info(f"{rooms} Rooms, {seeds} Seeds and {slots} Slots have been deleted.") + + def autohost(config: dict): def keep_running(): try: with Locker("autohost"): - # delete unowned user-content - with db_session: - # >>> bool(uuid.UUID(int=0)) - # True - rooms = Room.select(lambda room: room.owner == UUID(int=0)).delete(bulk=True) - seeds = Seed.select(lambda seed: seed.owner == UUID(int=0) and not seed.rooms).delete(bulk=True) - slots = Slot.select(lambda slot: not slot.seed).delete(bulk=True) - # Command gets deleted by ponyorm Cascade Delete, as Room is Required - if rooms or seeds or slots: - logging.info(f"{rooms} Rooms, {seeds} Seeds and {slots} Slots have been deleted.") - run_guardian() + cleanup() + hosters = [] + for x in range(config["HOSTERS"]): + hoster = MultiworldInstance(config, x) + hosters.append(hoster) + hoster.start() + while 1: time.sleep(0.1) with db_session: @@ -81,7 +79,9 @@ def keep_running(): room for room in Room if room.last_activity >= datetime.utcnow() - timedelta(days=3)) for room in rooms: - launch_room(room, config) + # we have to filter twice, as the per-room timeout can't currently be PonyORM transpiled. + if room.last_activity >= datetime.utcnow() - timedelta(seconds=room.timeout): + hosters[room.id.int % len(hosters)].start_room(room.id) except AlreadyRunningException: logging.info("Autohost reports as already running, not starting another.") @@ -132,29 +132,38 @@ def keep_running(): class MultiworldInstance(): - def __init__(self, room: Room, config: dict): - self.room_id = room.id + def __init__(self, config: dict, id: int): + self.room_ids = set() self.process: typing.Optional[multiprocessing.Process] = None - with guardian_lock: - multiworlds[self.room_id] = self self.ponyconfig = config["PONY"] self.cert = config["SELFLAUNCHCERT"] self.key = config["SELFLAUNCHKEY"] self.host = config["HOST_ADDRESS"] + self.rooms_to_start = multiprocessing.Queue() + self.rooms_shutting_down = multiprocessing.Queue() + self.name = f"MultiHoster{id}" def start(self): if self.process and self.process.is_alive(): return False - logging.info(f"Spinning up {self.room_id}") process = multiprocessing.Process(group=None, target=run_server_process, - args=(self.room_id, self.ponyconfig, get_static_server_data(), - self.cert, self.key, self.host), - name="MultiHost") + args=(self.name, self.ponyconfig, get_static_server_data(), + self.cert, self.key, self.host, + self.rooms_to_start, self.rooms_shutting_down), + name=self.name) process.start() - # bind after start to prevent thread sync issues with guardian. self.process = process + def start_room(self, room_id): + while not self.rooms_shutting_down.empty(): + self.room_ids.remove(self.rooms_shutting_down.get(block=True, timeout=None)) + if room_id in self.room_ids: + pass # should already be hosted currently. + else: + self.room_ids.add(room_id) + self.rooms_to_start.put(room_id) + def stop(self): if self.process: self.process.terminate() @@ -168,40 +177,6 @@ def collect(self): self.process = None -guardian = None -guardian_lock = threading.Lock() - - -def run_guardian(): - global guardian - global multiworlds - with guardian_lock: - if not guardian: - try: - import resource - except ModuleNotFoundError: - pass # unix only module - else: - # Each Server is another file handle, so request as many as we can from the system - file_limit = resource.getrlimit(resource.RLIMIT_NOFILE)[1] - # set soft limit to hard limit - resource.setrlimit(resource.RLIMIT_NOFILE, (file_limit, file_limit)) - - def guard(): - while 1: - time.sleep(1) - done = [] - with guardian_lock: - for key, instance in multiworlds.items(): - if instance.done(): - instance.collect() - done.append(key) - for key in done: - del (multiworlds[key]) - - guardian = threading.Thread(name="Guardian", target=guard) - - from .models import Room, Generation, STATE_QUEUED, STATE_STARTED, STATE_ERROR, db, Seed, Slot from .customserver import run_server_process, get_static_server_data from .generate import gen_game diff --git a/WebHostLib/customserver.py b/WebHostLib/customserver.py index fb3b314753cf..04b4b6a0a02a 100644 --- a/WebHostLib/customserver.py +++ b/WebHostLib/customserver.py @@ -5,6 +5,7 @@ import datetime import functools import logging +import multiprocessing import pickle import random import socket @@ -53,17 +54,19 @@ def _cmd_video(self, platform: str, user: str): class DBCommandProcessor(ServerCommandProcessor): def output(self, text: str): - logging.info(text) + self.ctx.logger.info(text) class WebHostContext(Context): room_id: int - def __init__(self, static_server_data: dict): + def __init__(self, static_server_data: dict, logger: logging.Logger): # static server data is used during _load_game_data to load required data, # without needing to import worlds system, which takes quite a bit of memory self.static_server_data = static_server_data - super(WebHostContext, self).__init__("", 0, "", "", 1, 40, True, "enabled", "enabled", "enabled", 0, 2) + super(WebHostContext, self).__init__("", 0, "", "", 1, + 40, True, "enabled", "enabled", + "enabled", 0, 2, logger=logger) del self.static_server_data self.main_loop = asyncio.get_running_loop() self.video = {} @@ -159,63 +162,95 @@ def get_static_server_data() -> dict: return data -def run_server_process(room_id, ponyconfig: dict, static_server_data: dict, +def set_up_logging(room_id) -> logging.Logger: + import os + # logger setup + logger = logging.getLogger(f"RoomLogger {room_id}") + + # this *should* be empty, but just in case. + for handler in logger.handlers[:]: + logger.removeHandler(handler) + handler.close() + + file_handler = logging.FileHandler( + os.path.join(Utils.user_path("logs"), f"{room_id}.txt"), + "a", + encoding="utf-8-sig") + file_handler.setFormatter(logging.Formatter("[%(asctime)s]: %(message)s")) + logger.setLevel(logging.INFO) + logger.addHandler(file_handler) + return logger + + +def run_server_process(name: str, ponyconfig: dict, static_server_data: dict, cert_file: typing.Optional[str], cert_key_file: typing.Optional[str], - host: str): + host: str, rooms_to_run: multiprocessing.Queue, rooms_shutting_down: multiprocessing.Queue): + Utils.init_logging(name) + try: + import resource + except ModuleNotFoundError: + pass # unix only module + else: + # Each Server is another file handle, so request as many as we can from the system + file_limit = resource.getrlimit(resource.RLIMIT_NOFILE)[1] + # set soft limit to hard limit + resource.setrlimit(resource.RLIMIT_NOFILE, (file_limit, file_limit)) + del resource, file_limit + # establish DB connection for multidata and multisave db.bind(**ponyconfig) db.generate_mapping(check_tables=False) - async def main(): - if "worlds" in sys.modules: - raise Exception("Worlds system should not be loaded in the custom server.") - - import gc - Utils.init_logging(str(room_id), write_mode="a") - ctx = WebHostContext(static_server_data) - ctx.load(room_id) - ctx.init_save() - ssl_context = load_server_cert(cert_file, cert_key_file) if cert_file else None - gc.collect() # free intermediate objects used during setup + if "worlds" in sys.modules: + raise Exception("Worlds system should not be loaded in the custom server.") + + import gc + ssl_context = load_server_cert(cert_file, cert_key_file) if cert_file else None + del cert_file, cert_key_file, ponyconfig + gc.collect() # free intermediate objects used during setup + + loop = asyncio.get_event_loop() + + async def start_room(room_id): try: - ctx.server = websockets.serve(functools.partial(server, ctx=ctx), ctx.host, ctx.port, ssl=ssl_context) - - await ctx.server - except OSError: # likely port in use - ctx.server = websockets.serve(functools.partial(server, ctx=ctx), ctx.host, 0, ssl=ssl_context) - - await ctx.server - port = 0 - for wssocket in ctx.server.ws_server.sockets: - socketname = wssocket.getsockname() - if wssocket.family == socket.AF_INET6: - # Prefer IPv4, as most users seem to not have working ipv6 support - if not port: + logger = set_up_logging(room_id) + ctx = WebHostContext(static_server_data, logger) + ctx.load(room_id) + ctx.init_save() + try: + ctx.server = websockets.serve(functools.partial(server, ctx=ctx), ctx.host, ctx.port, ssl=ssl_context) + + await ctx.server + except OSError: # likely port in use + ctx.server = websockets.serve(functools.partial(server, ctx=ctx), ctx.host, 0, ssl=ssl_context) + + await ctx.server + port = 0 + for wssocket in ctx.server.ws_server.sockets: + socketname = wssocket.getsockname() + if wssocket.family == socket.AF_INET6: + # Prefer IPv4, as most users seem to not have working ipv6 support + if not port: + port = socketname[1] + elif wssocket.family == socket.AF_INET: port = socketname[1] - elif wssocket.family == socket.AF_INET: - port = socketname[1] - if port: - logging.info(f'Hosting game at {host}:{port}') + if port: + ctx.logger.info(f'Hosting game at {host}:{port}') + with db_session: + room = Room.get(id=ctx.room_id) + room.last_port = port + else: + ctx.logger.exception("Could not determine port. Likely hosting failure.") with db_session: - room = Room.get(id=ctx.room_id) - room.last_port = port - else: - logging.exception("Could not determine port. Likely hosting failure.") - with db_session: - ctx.auto_shutdown = Room.get(id=room_id).timeout - ctx.shutdown_task = asyncio.create_task(auto_shutdown(ctx, [])) - await ctx.shutdown_task + ctx.auto_shutdown = Room.get(id=room_id).timeout + ctx.shutdown_task = asyncio.create_task(auto_shutdown(ctx, [])) + await ctx.shutdown_task - # ensure auto launch is on the same page in regard to room activity. - with db_session: - room: Room = Room.get(id=ctx.room_id) - room.last_activity = datetime.datetime.utcnow() - datetime.timedelta(seconds=room.timeout + 60) - - logging.info("Shutting down") + # ensure auto launch is on the same page in regard to room activity. + with db_session: + room: Room = Room.get(id=ctx.room_id) + room.last_activity = datetime.datetime.utcnow() - datetime.timedelta(seconds=room.timeout + 60) - with Locker(room_id): - try: - asyncio.run(main()) except (KeyboardInterrupt, SystemExit): with db_session: room = Room.get(id=room_id) @@ -228,3 +263,17 @@ async def main(): # ensure the Room does not spin up again on its own, minute of safety buffer room.last_activity = datetime.datetime.utcnow() - datetime.timedelta(minutes=1, seconds=room.timeout) raise + finally: + rooms_shutting_down.put(room_id) + + class Starter(threading.Thread): + def run(self): + while 1: + next_room = rooms_to_run.get(block=True, timeout=None) + asyncio.run_coroutine_threadsafe(start_room(next_room), loop) + logging.info(f"Starting room {next_room} on {name}.") + + starter = Starter() + starter.daemon = True + starter.start() + loop.run_forever()