diff --git a/lib/galaxy/datatypes/dataproviders/hierarchy.py b/lib/galaxy/datatypes/dataproviders/hierarchy.py index e0773975548f..6ee9d5b992c4 100644 --- a/lib/galaxy/datatypes/dataproviders/hierarchy.py +++ b/lib/galaxy/datatypes/dataproviders/hierarchy.py @@ -3,7 +3,10 @@ """ import logging -from galaxy.util import etree +from galaxy.util import ( + Element, + etree, +) from . import line _TODO = """ @@ -65,7 +68,6 @@ def matches_selector(self, element, selector=None): # TODO: add more flexibility here w/o re-implementing xpath # TODO: fails with '#' - browser thinks it's an anchor - use urlencode # TODO: need removal/replacement of etree namespacing here - then move to string match - Element = getattr(etree, "_Element", etree.Element) return bool((selector is None) or (isinstance(element, Element) and selector in element.tag)) def element_as_dict(self, element): diff --git a/lib/galaxy/dependencies/__init__.py b/lib/galaxy/dependencies/__init__.py index 0cfa3f7148ea..17aa22c8d60d 100644 --- a/lib/galaxy/dependencies/__init__.py +++ b/lib/galaxy/dependencies/__init__.py @@ -67,18 +67,15 @@ def load_job_config_dict(job_conf_dict): job_conf_path = join(dirname(self.config_file), job_conf_path) if ".xml" in job_conf_path: try: - try: - for plugin in parse_xml(job_conf_path).find("plugins").findall("plugin"): + job_conf_tree = parse_xml(job_conf_path) + plugins_elem = job_conf_tree.find("plugins") + if plugins_elem: + for plugin in plugins_elem.findall("plugin"): if "load" in plugin.attrib: self.job_runners.append(plugin.attrib["load"]) - except OSError: - pass - try: - for plugin in parse_xml(job_conf_path).findall('.//destination/param[@id="rules_module"]'): - self.job_rule_modules.append(plugin.text) - except OSError: - pass - except etree.ParseError: + for plugin in job_conf_tree.findall('.//destination/param[@id="rules_module"]'): + self.job_rule_modules.append(plugin.text) + except (OSError, etree.ParseError): pass else: try: diff --git a/lib/galaxy/dependencies/pinned-typecheck-requirements.txt b/lib/galaxy/dependencies/pinned-typecheck-requirements.txt index 77f07fd86ab1..2bd9345bf304 100644 --- a/lib/galaxy/dependencies/pinned-typecheck-requirements.txt +++ b/lib/galaxy/dependencies/pinned-typecheck-requirements.txt @@ -1,6 +1,7 @@ annotated-types==0.6.0 ; python_version >= "3.8" and python_version < "3.13" cffi==1.16.0 ; python_version >= "3.8" and python_version < "3.13" cryptography==41.0.7 ; python_version >= "3.8" and python_version < "3.13" +lxml-stubs==0.5.1 ; python_version >= "3.8" and python_version < "3.13" mypy-extensions==1.0.0 ; python_version >= "3.8" and python_version < "3.13" mypy==1.8.0 ; python_version >= "3.8" and python_version < "3.13" pycparser==2.21 ; python_version >= "3.8" and python_version < "3.13" diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py index 48a22bae5d10..6ae85cb7e34f 100644 --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -79,7 +79,6 @@ ToolEvaluator, ) from galaxy.util import ( - etree, parse_xml_string, RWXRWXRWX, safe_makedirs, @@ -643,7 +642,7 @@ def get_tool_resource_xml(self, tool_id, tool_type): field_name, self.resource_parameters ) raise KeyError(message) - fields.append(etree.fromstring(self.resource_parameters[field_name])) + fields.append(parse_xml_string(self.resource_parameters[field_name])) if fields: conditional_element = parse_xml_string(self.JOB_RESOURCE_CONDITIONAL_XML) diff --git a/lib/galaxy/jobs/dynamic_tool_destination.py b/lib/galaxy/jobs/dynamic_tool_destination.py index 53664b33ece5..9e61e9adb88f 100755 --- a/lib/galaxy/jobs/dynamic_tool_destination.py +++ b/lib/galaxy/jobs/dynamic_tool_destination.py @@ -1647,14 +1647,14 @@ def get_destination_list_from_job_config(job_config_location) -> set: # Add all destination IDs from the job configuration xml file for destination in job_conf.getroot().iter("destination"): - if isinstance(destination.get("id"), str): - destination_list.add(destination.get("id")) - + destination_id = destination.get("id") + if destination_id: + destination_list.add(destination_id) else: error = f"Destination ID '{str(destination)}" error += "' in job configuration file cannot be" error += " parsed. Things may not work as expected!" - log.debug(error) + log.warning(error) return destination_list diff --git a/lib/galaxy/jobs/runners/util/cli/job/__init__.py b/lib/galaxy/jobs/runners/util/cli/job/__init__.py index 000c1f38942b..2556df5d2aec 100644 --- a/lib/galaxy/jobs/runners/util/cli/job/__init__.py +++ b/lib/galaxy/jobs/runners/util/cli/job/__init__.py @@ -6,6 +6,10 @@ abstractmethod, ) from enum import Enum +from typing import ( + Dict, + List, +) try: from galaxy.model import Job @@ -60,7 +64,7 @@ def get_single_status(self, job_id): """ @abstractmethod - def parse_status(self, status, job_ids): + def parse_status(self, status: str, job_ids: List[str]) -> Dict[str, str]: """ Parse the statuses of output from get_status command. """ diff --git a/lib/galaxy/jobs/runners/util/cli/job/lsf.py b/lib/galaxy/jobs/runners/util/cli/job/lsf.py index cdaedf447a42..2a5903e4b486 100644 --- a/lib/galaxy/jobs/runners/util/cli/job/lsf.py +++ b/lib/galaxy/jobs/runners/util/cli/job/lsf.py @@ -99,7 +99,7 @@ def parse_failure_reason(self, reason, job_id): return runner_states.WALLTIME_REACHED return None - def _get_job_state(self, state): + def _get_job_state(self, state: str) -> str: # based on: # https://www.ibm.com/support/knowledgecenter/en/SSETD4_9.1.3/lsf_admin/job_state_lsf.html # https://www.ibm.com/support/knowledgecenter/en/SSETD4_9.1.2/lsf_command_ref/bjobs.1.html @@ -115,7 +115,7 @@ def _get_job_state(self, state): "UNKWN": job_states.ERROR, "WAIT": job_states.QUEUED, "ZOMBI": job_states.ERROR, - }.get(state) + }[state] except KeyError: raise KeyError(f"Failed to map LSF status code [{state}] to job state.") diff --git a/lib/galaxy/jobs/runners/util/cli/job/slurm.py b/lib/galaxy/jobs/runners/util/cli/job/slurm.py index 5c1626d7315f..66eda28fb28c 100644 --- a/lib/galaxy/jobs/runners/util/cli/job/slurm.py +++ b/lib/galaxy/jobs/runners/util/cli/job/slurm.py @@ -64,7 +64,7 @@ def parse_single_status(self, status, job_id): # else line like "slurm_load_jobs error: Invalid job id specified" return job_states.OK - def _get_job_state(self, state): + def _get_job_state(self, state: str) -> str: try: return { "F": job_states.ERROR, @@ -72,7 +72,7 @@ def _get_job_state(self, state): "CG": job_states.RUNNING, "PD": job_states.QUEUED, "CD": job_states.OK, - }.get(state) + }[state] except KeyError: raise KeyError(f"Failed to map slurm status code [{state}] to job state.") diff --git a/lib/galaxy/jobs/runners/util/cli/job/torque.py b/lib/galaxy/jobs/runners/util/cli/job/torque.py index 56754b142b58..1e7bad382789 100644 --- a/lib/galaxy/jobs/runners/util/cli/job/torque.py +++ b/lib/galaxy/jobs/runners/util/cli/job/torque.py @@ -77,14 +77,19 @@ def parse_status(self, status, job_ids): tree = None if tree is None: log.warning(f"No valid qstat XML return from `qstat -x`, got the following: {status}") - return None + return {} else: for job in tree.findall("Job"): - id = job.find("Job_Id").text - if id in job_ids: - state = job.find("job_state").text + job_id_elem = job.find("Job_Id") + assert job_id_elem is not None + id_ = job_id_elem.text + if id_ in job_ids: + job_state_elem = job.find("job_state") + assert job_state_elem is not None + state = job_state_elem.text + assert state # map PBS job states to Galaxy job states. - rval[id] = self._get_job_state(state) + rval[id_] = self._get_job_state(state) return rval def parse_single_status(self, status, job_id): @@ -95,11 +100,9 @@ def parse_single_status(self, status, job_id): # no state found, job has exited return job_states.OK - def _get_job_state(self, state): + def _get_job_state(self, state: str) -> str: try: - return {"E": job_states.RUNNING, "R": job_states.RUNNING, "Q": job_states.QUEUED, "C": job_states.OK}.get( - state - ) + return {"E": job_states.RUNNING, "R": job_states.RUNNING, "Q": job_states.QUEUED, "C": job_states.OK}[state] except KeyError: raise KeyError(f"Failed to map torque status code [{state}] to job state.") diff --git a/lib/galaxy/tool_shed/galaxy_install/tools/data_manager.py b/lib/galaxy/tool_shed/galaxy_install/tools/data_manager.py index f2cbcdc072af..95af11b21f53 100644 --- a/lib/galaxy/tool_shed/galaxy_install/tools/data_manager.py +++ b/lib/galaxy/tool_shed/galaxy_install/tools/data_manager.py @@ -111,8 +111,8 @@ def install_data_managers( data_manager_id = elem.get("id", None) if data_manager_id is None: log.error( - "A data manager was defined that does not have an id and will not be installed:\n%s" - % xml_to_string(elem) + "A data manager was defined that does not have an id and will not be installed:\n%s", + xml_to_string(elem), ) continue data_manager_dict = ( @@ -170,7 +170,7 @@ def install_data_managers( ) if data_manager: rval.append(data_manager) - elif elem.tag is etree.Comment: + elif elem.tag is etree.Comment: # type: ignore[comparison-overlap] pass else: log.warning(f"Encountered unexpected element '{elem.tag}':\n{xml_to_string(elem)}") diff --git a/lib/galaxy/tool_shed/galaxy_install/tools/tool_panel_manager.py b/lib/galaxy/tool_shed/galaxy_install/tools/tool_panel_manager.py index e4d1dc3c19bd..1539590b3ee9 100644 --- a/lib/galaxy/tool_shed/galaxy_install/tools/tool_panel_manager.py +++ b/lib/galaxy/tool_shed/galaxy_install/tools/tool_panel_manager.py @@ -13,8 +13,9 @@ from galaxy.tool_shed.util.repository_util import get_repository_owner from galaxy.tool_shed.util.shed_util_common import get_tool_panel_config_tool_path_install_dir from galaxy.util import ( - etree, + Element, parse_xml_string, + SubElement, xml_to_string, ) from galaxy.util.renamed_temporary_file import RenamedTemporaryFile @@ -154,27 +155,27 @@ def config_elems_to_xml_file(self, config_elems, config_filename, tool_path, too def generate_tool_elem( self, tool_shed, repository_name, changeset_revision, owner, tool_file_path, tool, tool_section - ): + ) -> Element: """Create and return an ElementTree tool Element.""" if tool_section is not None: - tool_elem = etree.SubElement(tool_section, "tool") + tool_elem = SubElement(tool_section, "tool") else: - tool_elem = etree.Element("tool") + tool_elem = Element("tool") tool_elem.attrib["file"] = tool_file_path if not tool.guid: raise ValueError("tool has no guid") tool_elem.attrib["guid"] = tool.guid - tool_shed_elem = etree.SubElement(tool_elem, "tool_shed") + tool_shed_elem = SubElement(tool_elem, "tool_shed") tool_shed_elem.text = tool_shed - repository_name_elem = etree.SubElement(tool_elem, "repository_name") + repository_name_elem = SubElement(tool_elem, "repository_name") repository_name_elem.text = repository_name - repository_owner_elem = etree.SubElement(tool_elem, "repository_owner") + repository_owner_elem = SubElement(tool_elem, "repository_owner") repository_owner_elem.text = owner - changeset_revision_elem = etree.SubElement(tool_elem, "installed_changeset_revision") + changeset_revision_elem = SubElement(tool_elem, "installed_changeset_revision") changeset_revision_elem.text = changeset_revision - id_elem = etree.SubElement(tool_elem, "id") + id_elem = SubElement(tool_elem, "id") id_elem.text = tool.id - version_elem = etree.SubElement(tool_elem, "version") + version_elem = SubElement(tool_elem, "version") version_elem.text = tool.version return tool_elem @@ -297,7 +298,7 @@ def generate_tool_panel_elem_list( owner="", ): """Generate a list of ElementTree Element objects for each section or tool.""" - elem_list: List[etree.Element] = [] + elem_list: List[Element] = [] tool_elem = None cleaned_repository_clone_url = remove_protocol_and_user_from_clone_url(repository_clone_url) if not owner: @@ -335,6 +336,7 @@ def generate_tool_panel_elem_list( tool_section if inside_section else None, ) if inside_section: + assert tool_section is not None if section_in_elem_list is not None: elem_list[section_in_elem_list] = tool_section else: @@ -366,17 +368,13 @@ def generate_tool_section_dicts(self, tool_config=None, tool_sections=None) -> L tool_section_dicts.append(dict(tool_config=tool_config, id="", version="", name="")) return tool_section_dicts - def generate_tool_section_element_from_dict(self, tool_section_dict): + def generate_tool_section_element_from_dict(self, tool_section_dict: Dict[str, str]) -> Element: # The value of tool_section_dict looks like the following. # { id: , version : , name : } - if tool_section_dict["id"]: - # Create a new tool section. - tool_section = etree.Element("section") - tool_section.attrib["id"] = tool_section_dict["id"] - tool_section.attrib["name"] = tool_section_dict["name"] - tool_section.attrib["version"] = tool_section_dict["version"] - else: - tool_section = None + tool_section = Element("section") + tool_section.attrib["id"] = tool_section_dict["id"] + tool_section.attrib["name"] = tool_section_dict["name"] + tool_section.attrib["version"] = tool_section_dict["version"] return tool_section def get_or_create_tool_section(self, toolbox, tool_panel_section_id, new_tool_panel_section_label=None): diff --git a/lib/galaxy/tool_shed/metadata/metadata_generator.py b/lib/galaxy/tool_shed/metadata/metadata_generator.py index 71d3f666f7ad..a93139c17afd 100644 --- a/lib/galaxy/tool_shed/metadata/metadata_generator.py +++ b/lib/galaxy/tool_shed/metadata/metadata_generator.py @@ -492,9 +492,9 @@ def generate_repository_dependency_metadata(self, repository_dependencies_config root = tree.getroot() xml_is_valid = root.tag == "repositories" if xml_is_valid: - invalid_repository_dependencies_dict = dict(description=root.get("description")) + invalid_repository_dependencies_dict: Dict[str, Any] = dict(description=root.get("description")) invalid_repository_dependency_tups = [] - valid_repository_dependencies_dict = dict(description=root.get("description")) + valid_repository_dependencies_dict: Dict[str, Any] = dict(description=root.get("description")) valid_repository_dependency_tups = [] for repository_elem in root.findall("repository"): repository_dependency_tup, repository_dependency_is_valid, err_msg = self.handle_repository_elem( diff --git a/lib/galaxy/tool_shed/tools/data_table_manager.py b/lib/galaxy/tool_shed/tools/data_table_manager.py index 4ca5dcfdc5d1..f5d0a5669a7d 100644 --- a/lib/galaxy/tool_shed/tools/data_table_manager.py +++ b/lib/galaxy/tool_shed/tools/data_table_manager.py @@ -9,7 +9,10 @@ from galaxy.tool_shed.galaxy_install.client import InstallationTarget from galaxy.tool_shed.util import hg_util -from galaxy.util import etree +from galaxy.util import ( + Element, + SubElement, +) from galaxy.util.tool_shed import xml_util if TYPE_CHECKING: @@ -29,24 +32,24 @@ def __init__(self, app: RequiredAppT): def generate_repository_info_elem( self, tool_shed: str, repository_name: str, changeset_revision: str, owner: str, parent_elem=None, **kwd - ) -> etree.Element: + ) -> Element: """Create and return an ElementTree repository info Element.""" if parent_elem is None: - elem = etree.Element("tool_shed_repository") + elem = Element("tool_shed_repository") else: - elem = etree.SubElement(parent_elem, "tool_shed_repository") - tool_shed_elem = etree.SubElement(elem, "tool_shed") + elem = SubElement(parent_elem, "tool_shed_repository") + tool_shed_elem = SubElement(elem, "tool_shed") tool_shed_elem.text = tool_shed - repository_name_elem = etree.SubElement(elem, "repository_name") + repository_name_elem = SubElement(elem, "repository_name") repository_name_elem.text = repository_name - repository_owner_elem = etree.SubElement(elem, "repository_owner") + repository_owner_elem = SubElement(elem, "repository_owner") repository_owner_elem.text = owner - changeset_revision_elem = etree.SubElement(elem, "installed_changeset_revision") + changeset_revision_elem = SubElement(elem, "installed_changeset_revision") changeset_revision_elem.text = changeset_revision # add additional values # TODO: enhance additional values to allow e.g. use of dict values that will recurse for key, value in kwd.items(): - new_elem = etree.SubElement(elem, key) + new_elem = SubElement(elem, key) new_elem.text = value return elem diff --git a/lib/galaxy/tool_util/data/__init__.py b/lib/galaxy/tool_util/data/__init__.py index 77f03cf9c7e0..6597124536ba 100644 --- a/lib/galaxy/tool_util/data/__init__.py +++ b/lib/galaxy/tool_util/data/__init__.py @@ -169,8 +169,9 @@ def __init__( filename: Optional[StrPath] = None, other_config_dict: Optional[StoresConfigFilePaths] = None, ) -> None: - self.name = config_element.get("name") - self.comment_char = config_element.get("comment_char") + name = config_element.get("name") + assert name + self.name = name self.empty_field_value = config_element.get("empty_field_value", "") self.empty_field_values: Dict[str, str] = {} self.allow_duplicate_entries = util.asbool(config_element.get("allow_duplicate_entries", True)) @@ -179,7 +180,7 @@ def __init__( self.tool_data_path = tool_data_path self.tool_data_path_files = tool_data_path_files self.other_config_dict = other_config_dict or {} - self.missing_index_file = None + self.missing_index_file: Optional[str] = None # increment this variable any time a new entry is added, or when the table is totally reloaded # This value has no external meaning, and does not represent an abstract version of the underlying data self._loaded_content_version = 1 @@ -348,11 +349,19 @@ def configure_and_load( # store repo info if available: repo_elem = config_element.find("tool_shed_repository") if repo_elem is not None: + tool_shed_elem = repo_elem.find("tool_shed") + assert tool_shed_elem is not None + repository_name_elem = repo_elem.find("repository_name") + assert repository_name_elem is not None + repository_owner_elem = repo_elem.find("repository_owner") + assert repository_owner_elem is not None + installed_changeset_revision_elem = repo_elem.find("installed_changeset_revision") + assert installed_changeset_revision_elem is not None repo_info = dict( - tool_shed=repo_elem.find("tool_shed").text, - name=repo_elem.find("repository_name").text, - owner=repo_elem.find("repository_owner").text, - installed_changeset_revision=repo_elem.find("installed_changeset_revision").text, + tool_shed=tool_shed_elem.text, + name=repository_name_elem.text, + owner=repository_owner_elem.text, + installed_changeset_revision=installed_changeset_revision_elem.text, ) else: repo_info = None @@ -375,12 +384,13 @@ def configure_and_load( tmp_file.flush() else: # Pull the filename from a global config - filename = file_element.get("from_config", None) or None + filename = file_element.get("from_config") if filename: filename = self.other_config_dict.get(filename, None) - filename = file_path = _expand_here_template(filename, here=self.here) + if filename: + filename = _expand_here_template(filename, here=self.here) found = False - if file_path is None: + if filename is None: log.debug( "Encountered a file element (%s) that does not contain a path value when loading tool data table '%s'.", util.xml_to_string(file_element), @@ -399,7 +409,7 @@ def configure_and_load( # regular galaxy app has and uses tool_data_path. # We're loading a tool in the tool shed, so we cannot use the Galaxy tool-data # directory which is hard-coded into the tool_data_table_conf.xml entries. - filename = os.path.split(file_path)[1] + filename = os.path.split(filename)[1] filename = os.path.join(tool_data_path, filename) if self.tool_data_path_files.exists(filename): found = True @@ -526,11 +536,11 @@ def parse_column_spec(self, config_element: Element) -> None: else: self.largest_index = 0 for column_elem in config_element.findall("column"): - name = column_elem.get("name", None) + name = column_elem.get("name") assert name is not None, "Required 'name' attribute missing from column def" - index = column_elem.get("index", None) - assert index is not None, "Required 'index' attribute missing from column def" - index = int(index) + index_attr = column_elem.get("index") + assert index_attr is not None, "Required 'index' attribute missing from column def" + index = int(index_attr) self.columns[name] = index if index > self.largest_index: self.largest_index = index diff --git a/lib/galaxy/tool_util/data/bundles/models.py b/lib/galaxy/tool_util/data/bundles/models.py index d37bd6d86c24..730c51c216f3 100644 --- a/lib/galaxy/tool_util/data/bundles/models.py +++ b/lib/galaxy/tool_util/data/bundles/models.py @@ -167,11 +167,11 @@ def _xml_to_data_table_output_column_move(move_elem: Element) -> DataTableBundle source_value = "" else: source_base = source_elem.get("base", None) - source_value = source_elem.text + source_value = source_elem.text or "" target_elem = move_elem.find("target") if target_elem is None: target_base = None - target_value = "" + target_value: Optional[str] = "" else: target_base = target_elem.get("base", None) target_value = target_elem.text diff --git a/lib/galaxy/tool_util/lint.py b/lib/galaxy/tool_util/lint.py index 44ab28b9790e..54390031a232 100644 --- a/lib/galaxy/tool_util/lint.py +++ b/lib/galaxy/tool_util/lint.py @@ -57,7 +57,7 @@ from galaxy.tool_util.parser import get_tool_source from galaxy.util import ( - etree, + Element, submodules, ) @@ -102,7 +102,7 @@ def __repr__(self) -> str: class XMLLintMessageLine(LintMessage): - def __init__(self, level: str, message: str, node: Optional[etree.Element] = None): + def __init__(self, level: str, message: str, node: Optional[Element] = None): super().__init__(level, message) self.line = None if node is not None: @@ -118,7 +118,7 @@ def __str__(self) -> str: class XMLLintMessageXPath(LintMessage): - def __init__(self, level: str, message: str, node: Optional[etree.Element] = None): + def __init__(self, level: str, message: str, node: Optional[Element] = None): super().__init__(level, message) self.xpath = None if node is not None: diff --git a/lib/galaxy/tool_util/parser/xml.py b/lib/galaxy/tool_util/parser/xml.py index 96b3634c333b..5c723c3a851f 100644 --- a/lib/galaxy/tool_util/parser/xml.py +++ b/lib/galaxy/tool_util/parser/xml.py @@ -93,7 +93,6 @@ def destroy_tree(tree): def parse_change_format(change_format: Iterable[Element]) -> List[ChangeFormatModel]: change_models: List[ChangeFormatModel] = [] for change_elem in change_format: - change_elem = cast(Element, change_elem) for when_elem in change_elem.findall("when"): when_elem = cast(Element, when_elem) value: Optional[str] = when_elem.get("value", None) @@ -123,7 +122,6 @@ class XmlToolSource(ToolSource): """Responsible for parsing a tool from classic Galaxy representation.""" language = "xml" - root: Element def __init__(self, xml_tree: ElementTree, source_path=None, macro_paths=None): self.xml_tree = xml_tree diff --git a/lib/galaxy/tool_util/toolbox/base.py b/lib/galaxy/tool_util/toolbox/base.py index a146c93cde26..c2a2f1ee47b2 100644 --- a/lib/galaxy/tool_util/toolbox/base.py +++ b/lib/galaxy/tool_util/toolbox/base.py @@ -661,6 +661,9 @@ def _load_integrated_tool_panel_keys(self): section = ToolSection(elem) for section_elem in elem: section_id = section_elem.get("id") + assert ( + section_id + ), f"Element '{etree.tostring(section_elem, encoding='unicode')}' did not specify 'id' attribute" if section_elem.tag == "tool": section.elems.stub_tool(section_id) elif section_elem.tag == "workflow": diff --git a/lib/galaxy/tool_util/verify/__init__.py b/lib/galaxy/tool_util/verify/__init__.py index 364aa46489b4..83e6a2c3fd23 100644 --- a/lib/galaxy/tool_util/verify/__init__.py +++ b/lib/galaxy/tool_util/verify/__init__.py @@ -69,7 +69,7 @@ def get_filename(filename: str) -> str: assertions = attributes.get("assert_list", None) if attributes is not None and assertions is not None: try: - verify_assertions(output_content, attributes["assert_list"], attributes.get("decompress")) + verify_assertions(output_content, attributes["assert_list"], attributes.get("decompress", False)) except AssertionError as err: errmsg = f"{item_label} different than expected\n" errmsg += unicodify(err) diff --git a/lib/galaxy/tool_util/verify/asserts/__init__.py b/lib/galaxy/tool_util/verify/asserts/__init__.py index 1589b4b0c18d..3d34e5bdb18d 100644 --- a/lib/galaxy/tool_util/verify/asserts/__init__.py +++ b/lib/galaxy/tool_util/verify/asserts/__init__.py @@ -32,7 +32,7 @@ assertion_functions[member] = value -def verify_assertions(data: bytes, assertion_description_list, decompress=None): +def verify_assertions(data: bytes, assertion_description_list, decompress: bool = False): """This function takes a list of assertions and a string to check these assertions against.""" if decompress: diff --git a/lib/galaxy/tool_util/verify/asserts/xml.py b/lib/galaxy/tool_util/verify/asserts/xml.py index bdb53e3913e8..b80798436e2e 100644 --- a/lib/galaxy/tool_util/verify/asserts/xml.py +++ b/lib/galaxy/tool_util/verify/asserts/xml.py @@ -129,7 +129,7 @@ def assert_xml_element( if attribute is None or attribute == "": content = occ.text else: - content = occ.attrib[attribute] + content = occ.attrib[attribute] # type: ignore[assignment] # https://github.com/lxml/lxml-stubs/pull/99 try: verify_assertions_function(content, children) except AssertionError as e: diff --git a/lib/galaxy/tools/__init__.py b/lib/galaxy/tools/__init__.py index 5831359a8411..a3b894d1e9b8 100644 --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -26,7 +26,6 @@ from urllib.parse import unquote_plus import webob.exc -from lxml import etree from mako.template import Template from packaging.version import Version from sqlalchemy import ( @@ -150,6 +149,7 @@ listify, Params, parse_xml_string, + parse_xml_string_to_etree, rst_to_html, string_as_bool, unicodify, @@ -535,7 +535,7 @@ def create_tool(self, config_file, tool_cache_data_dir=None, **kwds): if tool_document: tool_source = self.get_expanded_tool_source( config_file=config_file, - xml_tree=etree.ElementTree(etree.fromstring(tool_document["document"].encode("utf-8"))), + xml_tree=parse_xml_string_to_etree(tool_document["document"]), macro_paths=tool_document["macro_paths"], ) else: diff --git a/lib/galaxy/tools/data_manager/manager.py b/lib/galaxy/tools/data_manager/manager.py index 1c8d6c7b92b6..8da9f8e97f9e 100644 --- a/lib/galaxy/tools/data_manager/manager.py +++ b/lib/galaxy/tools/data_manager/manager.py @@ -35,7 +35,7 @@ def __init__(self, app: StructuredApp, xml_filename=None, reload_count: Optional self.app = app self.data_managers = {} self.managed_data_tables = {} - self.tool_path = None + self.tool_path: Optional[str] = None self.__reload_count = reload_count or 0 self.filename = xml_filename or self.app.config.data_manager_config_file for filename in util.listify(self.filename): @@ -143,13 +143,13 @@ class DataManager: tool: Optional[Tool] - def __init__(self, data_managers: DataManagers, elem: Element = None, tool_path: Optional[str] = None): + def __init__(self, data_managers: DataManagers, elem: Optional[Element] = None, tool_path: Optional[str] = None): self.data_managers = data_managers - self.declared_id = None - self.name = None - self.description = None + self.declared_id: Optional[str] = None + self.name: Optional[str] = None + self.description: Optional[str] = None self.version = self.DEFAULT_VERSION - self.guid = None + self.guid: Optional[str] = None self.tool = None self.tool_shed_repository_info: Optional[RepoInfo] = None self.undeclared_tables = False diff --git a/lib/galaxy/util/__init__.py b/lib/galaxy/util/__init__.py index 2f19adcf8cd5..6c5403ff6d60 100644 --- a/lib/galaxy/util/__init__.py +++ b/lib/galaxy/util/__init__.py @@ -35,9 +35,12 @@ from os.path import relpath from typing import ( Any, + cast, + Dict, Iterable, Iterator, List, + Mapping, Optional, overload, Tuple, @@ -58,7 +61,10 @@ ) from requests.adapters import HTTPAdapter from requests.packages.urllib3.util.retry import Retry # type: ignore[import-untyped] -from typing_extensions import Literal +from typing_extensions import ( + Literal, + Self, +) try: import grp @@ -68,22 +74,52 @@ LXML_AVAILABLE = True try: from lxml import etree - from lxml.etree import _Element as Element + + # lxml.etree.Element is a function that returns a new instance of the + # lxml.etree._Element class. This class doesn't have a proper __init__() + # method, so we can add a __new__() constructor that mimicks the + # xml.etree.ElementTree.Element initialization. + class Element(etree._Element): + def __new__(cls, tag, attrib={}, **extra) -> Self: # noqa: B006 + return cast(Self, etree.Element(tag, attrib, **extra)) + + def __iter__(self) -> Iterator[Self]: # type: ignore[override] + return cast(Iterator[Self], super().__iter__()) + + def find(self, path: str, namespaces: Optional[Mapping[str, str]] = None) -> Union[Self, None]: + ret = super().find(path, namespaces) + if ret is not None: + return cast(Self, ret) + else: + return None + + def findall(self, path: str, namespaces: Optional[Mapping[str, str]] = None) -> List[Self]: # type: ignore[override] + return cast(List[Self], super().findall(path, namespaces)) + + def SubElement(parent: Element, tag: str, attrib: Optional[Dict[str, str]] = None, **extra) -> Element: + return cast(Element, etree.SubElement(parent, tag, attrib, **extra)) # lxml.etree.ElementTree is a function that returns a new instance of the - # lxml.etree._ElementTree class. This class doesn't have a proper - # __init__() method, so we can add a __new__() constructor that mimicks + # lxml.etree._ElementTree class. This class doesn't have a proper __init__() + # method, so we can add a __new__() constructor that mimicks the # xml.etree.ElementTree.ElementTree initialization. class ElementTree(etree._ElementTree): - def __new__(cls, element=None, file=None) -> etree.ElementTree: - return etree.ElementTree(element, file=file) + def __new__(cls, element=None, file=None) -> Self: + return cast(Self, etree.ElementTree(element, file=file)) + + def getroot(self) -> Element: + return cast(Element, super().getroot()) + + def XML(text: Union[str, bytes]) -> Element: + return cast(Element, etree.XML(text)) except ImportError: LXML_AVAILABLE = False import xml.etree.ElementTree as etree # type: ignore[assignment,no-redef] - from xml.etree.ElementTree import ( # type: ignore[assignment] + from xml.etree.ElementTree import ( # type: ignore[assignment] # noqa: F401 Element, ElementTree, + XML, ) from .custom_logging import get_logger @@ -129,8 +165,6 @@ def shlex_join(split_command): RWXR_XR_X = stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH RWXRWXRWX = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO -XML = etree.XML - defaultdict = collections.defaultdict UNKNOWN = "unknown" @@ -311,7 +345,7 @@ def parse_xml( schema = etree.XMLSchema(schema_root) try: - tree = etree.parse(str(fname), parser=parser) + tree = cast(ElementTree, etree.parse(str(fname), parser=parser)) root = tree.getroot() if strip_whitespace: for elem in root.iter("*"): @@ -337,10 +371,11 @@ def parse_xml( def parse_xml_string(xml_string: str, strip_whitespace: bool = True) -> Element: try: - elem = etree.fromstring(xml_string) + elem = XML(xml_string) except ValueError as e: if "strings with encoding declaration are not supported" in unicodify(e): - elem = etree.fromstring(xml_string.encode("utf-8")) + # This happens with lxml for a string that starts with e.g. `` + elem = XML(xml_string.encode("utf-8")) else: raise e if strip_whitespace: @@ -993,7 +1028,7 @@ def parse_resource_parameters(resource_param_file): resource_definitions_root = resource_definitions.getroot() for parameter_elem in resource_definitions_root.findall("param"): name = parameter_elem.get("name") - resource_parameters[name] = etree.tostring(parameter_elem) + resource_parameters[name] = etree.tostring(parameter_elem, encoding="unicode") return resource_parameters diff --git a/lib/galaxy/util/tool_shed/xml_util.py b/lib/galaxy/util/tool_shed/xml_util.py index b7bc8fba1ac3..0aee018cd5e4 100644 --- a/lib/galaxy/util/tool_shed/xml_util.py +++ b/lib/galaxy/util/tool_shed/xml_util.py @@ -7,9 +7,9 @@ ) from galaxy.util import ( - etree, + Element, + ElementTree, parse_xml as galaxy_parse_xml, - unicodify, xml_to_string, ) from galaxy.util.path import StrPath @@ -17,7 +17,7 @@ log = logging.getLogger(__name__) -def create_and_write_tmp_file(elem: etree.Element) -> str: +def create_and_write_tmp_file(elem: Element) -> str: tmp_str = xml_to_string(elem, pretty=True) with tempfile.NamedTemporaryFile(prefix="tmp-toolshed-cawrf", delete=False) as fh: tmp_filename = fh.name @@ -26,7 +26,7 @@ def create_and_write_tmp_file(elem: etree.Element) -> str: return tmp_filename -def parse_xml(file_name: StrPath, check_exists=True) -> Tuple[Optional[etree.ElementTree], str]: +def parse_xml(file_name: StrPath, check_exists=True) -> Tuple[Optional[ElementTree], str]: """Returns a parsed xml tree with comments intact.""" error_message = "" if check_exists and not os.path.exists(file_name): @@ -36,7 +36,7 @@ def parse_xml(file_name: StrPath, check_exists=True) -> Tuple[Optional[etree.Ele except OSError: raise except Exception as e: - error_message = f"Exception attempting to parse {str(file_name)}: {unicodify(e)}" + error_message = f"Exception attempting to parse {file_name}: {e}" log.exception(error_message) return None, error_message return tree, error_message diff --git a/lib/galaxy_test/selenium/test_workflow_editor.py b/lib/galaxy_test/selenium/test_workflow_editor.py index caa5de321d0c..d8bdd2c34018 100644 --- a/lib/galaxy_test/selenium/test_workflow_editor.py +++ b/lib/galaxy_test/selenium/test_workflow_editor.py @@ -1110,8 +1110,10 @@ def test_editor_place_comments(self): # check if all options were applied comment_content: WebElement = editor.comment.text_inner.wait_for_visible() assert comment_content.text == "Hello World" - assert "bold" in comment_content.get_attribute("class") - assert "italic" in comment_content.get_attribute("class") + comment_content_class = comment_content.get_attribute("class") + assert comment_content_class + assert "bold" in comment_content_class + assert "italic" in comment_content_class # check for correct size width, height = self.get_element_size(editor.comment._.wait_for_visible()) diff --git a/lib/tool_shed/dependencies/attribute_handlers.py b/lib/tool_shed/dependencies/attribute_handlers.py index ea611b132ab8..e581137bdb4f 100644 --- a/lib/tool_shed/dependencies/attribute_handlers.py +++ b/lib/tool_shed/dependencies/attribute_handlers.py @@ -10,7 +10,8 @@ from galaxy.util import ( asbool, - etree, + Element, + SubElement, ) from tool_shed.dependencies.tool import tag_attribute_handler from tool_shed.repository_types.util import ( @@ -228,13 +229,13 @@ def _create_element( tag: str, attributes: Optional[Dict[str, str]] = None, sub_elements: Optional[Dict[str, List[Tuple[str, str]]]] = None, -) -> Optional[etree.Element]: +) -> Optional[Element]: """ Create a new element whose tag is the value of the received tag, and whose attributes are all key / value pairs in the received attributes and sub_elements. """ if tag: - elem = etree.Element(tag) + elem = Element(tag) if attributes: # The received attributes is an odict to preserve ordering. for k, attribute_value in attributes.items(): @@ -247,14 +248,14 @@ def _create_element( if v: if k == "packages": # The received sub_elements is an odict whose key is 'packages' and whose - # value is a list of ( name, version ) tuples. + # value is a list of (name, version) tuples. for v_tuple in v: - sub_elem = etree.SubElement(elem, "package") + sub_elem = SubElement(elem, "package") sub_elem_name, sub_elem_version = v_tuple sub_elem.set("name", sub_elem_name) sub_elem.set("version", sub_elem_version) elif isinstance(v, list): - sub_elem = etree.SubElement(elem, k) + sub_elem = SubElement(elem, k) # If v is a list, then it must be a list of tuples where the first # item is the tag and the second item is the text value. for v_tuple in v: @@ -263,10 +264,10 @@ def _create_element( v_text = v_tuple[1] # Don't include fields that are blank. if v_text: - v_elem = etree.SubElement(sub_elem, v_tag) + v_elem = SubElement(sub_elem, v_tag) v_elem.text = v_text else: - sub_elem = etree.SubElement(elem, k) + sub_elem = SubElement(elem, k) sub_elem.text = v return elem return None diff --git a/pyproject.toml b/pyproject.toml index f72a74981b4e..6ce683587e31 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -166,6 +166,7 @@ watchdog = "*" [tool.poetry.group.typecheck.dependencies] mypy = "*" +lxml-stubs = "*" pydantic = ">=2" # for pydantic.mypy plugin types-bleach = "*" types-boto = "*" diff --git a/test/unit/app/queue_worker/conftest.py b/test/unit/app/queue_worker/conftest.py index 3849e7b86904..af918f0b44ab 100644 --- a/test/unit/app/queue_worker/conftest.py +++ b/test/unit/app/queue_worker/conftest.py @@ -7,7 +7,7 @@ try: import psycopg except ImportError: - psycopg = None + psycopg = None # type: ignore[assignment] try: import psycopg2 diff --git a/test/unit/tool_util/data/test_tool_data_bundles.py b/test/unit/tool_util/data/test_tool_data_bundles.py index 9c6afddc5735..616323946a7f 100644 --- a/test/unit/tool_util/data/test_tool_data_bundles.py +++ b/test/unit/tool_util/data/test_tool_data_bundles.py @@ -28,6 +28,7 @@ def test_xml_parsing() -> None: tree = parse_xml(path) data_managers_el = tree.getroot() data_manager_el = data_managers_el.find("data_manager") + assert data_manager_el is not None description = convert_data_tables_xml(data_manager_el) assert not description.undeclared_tables assert len(description.data_tables) == 1 @@ -58,6 +59,7 @@ def test_parsing_manual() -> None: tree = parse_xml(path) data_managers_el = tree.getroot() data_manager_el = data_managers_el.find("data_manager") + assert data_manager_el is not None description = convert_data_tables_xml(data_manager_el) assert description.undeclared_tables assert len(description.data_tables) == 0 @@ -68,6 +70,7 @@ def test_parsing_mothur() -> None: tree = parse_xml(path) data_managers_el = tree.getroot() data_manager_el = data_managers_el.find("data_manager") + assert data_manager_el is not None description = convert_data_tables_xml(data_manager_el) assert not description.undeclared_tables assert len(description.data_tables) == 4 @@ -217,7 +220,6 @@ def test_undeclared_tables(tdt_manager, tmp_path): target_path.write_text("Moo Cow") output = {"data_tables": {"testalpha": [{"value": "newvalue", "name": "mynewname", "path": "newvalue.txt"}]}} output_dataset_path = tmp_path / "output.dat" - import json output_dataset_path.write_text(json.dumps(output)) extra_files_path = tmp_path / "extra" diff --git a/test/unit/tool_util/verify/test_asserts.py b/test/unit/tool_util/verify/test_asserts.py index cb415507c530..22d5b67b23f7 100644 --- a/test/unit/tool_util/verify/test_asserts.py +++ b/test/unit/tool_util/verify/test_asserts.py @@ -2,6 +2,7 @@ import os import shutil import tempfile +from typing import Tuple try: import h5py @@ -10,7 +11,7 @@ from galaxy.tool_util.parser.xml import __parse_assert_list_from_elem from galaxy.tool_util.verify import asserts -from galaxy.util import etree +from galaxy.util import parse_xml_string TABULAR_ASSERTION = """ @@ -1314,8 +1315,8 @@ def test_has_h5_attribute_failure(): assert len(a) == 1 -def run_assertions(assertion_xml, data, decompress=None): - assertion = etree.fromstring(assertion_xml) +def run_assertions(assertion_xml: str, data, decompress=False) -> Tuple: + assertion = parse_xml_string(assertion_xml) assertion_description = __parse_assert_list_from_elem(assertion) try: asserts.verify_assertions(data, assertion_description, decompress=decompress)