-
Notifications
You must be signed in to change notification settings - Fork 0
/
Fofa_Spider.py
74 lines (64 loc) · 2.56 KB
/
Fofa_Spider.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import aiohttp
import asyncio
import base64
import requests
import json
from loguru import logger
def banner():
print("""
______ ______ ______ ______ ______ ______ _____ _____ ______ ______
| | / | | \ | | | | | | / | | | | \ | | | | \ \ | | | | | \
| |---- | | | | | |---- | |__| | '------. | |__|_/ | | | | | | | |---- | |__| |
|_| \_|__|_/ |_| |_| |_| ____|_/ |_| _|_|_ |_|_/_/ |_|____ |_| \_\
""")
def get_fofa(api_url):
"""请求FofaAPI获取结果
Args:
api_url: api url地址
Returns:
返回一个url列表
"""
try:
res = requests.get(url = api_url).text
results = json.loads(res)['results'] # json结果转换为字典
results = ['http://' + url[0] if not url[0].startswith('http') else url[0] for url in results] # url中没有协议则默认添加http协议
print("[*]成功获取结果%d条..." % len(results))
print("[+]正在进行存活检查...")
return results
except Exception as e:
print(e)
async def check_url(url, output_format):
"""检查url是否存活(可访问)
Args:
url: url
Returns:
pass
"""
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session:
try:
async with session.get(url = url, timeout=5) as resp:
text = await resp.text()
logger.success(url)
if output_format == 'y' or output_format == 'N':
with open(filename, 'a+') as f:
f.write(url+'\n')
except Exception as e:
logger.warning(url)
if output_format == 'N':
with open(filename, 'a+') as f:
f.write(url+'\n')
if __name__ == '__main__':
banner()
api = input(">API:")
filename = api[api.find('qbase64=') + 8:]
output = input("请选择是否只输出存活url[y/N导出全部] ")
results = get_fofa(api)
loop = asyncio.get_event_loop() # 获取当前事件循环, 如果没有则创建并将其设为当前时间循环
try:
if results: # 判断是否有url
tasks = [asyncio.ensure_future(check_url(url, output)) for url in results] # ensure_future(obj), 如果obj是一个协程则加入执行计划
loop.run_until_complete(asyncio.wait(tasks)) # wait(aws) aws需是可迭代且不为空对象, 返回done和panding集合. run_until_complete(future)运行和停止事件循环, 返回 Future 的结果 或者引发相关异常.
else:
print("Error:没有搜索到结果!")
except Exception as e:
print(e)