-
Notifications
You must be signed in to change notification settings - Fork 33
/
videodl.py
145 lines (136 loc) · 5.61 KB
/
videodl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# -*- coding: utf-8 -*-
import requests
import json
from os import remove, mkdir
from os.path import exists
from re import compile as regex_compile
from pagermaid import bot, log, version
from pagermaid.listener import listener
from pagermaid.utils import alias_command
from telethon.tl.types import DocumentAttributeVideo
from time import sleep
@listener(outgoing=True, command=alias_command("vdl"),
description="下载 YouTube/bilibili 视频并上传",
parameters="<url>")
async def vdl(context):
url = context.arguments
reply = await context.get_reply_message()
reply_id = None
await context.edit("视频获取中 . . .")
if reply:
reply_id = reply.id
if url is None:
await context.edit("出错了呜呜呜 ~ 无效的参数。")
return
bilibili_pattern = regex_compile(
r"^(http(s)?://)?((w){3}.)?bilibili(\.com)?/.+")
youtube_pattern = regex_compile(
r"^(http(s)?://)?((w){3}.)?youtu(be|.be)?(\.com)?/.+")
if youtube_pattern.match(url):
try:
from pytube import YouTube
except ImportError:
await context.edit('`pytube`支持库未安装,YouTube视频无法下载\n请使用 `-sh pip3 install --user '
'git+https://github.com/nficano/pytube 或 -sh pip3 install pytube --upgrade ` '
'安装,或自行ssh安装\n\n已安装过 `pytube3` 的用户请使用 `-sh pip3 '
'uninstall pytube3 -y` 进行卸载')
return
url = url.replace('www.youtube.com/watch?v=', 'youtu.be/')
try:
if not await youtube_dl(url, context, reply_id):
await context.edit("出错了呜呜呜 ~ 视频下载失败。")
sleep(3)
except:
await context.edit("出错了呜呜呜 ~ 视频下载失败。")
sleep(3)
await log(f"已拉取 YouTube 视频,地址: {url}.")
await context.delete()
elif bilibili_pattern.match(url):
if not await bilibili_dl(url, context, reply_id):
await context.edit("出错了呜呜呜 ~ 视频下载失败。")
sleep(3)
await log(f"已拉取 bilibili 视频,地址: {url}.")
await context.delete()
else:
await context.edit("出错了呜呜呜 ~ 无效的网址。")
async def youtube_dl(url, context, reply_id):
from pytube import YouTube
video = YouTube(url)
title = video.title
filename = title + ".mp4"
duration = video.length
await context.edit("视频下载中 . . .")
YouTube(url).streams.filter(
file_extension='mp4').get_highest_resolution().download()
if not exists(filename):
return False
await upload(False, filename, context, reply_id, title, duration)
return True
async def bilibili_dl(url, context, reply_id):
url = 'https://tenapi.cn/bilivideo/?url=' + url
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/52.0.2743.116 Safari/537.36 Edge/15.15063', "X-Real-IP": "223.252.199.66"}
for _ in range(20): # 最多尝试20次
status = False
try:
req = requests.request('GET', url, headers=headers)
if req.status_code == 200:
req = json.loads(req.content)
if req['title']:
title = req['title']
url = req['url']
status=True
break
else:
return False
except:
continue
if status is False:
await context.client.send_message(context.chat_id, "出错了呜呜呜 ~ 试了好多好多次都无法访问到 API 服务器 。")
return False
filename = title + ".flv"
video = requests.get(url, headers=headers)
await context.edit("视频下载中 . . .")
with open(filename, 'wb') as f:
f.write(video.content)
await upload(True, filename, context, reply_id, title)
return True
async def upload(as_file, filename, context, reply_id, caption, duration=0):
if not exists("plugins/VideoDL/FastTelethon.py"):
if not exists("plugins/VideoDLExtra"):
mkdir("plugins/VideoDLExtra")
faster = requests.request(
"GET", "https://gist.githubusercontent.com/TNTcraftHIM/ca2e6066ed5892f67947eb2289dd6439/raw"
"/86244b02c7824a3ca32ce01b2649f5d9badd2e49/FastTelethon.py")
for _ in range(6): # 最多尝试6次
if faster.status_code == 200:
with open("plugins/VideoDLExtra/FastTelethon.py", "wb") as f:
f.write(faster.content)
break
try:
from VideoDLExtra.FastTelethon import upload_file
file = await upload_file(context.client, open(filename, 'rb'), filename)
except:
file = filename
await context.client.send_message(context.chat_id, '(`FastTelethon`支持文件导入失败,上传速度可能受到影响)')
await context.edit("视频上传中 . . .")
if as_file is True:
await context.client.send_file(
context.chat_id,
file,
caption=caption,
link_preview=False,
force_document=True,
reply_to=reply_id
)
else:
await context.client.send_file(
context.chat_id,
file,
caption=caption,
link_preview=False,
force_document=False,
attributes=(DocumentAttributeVideo(duration, 0, 0),),
reply_to=reply_id
)
remove(filename)