From 1ff8a311ae5af6b7969924fa4d1347880203a071 Mon Sep 17 00:00:00 2001 From: lonelyhentxi Date: Wed, 9 Apr 2025 02:26:23 +0800 Subject: [PATCH] feat(downloader): add rqbit impl --- Cargo.lock | 31 ++- Cargo.toml | 2 +- apps/recorder/Cargo.toml | 7 +- apps/recorder/src/auth/oidc.rs | 10 +- apps/recorder/src/errors/ext.rs | 1 - apps/recorder/src/errors/mod.rs | 1 - apps/recorder/src/extract/mikan/client.rs | 3 +- packages/downloader/Cargo.toml | 1 + packages/downloader/src/bittorrent/task.rs | 5 +- packages/downloader/src/core.rs | 13 +- packages/downloader/src/errors.rs | 5 + packages/downloader/src/qbit/downloader.rs | 20 +- packages/downloader/src/qbit/task.rs | 39 ++- packages/downloader/src/rqbit/downloader.rs | 279 +++++++++++++++++++- packages/downloader/src/rqbit/task.rs | 186 ++++++------- 15 files changed, 457 insertions(+), 146 deletions(-) delete mode 100644 apps/recorder/src/errors/ext.rs diff --git a/Cargo.lock b/Cargo.lock index e1d466c..9160185 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1534,6 +1534,7 @@ dependencies = [ "async-trait", "bytes", "chrono", + "dashmap 6.1.0", "fetch", "futures", "itertools 0.14.0", @@ -3976,9 +3977,9 @@ checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" [[package]] name = "opendal" -version = "0.51.2" +version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b1063ea459fa9e94584115743b06330f437902dd1d9f692b863ef1875a20548" +checksum = "b5ebd1183902124c6b3ee0a9383683513dd8cca3d25a5d065593f969a44f979e" dependencies = [ "anyhow", "async-trait", @@ -3991,7 +3992,6 @@ dependencies = [ "http", "log", "md-5", - "once_cell", "percent-encoding", "quick-xml 0.36.2", "reqwest", @@ -4912,6 +4912,7 @@ dependencies = [ "serde_yaml", "serial_test", "snafu", + "string-interner", "tera", "testcontainers", "testcontainers-ext", @@ -5095,9 +5096,9 @@ dependencies = [ [[package]] name = "reqwest-middleware" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64e8975513bd9a7a43aad01030e79b3498e05db14e9d945df6483e8cf9b8c4c4" +checksum = "57f17d28a6e6acfe1733fe24bcd30774d13bffa4b8a22535b4c8c98423088d4e" dependencies = [ "anyhow", "async-trait", @@ -5132,9 +5133,9 @@ dependencies = [ [[package]] name = "reqwest-tracing" -version = "0.5.6" +version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9c88a8d9cfe3319b5adc10f3ffc3db75c7346837a1f857f8269f6361f3b2744" +checksum = "d75b0eee96990cfb4c09545847385e89b2d2d2e571143d55264a05d77c713780" dependencies = [ "anyhow", "async-trait", @@ -6353,6 +6354,16 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7beae5182595e9a8b683fa98c4317f956c9a2dec3b9716990d20023cc60c766" +[[package]] +name = "string-interner" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23de088478b31c349c9ba67816fa55d9355232d63c3afea8bf513e31f0f1d2c0" +dependencies = [ + "hashbrown 0.15.2", + "serde", +] + [[package]] name = "string_cache" version = "0.8.9" @@ -7283,7 +7294,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "458f7a779bf54acc9f347480ac654f68407d3aab21269a6e3c9f922acd9e2da9" dependencies = [ "getrandom 0.3.2", + "js-sys", "serde", + "wasm-bindgen", ] [[package]] @@ -7927,9 +7940,9 @@ checksum = "271414315aff87387382ec3d271b52d7ae78726f5d44ac98b4f4030c91880486" [[package]] name = "winnow" -version = "0.7.4" +version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e97b544156e9bebe1a0ffbc03484fc1ffe3100cbce3ffb17eac35f7cdd7ab36" +checksum = "63d3fcd9bba44b03821e7d699eeee959f3126dcc4aa8e4ae18ec617c2a5cea10" dependencies = [ "memchr", ] diff --git a/Cargo.toml b/Cargo.toml index 0598db2..3c20fb2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ tokio = { version = "1", features = ["macros", "fs", "rt-multi-thread"] } serde_json = "1" async-trait = "0.1" tracing = "0.1" -url = "2.5" +url = "2.5.2" anyhow = "1" itertools = "0.14" chrono = "0.4" diff --git a/apps/recorder/Cargo.toml b/apps/recorder/Cargo.toml index da03746..a78c0b4 100644 --- a/apps/recorder/Cargo.toml +++ b/apps/recorder/Cargo.toml @@ -61,15 +61,15 @@ sea-orm-migration = { version = "1.1", features = ["runtime-tokio-rustls"] } rss = "2" fancy-regex = "0.14" maplit = "1.0.2" -lightningcss = "1.0.0-alpha.61" +lightningcss = "1.0.0-alpha.65" html-escape = "0.2.13" -opendal = { version = "0.51.0", features = ["default", "services-fs"] } +opendal = { version = "0.53", features = ["default", "services-fs"] } zune-image = "0.4.15" once_cell = "1.20.2" scraper = "0.23" jwt-authorizer = "0.15.0" -log = "0.4.22" +log = "0.4" async-graphql = { version = "7", features = [] } async-graphql-axum = "7" seaography = { version = "1.1" } @@ -100,6 +100,7 @@ serde_yaml = "0.9.34" downloader = { workspace = true } util = { workspace = true } fetch = { workspace = true } +string-interner = "0.19.0" [dev-dependencies] serial_test = "3" diff --git a/apps/recorder/src/auth/oidc.rs b/apps/recorder/src/auth/oidc.rs index 1a57008..37e28d7 100644 --- a/apps/recorder/src/auth/oidc.rs +++ b/apps/recorder/src/auth/oidc.rs @@ -35,7 +35,7 @@ use crate::{app::AppContextTrait, errors::RecorderError, models::auth::AuthType} pub struct OidcHttpClient(pub Arc); -impl<'a> Deref for OidcHttpClient { +impl Deref for OidcHttpClient { type Target = HttpClient; fn deref(&self) -> &Self::Target { @@ -170,8 +170,8 @@ pub struct OidcAuthService { } impl OidcAuthService { - pub async fn build_authorization_request<'a>( - &'a self, + pub async fn build_authorization_request( + &self, redirect_uri: &str, ) -> Result { let oidc_provider_client = OidcHttpClient(self.oidc_provider_client.clone()); @@ -247,8 +247,8 @@ impl OidcAuthService { Ok(result) } - pub async fn extract_authorization_request_callback<'a>( - &'a self, + pub async fn extract_authorization_request_callback( + &self, query: OidcAuthCallbackQuery, ) -> Result { let oidc_http_client = OidcHttpClient(self.oidc_provider_client.clone()); diff --git a/apps/recorder/src/errors/ext.rs b/apps/recorder/src/errors/ext.rs deleted file mode 100644 index 8b13789..0000000 --- a/apps/recorder/src/errors/ext.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/apps/recorder/src/errors/mod.rs b/apps/recorder/src/errors/mod.rs index 887cd6b..af0dd14 100644 --- a/apps/recorder/src/errors/mod.rs +++ b/apps/recorder/src/errors/mod.rs @@ -1,5 +1,4 @@ pub mod app_error; -pub mod ext; pub mod response; pub use app_error::{RecorderError, RecorderResult}; diff --git a/apps/recorder/src/extract/mikan/client.rs b/apps/recorder/src/extract/mikan/client.rs index 85af79a..c431e0a 100644 --- a/apps/recorder/src/extract/mikan/client.rs +++ b/apps/recorder/src/extract/mikan/client.rs @@ -1,6 +1,6 @@ use std::{fmt::Debug, ops::Deref}; -use fetch::{FetchError, HttpClient, HttpClientTrait, client::HttpClientCookiesAuth}; +use fetch::{HttpClient, HttpClientTrait, client::HttpClientCookiesAuth}; use serde::{Deserialize, Serialize}; use url::Url; @@ -24,7 +24,6 @@ impl Debug for MikanAuthSecrecy { impl MikanAuthSecrecy { pub fn into_cookie_auth(self, url: &Url) -> Result { HttpClientCookiesAuth::from_cookies(&self.cookie, url, self.user_agent) - .map_err(FetchError::from) .map_err(RecorderError::from) } } diff --git a/packages/downloader/Cargo.toml b/packages/downloader/Cargo.toml index 3b03957..a11d0dc 100644 --- a/packages/downloader/Cargo.toml +++ b/packages/downloader/Cargo.toml @@ -42,6 +42,7 @@ librqbit = { version = "8", features = ["async-bt", "watch"] } util = { workspace = true } testing-torrents = { workspace = true, optional = true } fetch = { workspace = true } +dashmap = "6.1.0" [dev-dependencies] diff --git a/packages/downloader/src/bittorrent/task.rs b/packages/downloader/src/bittorrent/task.rs index 388416a..3a42a01 100644 --- a/packages/downloader/src/bittorrent/task.rs +++ b/packages/downloader/src/bittorrent/task.rs @@ -24,9 +24,10 @@ where Self::State: TorrentStateTrait, Self::Id: TorrentHashTrait, { - fn hash_info(&self) -> &str; + fn hash_info(&self) -> Cow<'_, str>; + fn name(&self) -> Cow<'_, str> { - Cow::Borrowed(self.hash_info()) + self.hash_info() } fn tags(&self) -> impl Iterator>; diff --git a/packages/downloader/src/core.rs b/packages/downloader/src/core.rs index 8c07d8e..e3658cb 100644 --- a/packages/downloader/src/core.rs +++ b/packages/downloader/src/core.rs @@ -7,7 +7,18 @@ use async_trait::async_trait; use super::DownloaderError; -pub trait DownloadStateTrait: Sized + Debug {} +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum DownloadSimpleState { + Paused, + Active, + Completed, + Error, + Unknown, +} + +pub trait DownloadStateTrait: Sized + Debug { + fn to_download_state(&self) -> DownloadSimpleState; +} pub trait DownloadIdTrait: Hash + Sized + Clone + Send + Debug {} diff --git a/packages/downloader/src/errors.rs b/packages/downloader/src/errors.rs index 4f31c4a..3ef54ca 100644 --- a/packages/downloader/src/errors.rs +++ b/packages/downloader/src/errors.rs @@ -35,6 +35,11 @@ pub enum DownloaderError { #[snafu(source(from(Box, OptDynErr::some)))] source: OptDynErr, }, + #[snafu(display("{source}"))] + RqbitError { + #[snafu(source(from(Box, OptDynErr::some)))] + source: OptDynErr, + }, #[snafu(display("{message}"))] Whatever { message: String, diff --git a/packages/downloader/src/qbit/downloader.rs b/packages/downloader/src/qbit/downloader.rs index 576f02d..998cc54 100644 --- a/packages/downloader/src/qbit/downloader.rs +++ b/packages/downloader/src/qbit/downloader.rs @@ -17,7 +17,7 @@ use qbit_rs::{ Torrent as QbitTorrent, TorrentFile, TorrentSource, }, }; -use quirks_path::{Path, PathBuf}; +use quirks_path::PathBuf; use snafu::{OptionExt, whatever}; use tokio::{ sync::{RwLock, watch}, @@ -26,6 +26,7 @@ use tokio::{ use tracing::instrument; use url::Url; +use super::QBittorrentHashSelector; use crate::{ DownloaderError, bittorrent::{ @@ -33,7 +34,7 @@ use crate::{ source::{HashTorrentSource, HashTorrentSourceTrait, MagnetUrlSource, TorrentFileSource}, task::TORRENT_TAG_NAME, }, - core::{DownloadIdSelector, DownloaderTrait}, + core::DownloaderTrait, qbit::task::{ QBittorrentCreation, QBittorrentHash, QBittorrentSelector, QBittorrentState, QBittorrentTask, @@ -41,6 +42,7 @@ use crate::{ utils::path_equals_as_file_url, }; +#[derive(Debug)] pub struct QBittorrentDownloaderCreation { pub endpoint: String, pub username: String, @@ -130,6 +132,7 @@ pub struct QBittorrentDownloader { } impl QBittorrentDownloader { + #[instrument(level = "debug")] pub async fn from_creation( creation: QBittorrentDownloaderCreation, ) -> Result, DownloaderError> { @@ -253,10 +256,6 @@ impl QBittorrentDownloader { Ok(()) } - pub fn get_save_path(&self, sub_path: &Path) -> PathBuf { - self.save_path.join(sub_path) - } - #[instrument(level = "debug", skip(self))] pub async fn add_torrent_tags( &self, @@ -324,6 +323,7 @@ impl QBittorrentDownloader { Ok(()) } + #[instrument(level = "debug", skip(self))] pub async fn get_torrent_path( &self, hashes: String, @@ -406,6 +406,7 @@ impl DownloaderTrait for QBittorrentDownloader { type Creation = QBittorrentCreation; type Selector = QBittorrentSelector; + #[instrument(level = "debug", skip(self))] async fn add_downloads( &self, creation: ::Creation, @@ -524,6 +525,7 @@ impl DownloaderTrait for QBittorrentDownloader { ::remove_downloads(self, selector).await } + #[instrument(level = "debug", skip(self))] async fn query_downloads( &self, selector: QBittorrentSelector, @@ -555,13 +557,13 @@ impl DownloaderTrait for QBittorrentDownloader { #[async_trait] impl TorrentDownloaderTrait for QBittorrentDownloader { - type IdSelector = DownloadIdSelector; + type IdSelector = QBittorrentHashSelector; #[instrument(level = "debug", skip(self))] async fn pause_torrents( &self, hashes: ::IdSelector, - ) -> Result<::IdSelector, DownloaderError> { + ) -> Result { self.client.pause_torrents(hashes.clone()).await?; Ok(hashes) } @@ -579,7 +581,7 @@ impl TorrentDownloaderTrait for QBittorrentDownloader { async fn remove_torrents( &self, hashes: ::IdSelector, - ) -> Result<::IdSelector, DownloaderError> { + ) -> Result { self.client .delete_torrents(hashes.clone(), Some(true)) .await?; diff --git a/packages/downloader/src/qbit/task.rs b/packages/downloader/src/qbit/task.rs index 7221c6c..f147371 100644 --- a/packages/downloader/src/qbit/task.rs +++ b/packages/downloader/src/qbit/task.rs @@ -13,8 +13,8 @@ use crate::{ task::{SimpleTorrentHash, TorrentCreationTrait, TorrentStateTrait, TorrentTaskTrait}, }, core::{ - DownloadCreationTrait, DownloadIdSelector, DownloadSelectorTrait, DownloadStateTrait, - DownloadTaskTrait, + DownloadCreationTrait, DownloadIdSelector, DownloadSelectorTrait, DownloadSimpleState, + DownloadStateTrait, DownloadTaskTrait, }, }; @@ -35,7 +35,34 @@ impl From> for QBittorrentState { } } -impl DownloadStateTrait for QBittorrentState {} +impl DownloadStateTrait for QBittorrentState { + fn to_download_state(&self) -> DownloadSimpleState { + if let Some(ref state) = self.0 { + match state { + State::ForcedUP + | State::Uploading + | State::PausedUP + | State::QueuedUP + | State::StalledUP + | State::CheckingUP => DownloadSimpleState::Completed, + State::Error | State::MissingFiles => DownloadSimpleState::Error, + State::Unknown => DownloadSimpleState::Unknown, + State::PausedDL => DownloadSimpleState::Paused, + State::Allocating + | State::Moving + | State::MetaDL + | State::ForcedDL + | State::CheckingResumeData + | State::QueuedDL + | State::Downloading + | State::StalledDL + | State::CheckingDL => DownloadSimpleState::Active, + } + } else { + DownloadSimpleState::Unknown + } + } +} impl TorrentStateTrait for QBittorrentState {} @@ -129,8 +156,8 @@ impl DownloadTaskTrait for QBittorrentTask { } impl TorrentTaskTrait for QBittorrentTask { - fn hash_info(&self) -> &str { - &self.hash_info + fn hash_info(&self) -> Cow<'_, str> { + Cow::Borrowed(&self.hash_info) } fn tags(&self) -> impl Iterator> { @@ -177,6 +204,7 @@ impl TorrentCreationTrait for QBittorrentCreation { pub type QBittorrentHashSelector = DownloadIdSelector; +#[derive(Debug)] pub struct QBittorrentComplexSelector { pub query: GetTorrentListArg, } @@ -197,6 +225,7 @@ impl DownloadSelectorTrait for QBittorrentComplexSelector { type Task = QBittorrentTask; } +#[derive(Debug)] pub enum QBittorrentSelector { Hash(QBittorrentHashSelector), Complex(QBittorrentComplexSelector), diff --git a/packages/downloader/src/rqbit/downloader.rs b/packages/downloader/src/rqbit/downloader.rs index b8c2e85..8832a5e 100644 --- a/packages/downloader/src/rqbit/downloader.rs +++ b/packages/downloader/src/rqbit/downloader.rs @@ -1 +1,278 @@ -pub struct RqbitDownloaderCreation {} +use std::{str::FromStr, sync::Arc}; + +use async_trait::async_trait; +use librqbit::{ + AddTorrent, AddTorrentOptions, ManagedTorrent, Session, SessionOptions, api::TorrentIdOrHash, +}; +use librqbit_core::Id20; +use snafu::ResultExt; +use tracing::instrument; +use util::errors::AnyhowResultExt; + +use super::task::{RqbitCreation, RqbitHash, RqbitSelector, RqbitState, RqbitTask}; +use crate::{ + DownloaderError, + bittorrent::{ + downloader::TorrentDownloaderTrait, + source::{HashTorrentSource, HashTorrentSourceTrait}, + }, + core::{DownloadIdSelector, DownloaderTrait}, + errors::RqbitSnafu, +}; + +#[derive(Debug)] +pub struct RqbitDownloaderCreation { + pub save_path: String, + pub subscriber_id: i32, + pub downloader_id: i32, +} + +impl RqbitDownloaderCreation {} + +pub struct RqbitDownloader { + pub save_path: String, + pub subscriber_id: i32, + pub downloader_id: i32, + pub session: Arc, +} + +impl RqbitDownloader { + #[instrument(level = "debug")] + pub async fn from_creation( + creation: RqbitDownloaderCreation, + ) -> Result, DownloaderError> { + let session_opt = SessionOptions { + ..Default::default() + }; + let session = Session::new_with_opts(creation.save_path.clone().into(), session_opt) + .await + .to_dyn_boxed() + .context(RqbitSnafu {})?; + Ok(Arc::new(Self { + session, + save_path: creation.save_path, + subscriber_id: creation.subscriber_id, + downloader_id: creation.downloader_id, + })) + } + + pub async fn add_torrent( + &self, + source: HashTorrentSource, + opt: Option, + ) -> Result { + let hash = Id20::from_str(&source.hash_info() as &str) + .to_dyn_boxed() + .context(RqbitSnafu {})?; + + let source = match source { + HashTorrentSource::TorrentFile(file) => AddTorrent::TorrentFileBytes(file.payload), + HashTorrentSource::MagnetUrl(magnet) => AddTorrent::Url(magnet.url.into()), + }; + let response = self + .session + .add_torrent(source, opt) + .await + .to_dyn_boxed() + .context(RqbitSnafu {})?; + + let handle = response + .into_handle() + .ok_or_else(|| anyhow::anyhow!("failed to get handle of add torrent task")) + .to_dyn_boxed() + .context(RqbitSnafu {})?; + + handle + .wait_until_initialized() + .await + .to_dyn_boxed() + .context(RqbitSnafu {})?; + + Ok(hash) + } + + fn query_torrent_impl(&self, hash: RqbitHash) -> Result, DownloaderError> { + let torrent = self + .session + .get(TorrentIdOrHash::Hash(hash)) + .ok_or_else(|| anyhow::anyhow!("could not find torrent by hash {}", hash.as_string())) + .to_dyn_boxed() + .context(RqbitSnafu {})?; + + Ok(torrent) + } + + pub fn query_torrent(&self, hash: RqbitHash) -> Result { + let torrent = self.query_torrent_impl(hash)?; + + let task = RqbitTask::from_query(torrent)?; + + Ok(task) + } + + pub async fn pause_torrent(&self, hash: RqbitHash) -> Result<(), DownloaderError> { + let t = self.query_torrent_impl(hash)?; + self.session + .pause(&t) + .await + .to_dyn_boxed() + .context(RqbitSnafu {})?; + + Ok(()) + } + + pub async fn resume_torrent(&self, hash: RqbitHash) -> Result<(), DownloaderError> { + let t = self.query_torrent_impl(hash)?; + self.session + .unpause(&t) + .await + .to_dyn_boxed() + .context(RqbitSnafu {})?; + + Ok(()) + } + + pub async fn delete_torrent(&self, hash: RqbitHash) -> Result<(), DownloaderError> { + self.session + .delete(TorrentIdOrHash::Hash(hash), true) + .await + .to_dyn_boxed() + .context(RqbitSnafu {})?; + + Ok(()) + } +} + +#[async_trait] +impl DownloaderTrait for RqbitDownloader { + type State = RqbitState; + type Id = RqbitHash; + type Task = RqbitTask; + type Creation = RqbitCreation; + type Selector = RqbitSelector; + + #[instrument(level = "debug", skip(self))] + async fn add_downloads( + &self, + creation: RqbitCreation, + ) -> Result::Id>, DownloaderError> { + let mut sources = creation.sources; + if sources.len() == 1 { + let hash = self + .add_torrent( + sources.pop().unwrap(), + Some(AddTorrentOptions { + paused: false, + output_folder: Some(self.save_path.clone()), + ..Default::default() + }), + ) + .await?; + Ok(vec![hash]) + } else { + let tasks = sources + .into_iter() + .map(|s| { + self.add_torrent( + s, + Some(AddTorrentOptions { + paused: false, + output_folder: Some(self.save_path.clone()), + ..Default::default() + }), + ) + }) + .collect::>(); + let results = futures::future::try_join_all(tasks).await?; + Ok(results) + } + } + + async fn pause_downloads( + &self, + selector: ::Selector, + ) -> Result, DownloaderError> { + ::pause_downloads(self, selector).await + } + + async fn resume_downloads( + &self, + selector: ::Selector, + ) -> Result, DownloaderError> { + ::resume_downloads(self, selector).await + } + + async fn remove_downloads( + &self, + selector: ::Selector, + ) -> Result, DownloaderError> { + ::remove_downloads(self, selector).await + } + + #[instrument(level = "debug", skip(self))] + async fn query_downloads( + &self, + selector: RqbitSelector, + ) -> Result::Task>, DownloaderError> { + let hashes = selector.into_iter(); + + let tasks = hashes + .map(|h| self.query_torrent(h)) + .collect::, DownloaderError>>()?; + + Ok(tasks) + } +} + +#[async_trait] +impl TorrentDownloaderTrait for RqbitDownloader { + type IdSelector = DownloadIdSelector; + + #[instrument(level = "debug", skip(self))] + async fn pause_torrents( + &self, + selector: Self::IdSelector, + ) -> Result { + let mut hashes: Vec<_> = selector.clone(); + + if hashes.len() == 1 { + self.pause_torrent(hashes.pop().unwrap()).await?; + } else { + futures::future::try_join_all(hashes.into_iter().map(|h| self.pause_torrent(h))) + .await?; + } + Ok(selector) + } + + #[instrument(level = "debug", skip(self))] + async fn resume_torrents( + &self, + selector: Self::IdSelector, + ) -> Result { + let mut hashes: Vec<_> = selector.clone(); + + if hashes.len() == 1 { + self.resume_torrent(hashes.pop().unwrap()).await?; + } else { + futures::future::try_join_all(hashes.into_iter().map(|h| self.resume_torrent(h))) + .await?; + } + Ok(selector) + } + + #[instrument(level = "debug", skip(self))] + async fn remove_torrents( + &self, + selector: Self::IdSelector, + ) -> Result { + let mut hashes: Vec<_> = selector.clone(); + + if hashes.len() == 1 { + self.delete_torrent(hashes.pop().unwrap()).await?; + } else { + futures::future::try_join_all(hashes.into_iter().map(|h| self.delete_torrent(h))) + .await?; + } + Ok(selector) + } +} diff --git a/packages/downloader/src/rqbit/task.rs b/packages/downloader/src/rqbit/task.rs index f41b2d7..c277fe6 100644 --- a/packages/downloader/src/rqbit/task.rs +++ b/packages/downloader/src/rqbit/task.rs @@ -1,68 +1,84 @@ -use std::{borrow::Cow, time::Duration}; +use std::{borrow::Cow, fmt::Debug, sync::Arc, time::Duration}; -use itertools::Itertools; -use qbit_rs::model::{ - GetTorrentListArg, State, Torrent as QbitTorrent, TorrentContent as QbitTorrentContent, -}; +use librqbit::{ManagedTorrent, ManagedTorrentState, TorrentStats, TorrentStatsState}; +use librqbit_core::Id20; use quirks_path::{Path, PathBuf}; use crate::{ DownloaderError, bittorrent::{ source::HashTorrentSource, - task::{SimpleTorrentHash, TorrentCreationTrait, TorrentStateTrait, TorrentTaskTrait}, + task::{TorrentCreationTrait, TorrentHashTrait, TorrentStateTrait, TorrentTaskTrait}, }, core::{ - DownloadCreationTrait, DownloadIdSelector, DownloadSelectorTrait, DownloadStateTrait, - DownloadTaskTrait, + DownloadCreationTrait, DownloadIdSelector, DownloadIdTrait, DownloadSimpleState, + DownloadStateTrait, DownloadTaskTrait, }, }; -pub type RqbitHash = SimpleTorrentHash; +pub type RqbitHash = Id20; -#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] -pub struct RqbitState(Option); +impl DownloadIdTrait for RqbitHash {} -impl DownloadStateTrait for RqbitState {} +impl TorrentHashTrait for RqbitHash {} + +#[derive(Debug, Clone)] +pub struct RqbitState(Arc); + +impl DownloadStateTrait for RqbitState { + fn to_download_state(&self) -> DownloadSimpleState { + match self.0.state { + TorrentStatsState::Error => DownloadSimpleState::Error, + TorrentStatsState::Paused => DownloadSimpleState::Paused, + TorrentStatsState::Live => { + if self.0.finished { + DownloadSimpleState::Completed + } else { + DownloadSimpleState::Active + } + } + TorrentStatsState::Initializing => DownloadSimpleState::Active, + } + } +} impl TorrentStateTrait for RqbitState {} -impl From> for RqbitState { - fn from(value: Option) -> Self { +impl From> for RqbitState { + fn from(value: Arc) -> Self { Self(value) } } -#[derive(Debug)] pub struct RqbitTask { pub hash_info: RqbitHash, - pub torrent: QbitTorrent, - pub contents: Vec, + pub torrent: Arc, pub state: RqbitState, + pub stats: Arc, } impl RqbitTask { - pub fn from_query( - torrent: QbitTorrent, - contents: Vec, - ) -> Result { - let hash = torrent - .hash - .clone() - .ok_or_else(|| DownloaderError::TorrentMetaError { - message: "missing hash".to_string(), - source: None.into(), - })?; - let state = RqbitState::from(torrent.state.clone()); + pub fn from_query(torrent: Arc) -> Result { + let hash = torrent.info_hash(); + let stats = Arc::new(torrent.stats()); Ok(Self { hash_info: hash, - contents, - state, + state: stats.clone().into(), + stats, torrent, }) } } +impl Debug for RqbitTask { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RqbitTask") + .field("hash_info", &self.hash_info) + .field("state", &self.id()) + .finish() + } +} + impl DownloadTaskTrait for RqbitTask { type State = RqbitState; type Id = RqbitHash; @@ -77,14 +93,26 @@ impl DownloadTaskTrait for RqbitTask { fn name(&self) -> Cow<'_, str> { self.torrent - .name - .as_deref() - .map(Cow::Borrowed) + .metadata + .load_full() + .and_then(|m| m.name.to_owned()) + .map(Cow::Owned) .unwrap_or_else(|| DownloadTaskTrait::name(self)) } fn speed(&self) -> Option { - self.torrent.dlspeed.and_then(|s| u64::try_from(s).ok()) + self.stats + .live + .as_ref() + .map(|s| s.download_speed.mbps) + .and_then(|u| { + let v = u * 1024f64 * 1024f64; + if v.is_finite() && v > 0.0 && v < u64::MAX as f64 { + Some(v as u64) + } else { + None + } + }) } fn state(&self) -> &Self::State { @@ -92,54 +120,41 @@ impl DownloadTaskTrait for RqbitTask { } fn dl_bytes(&self) -> Option { - self.torrent.downloaded.and_then(|v| u64::try_from(v).ok()) + Some(self.stats.progress_bytes) } fn total_bytes(&self) -> Option { - self.torrent.size.and_then(|v| u64::try_from(v).ok()) - } - - fn left_bytes(&self) -> Option { - self.torrent.amount_left.and_then(|v| u64::try_from(v).ok()) + Some(self.stats.total_bytes) } fn et(&self) -> Option { - self.torrent - .time_active - .and_then(|v| u64::try_from(v).ok()) - .map(Duration::from_secs) + self.torrent.with_state(|l| match l { + ManagedTorrentState::Live(l) => Some(Duration::from_millis( + l.stats_snapshot().total_piece_download_ms, + )), + _ => None, + }) } fn eta(&self) -> Option { - self.torrent - .eta - .and_then(|v| u64::try_from(v).ok()) - .map(Duration::from_secs) - } - - fn progress(&self) -> Option { - self.torrent.progress.as_ref().map(|s| *s as f32) + self.torrent.with_state(|l| match l { + ManagedTorrentState::Live(l) => l.down_speed_estimator().time_remaining(), + _ => None, + }) } } impl TorrentTaskTrait for RqbitTask { - fn hash_info(&self) -> &str { - &self.hash_info + fn hash_info(&self) -> Cow<'_, str> { + Cow::Owned(self.hash_info.as_string()) } fn tags(&self) -> impl Iterator> { - self.torrent - .tags - .as_deref() - .unwrap_or("") - .split(',') - .map(|s| s.trim()) - .filter(|s| !s.is_empty()) - .map(Cow::Borrowed) + std::iter::empty() } fn category(&self) -> Option> { - self.torrent.category.as_deref().map(Cow::Borrowed) + None } } @@ -171,45 +186,4 @@ impl TorrentCreationTrait for RqbitCreation { pub type RqbitHashSelector = DownloadIdSelector; -pub struct RqbitComplexSelector { - pub query: GetTorrentListArg, -} - -impl From for RqbitComplexSelector { - fn from(value: RqbitHashSelector) -> Self { - Self { - query: GetTorrentListArg { - hashes: Some(value.ids.join("|")), - ..Default::default() - }, - } - } -} - -impl DownloadSelectorTrait for RqbitComplexSelector { - type Id = RqbitHash; - type Task = RqbitTask; -} - -pub enum RqbitSelector { - Hash(RqbitHashSelector), - Complex(RqbitComplexSelector), -} - -impl DownloadSelectorTrait for RqbitSelector { - type Id = RqbitHash; - type Task = RqbitTask; - - fn try_into_ids_only(self) -> Result, Self> { - match self { - RqbitSelector::Complex(c) => c.try_into_ids_only().map_err(RqbitSelector::Complex), - RqbitSelector::Hash(h) => { - let result = h - .try_into_ids_only() - .unwrap_or_else(|_| unreachable!("hash selector must contains hash")) - .into_iter(); - Ok(result.collect_vec()) - } - } - } -} +pub type RqbitSelector = RqbitHashSelector;