From d737e4c2433726f13edd8966f9e9a41210fc0d04 Mon Sep 17 00:00:00 2001 From: Enrico Straehler Date: Tue, 23 Apr 2024 16:32:55 +0200 Subject: [PATCH 01/10] Added compatibility for Garderos GRS-based devices. --- netmiko/base_connection.py | 2 + netmiko/garderos/__init__.py | 3 + netmiko/garderos/garderos_ssh.py | 484 +++++++++++++++++++++++++++++++ netmiko/ssh_dispatcher.py | 3 + 4 files changed, 492 insertions(+) create mode 100644 netmiko/garderos/__init__.py create mode 100644 netmiko/garderos/garderos_ssh.py diff --git a/netmiko/base_connection.py b/netmiko/base_connection.py index 39bac3b92..a7d8e5b56 100644 --- a/netmiko/base_connection.py +++ b/netmiko/base_connection.py @@ -2397,6 +2397,7 @@ def strip_ansi_escape_codes(self, string_buffer: str) -> str: code_cursor_down = chr(27) + r"\[\d*B" code_wrap_around = chr(27) + r"\[\?7h" code_bracketed_paste_mode = chr(27) + r"\[\?2004h" + code_underline = chr(27) + r"\[4m" code_set = [ code_position_cursor, @@ -2427,6 +2428,7 @@ def strip_ansi_escape_codes(self, string_buffer: str) -> str: code_cursor_forward, code_wrap_around, code_bracketed_paste_mode, + code_underline, ] output = string_buffer diff --git a/netmiko/garderos/__init__.py b/netmiko/garderos/__init__.py new file mode 100644 index 000000000..41011c18c --- /dev/null +++ b/netmiko/garderos/__init__.py @@ -0,0 +1,3 @@ +from netmiko.garderos.garderos_ssh import GarderosSSH + +__all__ = ["GarderosSSH"] diff --git a/netmiko/garderos/garderos_ssh.py b/netmiko/garderos/garderos_ssh.py new file mode 100644 index 000000000..f1047275b --- /dev/null +++ b/netmiko/garderos/garderos_ssh.py @@ -0,0 +1,484 @@ +from netmiko.cisco_base_connection import CiscoSSHConnection +from netmiko.exceptions import ConfigInvalidException +from time import sleep +from typing import ( + Optional, + Any, + List, + Dict, + Sequence, + Iterator, + TextIO, + Union, +) + + +class GarderosSSH(CiscoSSHConnection): + def session_preparation(self) -> None: + """Prepare the session after the connection has been established""" + self.ansi_escape_codes = True + self._test_channel_read() + self.set_base_prompt(pri_prompt_terminator="#") + self.clear_buffer() + + def send_command( + self, + command_string: str, + expect_string: Optional[str] = None, + read_timeout: float = 10.0, + delay_factor: Optional[float] = None, + max_loops: Optional[int] = None, + auto_find_prompt: bool = True, + strip_prompt: bool = True, + strip_command: bool = True, + normalize: bool = True, + use_textfsm: bool = False, + textfsm_template: Optional[str] = None, + use_ttp: bool = False, + ttp_template: Optional[str] = None, + use_genie: bool = False, + cmd_verify: bool = True, + ) -> Union[str, List[Any], Dict[str, Any]]: + """Add strip() command to output of send_command()""" + # First check if command contains a newline character + # This is not allowed in Garderos GRS + newline_chars = ["\n", "\r"] + for newline in newline_chars: + if newline in command_string: + raise ValueError( + "The following command contains an illegal newline character: " + + command_string + ) + # Send command to device + result = super().send_command( + command_string=command_string, + expect_string=expect_string, + read_timeout=read_timeout, + delay_factor=delay_factor, + max_loops=max_loops, + auto_find_prompt=auto_find_prompt, + strip_prompt=strip_prompt, + strip_command=strip_command, + normalize=normalize, + use_textfsm=use_textfsm, + textfsm_template=textfsm_template, + use_ttp=use_ttp, + ttp_template=ttp_template, + use_genie=use_genie, + cmd_verify=cmd_verify, + ) + # Optimize output of strings + if isinstance(result, str): + result = result.strip() + # Return result + return result + + def check_config_mode( + self, check_string: str = ")#", pattern: str = "#", force_regex: bool = False + ) -> bool: + """Checks if the device is in configuration mode or not.""" + return super().check_config_mode( + check_string=check_string, pattern=pattern, force_regex=force_regex + ) + + def config_mode( + self, + config_command: str = "configuration terminal", + pattern: str = "", + re_flags: int = 0, + ) -> str: + return super().config_mode( + config_command=config_command, pattern=pattern, re_flags=re_flags + ) + + def exit_config_mode(self, exit_config: str = "exit", pattern: str = "#") -> str: + """Exit from configuration mode.""" + return super().exit_config_mode(exit_config=exit_config, pattern=pattern) + + def commit(self, commit: str = "commit") -> str: + """Commit the candidate configuration.""" + # Verify device is not in configuration mode + if self.check_config_mode(): + raise ValueError("Device is in configuration mode. Please exit first.") + # Run commit command + commit_result = self.send_command(commit) + # Verify success + if commit_result.__contains__("No configuration to commit"): + raise ValueError( + "No configuration to commit. Please configure device first." + ) + elif not commit_result.__contains__("Values will be reloaded"): + raise ValueError(f"Commit was unsuccessful. Device said: {commit_result}") + # Return device output + commit_result = str(commit_result) + return commit_result + + def save_config( + self, + cmd: str = "write startup-configuration", + confirm: bool = False, + confirm_response: str = "", + ) -> str: + """Saves Config.""" + # Verify device is not in configuration mode + if self.check_config_mode(): + raise ValueError("Device is in configuration mode. Please exit first.") + # Variables confirm and confirm_response are not relevant for Garderos + if confirm: + raise ValueError( + "Garderos saves the config without the need of confirmation. " + "Please set variable 'confirm' to False!" + ) + # Run write command + save_config_result = self.send_command(cmd) + # Verify success + if not save_config_result.__contains__( + "Values are persistently saved to STARTUP-CONF" + ): + raise ValueError( + f"Saving configuration was unsuccessful. Device said: {save_config_result}" + ) + # Return device output + save_config_result = str(save_config_result) + return save_config_result + + def check_linux_mode(self, check_string: str = "]#", pattern: str = "#") -> bool: + """Checks if the device is in Linux mode or not. + + :param check_string: Identification of configuration mode from the device + + :param pattern: Pattern to terminate reading of channel + """ + self.write_channel(self.RETURN) + output = self.read_until_prompt(read_entire_line=True) + return check_string in output + + def linux_mode(self, linux_command: str = "linux-shell", pattern: str = "") -> str: + """Enter into Linux mode. + + :param config_command: Linux command to send to the device + + :param pattern: Pattern to terminate reading of channel + """ + output = "" + if not self.check_linux_mode(): + self.write_channel(self.normalize_cmd(linux_command)) + output = self.read_until_pattern(pattern=pattern) + if not self.check_linux_mode(): + raise ValueError("Failed to enter Linux mode.") + return output + + def exit_linux_mode(self, exit_linux: str = "exit", pattern: str = "#") -> str: + """Exit from Linux mode. + + :param exit_config: Command to exit Linux mode + + :param pattern: Pattern to terminate reading of channel + """ + output = "" + if self.check_linux_mode(): + self.write_channel(self.normalize_cmd(exit_linux)) + output = self.read_until_pattern(pattern=pattern) + if self.check_linux_mode(): + raise ValueError("Failed to exit Linux mode") + return output + + def remove_and_replace_control_chars(self, s: str) -> str: + """Removing all occurrences of "\r\n\r" except of the last occurrence + Last occurence will be replaced by "\n" + + :param s: String that needs to be cleansed + """ + # Because the sequence "\r\n\r" also matches "\r\n\r\n", + # we need to replace "\r\n\r\n" with "\n\n" first + s = s.replace("\r\n\r\n", "\n\n") + # Now we have eliminated "\r\n\r\n" + # and can begin working on the remaining "\r\n\r" occurrences + control_seq = "\r\n\r" + if s.count(control_seq) == 0: + return s + else: + index_last_occurrence = s.rfind(control_seq) + index_rest_of_string = index_last_occurrence + len(control_seq) + return ( + s[:index_last_occurrence].replace(control_seq, "") + + "\n" + + s[index_rest_of_string:] + ) + + def normalize_linefeeds(self, a_string: str) -> str: + """Optimised normalisation of line feeds + + :param a_string: A string that may have non-normalized line feeds + i.e. output returned from device, or a device prompt + """ + # Garderos has special behavior in terms of line feeds: + # The echo of commands sometimes contains "\r\n\r" + # which breaks the functionality of _sanitize_output(). + # Therefore, this character sequence needs to be fixed + # before passing the string to normalize_linefeeds(). + + # First we will remove all the occurrences of "\r\n\r" except of the last one. + # The last occurrence will be replaced by "\r\n". + a_string = self.remove_and_replace_control_chars(a_string) + + # Then we will pass the string to normalize_linefeeds() to replace all line feeds with "\n" + return super().normalize_linefeeds(a_string=a_string) + + def send_config_command( + self, + command_string: str, + expect_string: Optional[str] = None, + read_timeout: float = 10.0, + delay_factor: Optional[float] = None, + max_loops: Optional[int] = None, + auto_find_prompt: bool = True, + strip_prompt: bool = True, + strip_command: bool = True, + normalize: bool = True, + use_textfsm: bool = False, + textfsm_template: Optional[str] = None, + use_ttp: bool = False, + ttp_template: Optional[str] = None, + use_genie: bool = False, + cmd_verify: bool = True, + ) -> Union[str, List[Any], Dict[str, Any]]: + """ + Execute a command in configuration mode and raise error if command execution failed. + Function neither checks if device is configuration mode nor turns on configuration mode. + """ + # Send command to device + command_result = self.send_command( + command_string=command_string, + expect_string=expect_string, + read_timeout=read_timeout, + delay_factor=delay_factor, + max_loops=max_loops, + auto_find_prompt=auto_find_prompt, + strip_prompt=strip_prompt, + strip_command=strip_command, + normalize=normalize, + use_textfsm=use_textfsm, + textfsm_template=textfsm_template, + use_ttp=use_ttp, + ttp_template=ttp_template, + use_genie=use_genie, + cmd_verify=cmd_verify, + ) + # Verify if configuration command executed successfully + if command_result != "Set.": + raise ConfigInvalidException( + 'Error executing configuration command "{}". Device said: {}'.format( + command_string, command_result + ) + ) + return command_result + + def send_config_set( + self, + config_commands: Union[str, Sequence[str], Iterator[str], TextIO, None] = None, + *, + exit_config_mode: bool = True, + read_timeout: Optional[float] = None, + delay_factor: Optional[float] = None, + max_loops: Optional[int] = None, + strip_prompt: bool = False, + strip_command: bool = False, + config_mode_command: Optional[str] = None, + cmd_verify: bool = True, + enter_config_mode: bool = True, + error_pattern: str = "", + terminator: str = r"#", + bypass_commands: Optional[str] = None, + ) -> str: + """ + Optimised version of send_config_set() for Garderos. + Checks whether single config commands executed successfully. + + Automatically exits/enters configuration mode. + + :param config_commands: Multiple configuration commands to be sent to the device + + :param exit_config_mode: Determines whether or not to exit config mode after complete + + :param delay_factor: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5. + + :param max_loops: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5. + + :param strip_prompt: Determines whether or not to strip the prompt + + :param strip_command: Determines whether or not to strip the command + + :param read_timeout: Absolute timer to send to read_channel_timing. Also adjusts + read_timeout in read_until_pattern calls. + + :param config_mode_command: The command to enter into config mode + + :param cmd_verify: Whether or not to verify command echo for each command in config_set + + :param enter_config_mode: Do you enter config mode before sending config commands + + :param error_pattern: Regular expression pattern to detect config errors in the + output. + + :param terminator: Regular expression pattern to use as an alternate terminator in certain + situations. + + :param bypass_commands: Regular expression pattern indicating configuration commands + where cmd_verify is automatically disabled. + """ + # The result of all commands will be collected to config_results + config_results = "" + # Set delay_factor to given value + if delay_factor is None: + delay_factor = self.select_delay_factor(0) + else: + delay_factor = self.select_delay_factor(delay_factor) + # Verify if config_commands is an array + if config_commands is None: + return config_results + elif isinstance(config_commands, str): + config_commands = (config_commands,) + if not hasattr(config_commands, "__iter__"): + raise ValueError("Invalid argument passed into send_config_set") + # Go to config mode. Use given config_mode_command if necessary. + if enter_config_mode: + if config_mode_command: + config_results += self.config_mode(config_mode_command) + else: + config_results += self.config_mode() + # Send all commands to the router and verify their successful execution + for command in config_commands: + # Verification is done in send_config_command() function + # Will raise error on execution failure + result = self.send_config_command(command) + config_results = f"{command}\n{result}\n" + # Exit config mode if needed + if exit_config_mode: + self.exit_config_mode() + # Return all results + # Will only be executed if no error occured + return config_results + + def ssh_connect( + self, + host: str, + username: str = "", + password: str = "", + timeout: int = 5, + expect_prompt: str = "#", + ) -> str: + """ + Opening a nested SSH connection to another device + + :param host: IP address (or hostname if DNS is configured) + + :param username: Username to be used for SSH connection + + :param password: Password to be used for SSH connection + + :param timeout: Maximum time in seconds to wait for username and password prompt + + :param expect_prompt: Device prompt (or part of it ) to expect after successful login + + """ + # Send SSH command to Garderos + self.write_channel("ssh " + host + "\n") + # Wait for username and password prompt + # Maximum waiting time in seconds is set by variable timeout + password_sent = False + login_completed = False + counter = 0 + output_current = "" + output_summary = "" + while not (password_sent and login_completed) and counter < timeout: + sleep(1) + output_current = self.read_channel() + output_summary += output_current + if "exited" in output_current: + # SSH session terminated + # Clear the receive channel before raising exception // Also retrieves error message + output_current = self.read_channel() + raise ConnectionError( + "Error connecting to host {}. Session terminated unexpectedly. \n{}".format( + host, output_current.strip() + ) + ) + elif not password_sent and "user" in output_current.lower(): + # Send username when prompted + self.write_channel(username + "\n") + elif not password_sent and "password" in output_current.lower(): + # Send password when prompted for the first time + self.write_channel(password + "\n") + password_sent = True + elif password_sent and "password" in output_current.lower(): + # If password has already been sent but a second password prompt appears, + # this means the password was incorrect + # Cancel nested SSH connection: CTRL+C followed by ENTER + self.write_channel(b"\x03".decode("utf-8")) + sleep(1) + self.write_channel("\n") + sleep(2) + # Clear the receive channel before raising exception + output_current = self.read_channel() + raise ValueError( + "Authentication error while connecting to host {}.".format(host) + ) + elif ( + password_sent + and not login_completed + and not (expect_prompt in output_current) + ): + # After password has been sent, try to send ENTER to device to get the device prompt + self.write_channel("\n") + elif ( + password_sent + and not login_completed + and expect_prompt in output_current + ): + # Device prompt successfully received + login_completed = True + # Increase counter in any case + counter += 1 + # On exit of loop check whether password has been sent + if not password_sent or not login_completed: + # Cancel nested SSH connection: CTRL+C followed by ENTER + self.write_channel(b"\x03".decode("utf-8")) + sleep(1) + self.write_channel("\n") + sleep(2) + # Clear the receive channel before raising exception + output_current = self.read_channel() + raise TimeoutError( + "Timeout error while connecting to host {}. Transcript of session: \n{}".format( + host, output_summary + ) + ) + return output_current.strip() + + def ssh_send_command( + self, + command_string: str, + expect_string: Optional[str] = None, + expect_timeout: float = 5.0, + ) -> str: + """ + Sending a command through an active nested SSH connection + + :param command_string: Command to be send through nested SSH session + + :param expect_string: Output to be expected from remote device, e.g. command promt + + :param expect_timeout: Timeout in seconds to wait for expect_string in output + + """ + # Send command to channel + self.write_channel(command_string + "\n") + output_summary = "" + # Check if waiting for expect string is necessary + if expect_string is not None: + # Read until expect string detected + output_summary = self.read_until_pattern( + pattern=expect_string, read_timeout=expect_timeout + ) + return output_summary diff --git a/netmiko/ssh_dispatcher.py b/netmiko/ssh_dispatcher.py index 31ba526a8..35c708fdd 100755 --- a/netmiko/ssh_dispatcher.py +++ b/netmiko/ssh_dispatcher.py @@ -82,6 +82,7 @@ from netmiko.fiberstore import FiberstoreFsosSSH from netmiko.flexvnf import FlexvnfSSH from netmiko.fortinet import FortinetSSH +from netmiko.garderos import GarderosSSH from netmiko.hillstone import HillstoneStoneosSSH from netmiko.hp import HPProcurveSSH, HPProcurveTelnet, HPComwareSSH, HPComwareTelnet from netmiko.huawei import HuaweiSSH, HuaweiVrpv8SSH, HuaweiTelnet @@ -226,6 +227,8 @@ "fiberstore_fsos": FiberstoreFsosSSH, "flexvnf": FlexvnfSSH, "fortinet": FortinetSSH, + "garderos": GarderosSSH, + "garderos_grs": GarderosSSH, "generic": GenericSSH, "generic_termserver": TerminalServerSSH, "hillstone_stoneos": HillstoneStoneosSSH, From 50e9f35b7a0e801f3f00ed0a897f895a31af20b2 Mon Sep 17 00:00:00 2001 From: Enrico Straehler Date: Tue, 23 Apr 2024 17:12:29 +0200 Subject: [PATCH 02/10] Added sleep command to commit function to pass config test --- netmiko/garderos/garderos_ssh.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/netmiko/garderos/garderos_ssh.py b/netmiko/garderos/garderos_ssh.py index f1047275b..ebbe7935d 100644 --- a/netmiko/garderos/garderos_ssh.py +++ b/netmiko/garderos/garderos_ssh.py @@ -109,6 +109,10 @@ def commit(self, commit: str = "commit") -> str: ) elif not commit_result.__contains__("Values will be reloaded"): raise ValueError(f"Commit was unsuccessful. Device said: {commit_result}") + # Garderos needs a second to apply the config + # If the "show configuration running" command is executed to quickly after committing + # it will result in error "No running configuration found." + sleep(1) # Return device output commit_result = str(commit_result) return commit_result From 76de12a8b0e2cb5cc9323ad69ae91c1891644e0a Mon Sep 17 00:00:00 2001 From: Kirk Byers Date: Fri, 3 May 2024 15:54:38 -0700 Subject: [PATCH 03/10] Add NOS name to garderos driver --- netmiko/garderos/__init__.py | 4 ++-- netmiko/garderos/{garderos_ssh.py => garderos_grs.py} | 2 +- netmiko/ssh_dispatcher.py | 5 ++--- 3 files changed, 5 insertions(+), 6 deletions(-) rename netmiko/garderos/{garderos_ssh.py => garderos_grs.py} (99%) diff --git a/netmiko/garderos/__init__.py b/netmiko/garderos/__init__.py index 41011c18c..0aaa2489b 100644 --- a/netmiko/garderos/__init__.py +++ b/netmiko/garderos/__init__.py @@ -1,3 +1,3 @@ -from netmiko.garderos.garderos_ssh import GarderosSSH +from netmiko.garderos.garderos_grs import GarderosGrsSSH -__all__ = ["GarderosSSH"] +__all__ = ["GarderosGrsSSH"] diff --git a/netmiko/garderos/garderos_ssh.py b/netmiko/garderos/garderos_grs.py similarity index 99% rename from netmiko/garderos/garderos_ssh.py rename to netmiko/garderos/garderos_grs.py index ebbe7935d..01b7a382f 100644 --- a/netmiko/garderos/garderos_ssh.py +++ b/netmiko/garderos/garderos_grs.py @@ -13,7 +13,7 @@ ) -class GarderosSSH(CiscoSSHConnection): +class GarderosGrsSSH(CiscoSSHConnection): def session_preparation(self) -> None: """Prepare the session after the connection has been established""" self.ansi_escape_codes = True diff --git a/netmiko/ssh_dispatcher.py b/netmiko/ssh_dispatcher.py index 35c708fdd..6019849f9 100755 --- a/netmiko/ssh_dispatcher.py +++ b/netmiko/ssh_dispatcher.py @@ -82,7 +82,7 @@ from netmiko.fiberstore import FiberstoreFsosSSH from netmiko.flexvnf import FlexvnfSSH from netmiko.fortinet import FortinetSSH -from netmiko.garderos import GarderosSSH +from netmiko.garderos import GarderosGrsSSH from netmiko.hillstone import HillstoneStoneosSSH from netmiko.hp import HPProcurveSSH, HPProcurveTelnet, HPComwareSSH, HPComwareTelnet from netmiko.huawei import HuaweiSSH, HuaweiVrpv8SSH, HuaweiTelnet @@ -227,8 +227,7 @@ "fiberstore_fsos": FiberstoreFsosSSH, "flexvnf": FlexvnfSSH, "fortinet": FortinetSSH, - "garderos": GarderosSSH, - "garderos_grs": GarderosSSH, + "garderos_grs": GarderosGrsSSH, "generic": GenericSSH, "generic_termserver": TerminalServerSSH, "hillstone_stoneos": HillstoneStoneosSSH, From 22fe6fdc14709f168b8de9915a8add15c63d6614 Mon Sep 17 00:00:00 2001 From: Kirk Byers Date: Fri, 3 May 2024 15:59:02 -0700 Subject: [PATCH 04/10] Simplifying send_command() method --- netmiko/garderos/garderos_grs.py | 53 ++++++++------------------------ 1 file changed, 12 insertions(+), 41 deletions(-) diff --git a/netmiko/garderos/garderos_grs.py b/netmiko/garderos/garderos_grs.py index 01b7a382f..66588a355 100644 --- a/netmiko/garderos/garderos_grs.py +++ b/netmiko/garderos/garderos_grs.py @@ -23,54 +23,25 @@ def session_preparation(self) -> None: def send_command( self, - command_string: str, - expect_string: Optional[str] = None, - read_timeout: float = 10.0, - delay_factor: Optional[float] = None, - max_loops: Optional[int] = None, - auto_find_prompt: bool = True, - strip_prompt: bool = True, - strip_command: bool = True, - normalize: bool = True, - use_textfsm: bool = False, - textfsm_template: Optional[str] = None, - use_ttp: bool = False, - ttp_template: Optional[str] = None, - use_genie: bool = False, - cmd_verify: bool = True, + *args: Any, + **kwargs: Any, ) -> Union[str, List[Any], Dict[str, Any]]: """Add strip() command to output of send_command()""" - # First check if command contains a newline character + + # First check if command contains a newline/carriage-return. # This is not allowed in Garderos GRS - newline_chars = ["\n", "\r"] - for newline in newline_chars: - if newline in command_string: - raise ValueError( - "The following command contains an illegal newline character: " - + command_string - ) + command_string = args[0] if args else kwargs["command_string"] + if "\n" in command_string or "\r" in command_string: + raise ValueError( + f"The command contains an illegal newline/carriage-return: {command_string}" + ) + # Send command to device - result = super().send_command( - command_string=command_string, - expect_string=expect_string, - read_timeout=read_timeout, - delay_factor=delay_factor, - max_loops=max_loops, - auto_find_prompt=auto_find_prompt, - strip_prompt=strip_prompt, - strip_command=strip_command, - normalize=normalize, - use_textfsm=use_textfsm, - textfsm_template=textfsm_template, - use_ttp=use_ttp, - ttp_template=ttp_template, - use_genie=use_genie, - cmd_verify=cmd_verify, - ) + result = super().send_command(*args, **kwargs) + # Optimize output of strings if isinstance(result, str): result = result.strip() - # Return result return result def check_config_mode( From 4f38f194ad20bb42e8b10b304a4f7815b7f79ca4 Mon Sep 17 00:00:00 2001 From: Kirk Byers Date: Fri, 3 May 2024 16:01:55 -0700 Subject: [PATCH 05/10] Garderos: minor changes to 'commit' method --- netmiko/garderos/garderos_grs.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/netmiko/garderos/garderos_grs.py b/netmiko/garderos/garderos_grs.py index 66588a355..eef75b4eb 100644 --- a/netmiko/garderos/garderos_grs.py +++ b/netmiko/garderos/garderos_grs.py @@ -68,24 +68,25 @@ def exit_config_mode(self, exit_config: str = "exit", pattern: str = "#") -> str def commit(self, commit: str = "commit") -> str: """Commit the candidate configuration.""" - # Verify device is not in configuration mode + if self.check_config_mode(): raise ValueError("Device is in configuration mode. Please exit first.") + # Run commit command - commit_result = self.send_command(commit) + commit_result = self._send_command_str(commit) + # Verify success - if commit_result.__contains__("No configuration to commit"): + if "No configuration to commit" in commit_result: raise ValueError( "No configuration to commit. Please configure device first." ) - elif not commit_result.__contains__("Values will be reloaded"): + elif "Values will be reloaded" not in commit_result: raise ValueError(f"Commit was unsuccessful. Device said: {commit_result}") + # Garderos needs a second to apply the config # If the "show configuration running" command is executed to quickly after committing # it will result in error "No running configuration found." sleep(1) - # Return device output - commit_result = str(commit_result) return commit_result def save_config( From 7e6a7479f1880978a13d2df3b1dfa8cd181ceed0 Mon Sep 17 00:00:00 2001 From: Kirk Byers Date: Fri, 3 May 2024 16:04:02 -0700 Subject: [PATCH 06/10] Garderos: minor changes to 'save_config' method --- netmiko/garderos/garderos_grs.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/netmiko/garderos/garderos_grs.py b/netmiko/garderos/garderos_grs.py index eef75b4eb..a49370d69 100644 --- a/netmiko/garderos/garderos_grs.py +++ b/netmiko/garderos/garderos_grs.py @@ -89,33 +89,31 @@ def commit(self, commit: str = "commit") -> str: sleep(1) return commit_result - def save_config( + def save_config( self, cmd: str = "write startup-configuration", confirm: bool = False, confirm_response: str = "", ) -> str: """Saves Config.""" - # Verify device is not in configuration mode + if self.check_config_mode(): raise ValueError("Device is in configuration mode. Please exit first.") - # Variables confirm and confirm_response are not relevant for Garderos + if confirm: raise ValueError( "Garderos saves the config without the need of confirmation. " "Please set variable 'confirm' to False!" ) - # Run write command - save_config_result = self.send_command(cmd) + + save_config_result = self._send_command_str(cmd) + # Verify success - if not save_config_result.__contains__( - "Values are persistently saved to STARTUP-CONF" - ): + if "Values are persistently saved to STARTUP-CONF" not in save_config_result: raise ValueError( f"Saving configuration was unsuccessful. Device said: {save_config_result}" ) - # Return device output - save_config_result = str(save_config_result) + return save_config_result def check_linux_mode(self, check_string: str = "]#", pattern: str = "#") -> bool: From 25aa14307401edc6e5141623be4faf1035650b3b Mon Sep 17 00:00:00 2001 From: Kirk Byers Date: Fri, 3 May 2024 16:06:19 -0700 Subject: [PATCH 07/10] Convert linux methods to private methods --- netmiko/garderos/garderos_grs.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/netmiko/garderos/garderos_grs.py b/netmiko/garderos/garderos_grs.py index a49370d69..42777e47b 100644 --- a/netmiko/garderos/garderos_grs.py +++ b/netmiko/garderos/garderos_grs.py @@ -89,7 +89,7 @@ def commit(self, commit: str = "commit") -> str: sleep(1) return commit_result - def save_config( + def save_config( self, cmd: str = "write startup-configuration", confirm: bool = False, @@ -116,7 +116,7 @@ def save_config( return save_config_result - def check_linux_mode(self, check_string: str = "]#", pattern: str = "#") -> bool: + def _check_linux_mode(self, check_string: str = "]#", pattern: str = "#") -> bool: """Checks if the device is in Linux mode or not. :param check_string: Identification of configuration mode from the device @@ -127,7 +127,7 @@ def check_linux_mode(self, check_string: str = "]#", pattern: str = "#") -> bool output = self.read_until_prompt(read_entire_line=True) return check_string in output - def linux_mode(self, linux_command: str = "linux-shell", pattern: str = "") -> str: + def _linux_mode(self, linux_command: str = "linux-shell", pattern: str = "") -> str: """Enter into Linux mode. :param config_command: Linux command to send to the device @@ -135,14 +135,14 @@ def linux_mode(self, linux_command: str = "linux-shell", pattern: str = "") -> s :param pattern: Pattern to terminate reading of channel """ output = "" - if not self.check_linux_mode(): + if not self._check_linux_mode(): self.write_channel(self.normalize_cmd(linux_command)) output = self.read_until_pattern(pattern=pattern) - if not self.check_linux_mode(): + if not self._check_linux_mode(): raise ValueError("Failed to enter Linux mode.") return output - def exit_linux_mode(self, exit_linux: str = "exit", pattern: str = "#") -> str: + def _exit_linux_mode(self, exit_linux: str = "exit", pattern: str = "#") -> str: """Exit from Linux mode. :param exit_config: Command to exit Linux mode @@ -150,10 +150,10 @@ def exit_linux_mode(self, exit_linux: str = "exit", pattern: str = "#") -> str: :param pattern: Pattern to terminate reading of channel """ output = "" - if self.check_linux_mode(): + if self._check_linux_mode(): self.write_channel(self.normalize_cmd(exit_linux)) output = self.read_until_pattern(pattern=pattern) - if self.check_linux_mode(): + if self._check_linux_mode(): raise ValueError("Failed to exit Linux mode") return output From d4a3c3dadff58b77833e030b3bcae8dc01abcd62 Mon Sep 17 00:00:00 2001 From: Kirk Byers Date: Wed, 22 May 2024 17:59:34 -0700 Subject: [PATCH 08/10] Remove 'ssh_connect' and 'ssh_send_command' from Garderos driver --- netmiko/garderos/garderos_grs.py | 123 ------------------------------- 1 file changed, 123 deletions(-) diff --git a/netmiko/garderos/garderos_grs.py b/netmiko/garderos/garderos_grs.py index 42777e47b..8565c09cc 100644 --- a/netmiko/garderos/garderos_grs.py +++ b/netmiko/garderos/garderos_grs.py @@ -333,126 +333,3 @@ def send_config_set( # Return all results # Will only be executed if no error occured return config_results - - def ssh_connect( - self, - host: str, - username: str = "", - password: str = "", - timeout: int = 5, - expect_prompt: str = "#", - ) -> str: - """ - Opening a nested SSH connection to another device - - :param host: IP address (or hostname if DNS is configured) - - :param username: Username to be used for SSH connection - - :param password: Password to be used for SSH connection - - :param timeout: Maximum time in seconds to wait for username and password prompt - - :param expect_prompt: Device prompt (or part of it ) to expect after successful login - - """ - # Send SSH command to Garderos - self.write_channel("ssh " + host + "\n") - # Wait for username and password prompt - # Maximum waiting time in seconds is set by variable timeout - password_sent = False - login_completed = False - counter = 0 - output_current = "" - output_summary = "" - while not (password_sent and login_completed) and counter < timeout: - sleep(1) - output_current = self.read_channel() - output_summary += output_current - if "exited" in output_current: - # SSH session terminated - # Clear the receive channel before raising exception // Also retrieves error message - output_current = self.read_channel() - raise ConnectionError( - "Error connecting to host {}. Session terminated unexpectedly. \n{}".format( - host, output_current.strip() - ) - ) - elif not password_sent and "user" in output_current.lower(): - # Send username when prompted - self.write_channel(username + "\n") - elif not password_sent and "password" in output_current.lower(): - # Send password when prompted for the first time - self.write_channel(password + "\n") - password_sent = True - elif password_sent and "password" in output_current.lower(): - # If password has already been sent but a second password prompt appears, - # this means the password was incorrect - # Cancel nested SSH connection: CTRL+C followed by ENTER - self.write_channel(b"\x03".decode("utf-8")) - sleep(1) - self.write_channel("\n") - sleep(2) - # Clear the receive channel before raising exception - output_current = self.read_channel() - raise ValueError( - "Authentication error while connecting to host {}.".format(host) - ) - elif ( - password_sent - and not login_completed - and not (expect_prompt in output_current) - ): - # After password has been sent, try to send ENTER to device to get the device prompt - self.write_channel("\n") - elif ( - password_sent - and not login_completed - and expect_prompt in output_current - ): - # Device prompt successfully received - login_completed = True - # Increase counter in any case - counter += 1 - # On exit of loop check whether password has been sent - if not password_sent or not login_completed: - # Cancel nested SSH connection: CTRL+C followed by ENTER - self.write_channel(b"\x03".decode("utf-8")) - sleep(1) - self.write_channel("\n") - sleep(2) - # Clear the receive channel before raising exception - output_current = self.read_channel() - raise TimeoutError( - "Timeout error while connecting to host {}. Transcript of session: \n{}".format( - host, output_summary - ) - ) - return output_current.strip() - - def ssh_send_command( - self, - command_string: str, - expect_string: Optional[str] = None, - expect_timeout: float = 5.0, - ) -> str: - """ - Sending a command through an active nested SSH connection - - :param command_string: Command to be send through nested SSH session - - :param expect_string: Output to be expected from remote device, e.g. command promt - - :param expect_timeout: Timeout in seconds to wait for expect_string in output - - """ - # Send command to channel - self.write_channel(command_string + "\n") - output_summary = "" - # Check if waiting for expect string is necessary - if expect_string is not None: - # Read until expect string detected - output_summary = self.read_until_pattern( - pattern=expect_string, read_timeout=expect_timeout - ) - return output_summary From 5a3ef49ec0b82f29892afeba65257dab4115759c Mon Sep 17 00:00:00 2001 From: Kirk Byers Date: Tue, 9 Jul 2024 19:42:54 -0700 Subject: [PATCH 09/10] Updates to Garderos Driver --- netmiko/garderos/garderos_grs.py | 104 +++++-------------------------- 1 file changed, 15 insertions(+), 89 deletions(-) diff --git a/netmiko/garderos/garderos_grs.py b/netmiko/garderos/garderos_grs.py index 8565c09cc..3e264821f 100644 --- a/netmiko/garderos/garderos_grs.py +++ b/netmiko/garderos/garderos_grs.py @@ -84,7 +84,7 @@ def commit(self, commit: str = "commit") -> str: raise ValueError(f"Commit was unsuccessful. Device said: {commit_result}") # Garderos needs a second to apply the config - # If the "show configuration running" command is executed to quickly after committing + # If the "show configuration running" command is executed too quickly after committing # it will result in error "No running configuration found." sleep(1) return commit_result @@ -127,7 +127,9 @@ def _check_linux_mode(self, check_string: str = "]#", pattern: str = "#") -> boo output = self.read_until_prompt(read_entire_line=True) return check_string in output - def _linux_mode(self, linux_command: str = "linux-shell", pattern: str = "") -> str: + def _linux_mode( + self, linux_command: str = "linux-shell", pattern: str = r"#" + ) -> str: """Enter into Linux mode. :param config_command: Linux command to send to the device @@ -157,49 +159,7 @@ def _exit_linux_mode(self, exit_linux: str = "exit", pattern: str = "#") -> str: raise ValueError("Failed to exit Linux mode") return output - def remove_and_replace_control_chars(self, s: str) -> str: - """Removing all occurrences of "\r\n\r" except of the last occurrence - Last occurence will be replaced by "\n" - - :param s: String that needs to be cleansed - """ - # Because the sequence "\r\n\r" also matches "\r\n\r\n", - # we need to replace "\r\n\r\n" with "\n\n" first - s = s.replace("\r\n\r\n", "\n\n") - # Now we have eliminated "\r\n\r\n" - # and can begin working on the remaining "\r\n\r" occurrences - control_seq = "\r\n\r" - if s.count(control_seq) == 0: - return s - else: - index_last_occurrence = s.rfind(control_seq) - index_rest_of_string = index_last_occurrence + len(control_seq) - return ( - s[:index_last_occurrence].replace(control_seq, "") - + "\n" - + s[index_rest_of_string:] - ) - - def normalize_linefeeds(self, a_string: str) -> str: - """Optimised normalisation of line feeds - - :param a_string: A string that may have non-normalized line feeds - i.e. output returned from device, or a device prompt - """ - # Garderos has special behavior in terms of line feeds: - # The echo of commands sometimes contains "\r\n\r" - # which breaks the functionality of _sanitize_output(). - # Therefore, this character sequence needs to be fixed - # before passing the string to normalize_linefeeds(). - - # First we will remove all the occurrences of "\r\n\r" except of the last one. - # The last occurrence will be replaced by "\r\n". - a_string = self.remove_and_replace_control_chars(a_string) - - # Then we will pass the string to normalize_linefeeds() to replace all line feeds with "\n" - return super().normalize_linefeeds(a_string=a_string) - - def send_config_command( + def _send_config_command( self, command_string: str, expect_string: Optional[str] = None, @@ -216,13 +176,13 @@ def send_config_command( ttp_template: Optional[str] = None, use_genie: bool = False, cmd_verify: bool = True, - ) -> Union[str, List[Any], Dict[str, Any]]: + ) -> str: """ Execute a command in configuration mode and raise error if command execution failed. Function neither checks if device is configuration mode nor turns on configuration mode. """ # Send command to device - command_result = self.send_command( + command_result = self._send_command_str( command_string=command_string, expect_string=expect_string, read_timeout=read_timeout, @@ -265,49 +225,16 @@ def send_config_set( terminator: str = r"#", bypass_commands: Optional[str] = None, ) -> str: - """ - Optimised version of send_config_set() for Garderos. - Checks whether single config commands executed successfully. - - Automatically exits/enters configuration mode. - - :param config_commands: Multiple configuration commands to be sent to the device - - :param exit_config_mode: Determines whether or not to exit config mode after complete - :param delay_factor: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5. - - :param max_loops: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5. - - :param strip_prompt: Determines whether or not to strip the prompt - - :param strip_command: Determines whether or not to strip the command - - :param read_timeout: Absolute timer to send to read_channel_timing. Also adjusts - read_timeout in read_until_pattern calls. - - :param config_mode_command: The command to enter into config mode - - :param cmd_verify: Whether or not to verify command echo for each command in config_set - - :param enter_config_mode: Do you enter config mode before sending config commands - - :param error_pattern: Regular expression pattern to detect config errors in the - output. - - :param terminator: Regular expression pattern to use as an alternate terminator in certain - situations. - - :param bypass_commands: Regular expression pattern indicating configuration commands - where cmd_verify is automatically disabled. - """ # The result of all commands will be collected to config_results config_results = "" + # Set delay_factor to given value if delay_factor is None: delay_factor = self.select_delay_factor(0) else: delay_factor = self.select_delay_factor(delay_factor) + # Verify if config_commands is an array if config_commands is None: return config_results @@ -315,21 +242,20 @@ def send_config_set( config_commands = (config_commands,) if not hasattr(config_commands, "__iter__"): raise ValueError("Invalid argument passed into send_config_set") + # Go to config mode. Use given config_mode_command if necessary. if enter_config_mode: if config_mode_command: config_results += self.config_mode(config_mode_command) else: config_results += self.config_mode() + # Send all commands to the router and verify their successful execution for command in config_commands: - # Verification is done in send_config_command() function + # Verification is done in send_config_command() # Will raise error on execution failure - result = self.send_config_command(command) - config_results = f"{command}\n{result}\n" - # Exit config mode if needed + config_results += self._send_config_command(command) + if exit_config_mode: - self.exit_config_mode() - # Return all results - # Will only be executed if no error occured + config_results += self.exit_config_mode() return config_results From e2218ba8fe8e43d894c3f2a50d128b42b9a55993 Mon Sep 17 00:00:00 2001 From: Kirk Byers Date: Tue, 9 Jul 2024 19:49:07 -0700 Subject: [PATCH 10/10] Add Garderos GRS to plaforms --- PLATFORMS.md | 1 + 1 file changed, 1 insertion(+) diff --git a/PLATFORMS.md b/PLATFORMS.md index 504696ceb..927506764 100644 --- a/PLATFORMS.md +++ b/PLATFORMS.md @@ -96,6 +96,7 @@ - F5 TMSH - F5 Linux - Fortinet +- Garderos GRS - MRV Communications OptiSwitch - MRV LX - Nokia/Alcatel SR-OS