forked from yjqiang/bili2.0
-
Notifications
You must be signed in to change notification settings - Fork 0
/
web_session.py
117 lines (101 loc) · 4.32 KB
/
web_session.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import sys
import asyncio
from typing import Any
import aiohttp
import printer
from exceptions import LogoutError, ForbiddenError
from json_rsp_ctrl import Ctrl, JsonRspType, DEFAULT_CTRL
sem = asyncio.Semaphore(3)
class WebSession:
__slots__ = ('session',)
DEFAULT_OK_STATUS_CODES = (200,)
def __init__(self):
self.session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=4))
@staticmethod
async def _recv_json(rsp: aiohttp.ClientResponse):
return await rsp.json(content_type=None)
@staticmethod
async def _recv_str(rsp: aiohttp.ClientResponse):
return await rsp.text()
@staticmethod
async def _recv_bytes(rsp: aiohttp.ClientResponse):
return await rsp.read()
# 基本就是通用的 request
async def _orig_req(self, parse_rsp, method, url, **kwargs):
i = 0
while True:
i += 1
if i >= 10:
printer.warn(f'反复请求多次未成功, {url}, {kwargs}')
await asyncio.sleep(0.75)
try:
async with self.session.request(method, url, **kwargs) as rsp:
if rsp.status == 200:
body = await parse_rsp(rsp)
if body:
return body
except asyncio.CancelledError:
raise
except:
# print('当前网络不好,正在重试,请反馈开发者!!!!')
print(sys.exc_info()[0], sys.exc_info()[1], url)
async def orig_req_json(self,
method,
url,
**kwargs) -> Any:
return await self._orig_req(self._recv_json, method, url, **kwargs)
# 为 bilibili 这边加了一些东西的 request
async def _req(self, parse_rsp, method, url, ok_status_codes=None, **kwargs):
if ok_status_codes is None:
ok_status_codes = self.DEFAULT_OK_STATUS_CODES
async with sem:
i = 0
while True:
i += 1
if i >= 10:
printer.warn(f'反复请求多次未成功, {url}, {kwargs}')
await asyncio.sleep(0.75)
try:
async with self.session.request(method, url, **kwargs) as rsp:
if rsp.status in ok_status_codes:
body = await parse_rsp(rsp)
if body: # 有时候是 None 或空,直接屏蔽。read 或 text 类似,禁止返回空的东西
return body
elif rsp.status in (412, 403):
printer.warn(f'403频繁, {url}, {kwargs}')
raise ForbiddenError(msg=url)
except asyncio.CancelledError:
raise
except ForbiddenError:
raise
except:
# print('当前网络不好,正在重试,请反馈开发者!!!!')
print(sys.exc_info()[0], sys.exc_info()[1], url)
async def request_json(self,
method,
url,
ctrl: Ctrl = DEFAULT_CTRL,
**kwargs) -> dict:
while True:
body = await self._req(self._recv_json, method, url, **kwargs)
if not isinstance(body, dict): # 这里是强制定制的,与b站配合的!!!!
continue
json_rsp_type = ctrl.verify(body)
if json_rsp_type == JsonRspType.OK:
return body
elif json_rsp_type == JsonRspType.IGNORE:
await asyncio.sleep(0.75)
elif json_rsp_type == JsonRspType.LOGOUT:
print('api提示没有登录')
print(body)
raise LogoutError(msg='提示没有登陆')
async def request_binary(self,
method,
url,
**kwargs) -> bytes:
return await self._req(self._recv_bytes, method, url, **kwargs)
async def request_text(self,
method,
url,
**kwargs) -> str:
return await self._req(self._recv_str, method, url, **kwargs)