Skip to content

Commit

Permalink
Added support for IPv6 to DHEat tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
jtesta committed Jun 29, 2024
1 parent 46ec4e3 commit c94824e
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 6 deletions.
39 changes: 33 additions & 6 deletions src/ssh_audit/dheat.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ def __init__(self, out: 'OutputBuffer', aconf: 'AuditConf', banner: Optional['Ba
# The SSH2_Kex object that we recieved from the server in a prior connection. We'll use it as a template to craft our own kex.
self.kex = kex

# Resolve the target to an IP address depending on the user preferences (IPv4 or IPv6).
self.debug("Resolving target %s..." % self.target)
self.target_address_family, self.target_ip_address = DHEat._resolve_hostname(self.target, aconf.ip_version_preference)
self.debug("Resolved %s to %s (address family %u)" % (self.target, self.target_ip_address, self.target_address_family))

# The connection and read timeouts.
self.connect_timeout = aconf.timeout
self.read_timeout = aconf.timeout
Expand Down Expand Up @@ -369,6 +374,11 @@ def _close_socket(socket_dict: Dict[socket.socket, float], s: socket.socket) ->
else:
out.d("DHEat.dh_rate_test(): starting test; parameters: %f seconds, %u max connections, %u concurrent sockets." % (max_time, max_connections, concurrent_sockets), write_now=True)

# Resolve the target into an IP address
out.d("Resolving target %s..." % aconf.host)
target_address_family, target_ip_address = DHEat._resolve_hostname(aconf.host, aconf.ip_version_preference)
out.d("Resolved %s to %s (address family %u)" % (aconf.host, target_ip_address, target_address_family))

num_attempted_connections = 0
num_opened_connections = 0
num_exceeded_maxstartups = 0
Expand Down Expand Up @@ -426,11 +436,11 @@ def _close_socket(socket_dict: Dict[socket.socket, float], s: socket.socket) ->

# Open new sockets until we've hit the number of concurrent sockets, or if we exceeded the number of maximum connections.
while (len(socket_dict) < concurrent_sockets) and (len(socket_dict) + num_opened_connections < max_connections):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s = socket.socket(target_address_family, socket.SOCK_STREAM)
s.setblocking(False)

# out.d("Creating socket (%u of %u already exist)..." % (len(socket_dict), concurrent_sockets), write_now=True)
ret = s.connect_ex((aconf.host, aconf.port))
ret = s.connect_ex((target_ip_address, aconf.port))
num_attempted_connections += 1
if ret in [0, 115]: # Check if connection is successful or EINPROGRESS.
socket_dict[s] = now
Expand Down Expand Up @@ -743,6 +753,22 @@ def run(self) -> None:
print()


@staticmethod
def _resolve_hostname(host: str, ip_version_preference: List[int]) -> Tuple[int, str]:
'''Resolves a hostname to its IPv4 or IPv6 address, depending on user preference.'''

family = socket.AF_UNSPEC
if len(ip_version_preference) == 1:
family = socket.AF_INET if ip_version_preference[0] == 4 else socket.AF_INET6

r = socket.getaddrinfo(host, 0, family, socket.SOCK_STREAM)
for address_family, socktype, _, _, addr in r:
if socktype == socket.SOCK_STREAM:
return address_family, addr[0]

return -1, ''


def _run(self) -> bool:
'''Where all the magic happens.'''

Expand Down Expand Up @@ -894,7 +920,8 @@ def _close_socket(s: socket.socket) -> None:

# Copy variables from the object (which might exist in another process?). This might cut down on inter-process overhead.
connect_timeout = self.connect_timeout
target = self.target
target_ip_address = self.target_ip_address
target_address_family = self.target_address_family
port = self.port

# Determine if we are attacking with a GEX.
Expand Down Expand Up @@ -945,17 +972,17 @@ def _close_socket(s: socket.socket) -> None:
num_socket_exceptions = 0
num_openssh_throttled_connections = 0

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s = socket.socket(target_address_family, socket.SOCK_STREAM)
s.settimeout(connect_timeout)

# Loop until a successful TCP connection is made.
connected = False
while not connected:

# self.debug("Connecting to %s:%d" % (self.target, self.port))
# self.debug("Connecting to %s:%d" % (self.target_ip_address, self.port))
try:
num_attempted_tcp_connections += 1
s.connect((target, port))
s.connect((target_ip_address, port))
connected = True
except OSError as e:
self.debug("Failed to connect: %s" % str(e))
Expand Down
2 changes: 2 additions & 0 deletions src/ssh_audit/ssh_audit.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,8 @@ def output_recommendations(out: OutputBuffer, algs: Algorithms, algorithm_recomm

fn = level_to_output[level]

an = '?'
sg = '?'
if action == 'del':
an, sg = 'remove', '-'
ret = False
Expand Down

0 comments on commit c94824e

Please sign in to comment.