-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
315 lines (293 loc) · 11.2 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
#coding:utf-8
'''
@author: ZainCheung
@LastEditors: ZainCheung
@description:网易云音乐全自动每日打卡300首歌升级账号等级,使用前请先到init.config文件配置
@Date: 2020-06-25 14:28:48
@LastEditTime: 2020-09-01 18:20:00
'''
from configparser import ConfigParser
from threading import Timer
import requests
import random
import hashlib
import datetime
import time
import json
import logging
import math
import os
'''
使用绝对路径时,切换到项目的当前目录。
'''
os.chdir(os.path.dirname(os.path.abspath(__file__)))
logFile = open("run.log", encoding="utf-8", mode="a")
logging.basicConfig(stream=logFile, format="%(asctime)s %(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
grade = [10,40,70,130,200,400,1000,3000,8000,20000]
api = ''
class Task(object):
'''
对象的构造函数
'''
def __init__(self, uin, pwd, sckey, countrycode=86):
self.uin = uin
self.pwd = pwd
self.countrycode = countrycode
self.sckey = sckey
'''
带上用户的cookie去发送数据
url:完整的URL路径
postJson:要以post方式发送的数据
返回response
'''
def getResponse(self, url, postJson):
response = requests.post(url, data=postJson, headers={'Content-Type':'application/x-www-form-urlencoded'},cookies=self.cookies)
return response
'''
登录
'''
def login(self):
data = {"uin":self.uin,"pwd":self.pwd,"countrycode":self.countrycode,"r":random.random()}
if '@' in self.uin:
url = api + '?do=email'
else:
url = api + '?do=login'
response = requests.post(url, data=data, headers={'Content-Type':'application/x-www-form-urlencoded'})
code = json.loads(response.text)['code']
self.name = json.loads(response.text)['profile']['nickname']
self.uid = json.loads(response.text)['account']['id']
if code==200:
self.error = ''
else:
self.error = '登录失败,请检查账号'
self.cookies = response.cookies.get_dict()
self.log('登录成功')
logging.info("登录成功")
'''
每日签到
'''
def sign(self):
url = api + '?do=sign'
response = self.getResponse(url, {"r":random.random()})
data = json.loads(response.text)
if data['code'] == 200:
self.log('签到成功')
logging.info('签到成功')
else:
self.log('重复签到')
logging.info('重复签到')
'''
每日打卡300首歌
'''
def daka(self):
url = api + '?do=daka'
response = self.getResponse(url, {"r":random.random()})
self.log(response.text)
'''
查询用户详情
'''
def detail(self):
url = api + '?do=detail'
data = {"uid":self.uid, "r":random.random()}
response = self.getResponse(url, data)
data = json.loads(response.text)
self.level = data['level']
self.listenSongs = data['listenSongs']
self.log('获取用户详情成功')
logging.info('获取用户详情成功')
'''
Server推送
'''
def server(self):
if self.sckey == '':
return
url = 'https://sc.ftqq.com/' + self.sckey + '.send'
self.diyText() # 构造发送内容
response = requests.get(url,params={"text":self.title, "desp":self.content})
data = json.loads(response.text)
if data['errno'] == 0:
self.log('用户:' + self.name + ' Server酱推送成功')
logging.info('用户:' + self.name + ' Server酱推送成功')
else:
self.log('用户:' + self.name + ' Server酱推送失败,请检查sckey是否正确')
logging.info('用户:' + self.name + ' Server酱推送失败,请检查sckey是否正确')
'''
自定义要推送到微信的内容
title:消息的标题
content:消息的内容,支持MarkDown格式
'''
def diyText(self):
# today = datetime.date.today()
# kaoyan_day = datetime.date(2020,12,21) #2021考研党的末日
# date = (kaoyan_day - today).days
one = requests.get('https://api.qinor.cn/soup/').text # 每日一句的api
for count in grade:
if self.level < 10:
if self.listenSongs < 20000:
if self.listenSongs < count:
self.tip = '还需听歌' + str(count-self.listenSongs) + '首即可升级'
break
else:
self.tip = '你已经听够20000首歌曲,如果登录天数达到800天即可满级'
else:
self.tip = '恭喜你已经满级!'
if self.error == '':
state = ("- 目前已完成签到\n"
"- 今日共打卡" + str(self.dakanum) + "次\n"
"- 今日共播放" + str(self.dakaSongs) + "首歌\n"
"- 还需要打卡" + str(self.day) +"天")
self.title = ("网易云今日打卡" + str(self.dakaSongs) + "首,已播放" + str(self.listenSongs) + "首")
else:
state = self.error
self.title = '网易云听歌任务出现问题!'
self.content = (
"------\n"
"#### 账户信息\n"
"- 用户名称:" + str(self.name) + "\n"
"- 当前等级:" + str(self.level) + "级\n"
"- 累计播放:" + str(self.listenSongs) + "首\n"
"- 升级提示:" + self.tip + "\n\n"
"------\n"
"#### 任务状态\n" + str(state) + "\n\n"
"------\n"
"#### 注意事项\n- 网易云音乐等级数据每天下午2点更新 \n\n"
"------\n"
"#### 打卡日志\n" + self.dakaSongs_list + "\n\n"
"------\n"
"#### 今日一句\n- " + one + "\n\n")
'''
打印日志
'''
def log(self, text):
time_stamp = datetime.datetime.now()
print(time_stamp.strftime('%Y.%m.%d-%H:%M:%S') + ' ' + str(text))
self.time =time_stamp.strftime('%H:%M:%S')
self.list.append("- [" + self.time + "] " + str(text) + "\n\n")
'''
开始执行
'''
def start(self):
try:
self.list = []
self.list.append("- 初始化完成\n\n")
self.login()
self.sign()
self.detail()
counter = self.listenSongs
self.list.append("- 开始打卡\n\n")
for i in range(1,10):
self.daka()
# self.log('用户:' + self.name + ' 第' + str(i) + '次打卡成功,即将休眠30秒')
self.log('第' + str(i) + '次打卡成功')
logging.info('用户:' + self.name + ' 第' + str(i) + '次打卡成功,即将休眠30秒')
time.sleep(10)
self.dakanum =i
self.detail()
self.dakaSongs = self.listenSongs - counter
self.log('今日已打卡' + str(self.dakaSongs) + '首')
if self.dakaSongs == 300:
break
if self.listenSongs >= 20000:
self.day = 0
else:
self.day = math.ceil((20000 - self.listenSongs)/300)
self.list.append("- 打卡结束\n\n")
self.list.append("- 消息推送\n\n")
self.dakaSongs_list = ''.join(self.list)
self.server()
except:
self.log('用户任务执行中断,请检查账号密码是否正确')
logging.error('用户任务执行中断,请检查账号密码是否正确========================================')
else:
self.log('用户:' + self.name + ' 今日任务已完成')
logging.info('用户:' + self.name + ' 今日任务已完成========================================')
'''
初始化:读取配置,配置文件为init.config
返回字典类型的配置对象
'''
def init():
global api # 初始化时设置api
config = ConfigParser()
config.read('init.config', encoding='UTF-8-sig')
uin = config['token']['account']
pwd = config['token']['password']
countrycode = config['token']['countrycode']
api = config['setting']['api']
md5Switch = config.getboolean('setting','md5Switch')
peopleSwitch = config.getboolean('setting','peopleSwitch')
sckey = config['setting']['sckey']
print('配置文件读取完毕')
logging.info('配置文件读取完毕')
conf = {
'uin': uin,
'pwd': pwd,
'countrycode': countrycode,
'api': api,
'md5Switch': md5Switch,
'peopleSwitch':peopleSwitch,
'sckey':sckey
}
return conf
'''
MD5加密
str:待加密字符
返回加密后的字符
'''
def md5(str):
hl = hashlib.md5()
hl.update(str.encode(encoding='utf-8'))
return hl.hexdigest()
'''
加载Json文件
jsonPath:json文件的名字,例如account.json
'''
def loadJson(jsonPath):
with open(jsonPath,encoding='utf-8') as f:
account = json.load(f)
return account
'''
检查api
'''
def check():
url = api + '?do=check'
respones = requests.get(url)
if respones.status_code == 200:
print('api测试正常')
logging.info('api测试正常')
else:
print('api测试异常')
logging.error('api测试异常')
'''
任务池
'''
def taskPool():
config = init()
check() # 每天对api做一次检查
if config['peopleSwitch'] is True:
print('多人开关已打开,即将开始执行多人任务')
logging.info('多人开关已打开,即将执行进行多人任务')
account = loadJson("account.json")
for man in account:
print('账号: ' + man['account'] + ' 开始执行')
logging.info('账号: ' + man['account'] + ' 开始执行========================================')
task = Task(man['account'], man['password'], man['sckey'])
task.start()
time.sleep(10)
print('所有账号已全部完成任务,服务进入休眠中,等待明天重新启动')
logging.info('所有账号已全部完成任务,服务进入休眠中,等待明天重新启动')
else :
print('账号: ' + config['uin'] + ' 开始执行')
logging.info('账号: ' + config['uin'] + ' 开始执行========================================')
if config['md5Switch'] is True:
print('MD5开关已打开,即将开始为你加密,密码不会上传至服务器,请知悉')
logging.info('MD5开关已打开,即将开始为你加密,密码不会上传至服务器,请知悉')
config['pwd'] = md5(config['pwd'])
task = Task(config['uin'], config['pwd'], config['sckey'], config['countrycode'])
task.start()
'''
程序的入口
'''
if __name__ == '__main__':
while True:
Timer(0, taskPool, ()).start()
time.sleep(60*60*24) # 间隔一天