Skip to content

Commit

Permalink
Core: Remove Universally Unique ID Requirements (Per-Game Data Packag…
Browse files Browse the repository at this point in the history
  • Loading branch information
ThePhar authored Jun 1, 2024
1 parent f3003ff commit 5aa6ad6
Show file tree
Hide file tree
Showing 77 changed files with 319 additions and 392 deletions.
2 changes: 1 addition & 1 deletion AdventureClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def on_package(self, cmd: str, args: dict):
if ': !' not in msg:
self._set_message(msg, SYSTEM_MESSAGE_ID)
elif cmd == "ReceivedItems":
msg = f"Received {', '.join([self.item_names[item.item] for item in args['items']])}"
msg = f"Received {', '.join([self.item_names.lookup_in_slot(item.item) for item in args['items']])}"
self._set_message(msg, SYSTEM_MESSAGE_ID)
elif cmd == "Retrieved":
if f"adventure_{self.auth}_freeincarnates_used" in args["keys"]:
Expand Down
91 changes: 79 additions & 12 deletions CommonClient.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import collections
import copy
import logging
import asyncio
Expand All @@ -8,6 +9,7 @@
import typing
import time
import functools
import warnings

import ModuleUpdate
ModuleUpdate.update()
Expand Down Expand Up @@ -173,10 +175,74 @@ class CommonContext:
items_handling: typing.Optional[int] = None
want_slot_data: bool = True # should slot_data be retrieved via Connect

# data package
# Contents in flux until connection to server is made, to download correct data for this multiworld.
item_names: typing.Dict[int, str] = Utils.KeyedDefaultDict(lambda code: f'Unknown item (ID:{code})')
location_names: typing.Dict[int, str] = Utils.KeyedDefaultDict(lambda code: f'Unknown location (ID:{code})')
class NameLookupDict:
"""A specialized dict, with helper methods, for id -> name item/location data package lookups by game."""
def __init__(self, ctx: CommonContext, lookup_type: typing.Literal["item", "location"]):
self.ctx: CommonContext = ctx
self.lookup_type: typing.Literal["item", "location"] = lookup_type
self._unknown_item: typing.Callable[[int], str] = lambda key: f"Unknown {lookup_type} (ID: {key})"
self._archipelago_lookup: typing.Dict[int, str] = {}
self._flat_store: typing.Dict[int, str] = Utils.KeyedDefaultDict(self._unknown_item)
self._game_store: typing.Dict[str, typing.ChainMap[int, str]] = collections.defaultdict(
lambda: collections.ChainMap(self._archipelago_lookup, Utils.KeyedDefaultDict(self._unknown_item)))
self.warned: bool = False

# noinspection PyTypeChecker
def __getitem__(self, key: str) -> typing.Mapping[int, str]:
# TODO: In a future version (0.6.0?) this should be simplified by removing implicit id lookups support.
if isinstance(key, int):
if not self.warned:
# Use warnings instead of logger to avoid deprecation message from appearing on user side.
self.warned = True
warnings.warn(f"Implicit name lookup by id only is deprecated and only supported to maintain "
f"backwards compatibility for now. If multiple games share the same id for a "
f"{self.lookup_type}, name could be incorrect. Please use "
f"`{self.lookup_type}_names.lookup_in_game()` or "
f"`{self.lookup_type}_names.lookup_in_slot()` instead.")
return self._flat_store[key] # type: ignore

return self._game_store[key]

def __len__(self) -> int:
return len(self._game_store)

def __iter__(self) -> typing.Iterator[str]:
return iter(self._game_store)

def __repr__(self) -> str:
return self._game_store.__repr__()

def lookup_in_game(self, code: int, game_name: typing.Optional[str] = None) -> str:
"""Returns the name for an item/location id in the context of a specific game or own game if `game` is
omitted.
"""
if game_name is None:
game_name = self.ctx.game
assert game_name is not None, f"Attempted to lookup {self.lookup_type} with no game name available."

return self._game_store[game_name][code]

def lookup_in_slot(self, code: int, slot: typing.Optional[int] = None) -> str:
"""Returns the name for an item/location id in the context of a specific slot or own slot if `slot` is
omitted.
"""
if slot is None:
slot = self.ctx.slot
assert slot is not None, f"Attempted to lookup {self.lookup_type} with no slot info available."

return self.lookup_in_game(code, self.ctx.slot_info[slot].game)

def update_game(self, game: str, name_to_id_lookup_table: typing.Dict[str, int]) -> None:
"""Overrides existing lookup tables for a particular game."""
id_to_name_lookup_table = Utils.KeyedDefaultDict(self._unknown_item)
id_to_name_lookup_table.update({code: name for name, code in name_to_id_lookup_table.items()})
self._game_store[game] = collections.ChainMap(self._archipelago_lookup, id_to_name_lookup_table)
self._flat_store.update(id_to_name_lookup_table) # Only needed for legacy lookup method.
if game == "Archipelago":
# Keep track of the Archipelago data package separately so if it gets updated in a custom datapackage,
# it updates in all chain maps automatically.
self._archipelago_lookup.clear()
self._archipelago_lookup.update(id_to_name_lookup_table)

# defaults
starting_reconnect_delay: int = 5
Expand Down Expand Up @@ -231,7 +297,7 @@ class CommonContext:
# message box reporting a loss of connection
_messagebox_connection_loss: typing.Optional["kvui.MessageBox"] = None

def __init__(self, server_address: typing.Optional[str], password: typing.Optional[str]) -> None:
def __init__(self, server_address: typing.Optional[str] = None, password: typing.Optional[str] = None) -> None:
# server state
self.server_address = server_address
self.username = None
Expand Down Expand Up @@ -271,6 +337,9 @@ def __init__(self, server_address: typing.Optional[str], password: typing.Option
self.exit_event = asyncio.Event()
self.watcher_event = asyncio.Event()

self.item_names = self.NameLookupDict(self, "item")
self.location_names = self.NameLookupDict(self, "location")

self.jsontotextparser = JSONtoTextParser(self)
self.rawjsontotextparser = RawJSONtoTextParser(self)
self.update_data_package(network_data_package)
Expand Down Expand Up @@ -486,19 +555,17 @@ async def prepare_data_package(self, relevant_games: typing.Set[str],
or remote_checksum != cache_checksum:
needed_updates.add(game)
else:
self.update_game(cached_game)
self.update_game(cached_game, game)
if needed_updates:
await self.send_msgs([{"cmd": "GetDataPackage", "games": [game_name]} for game_name in needed_updates])

def update_game(self, game_package: dict):
for item_name, item_id in game_package["item_name_to_id"].items():
self.item_names[item_id] = item_name
for location_name, location_id in game_package["location_name_to_id"].items():
self.location_names[location_id] = location_name
def update_game(self, game_package: dict, game: str):
self.item_names.update_game(game, game_package["item_name_to_id"])
self.location_names.update_game(game, game_package["location_name_to_id"])

def update_data_package(self, data_package: dict):
for game, game_data in data_package["games"].items():
self.update_game(game_data)
self.update_game(game_data, game)

def consume_network_data_package(self, data_package: dict):
self.update_data_package(data_package)
Expand Down
50 changes: 28 additions & 22 deletions MultiServer.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,11 @@ class Context:
slot_info: typing.Dict[int, NetworkSlot]
generator_version = Version(0, 0, 0)
checksums: typing.Dict[str, str]
item_names: typing.Dict[int, str] = Utils.KeyedDefaultDict(lambda code: f'Unknown item (ID:{code})')
item_names: typing.Dict[str, typing.Dict[int, str]] = (
collections.defaultdict(lambda: Utils.KeyedDefaultDict(lambda code: f'Unknown item (ID:{code})')))
item_name_groups: typing.Dict[str, typing.Dict[str, typing.Set[str]]]
location_names: typing.Dict[int, str] = Utils.KeyedDefaultDict(lambda code: f'Unknown location (ID:{code})')
location_names: typing.Dict[str, typing.Dict[int, str]] = (
collections.defaultdict(lambda: Utils.KeyedDefaultDict(lambda code: f'Unknown location (ID:{code})')))
location_name_groups: typing.Dict[str, typing.Dict[str, typing.Set[str]]]
all_item_and_group_names: typing.Dict[str, typing.Set[str]]
all_location_and_group_names: typing.Dict[str, typing.Set[str]]
Expand Down Expand Up @@ -271,14 +273,21 @@ def _init_game_data(self):
if "checksum" in game_package:
self.checksums[game_name] = game_package["checksum"]
for item_name, item_id in game_package["item_name_to_id"].items():
self.item_names[item_id] = item_name
self.item_names[game_name][item_id] = item_name
for location_name, location_id in game_package["location_name_to_id"].items():
self.location_names[location_id] = location_name
self.location_names[game_name][location_id] = location_name
self.all_item_and_group_names[game_name] = \
set(game_package["item_name_to_id"]) | set(self.item_name_groups[game_name])
self.all_location_and_group_names[game_name] = \
set(game_package["location_name_to_id"]) | set(self.location_name_groups.get(game_name, []))

archipelago_item_names = self.item_names["Archipelago"]
archipelago_location_names = self.location_names["Archipelago"]
for game in [game_name for game_name in self.gamespackage if game_name != "Archipelago"]:
# Add Archipelago items and locations to each data package.
self.item_names[game].update(archipelago_item_names)
self.location_names[game].update(archipelago_location_names)

def item_names_for_game(self, game: str) -> typing.Optional[typing.Dict[str, int]]:
return self.gamespackage[game]["item_name_to_id"] if game in self.gamespackage else None

Expand Down Expand Up @@ -783,10 +792,7 @@ async def on_client_connected(ctx: Context, client: Client):
for slot, connected_clients in clients.items():
if connected_clients:
name = ctx.player_names[team, slot]
players.append(
NetworkPlayer(team, slot,
ctx.name_aliases.get((team, slot), name), name)
)
players.append(NetworkPlayer(team, slot, ctx.name_aliases.get((team, slot), name), name))
games = {ctx.games[x] for x in range(1, len(ctx.games) + 1)}
games.add("Archipelago")
await ctx.send_msgs(client, [{
Expand All @@ -801,8 +807,6 @@ async def on_client_connected(ctx: Context, client: Client):
'permissions': get_permissions(ctx),
'hint_cost': ctx.hint_cost,
'location_check_points': ctx.location_check_points,
'datapackage_versions': {game: game_data["version"] for game, game_data
in ctx.gamespackage.items() if game in games},
'datapackage_checksums': {game: game_data["checksum"] for game, game_data
in ctx.gamespackage.items() if game in games and "checksum" in game_data},
'seed_name': ctx.seed_name,
Expand Down Expand Up @@ -1006,8 +1010,8 @@ def register_location_checks(ctx: Context, team: int, slot: int, locations: typi
send_items_to(ctx, team, target_player, new_item)

ctx.logger.info('(Team #%d) %s sent %s to %s (%s)' % (
team + 1, ctx.player_names[(team, slot)], ctx.item_names[item_id],
ctx.player_names[(team, target_player)], ctx.location_names[location]))
team + 1, ctx.player_names[(team, slot)], ctx.item_names[ctx.slot_info[target_player].game][item_id],
ctx.player_names[(team, target_player)], ctx.location_names[ctx.slot_info[slot].game][location]))
info_text = json_format_send_event(new_item, target_player)
ctx.broadcast_team(team, [info_text])

Expand Down Expand Up @@ -1061,8 +1065,8 @@ def collect_hint_location_id(ctx: Context, team: int, slot: int, seeked_location

def format_hint(ctx: Context, team: int, hint: NetUtils.Hint) -> str:
text = f"[Hint]: {ctx.player_names[team, hint.receiving_player]}'s " \
f"{ctx.item_names[hint.item]} is " \
f"at {ctx.location_names[hint.location]} " \
f"{ctx.item_names[ctx.slot_info[hint.receiving_player].game][hint.item]} is " \
f"at {ctx.location_names[ctx.slot_info[hint.finding_player].game][hint.location]} " \
f"in {ctx.player_names[team, hint.finding_player]}'s World"

if hint.entrance:
Expand Down Expand Up @@ -1364,7 +1368,7 @@ def _cmd_remaining(self) -> bool:
if self.ctx.remaining_mode == "enabled":
remaining_item_ids = get_remaining(self.ctx, self.client.team, self.client.slot)
if remaining_item_ids:
self.output("Remaining items: " + ", ".join(self.ctx.item_names[item_id]
self.output("Remaining items: " + ", ".join(self.ctx.item_names[self.client.slot.game][item_id]
for item_id in remaining_item_ids))
else:
self.output("No remaining items found.")
Expand All @@ -1377,7 +1381,7 @@ def _cmd_remaining(self) -> bool:
if self.ctx.client_game_state[self.client.team, self.client.slot] == ClientStatus.CLIENT_GOAL:
remaining_item_ids = get_remaining(self.ctx, self.client.team, self.client.slot)
if remaining_item_ids:
self.output("Remaining items: " + ", ".join(self.ctx.item_names[item_id]
self.output("Remaining items: " + ", ".join(self.ctx.item_names[self.client.slot.game][item_id]
for item_id in remaining_item_ids))
else:
self.output("No remaining items found.")
Expand All @@ -1395,7 +1399,8 @@ def _cmd_missing(self, filter_text="") -> bool:
locations = get_missing_checks(self.ctx, self.client.team, self.client.slot)

if locations:
names = [self.ctx.location_names[location] for location in locations]
game = self.ctx.slot_info[self.client.slot].game
names = [self.ctx.location_names[game][location] for location in locations]
if filter_text:
location_groups = self.ctx.location_name_groups[self.ctx.games[self.client.slot]]
if filter_text in location_groups: # location group name
Expand All @@ -1420,7 +1425,8 @@ def _cmd_checked(self, filter_text="") -> bool:
locations = get_checked_checks(self.ctx, self.client.team, self.client.slot)

if locations:
names = [self.ctx.location_names[location] for location in locations]
game = self.ctx.slot_info[self.client.slot].game
names = [self.ctx.location_names[game][location] for location in locations]
if filter_text:
location_groups = self.ctx.location_name_groups[self.ctx.games[self.client.slot]]
if filter_text in location_groups: # location group name
Expand Down Expand Up @@ -1501,10 +1507,10 @@ def get_hints(self, input_text: str, for_location: bool = False) -> bool:
elif input_text.isnumeric():
game = self.ctx.games[self.client.slot]
hint_id = int(input_text)
hint_name = self.ctx.item_names[hint_id] \
if not for_location and hint_id in self.ctx.item_names \
else self.ctx.location_names[hint_id] \
if for_location and hint_id in self.ctx.location_names \
hint_name = self.ctx.item_names[game][hint_id] \
if not for_location and hint_id in self.ctx.item_names[game] \
else self.ctx.location_names[game][hint_id] \
if for_location and hint_id in self.ctx.location_names[game] \
else None
if hint_name in self.ctx.non_hintable_names[game]:
self.output(f"Sorry, \"{hint_name}\" is marked as non-hintable.")
Expand Down
6 changes: 3 additions & 3 deletions NetUtils.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,16 +247,16 @@ def _handle_item_name(self, node: JSONMessagePart):

def _handle_item_id(self, node: JSONMessagePart):
item_id = int(node["text"])
node["text"] = self.ctx.item_names[item_id]
node["text"] = self.ctx.item_names.lookup_in_slot(item_id, node["player"])
return self._handle_item_name(node)

def _handle_location_name(self, node: JSONMessagePart):
node["color"] = 'green'
return self._handle_color(node)

def _handle_location_id(self, node: JSONMessagePart):
item_id = int(node["text"])
node["text"] = self.ctx.location_names[item_id]
location_id = int(node["text"])
node["text"] = self.ctx.location_names.lookup_in_slot(location_id, node["player"])
return self._handle_location_name(node)

def _handle_entrance_name(self, node: JSONMessagePart):
Expand Down
4 changes: 2 additions & 2 deletions UndertaleClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,8 @@ async def process_undertale_cmd(ctx: UndertaleContext, cmd: str, args: dict):
with open(os.path.join(ctx.save_game_folder, filename), "w") as f:
toDraw = ""
for i in range(20):
if i < len(str(ctx.item_names[l.item])):
toDraw += str(ctx.item_names[l.item])[i]
if i < len(str(ctx.item_names.lookup_in_slot(l.item))):
toDraw += str(ctx.item_names.lookup_in_slot(l.item))[i]
else:
break
f.write(toDraw)
Expand Down
5 changes: 4 additions & 1 deletion Utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def as_simple_string(self) -> str:
return ".".join(str(item) for item in self)


__version__ = "0.4.6"
__version__ = "0.5.0"
version_tuple = tuplize_version(__version__)

is_linux = sys.platform.startswith("linux")
Expand Down Expand Up @@ -458,6 +458,9 @@ class KeyedDefaultDict(collections.defaultdict):
"""defaultdict variant that uses the missing key as argument to default_factory"""
default_factory: typing.Callable[[typing.Any], typing.Any]

def __init__(self, default_factory: typing.Callable[[Any], Any] = None, **kwargs):
super().__init__(default_factory, **kwargs)

def __missing__(self, key):
self[key] = value = self.default_factory(key)
return value
Expand Down
Loading

0 comments on commit 5aa6ad6

Please sign in to comment.