Skip to content

Commit

Permalink
update(Alist2Strm):分离AutoFilm与Alist2Strm
Browse files Browse the repository at this point in the history
  • Loading branch information
Akimio521 committed Jun 27, 2024
1 parent f6cbfad commit 373942f
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 151 deletions.
5 changes: 3 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ RUN sed -i 's/\r//' entrypoint.sh \
&& chmod +x entrypoint.sh

RUN apk update \
&& apk upgrade \
&& apk add bash \
apk upgrade \
apk add bash \
&& rm -rf \
/tep \
/var/lib/apt/lists \
Expand All @@ -27,6 +27,7 @@ RUN pip install --upgrade pip \
WORKDIR "/app"

COPY main.py autofilm.py version.py /app/
COPY modules /app/modules

VOLUME ["/app/config", "/app/media"]
ENTRYPOINT ["/entrypoint.sh"]
163 changes: 15 additions & 148 deletions autofilm.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,9 @@
#! /usr/bin/env python3
# -*- coding:utf-8 -*-

import logging
import hmac
import hashlib
import base64
import asyncio
from aiohttp import ClientSession
from pathlib import Path
from typing import Optional

import yaml
from alist import AlistFileSystem, AlistPath

from modules import Alist2Strm


class AutoFilm:
Expand All @@ -30,11 +22,6 @@ def __init__(

self.config_data = {}

self.video_ext = ("mp4", "mkv", "flv", "avi", "wmv", "ts", "rmvb", "webm")
self.subtitle_ext = ("ass", "srt", "ssa", "sub")
self.img_ext = ("png", "jpg")
self.all_ext = (*self.video_ext, *self.subtitle_ext, *self.img_ext, "nfo")

try:
with self.config_path.open(mode="r", encoding="utf-8") as f:
self.config_data = yaml.safe_load(f)
Expand Down Expand Up @@ -66,144 +53,24 @@ def __init__(

logging.info(f"输出目录:{self.output_dir}".center(50, "="))

def run(self) -> None:
def run_Alist2Strm(self) -> None:
try:
alist_server_list: list[dict] = self.config_data["AlistServerList"]
except Exception as e:
logging.error(f"Alist服务器列表读取失败,错误信息:{str(e)}")
else:
logging.debug("Alist服务器加载成功")
for alist_server in alist_server_list:
alist_server_url: str = alist_server.get("url")
alist_server_username: str = alist_server.get("username")
alist_server_password: str = alist_server.get("password")
alist_server_base_path: Optional[str] = alist_server.get("base_path")
alist_server_token: Optional[str] = alist_server.get("token")

alist_server_url = alist_server_url.rstrip("/")
if alist_server_base_path == None or alist_server_base_path == "":
alist_server_base_path = "/"
alist_server_base_path = "/" + alist_server_base_path.strip("/") + "/"

if not all(
[alist_server_url, alist_server_username, alist_server_password]
):

logging.error(
f"Alist服务器{alist_server}配置错误,请检查配置文件:{alist_server}"
)
return

logging.debug(
f"Alist服务器URL:{alist_server_url},用户名:{alist_server_username},密码:{alist_server_password},基本路径:{alist_server_base_path},token:{alist_server_token}"
alist2strm = Alist2Strm(
alist_server.get("url"),
alist_server.get("username"),
alist_server.get("password"),
alist_server.get("base_path"),
alist_server.get("token"),
self.output_dir,
self.subtitle,
self.img,
self.nfo,
self.library_mode,
)
asyncio.run(
self._processer(
alist_server_url,
alist_server_username,
alist_server_password,
alist_server_base_path,
alist_server_token,
)
)

async def _processer(
self,
alist_server_url: str,
alist_server_username: str,
alist_server_password: str,
alist_server_base_path: str,
alist_server_token: str,
) -> None:
try:
fs = AlistFileSystem.login(
alist_server_url, alist_server_username, alist_server_password
)
except Exception as e:
logging.critical(
f"登录失败,错误信息:{str(e)},请检查Alist地址:{alist_server_url},用户名:{alist_server_username},密码:{alist_server_password}是否正确"
)
return
try:
fs.chdir(alist_server_base_path)
except Exception as e:
logging.critical(
f"切换目录失败,请检查Alist服务器中是否存在该目录:{alist_server_base_path},错误信息:{str(e)}"
)
return

async with ClientSession() as session:
tasks = [
asyncio.create_task(
self._file_process(
path, session, alist_server_base_path, alist_server_token
)
)
for path in fs.rglob("*.*")
]
await asyncio.gather(*tasks)

async def _file_process(
self,
alist_path_cls: AlistPath,
session: ClientSession,
base_path: Path,
token: str,
) -> None:
if not alist_path_cls.name.lower().endswith(self.all_ext):
return

file_output_path: Path = (
self.output_dir / alist_path_cls.name
if self.library_mode
else self.output_dir / str(alist_path_cls).replace(base_path, "")
)

file_alist_abs_path: str = alist_path_cls.url[
alist_path_cls.url.index("/d/") + 2 :
]

file_download_url: str = alist_path_cls.url + self._sign(
secret_key=token, data=file_alist_abs_path
)

logging.debug(
f"正在处理:{alist_path_cls.name},本地文件目录:{file_output_path},文件远程路径:{file_alist_abs_path},下载URL:{file_download_url}"
)

if alist_path_cls.name.lower().endswith(self.video_ext):
file_output_path.parent.mkdir(parents=True, exist_ok=True)
file_output_path = file_output_path.with_suffix(".strm")
with file_output_path.open(mode="w", encoding="utf-8") as f:
f.write(file_download_url)
logging.debug(
f"{file_output_path.name}创建成功,文件本地目录:{file_output_path.parent}"
)
return

if alist_path_cls.name.lower().endswith(self.img_ext) and not self.img:
return
elif (
alist_path_cls.name.lower().endswith(self.subtitle_ext)
and not self.subtitle
):
return
elif alist_path_cls.name.lower().endswith("nfo") and not self.nfo:
return
else:
file_output_path.parent.mkdir(parents=True, exist_ok=True)
async with session.get(file_download_url) as resp:
if resp.status == 200:
with file_output_path.open(mode="wb") as f:
f.write(await resp.read())
logging.debug(
f"{file_output_path.name}下载成功,文件本地目录:{file_output_path.parent}"
)

def _sign(self, secret_key: Optional[str], data: str) -> str:
if secret_key == "" or secret_key == None:
return ""
h = hmac.new(secret_key.encode(), digestmod=hashlib.sha256)
expire_time_stamp = str(0)
h.update((data + ":" + expire_time_stamp).encode())
return f"?sign={base64.urlsafe_b64encode(h.digest()).decode()}:0"
alist2strm.run()
2 changes: 1 addition & 1 deletion main.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@
logging.info(f"配置文件路径:{args.config_path}")

my_autofilm = autofilm.AutoFilm(config_path=args.config_path)
my_autofilm.run()
my_autofilm.run_Alist2Strm()
139 changes: 139 additions & 0 deletions modules/Alist2Strm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import logging
import hmac
import hashlib
import base64
import asyncio
from aiohttp import ClientSession
from pathlib import Path
from typing import Optional

from alist import AlistFileSystem, AlistPath

VIDEO_EXT = ("mp4", "mkv", "flv", "avi", "wmv", "ts", "rmvb", "webm")
SUBTITLE_EXT = ("ass", "srt", "ssa", "sub")
IMG_EXT = ("png", "jpg")
ALL_EXT = (*VIDEO_EXT, *SUBTITLE_EXT, *IMG_EXT, "nfo")


class Alist2Strm:
def __init__(
self,
alist_server_url: str,
alist_server_username: str,
alist_server_password: str,
alist_server_base_dir: Optional[str],
token: Optional[str],
output_dir: Path,
subtitle: bool = False,
img: bool = False,
nfo: bool = False,
library_mode: bool = True,
) -> None:
self.alist_server_url = alist_server_url.rstrip("/")
self.alist_server_username = alist_server_username
self.alist_server_password = alist_server_password
if alist_server_base_dir == None or alist_server_base_dir == "":
self.alist_server_base_dir = "/"
else:
self.alist_server_base_dir = "/" + alist_server_base_dir.strip("/") + "/"
self.token = token if token else None
self.output_dir = output_dir
self.subtitle = subtitle
self.img = img
self.nfo = nfo
self.library_mode = library_mode

logging.debug(
f"Alist2Strm配置:Alist地址:{self.alist_server_url},Alist用户名:{self.alist_server_username},Alist密码:{self.alist_server_password},Alist基本路径:{self.alist_server_base_dir},Alist签名Token:{self.token},输出目录:{self.output_dir},是否下载字幕:{self.subtitle},是否下载图片:{self.img},是否下载NFO:{self.nfo},是否为库模式:{self.library_mode}"
)

def run(self) -> None:
asyncio.run(self._processer())

async def _processer(self):
try:
fs = AlistFileSystem.login(
self.alist_server_url,
self.alist_server_username,
self.alist_server_password,
)
except Exception as e:
logging.critical(
f"登录失败,错误信息:{str(e)},请检查Alist地址:{self.alist_server_url},用户名:{self.alist_server_username},密码:{self.alist_server_password}是否正确"
)
return

try:
fs.chdir(self.alist_server_base_dir)
except Exception as e:
logging.critical(
f"切换目录失败,错误信息:{str(e)},请检查Alist服务器中是否存在该目录:{self.alist_server_base_dir}"
)
return

async with ClientSession() as session:
tasks = [
asyncio.create_task(self._file_processer(alist_path_cls, session))
for alist_path_cls in fs.rglob("*.*")
]
await asyncio.gather(*tasks)

async def _file_processer(
self, alist_path_cls: AlistPath, session: ClientSession
) -> None:
if not alist_path_cls.name.lower().endswith(ALL_EXT):
return

file_output_path: Path = (
self.output_dir / alist_path_cls.name
if self.library_mode
else self.output_dir
/ str(alist_path_cls).replace(self.alist_server_base_dir, "")
)

file_alist_abs_path: str = alist_path_cls.url[
alist_path_cls.url.index("/d/") + 2 :
]

file_download_url: str = alist_path_cls.url + self._sign(
secret_key=self.token, data=file_alist_abs_path
)

logging.debug(
f"正在处理:{alist_path_cls.name},本地文件目录:{file_output_path},文件远程路径:{file_alist_abs_path},下载URL:{file_download_url}"
)

if alist_path_cls.name.lower().endswith(VIDEO_EXT):
file_output_path.parent.mkdir(parents=True, exist_ok=True)
file_output_path = file_output_path.with_suffix(".strm")
with file_output_path.open(mode="w", encoding="utf-8") as f:
f.write(file_download_url)
logging.debug(
f"{file_output_path.name}创建成功,文件本地目录:{file_output_path.parent}"
)
return

if alist_path_cls.name.lower().endswith(IMG_EXT) and not self.img:
return
elif alist_path_cls.name.lower().endswith(SUBTITLE_EXT) and not self.subtitle:
return
elif alist_path_cls.name.lower().endswith("nfo") and not self.nfo:
return
else:
file_output_path.parent.mkdir(parents=True, exist_ok=True)
async with session.get(file_download_url) as resp:
if resp.status == 200:
with file_output_path.open(mode="wb") as f:
f.write(await resp.read())
logging.debug(
f"{file_output_path.name}下载成功,文件本地目录:{file_output_path.parent}"
)

def _sign(self, secret_key: Optional[str], data: str) -> str:
if secret_key == "" or secret_key == None:
return ""

h = hmac.new(secret_key.encode(), digestmod=hashlib.sha256)
expire_time_stamp = str(0)
h.update((data + ":" + expire_time_stamp).encode())
return f"?sign={base64.urlsafe_b64encode(h.digest()).decode()}:0"
1 change: 1 addition & 0 deletions modules/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .Alist2Strm import Alist2Strm

0 comments on commit 373942f

Please sign in to comment.