diff --git a/src/black_white.py b/src/black_white.py index 494818a..d7f7bc4 100644 --- a/src/black_white.py +++ b/src/black_white.py @@ -2,14 +2,14 @@ def setup_black_white_lists( - blacklist_library: str, - whitelist_library: str, - blacklist_library_type: str, - whitelist_library_type: str, - blacklist_users: str, - whitelist_users: str, - library_mapping=None, - user_mapping=None, + blacklist_library: list[str] | None, + whitelist_library: list[str] | None, + blacklist_library_type: list[str] | None, + whitelist_library_type: list[str] | None, + blacklist_users: list[str] | None, + whitelist_users: list[str] | None, + library_mapping: dict[str, str] | None = None, + user_mapping: dict[str, str] | None = None, ): blacklist_library, blacklist_library_type, blacklist_users = setup_x_lists( blacklist_library, @@ -40,53 +40,44 @@ def setup_black_white_lists( def setup_x_lists( - xlist_library, - xlist_library_type, - xlist_users, - xlist_type, - library_mapping=None, - user_mapping=None, -): + xlist_library: list[str] | None, + xlist_library_type: list[str] | None, + xlist_users: list[str] | None, + xlist_type: str | None, + library_mapping: dict[str, str] | None = None, + user_mapping: dict[str, str] | None = None, +) -> tuple[list[str], list[str], list[str]]: + out_library: list[str] = [] if xlist_library: - if len(xlist_library) > 0: - xlist_library = xlist_library.split(",") - xlist_library = [x.strip() for x in xlist_library] - if library_mapping: - temp_library = [] - for library in xlist_library: - library_other = search_mapping(library_mapping, library) - if library_other: - temp_library.append(library_other) + out_library = [x.strip() for x in xlist_library] + if library_mapping: + temp_library: list[str] = [] + for library in xlist_library: + library_other = search_mapping(library_mapping, library) + if library_other: + temp_library.append(library_other) - xlist_library = xlist_library + temp_library - else: - xlist_library = [] - logger(f"{xlist_type}list Library: {xlist_library}", 1) + out_library = out_library + temp_library + logger(f"{xlist_type}list Library: {xlist_library}", 1) + out_library_type: list[str] = [] if xlist_library_type: - if len(xlist_library_type) > 0: - xlist_library_type = xlist_library_type.split(",") - xlist_library_type = [x.lower().strip() for x in xlist_library_type] - else: - xlist_library_type = [] - logger(f"{xlist_type}list Library Type: {xlist_library_type}", 1) + out_library_type = [x.lower().strip() for x in xlist_library_type] + + logger(f"{xlist_type}list Library Type: {out_library_type}", 1) + out_users: list[str] = [] if xlist_users: - if len(xlist_users) > 0: - xlist_users = xlist_users.split(",") - xlist_users = [x.lower().strip() for x in xlist_users] - if user_mapping: - temp_users = [] - for user in xlist_users: - user_other = search_mapping(user_mapping, user) - if user_other: - temp_users.append(user_other) + out_users = [x.lower().strip() for x in xlist_users] + if user_mapping: + temp_users: list[str] = [] + for user in out_users: + user_other = search_mapping(user_mapping, user) + if user_other: + temp_users.append(user_other) + + out_users = out_users + temp_users - xlist_users = xlist_users + temp_users - else: - xlist_users = [] - else: - xlist_users = [] - logger(f"{xlist_type}list Users: {xlist_users}", 1) + logger(f"{xlist_type}list Users: {out_users}", 1) - return xlist_library, xlist_library_type, xlist_users + return out_library, out_library_type, out_users diff --git a/src/connection.py b/src/connection.py index 0afaaeb..39be764 100644 --- a/src/connection.py +++ b/src/connection.py @@ -1,4 +1,5 @@ import os +from typing import Literal from dotenv import load_dotenv from src.functions import logger, str_to_bool @@ -9,39 +10,32 @@ load_dotenv(override=True) -def jellyfin_emby_server_connection(server_baseurl, server_token, server_type): - servers = [] +def jellyfin_emby_server_connection( + server_baseurl: str, server_token: str, server_type: Literal["jellyfin", "emby"] +) -> list[Jellyfin | Emby]: + servers: list[Jellyfin | Emby] = [] + server: Jellyfin | Emby - server_baseurl = server_baseurl.split(",") - server_token = server_token.split(",") + server_baseurls = server_baseurl.split(",") + server_tokens = server_token.split(",") - if len(server_baseurl) != len(server_token): + if len(server_baseurls) != len(server_tokens): raise Exception( f"{server_type.upper()}_BASEURL and {server_type.upper()}_TOKEN must have the same number of entries" ) - for i, baseurl in enumerate(server_baseurl): + for i, baseurl in enumerate(server_baseurls): baseurl = baseurl.strip() if baseurl[-1] == "/": baseurl = baseurl[:-1] if server_type == "jellyfin": - server = Jellyfin(baseurl=baseurl, token=server_token[i].strip()) - servers.append( - ( - "jellyfin", - server, - ) - ) + server = Jellyfin(baseurl=baseurl, token=server_tokens[i].strip()) + servers.append(server) elif server_type == "emby": - server = Emby(baseurl=baseurl, token=server_token[i].strip()) - servers.append( - ( - "emby", - server, - ) - ) + server = Emby(baseurl=baseurl, token=server_tokens[i].strip()) + servers.append(server) else: raise Exception("Unknown server type") @@ -50,19 +44,19 @@ def jellyfin_emby_server_connection(server_baseurl, server_token, server_type): return servers -def generate_server_connections(): - servers = [] +def generate_server_connections() -> list[Plex | Jellyfin | Emby]: + servers: list[Plex | Jellyfin | Emby] = [] - plex_baseurl = os.getenv("PLEX_BASEURL", None) - plex_token = os.getenv("PLEX_TOKEN", None) - plex_username = os.getenv("PLEX_USERNAME", None) - plex_password = os.getenv("PLEX_PASSWORD", None) - plex_servername = os.getenv("PLEX_SERVERNAME", None) + plex_baseurl_str: str | None = os.getenv("PLEX_BASEURL", None) + plex_token_str: str | None = os.getenv("PLEX_TOKEN", None) + plex_username_str: str | None = os.getenv("PLEX_USERNAME", None) + plex_password_str: str | None = os.getenv("PLEX_PASSWORD", None) + plex_servername_str: str | None = os.getenv("PLEX_SERVERNAME", None) ssl_bypass = str_to_bool(os.getenv("SSL_BYPASS", "False")) - if plex_baseurl and plex_token: - plex_baseurl = plex_baseurl.split(",") - plex_token = plex_token.split(",") + if plex_baseurl_str and plex_token_str: + plex_baseurl = plex_baseurl_str.split(",") + plex_token = plex_token_str.split(",") if len(plex_baseurl) != len(plex_token): raise Exception( @@ -81,17 +75,12 @@ def generate_server_connections(): logger(f"Plex Server {i} info: {server.info()}", 3) - servers.append( - ( - "plex", - server, - ) - ) + servers.append(server) - if plex_username and plex_password and plex_servername: - plex_username = plex_username.split(",") - plex_password = plex_password.split(",") - plex_servername = plex_servername.split(",") + if plex_username_str and plex_password_str and plex_servername_str: + plex_username = plex_username_str.split(",") + plex_password = plex_password_str.split(",") + plex_servername = plex_servername_str.split(",") if len(plex_username) != len(plex_password) or len(plex_username) != len( plex_servername @@ -111,16 +100,10 @@ def generate_server_connections(): ) logger(f"Plex Server {i} info: {server.info()}", 3) - servers.append( - ( - "plex", - server, - ) - ) + servers.append(server) jellyfin_baseurl = os.getenv("JELLYFIN_BASEURL", None) jellyfin_token = os.getenv("JELLYFIN_TOKEN", None) - if jellyfin_baseurl and jellyfin_token: servers.extend( jellyfin_emby_server_connection( @@ -130,7 +113,6 @@ def generate_server_connections(): emby_baseurl = os.getenv("EMBY_BASEURL", None) emby_token = os.getenv("EMBY_TOKEN", None) - if emby_baseurl and emby_token: servers.extend( jellyfin_emby_server_connection(emby_baseurl, emby_token, "emby") diff --git a/src/custom_types.py b/src/custom_types.py new file mode 100644 index 0000000..c44799c --- /dev/null +++ b/src/custom_types.py @@ -0,0 +1,12 @@ +UsersWatchedStatus = dict[str, str | tuple[str] | dict[str, bool | int]] +UsersWatched = ( + dict[ + frozenset[UsersWatchedStatus], + dict[ + str, + list[UsersWatchedStatus] + | dict[frozenset[UsersWatchedStatus], list[UsersWatchedStatus]], + ], + ] + | list[UsersWatchedStatus] +) diff --git a/src/emby.py b/src/emby.py index 0af0e0a..436069e 100644 --- a/src/emby.py +++ b/src/emby.py @@ -1,5 +1,5 @@ from src.jellyfin_emby import JellyfinEmby -from packaging import version +from packaging.version import parse, Version class Emby(JellyfinEmby): @@ -21,5 +21,5 @@ def __init__(self, baseurl, token): server_type="Emby", baseurl=baseurl, token=token, headers=headers ) - def is_partial_update_supported(self, server_version): - return server_version > version.parse("4.4") + def is_partial_update_supported(self, server_version: Version) -> bool: + return server_version > parse("4.4") diff --git a/src/functions.py b/src/functions.py index 8abb3cd..8b1f69b 100644 --- a/src/functions.py +++ b/src/functions.py @@ -1,5 +1,6 @@ import os -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import Future, ThreadPoolExecutor +from typing import Any, Callable from dotenv import load_dotenv load_dotenv(override=True) @@ -8,11 +9,11 @@ mark_file = os.getenv("MARK_FILE", os.getenv("MARKFILE", "mark.log")) -def logger(message: str, log_type=0): +def logger(message: str, log_type: int = 0): debug = str_to_bool(os.getenv("DEBUG", "False")) debug_level = os.getenv("DEBUG_LEVEL", "info").lower() - output = str(message) + output: str | None = str(message) if log_type == 0: pass elif log_type == 1 and (debug and debug_level in ("info", "debug")): @@ -42,12 +43,9 @@ def log_marked( username: str, library: str, movie_show: str, - episode: str = None, - duration=None, + episode: str | None = None, + duration: float | None = None, ): - if mark_file is None: - return - output = f"{server_type}/{server_name}/{username}/{library}/{movie_show}" if episode: @@ -62,14 +60,14 @@ def log_marked( # Reimplementation of distutils.util.strtobool due to it being deprecated # Source: https://github.com/PostHog/posthog/blob/01e184c29d2c10c43166f1d40a334abbc3f99d8a/posthog/utils.py#L668 -def str_to_bool(value: any) -> bool: +def str_to_bool(value: str) -> bool: if not value: return False return str(value).lower() in ("y", "yes", "t", "true", "on", "1") # Search for nested element in list -def contains_nested(element, lst): +def contains_nested(element: str, lst: list[tuple[str] | None] | tuple[str] | None): if lst is None: return None @@ -84,7 +82,7 @@ def contains_nested(element, lst): # Get mapped value -def search_mapping(dictionary: dict, key_value: str): +def search_mapping(dictionary: dict[str, str], key_value: str) -> str | None: if key_value in dictionary.keys(): return dictionary[key_value] elif key_value.lower() in dictionary.keys(): @@ -100,8 +98,10 @@ def search_mapping(dictionary: dict, key_value: str): # Return list of objects that exist in both lists including mappings -def match_list(list1, list2, list_mapping=None): - output = [] +def match_list( + list1: list[str], list2: list[str], list_mapping: dict[str, str] | None = None +) -> list[str]: + output: list[str] = [] for element in list1: if element in list2: output.append(element) @@ -114,35 +114,49 @@ def match_list(list1, list2, list_mapping=None): def future_thread_executor( - args: list, threads: int = None, override_threads: bool = False -): - futures_list = [] - results = [] - - workers = min(int(os.getenv("MAX_THREADS", 32)), os.cpu_count() * 2) - if threads: + args: list[tuple[Callable[..., Any], ...]], + threads: int | None = None, + override_threads: bool = False, +) -> list[Any]: + results: list[Any] = [] + + # Determine the number of workers, defaulting to 1 if os.cpu_count() returns None + max_threads_env: int = int(os.getenv("MAX_THREADS", 32)) + cpu_threads: int = os.cpu_count() or 1 # Default to 1 if os.cpu_count() is None + workers: int = min(max_threads_env, cpu_threads * 2) + + # Adjust workers based on threads parameter and override_threads flag + if threads is not None: workers = min(threads, workers) - if override_threads: - workers = threads + workers = threads if threads is not None else workers # If only one worker, run in main thread to avoid overhead if workers == 1: - results = [] for arg in args: results.append(arg[0](*arg[1:])) return results with ThreadPoolExecutor(max_workers=workers) as executor: + futures_list: list[Future[Any]] = [] + for arg in args: # * arg unpacks the list into actual arguments futures_list.append(executor.submit(*arg)) - for future in futures_list: + for out in futures_list: try: - result = future.result() + result = out.result() results.append(result) except Exception as e: raise Exception(e) return results + + +def parse_string_to_list(string: str | None) -> list[str]: + output: list[str] = [] + if string and len(string) > 0: + output = string.split(",") + + return output diff --git a/src/jellyfin.py b/src/jellyfin.py index faf91b5..4c0bcda 100644 --- a/src/jellyfin.py +++ b/src/jellyfin.py @@ -1,5 +1,5 @@ from src.jellyfin_emby import JellyfinEmby -from packaging import version +from packaging.version import parse, Version class Jellyfin(JellyfinEmby): @@ -21,5 +21,5 @@ def __init__(self, baseurl, token): server_type="Jellyfin", baseurl=baseurl, token=token, headers=headers ) - def is_partial_update_supported(self, server_version): - return server_version >= version.parse("10.9.0") + def is_partial_update_supported(self, server_version: Version) -> bool: + return server_version >= parse("10.9.0") diff --git a/src/jellyfin_emby.py b/src/jellyfin_emby.py index 55c3609..f8afa71 100644 --- a/src/jellyfin_emby.py +++ b/src/jellyfin_emby.py @@ -2,9 +2,10 @@ import traceback, os from math import floor +from typing import Any, Literal from dotenv import load_dotenv import requests -from packaging import version +from packaging.version import parse, Version from src.functions import ( logger, @@ -14,6 +15,7 @@ str_to_bool, ) from src.library import generate_library_guids_dict +from src.custom_types import UsersWatched load_dotenv(override=True) @@ -21,39 +23,6 @@ generate_locations = str_to_bool(os.getenv("GENERATE_LOCATIONS", "True")) -def get_guids(server_type, item): - if item.get("Name"): - guids = {"title": item.get("Name")} - else: - logger(f"{server_type}: Name not found in {item.get('Id')}", 1) - guids = {"title": None} - - if "ProviderIds" in item: - guids.update({k.lower(): v for k, v in item["ProviderIds"].items()}) - else: - logger(f"{server_type}: ProviderIds not found in {item.get('Name')}", 1) - - if "MediaSources" in item: - guids["locations"] = tuple( - [x["Path"].split("/")[-1] for x in item["MediaSources"] if "Path" in x] - ) - else: - logger(f"{server_type}: MediaSources not found in {item.get('Name')}", 1) - guids["locations"] = tuple() - - if "UserData" in item: - guids["status"] = { - "completed": item["UserData"]["Played"], - # Convert ticks to milliseconds to match Plex - "time": floor(item["UserData"]["PlaybackPositionTicks"] / 10000), - } - else: - logger(f"{server_type}: UserData not found in {item.get('Name')}", 1) - guids["status"] = {} - - return guids - - def get_video_status(server_video, videos_ids, videos): video_status = None @@ -103,7 +72,13 @@ def get_video_status(server_video, videos_ids, videos): class JellyfinEmby: - def __init__(self, server_type, baseurl, token, headers): + def __init__( + self, + server_type: Literal["Jellyfin", "Emby"], + baseurl: str, + token: str, + headers: dict[str, str], + ): if server_type not in ["Jellyfin", "Emby"]: raise Exception(f"Server type {server_type} not supported") self.server_type = server_type @@ -122,9 +97,17 @@ def __init__(self, server_type, baseurl, token, headers): self.users = self.get_users() self.server_name = self.info(name_only=True) - def query(self, query, query_type, identifiers=None, json=None): + def query( + self, + query: str, + query_type: Literal["get", "post"], + identifiers: dict[str, str] | None = None, + json: dict[str, float] | None = None, + ) -> dict[str, Any] | list[dict[str, Any]] | None: try: - results = None + results: ( + dict[str, list[Any] | dict[str, str]] | list[dict[str, Any]] | None + ) = None if query_type == "get": response = self.session.get( @@ -160,7 +143,7 @@ def query(self, query, query_type, identifiers=None, json=None): raise Exception("Query result is not of type list or dict") # append identifiers to results - if identifiers: + if identifiers and results: results["Identifiers"] = identifiers return results @@ -172,54 +155,85 @@ def query(self, query, query_type, identifiers=None, json=None): ) raise Exception(e) - def info(self, name_only: bool = False) -> str: + def info( + self, name_only: bool = False, version_only: bool = False + ) -> str | Version | None: try: query_string = "/System/Info/Public" - response = self.query(query_string, "get") + response: dict[str, Any] = self.query(query_string, "get") if response: if name_only: - return f"{response['ServerName']}" - return f"{self.server_type} {response['ServerName']}: {response['Version']}" - else: - return None - - except Exception as e: - logger(f"{self.server_type}: Get server name failed {e}", 2) - raise Exception(e) - - def get_server_version(self): - try: - response = self.query("/System/Info/Public", "get") + return response["ServerName"] + elif version_only: + return parse(response["Version"]) - if response: - return version.parse(response["Version"]) + return f"{self.server_type} {response.get('ServerName')}: {response.get('Version')}" else: return None except Exception as e: - logger(f"{self.server_type}: Get server version failed: {e}", 2) + logger(f"{self.server_type}: Get server name failed {e}", 2) raise Exception(e) - def get_users(self): + def get_users(self) -> dict[str, str]: try: - users = {} + users: dict[str, str] = {} query_string = "/Users" - response = self.query(query_string, "get") + response: list[dict[str, str | bool]] = self.query(query_string, "get") # If response is not empty if response: for user in response: - users[user["Name"]] = user["Id"] + if isinstance(user["Name"], str) and isinstance(user["Id"], str): + users[user["Name"]] = user["Id"] return users except Exception as e: logger(f"{self.server_type}: Get users failed {e}", 2) raise Exception(e) - def get_libraries(self): + def get_guids(self, item: dict): + guids: dict[str, str | tuple[str] | dict[str, bool | int]] = {} + + if item.get("Name"): + guids["title"] = item.get("Name") + else: + logger(f"{self.server_type}: Name not found in {item.get('Id')}", 1) + guids["title"] = None + + if "ProviderIds" in item: + guids.update({k.lower(): v for k, v in item["ProviderIds"].items()}) + else: + logger( + f"{self.server_type}: ProviderIds not found in {item.get('Name')}", 1 + ) + + if "MediaSources" in item: + guids["locations"] = tuple( + [x["Path"].split("/")[-1] for x in item["MediaSources"] if "Path" in x] + ) + else: + logger( + f"{self.server_type}: MediaSources not found in {item.get('Name')}", 1 + ) + guids["locations"] = tuple() + + if "UserData" in item: + guids["status"] = { + "completed": item["UserData"]["Played"], + # Convert ticks to milliseconds to match Plex + "time": floor(item["UserData"]["PlaybackPositionTicks"] / 10000), + } + else: + logger(f"{self.server_type}: UserData not found in {item.get('Name')}", 1) + guids["status"] = {} + + return guids + + def get_libraries(self) -> dict[str, str]: try: libraries = {} @@ -227,7 +241,7 @@ def get_libraries(self): users = self.get_users() for _, user_id in users.items(): - user_libraries = self.query(f"/Users/{user_id}/Views", "get") + user_libraries: dict = self.query(f"/Users/{user_id}/Views", "get") for library in user_libraries["Items"]: library_id = library["Id"] library_title = library["Name"] @@ -308,7 +322,7 @@ def get_user_library_watched( ) # Get the movie's GUIDs - movie_guids = get_guids(self.server_type, movie) + movie_guids = self.get_guids(movie) # Append the movie dictionary to the list for the given user and library user_watched[library_title].append(movie_guids) @@ -379,7 +393,7 @@ def get_user_library_watched( episode["UserData"]["Played"] == True or episode["UserData"]["PlaybackPositionTicks"] > 600000000 ): - episode_guids = get_guids(self.server_type, episode) + episode_guids = self.get_guids(episode) mark_episodes_list.append(episode_guids) if mark_episodes_list: @@ -411,9 +425,11 @@ def get_user_library_watched( logger(traceback.format_exc(), 2) return {} - def get_watched(self, users, sync_libraries): + def get_watched( + self, users: dict[str, str], sync_libraries: list[str] + ) -> UsersWatched: try: - users_watched = {} + users_watched: UsersWatched = {} watched = [] for user_name, user_id in users.items(): @@ -427,7 +443,7 @@ def get_watched(self, users, sync_libraries): if library_title not in sync_libraries: continue - identifiers = { + identifiers: dict[str, str] = { "library_id": library_id, "library_title": library_title, } @@ -444,8 +460,8 @@ def get_watched(self, users, sync_libraries): if len(library["Items"]) == 0: continue - library_id = library["Identifiers"]["library_id"] - library_title = library["Identifiers"]["library_title"] + library_id: str = library["Identifiers"]["library_id"] + library_title: str = library["Identifiers"]["library_title"] # Get all library types excluding "Folder" types = set( @@ -725,7 +741,7 @@ def update_watched( self, watched_list, user_mapping=None, library_mapping=None, dryrun=False ): try: - server_version = self.get_server_version() + server_version = self.info(version_only=True) update_partial = self.is_partial_update_supported(server_version) if not update_partial: diff --git a/src/library.py b/src/library.py index 1f26d56..a4f9464 100644 --- a/src/library.py +++ b/src/library.py @@ -3,17 +3,18 @@ match_list, search_mapping, ) +from src.custom_types import UsersWatched def check_skip_logic( - library_title, - library_type, - blacklist_library, - whitelist_library, - blacklist_library_type, - whitelist_library_type, - library_mapping=None, -): + library_title: str, + library_type: str, + blacklist_library: list[str], + whitelist_library: list[str], + blacklist_library_type: list[str], + whitelist_library_type: list[str], + library_mapping: dict[str, str] | None = None, +) -> str | None: skip_reason = None library_other = None if library_mapping: @@ -48,11 +49,11 @@ def check_skip_logic( def check_blacklist_logic( - library_title, - library_type, - blacklist_library, - blacklist_library_type, - library_other=None, + library_title: str, + library_type: str, + blacklist_library: list[str], + blacklist_library_type: list[str], + library_other: str | None = None, ): skip_reason = None if isinstance(library_type, (list, tuple, set)): @@ -84,11 +85,11 @@ def check_blacklist_logic( def check_whitelist_logic( - library_title, - library_type, - whitelist_library, - whitelist_library_type, - library_other=None, + library_title: str, + library_type: str, + whitelist_library: list[str], + whitelist_library_type: list[str], + library_other: str | None = None, ): skip_reason = None if len(whitelist_library_type) > 0: @@ -131,14 +132,14 @@ def check_whitelist_logic( def filter_libaries( - server_libraries, - blacklist_library, - blacklist_library_type, - whitelist_library, - whitelist_library_type, - library_mapping=None, -): - filtered_libaries = [] + server_libraries: dict[str, str], + blacklist_library: list[str], + blacklist_library_type: list[str], + whitelist_library: list[str], + whitelist_library_type: list[str], + library_mapping: dict[str, str] | None = None, +) -> list[str]: + filtered_libaries: list[str] = [] for library in server_libraries: skip_reason = check_skip_logic( library, @@ -162,12 +163,12 @@ def filter_libaries( def setup_libraries( server_1, server_2, - blacklist_library, - blacklist_library_type, - whitelist_library, - whitelist_library_type, - library_mapping=None, -): + blacklist_library: list[str], + blacklist_library_type: list[str], + whitelist_library: list[str], + whitelist_library_type: list[str], + library_mapping: dict[str, str] | None = None, +) -> tuple[list[str], list[str]]: server_1_libraries = server_1.get_libraries() server_2_libraries = server_2.get_libraries() logger(f"Server 1 libraries: {server_1_libraries}", 1) @@ -201,14 +202,16 @@ def setup_libraries( return output_server_1_libaries, output_server_2_libaries -def show_title_dict(user_list: dict): +def show_title_dict(user_list: UsersWatched) -> dict[str, list[tuple[str] | None]]: try: - show_output_dict = {} + if not isinstance(user_list, dict): + return {} + + show_output_dict: dict[str, list[tuple[str] | None]] = {} show_output_dict["locations"] = [] show_counter = 0 # Initialize a counter for the current show position - show_output_keys = user_list.keys() - show_output_keys = [dict(x) for x in list(show_output_keys)] + show_output_keys = [dict(x) for x in list(user_list.keys())] for show_key in show_output_keys: for provider_key, provider_value in show_key.items(): # Skip title @@ -233,9 +236,19 @@ def show_title_dict(user_list: dict): return {} -def episode_title_dict(user_list: dict): +def episode_title_dict( + user_list: UsersWatched, +) -> dict[ + str, list[str | bool | int | tuple[str] | dict[str, str | tuple[str]] | None] +]: try: - episode_output_dict = {} + if not isinstance(user_list, dict): + return {} + + episode_output_dict: dict[ + str, + list[str | bool | int | tuple[str] | dict[str, str | tuple[str]] | None], + ] = {} episode_output_dict["completed"] = [] episode_output_dict["time"] = [] episode_output_dict["locations"] = [] @@ -293,12 +306,18 @@ def episode_title_dict(user_list: dict): return {} -def movies_title_dict(user_list: dict): +def movies_title_dict( + user_list: UsersWatched, +) -> dict[str, list[str | bool | int | tuple[str] | None]]: try: - movies_output_dict = {} - movies_output_dict["completed"] = [] - movies_output_dict["time"] = [] - movies_output_dict["locations"] = [] + if not isinstance(user_list, list): + return {} + + movies_output_dict: dict[str, list[str | bool | int | tuple[str] | None]] = { + "completed": [], + "time": [], + "locations": [], + } movie_counter = 0 # Initialize a counter for the current movie position for movie in user_list: @@ -325,7 +344,13 @@ def movies_title_dict(user_list: dict): return {} -def generate_library_guids_dict(user_list: dict): +def generate_library_guids_dict( + user_list: UsersWatched, +) -> tuple[ + dict[str, list[tuple[str] | None]], + dict[str, list[str | bool | int | tuple[str] | dict[str, str | tuple[str]] | None]], + dict[str, list[str | bool | int | tuple[str] | None]], +]: # Handle the case where user_list is empty or does not contain the expected keys and values if not user_list: return {}, {}, {} diff --git a/src/main.py b/src/main.py index 07b56a1..91765b1 100644 --- a/src/main.py +++ b/src/main.py @@ -2,9 +2,13 @@ from dotenv import load_dotenv from time import sleep, perf_counter +from src.emby import Emby +from src.jellyfin import Jellyfin +from src.plex import Plex from src.library import setup_libraries from src.functions import ( logger, + parse_string_to_list, str_to_bool, ) from src.users import setup_users @@ -17,7 +21,10 @@ load_dotenv(override=True) -def should_sync_server(server_1_type, server_2_type): +def should_sync_server( + server_1: Plex | Jellyfin | Emby, + server_2: Plex | Jellyfin | Emby, +) -> bool: sync_from_plex_to_jellyfin = str_to_bool( os.getenv("SYNC_FROM_PLEX_TO_JELLYFIN", "True") ) @@ -40,42 +47,42 @@ def should_sync_server(server_1_type, server_2_type): ) sync_from_emby_to_emby = str_to_bool(os.getenv("SYNC_FROM_EMBY_TO_EMBY", "True")) - if server_1_type == "plex": - if server_2_type == "jellyfin" and not sync_from_plex_to_jellyfin: + if isinstance(server_1, Plex): + if isinstance(server_2, Jellyfin) and not sync_from_plex_to_jellyfin: logger("Sync from plex -> jellyfin is disabled", 1) return False - if server_2_type == "emby" and not sync_from_plex_to_emby: + if isinstance(server_2, Emby) and not sync_from_plex_to_emby: logger("Sync from plex -> emby is disabled", 1) return False - if server_2_type == "plex" and not sync_from_plex_to_plex: + if isinstance(server_2, Plex) and not sync_from_plex_to_plex: logger("Sync from plex -> plex is disabled", 1) return False - if server_1_type == "jellyfin": - if server_2_type == "plex" and not sync_from_jelly_to_plex: + if isinstance(server_1, Jellyfin): + if isinstance(server_2, Plex) and not sync_from_jelly_to_plex: logger("Sync from jellyfin -> plex is disabled", 1) return False - if server_2_type == "jellyfin" and not sync_from_jelly_to_jellyfin: + if isinstance(server_2, Jellyfin) and not sync_from_jelly_to_jellyfin: logger("Sync from jellyfin -> jellyfin is disabled", 1) return False - if server_2_type == "emby" and not sync_from_jelly_to_emby: + if isinstance(server_2, Emby) and not sync_from_jelly_to_emby: logger("Sync from jellyfin -> emby is disabled", 1) return False - if server_1_type == "emby": - if server_2_type == "plex" and not sync_from_emby_to_plex: + if isinstance(server_1, Emby): + if isinstance(server_2, Plex) and not sync_from_emby_to_plex: logger("Sync from emby -> plex is disabled", 1) return False - if server_2_type == "jellyfin" and not sync_from_emby_to_jellyfin: + if isinstance(server_2, Jellyfin) and not sync_from_emby_to_jellyfin: logger("Sync from emby -> jellyfin is disabled", 1) return False - if server_2_type == "emby" and not sync_from_emby_to_emby: + if isinstance(server_2, Emby) and not sync_from_emby_to_emby: logger("Sync from emby -> emby is disabled", 1) return False @@ -91,24 +98,26 @@ def main_loop(): dryrun = str_to_bool(os.getenv("DRYRUN", "False")) logger(f"Dryrun: {dryrun}", 1) - user_mapping = os.getenv("USER_MAPPING") - if user_mapping: - user_mapping = json.loads(user_mapping.lower()) - logger(f"User Mapping: {user_mapping}", 1) + user_mapping = os.getenv("USER_MAPPING", "") + user_mapping = json.loads(user_mapping.lower()) + logger(f"User Mapping: {user_mapping}", 1) - library_mapping = os.getenv("LIBRARY_MAPPING") - if library_mapping: - library_mapping = json.loads(library_mapping) - logger(f"Library Mapping: {library_mapping}", 1) + library_mapping = os.getenv("LIBRARY_MAPPING", "") + library_mapping = json.loads(library_mapping) + logger(f"Library Mapping: {library_mapping}", 1) # Create (black/white)lists logger("Creating (black/white)lists", 1) - blacklist_library = os.getenv("BLACKLIST_LIBRARY", None) - whitelist_library = os.getenv("WHITELIST_LIBRARY", None) - blacklist_library_type = os.getenv("BLACKLIST_LIBRARY_TYPE", None) - whitelist_library_type = os.getenv("WHITELIST_LIBRARY_TYPE", None) - blacklist_users = os.getenv("BLACKLIST_USERS", None) - whitelist_users = os.getenv("WHITELIST_USERS", None) + blacklist_library = parse_string_to_list(os.getenv("BLACKLIST_LIBRARY", None)) + whitelist_library = parse_string_to_list(os.getenv("WHITELIST_LIBRARY", None)) + blacklist_library_type = parse_string_to_list( + os.getenv("BLACKLIST_LIBRARY_TYPE", None) + ) + whitelist_library_type = parse_string_to_list( + os.getenv("WHITELIST_LIBRARY_TYPE", None) + ) + blacklist_users = parse_string_to_list(os.getenv("BLACKLIST_USERS", None)) + whitelist_users = parse_string_to_list(os.getenv("WHITELIST_USERS", None)) ( blacklist_library, @@ -140,13 +149,13 @@ def main_loop(): # Start server_2 at the next server in the list for server_2 in servers[servers.index(server_1) + 1 :]: # Check if server 1 and server 2 are going to be synced in either direction, skip if not - if not should_sync_server( - server_1[0], server_2[0] - ) and not should_sync_server(server_2[0], server_1[0]): + if not should_sync_server(server_1, server_2) and not should_sync_server( + server_2, server_1 + ): continue - logger(f"Server 1: {server_1[0].capitalize()}: {server_1[1].info()}", 0) - logger(f"Server 2: {server_2[0].capitalize()}: {server_2[1].info()}", 0) + logger(f"Server 1: {type(server_1)}: {server_1.info()}", 0) + logger(f"Server 2: {type(server_2)}: {server_2.info()}", 0) # Create users list logger("Creating users list", 1) @@ -155,8 +164,8 @@ def main_loop(): ) server_1_libraries, server_2_libraries = setup_libraries( - server_1[1], - server_2[1], + server_1, + server_2, blacklist_library, blacklist_library_type, whitelist_library, @@ -165,14 +174,10 @@ def main_loop(): ) logger("Creating watched lists", 1) - server_1_watched = server_1[1].get_watched( - server_1_users, server_1_libraries - ) + server_1_watched = server_1.get_watched(server_1_users, server_1_libraries) logger("Finished creating watched list server 1", 1) - server_2_watched = server_2[1].get_watched( - server_2_users, server_2_libraries - ) + server_2_watched = server_2.get_watched(server_2_users, server_2_libraries) logger("Finished creating watched list server 2", 1) logger(f"Server 1 watched: {server_1_watched}", 3) @@ -197,18 +202,18 @@ def main_loop(): 1, ) - if should_sync_server(server_2[0], server_1[0]): - logger(f"Syncing {server_2[1].info()} -> {server_1[1].info()}", 0) - server_1[1].update_watched( + if should_sync_server(server_2, server_1): + logger(f"Syncing {server_2.info()} -> {server_1.info()}", 0) + server_1.update_watched( server_2_watched_filtered, user_mapping, library_mapping, dryrun, ) - if should_sync_server(server_1[0], server_2[0]): - logger(f"Syncing {server_1[1].info()} -> {server_2[1].info()}", 0) - server_2[1].update_watched( + if should_sync_server(server_1, server_2): + logger(f"Syncing {server_1.info()} -> {server_2.info()}", 0) + server_2.update_watched( server_1_watched_filtered, user_mapping, library_mapping, @@ -219,7 +224,7 @@ def main_loop(): def main(): run_only_once = str_to_bool(os.getenv("RUN_ONLY_ONCE", "False")) sleep_duration = float(os.getenv("SLEEP_DURATION", "3600")) - times = [] + times: list[float] = [] while True: try: start = perf_counter() diff --git a/src/plex.py b/src/plex.py index 77b06da..82039c5 100644 --- a/src/plex.py +++ b/src/plex.py @@ -205,7 +205,7 @@ def get_user_library_watched(user, user_plex, library): def find_video(plex_search, video_ids, videos=None): try: if not generate_guids and not generate_locations: - return False, [] + return None if generate_locations: for location in plex_search.locations: @@ -226,7 +226,7 @@ def find_video(plex_search, video_ids, videos=None): for episode in episodes: episode_videos.append(episode) - return True, episode_videos + return episode_videos if generate_guids: for guid in plex_search.guids: @@ -244,11 +244,11 @@ def find_video(plex_search, video_ids, videos=None): for episode in episodes: episode_videos.append(episode) - return True, episode_videos + return episode_videos - return False, [] + return None except Exception: - return False, [] + return None def get_video_status(plex_search, video_ids, videos): @@ -286,31 +286,44 @@ def get_video_status(plex_search, video_ids, videos): return None -def update_user_watched(user, user_plex, library, videos, dryrun): +def update_user_watched(user, user_plex, library, watched_videos, dryrun): try: logger(f"Plex: Updating watched for {user.title} in library {library}", 1) ( - videos_shows_ids, - videos_episodes_ids, - videos_movies_ids, - ) = generate_library_guids_dict(videos) + watched_shows_ids, + watched_episodes_ids, + watched_movies_ids, + ) = generate_library_guids_dict(watched_videos) + + if ( + not watched_movies_ids + and not watched_shows_ids + and not watched_episodes_ids + ): + logger( + f"Jellyfin: No videos to mark as watched for {user.title} in library {library}", + 1, + ) + + return + logger( - f"Plex: mark list\nShows: {videos_shows_ids}\nEpisodes: {videos_episodes_ids}\nMovies: {videos_movies_ids}", + f"Plex: mark list\nShows: {watched_shows_ids}\nEpisodes: {watched_episodes_ids}\nMovies: {watched_movies_ids}", 1, ) library_videos = user_plex.library.section(library) - if videos_movies_ids: - for movies_search in library_videos.search(unwatched=True): - video_status = get_video_status( - movies_search, videos_movies_ids, videos + if watched_movies_ids: + for plex_movie in library_videos.search(unwatched=True): + watched_movie_status = get_video_status( + plex_movie, watched_movies_ids, watched_videos ) - if video_status: - if video_status["completed"]: - msg = f"Plex: {movies_search.title} as watched for {user.title} in {library}" + if watched_movie_status: + if watched_movie_status["completed"]: + msg = f"Plex: {plex_movie.title} as watched for {user.title} in {library}" if not dryrun: logger(msg, 5) - movies_search.markWatched() + plex_movie.markWatched() else: logger(msg, 6) @@ -319,15 +332,15 @@ def update_user_watched(user, user_plex, library, videos, dryrun): user_plex.friendlyName, user.title, library, - movies_search.title, + plex_movie.title, None, None, ) - elif video_status["time"] > 60_000: - msg = f"Plex: {movies_search.title} as partially watched for {floor(video_status['time'] / 60_000)} minutes for {user.title} in {library}" + elif watched_movie_status["time"] > 60_000: + msg = f"Plex: {plex_movie.title} as partially watched for {floor(watched_movie_status['time'] / 60_000)} minutes for {user.title} in {library}" if not dryrun: logger(msg, 5) - movies_search.updateTimeline(video_status["time"]) + plex_movie.updateTimeline(watched_movie_status["time"]) else: logger(msg, 6) @@ -336,31 +349,33 @@ def update_user_watched(user, user_plex, library, videos, dryrun): user_plex.friendlyName, user.title, library, - movies_search.title, - duration=video_status["time"], + plex_movie.title, + duration=watched_movie_status["time"], ) else: logger( - f"Plex: Skipping movie {movies_search.title} as it is not in mark list for {user.title}", + f"Plex: Skipping movie {plex_movie.title} as it is not in mark list for {user.title}", 1, ) - if videos_shows_ids and videos_episodes_ids: - for show_search in library_videos.search(unwatched=True): - show_found, episode_videos = find_video( - show_search, videos_shows_ids, videos + if watched_shows_ids and watched_episodes_ids: + for plex_show in library_videos.search(unwatched=True): + watched_show_episodes_status = find_video( + plex_show, watched_shows_ids, watched_videos ) - if show_found: - for episode_search in show_search.episodes(): - video_status = get_video_status( - episode_search, videos_episodes_ids, episode_videos + if watched_show_episodes_status: + for plex_episode in plex_show.episodes(): + watched_episode_status = get_video_status( + plex_episode, + watched_episodes_ids, + watched_show_episodes_status, ) - if video_status: - if video_status["completed"]: - msg = f"Plex: {show_search.title} {episode_search.title} as watched for {user.title} in {library}" + if watched_episode_status: + if watched_episode_status["completed"]: + msg = f"Plex: {plex_show.title} {plex_episode.title} as watched for {user.title} in {library}" if not dryrun: logger(msg, 5) - episode_search.markWatched() + plex_episode.markWatched() else: logger(msg, 6) @@ -369,14 +384,16 @@ def update_user_watched(user, user_plex, library, videos, dryrun): user_plex.friendlyName, user.title, library, - show_search.title, - episode_search.title, + plex_show.title, + plex_episode.title, ) else: - msg = f"Plex: {show_search.title} {episode_search.title} as partially watched for {floor(video_status['time'] / 60_000)} minutes for {user.title} in {library}" + msg = f"Plex: {plex_show.title} {plex_episode.title} as partially watched for {floor(watched_episode_status['time'] / 60_000)} minutes for {user.title} in {library}" if not dryrun: logger(msg, 5) - episode_search.updateTimeline(video_status["time"]) + plex_episode.updateTimeline( + watched_episode_status["time"] + ) else: logger(msg, 6) @@ -385,27 +402,21 @@ def update_user_watched(user, user_plex, library, videos, dryrun): user_plex.friendlyName, user.title, library, - show_search.title, - episode_search.title, - video_status["time"], + plex_show.title, + plex_episode.title, + watched_episode_status["time"], ) else: logger( - f"Plex: Skipping episode {episode_search.title} as it is not in mark list for {user.title}", + f"Plex: Skipping episode {plex_episode.title} as it is not in mark list for {user.title}", 3, ) else: logger( - f"Plex: Skipping show {show_search.title} as it is not in mark list for {user.title}", + f"Plex: Skipping show {plex_show.title} as it is not in mark list for {user.title}", 3, ) - if not videos_movies_ids and not videos_shows_ids and not videos_episodes_ids: - logger( - f"Jellyfin: No videos to mark as watched for {user.title} in library {library}", - 1, - ) - except Exception as e: logger( f"Plex: Failed to update watched for {user.title} in library {library}, Error: {e}", @@ -477,7 +488,7 @@ def get_users(self): logger(f"Plex: Failed to get users, Error: {e}", 2) raise Exception(e) - def get_libraries(self): + def get_libraries(self) -> dict[str, str]: try: output = {} @@ -545,10 +556,7 @@ def update_watched( user_other = None # If type of user is dict if user_mapping: - if user in user_mapping.keys(): - user_other = user_mapping[user] - elif user in user_mapping.values(): - user_other = search_mapping(user_mapping, user) + user_other = search_mapping(user_mapping, user) for index, value in enumerate(self.users): username_title = ( @@ -588,13 +596,10 @@ def update_watched( ) continue - for library, videos in libraries.items(): + for library, watched_videos in libraries.items(): library_other = None if library_mapping: - if library in library_mapping.keys(): - library_other = library_mapping[library] - elif library in library_mapping.values(): - library_other = search_mapping(library_mapping, library) + library_other = search_mapping(library_mapping, library) # if library in plex library list library_list = user_plex.library.sections() @@ -627,7 +632,7 @@ def update_watched( user, user_plex, library, - videos, + watched_videos, dryrun, ] ) diff --git a/src/users.py b/src/users.py index 3c3e68e..0a3da71 100644 --- a/src/users.py +++ b/src/users.py @@ -1,30 +1,37 @@ +from typing import Literal +from plexapi.myplex import MyPlexAccount +from src.emby import Emby +from src.jellyfin import Jellyfin +from src.plex import Plex from src.functions import ( logger, search_mapping, ) -def generate_user_list(server): +def generate_user_list(server: Plex | Jellyfin | Emby) -> list[str]: # generate list of users from server 1 and server 2 - server_type = server[0] - server_connection = server[1] - server_users = [] - if server_type == "plex": - for user in server_connection.users: + server_users: list[str] = [] + if isinstance(server, Plex): + for user in server.users: server_users.append( user.username.lower() if user.username else user.title.lower() ) - elif server_type in ["jellyfin", "emby"]: - server_users = [key.lower() for key in server_connection.users.keys()] + elif isinstance(server, (Jellyfin, Emby)): + server_users = [key.lower() for key in server.users.keys()] return server_users -def combine_user_lists(server_1_users, server_2_users, user_mapping): +def combine_user_lists( + server_1_users: list[str], + server_2_users: list[str], + user_mapping: dict[str, str] | None, +) -> dict[str, str]: # combined list of overlapping users from plex and jellyfin - users = {} + users: dict[str, str] = {} for server_1_user in server_1_users: if user_mapping: @@ -49,8 +56,10 @@ def combine_user_lists(server_1_users, server_2_users, user_mapping): return users -def filter_user_lists(users, blacklist_users, whitelist_users): - users_filtered = {} +def filter_user_lists( + users: dict[str, str], blacklist_users: list[str], whitelist_users: list[str] +) -> dict[str, str]: + users_filtered: dict[str, str] = {} for user in users: # whitelist_user is not empty and user lowercase is not in whitelist lowercase if len(whitelist_users) > 0: @@ -64,12 +73,13 @@ def filter_user_lists(users, blacklist_users, whitelist_users): return users_filtered -def generate_server_users(server, users): - server_users = None - - if server[0] == "plex": - server_users = [] - for plex_user in server[1].users: +def generate_server_users( + server: Plex | Jellyfin | Emby, + users: dict[str, str], +) -> list[MyPlexAccount] | dict[str, str] | None: + if isinstance(server, Plex): + plex_server_users: list[MyPlexAccount] = [] + for plex_user in server.users: username_title = ( plex_user.username if plex_user.username else plex_user.title ) @@ -78,22 +88,30 @@ def generate_server_users(server, users): username_title.lower() in users.keys() or username_title.lower() in users.values() ): - server_users.append(plex_user) - elif server[0] in ["jellyfin", "emby"]: - server_users = {} - for jellyfin_user, jellyfin_id in server[1].users.items(): + plex_server_users.append(plex_user) + + return plex_server_users + elif isinstance(server, (Jellyfin, Emby)): + jelly_emby_server_users: dict[str, str] = {} + for jellyfin_user, jellyfin_id in server.users.items(): if ( jellyfin_user.lower() in users.keys() or jellyfin_user.lower() in users.values() ): - server_users[jellyfin_user] = jellyfin_id + jelly_emby_server_users[jellyfin_user] = jellyfin_id - return server_users + return jelly_emby_server_users + + return None def setup_users( - server_1, server_2, blacklist_users, whitelist_users, user_mapping=None -): + server_1: Plex | Jellyfin | Emby, + server_2: Plex | Jellyfin | Emby, + blacklist_users: list[str], + whitelist_users: list[str], + user_mapping: dict[str, str] | None = None, +) -> tuple[list[MyPlexAccount] | dict[str, str], list[MyPlexAccount] | dict[str, str]]: server_1_users = generate_user_list(server_1) server_2_users = generate_user_list(server_2) logger(f"Server 1 users: {server_1_users}", 1) @@ -111,12 +129,12 @@ def setup_users( # Check if users is none or empty if output_server_1_users is None or len(output_server_1_users) == 0: logger( - f"No users found for server 1 {server_1[0]}, users: {server_1_users}, overlapping users {users}, filtered users {users_filtered}, server 1 users {server_1[1].users}" + f"No users found for server 1 {type(server_1)}, users: {server_1_users}, overlapping users {users}, filtered users {users_filtered}, server 1 users {server_1.users}" ) if output_server_2_users is None or len(output_server_2_users) == 0: logger( - f"No users found for server 2 {server_2[0]}, users: {server_2_users}, overlapping users {users} filtered users {users_filtered}, server 2 users {server_2[1].users}" + f"No users found for server 2 {type(server_2)}, users: {server_2_users}, overlapping users {users} filtered users {users_filtered}, server 2 users {server_2.users}" ) if ( diff --git a/test/test_black_white.py b/test/test_black_white.py index be50007..b72d5b8 100644 --- a/test/test_black_white.py +++ b/test/test_black_white.py @@ -18,12 +18,12 @@ def test_setup_black_white_lists(): # Simple - blacklist_library = "library1, library2" - whitelist_library = "library1, library2" - blacklist_library_type = "library_type1, library_type2" - whitelist_library_type = "library_type1, library_type2" - blacklist_users = "user1, user2" - whitelist_users = "user1, user2" + blacklist_library = ["library1", "library2"] + whitelist_library = ["library1", "library2"] + blacklist_library_type = ["library_type1", "library_type2"] + whitelist_library_type = ["library_type1", "library_type2"] + blacklist_users = ["user1", "user2"] + whitelist_users = ["user1", "user2"] ( results_blacklist_library, @@ -48,6 +48,15 @@ def test_setup_black_white_lists(): assert return_blacklist_users == ["user1", "user2"] assert return_whitelist_users == ["user1", "user2"] + +def test_library_mapping_black_white_list(): + blacklist_library = ["library1", "library2"] + whitelist_library = ["library1", "library2"] + blacklist_library_type = ["library_type1", "library_type2"] + whitelist_library_type = ["library_type1", "library_type2"] + blacklist_users = ["user1", "user2"] + whitelist_users = ["user1", "user2"] + # Library Mapping and user mapping library_mapping = {"library1": "library3"} user_mapping = {"user1": "user3"} diff --git a/test/test_main.py b/test/test_main.py deleted file mode 100644 index 611ba12..0000000 --- a/test/test_main.py +++ /dev/null @@ -1,78 +0,0 @@ -import sys -import os - -# getting the name of the directory -# where the this file is present. -current = os.path.dirname(os.path.realpath(__file__)) - -# Getting the parent directory name -# where the current directory is present. -parent = os.path.dirname(current) - -# adding the parent directory to -# the sys.path. -sys.path.append(parent) - -from src.black_white import setup_black_white_lists - - -def test_setup_black_white_lists(): - # Simple - blacklist_library = "library1, library2" - whitelist_library = "library1, library2" - blacklist_library_type = "library_type1, library_type2" - whitelist_library_type = "library_type1, library_type2" - blacklist_users = "user1, user2" - whitelist_users = "user1, user2" - - ( - results_blacklist_library, - return_whitelist_library, - return_blacklist_library_type, - return_whitelist_library_type, - return_blacklist_users, - return_whitelist_users, - ) = setup_black_white_lists( - blacklist_library, - whitelist_library, - blacklist_library_type, - whitelist_library_type, - blacklist_users, - whitelist_users, - ) - - assert results_blacklist_library == ["library1", "library2"] - assert return_whitelist_library == ["library1", "library2"] - assert return_blacklist_library_type == ["library_type1", "library_type2"] - assert return_whitelist_library_type == ["library_type1", "library_type2"] - assert return_blacklist_users == ["user1", "user2"] - assert return_whitelist_users == ["user1", "user2"] - - # Library Mapping and user mapping - library_mapping = {"library1": "library3"} - user_mapping = {"user1": "user3"} - - ( - results_blacklist_library, - return_whitelist_library, - return_blacklist_library_type, - return_whitelist_library_type, - return_blacklist_users, - return_whitelist_users, - ) = setup_black_white_lists( - blacklist_library, - whitelist_library, - blacklist_library_type, - whitelist_library_type, - blacklist_users, - whitelist_users, - library_mapping, - user_mapping, - ) - - assert results_blacklist_library == ["library1", "library2", "library3"] - assert return_whitelist_library == ["library1", "library2", "library3"] - assert return_blacklist_library_type == ["library_type1", "library_type2"] - assert return_whitelist_library_type == ["library_type1", "library_type2"] - assert return_blacklist_users == ["user1", "user2", "user3"] - assert return_whitelist_users == ["user1", "user2", "user3"] diff --git a/test/validate_ci_marklog.py b/test/validate_ci_marklog.py index 830c172..263bde3 100644 --- a/test/validate_ci_marklog.py +++ b/test/validate_ci_marklog.py @@ -128,8 +128,7 @@ def main(): expected_locations = expected_emby + expected_plex + expected_jellyfin # Remove Custom Movies/TV Shows as they should not have guids - expected_guids = [item for item in expected_locations if "Custom" not in item ] - + expected_guids = [item for item in expected_locations if "Custom" not in item] expected_write = [ "Plex/JellyPlex-CI/jellyplex_watched/Custom Movies/Movie Two (2021)", @@ -171,7 +170,7 @@ def main(): "Jellyfin/Jellyfin-Server/JellyUser/Custom Movies/Movie Three (2022)", "Jellyfin/Jellyfin-Server/JellyUser/Custom TV Shows/Greatest Show Ever (3000)/S01E03", "Jellyfin/Jellyfin-Server/JellyUser/Movies/Tears of Steel", - "Jellyfin/Jellyfin-Server/JellyUser/Shows/Monarch: Legacy of Monsters/Parallels and Interiors/4" + "Jellyfin/Jellyfin-Server/JellyUser/Shows/Monarch: Legacy of Monsters/Parallels and Interiors/4", ] # Expected values for the mark.log file, dry-run is slightly different than write-run