Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
THMonster committed Oct 13, 2023
1 parent 6404343 commit 0e118ae
Show file tree
Hide file tree
Showing 31 changed files with 2,251 additions and 2,271 deletions.
1,572 changes: 967 additions & 605 deletions Cargo.lock

Large diffs are not rendered by default.

19 changes: 9 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ tokio = { version = "1", features = ["full"] }
# tokio-rustls = "0.22"
# webpki-roots = "*"
# tokio-tungstenite = { version = "0.18", features = ["rustls-tls"] }
tokio-tungstenite = { version = "0.18", features = ["native-tls-vendored"] }
tokio-tungstenite = { version = "0.20", features = ["native-tls-vendored"] }
reqwest = { version = "0.11", default-features = false, features = ["brotli", "deflate", "gzip", "json", "native-tls-vendored", "cookies"] }
log = "0.4"
env_logger = "0.10"
Expand All @@ -19,29 +19,28 @@ serde_json = "1.0"
serde = { version = "1", features = ["derive"] }
bincode = "1.3"
rand = "0.8"
regex = "1.7"
regex = "1.10"
fancy-regex = "0.11"
uuid = { version = "1.3", features = ["v4"] }
uuid = { version = "1.4", features = ["v4"] }
chrono = "0.4"
url = "2.3"
url = "2.4"
urlencoding = "2.1"
base64 = "0.21"
libc = "0.2"
toml = "0.7"
toml = "0.8"
html-escape = "0.2"
futures = "0.3"
roxmltree = "0.18"
async-channel = "1.8"
directories = "4.0"
async-channel = "1.9"
directories = "5.0"
anyhow = "1"
bytes = "1.4"
bytes = "1.5"
# boa_engine = { features = ["console"], version = "0.15.0" }
tars-stream = { path = "tars-stream"}

ring = "*"
cbc = "*"
aes = "*"
sqlx = { version = "0.6", features = ["runtime-tokio-rustls", "sqlite"] }
sqlx = { version = "0.7", features = ["runtime-tokio-rustls", "sqlite"] }
async-compression = { version = "0.4", features = ["tokio", "deflate", "brotli"] }

[profile.release]
Expand Down
1 change: 1 addition & 0 deletions src/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub enum BVideoType {
Video,
Bangumi
}

pub struct BVideoInfo {
pub base_url: String,
pub video_type: BVideoType,
Expand Down
121 changes: 62 additions & 59 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,24 @@
pub mod config;

use reqwest::Url;
use std::{
path::Path,
sync::atomic::AtomicBool,
};
use tokio::sync::RwLock;
use tokio::{
fs::OpenOptions,
io::AsyncWriteExt,
};

use self::config::{BVideoInfo, BVideoType, Config};
use crate::utils::is_android;
use crate::Args;
use reqwest::Url;
use std::cell::{Cell, RefCell};
use std::path::Path;
use tokio::{fs::OpenOptions, io::AsyncWriteExt};

use self::config::{
BVideoInfo,
BVideoType,
Config,
};

#[derive(Clone, Copy, PartialEq, Eq)]
pub enum Platform {
Linux,
LinuxTcp,
Android,
}
pub enum RunMode {
Play,
Record,
}
#[derive(Clone, Copy)]
pub enum StreamType {
FLV,
HLS,
Expand All @@ -38,30 +34,31 @@ pub enum Site {
}

pub struct ConfigManager {
pub plat: u8,
pub plat: Platform,
pub bcookie: String,
pub cookies_from_browser: String,
pub plive: bool,
pub quiet: bool,
pub wait_interval: u64,
pub font_scale: RwLock<f64>,
pub font_alpha: RwLock<f64>,
pub danmaku_speed: RwLock<u64>,
pub display_fps: RwLock<(u64, u64)>,
pub font_scale: Cell<f64>,
pub font_alpha: Cell<f64>,
pub danmaku_speed: Cell<u64>,
pub display_fps: Cell<(u64, u64)>,
pub room_url: String,
pub http_address: Option<String>,
pub run_mode: RunMode,
pub site: Site,
pub stream_type: RwLock<StreamType>,
pub bvideo_info: RwLock<BVideoInfo>,
on_writing: AtomicBool,
pub stream_type: Cell<StreamType>,
pub bvideo_info: RefCell<BVideoInfo>,
pub title: RefCell<String>,
on_writing: Cell<bool>,
}

impl ConfigManager {
pub fn new(config_path: impl AsRef<Path>, args: &Args) -> Self {
let mut plat = if cfg!(target_os = "linux") { 0 } else { 1 };
let mut plat = Platform::Linux;
if args.tcp {
plat = 1;
plat = Platform::LinuxTcp;
}
let mut bvinfo = BVideoInfo {
base_url: "".into(),
Expand Down Expand Up @@ -111,66 +108,72 @@ impl ConfigManager {
};
Self {
room_url: room_url.replace("dmlive://", "https://"),
stream_type: RwLock::new(StreamType::FLV),
stream_type: Cell::new(StreamType::FLV),
run_mode,
site,
font_scale: RwLock::new(c.font_scale.unwrap_or(1.0)),
font_alpha: RwLock::new(c.font_alpha.unwrap_or(0.0)),
danmaku_speed: RwLock::new(c.danmaku_speed.unwrap_or(8000)),
bvideo_info: RwLock::new(bvinfo),
font_scale: Cell::new(c.font_scale.unwrap_or(1.0)),
font_alpha: Cell::new(c.font_alpha.unwrap_or(0.0)),
danmaku_speed: Cell::new(c.danmaku_speed.unwrap_or(8000)),
bvideo_info: RefCell::new(bvinfo),
bcookie: c.bcookie.unwrap_or_else(|| "".into()),
http_address: args.http_address.as_ref().map(|it| it.into()),
plive: args.plive,
quiet: args.quiet,
wait_interval: args.wait_interval.unwrap_or(0),
on_writing: AtomicBool::new(false),
on_writing: Cell::new(false),
plat,
cookies_from_browser: c.cookies_from_browser.unwrap_or_else(|| "".into()),
display_fps: RwLock::new((60, 0)),
display_fps: Cell::new((60, 0)),
title: RefCell::new("".to_string()),
}
}

pub async fn set_stream_type(&self, url: &str) {
pub async fn init(&mut self) -> anyhow::Result<()> {
if is_android().await {
self.plat = Platform::Android;
}
Ok(())
}

pub fn set_stream_type(&self, url: &str) {
if url.contains(".m3u8") {
*self.stream_type.write().await = StreamType::HLS;
self.stream_type.set(StreamType::HLS);
} else if url.contains(".flv") {
*self.stream_type.write().await = StreamType::FLV;
self.stream_type.set(StreamType::FLV);
} else {
*self.stream_type.write().await = StreamType::DASH;
self.stream_type.set(StreamType::DASH);
}
if matches!(self.site, Site::BiliVideo) {
*self.stream_type.write().await = StreamType::DASH;
self.stream_type.set(StreamType::DASH);
}
}

pub async fn write_config(&self) -> anyhow::Result<()> {
if !self.on_writing.load(std::sync::atomic::Ordering::SeqCst) {
self.on_writing.store(true, std::sync::atomic::Ordering::SeqCst);
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
if !self.on_writing.get() {
self.on_writing.set(true);
// tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
let proj_dirs = directories::ProjectDirs::from("com", "THMonster", "dmlive").unwrap();
let d = proj_dirs.config_dir();
let _ = tokio::fs::create_dir_all(&d).await;
let config_path = d.join("config.toml");
if !config_path.exists() {
let _ = tokio::fs::File::create(&config_path).await;
}
{
let mut f = OpenOptions::new().write(true).truncate(true).open(config_path).await?;
f.write_all(
toml::to_string_pretty(&Config {
bcookie: Some(self.bcookie.clone()),
cookies_from_browser: Some(self.cookies_from_browser.clone()),
danmaku_speed: Some(*self.danmaku_speed.read().await),
font_alpha: Some(*self.font_alpha.read().await),
font_scale: Some(*self.font_scale.read().await),
})
.unwrap()
.as_bytes(),
)
.await?;
f.sync_all().await?;
}
self.on_writing.store(false, std::sync::atomic::Ordering::SeqCst);
let mut f = OpenOptions::new().write(true).truncate(true).open(config_path).await?;
f.write_all(
toml::to_string_pretty(&Config {
bcookie: Some(self.bcookie.clone()),
cookies_from_browser: Some(self.cookies_from_browser.clone()),
danmaku_speed: Some(self.danmaku_speed.get()),
font_alpha: Some(self.font_alpha.get()),
font_scale: Some(self.font_scale.get()),
})
.unwrap()
.as_bytes(),
)
.await?;
f.sync_all().await?;
self.on_writing.set(false);
}
Ok(())
}
Expand Down
43 changes: 13 additions & 30 deletions src/danmaku/bilibili.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
use std::{
cell::RefCell,
collections::{HashMap, VecDeque},
};
use std::collections::{HashMap, VecDeque};

use bincode::Options;
use futures::{stream::StreamExt, SinkExt};
use log::info;
use log::warn;
use reqwest::Url;
use serde::Deserialize;
use serde_json::json;
Expand All @@ -17,6 +12,8 @@ use tokio::{
};
use tokio_tungstenite::{connect_async, tungstenite::Message::Binary};

use crate::dmlerr;

const API_BUVID: &'static str = "https://data.bilibili.com/v/";
const API_ROOMINIT: &'static str = "https://api.live.bilibili.com/room/v1/Room/room_init";
const API_DMINFO: &'static str = "https://api.live.bilibili.com/xlive/web-room/v1/index/getDanmuInfo";
Expand Down Expand Up @@ -59,17 +56,13 @@ impl Bilibili {
.await?
.json::<serde_json::Value>()
.await?;
let token = resp
.pointer("/data/token")
.ok_or(anyhow::anyhow!("err gdt a1"))?
.as_str()
.ok_or(anyhow::anyhow!("err gdt a12"))?;
let token = resp.pointer("/data/token").ok_or_else(|| dmlerr!())?.as_str().ok_or_else(|| dmlerr!())?;
Ok(token.to_string())
}

async fn get_ws_info(&self, url: &str) -> Result<(String, Vec<u8>), Box<dyn std::error::Error>> {
async fn get_ws_info(&self, url: &str) -> anyhow::Result<(String, Vec<u8>)> {
let rid =
Url::parse(url)?.path_segments().ok_or("rid parse error 1")?.last().ok_or("rid parse error 2")?.to_string();
Url::parse(url)?.path_segments().ok_or_else(|| dmlerr!())?.last().ok_or_else(|| dmlerr!())?.to_string();
let mut reg_data: Vec<u8> = Vec::new();
let client = reqwest::Client::builder().user_agent(crate::utils::gen_ua()).build()?;
let param1 = vec![("id", rid.as_str())];
Expand All @@ -81,7 +74,7 @@ impl Bilibili {
.await?
.json::<serde_json::Value>()
.await?;
let rid = resp.pointer("/data/room_id").ok_or("gwi pje 1")?.as_u64().ok_or("gwi pje 1-2")?;
let rid = resp.pointer("/data/room_id").ok_or_else(|| dmlerr!())?.as_u64().ok_or_else(|| dmlerr!())?;
let buvid = self.get_buvid(&client).await?;
let token = self.get_dm_token(&client, url, rid.to_string().as_str()).await?;
// let rn = rand::random::<u64>();
Expand All @@ -108,12 +101,7 @@ impl Bilibili {
if header.op == 5 {
let j: serde_json::Value = serde_json::from_slice(data)?;
// warn!("{:?}", &j);
let msg_type = match j
.pointer("/cmd")
.ok_or_else(|| anyhow::anyhow!("cmd parse failed 1"))?
.as_str()
.ok_or_else(|| anyhow::anyhow!("cmd parse failed 12"))?
{
let msg_type = match j.pointer("/cmd").ok_or_else(|| dmlerr!())?.as_str().ok_or_else(|| dmlerr!())? {
"SEND_GIFT" => "gift",
"SUPER_CHAT_MESSAGE" => "superchat",
"WELCOME" => "enter",
Expand Down Expand Up @@ -205,9 +193,7 @@ impl Bilibili {
Ok(ret)
}

pub async fn run(
&self, url: &str, dtx: async_channel::Sender<(String, String, String)>,
) -> Result<(), Box<dyn std::error::Error>> {
pub async fn run(&self, url: &str, dtx: async_channel::Sender<(String, String, String)>) -> anyhow::Result<()> {
let (tx, mut rx) = mpsc::channel::<Vec<u8>>(10);
let (ws, reg_data) = self.get_ws_info(url).await?;
let (ws_stream, _) = connect_async(&ws).await?;
Expand All @@ -228,25 +214,23 @@ impl Bilibili {
};

let (dmq_tx, mut dmq_rx) = mpsc::channel(1000);
let dm_cnt = RefCell::new(0u64);
let dm_cnt = std::cell::Cell::new(0u64);
let decode_task = async {
while let Some(mut it) = rx.recv().await {
let mut dm = self.decode_msg(&mut it, &tx).await?;
// dm_queue.append(&mut dm);
for d in dm.drain(..) {
dmq_tx.send(d).await?;
let mut dm_cnt = dm_cnt.borrow_mut();
*dm_cnt = dm_cnt.saturating_add(1);
dm_cnt.set(dm_cnt.get() + 1);
}
}
anyhow::Ok(())
};
let balance_task = async {
while let Some(d) = dmq_rx.recv().await {
let itvl = {
let mut dm_cnt = dm_cnt.borrow_mut();
let itvl = 2000u64.saturating_div(*dm_cnt);
*dm_cnt = dm_cnt.saturating_sub(1);
let itvl = 2000u64.saturating_div(dm_cnt.get());
dm_cnt.set(dm_cnt.get().saturating_sub(1));
itvl
};
if d.get("msg_type").unwrap_or(&"other".into()).eq("danmaku") {
Expand All @@ -272,7 +256,6 @@ impl Bilibili {
it = decode_task => { it?; },
it = balance_task => { it?; },
}
info!("ws closed!");
Ok(())
}
}
4 changes: 2 additions & 2 deletions src/danmaku/bilivideo.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use anyhow::Result;
use bytes::BufMut;
use log::info;
use tokio::io::AsyncWriteExt;
Expand All @@ -12,7 +11,7 @@ impl Bilibili {

pub async fn run(
&self, url: &str, dtx: async_channel::Sender<(String, String, String)>,
) -> Result<(), Box<dyn std::error::Error>> {
) -> anyhow::Result<()> {
let client = reqwest::Client::builder()
.deflate(false)
.user_agent(crate::utils::gen_ua())
Expand Down Expand Up @@ -42,6 +41,7 @@ impl Bilibili {
.await?;
}
}
dtx.close();
Ok(())
}
}
Loading

0 comments on commit 0e118ae

Please sign in to comment.