From a01baadfa8b023abffb18a42e79f78621388fd79 Mon Sep 17 00:00:00 2001 From: Joe Testa Date: Fri, 22 Nov 2024 12:28:02 -0500 Subject: [PATCH] Additional cleanups after merging #304. --- README.md | 116 +++++++------- src/ssh_audit/outputbuffer.py | 4 +- src/ssh_audit/ssh_audit.py | 284 +++++++++++++--------------------- src/ssh_audit/utils.py | 2 +- test/test_auditconf.py | 3 +- 5 files changed, 173 insertions(+), 236 deletions(-) diff --git a/README.md b/README.md index d8ba30b7..04e417ec 100644 --- a/README.md +++ b/README.md @@ -41,64 +41,61 @@ ## Usage ``` -usage: ssh-audit.py [options] - - -h, --help print this help - -1, --ssh1 force ssh version 1 only - -2, --ssh2 force ssh version 2 only - -4, --ipv4 enable IPv4 (order of precedence) - -6, --ipv6 enable IPv6 (order of precedence) - -b, --batch batch output - -c, --client-audit starts a server on port 2222 to audit client - software config (use -p to change port; - use -t to change timeout) - --conn-rate-test=N[:max_rate] perform a connection rate test (useful - for collecting metrics related to - susceptibility of the DHEat vuln). - Testing is conducted with N concurrent - sockets with an optional maximum rate - of connections per second. - -d, --debug Enable debug output. - --dheat=N[:kex[:e_len]] continuously perform the DHEat DoS attack - (CVE-2002-20001) against the target using N - concurrent sockets. Optionally, a specific - key exchange algorithm can be specified - instead of allowing it to be automatically - chosen. Additionally, a small length of - the fake e value sent to the server can - be chosen for a more efficient attack (such - as 4). - -g, --gex-test= dh gex modulus size test - - - -j, --json JSON output (use -jj to enable indents) - -l, --level= minimum output level (info|warn|fail) - -L, --list-policies list all the official, built-in policies. Use with -v - to view policy change logs. - --lookup= looks up an algorithm(s) without - connecting to a server - -m, --manual print the man page (Docker, PyPI, Snap, and Windows - builds only) - -M, --make-policy= creates a policy based on the target server - (i.e.: the target server has the ideal - configuration that other servers should - adhere to) - -n, --no-colors disable colors - -p, --port= port to connect - -P, --policy=<"policy name" | policy.txt> run a policy test using the - specified policy - --skip-rate-test skip the connection rate test during standard audits - (used to safely infer whether the DHEat attack - is viable) - -t, --timeout= timeout (in seconds) for connection and reading - (default: 5) - -T, --targets= a file containing a list of target hosts (one - per line, format HOST[:PORT]). Use -p/--port - to set the default port for all hosts. Use - --threads to control concurrent scans. - --threads= number of threads to use when scanning multiple - targets (-T/--targets) (default: 32) - -v, --verbose verbose output +usage: ssh-audit.py [-h] [-1] [-2] [-4] [-6] [-b] [-c] [-d] + [-g / ] [-j] [-l {info,warn,fail}] [-L] + [-M custom_policy.txt] [-m] [-n] [-P "Built-In Policy Name" / custom_policy.txt] [-p N] + [-T targets.txt] [-t N] [-v] [--conn-rate-test N[:max_rate]] [--dheat N[:kex[:e_len]]] + [--lookup alg1[,alg2,...]] [--skip-rate-test] [--threads N] + [host] + +positional arguments: + host target hostname or IPv4/IPv6 address + +optional arguments: + -h, --help show this help message and exit + -1, --ssh1 force ssh version 1 only + -2, --ssh2 force ssh version 2 only + -4, --ipv4 enable IPv4 (order of precedence) + -6, --ipv6 enable IPv6 (order of precedence) + -b, --batch batch output + -c, --client-audit starts a server on port 2222 to audit client software config (use -p to change port; use -t + to change timeout) + -d, --debug enable debugging output + -g / , --gex-test / + conducts a very customized Diffie-Hellman GEX modulus size test. Tests an array of minimum, + preferred, and maximum values, or a range of values with an optional incremental step amount + -j, --json enable JSON output (use -jj to enable indentation for better readability) + -l {info,warn,fail}, --level {info,warn,fail} + minimum output level (default: info) + -L, --list-policies list all the official, built-in policies. Combine with -v to view policy change logs + -M custom_policy.txt, --make-policy custom_policy.txt + creates a policy based on the target server (i.e.: the target server has the ideal + configuration that other servers should adhere to), and stores it in the file path specified + -m, --manual print the man page (Docker, PyPI, Snap, and Windows builds only) + -n, --no-colors disable colors (automatic when the NO_COLOR environment variable is set) + -P "Built-In Policy Name" / custom_policy.txt, --policy "Built-In Policy Name" / custom_policy.txt + run a policy test using the specified policy (use -L to see built-in policies, or specify + filesystem path to custom policy created by -M) + -p N, --port N the TCP port to connect to (or to listen on when -c is used) + -T targets.txt, --targets targets.txt + a file containing a list of target hosts (one per line, format HOST[:PORT]). Use -p/--port + to set the default port for all hosts. Use --threads to control concurrent scans + -t N, --timeout N timeout (in seconds) for connection and reading (default: 5) + -v, --verbose enable verbose output + --conn-rate-test N[:max_rate] + perform a connection rate test (useful for collecting metrics related to susceptibility of + the DHEat vuln). Testing is conducted with N concurrent sockets with an optional maximum + rate of connections per second + --dheat N[:kex[:e_len]] + continuously perform the DHEat DoS attack (CVE-2002-20001) against the target using N + concurrent sockets. Optionally, a specific key exchange algorithm can be specified instead + of allowing it to be automatically chosen. Additionally, a small length of the fake e value + sent to the server can be chosen for a more efficient attack (such as 4). + --lookup alg1[,alg2,...] + looks up an algorithm(s) without connecting to a server. + --skip-rate-test skip the connection rate test during standard audits (used to safely infer whether the DHEat + attack is viable) + --threads N number of threads to use when scanning multiple targets (-T/--targets) (default: 32) ``` * if both IPv4 and IPv6 are used, order of precedence can be set by using either `-46` or `-64`. * batch flag `-b` will output sections without header and without empty lines (implies verbose flag). @@ -219,6 +216,9 @@ For convenience, a web front-end on top of the command-line tool is available at ## ChangeLog +### v3.4.0-dev + - Migrated from deprecated `getopt` module to `argparse`; partial credit [oam7575](https://github.com/oam7575). + ### v3.3.0 (2024-10-15) - Added Python 3.13 support. - Added built-in policies for Ubuntu 24.04 LTS server & client, OpenSSH 9.8, and OpenSSH 9.9. diff --git a/src/ssh_audit/outputbuffer.py b/src/ssh_audit/outputbuffer.py index 38723e3c..0058c8e6 100644 --- a/src/ssh_audit/outputbuffer.py +++ b/src/ssh_audit/outputbuffer.py @@ -145,8 +145,10 @@ def head(self, s: str, line_ended: bool = True) -> 'OutputBuffer': self._print('head', s, line_ended) return self - def fail(self, s: str, line_ended: bool = True) -> 'OutputBuffer': + def fail(self, s: str, line_ended: bool = True, write_now: bool = False) -> 'OutputBuffer': self._print('fail', s, line_ended) + if write_now: + self.write() return self def warn(self, s: str, line_ended: bool = True) -> 'OutputBuffer': diff --git a/src/ssh_audit/ssh_audit.py b/src/ssh_audit/ssh_audit.py index 9c320d4d..0837a91b 100755 --- a/src/ssh_audit/ssh_audit.py +++ b/src/ssh_audit/ssh_audit.py @@ -83,63 +83,6 @@ # no_idna_workaround = True -def usage(uout: OutputBuffer, err: Optional[str] = None) -> None: - retval = exitcodes.GOOD - p = os.path.basename(sys.argv[0]) - uout.head('# {} {}, https://github.com/jtesta/ssh-audit\n'.format(p, VERSION)) - if err is not None and len(err) > 0: - uout.fail(err + '\n') - retval = exitcodes.UNKNOWN_ERROR - uout.info('usage: {0} [options] -ip \n'.format(p)) - uout.info(' -h, --help print this help') - uout.info(' -1, --ssh1 force ssh version 1 only') - uout.info(' -2, --ssh2 force ssh version 2 only') - uout.info(' -4, --ipv4 enable IPv4 (order of precedence)') - uout.info(' -6, --ipv6 enable IPv6 (order of precedence)') - uout.info(' -b, --batch batch output') - uout.info(' -c, --client-audit starts a server on port 2222 to audit client\n software config (use -p to change port;\n use -t to change timeout)') - uout.info(' --conn-rate-test=N[:max_rate] perform a connection rate test (useful') - uout.info(' for collecting metrics related to') - uout.info(' susceptibility of the DHEat vuln).') - uout.info(' Testing is conducted with N concurrent') - uout.info(' sockets with an optional maximum rate') - uout.info(' of connections per second.') - uout.info(' -d, --debug debug output') - uout.info(' --dheat=N[:kex[:e_len]] continuously perform the DHEat DoS attack') - uout.info(' (CVE-2002-20001) against the target using N') - uout.info(' concurrent sockets. Optionally, a specific') - uout.info(' key exchange algorithm can be specified') - uout.info(' instead of allowing it to be automatically') - uout.info(' chosen. Additionally, a small length of') - uout.info(' the fake e value sent to the server can') - uout.info(' be chosen for a more efficient attack (such') - uout.info(' as 4).') - uout.info(' -g, --gex-test= dh gex modulus size test') - uout.info(' ') - uout.info(' ') - uout.info(' --hostname hostname of target to scan') - uout.info(' -ip, --ip-address ip address of target to scan') - uout.info(' -j, --json JSON output (use -jj to enable indents)') - uout.info(' -l, --level= minimum output level (info|warn|fail)') - uout.info(' -L, --list-policies list all the official, built-in policies. Use with -v') - uout.info(' to view policy change logs.') - uout.info(' --lookup= looks up an algorithm(s) without\n connecting to a server') - uout.info(' -M, --make-policy= creates a policy based on the target server\n (i.e.: the target server has the ideal\n configuration that other servers should\n adhere to)') - uout.info(' -m, --manual print the man page (Docker, PyPI, Snap, and Windows\n builds only)') - uout.info(' -n, --no-colors disable colors (automatic when the NO_COLOR') - uout.info(' environment variable is set)') - uout.info(' -p, --port= port to connect') - uout.info(' -P, --policy= run a policy test using the specified policy') - uout.info(' --skip-rate-test skip the connection rate test during standard audits\n (used to safely infer whether the DHEat attack\n is viable)') - uout.info(' -t, --timeout= timeout (in seconds) for connection and reading\n (default: 5)') - uout.info(' -T, --targets= a file containing a list of target hosts (one\n per line, format HOST[:PORT]). Use -p/--port\n to set the default port for all hosts. Use\n --threads to control concurrent scans.') - uout.info(' --threads= number of threads to use when scanning multiple\n targets (-T/--targets) (default: 32)') - uout.info(' -v, --verbose verbose output') - uout.sep() - uout.write() - sys.exit(retval) - - def output_algorithms(out: OutputBuffer, title: str, alg_db: Dict[str, Dict[str, List[List[Optional[str]]]]], alg_type: str, algorithms: List[str], unknown_algs: List[str], is_json_output: bool, program_retval: int, maxlen: int = 0, host_keys: Optional[Dict[str, Dict[str, Union[bytes, str, int]]]] = None, dh_modulus_sizes: Optional[Dict[str, int]] = None) -> int: # pylint: disable=too-many-arguments with out: for algorithm in algorithms: @@ -374,7 +317,7 @@ def output_recommendations(out: OutputBuffer, algs: Algorithms, algorithm_recomm notes = " (%s)" % notes fm = '(rec) {0}{1}{2}-- {3} algorithm to {4}{5} ' - fn(fm.format(sg, name, p, alg_type, an, notes)) + fn(fm.format(sg, name, p, alg_type, an, notes)) # type: ignore[operator] if not out.is_section_empty() and not is_json_output: if software is not None: @@ -826,7 +769,7 @@ def make_policy(aconf: AuditConf, banner: Optional['Banner'], kex: Optional['SSH print(err) -def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[..., None]) -> 'AuditConf': # pylint: disable=too-many-statements +def process_commandline(out: OutputBuffer, args: List[str]) -> 'AuditConf': # pylint: disable=too-many-statements # pylint: disable=too-many-branches aconf = AuditConf() @@ -841,117 +784,88 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[. aconf.ssh1, aconf.ssh2 = False, False host: str = '' + port: int = 22 - parser = argparse.ArgumentParser(prog='SSH Audit Tool', description='SSH Audit Tool', add_help=False, allow_abbrev=False) + parser = argparse.ArgumentParser(description="# {} {}, https://github.com/jtesta/ssh-audit".format(os.path.basename(sys.argv[0]), VERSION), allow_abbrev=False) # Add short options to the parser - parser.add_argument('-1', '--ssh1', action="store_true", dest='ssh1', default=None) - parser.add_argument('-2', '--ssh2', action="store_true", dest='ssh2', default=None) - parser.add_argument('-4', '--ipv4', action="store_true", dest='ipv4', default=None) - parser.add_argument('-6', '--ipv6', action="store_true", dest='ipv6', default=None) - parser.add_argument('-b', '--batch', action="store_true", dest='batch', default=None) - parser.add_argument('-c', '--client-audit', action="store_true", dest='client_audit', default=None) - parser.add_argument('-d', '--debug', action="store_true", dest='debug', default=None) - parser.add_argument('-g', '--gex-test', action="store", dest='gex_test', default=None) - parser.add_argument('-h', '--help', action="store_true", dest='help', default=None) - parser.add_argument('-ip', '--ip-address', '--hostname', action="store", dest='host', type=str) - parser.add_argument('-j', '--json', action="store_true", dest='json', default=None) - parser.add_argument('-jj', '--json-indent', action="store_true", dest='json_indent', default=None) - parser.add_argument('-l', '--level', action="store", dest='level', type=str, default='info') - parser.add_argument('-L', '--list-policies', action="store_true", dest='list_policies', default=None) - parser.add_argument('-M', '--make-policy', action="store", dest='make_policy', default=None) - parser.add_argument('-m', '--manual', action="store_true", dest='manual', default=None) - parser.add_argument('-n', '--no-colors', action="store_true", dest='no_colors', default=None) - parser.add_argument('-P', '--policy', action="store", dest='policy', default=None) - parser.add_argument('-p', '--port', action="store", dest='port', default='22', type=int) - parser.add_argument('-T', '--targets', action="store", dest='targets', default=None) - parser.add_argument('-t', '--timeout', action="store", dest='timeout', default='5', type=int) - parser.add_argument('-v', '--verbose', action="store_true", dest='verbose', default=None) - + parser.add_argument("-1", "--ssh1", action="store_true", dest="ssh1", default=False, help="force ssh version 1 only") + parser.add_argument("-2", "--ssh2", action="store_true", dest="ssh2", default=False, help="force ssh version 2 only") + parser.add_argument("-4", "--ipv4", action="store_true", dest="ipv4", default=False, help="enable IPv4 (order of precedence)") + parser.add_argument("-6", "--ipv6", action="store_true", dest="ipv6", default=False, help="enable IPv6 (order of precedence)") + parser.add_argument("-b", "--batch", action="store_true", dest="batch", default=False, help="batch output") + parser.add_argument("-c", "--client-audit", action="store_true", dest="client_audit", default=False, help="starts a server on port 2222 to audit client software config (use -p to change port; use -t to change timeout)") + parser.add_argument("-d", "--debug", action="store_true", dest="debug", default=False, help="enable debugging output") + parser.add_argument("-g", "--gex-test", action="store", dest="gex_test", metavar=" / ", type=str, default=None, help="conducts a very customized Diffie-Hellman GEX modulus size test. Tests an array of minimum, preferred, and maximum values, or a range of values with an optional incremental step amount") + parser.add_argument("-j", "--json", action="count", dest="json", default=0, help="enable JSON output (use -jj to enable indentation for better readability)") + parser.add_argument("-l", "--level", action="store", dest="level", type=str, choices=["info", "warn", "fail"], default="info", help="minimum output level (default: %(default)s)") + parser.add_argument("-L", "--list-policies", action="store_true", dest="list_policies", default=False, help="list all the official, built-in policies. Combine with -v to view policy change logs") + parser.add_argument("-M", "--make-policy", action="store", dest="make_policy", metavar="custom_policy.txt", type=str, default=None, help="creates a policy based on the target server (i.e.: the target server has the ideal configuration that other servers should adhere to), and stores it in the file path specified") + parser.add_argument("-m", "--manual", action="store_true", dest="manual", default=False, help="print the man page (Docker, PyPI, Snap, and Windows builds only)") + parser.add_argument("-n", "--no-colors", action="store_true", dest="no_colors", default=False, help="disable colors (automatic when the NO_COLOR environment variable is set)") + parser.add_argument("-P", "--policy", action="store", dest="policy", metavar="\"Built-In Policy Name\" / custom_policy.txt", type=str, default=None, help="run a policy test using the specified policy (use -L to see built-in policies, or specify filesystem path to custom policy created by -M)") + parser.add_argument("-p", "--port", action="store", dest="oport", metavar="N", type=int, default=None, help="the TCP port to connect to (or to listen on when -c is used)") + parser.add_argument("-T", "--targets", action="store", dest="targets", metavar="targets.txt", type=str, default=None, help="a file containing a list of target hosts (one per line, format HOST[:PORT]). Use -p/--port to set the default port for all hosts. Use --threads to control concurrent scans") + parser.add_argument("-t", "--timeout", action="store", dest="timeout", metavar="N", type=int, default=5, help="timeout (in seconds) for connection and reading (default: %(default)s)") + parser.add_argument("-v", "--verbose", action="store_true", dest="verbose", default=False, help="enable verbose output") # Add long options to the parser - parser.add_argument('--conn-rate-test', action="store", dest='conn_rate_test', default='0', type=int) - parser.add_argument('--dheat', action="store", dest='dheat', default='0', type=int) - parser.add_argument('--lookup', action="store", dest='lookup', default=None) - parser.add_argument('--skip-rate-test', action="store_true", dest='skip_rate_test', default=None) - parser.add_argument('--threads', action="store", dest='threads', default='32', type=int) + parser.add_argument("--conn-rate-test", action="store", dest="conn_rate_test", metavar="N[:max_rate]", type=str, default=None, help="perform a connection rate test (useful for collecting metrics related to susceptibility of the DHEat vuln). Testing is conducted with N concurrent sockets with an optional maximum rate of connections per second") + parser.add_argument("--dheat", action="store", dest="dheat", metavar="N[:kex[:e_len]]", type=str, default=None, help="continuously perform the DHEat DoS attack (CVE-2002-20001) against the target using N concurrent sockets. Optionally, a specific key exchange algorithm can be specified instead of allowing it to be automatically chosen. Additionally, a small length of the fake e value sent to the server can be chosen for a more efficient attack (such as 4).") + parser.add_argument("--lookup", action="store", dest="lookup", metavar="alg1[,alg2,...]", type=str, default=None, help="looks up an algorithm(s) without connecting to a server.") + parser.add_argument("--skip-rate-test", action="store_true", dest="skip_rate_test", default=False, help="skip the connection rate test during standard audits (used to safely infer whether the DHEat attack is viable)") + parser.add_argument("--threads", action="store", dest="threads", metavar="N", type=int, default=32, help="number of threads to use when scanning multiple targets (-T/--targets) (default: %(default)s)") - try: - argument = parser.parse_args() + # The mandatory target option. Or rather, mandatory when -L, -T, or --lookup are not used. + parser.add_argument("host", nargs="?", action="store", type=str, default="", help="target hostname or IPv4/IPv6 address") - if argument.help is True: - usage_cb(out) + # If no arguments were given, print the help and exit. + if len(args) < 1: + parser.print_help() + sys.exit(exitcodes.UNKNOWN_ERROR) - aconf.host = argument.host - host = argument.host - port = argument.port - aconf.ssh1 = argument.ssh1 - aconf.ssh2 = argument.ssh2 + oport: Optional[int] = None + try: + argument = parser.parse_args(args=args) + + # Set simple flags. + aconf.client_audit = argument.client_audit aconf.ipv4 = argument.ipv4 aconf.ipv6 = argument.ipv6 - - aconf.json = argument.json - if argument.json_indent is True: - setattr(argument, 'json', True) - aconf.json = argument.json - aconf.json_print_indent = argument.json_indent + aconf.level = argument.level + aconf.list_policies = argument.list_policies + aconf.manual = argument.manual + aconf.skip_rate_test = argument.skip_rate_test + aconf.ssh1 = argument.ssh1 + aconf.ssh2 = argument.ssh2 + oport = argument.oport if argument.batch is True: aconf.batch = True aconf.verbose = True - aconf.client_audit = argument.client_audit - - ttime = argument.timeout - if ttime != 5: - aconf.timeout = float(argument.timeout) - aconf.timeout_set = True - - if argument.verbose is True: - aconf.verbose = True - out.verbose = True - - # Get error level regex - err_level = argument.level - if err_level in ["info", "warn", "fail"]: - aconf.level = str(argument.level) - else: - usage_cb(out, 'Error level : {} is not valid'.format(err_level)) - - if getattr(argument, 'make_policy') is True: - aconf.make_policy = True - aconf.policy_file = argument.make_policy - - if getattr(argument, 'policy') is True: - aconf.policy_file = argument.policy - - if getattr(argument, 'targets') is True: - aconf.target_file = argument.targets - - if argument.threads != 32: - aconf.threads = argument.threads - - if getattr(argument, 'list_policies') is True: - aconf.list_policies = True - - if getattr(argument, 'lookup') is True: - aconf.lookup = argument.lookup + # If one -j was given, turn on JSON output. If -jj was given, enable indentation. + aconf.json = argument.json > 0 + if argument.json > 1: + aconf.json_print_indent = True - if getattr(argument, 'manual') is True: - aconf.manual = True - else: - aconf.manual = False + if argument.conn_rate_test is not None: + aconf.conn_rate_test = argument.conn_rate_test if argument.debug is True: aconf.debug = True out.debug = True - if getattr(argument, 'gex_test') is True: + if argument.dheat is not None: + aconf.dheat = argument.dheat + + if argument.gex_test is not None: dh_gex = argument.gex_test permitted_syntax = get_permitted_syntax_for_gex_test() if not any(re.search(regex_str, dh_gex) for regex_str in permitted_syntax.values()): - usage_cb(out, '{} is not valid'.format(dh_gex)) + out.fail('{} is not valid'.format(dh_gex), write_now=True) + sys.exit(exitcodes.UNKNOWN_ERROR) if re.search(permitted_syntax['RANGE'], dh_gex): extracted_digits = re.findall(r'\d+', dh_gex) @@ -963,55 +877,80 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[. bits_step = int(extracted_digits[2]) if bits_step <= 0: - usage_cb(out, '{} {} is not valid'.format(dh_gex, bits_step)) + out.fail('the step field cannot be 0 or less: {}'.format(bits_step), write_now=True) + sys.exit(exitcodes.UNKNOWN_ERROR) if all(x < 0 for x in (bits_left_bound, bits_right_bound)): - usage_cb(out, '{} {} {} is not valid'.format(dh_gex, bits_left_bound, bits_right_bound)) + out.fail('{} {} {} is not valid'.format(dh_gex, bits_left_bound, bits_right_bound), write_now=True) + sys.exit(exitcodes.UNKNOWN_ERROR) - aconf.gex_test = argument.gex_test + aconf.gex_test = dh_gex + if argument.lookup is not None: + aconf.lookup = argument.lookup - if int(argument.dheat) > 0: - aconf.dheat = argument.dheat + if argument.make_policy is not None: + aconf.make_policy = True + aconf.policy_file = argument.make_policy - aconf.skip_rate_test = argument.skip_rate_test + if argument.policy is not None: + aconf.policy_file = argument.policy - if int(argument.conn_rate_test) > 0: - aconf.conn_rate_test = argument.conn_rate_test + if argument.targets is not None: + aconf.target_file = argument.targets + + if argument.threads is not None: + aconf.threads = argument.threads + + if argument.timeout is not None: + aconf.timeout = float(argument.timeout) + aconf.timeout_set = True + + if argument.verbose is True: + aconf.verbose = True + out.verbose = True except argparse.ArgumentError as err: - usage_cb(out, str(err)) + out.fail(str(err), write_now=True) + parser.print_help() + sys.exit(exitcodes.UNKNOWN_ERROR) - if argument.host is None and argument.client_audit is None and argument.targets is None and argument.list_policies is None and argument.lookup is None and argument.manual is None: - usage_cb(out) + if argument.host == "" and argument.client_audit is False and argument.targets is None and argument.list_policies is False and argument.lookup is None and argument.manual is False: + out.fail("target host must be specified, unless -c, -m, -L, -T, or --lookup are used", write_now=True) + sys.exit(exitcodes.UNKNOWN_ERROR) if aconf.manual: return aconf - if aconf.lookup != '': + if aconf.lookup != "": return aconf if aconf.list_policies: list_policies(out, aconf.verbose) sys.exit(exitcodes.GOOD) - if aconf.client_audit is None and aconf.target_file is None: - host = argument.host - port = argument.port + if aconf.client_audit is False and aconf.target_file is None: + if oport is not None: + host = argument.host + else: + host, port = Utils.parse_host_and_port(argument.host) - if argument.host is None and aconf.target_file is None: - usage_cb(out, 'host is empty') + if not host and aconf.target_file is None: + out.fail("target host is not specified", write_now=True) + sys.exit(exitcodes.UNKNOWN_ERROR) - if aconf.client_audit is True: # The default port to listen on during a client audit is 2222. + if oport is None and aconf.client_audit: # The default port to listen on during a client audit is 2222. port = 2222 - if argument.port != 22: - port = Utils.parse_int(argument.port) - if port <= 0 or port > 65535: - usage_cb(out, 'port {} is not valid'.format(argument.port)) + if oport is not None: + port = Utils.parse_int(oport) + if port < 1 or port > 65535: + out.fail("port must be greater than 0 and less than 65535: {}".format(oport), write_now=True) + sys.exit(exitcodes.UNKNOWN_ERROR) aconf.host = host aconf.port = port + if not (aconf.ssh1 or aconf.ssh2): aconf.ssh1, aconf.ssh2 = True, True @@ -1040,20 +979,17 @@ def process_commandline(out: OutputBuffer, args: List[str], usage_cb: Callable[. try: aconf.policy = Policy(policy_file=aconf.policy_file, json_output=aconf.json) except Exception as e: - out.fail("Error while loading policy file: %s: %s" % (str(e), traceback.format_exc())) - out.write() + out.fail("Error while loading policy file: %s: %s" % (str(e), traceback.format_exc()), write_now=True) sys.exit(exitcodes.UNKNOWN_ERROR) # If the user wants to do a client audit, but provided a server policy, terminate. if aconf.client_audit and aconf.policy.is_server_policy(): - out.fail("Error: client audit selected, but server policy provided.") - out.write() + out.fail("Error: client audit selected, but server policy provided.", write_now=True) sys.exit(exitcodes.UNKNOWN_ERROR) # If the user wants to do a server audit, but provided a client policy, terminate. if aconf.client_audit is False and aconf.policy.is_server_policy() is False: - out.fail("Error: server audit selected, but client policy provided.") - out.write() + out.fail("Error: server audit selected, but client policy provided.", write_now=True) sys.exit(exitcodes.UNKNOWN_ERROR) return aconf @@ -1543,7 +1479,7 @@ def run_gex_granular_modulus_size_test(out: OutputBuffer, s: 'SSH_Socket', kex: def main() -> int: out = OutputBuffer() - aconf = process_commandline(out, sys.argv[1:], usage) + aconf = process_commandline(out, sys.argv[1:]) # If we're on Windows, but the colorama module could not be imported, print a warning if we're in verbose mode. if (sys.platform == 'win32') and ('colorama' not in sys.modules): diff --git a/src/ssh_audit/utils.py b/src/ssh_audit/utils.py index a17ecb6f..08f4705c 100644 --- a/src/ssh_audit/utils.py +++ b/src/ssh_audit/utils.py @@ -129,7 +129,7 @@ def parse_float(v: Any) -> float: return -1.0 @staticmethod - def parse_host_and_port(host_and_port: str, default_port: int = 0) -> Tuple[str, int]: + def parse_host_and_port(host_and_port: str, default_port: int = 22) -> Tuple[str, int]: '''Parses a string into a tuple of its host and port. The port is 0 if not specified.''' host = host_and_port port = default_port diff --git a/test/test_auditconf.py b/test/test_auditconf.py index 00366ecc..5b6aeaae 100644 --- a/test/test_auditconf.py +++ b/test/test_auditconf.py @@ -8,7 +8,6 @@ class TestAuditConf: def init(self, ssh_audit): self.AuditConf = ssh_audit.AuditConf self.OutputBuffer = ssh_audit.OutputBuffer() - self.usage = ssh_audit.usage self.process_commandline = process_commandline @staticmethod @@ -107,7 +106,7 @@ def test_audit_conf_level(self): def test_audit_conf_process_commandline(self): # pylint: disable=too-many-statements - c = lambda x: self.process_commandline(self.OutputBuffer, x.split(), self.usage) # noqa + c = lambda x: self.process_commandline(self.OutputBuffer, x.split()) # noqa with pytest.raises(SystemExit): conf = c('') with pytest.raises(SystemExit):