From c3ad677e8c8832765288536f6e5f97ce64fe1163 Mon Sep 17 00:00:00 2001 From: lonelyhentxi Date: Sat, 2 Mar 2024 12:42:13 +0800 Subject: [PATCH] fix: make qbittorrent async to be awaitable async not queued async, n^2 retry --- Cargo.lock | 141 +++++- crates/recorder/Cargo.toml | 13 +- crates/recorder/src/downloaders/aria.rs | 67 --- crates/recorder/src/downloaders/bytes.rs | 4 +- crates/recorder/src/downloaders/defs.rs | 186 +++++++- crates/recorder/src/downloaders/error.rs | 19 +- crates/recorder/src/downloaders/html.rs | 4 +- crates/recorder/src/downloaders/mod.rs | 1 - crates/recorder/src/downloaders/qbitorrent.rs | 410 ++++++++++++++++-- .../src/downloaders/torrent_downloader.rs | 42 +- crates/recorder/src/lib.rs | 3 +- .../src/parsers/mikan/mikan_rss_parser.rs | 2 - crates/recorder/src/path/mod.rs | 5 +- crates/recorder/src/path/vfs_path.rs | 15 +- 14 files changed, 743 insertions(+), 169 deletions(-) delete mode 100644 crates/recorder/src/downloaders/aria.rs diff --git a/Cargo.lock b/Cargo.lock index 9a9605c..cd3869f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -846,6 +846,24 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "commoncrypto" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d056a8586ba25a1e4d61cb090900e495952c7886786fc55f909ab2f819b69007" +dependencies = [ + "commoncrypto-sys", +] + +[[package]] +name = "commoncrypto-sys" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fed34f46747aa73dfaa578069fd8279d2818ade2b55f38f22a9401c7f4083e2" +dependencies = [ + "libc", +] + [[package]] name = "console" version = "0.15.8" @@ -1060,6 +1078,18 @@ dependencies = [ "typenum", ] +[[package]] +name = "crypto-hash" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a77162240fd97248d19a564a565eb563a3f592b386e4136fb300909e67dddca" +dependencies = [ + "commoncrypto", + "hex 0.3.2", + "openssl", + "winapi", +] + [[package]] name = "cssparser" version = "0.33.0" @@ -1286,6 +1316,15 @@ dependencies = [ "chrono", ] +[[package]] +name = "directories" +version = "5.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a49173b84e034382284f27f1af4dcbbd231ffa358c0fe316541a7337f376a35" +dependencies = [ + "dirs-sys", +] + [[package]] name = "dirs-next" version = "2.0.0" @@ -1296,6 +1335,18 @@ dependencies = [ "dirs-sys-next", ] +[[package]] +name = "dirs-sys" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c" +dependencies = [ + "libc", + "option-ext", + "redox_users", + "windows-sys 0.48.0", +] + [[package]] name = "dirs-sys-next" version = "0.1.2" @@ -1798,6 +1849,12 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bd5256b483761cd23699d0da46cc6fd2ee3be420bbe6d020ae4a091e70b7e9fd" +[[package]] +name = "hex" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "805026a5d0141ffc30abb3be3173848ad46a1b1664fe632428479619a3644d77" + [[package]] name = "hex" version = "0.4.3" @@ -2314,6 +2371,66 @@ dependencies = [ "redox_syscall", ] +[[package]] +name = "librqbit-bencode" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25214563468dda753fbae62f5ed3b1def14219f2faa416f76f7a449efb8a8092" +dependencies = [ + "anyhow", + "librqbit-buffers", + "librqbit-clone-to-owned", + "librqbit-sha1-wrapper", + "serde", +] + +[[package]] +name = "librqbit-buffers" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "378dc12b4994dec9edf0558e39144972167569458f837dc0c67ddeb044ff9a00" +dependencies = [ + "librqbit-clone-to-owned", + "serde", +] + +[[package]] +name = "librqbit-clone-to-owned" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c33f149bc6cef41a9f24ad43ece20c87e0617fc88affa01d95850eb68210daac" + +[[package]] +name = "librqbit-core" +version = "3.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d055eac3cd062d4b7728feccb45495682b834305bf234f275440b32814276a65" +dependencies = [ + "anyhow", + "directories", + "hex 0.4.3", + "itertools 0.12.1", + "librqbit-bencode", + "librqbit-buffers", + "librqbit-clone-to-owned", + "parking_lot", + "serde", + "tokio", + "tokio-util", + "tracing", + "url", + "uuid", +] + +[[package]] +name = "librqbit-sha1-wrapper" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45016d84e0f1751ad9b645378117adf648dd6649e1b371a398862cd6a10356fe" +dependencies = [ + "crypto-hash", +] + [[package]] name = "libsqlite3-sys" version = "0.27.0" @@ -2768,6 +2885,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "option-ext" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" + [[package]] name = "ordered-float" version = "2.10.1" @@ -3275,8 +3398,7 @@ dependencies = [ [[package]] name = "qbit-rs" version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "167a5e28adf918639d3b180cfe5c919ed38200d5517c88f9b132a2e54a995468" +source = "git+https://github.com/George-Miao/qbit.git?rev=ad5af6a#ad5af6a55b93b2c91b17d12d1b2ce54537df2355" dependencies = [ "mod_use", "reqwest", @@ -3400,6 +3522,7 @@ dependencies = [ "insta", "itertools 0.12.1", "lazy_static", + "librqbit-core", "lightningcss", "loco-rs", "maplit", @@ -3536,7 +3659,7 @@ dependencies = [ "chrono", "form_urlencoded", "getrandom", - "hex", + "hex 0.4.3", "hmac", "home", "http 0.2.11", @@ -3934,7 +4057,7 @@ dependencies = [ "cron_clock", "gethostname", "heck", - "hex", + "hex 0.4.3", "num_cpus", "rand", "redis", @@ -4306,7 +4429,7 @@ checksum = "07ff71d2c147a7b57362cead5e22f772cd52f6ab31cfcd9edcd7f6aeb2a0afbe" dependencies = [ "base64 0.13.1", "chrono", - "hex", + "hex 0.4.3", "indexmap 1.9.3", "serde", "serde_json", @@ -4650,7 +4773,7 @@ dependencies = [ "futures-io", "futures-util", "hashlink", - "hex", + "hex 0.4.3", "indexmap 2.2.3", "log", "memchr", @@ -4698,7 +4821,7 @@ dependencies = [ "dotenvy", "either", "heck", - "hex", + "hex 0.4.3", "once_cell", "proc-macro2", "quote", @@ -4737,7 +4860,7 @@ dependencies = [ "futures-io", "futures-util", "generic-array", - "hex", + "hex 0.4.3", "hkdf", "hmac", "itoa", @@ -4781,7 +4904,7 @@ dependencies = [ "futures-core", "futures-io", "futures-util", - "hex", + "hex 0.4.3", "hkdf", "hmac", "home", diff --git a/crates/recorder/Cargo.toml b/crates/recorder/Cargo.toml index f64c925..7bb9a3d 100644 --- a/crates/recorder/Cargo.toml +++ b/crates/recorder/Cargo.toml @@ -17,10 +17,10 @@ tracing = "0.1.40" chrono = "0.4" validator = { version = "0.16" } sea-orm = { version = "1.0.0-rc.1", features = [ - "sqlx-sqlite", - "sqlx-postgres", - "runtime-tokio-rustls", - "macros", + "sqlx-sqlite", + "sqlx-postgres", + "runtime-tokio-rustls", + "macros", ] } axum = "0.7.1" @@ -28,7 +28,7 @@ include_dir = "0.7" uuid = { version = "1.6.0", features = ["v4"] } tracing-subscriber = { version = "0.3.17", features = ["env-filter", "json"] } sea-orm-migration = { version = "1.0.0-rc.1", features = [ - "runtime-tokio-rustls", + "runtime-tokio-rustls", ] } reqwest = "0.11.24" thiserror = "1.0.57" @@ -36,7 +36,7 @@ rss = "2.0.7" bytes = "1.5.0" futures = "0.3.30" itertools = "0.12.1" -qbit-rs = "0.4.1" +qbit-rs = { git = "https://github.com/George-Miao/qbit.git", rev = "ad5af6a", features = ["default", "builder"] } url = "2.5.0" fancy-regex = "0.13.0" regex = "1.10.3" @@ -47,6 +47,7 @@ tl = { version = "0.7.8", features = ["simd"] } lightningcss = "1.0.0-alpha.54" html-escape = "0.2.13" opendal = "0.45.0" +librqbit-core = "3.5.0" [lib] name = "recorder" diff --git a/crates/recorder/src/downloaders/aria.rs b/crates/recorder/src/downloaders/aria.rs deleted file mode 100644 index 1f3e236..0000000 --- a/crates/recorder/src/downloaders/aria.rs +++ /dev/null @@ -1,67 +0,0 @@ -#![allow(unused_variables)] -use super::{ - defs::{Torrent, TorrentFilter, TorrentSources}, - torrent_downloader::TorrentDownloader, -}; -use crate::path::{VFSPathBuf, VFSSubPath}; - -#[derive(Debug)] -pub struct AriaDownloader {} - -#[async_trait::async_trait] -impl TorrentDownloader for AriaDownloader { - async fn get_torrents_info( - &self, - status_filter: TorrentFilter, - category: String, - tag: Option, - ) -> eyre::Result> { - unimplemented!() - } - - async fn add_torrents( - &self, - source: TorrentSources, - save_path: String, - category: Option, - ) -> eyre::Result<()> { - unimplemented!() - } - - async fn delete_torrents(&self, hashes: Vec) -> eyre::Result<()> { - unimplemented!() - } - - async fn rename_torrent_file( - &self, - hash: &str, - old_path: &str, - new_path: &str, - ) -> eyre::Result<()> { - unimplemented!() - } - - async fn move_torrents(&self, hashes: Vec, new_path: &str) -> eyre::Result<()> { - unimplemented!() - } - - async fn get_torrent_path(&self, hashes: String) -> eyre::Result> { - unimplemented!() - } - - async fn check_connection(&self) -> eyre::Result<()> { - unimplemented!() - } - - async fn set_torrents_category(&self, hashes: Vec, category: &str) -> eyre::Result<()> { - unimplemented!() - } - - async fn add_torrent_tags(&self, hashes: Vec, tags: Vec) -> eyre::Result<()> { - unimplemented!() - } - - fn get_save_path(&self, sub_path: &VFSSubPath) -> VFSPathBuf { - unimplemented!() - } -} diff --git a/crates/recorder/src/downloaders/bytes.rs b/crates/recorder/src/downloaders/bytes.rs index 0bddf9f..56fbfbf 100644 --- a/crates/recorder/src/downloaders/bytes.rs +++ b/crates/recorder/src/downloaders/bytes.rs @@ -1,11 +1,11 @@ use bytes::Bytes; use reqwest::IntoUrl; -use super::defs::DEFAULT_USER_AEGNT; +use super::defs::DEFAULT_USER_AGENT; pub async fn download_bytes(url: T) -> eyre::Result { let request_client = reqwest::Client::builder() - .user_agent(DEFAULT_USER_AEGNT) + .user_agent(DEFAULT_USER_AGENT) .build()?; let bytes = request_client.get(url).send().await?.bytes().await?; Ok(bytes) diff --git a/crates/recorder/src/downloaders/defs.rs b/crates/recorder/src/downloaders/defs.rs index 4e3ee81..bb0cd8f 100644 --- a/crates/recorder/src/downloaders/defs.rs +++ b/crates/recorder/src/downloaders/defs.rs @@ -1,12 +1,22 @@ +use itertools::Itertools; +use lazy_static::lazy_static; +use librqbit_core::{ + magnet::Magnet, + torrent_metainfo::{torrent_from_bytes, TorrentMetaV1Owned}, +}; pub use qbit_rs::model::{ Torrent as QbitTorrent, TorrentContent as QbitTorrentContent, TorrentFilter as QbitTorrentFilter, TorrentSource as QbitTorrentSource, }; +use regex::Regex; use serde::{Deserialize, Serialize}; use url::Url; +use crate::downloaders::{bytes::download_bytes, error::DownloaderError}; + pub const BITTORRENT_MIME_TYPE: &str = "application/x-bittorrent"; -pub const DEFAULT_USER_AEGNT: &str = "Wget/1.13.4 (linux-gnu)"; +pub const MAGNET_SCHEMA: &str = "magnet"; +pub const DEFAULT_USER_AGENT: &str = "Wget/1.13.4 (linux-gnu)"; #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] @@ -42,35 +52,144 @@ impl From for QbitTorrentFilter { } } -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum TorrentSources { - Urls { urls: Vec }, - TorrentFiles { torrents: Vec }, +lazy_static! { + static ref TORRENT_HASH_RE: Regex = Regex::new(r"[a-fA-F0-9]{40}").unwrap(); + static ref TORRENT_EXT_RE: Regex = Regex::new(r"\.torrent$").unwrap(); } -impl From for QbitTorrentSource { - fn from(value: TorrentSources) -> Self { - match value { - TorrentSources::Urls { urls } => QbitTorrentSource::Urls { - urls: qbit_rs::model::Sep::from(urls), - }, - TorrentSources::TorrentFiles { torrents } => { - QbitTorrentSource::TorrentFiles { torrents } +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum TorrentSource { + MagnetUrl { + url: Url, + hash: String, + }, + TorrentUrl { + url: Url, + hash: String, + }, + TorrentFile { + torrent: Vec, + hash: String, + name: Option, + }, +} + +impl TorrentSource { + pub async fn parse(url: &str) -> eyre::Result { + let url = Url::parse(url)?; + let source = if url.scheme() == MAGNET_SCHEMA { + TorrentSource::from_magnet_url(url)? + } else if let Some(basename) = url + .clone() + .path_segments() + .and_then(|segments| segments.last()) + { + if let (Some(match_hash), true) = ( + TORRENT_HASH_RE.find(basename), + TORRENT_EXT_RE.is_match(basename), + ) { + TorrentSource::from_torrent_url(url, match_hash.as_str().to_string())? + } else { + let contents = download_bytes(url).await?; + TorrentSource::from_torrent_file(contents.to_vec(), Some(basename.to_string()))? } + } else { + let contents = download_bytes(url).await?; + TorrentSource::from_torrent_file(contents.to_vec(), None)? + }; + Ok(source) + } + + pub fn from_torrent_file(file: Vec, name: Option) -> eyre::Result { + let torrent: TorrentMetaV1Owned = + torrent_from_bytes(&file).map_err(|_| DownloaderError::InvalidTorrentFileFormat)?; + let hash = torrent.info_hash.as_string(); + Ok(TorrentSource::TorrentFile { + torrent: file, + hash, + name, + }) + } + + pub fn from_magnet_url(url: Url) -> eyre::Result { + if url.scheme() != MAGNET_SCHEMA { + Err(DownloaderError::InvalidUrlSchema { + found: url.scheme().to_string(), + expected: MAGNET_SCHEMA.to_string(), + } + .into()) + } else { + let magnet = + Magnet::parse(url.as_str()).map_err(|_| DownloaderError::InvalidMagnetFormat { + url: url.as_str().to_string(), + })?; + let hash = magnet.info_hash.as_string(); + Ok(TorrentSource::MagnetUrl { url, hash }) + } + } + + pub fn from_torrent_url(url: Url, hash: String) -> eyre::Result { + Ok(TorrentSource::TorrentUrl { url, hash }) + } + + pub fn hash(&self) -> &str { + match self { + TorrentSource::MagnetUrl { hash, .. } => hash, + TorrentSource::TorrentUrl { hash, .. } => hash, + TorrentSource::TorrentFile { hash, .. } => hash, + } + } +} + +impl From for QbitTorrentSource { + fn from(value: TorrentSource) -> Self { + match value { + TorrentSource::MagnetUrl { url, .. } => QbitTorrentSource::Urls { + urls: qbit_rs::model::Sep::from([url]), + }, + TorrentSource::TorrentUrl { url, .. } => QbitTorrentSource::Urls { + urls: qbit_rs::model::Sep::from([url]), + }, + TorrentSource::TorrentFile { + torrent: torrents, .. + } => QbitTorrentSource::TorrentFiles { torrents }, } } } pub trait TorrentContent { fn get_name(&self) -> &str; + + fn get_all_size(&self) -> u64; + + fn get_progress(&self) -> f64; + + fn get_curr_size(&self) -> u64; } impl TorrentContent for QbitTorrentContent { fn get_name(&self) -> &str { self.name.as_str() } + + fn get_all_size(&self) -> u64 { + self.size + } + + fn get_progress(&self) -> f64 { + self.progress + } + + fn get_curr_size(&self) -> u64 { + u64::clamp( + f64::round(self.get_all_size() as f64 * self.get_progress()) as u64, + 0, + self.get_all_size(), + ) + } } +#[derive(Debug, Clone)] pub enum Torrent { Qbit { torrent: QbitTorrent, @@ -86,4 +205,45 @@ impl Torrent { } } } + + pub fn get_name(&self) -> Option<&str> { + match self { + Torrent::Qbit { torrent, .. } => torrent.name.as_deref(), + } + } + + pub fn get_hash(&self) -> Option<&str> { + match self { + Torrent::Qbit { torrent, .. } => torrent.hash.as_deref(), + } + } + + pub fn get_save_path(&self) -> Option<&str> { + match self { + Torrent::Qbit { torrent, .. } => torrent.save_path.as_deref(), + } + } + + pub fn get_content_path(&self) -> Option<&str> { + match self { + Torrent::Qbit { torrent, .. } => torrent.content_path.as_deref(), + } + } + + pub fn get_tags(&self) -> Vec<&str> { + match self { + Torrent::Qbit { torrent, .. } => torrent.tags.as_deref().map_or_else(Vec::new, |s| { + s.split(',') + .map(|s| s.trim()) + .filter(|s| !s.is_empty()) + .collect_vec() + }), + } + } + + pub fn get_category(&self) -> Option<&str> { + match self { + Torrent::Qbit { torrent, .. } => torrent.category.as_deref(), + } + } } diff --git a/crates/recorder/src/downloaders/error.rs b/crates/recorder/src/downloaders/error.rs index 639727f..b70299d 100644 --- a/crates/recorder/src/downloaders/error.rs +++ b/crates/recorder/src/downloaders/error.rs @@ -1,11 +1,26 @@ +use std::borrow::Cow; use thiserror::Error; +use std::time::Duration; #[derive(Error, Debug)] pub enum DownloaderError { #[error("Invalid mime (expected {expected:?}, got {found:?})")] InvalidMime { expected: String, found: String }, - #[error("Invalid url format")] - InvalidUrlFormat(#[from] url::ParseError), + #[error("Invalid url schema (expected {expected:?}, got {found:?})")] + InvalidUrlSchema { expected: String, found: String }, + #[error("Invalid url parse: {0:?}")] + InvalidUrlParse(#[from] url::ParseError), + #[error("Invalid url format: {reason}")] + InvalidUrlFormat { reason: Cow<'static, str> }, #[error("QBit api error: {0:?}")] QBitAPIError(#[from] qbit_rs::Error), + #[error("Timeout error ({action} timeouts out of {timeout:?})")] + TimeoutError { + action: Cow<'static, str>, + timeout: Duration, + }, + #[error("Invalid torrent file format")] + InvalidTorrentFileFormat, + #[error("Invalid magnet file format (url = {url})")] + InvalidMagnetFormat { url: String }, } diff --git a/crates/recorder/src/downloaders/html.rs b/crates/recorder/src/downloaders/html.rs index 3424f5a..d1eed9e 100644 --- a/crates/recorder/src/downloaders/html.rs +++ b/crates/recorder/src/downloaders/html.rs @@ -1,10 +1,10 @@ use reqwest::IntoUrl; -use super::defs::DEFAULT_USER_AEGNT; +use super::defs::DEFAULT_USER_AGENT; pub async fn download_html(url: U) -> eyre::Result { let request_client = reqwest::Client::builder() - .user_agent(DEFAULT_USER_AEGNT) + .user_agent(DEFAULT_USER_AGENT) .build()?; let content = request_client.get(url).send().await?.text().await?; Ok(content) diff --git a/crates/recorder/src/downloaders/mod.rs b/crates/recorder/src/downloaders/mod.rs index db7c408..26dd556 100644 --- a/crates/recorder/src/downloaders/mod.rs +++ b/crates/recorder/src/downloaders/mod.rs @@ -1,4 +1,3 @@ -pub mod aria; pub mod bytes; pub mod defs; pub mod error; diff --git a/crates/recorder/src/downloaders/qbitorrent.rs b/crates/recorder/src/downloaders/qbitorrent.rs index 45b86fc..8d076f6 100644 --- a/crates/recorder/src/downloaders/qbitorrent.rs +++ b/crates/recorder/src/downloaders/qbitorrent.rs @@ -1,21 +1,23 @@ -use std::fmt::Debug; +use std::{borrow::Cow, collections::HashSet, fmt::Debug, sync::Arc, time::Duration}; use eyre::OptionExt; use futures::future::try_join_all; use qbit_rs::{ - model::{AddTorrentArg, Credential, GetTorrentListArg, NonEmptyStr}, + model::{AddTorrentArg, Credential, GetTorrentListArg, NonEmptyStr, SyncData}, Qbit, }; +use tokio::{sync::RwLock, time::sleep}; use url::Url; use super::{ - defs::{Torrent, TorrentFilter, TorrentSources}, + defs::{Torrent, TorrentFilter, TorrentSource}, error::DownloaderError, torrent_downloader::TorrentDownloader, }; use crate::{ + downloaders::defs::TorrentContent, models::{entities::downloaders, prelude::DownloaderCategory}, - path::{VFSPathBuf, VFSSubPath}, + path::{path_str_equals, VFSPathBuf, VFSSubPath}, }; pub struct QBittorrentDownloader { @@ -23,6 +25,8 @@ pub struct QBittorrentDownloader { pub endpoint_url: Url, pub client: Qbit, pub save_path: String, + pub rid: Arc>, + pub wait_sync_timeout: Duration, } impl QBittorrentDownloader { @@ -36,7 +40,7 @@ impl QBittorrentDownloader { let endpoint_url = model .endpoint_url() - .map_err(DownloaderError::InvalidUrlFormat)?; + .map_err(DownloaderError::InvalidUrlParse)?; let credential = Credential::new(model.username, model.password); let client = Qbit::new(endpoint_url.clone(), credential); @@ -45,18 +49,103 @@ impl QBittorrentDownloader { .await .map_err(DownloaderError::QBitAPIError)?; + let init_sync_id = client.sync(None).await?.rid; + Ok(Self { client, endpoint_url, subscriber_id: model.subscriber_id, save_path: model.save_path, + rid: Arc::new(RwLock::new(init_sync_id)), + wait_sync_timeout: Duration::from_millis(10000), }) } - async fn api_version(&self) -> eyre::Result { - let result = self.client.get_version().await?; + pub async fn api_version(&self) -> eyre::Result { + let result = self.client.get_webapi_version().await?; Ok(result) } + + pub async fn last_cached_sync_id(&self) -> i64 { + *self.rid.read().await + } + + pub async fn sync_main_data(&self, sync_id: Option) -> eyre::Result { + let result = self.client.sync(sync_id).await?; + { + let mut sync_id = self.rid.write().await; + *sync_id = result.rid; + } + Ok(result) + } + + async fn wait_until_torrent_contents) -> bool>( + &self, + hash: &str, + mut stop_wait_fn: F, + timeout: Option, + ) -> eyre::Result<()> { + let mut next_wait_ms = 32u64; + let mut all_wait_ms = 0u64; + let timeout = timeout.unwrap_or(self.wait_sync_timeout); + loop { + sleep(Duration::from_millis(next_wait_ms)).await; + all_wait_ms += next_wait_ms; + if all_wait_ms >= timeout.as_millis() as u64 { + // full update + let sync_data = self.client.get_torrent_contents(hash, None).await?; + if stop_wait_fn(sync_data) { + break; + } else { + return Err(DownloaderError::TimeoutError { + action: Cow::Borrowed("QBittorrentDownloader::wait_unit"), + timeout, + } + .into()); + } + } + let sync_data = self.client.get_torrent_contents(hash, None).await?; + if stop_wait_fn(sync_data) { + break; + } + next_wait_ms *= 2; + } + Ok(()) + } + + pub async fn wait_until bool>( + &self, + start_sync_id: i64, + mut stop_wait_fn: F, + timeout: Option, + ) -> eyre::Result<()> { + let mut next_wait_ms = 32u64; + let mut all_wait_ms = 0u64; + let timeout = timeout.unwrap_or(self.wait_sync_timeout); + loop { + sleep(Duration::from_millis(next_wait_ms)).await; + all_wait_ms += next_wait_ms; + if all_wait_ms >= timeout.as_millis() as u64 { + // full update + let sync_data = self.sync_main_data(None).await?; + if stop_wait_fn(sync_data) { + break; + } else { + return Err(DownloaderError::TimeoutError { + action: Cow::Borrowed("QBittorrentDownloader::wait_unit"), + timeout, + } + .into()); + } + } + let sync_data = self.sync_main_data(Some(start_sync_id)).await?; + if stop_wait_fn(sync_data) { + break; + } + next_wait_ms *= 2; + } + Ok(()) + } } #[async_trait::async_trait] @@ -64,12 +153,12 @@ impl TorrentDownloader for QBittorrentDownloader { async fn get_torrents_info( &self, status_filter: TorrentFilter, - category: String, + category: Option, tag: Option, ) -> eyre::Result> { let arg = GetTorrentListArg { filter: Some(status_filter.into()), - category: Some(category), + category, tag, ..Default::default() }; @@ -91,23 +180,71 @@ impl TorrentDownloader for QBittorrentDownloader { async fn add_torrents( &self, - source: TorrentSources, + source: TorrentSource, save_path: String, - category: Option, + category: Option<&str>, ) -> eyre::Result<()> { let arg = AddTorrentArg { - source: source.into(), + source: source.clone().into(), savepath: Some(save_path), - category, + category: category.map(String::from), auto_torrent_management: Some(false), ..Default::default() }; - self.client.add_torrent(arg).await?; + let start_last_id = self.last_cached_sync_id().await; + let add_result = self.client.add_torrent(arg.clone()).await; + if let ( + Err(qbit_rs::Error::ApiError(qbit_rs::ApiError::CategoryNotFound)), + Some(category), + ) = (&add_result, category) + { + self.add_category(category).await?; + self.client.add_torrent(arg).await?; + } else { + add_result?; + } + let source_hash = source.hash(); + self.wait_until( + start_last_id, + |sync_data| { + sync_data + .torrents + .map_or(false, |t| t.contains_key(source_hash)) + }, + None, + ) + .await?; Ok(()) } async fn delete_torrents(&self, hashes: Vec) -> eyre::Result<()> { - self.client.delete_torrents(hashes, None).await?; + let start_last_id = self.last_cached_sync_id().await; + let existed_list = self + .client + .get_torrent_list( + GetTorrentListArg::builder() + .hashes(hashes.clone().join("|")) + .build(), + ) + .await?; + if !existed_list.is_empty() { + self.client + .delete_torrents(hashes.clone(), Some(true)) + .await?; + self.wait_until( + start_last_id, + |sync_data| -> bool { + sync_data.torrents_removed.map_or(false, |tr| -> bool { + let tr = tr.into_iter().collect::>(); + hashes.iter().all(|s| tr.contains(s)) + }) && sync_data + .torrents + .map_or(true, |t| hashes.iter().all(|h| !t.contains_key(h))) + }, + None, + ) + .await?; + } Ok(()) } @@ -118,11 +255,40 @@ impl TorrentDownloader for QBittorrentDownloader { new_path: &str, ) -> eyre::Result<()> { self.client.rename_file(hash, old_path, new_path).await?; + self.wait_until_torrent_contents( + hash, + |contents| -> bool { + contents + .iter() + .any(|c| path_str_equals(c.get_name(), new_path).unwrap_or(false)) + }, + None, + ) + .await?; Ok(()) } async fn move_torrents(&self, hashes: Vec, new_path: &str) -> eyre::Result<()> { - self.client.set_torrent_location(hashes, new_path).await?; + let start_last_id = self.last_cached_sync_id().await; + self.client + .set_torrent_location(hashes.clone(), new_path) + .await?; + self.wait_until( + start_last_id, + |sync_data| -> bool { + hashes.iter().all(|hash| { + sync_data.torrents.as_ref().map_or(false, |t| { + t.get(hash).map_or(false, |t| { + t.save_path + .as_ref() + .map_or(false, |p| path_str_equals(p, new_path).unwrap_or(false)) + }) + }) + }) + }, + None, + ) + .await?; Ok(()) } @@ -144,30 +310,87 @@ impl TorrentDownloader for QBittorrentDownloader { } async fn set_torrents_category(&self, hashes: Vec, category: &str) -> eyre::Result<()> { - if category.is_empty() { - return Err(eyre::anyhow!("Category cannot be empty")); - } + let start_last_id = self.last_cached_sync_id().await; let result = self .client .set_torrent_category(hashes.clone(), category) .await; - if let Err(qbit_rs::Error::ApiError(qbit_rs::ApiError::CategoryNotFound)) = result { + if let Err(qbit_rs::Error::ApiError(qbit_rs::ApiError::CategoryNotFound)) = &result { + self.add_category(category).await?; self.client - .add_category( - NonEmptyStr::new(category) - .unwrap_or_else(|| unreachable!("Category cannot be empty")), - self.save_path.as_str(), - ) + .set_torrent_category(hashes.clone(), category) .await?; - self.client.set_torrent_category(hashes, category).await?; } else { result?; } + self.wait_until( + start_last_id, + |sync_data| { + sync_data.torrents.map_or(false, |ts| { + hashes.iter().all(|h| { + ts.get(h).map_or(false, |t| { + t.category.as_ref().map_or(false, |c| c == category) + }) + }) + }) + }, + None, + ) + .await?; Ok(()) } async fn add_torrent_tags(&self, hashes: Vec, tags: Vec) -> eyre::Result<()> { - self.client.add_torrent_tags(hashes, tags).await?; + if tags.is_empty() { + return Err(eyre::eyre!("add torrent tags can not be empty")); + } + let start_last_id = self.last_cached_sync_id().await; + self.client + .add_torrent_tags(hashes.clone(), tags.clone()) + .await?; + let tag_sets = tags.iter().map(|s| s.as_str()).collect::>(); + self.wait_until( + start_last_id, + |sync_data| { + sync_data.torrents.map_or(false, |ts| { + hashes.iter().all(|h| { + ts.get(h).map_or(false, |t| { + t.tags.as_ref().map_or(false, |t| { + t.split(',') + .map(|s| s.trim()) + .filter(|s| !s.is_empty()) + .collect::>() + .is_superset(&tag_sets) + }) + }) + }) + }) + }, + None, + ) + .await?; + Ok(()) + } + + async fn add_category(&self, category: &str) -> eyre::Result<()> { + let start_sync_id = self.last_cached_sync_id().await; + self.client + .add_category( + NonEmptyStr::new(category).ok_or_eyre("category can not be empty")?, + self.save_path.as_str(), + ) + .await?; + self.wait_until( + start_sync_id, + |sync_data| { + sync_data + .categories + .map_or(false, |s| s.contains_key(category)) + }, + None, + ) + .await?; + Ok(()) } @@ -187,18 +410,22 @@ impl Debug for QBittorrentDownloader { #[cfg(test)] mod tests { + use itertools::Itertools; + use super::*; fn get_tmp_qbit_test_folder() -> &'static str { if cfg!(windows) { - "~/AppData/Local/Temp/konobangu/qbit" + "C:\\Windows\\Temp\\konobangu\\qbit" } else { "/tmp/konobangu/qbit" } } #[tokio::test] - async fn test_add_torrents() { + async fn test_add_torrents_and_get_info() { + let base_save_path = VFSSubPath::new(get_tmp_qbit_test_folder()); + let downloader = QBittorrentDownloader::from_downloader_model(downloaders::Model { created_at: Default::default(), updated_at: Default::default(), @@ -208,14 +435,131 @@ mod tests { password: "".to_string(), username: "".to_string(), subscriber_id: 0, - save_path: get_tmp_qbit_test_folder().to_string(), + save_path: base_save_path.to_string(), }) .await - .expect("should create downloader success"); + .unwrap(); + + downloader.check_connection().await.unwrap(); downloader - .check_connection() + .delete_torrents(vec!["47ee2d69e7f19af783ad896541a07b012676f858".to_string()]) .await - .expect("should check connection success"); + .unwrap(); + + let torrent_source = TorrentSource::parse( + "https://mikanani.me/Download/20240301/47ee2d69e7f19af783ad896541a07b012676f858.torrent" + ).await.unwrap(); + + let mut save_path = base_save_path.join(format!( + "test_add_torrents_{}", + chrono::Utc::now().timestamp() + )); + + downloader + .add_torrents(torrent_source, save_path.to_string(), Some("bangumi")) + .await + .unwrap(); + + let get_torrent = async || -> eyre::Result { + let torrent_infos = downloader + .get_torrents_info(TorrentFilter::All, None, None) + .await?; + + let result = torrent_infos + .into_iter() + .find(|t| { + t.get_hash() + .map_or(false, |s| s == "47ee2d69e7f19af783ad896541a07b012676f858") + }) + .ok_or_eyre("no torrent")?; + + Ok(result) + }; + + let target_torrent = get_torrent().await.unwrap(); + + let files = target_torrent.iter_files().collect_vec(); + assert!(!files.is_empty()); + + let first_file = files[0]; + assert_eq!( + first_file.get_name(), + r#"[Nekomoe kissaten&LoliHouse] Boku no Kokoro no Yabai Yatsu - 20 [WebRip 1080p HEVC-10bit AAC ASSx2].mkv"# + ); + + let test_tag = format!("test_tag_{}", chrono::Utc::now().timestamp()); + + downloader + .add_torrent_tags( + vec!["47ee2d69e7f19af783ad896541a07b012676f858".to_string()], + vec![test_tag.clone()], + ) + .await + .unwrap(); + + let target_torrent = get_torrent().await.unwrap(); + + assert!(target_torrent.get_tags().iter().any(|s| s == &test_tag)); + + let test_category = format!("test_category_{}", chrono::Utc::now().timestamp()); + + downloader + .set_torrents_category( + vec!["47ee2d69e7f19af783ad896541a07b012676f858".to_string()], + &test_category, + ) + .await + .unwrap(); + + let target_torrent = get_torrent().await.unwrap(); + + assert_eq!(Some(test_category.as_str()), target_torrent.get_category()); + + let moved_save_path = base_save_path.join(format!( + "moved_test_add_torrents_{}", + chrono::Utc::now().timestamp() + )); + + downloader + .move_torrents( + vec!["47ee2d69e7f19af783ad896541a07b012676f858".to_string()], + moved_save_path.as_str(), + ) + .await + .unwrap(); + + let target_torrent = get_torrent().await.unwrap(); + + let content_path = target_torrent.iter_files().next().unwrap().get_name(); + + let new_content_path = &format!("new_{}", content_path); + + downloader + .rename_torrent_file( + "47ee2d69e7f19af783ad896541a07b012676f858", + content_path, + new_content_path, + ) + .await + .unwrap(); + + let target_torrent = get_torrent().await.unwrap(); + + let content_path = target_torrent.iter_files().next().unwrap().get_name(); + + assert_eq!(content_path, new_content_path); + + downloader + .delete_torrents(vec!["47ee2d69e7f19af783ad896541a07b012676f858".to_string()]) + .await + .unwrap(); + + let torrent_infos1 = downloader + .get_torrents_info(TorrentFilter::All, None, None) + .await + .unwrap(); + + assert!(torrent_infos1.is_empty()); } } diff --git a/crates/recorder/src/downloaders/torrent_downloader.rs b/crates/recorder/src/downloaders/torrent_downloader.rs index 20d321f..8e4a7a1 100644 --- a/crates/recorder/src/downloaders/torrent_downloader.rs +++ b/crates/recorder/src/downloaders/torrent_downloader.rs @@ -3,8 +3,7 @@ use sea_orm::{ActiveModelTrait, ActiveValue, DatabaseConnection, IntoActiveModel use url::Url; use super::{ - bytes::download_bytes, - defs::{Torrent, TorrentFilter, TorrentSources}, + defs::{Torrent, TorrentFilter, TorrentSource}, qbitorrent::QBittorrentDownloader, }; use crate::{ @@ -17,15 +16,15 @@ pub trait TorrentDownloader { async fn get_torrents_info( &self, status_filter: TorrentFilter, - category: String, + category: Option, tag: Option, ) -> eyre::Result>; async fn add_torrents( &self, - source: TorrentSources, + source: TorrentSource, save_path: String, - category: Option, + category: Option<&str>, ) -> eyre::Result<()>; async fn delete_torrents(&self, hashes: Vec) -> eyre::Result<()>; @@ -47,9 +46,11 @@ pub trait TorrentDownloader { async fn add_torrent_tags(&self, hashes: Vec, tags: Vec) -> eyre::Result<()>; + async fn add_category(&self, category: &str) -> eyre::Result<()>; + fn get_save_path(&self, sub_path: &VFSSubPath) -> VFSPathBuf; - async fn add_downlods_for_bangumi<'a, 'b>( + async fn add_downloads_for_bangumi<'a, 'b>( &self, db: &'a DatabaseConnection, downloads: &[&downloads::Model], @@ -72,10 +73,12 @@ pub trait TorrentDownloader { torrent_urls.push(Url::parse(&m.url as &str)?); } - let source = build_torrent_source_from_urls(torrent_urls.into_iter()).await?; - - self.add_torrents(source, sub_path.to_string(), Some("bangumi".to_string())) - .await?; + // make sequence to prevent too fast to be banned + for d in downloads.iter() { + let source = TorrentSource::parse(&d.url).await?; + self.add_torrents(source, sub_path.clone(), Some("bangumi")) + .await?; + } Ok(bangumi) } @@ -90,22 +93,3 @@ pub async fn build_torrent_downloader_from_downloader_model( } })) } - -pub async fn build_torrent_source_from_url(url: Url) -> eyre::Result { - let source = if url.scheme() == "magnet" { - TorrentSources::Urls { urls: vec![url] } - } else { - let bytes = download_bytes(url).await?; - TorrentSources::TorrentFiles { - torrents: bytes.into(), - } - }; - Ok(source) -} - -pub async fn build_torrent_source_from_urls>( - urls: IU, -) -> eyre::Result { - let urls = urls.collect::>(); - Ok(TorrentSources::Urls { urls }) -} diff --git a/crates/recorder/src/lib.rs b/crates/recorder/src/lib.rs index 6cb9313..997bc8f 100644 --- a/crates/recorder/src/lib.rs +++ b/crates/recorder/src/lib.rs @@ -1,4 +1,5 @@ -#![feature(async_closure)] +#![feature(async_closure, duration_constructors)] + pub mod app; pub mod config; pub mod controllers; diff --git a/crates/recorder/src/parsers/mikan/mikan_rss_parser.rs b/crates/recorder/src/parsers/mikan/mikan_rss_parser.rs index 484589f..5175809 100644 --- a/crates/recorder/src/parsers/mikan/mikan_rss_parser.rs +++ b/crates/recorder/src/parsers/mikan/mikan_rss_parser.rs @@ -61,8 +61,6 @@ pub async fn parse_mikan_rss_items_from_rss_link( #[cfg(test)] mod tests { - use url::Url; - use super::parse_mikan_rss_items_from_rss_link; use crate::downloaders::defs::BITTORRENT_MIME_TYPE; diff --git a/crates/recorder/src/path/mod.rs b/crates/recorder/src/path/mod.rs index 095ceee..e4858ca 100644 --- a/crates/recorder/src/path/mod.rs +++ b/crates/recorder/src/path/mod.rs @@ -1,4 +1,7 @@ pub mod torrent_path; pub mod vfs_path; -pub use vfs_path::{VFSComponent, VFSComponents, VFSPath, VFSPathBuf, VFSSubPath, VFSSubPathBuf}; +pub use vfs_path::{ + path_str_equals, path_str_to_file_url, VFSComponent, VFSComponents, VFSPath, VFSPathBuf, + VFSSubPath, VFSSubPathBuf, +}; diff --git a/crates/recorder/src/path/vfs_path.rs b/crates/recorder/src/path/vfs_path.rs index 79f70a2..5acbebd 100644 --- a/crates/recorder/src/path/vfs_path.rs +++ b/crates/recorder/src/path/vfs_path.rs @@ -2,10 +2,22 @@ use std::path::PathBuf; use lazy_static::lazy_static; pub use uni_path::{Path as VFSSubPath, PathBuf as VFSSubPathBuf}; +use url::Url; use crate::parsers::errors::ParseError; +pub fn path_str_to_file_url(path: &str) -> eyre::Result { + Url::parse(&format!("file:///{path}")).map_err(|e| e.into()) +} + +pub fn path_str_equals(p1: &str, p2: &str) -> eyre::Result { + let p1 = path_str_to_file_url(p1)?; + let p2 = path_str_to_file_url(p2)?; + Ok(p1.as_str() == p2.as_str()) +} + const VFS_EMPTY_STR: &str = ""; + lazy_static! { pub static ref VFS_SUB_ROOT_BUF: VFSSubPathBuf = VFSSubPathBuf::from("/"); pub static ref VFS_SUB_ROOT: &'static VFSSubPath = &VFS_SUB_ROOT_BUF.as_path(); @@ -14,6 +26,7 @@ lazy_static! { pub type VFSComponents<'a> = uni_path::Components<'a>; pub type VFSComponent<'a> = uni_path::Component<'a>; +#[derive(Debug, Clone)] pub struct VFSPath<'a> { pub root: &'a str, pub sub: &'a VFSSubPath, @@ -62,7 +75,7 @@ impl<'a> VFSPath<'a> { } } -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug)] pub struct VFSPathBuf { pub root: String, pub sub: VFSSubPathBuf,