fix: temp save

2025-04-05 07:02:47 +08:00
parent 27b52f7fd1
commit ecb56013a5
11 changed files with 1804 additions and 102 deletions


@@ -103,7 +103,6 @@ tower-http = { version = "0.6", features = [
"set-header",
"compression-full",
] }
serde_yaml = "0.9.34"
tera = "1.20.0"
openidconnect = { version = "4", features = ["rustls-tls"] }
http-cache-reqwest = { version = "0.15", features = [
@@ -118,7 +117,6 @@ http-cache = { version = "0.20.0", features = [
], default-features = false }
http-cache-semantics = "2.1.0"
dotenv = "0.15.0"
nom = "8.0.0"
http = "1.2.0"
cookie = "0.18.1"
async-stream = "0.3.6"
@@ -127,14 +125,17 @@ tracing-appender = "0.2.3"
clap = "4.5.31"
futures-util = "0.3.31"
ipnetwork = "0.21.1"
ctor = "0.4.0"
librqbit = "8.0.0"
typed-builder = "0.21.0"
snafu = { version = "0.8.5", features = ["futures"] }
anyhow = "1.0.97"
dashmap = "6.1.0"
serde_yaml = "0.9.34"
merge-struct = "0.1.0"
serde-value = "0.7.0"
[dev-dependencies]
serial_test = "3"
insta = { version = "1", features = ["redactions", "yaml", "filters"] }
mockito = "1.6.1"
rstest = "0.25"
ctor = "0.4.0"


@@ -3,13 +3,15 @@ use std::{
collections::{HashMap, HashSet},
fmt::Debug,
io,
sync::Arc,
sync::{Arc, Weak},
time::Duration,
};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::future::try_join_all;
use itertools::Itertools;
use merge_struct::merge;
pub use qbit_rs::model::{
Torrent as QbitTorrent, TorrentContent as QbitTorrentContent, TorrentFile as QbitTorrentFile,
TorrentFilter as QbitTorrentFilter, TorrentSource as QbitTorrentSource,
@@ -17,14 +19,16 @@ pub use qbit_rs::model::{
use qbit_rs::{
Qbit,
model::{
AddTorrentArg, Credential, GetTorrentListArg, NonEmptyStr, Sep, State, TorrentFile,
TorrentSource,
AddTorrentArg, Category, Credential, GetTorrentListArg, NonEmptyStr, Sep, State, SyncData,
TorrentFile, TorrentSource,
},
};
use quirks_path::{Path, PathBuf};
use seaography::itertools::Itertools;
use snafu::prelude::*;
use tokio::sync::watch;
use tokio::{
sync::{RwLock, watch},
time::sleep,
};
use tracing::instrument;
use url::Url;
@@ -257,8 +261,67 @@ impl DownloadSelectorTrait for QBittorrentSelector {
#[derive(Default)]
pub struct QBittorrentSyncData {
pub torrents: HashMap<String, QbitTorrent>,
pub categories: HashSet<String>,
pub categories: HashMap<String, Category>,
pub tags: HashSet<String>,
pub trackers: HashMap<String, Vec<String>>,
pub server_state: HashMap<String, serde_value::Value>,
pub rid: i64,
}
impl QBittorrentSyncData {
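/// Fold an incremental sync response into the cached state. A `full_update`
/// means the server sent a complete snapshot, so the cached collections are
/// cleared before the patch is applied; `rid` is kept so the next sync request
/// only asks for the delta.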
pub fn patch(&mut self, data: SyncData) {
self.rid = data.rid;
if data.full_update.is_some_and(|s| s) {
self.torrents.clear();
self.categories.clear();
self.tags.clear();
self.trackers.clear();
}
if let Some(remove_categories) = data.categories_removed {
for c in remove_categories {
self.categories.remove(&c);
}
}
if let Some(add_categories) = data.categories {
self.categories.extend(add_categories);
}
if let Some(remove_tags) = data.tags_removed {
for t in remove_tags {
self.tags.remove(&t);
}
}
if let Some(add_tags) = data.tags {
self.tags.extend(add_tags);
}
if let Some(remove_torrents) = data.torrents_removed {
for t in remove_torrents {
self.torrents.remove(&t);
}
}
if let Some(add_torrents) = data.torrents {
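// Torrent updates arrive as partial objects keyed by hash; merge them into the
// cached entry instead of overwriting it, and insert unknown hashes as new entries.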
for (hash, torrent_patch) in add_torrents {
if let Some(torrent_full) = self.torrents.get_mut(&hash) {
*torrent_full = merge(torrent_full, &torrent_patch).unwrap_or_else(|_| {
unreachable!("failed to merge torrents, but they are same type")
});
} else {
self.torrents.insert(hash, torrent_patch);
}
}
}
if let Some(remove_trackers) = data.trackers_removed {
for t in remove_trackers {
self.trackers.remove(&t);
}
}
if let Some(add_trackers) = data.trackers {
self.trackers.extend(add_trackers);
}
if let Some(server_state) = data.server_state {
self.server_state = merge(&self.server_state, &server_state).unwrap_or_else(|_| {
unreachable!("failed to merge server state, but they are same type")
});
}
}
}
pub struct QBittorrentDownloader {
@@ -269,13 +332,13 @@ pub struct QBittorrentDownloader {
pub save_path: PathBuf,
pub wait_sync_timeout: Duration,
pub sync_watch: watch::Sender<DateTime<Utc>>,
pub sync_data: QBittorrentSyncData,
pub sync_data: Arc<RwLock<QBittorrentSyncData>>,
}
impl QBittorrentDownloader {
pub async fn from_creation(
creation: QBittorrentDownloaderCreation,
) -> Result<Self, DownloaderError> {
) -> Result<Arc<Self>, DownloaderError> {
let endpoint_url = Url::parse(&creation.endpoint)?;
let credential = Credential::new(creation.username, creation.password);
@@ -286,7 +349,7 @@ impl QBittorrentDownloader {
client.sync(None).await?;
Ok(Self {
let downloader = Arc::new(Self {
client: Arc::new(client),
endpoint_url,
subscriber_id: creation.subscriber_id,
@@ -294,8 +357,40 @@ impl QBittorrentDownloader {
wait_sync_timeout: Duration::from_millis(10000),
downloader_id: creation.downloader_id,
sync_watch: watch::channel(Utc::now()).0,
sync_data: QBittorrentSyncData::default(),
})
sync_data: Arc::new(RwLock::new(QBittorrentSyncData::default())),
});
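// The background sync loop only holds a weak reference, so dropping the
// downloader lets the loop shut itself down.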
let event_loop_me = Arc::downgrade(&downloader);
tokio::spawn(async move { Self::start_event_loop(event_loop_me).await });
Ok(downloader)
}
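/// Background sync loop: ticks every 100 ms, forcing a sync roughly every 10 s,
/// or roughly once a second while `wait_sync_until` callers are subscribed.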
async fn start_event_loop(me: Weak<Self>) {
let mut tick = 0;
loop {
sleep(Duration::from_millis(100)).await;
if let Some(me) = me.upgrade() {
if tick >= 100 {
let _ = me.sync_data().await.inspect_err(|e| {
tracing::error!(name = "sync_data", error = ?e);
});
tick = 0;
continue;
}
let count = me.sync_watch.receiver_count();
if count > 0 && tick >= 10 {
let _ = me.sync_data().await.inspect_err(|e| {
tracing::error!(name = "sync_data", error = ?e);
});
tick = i32::max(0, tick - 10);
} else {
tick += 1;
}
} else {
// the downloader has been dropped; stop the background sync loop
break;
}
}
}
#[instrument(level = "debug")]
@@ -304,38 +399,6 @@ impl QBittorrentDownloader {
Ok(result)
}
pub async fn wait_sync_until<S>(
&self,
stop_wait_fn: S,
timeout: Option<Duration>,
) -> Result<(), DownloaderError>
where
S: Fn(&QBittorrentSyncData) -> bool,
{
let mut receiver = self.sync_watch.subscribe();
let timeout = timeout.unwrap_or(self.wait_sync_timeout);
let start_time = Utc::now();
while let Ok(()) = receiver.changed().await {
let sync_time = receiver.borrow();
if sync_time
.signed_duration_since(start_time)
.num_milliseconds()
> timeout.as_millis() as i64
{
tracing::warn!(name = "wait_until timeout", timeout = ?timeout);
return Err(DownloaderError::DownloadTimeoutError {
action: Cow::Borrowed("QBittorrentDownloader::wait_unit"),
timeout,
});
}
if stop_wait_fn(&self.sync_data) {
break;
}
}
Ok(())
}
#[instrument(level = "debug", skip(self))]
pub async fn add_category(&self, category: &str) -> Result<(), DownloaderError> {
self.client
@@ -345,8 +408,11 @@ impl QBittorrentDownloader {
self.save_path.as_str(),
)
.await?;
self.wait_sync_until(|sync_data| sync_data.categories.contains(category), None)
.await?;
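// Wait for the new category to appear in the cached sync state before returning.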
self.wait_sync_until(
|sync_data| sync_data.categories.contains_key(category),
None,
)
.await?;
Ok(())
}
@@ -363,8 +429,11 @@ impl QBittorrentDownloader {
hashes: Vec<String>,
category: &str,
) -> Result<(), DownloaderError> {
if !self.sync_data.categories.contains(category) {
self.add_category(category).await?;
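// Create the category on demand if the cached sync state does not know it yet.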
{
let sync_data = self.sync_data.read().await;
if !sync_data.categories.contains_key(category) {
self.add_category(category).await?;
}
}
self.client
.set_torrent_category(hashes.clone(), category)
@@ -429,18 +498,20 @@ impl QBittorrentDownloader {
hash: &str,
replacer: F,
) -> Result<(), DownloaderError> {
let old_path = self
.sync_data
.torrents
.get(hash)
.and_then(|t| t.content_path.as_deref())
.ok_or_else(|| {
io::Error::new(
io::ErrorKind::NotFound,
"no torrent or torrent does not contain content path",
)
})?
.to_string();
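// Hold the read lock only long enough to copy out the torrent's current content path.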
let old_path = {
let sync_data = self.sync_data.read().await;
sync_data
.torrents
.get(hash)
.and_then(|t| t.content_path.as_deref())
.ok_or_else(|| {
io::Error::new(
io::ErrorKind::NotFound,
"no torrent or torrent does not contain content path",
)
})?
.to_string()
};
let new_path = replacer(old_path.clone());
self.client
.rename_file(hash, old_path.clone(), new_path.to_string())
@@ -512,6 +583,55 @@ impl QBittorrentDownloader {
.whatever_context::<_, DownloaderError>("No bittorrent found")?;
Ok(torrent.save_path.take())
}
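/// Pull the next incremental sync payload from qBittorrent, fold it into the
/// cached state, and notify `wait_sync_until` subscribers via the watch channel.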
async fn sync_data(&self) -> Result<(), DownloaderError> {
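// Read the current rid without holding the read lock across the network call.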
let rid = { self.sync_data.read().await.rid };
let sync_data_patch = self.client.sync(Some(rid)).await?;
{
let mut sync_data = self.sync_data.write().await;
sync_data.patch(sync_data_patch);
}
let now = Utc::now();
self.sync_watch.send_replace(now);
Ok(())
}
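/// Block until `stop_wait_fn` returns true for the cached sync state, or fail
/// with a timeout error once more than `timeout` (default `wait_sync_timeout`)
/// has elapsed since the wait started.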
async fn wait_sync_until<S>(
&self,
stop_wait_fn: S,
timeout: Option<Duration>,
) -> Result<(), DownloaderError>
where
S: Fn(&QBittorrentSyncData) -> bool,
{
let timeout = timeout.unwrap_or(self.wait_sync_timeout);
let start_time = Utc::now();
let mut receiver = self.sync_watch.subscribe();
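// Every successful sync publishes a timestamp on the watch channel; compare it
// with start_time to enforce the timeout.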
while let Ok(()) = receiver.changed().await {
let has_timeout = {
let sync_time = receiver.borrow().clone();
sync_time
.signed_duration_since(start_time)
.num_milliseconds()
> timeout.as_millis() as i64
};
if has_timeout {
tracing::warn!(name = "wait_sync_until timeout", timeout = ?timeout);
return Err(DownloaderError::DownloadTimeoutError {
action: Cow::Borrowed("QBittorrentDownloader::wait_sync_until"),
timeout,
});
}
{
let sync_data = &self.sync_data.read().await;
if stop_wait_fn(&sync_data) {
break;
}
}
}
Ok(())
}
}
#[async_trait]
@@ -723,7 +843,33 @@ pub mod tests {
}
#[cfg(feature = "testcontainers")]
pub async fn create_qbit_testcontainer()
pub async fn create_torrents_testcontainers()
-> RResult<testcontainers::ContainerRequest<testcontainers::GenericImage>> {
use testcontainers::{
GenericImage,
core::{
ContainerPort,
// ReuseDirective,
WaitFor,
},
};
use testcontainers_modules::testcontainers::ImageExt;
use crate::test_utils::testcontainers::ContainerRequestEnhancedExt;
let container = GenericImage::new("ghcr.io/dumtruck/konobangu-testing-torrents", "latest")
.with_exposed_port()
.with_wait_for(WaitFor::message_on_stderr("Connection to localhost"))
// .with_reuse(ReuseDirective::Always)
.with_default_log_consumer()
.with_prune_existed_label("qbit-downloader", true, true)
.await?;
Ok(container)
}
#[cfg(feature = "testcontainers")]
pub async fn create_qbit_testcontainers()
-> RResult<testcontainers::ContainerRequest<testcontainers::GenericImage>> {
use testcontainers::{
GenericImage,
@@ -755,7 +901,9 @@ pub mod tests {
#[cfg(not(feature = "testcontainers"))]
#[tokio::test]
async fn test_qbittorrent_downloader() {
let _ = test_qbittorrent_downloader_impl(None, None).await;
let hash = "47ee2d69e7f19af783ad896541a07b012676f858".to_string();
let torrent_url = format!("https://mikanani.me/Download/20240301/{}.torrent", hash);
let _ = test_qbittorrent_downloader_impl(torrent_url, hash, None, None).await;
}
#[cfg(feature = "testcontainers")]
@@ -769,7 +917,7 @@ pub mod tests {
.with_test_writer()
.init();
let image = create_qbit_testcontainer().await?;
let image = create_qbit_testcontainers().await?;
let container = image.start().await?;
@@ -809,6 +957,8 @@ pub mod tests {
}
async fn test_qbittorrent_downloader_impl(
torrent_url: String,
torrent_hash: String,
username: Option<&str>,
password: Option<&str>,
) -> RResult<()> {