fix: fix remove issues

This commit is contained in:
master 2024-03-03 02:13:00 +08:00
parent c3ad677e8c
commit aeb5fbf06d

View File

@ -1,4 +1,11 @@
use std::{borrow::Cow, collections::HashSet, fmt::Debug, sync::Arc, time::Duration};
use std::{
borrow::Cow,
collections::{HashMap, HashSet},
fmt::Debug,
future::Future,
sync::Arc,
time::Duration,
};
use eyre::OptionExt;
use futures::future::try_join_all;
@ -6,7 +13,7 @@ use qbit_rs::{
model::{AddTorrentArg, Credential, GetTorrentListArg, NonEmptyStr, SyncData},
Qbit,
};
use tokio::{sync::RwLock, time::sleep};
use tokio::time::sleep;
use url::Url;
use super::{
@ -15,17 +22,20 @@ use super::{
torrent_downloader::TorrentDownloader,
};
use crate::{
downloaders::defs::TorrentContent,
downloaders::defs::{QbitTorrent, QbitTorrentContent, TorrentContent},
models::{entities::downloaders, prelude::DownloaderCategory},
path::{path_str_equals, VFSPathBuf, VFSSubPath},
};
pub struct SyncDataCache {
pub torrents: HashMap<String, QbitTorrent>,
}
pub struct QBittorrentDownloader {
pub subscriber_id: i32,
pub endpoint_url: Url,
pub client: Qbit,
pub client: Arc<Qbit>,
pub save_path: String,
pub rid: Arc<RwLock<i64>>,
pub wait_sync_timeout: Duration,
}
@ -49,14 +59,13 @@ impl QBittorrentDownloader {
.await
.map_err(DownloaderError::QBitAPIError)?;
let init_sync_id = client.sync(None).await?.rid;
client.sync(None).await?.rid;
Ok(Self {
client,
client: Arc::new(client),
endpoint_url,
subscriber_id: model.subscriber_id,
save_path: model.save_path,
rid: Arc::new(RwLock::new(init_sync_id)),
wait_sync_timeout: Duration::from_millis(10000),
})
}
@ -66,85 +75,111 @@ impl QBittorrentDownloader {
Ok(result)
}
pub async fn last_cached_sync_id(&self) -> i64 {
*self.rid.read().await
}
pub async fn sync_main_data(&self, sync_id: Option<i64>) -> eyre::Result<SyncData> {
let result = self.client.sync(sync_id).await?;
{
let mut sync_id = self.rid.write().await;
*sync_id = result.rid;
pub async fn wait_until<G, Fut, F, D, H, E>(
&self,
capture_fn: H,
fetch_data_fn: G,
mut stop_wait_fn: F,
timeout: Option<Duration>,
) -> eyre::Result<()>
where
H: FnOnce() -> E,
G: Fn(Arc<Qbit>, E) -> Fut,
Fut: Future<Output = eyre::Result<D>>,
F: FnMut(D) -> bool,
E: Clone,
{
let mut next_wait_ms = 32u64;
let mut all_wait_ms = 0u64;
let timeout = timeout.unwrap_or(self.wait_sync_timeout);
let env = capture_fn();
loop {
sleep(Duration::from_millis(next_wait_ms)).await;
all_wait_ms += next_wait_ms;
if all_wait_ms >= timeout.as_millis() as u64 {
// full update
let sync_data = fetch_data_fn(self.client.clone(), env.clone()).await?;
if stop_wait_fn(sync_data) {
break;
} else {
return Err(DownloaderError::TimeoutError {
action: Cow::Borrowed("QBittorrentDownloader::wait_unit"),
timeout,
}
.into());
}
}
let sync_data = fetch_data_fn(self.client.clone(), env.clone()).await?;
if stop_wait_fn(sync_data) {
break;
}
next_wait_ms *= 2;
}
Ok(result)
Ok(())
}
async fn wait_until_torrent_contents<F: FnMut(Vec<qbit_rs::model::TorrentContent>) -> bool>(
pub async fn wait_some_torrents_until<F>(
&self,
hashes: Vec<String>,
stop_wait_fn: F,
timeout: Option<Duration>,
) -> eyre::Result<()>
where
F: FnMut(Vec<QbitTorrent>) -> bool,
{
self.wait_until(
|| {
GetTorrentListArg::builder()
.hashes(hashes.join("|"))
.build()
},
async move |client: Arc<Qbit>,
arg: GetTorrentListArg|
-> eyre::Result<Vec<QbitTorrent>> {
let data = client.get_torrent_list(arg).await?;
Ok(data)
},
stop_wait_fn,
timeout,
)
.await
}
pub async fn wait_sync_until<F: FnMut(SyncData) -> bool>(
&self,
stop_wait_fn: F,
timeout: Option<Duration>,
) -> eyre::Result<()> {
self.wait_until(
|| (),
async move |client: Arc<Qbit>, _| -> eyre::Result<SyncData> {
let data = client.sync(None).await?;
Ok(data)
},
stop_wait_fn,
timeout,
)
.await
}
async fn wait_torrent_contents_until<F: FnMut(Vec<qbit_rs::model::TorrentContent>) -> bool>(
&self,
hash: &str,
mut stop_wait_fn: F,
stop_wait_fn: F,
timeout: Option<Duration>,
) -> eyre::Result<()> {
let mut next_wait_ms = 32u64;
let mut all_wait_ms = 0u64;
let timeout = timeout.unwrap_or(self.wait_sync_timeout);
loop {
sleep(Duration::from_millis(next_wait_ms)).await;
all_wait_ms += next_wait_ms;
if all_wait_ms >= timeout.as_millis() as u64 {
// full update
let sync_data = self.client.get_torrent_contents(hash, None).await?;
if stop_wait_fn(sync_data) {
break;
} else {
return Err(DownloaderError::TimeoutError {
action: Cow::Borrowed("QBittorrentDownloader::wait_unit"),
timeout,
}
.into());
}
}
let sync_data = self.client.get_torrent_contents(hash, None).await?;
if stop_wait_fn(sync_data) {
break;
}
next_wait_ms *= 2;
}
Ok(())
}
pub async fn wait_until<F: FnMut(SyncData) -> bool>(
&self,
start_sync_id: i64,
mut stop_wait_fn: F,
timeout: Option<Duration>,
) -> eyre::Result<()> {
let mut next_wait_ms = 32u64;
let mut all_wait_ms = 0u64;
let timeout = timeout.unwrap_or(self.wait_sync_timeout);
loop {
sleep(Duration::from_millis(next_wait_ms)).await;
all_wait_ms += next_wait_ms;
if all_wait_ms >= timeout.as_millis() as u64 {
// full update
let sync_data = self.sync_main_data(None).await?;
if stop_wait_fn(sync_data) {
break;
} else {
return Err(DownloaderError::TimeoutError {
action: Cow::Borrowed("QBittorrentDownloader::wait_unit"),
timeout,
}
.into());
}
}
let sync_data = self.sync_main_data(Some(start_sync_id)).await?;
if stop_wait_fn(sync_data) {
break;
}
next_wait_ms *= 2;
}
Ok(())
self.wait_until(
|| Arc::new(hash.to_string()),
async move |client: Arc<Qbit>,
hash_arc: Arc<String>|
-> eyre::Result<Vec<QbitTorrentContent>> {
let data = client.get_torrent_contents(hash_arc.as_str(), None).await?;
Ok(data)
},
stop_wait_fn,
timeout,
)
.await
}
}
@ -191,7 +226,6 @@ impl TorrentDownloader for QBittorrentDownloader {
auto_torrent_management: Some(false),
..Default::default()
};
let start_last_id = self.last_cached_sync_id().await;
let add_result = self.client.add_torrent(arg.clone()).await;
if let (
Err(qbit_rs::Error::ApiError(qbit_rs::ApiError::CategoryNotFound)),
@ -204,8 +238,7 @@ impl TorrentDownloader for QBittorrentDownloader {
add_result?;
}
let source_hash = source.hash();
self.wait_until(
start_last_id,
self.wait_sync_until(
|sync_data| {
sync_data
.torrents
@ -218,7 +251,6 @@ impl TorrentDownloader for QBittorrentDownloader {
}
async fn delete_torrents(&self, hashes: Vec<String>) -> eyre::Result<()> {
let start_last_id = self.last_cached_sync_id().await;
let existed_list = self
.client
.get_torrent_list(
@ -231,13 +263,9 @@ impl TorrentDownloader for QBittorrentDownloader {
self.client
.delete_torrents(hashes.clone(), Some(true))
.await?;
self.wait_until(
start_last_id,
self.wait_sync_until(
|sync_data| -> bool {
sync_data.torrents_removed.map_or(false, |tr| -> bool {
let tr = tr.into_iter().collect::<HashSet<_>>();
hashes.iter().all(|s| tr.contains(s))
}) && sync_data
sync_data
.torrents
.map_or(true, |t| hashes.iter().all(|h| !t.contains_key(h)))
},
@ -255,7 +283,7 @@ impl TorrentDownloader for QBittorrentDownloader {
new_path: &str,
) -> eyre::Result<()> {
self.client.rename_file(hash, old_path, new_path).await?;
self.wait_until_torrent_contents(
self.wait_torrent_contents_until(
hash,
|contents| -> bool {
contents
@ -269,12 +297,10 @@ impl TorrentDownloader for QBittorrentDownloader {
}
async fn move_torrents(&self, hashes: Vec<String>, new_path: &str) -> eyre::Result<()> {
let start_last_id = self.last_cached_sync_id().await;
self.client
.set_torrent_location(hashes.clone(), new_path)
.await?;
self.wait_until(
start_last_id,
self.wait_sync_until(
|sync_data| -> bool {
hashes.iter().all(|hash| {
sync_data.torrents.as_ref().map_or(false, |t| {
@ -310,7 +336,6 @@ impl TorrentDownloader for QBittorrentDownloader {
}
async fn set_torrents_category(&self, hashes: Vec<String>, category: &str) -> eyre::Result<()> {
let start_last_id = self.last_cached_sync_id().await;
let result = self
.client
.set_torrent_category(hashes.clone(), category)
@ -323,8 +348,7 @@ impl TorrentDownloader for QBittorrentDownloader {
} else {
result?;
}
self.wait_until(
start_last_id,
self.wait_sync_until(
|sync_data| {
sync_data.torrents.map_or(false, |ts| {
hashes.iter().all(|h| {
@ -344,13 +368,11 @@ impl TorrentDownloader for QBittorrentDownloader {
if tags.is_empty() {
return Err(eyre::eyre!("add torrent tags can not be empty"));
}
let start_last_id = self.last_cached_sync_id().await;
self.client
.add_torrent_tags(hashes.clone(), tags.clone())
.await?;
let tag_sets = tags.iter().map(|s| s.as_str()).collect::<HashSet<&str>>();
self.wait_until(
start_last_id,
self.wait_sync_until(
|sync_data| {
sync_data.torrents.map_or(false, |ts| {
hashes.iter().all(|h| {
@ -373,15 +395,13 @@ impl TorrentDownloader for QBittorrentDownloader {
}
async fn add_category(&self, category: &str) -> eyre::Result<()> {
let start_sync_id = self.last_cached_sync_id().await;
self.client
.add_category(
NonEmptyStr::new(category).ok_or_eyre("category can not be empty")?,
self.save_path.as_str(),
)
.await?;
self.wait_until(
start_sync_id,
self.wait_sync_until(
|sync_data| {
sync_data
.categories