From aeb5fbf06dc4fa34fc29bd47cdff0ae1d9ffa00f Mon Sep 17 00:00:00 2001 From: lonelyhentxi Date: Sun, 3 Mar 2024 02:13:00 +0800 Subject: [PATCH] fix: fix remove issues --- crates/recorder/src/downloaders/qbitorrent.rs | 226 ++++++++++-------- 1 file changed, 123 insertions(+), 103 deletions(-) diff --git a/crates/recorder/src/downloaders/qbitorrent.rs b/crates/recorder/src/downloaders/qbitorrent.rs index 8d076f6..77270a7 100644 --- a/crates/recorder/src/downloaders/qbitorrent.rs +++ b/crates/recorder/src/downloaders/qbitorrent.rs @@ -1,4 +1,11 @@ -use std::{borrow::Cow, collections::HashSet, fmt::Debug, sync::Arc, time::Duration}; +use std::{ + borrow::Cow, + collections::{HashMap, HashSet}, + fmt::Debug, + future::Future, + sync::Arc, + time::Duration, +}; use eyre::OptionExt; use futures::future::try_join_all; @@ -6,7 +13,7 @@ use qbit_rs::{ model::{AddTorrentArg, Credential, GetTorrentListArg, NonEmptyStr, SyncData}, Qbit, }; -use tokio::{sync::RwLock, time::sleep}; +use tokio::time::sleep; use url::Url; use super::{ @@ -15,17 +22,20 @@ use super::{ torrent_downloader::TorrentDownloader, }; use crate::{ - downloaders::defs::TorrentContent, + downloaders::defs::{QbitTorrent, QbitTorrentContent, TorrentContent}, models::{entities::downloaders, prelude::DownloaderCategory}, path::{path_str_equals, VFSPathBuf, VFSSubPath}, }; +pub struct SyncDataCache { + pub torrents: HashMap, +} + pub struct QBittorrentDownloader { pub subscriber_id: i32, pub endpoint_url: Url, - pub client: Qbit, + pub client: Arc, pub save_path: String, - pub rid: Arc>, pub wait_sync_timeout: Duration, } @@ -49,14 +59,13 @@ impl QBittorrentDownloader { .await .map_err(DownloaderError::QBitAPIError)?; - let init_sync_id = client.sync(None).await?.rid; + client.sync(None).await?.rid; Ok(Self { - client, + client: Arc::new(client), endpoint_url, subscriber_id: model.subscriber_id, save_path: model.save_path, - rid: Arc::new(RwLock::new(init_sync_id)), wait_sync_timeout: Duration::from_millis(10000), }) } @@ -66,85 +75,111 @@ impl QBittorrentDownloader { Ok(result) } - pub async fn last_cached_sync_id(&self) -> i64 { - *self.rid.read().await - } - - pub async fn sync_main_data(&self, sync_id: Option) -> eyre::Result { - let result = self.client.sync(sync_id).await?; - { - let mut sync_id = self.rid.write().await; - *sync_id = result.rid; + pub async fn wait_until( + &self, + capture_fn: H, + fetch_data_fn: G, + mut stop_wait_fn: F, + timeout: Option, + ) -> eyre::Result<()> + where + H: FnOnce() -> E, + G: Fn(Arc, E) -> Fut, + Fut: Future>, + F: FnMut(D) -> bool, + E: Clone, + { + let mut next_wait_ms = 32u64; + let mut all_wait_ms = 0u64; + let timeout = timeout.unwrap_or(self.wait_sync_timeout); + let env = capture_fn(); + loop { + sleep(Duration::from_millis(next_wait_ms)).await; + all_wait_ms += next_wait_ms; + if all_wait_ms >= timeout.as_millis() as u64 { + // full update + let sync_data = fetch_data_fn(self.client.clone(), env.clone()).await?; + if stop_wait_fn(sync_data) { + break; + } else { + return Err(DownloaderError::TimeoutError { + action: Cow::Borrowed("QBittorrentDownloader::wait_unit"), + timeout, + } + .into()); + } + } + let sync_data = fetch_data_fn(self.client.clone(), env.clone()).await?; + if stop_wait_fn(sync_data) { + break; + } + next_wait_ms *= 2; } - Ok(result) + Ok(()) } - async fn wait_until_torrent_contents) -> bool>( + pub async fn wait_some_torrents_until( + &self, + hashes: Vec, + stop_wait_fn: F, + timeout: Option, + ) -> eyre::Result<()> + where + F: FnMut(Vec) -> bool, + { + self.wait_until( + || { + GetTorrentListArg::builder() + .hashes(hashes.join("|")) + .build() + }, + async move |client: Arc, + arg: GetTorrentListArg| + -> eyre::Result> { + let data = client.get_torrent_list(arg).await?; + Ok(data) + }, + stop_wait_fn, + timeout, + ) + .await + } + + pub async fn wait_sync_until bool>( + &self, + stop_wait_fn: F, + timeout: Option, + ) -> eyre::Result<()> { + self.wait_until( + || (), + async move |client: Arc, _| -> eyre::Result { + let data = client.sync(None).await?; + Ok(data) + }, + stop_wait_fn, + timeout, + ) + .await + } + + async fn wait_torrent_contents_until) -> bool>( &self, hash: &str, - mut stop_wait_fn: F, + stop_wait_fn: F, timeout: Option, ) -> eyre::Result<()> { - let mut next_wait_ms = 32u64; - let mut all_wait_ms = 0u64; - let timeout = timeout.unwrap_or(self.wait_sync_timeout); - loop { - sleep(Duration::from_millis(next_wait_ms)).await; - all_wait_ms += next_wait_ms; - if all_wait_ms >= timeout.as_millis() as u64 { - // full update - let sync_data = self.client.get_torrent_contents(hash, None).await?; - if stop_wait_fn(sync_data) { - break; - } else { - return Err(DownloaderError::TimeoutError { - action: Cow::Borrowed("QBittorrentDownloader::wait_unit"), - timeout, - } - .into()); - } - } - let sync_data = self.client.get_torrent_contents(hash, None).await?; - if stop_wait_fn(sync_data) { - break; - } - next_wait_ms *= 2; - } - Ok(()) - } - - pub async fn wait_until bool>( - &self, - start_sync_id: i64, - mut stop_wait_fn: F, - timeout: Option, - ) -> eyre::Result<()> { - let mut next_wait_ms = 32u64; - let mut all_wait_ms = 0u64; - let timeout = timeout.unwrap_or(self.wait_sync_timeout); - loop { - sleep(Duration::from_millis(next_wait_ms)).await; - all_wait_ms += next_wait_ms; - if all_wait_ms >= timeout.as_millis() as u64 { - // full update - let sync_data = self.sync_main_data(None).await?; - if stop_wait_fn(sync_data) { - break; - } else { - return Err(DownloaderError::TimeoutError { - action: Cow::Borrowed("QBittorrentDownloader::wait_unit"), - timeout, - } - .into()); - } - } - let sync_data = self.sync_main_data(Some(start_sync_id)).await?; - if stop_wait_fn(sync_data) { - break; - } - next_wait_ms *= 2; - } - Ok(()) + self.wait_until( + || Arc::new(hash.to_string()), + async move |client: Arc, + hash_arc: Arc| + -> eyre::Result> { + let data = client.get_torrent_contents(hash_arc.as_str(), None).await?; + Ok(data) + }, + stop_wait_fn, + timeout, + ) + .await } } @@ -191,7 +226,6 @@ impl TorrentDownloader for QBittorrentDownloader { auto_torrent_management: Some(false), ..Default::default() }; - let start_last_id = self.last_cached_sync_id().await; let add_result = self.client.add_torrent(arg.clone()).await; if let ( Err(qbit_rs::Error::ApiError(qbit_rs::ApiError::CategoryNotFound)), @@ -204,8 +238,7 @@ impl TorrentDownloader for QBittorrentDownloader { add_result?; } let source_hash = source.hash(); - self.wait_until( - start_last_id, + self.wait_sync_until( |sync_data| { sync_data .torrents @@ -218,7 +251,6 @@ impl TorrentDownloader for QBittorrentDownloader { } async fn delete_torrents(&self, hashes: Vec) -> eyre::Result<()> { - let start_last_id = self.last_cached_sync_id().await; let existed_list = self .client .get_torrent_list( @@ -231,13 +263,9 @@ impl TorrentDownloader for QBittorrentDownloader { self.client .delete_torrents(hashes.clone(), Some(true)) .await?; - self.wait_until( - start_last_id, + self.wait_sync_until( |sync_data| -> bool { - sync_data.torrents_removed.map_or(false, |tr| -> bool { - let tr = tr.into_iter().collect::>(); - hashes.iter().all(|s| tr.contains(s)) - }) && sync_data + sync_data .torrents .map_or(true, |t| hashes.iter().all(|h| !t.contains_key(h))) }, @@ -255,7 +283,7 @@ impl TorrentDownloader for QBittorrentDownloader { new_path: &str, ) -> eyre::Result<()> { self.client.rename_file(hash, old_path, new_path).await?; - self.wait_until_torrent_contents( + self.wait_torrent_contents_until( hash, |contents| -> bool { contents @@ -269,12 +297,10 @@ impl TorrentDownloader for QBittorrentDownloader { } async fn move_torrents(&self, hashes: Vec, new_path: &str) -> eyre::Result<()> { - let start_last_id = self.last_cached_sync_id().await; self.client .set_torrent_location(hashes.clone(), new_path) .await?; - self.wait_until( - start_last_id, + self.wait_sync_until( |sync_data| -> bool { hashes.iter().all(|hash| { sync_data.torrents.as_ref().map_or(false, |t| { @@ -310,7 +336,6 @@ impl TorrentDownloader for QBittorrentDownloader { } async fn set_torrents_category(&self, hashes: Vec, category: &str) -> eyre::Result<()> { - let start_last_id = self.last_cached_sync_id().await; let result = self .client .set_torrent_category(hashes.clone(), category) @@ -323,8 +348,7 @@ impl TorrentDownloader for QBittorrentDownloader { } else { result?; } - self.wait_until( - start_last_id, + self.wait_sync_until( |sync_data| { sync_data.torrents.map_or(false, |ts| { hashes.iter().all(|h| { @@ -344,13 +368,11 @@ impl TorrentDownloader for QBittorrentDownloader { if tags.is_empty() { return Err(eyre::eyre!("add torrent tags can not be empty")); } - let start_last_id = self.last_cached_sync_id().await; self.client .add_torrent_tags(hashes.clone(), tags.clone()) .await?; let tag_sets = tags.iter().map(|s| s.as_str()).collect::>(); - self.wait_until( - start_last_id, + self.wait_sync_until( |sync_data| { sync_data.torrents.map_or(false, |ts| { hashes.iter().all(|h| { @@ -373,15 +395,13 @@ impl TorrentDownloader for QBittorrentDownloader { } async fn add_category(&self, category: &str) -> eyre::Result<()> { - let start_sync_id = self.last_cached_sync_id().await; self.client .add_category( NonEmptyStr::new(category).ok_or_eyre("category can not be empty")?, self.save_path.as_str(), ) .await?; - self.wait_until( - start_sync_id, + self.wait_sync_until( |sync_data| { sync_data .categories