Skip to content

Commit

Permalink
WebHost: use a limited process pool to run Rooms (#3214)
Browse files Browse the repository at this point in the history
(cherry picked from commit 7900e4c)
  • Loading branch information
Berserker66 committed May 18, 2024
1 parent daccb30 commit 1b67b92
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 146 deletions.
62 changes: 32 additions & 30 deletions MultiServer.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,13 @@ class Context:
all_item_and_group_names: typing.Dict[str, typing.Set[str]]
all_location_and_group_names: typing.Dict[str, typing.Set[str]]
non_hintable_names: typing.Dict[str, typing.Set[str]]
logger: logging.Logger

def __init__(self, host: str, port: int, server_password: str, password: str, location_check_points: int,
hint_cost: int, item_cheat: bool, release_mode: str = "disabled", collect_mode="disabled",
remaining_mode: str = "disabled", auto_shutdown: typing.SupportsFloat = 0, compatibility: int = 2,
log_network: bool = False):
log_network: bool = False, logger: logging.Logger = logging.getLogger()):
self.logger = logger
super(Context, self).__init__()
self.slot_info = {}
self.log_network = log_network
Expand Down Expand Up @@ -287,12 +289,12 @@ async def send_msgs(self, endpoint: Endpoint, msgs: typing.Iterable[dict]) -> bo
try:
await endpoint.socket.send(msg)
except websockets.ConnectionClosed:
logging.exception(f"Exception during send_msgs, could not send {msg}")
self.logger.exception(f"Exception during send_msgs, could not send {msg}")
await self.disconnect(endpoint)
return False
else:
if self.log_network:
logging.info(f"Outgoing message: {msg}")
self.logger.info(f"Outgoing message: {msg}")
return True

async def send_encoded_msgs(self, endpoint: Endpoint, msg: str) -> bool:
Expand All @@ -301,12 +303,12 @@ async def send_encoded_msgs(self, endpoint: Endpoint, msg: str) -> bool:
try:
await endpoint.socket.send(msg)
except websockets.ConnectionClosed:
logging.exception("Exception during send_encoded_msgs")
self.logger.exception("Exception during send_encoded_msgs")
await self.disconnect(endpoint)
return False
else:
if self.log_network:
logging.info(f"Outgoing message: {msg}")
self.logger.info(f"Outgoing message: {msg}")
return True

async def broadcast_send_encoded_msgs(self, endpoints: typing.Iterable[Endpoint], msg: str) -> bool:
Expand All @@ -317,11 +319,11 @@ async def broadcast_send_encoded_msgs(self, endpoints: typing.Iterable[Endpoint]
try:
websockets.broadcast(sockets, msg)
except RuntimeError:
logging.exception("Exception during broadcast_send_encoded_msgs")
self.logger.exception("Exception during broadcast_send_encoded_msgs")
return False
else:
if self.log_network:
logging.info(f"Outgoing broadcast: {msg}")
self.logger.info(f"Outgoing broadcast: {msg}")
return True

def broadcast_all(self, msgs: typing.List[dict]):
Expand All @@ -330,7 +332,7 @@ def broadcast_all(self, msgs: typing.List[dict]):
async_start(self.broadcast_send_encoded_msgs(endpoints, msgs))

def broadcast_text_all(self, text: str, additional_arguments: typing.Optional[dict] = None):
    """Log a server-wide notice and broadcast it to all clients as a PrintJSON packet.

    :param text: plain text of the notice.
    :param additional_arguments: extra keys merged into the PrintJSON packet.
        Defaults to None rather than a mutable ``{}`` default, which is a
        shared-across-calls pitfall even when only read.
    """
    if additional_arguments is None:
        additional_arguments = {}
    self.logger.info("Notice (all): %s" % text)
    self.broadcast_all([{**{"cmd": "PrintJSON", "data": [{ "text": text }]}, **additional_arguments}])

def broadcast_team(self, team: int, msgs: typing.List[dict]):
Expand All @@ -352,7 +354,7 @@ async def disconnect(self, endpoint: Client):
def notify_client(self, client: Client, text: str, additional_arguments: typing.Optional[dict] = None):
    """Log and send a PrintJSON text notice to a single client.

    Silently does nothing when the client has not authenticated yet.

    :param client: target endpoint; must have completed authentication.
    :param text: plain text of the notice.
    :param additional_arguments: extra keys merged into the PrintJSON packet.
        Defaults to None rather than a mutable ``{}`` default (shared-default pitfall).
    """
    if not client.auth:
        return
    if additional_arguments is None:
        additional_arguments = {}
    self.logger.info("Notice (Player %s in team %d): %s" % (client.name, client.team + 1, text))
    async_start(self.send_msgs(client, [{"cmd": "PrintJSON", "data": [{ "text": text }], **additional_arguments}]))

def notify_client_multiple(self, client: Client, texts: typing.List[str], additional_arguments: dict = {}):
Expand Down Expand Up @@ -451,7 +453,7 @@ def _load(self, decoded_obj: dict, game_data_packages: typing.Dict[str, typing.A
for game_name, data in decoded_obj.get("datapackage", {}).items():
if game_name in game_data_packages:
data = game_data_packages[game_name]
logging.info(f"Loading embedded data package for game {game_name}")
self.logger.info(f"Loading embedded data package for game {game_name}")
self.gamespackage[game_name] = data
self.item_name_groups[game_name] = data["item_name_groups"]
if "location_name_groups" in data:
Expand Down Expand Up @@ -483,7 +485,7 @@ def _save(self, exit_save: bool = False) -> bool:
with open(self.save_filename, "wb") as f:
f.write(zlib.compress(encoded_save))
except Exception as e:
logging.exception(e)
self.logger.exception(e)
return False
else:
return True
Expand All @@ -501,9 +503,9 @@ def init_save(self, enabled: bool = True):
save_data = restricted_loads(zlib.decompress(f.read()))
self.set_save(save_data)
except FileNotFoundError:
logging.error('No save data found, starting a new game')
self.logger.error('No save data found, starting a new game')
except Exception as e:
logging.exception(e)
self.logger.exception(e)
self._start_async_saving()

def _start_async_saving(self):
Expand All @@ -520,11 +522,11 @@ def get_datetime_second():
next_wakeup = (second - get_datetime_second()) % self.auto_save_interval
time.sleep(max(1.0, next_wakeup))
if self.save_dirty:
logging.debug("Saving via thread.")
self.logger.debug("Saving via thread.")
self._save()
except OperationalError as e:
logging.exception(e)
logging.info(f"Saving failed. Retry in {self.auto_save_interval} seconds.")
self.logger.exception(e)
self.logger.info(f"Saving failed. Retry in {self.auto_save_interval} seconds.")
else:
self.save_dirty = False
self.auto_saver_thread = threading.Thread(target=save_regularly, daemon=True)
Expand Down Expand Up @@ -598,7 +600,7 @@ def set_save(self, savedata: dict):
if "stored_data" in savedata:
self.stored_data = savedata["stored_data"]
# count items and slots from lists for items_handling = remote
logging.info(
self.logger.info(
f'Loaded save file with {sum([len(v) for k, v in self.received_items.items() if k[2]])} received items '
f'for {sum(k[2] for k in self.received_items)} players')

Expand Down Expand Up @@ -640,13 +642,13 @@ def _set_options(self, server_options: dict):
try:
raise Exception(f"Could not set server option {key}, skipping.") from e
except Exception as e:
logging.exception(e)
logging.debug(f"Setting server option {key} to {value} from supplied multidata")
self.logger.exception(e)
self.logger.debug(f"Setting server option {key} to {value} from supplied multidata")
setattr(self, key, value)
elif key == "disable_item_cheat":
self.item_cheat = not bool(value)
else:
logging.debug(f"Unrecognized server option {key}")
self.logger.debug(f"Unrecognized server option {key}")

def get_aliased_name(self, team: int, slot: int):
if (team, slot) in self.name_aliases:
Expand Down Expand Up @@ -680,7 +682,7 @@ def notify_hints(self, team: int, hints: typing.List[NetUtils.Hint], only_new: b
self.hints[team, player].add(hint)
new_hint_events.add(player)

logging.info("Notice (Team #%d): %s" % (team + 1, format_hint(self, team, hint)))
self.logger.info("Notice (Team #%d): %s" % (team + 1, format_hint(self, team, hint)))
for slot in new_hint_events:
self.on_new_hint(team, slot)
for slot, hint_data in concerns.items():
Expand Down Expand Up @@ -739,21 +741,21 @@ async def server(websocket, path: str = "/", ctx: Context = None):

try:
if ctx.log_network:
logging.info("Incoming connection")
ctx.logger.info("Incoming connection")
await on_client_connected(ctx, client)
if ctx.log_network:
logging.info("Sent Room Info")
ctx.logger.info("Sent Room Info")
async for data in websocket:
if ctx.log_network:
logging.info(f"Incoming message: {data}")
ctx.logger.info(f"Incoming message: {data}")
for msg in decode(data):
await process_client_cmd(ctx, client, msg)
except Exception as e:
if not isinstance(e, websockets.WebSocketException):
logging.exception(e)
ctx.logger.exception(e)
finally:
if ctx.log_network:
logging.info("Disconnected")
ctx.logger.info("Disconnected")
await ctx.disconnect(client)


Expand Down Expand Up @@ -963,7 +965,7 @@ def register_location_checks(ctx: Context, team: int, slot: int, locations: typi
new_item = NetworkItem(item_id, location, slot, flags)
send_items_to(ctx, team, target_player, new_item)

logging.info('(Team #%d) %s sent %s to %s (%s)' % (
ctx.logger.info('(Team #%d) %s sent %s to %s (%s)' % (
team + 1, ctx.player_names[(team, slot)], ctx.item_names[item_id],
ctx.player_names[(team, target_player)], ctx.location_names[location]))
info_text = json_format_send_event(new_item, target_player)
Expand Down Expand Up @@ -1605,7 +1607,7 @@ async def process_client_cmd(ctx: Context, client: Client, args: dict):
try:
cmd: str = args["cmd"]
except:
logging.exception(f"Could not get command from {args}")
ctx.logger.exception(f"Could not get command from {args}")
await ctx.send_msgs(client, [{'cmd': 'InvalidPacket', "type": "cmd", "original_cmd": None,
"text": f"Could not get command from {args} at `cmd`"}])
raise
Expand Down Expand Up @@ -1646,7 +1648,7 @@ async def process_client_cmd(ctx: Context, client: Client, args: dict):
if ctx.compatibility == 0 and args['version'] != version_tuple:
errors.add('IncompatibleVersion')
if errors:
logging.info(f"A client connection was refused due to: {errors}, the sent connect information was {args}.")
ctx.logger.info(f"A client connection was refused due to: {errors}, the sent connect information was {args}.")
await ctx.send_msgs(client, [{"cmd": "ConnectionRefused", "errors": list(errors)}])
else:
team, slot = ctx.connect_names[args['name']]
Expand Down Expand Up @@ -2264,7 +2266,7 @@ def inactivity_shutdown():
if to_cancel:
for task in to_cancel:
task.cancel()
logging.info("Shutting down due to inactivity.")
ctx.logger.info("Shutting down due to inactivity.")

while not ctx.exit_event.is_set():
if not ctx.client_activity_timers.values():
Expand Down
1 change: 1 addition & 0 deletions WebHostLib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

app.config["SELFHOST"] = True # application process is in charge of running the websites
app.config["GENERATORS"] = 8 # maximum concurrent world gens
app.config["HOSTERS"] = 8 # maximum concurrent room hosters
app.config["SELFLAUNCH"] = True # application process is in charge of launching Rooms.
app.config["SELFLAUNCHCERT"] = None # can point to a SSL Certificate to encrypt Room websocket connections
app.config["SELFLAUNCHKEY"] = None # can point to a SSL Certificate Key to encrypt Room websocket connections
Expand Down
107 changes: 41 additions & 66 deletions WebHostLib/autolauncher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import json
import logging
import multiprocessing
import threading
import time
import typing
from uuid import UUID
Expand All @@ -15,16 +14,6 @@
from .locker import Locker, AlreadyRunningException


def launch_room(room: Room, config: dict):
    # requires db_session!
    # Only (re)start hosting for rooms that have seen activity within their timeout window.
    stale_cutoff = datetime.utcnow() - timedelta(seconds=room.timeout)
    if room.last_activity >= stale_cutoff:
        instance = multiworlds.get(room.id, None)
        if instance is None:
            # no hoster tracked for this room yet; create one
            instance = MultiworldInstance(room, config)

        instance.start()


def handle_generation_success(seed_id):
logging.info(f"Generation finished for seed {seed_id}")

Expand Down Expand Up @@ -59,29 +48,40 @@ def init_db(pony_config: dict):
db.generate_mapping()


def cleanup():
    """Delete unowned user-content (Rooms, Seeds, Slots) in a single transaction."""
    with db_session:
        # UUID(int=0) is the sentinel "no owner" id; note that it is still truthy:
        # >>> bool(uuid.UUID(int=0))
        # True
        # delete(bulk=True) returns the number of rows removed, used for the log below.
        rooms = Room.select(lambda room: room.owner == UUID(int=0)).delete(bulk=True)
        # only delete unowned seeds that no longer back any room
        seeds = Seed.select(lambda seed: seed.owner == UUID(int=0) and not seed.rooms).delete(bulk=True)
        # slots cannot exist without a seed
        slots = Slot.select(lambda slot: not slot.seed).delete(bulk=True)
        # Command gets deleted by ponyorm Cascade Delete, as Room is Required
        if rooms or seeds or slots:
            logging.info(f"{rooms} Rooms, {seeds} Seeds and {slots} Slots have been deleted.")


def autohost(config: dict):
def keep_running():
try:
with Locker("autohost"):
# delete unowned user-content
with db_session:
# >>> bool(uuid.UUID(int=0))
# True
rooms = Room.select(lambda room: room.owner == UUID(int=0)).delete(bulk=True)
seeds = Seed.select(lambda seed: seed.owner == UUID(int=0) and not seed.rooms).delete(bulk=True)
slots = Slot.select(lambda slot: not slot.seed).delete(bulk=True)
# Command gets deleted by ponyorm Cascade Delete, as Room is Required
if rooms or seeds or slots:
logging.info(f"{rooms} Rooms, {seeds} Seeds and {slots} Slots have been deleted.")
run_guardian()
cleanup()
hosters = []
for x in range(config["HOSTERS"]):
hoster = MultiworldInstance(config, x)
hosters.append(hoster)
hoster.start()

while 1:
time.sleep(0.1)
with db_session:
rooms = select(
room for room in Room if
room.last_activity >= datetime.utcnow() - timedelta(days=3))
for room in rooms:
launch_room(room, config)
# we have to filter twice, as the per-room timeout can't currently be PonyORM transpiled.
if room.last_activity >= datetime.utcnow() - timedelta(seconds=room.timeout):
hosters[room.id.int % len(hosters)].start_room(room.id)

except AlreadyRunningException:
logging.info("Autohost reports as already running, not starting another.")
Expand Down Expand Up @@ -132,29 +132,38 @@ def keep_running():


class MultiworldInstance():
def __init__(self, room: Room, config: dict):
self.room_id = room.id
def __init__(self, config: dict, id: int):
self.room_ids = set()
self.process: typing.Optional[multiprocessing.Process] = None
with guardian_lock:
multiworlds[self.room_id] = self
self.ponyconfig = config["PONY"]
self.cert = config["SELFLAUNCHCERT"]
self.key = config["SELFLAUNCHKEY"]
self.host = config["HOST_ADDRESS"]
self.rooms_to_start = multiprocessing.Queue()
self.rooms_shutting_down = multiprocessing.Queue()
self.name = f"MultiHoster{id}"

def start(self):
if self.process and self.process.is_alive():
return False

logging.info(f"Spinning up {self.room_id}")
process = multiprocessing.Process(group=None, target=run_server_process,
args=(self.room_id, self.ponyconfig, get_static_server_data(),
self.cert, self.key, self.host),
name="MultiHost")
args=(self.name, self.ponyconfig, get_static_server_data(),
self.cert, self.key, self.host,
self.rooms_to_start, self.rooms_shutting_down),
name=self.name)
process.start()
# bind after start to prevent thread sync issues with guardian.
self.process = process

def start_room(self, room_id):
    """Queue a room to be hosted by this instance's server process, unless it already is."""
    # Drain shutdown notifications first, so rooms that finished can be re-hosted later.
    while not self.rooms_shutting_down.empty():
        finished_id = self.rooms_shutting_down.get(block=True, timeout=None)
        self.room_ids.remove(finished_id)
    if room_id not in self.room_ids:
        self.room_ids.add(room_id)
        self.rooms_to_start.put(room_id)
    # else: room is tracked as already being hosted currently; nothing to do.

def stop(self):
if self.process:
self.process.terminate()
Expand All @@ -168,40 +177,6 @@ def collect(self):
self.process = None


guardian = None
guardian_lock = threading.Lock()


def run_guardian():
global guardian
global multiworlds
with guardian_lock:
if not guardian:
try:
import resource
except ModuleNotFoundError:
pass # unix only module
else:
# Each Server is another file handle, so request as many as we can from the system
file_limit = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
# set soft limit to hard limit
resource.setrlimit(resource.RLIMIT_NOFILE, (file_limit, file_limit))

def guard():
while 1:
time.sleep(1)
done = []
with guardian_lock:
for key, instance in multiworlds.items():
if instance.done():
instance.collect()
done.append(key)
for key in done:
del (multiworlds[key])

guardian = threading.Thread(name="Guardian", target=guard)


from .models import Room, Generation, STATE_QUEUED, STATE_STARTED, STATE_ERROR, db, Seed, Slot
from .customserver import run_server_process, get_static_server_data
from .generate import gen_game
Loading

0 comments on commit 1b67b92

Please sign in to comment.