From 376d2b28d3893d642d00c980d57e26689ff5aff7 Mon Sep 17 00:00:00 2001 From: lonelyhentxi Date: Sat, 5 Apr 2025 19:51:59 +0800 Subject: [PATCH] refactor: split out testcontainers-rs-ext --- Cargo.lock | 216 +--- Cargo.toml | 2 +- apps/recorder/Cargo.toml | 16 +- apps/recorder/src/downloader/mod.rs | 4 - .../src/downloader/qbit/downloader.rs | 605 +++++++++ apps/recorder/src/downloader/qbit/mod.rs | 1128 +---------------- apps/recorder/src/downloader/qbit/task.rs | 221 ++++ apps/recorder/src/downloader/qbit/test.rs | 280 ++++ apps/recorder/src/test_utils/mod.rs | 2 - .../recorder/src/test_utils/testcontainers.rs | 117 -- packages/testing-torrents/Cargo.toml | 10 + packages/testing-torrents/src/lib.rs | 44 + 12 files changed, 1202 insertions(+), 1443 deletions(-) create mode 100644 apps/recorder/src/downloader/qbit/downloader.rs create mode 100644 apps/recorder/src/downloader/qbit/task.rs create mode 100644 apps/recorder/src/downloader/qbit/test.rs delete mode 100644 apps/recorder/src/test_utils/testcontainers.rs create mode 100644 packages/testing-torrents/Cargo.toml create mode 100644 packages/testing-torrents/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index f776ad3..a814145 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -955,21 +955,6 @@ dependencies = [ "crossbeam-utils", ] -[[package]] -name = "conquer-once" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d008a441c0f269f36ca13712528069a86a3e60dffee1d98b976eb3b0b2160b4" -dependencies = [ - "conquer-util", -] - -[[package]] -name = "conquer-util" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e763eef8846b13b380f37dfecda401770b0ca4e56e95170237bd7c25c7db3582" - [[package]] name = "console" version = "0.15.11" @@ -1633,18 +1618,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "enum-as-inner" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1e6a265c649f3f5979b601d26f1d05ada116434c87741c9493cb56218f76cbc" -dependencies = [ - "heck 0.5.0", - "proc-macro2", - "quote", - "syn 2.0.100", -] - [[package]] name = "equivalent" version = "1.0.2" @@ -2218,51 +2191,6 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" -[[package]] -name = "hickory-proto" -version = "0.24.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92652067c9ce6f66ce53cc38d1169daa36e6e7eb7dd3b63b5103bd9d97117248" -dependencies = [ - "async-trait", - "cfg-if", - "data-encoding", - "enum-as-inner", - "futures-channel", - "futures-io", - "futures-util", - "idna 1.0.3", - "ipnet", - "once_cell", - "rand 0.8.5", - "thiserror 1.0.69", - "tinyvec", - "tokio", - "tracing", - "url", -] - -[[package]] -name = "hickory-resolver" -version = "0.24.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbb117a1ca520e111743ab2f6688eddee69db4e0ea242545a604dce8a66fd22e" -dependencies = [ - "cfg-if", - "futures-util", - "hickory-proto", - "ipconfig", - "lru-cache", - "once_cell", - "parking_lot 0.12.3", - "rand 0.8.5", - "resolv-conf", - "smallvec", - "thiserror 1.0.69", - "tokio", - "tracing", -] - [[package]] name = "hkdf" version = "0.12.4" @@ -2290,17 +2218,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "hostname" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9c7c7c8ac16c798734b8a24560c1362120597c40d5e1459f09498f8f6c8f2ba" -dependencies = [ - "cfg-if", - "libc", - "windows 0.52.0", -] - [[package]] name = "html-escape" version = "0.2.13" @@ -2486,7 +2403,6 @@ dependencies = [ "hyper", "hyper-util", "rustls", - "rustls-native-certs", "rustls-pki-types", "tokio", "tokio-rustls", @@ -2807,18 +2723,6 @@ dependencies = [ "web-sys", ] -[[package]] -name = "ipconfig" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b58db92f96b720de98181bbbe63c831e87005ab460c1bf306eb2622b4707997f" -dependencies = [ - "socket2", - "widestring", - "windows-sys 0.48.0", - "winreg", -] - [[package]] name = "ipnet" version = "2.11.0" @@ -2870,17 +2774,6 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" -[[package]] -name = "java-properties" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37bf6f484471c451f2b51eabd9e66b3fa7274550c5ec4b6c3d6070840945117f" -dependencies = [ - "encoding_rs", - "lazy_static", - "regex", -] - [[package]] name = "jobserver" version = "0.1.32" @@ -3441,15 +3334,6 @@ dependencies = [ "hashbrown 0.15.2", ] -[[package]] -name = "lru-cache" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31e24f1ad8321ca0e8a1e0ac13f23cb668e6f5466c2c57319f6a5cf1cc8e3b1c" -dependencies = [ - "linked-hash-map", -] - [[package]] name = "mac" version = "0.1.1" @@ -4798,7 +4682,6 @@ dependencies = [ "axum", "axum-extra", "base64 0.22.1", - "bollard", "bytes", "chrono", "clap", @@ -4855,7 +4738,9 @@ dependencies = [ "snafu", "tera", "testcontainers", + "testcontainers-ext", "testcontainers-modules", + "testing-torrents", "tokio", "tower", "tower-http", @@ -4991,7 +4876,6 @@ dependencies = [ "futures-core", "futures-util", "h2", - "hickory-resolver", "http", "http-body", "http-body-util", @@ -5010,7 +4894,6 @@ dependencies = [ "pin-project-lite", "quinn", "rustls", - "rustls-native-certs", "rustls-pemfile", "rustls-pki-types", "serde", @@ -5087,15 +4970,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "resolv-conf" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48375394603e3dd4b2d64371f7148fd8c7baa2680e28741f2cb8d23b59e3d4c4" -dependencies = [ - "hostname", -] - [[package]] name = "retry-policies" version = "0.4.0" @@ -5672,17 +5546,6 @@ dependencies = [ "serde_derive", ] -[[package]] -name = "serde-java-properties" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8b5db85b934578ef8a8acc8ef7956b313d9e920d4d4160ef7862bd4c85d4bc7" -dependencies = [ - "encoding_rs", - "java-properties", - "serde", -] - [[package]] name = "serde-value" version = "0.7.0" @@ -5901,16 +5764,6 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" -[[package]] -name = "signal-hook" -version = "0.3.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8621587d4798caf8eb44879d42e56b9a93ea5dcd315a6487c357130095b62801" -dependencies = [ - "libc", - "signal-hook-registry", -] - [[package]] name = "signal-hook-registry" version = "1.4.2" @@ -6542,7 +6395,6 @@ dependencies = [ "bollard", "bollard-stubs", "bytes", - "conquer-once", "docker_credential", "either", "etcetera", @@ -6551,21 +6403,31 @@ dependencies = [ "memchr", "parse-display", "pin-project-lite", - "reqwest", "serde", - "serde-java-properties", "serde_json", "serde_with", - "signal-hook", "thiserror 2.0.12", "tokio", "tokio-stream", "tokio-tar", "tokio-util", - "ulid", "url", ] +[[package]] +name = "testcontainers-ext" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5a75f54e804529c1a3d0491c989fba636e349dca1d3e484da1f316f41ba6110" +dependencies = [ + "bollard", + "futures", + "log", + "testcontainers", + "testcontainers-modules", + "tracing", +] + [[package]] name = "testcontainers-modules" version = "0.11.6" @@ -6575,6 +6437,16 @@ dependencies = [ "testcontainers", ] +[[package]] +name = "testing-torrents" +version = "0.1.0" +dependencies = [ + "serde", + "testcontainers", + "testcontainers-ext", + "testcontainers-modules", +] + [[package]] name = "thiserror" version = "1.0.69" @@ -7047,16 +6919,6 @@ version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2896d95c02a80c6d6a5d6e953d479f5ddf2dfdb6a244441010e373ac0fb88971" -[[package]] -name = "ulid" -version = "1.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "470dbf6591da1b39d43c14523b2b469c86879a53e8b758c8e090a470fe7b1fbe" -dependencies = [ - "rand 0.9.0", - "web-time", -] - [[package]] name = "uncased" version = "0.9.10" @@ -7433,12 +7295,6 @@ dependencies = [ "wasite", ] -[[package]] -name = "widestring" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd7cf3379ca1aac9eea11fba24fd7e315d621f8dfe35c8d7d2be8b793726e07d" - [[package]] name = "winapi" version = "0.3.9" @@ -7470,16 +7326,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" -[[package]] -name = "windows" -version = "0.52.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" -dependencies = [ - "windows-core 0.52.0", - "windows-targets 0.52.6", -] - [[package]] name = "windows" version = "0.58.0" @@ -7895,16 +7741,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "winreg" -version = "0.50.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" -dependencies = [ - "cfg-if", - "windows-sys 0.48.0", -] - [[package]] name = "wit-bindgen-rt" version = "0.39.0" diff --git a/Cargo.toml b/Cargo.toml index 52b2d7a..fcaa74c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["apps/recorder"] +members = ["apps/recorder", "packages/testing-torrents"] resolver = "2" [patch.crates-io] diff --git a/apps/recorder/Cargo.toml b/apps/recorder/Cargo.toml index 71c4967..8291bef 100644 --- a/apps/recorder/Cargo.toml +++ b/apps/recorder/Cargo.toml @@ -18,7 +18,8 @@ default = [] testcontainers = [ "dep:testcontainers", "dep:testcontainers-modules", - "dep:bollard", + "dep:testcontainers-ext", + "dep:testing-torrents", ] [dependencies] @@ -75,16 +76,12 @@ qbit-rs = { git = "https://github.com/lonelyhentxi/qbit.git", rev = "72d53138ebe "default", "builder", ] } -testcontainers = { version = "0.23.3", features = [ - "default", - "properties-config", - "watchdog", - "http_wait", - "reusable-containers", -], optional = true } +testcontainers = { version = "0.23.3", optional = true } testcontainers-modules = { version = "0.11.4", optional = true } +testcontainers-ext = { version = "0.1.0", optional = true, features = [ + "tracing", +] } log = "0.4.22" -bollard = { version = "0.18", optional = true } async-graphql = { version = "7", features = [] } async-graphql-axum = "7" fastrand = "2.3.0" @@ -132,6 +129,7 @@ anyhow = "1.0.97" serde_yaml = "0.9.34" merge-struct = "0.1.0" serde-value = "0.7.0" +testing-torrents = { path = "../../packages/testing-torrents", optional = true } [dev-dependencies] serial_test = "3" diff --git a/apps/recorder/src/downloader/mod.rs b/apps/recorder/src/downloader/mod.rs index 38a22c8..b1cb2db 100644 --- a/apps/recorder/src/downloader/mod.rs +++ b/apps/recorder/src/downloader/mod.rs @@ -6,7 +6,3 @@ pub mod rqbit; pub mod utils; pub use errors::DownloaderError; -pub use qbit::{ - QBittorrentDownloader, QBittorrentDownloaderCreation, QbitTorrent, QbitTorrentContent, - QbitTorrentFile, QbitTorrentFilter, QbitTorrentSource, -}; diff --git a/apps/recorder/src/downloader/qbit/downloader.rs b/apps/recorder/src/downloader/qbit/downloader.rs new file mode 100644 index 0000000..ca9a977 --- /dev/null +++ b/apps/recorder/src/downloader/qbit/downloader.rs @@ -0,0 +1,605 @@ +use std::{ + borrow::Cow, + collections::{HashMap, HashSet}, + fmt::Debug, + sync::{Arc, Weak}, + time::Duration, +}; + +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use futures_util::future::try_join_all; +use itertools::Itertools; +use merge_struct::merge; +use qbit_rs::{ + Qbit, + model::{ + AddTorrentArg, Category, Credential, GetTorrentListArg, NonEmptyStr, Sep, SyncData, + Torrent as QbitTorrent, TorrentFile, TorrentSource, + }, +}; +use quirks_path::{Path, PathBuf}; +use snafu::{OptionExt, whatever}; +use tokio::{ + sync::{RwLock, watch}, + time::sleep, +}; +use tracing::instrument; +use url::Url; + +use crate::downloader::{ + DownloaderError, + bittorrent::{ + downloader::TorrentDownloaderTrait, + source::{HashTorrentSource, HashTorrentSourceTrait, MagnetUrlSource, TorrentFileSource}, + task::TORRENT_TAG_NAME, + }, + core::{DownloadIdSelector, DownloaderTrait}, + qbit::task::{ + QBittorrentCreation, QBittorrentHash, QBittorrentSelector, QBittorrentState, + QBittorrentTask, + }, + utils::path_equals_as_file_url, +}; + +pub struct QBittorrentDownloaderCreation { + pub endpoint: String, + pub username: String, + pub password: String, + pub save_path: String, + pub subscriber_id: i32, + pub downloader_id: i32, + pub wait_sync_timeout: Option, +} + +#[derive(Default)] +pub struct QBittorrentSyncData { + pub torrents: HashMap, + pub categories: HashMap, + pub tags: HashSet, + pub trackers: HashMap>, + pub server_state: HashMap, + pub rid: i64, +} + +impl QBittorrentSyncData { + pub fn patch(&mut self, data: SyncData) { + self.rid = data.rid; + if data.full_update.is_some_and(|s| s) { + self.torrents.clear(); + self.categories.clear(); + self.tags.clear(); + self.trackers.clear(); + } + if let Some(remove_categories) = data.categories_removed { + for c in remove_categories { + self.categories.remove(&c); + } + } + if let Some(add_categories) = data.categories { + self.categories.extend(add_categories); + } + if let Some(remove_tags) = data.tags_removed { + for t in remove_tags { + self.tags.remove(&t); + } + } + if let Some(add_tags) = data.tags { + self.tags.extend(add_tags); + } + if let Some(remove_torrents) = data.torrents_removed { + for t in remove_torrents { + self.torrents.remove(&t); + } + } + if let Some(add_torrents) = data.torrents { + for (hash, torrent_patch) in add_torrents { + if let Some(torrent_full) = self.torrents.get_mut(&hash) { + *torrent_full = merge(torrent_full, &torrent_patch).unwrap_or_else(|_| { + unreachable!("failed to merge torrents, but they are same type") + }); + } else { + self.torrents.insert(hash, torrent_patch); + } + } + } + if let Some(remove_trackers) = data.trackers_removed { + for t in remove_trackers { + self.trackers.remove(&t); + } + } + if let Some(add_trackers) = data.trackers { + self.trackers.extend(add_trackers); + } + if let Some(server_state) = data.server_state { + self.server_state = merge(&self.server_state, &server_state).unwrap_or_else(|_| { + unreachable!("failed to merge server state, but they are same type") + }); + } + } +} + +pub struct QBittorrentDownloader { + pub subscriber_id: i32, + pub downloader_id: i32, + pub endpoint_url: Url, + pub client: Arc, + pub save_path: PathBuf, + pub wait_sync_timeout: Duration, + pub sync_watch: watch::Sender>, + pub sync_data: Arc>, +} + +impl QBittorrentDownloader { + pub async fn from_creation( + creation: QBittorrentDownloaderCreation, + ) -> Result, DownloaderError> { + let endpoint_url = Url::parse(&creation.endpoint)?; + + let credential = Credential::new(creation.username, creation.password); + + let client = Qbit::new(endpoint_url.clone(), credential); + + client.login(false).await?; + + client.sync(None).await?; + + let downloader = Arc::new(Self { + client: Arc::new(client), + endpoint_url, + subscriber_id: creation.subscriber_id, + save_path: creation.save_path.into(), + wait_sync_timeout: creation + .wait_sync_timeout + .unwrap_or(Duration::from_secs(10)), + downloader_id: creation.downloader_id, + sync_watch: watch::channel(Utc::now()).0, + sync_data: Arc::new(RwLock::new(QBittorrentSyncData::default())), + }); + + let event_loop_me = Arc::downgrade(&downloader); + + tokio::spawn(async move { Self::start_event_loop(event_loop_me).await }); + + Ok(downloader) + } + + async fn start_event_loop(me: Weak) { + let mut tick = 0; + + loop { + sleep(Duration::from_millis(100)).await; + if let Some(me) = me.upgrade() { + if tick >= 100 { + let _ = me.sync_data().await.inspect_err(|e| { + tracing::error!(name = "sync_data", error = ?e); + }); + tick = 0; + continue; + } + let count = me.sync_watch.receiver_count(); + if count > 0 && tick >= 10 { + let _ = me.sync_data().await.inspect_err(|e| { + tracing::error!(name = "sync_data", error = ?e); + }); + tick = i32::max(0, tick - 10); + } else { + tick += 1; + } + } + } + } + + #[instrument(level = "debug")] + pub async fn api_version(&self) -> Result { + let result = self.client.get_webapi_version().await?; + Ok(result) + } + + #[instrument(level = "debug", skip(self))] + pub async fn add_category(&self, category: &str) -> Result<(), DownloaderError> { + self.client + .add_category( + NonEmptyStr::new(category) + .whatever_context::<_, DownloaderError>("category can not be empty")?, + self.save_path.as_str(), + ) + .await?; + self.wait_sync_until( + |sync_data| sync_data.categories.contains_key(category), + None, + ) + .await?; + + Ok(()) + } + + #[instrument(level = "debug", skip(self))] + pub async fn check_connection(&self) -> Result<(), DownloaderError> { + self.api_version().await?; + Ok(()) + } + + #[instrument(level = "debug", skip(self))] + pub async fn set_torrents_category( + &self, + hashes: Vec, + category: &str, + ) -> Result<(), DownloaderError> { + { + let category_no_exists = { + let sync_data = self.sync_data.read().await; + !sync_data.categories.contains_key(category) + }; + + if category_no_exists { + self.add_category(category).await?; + } + } + self.client + .set_torrent_category(hashes.clone(), category) + .await?; + self.wait_sync_until( + |sync_data| { + let torrents = &sync_data.torrents; + hashes.iter().all(|h| { + torrents + .get(h) + .is_some_and(|t| t.category.as_deref().is_some_and(|c| c == category)) + }) + }, + None, + ) + .await?; + Ok(()) + } + + pub fn get_save_path(&self, sub_path: &Path) -> PathBuf { + self.save_path.join(sub_path) + } + + #[instrument(level = "debug", skip(self))] + pub async fn add_torrent_tags( + &self, + hashes: Vec, + tags: Vec, + ) -> Result<(), DownloaderError> { + if tags.is_empty() { + whatever!("add bittorrent tags can not be empty"); + } + self.client + .add_torrent_tags(hashes.clone(), tags.clone()) + .await?; + let tag_sets = tags.iter().map(|s| s.as_str()).collect::>(); + self.wait_sync_until( + |sync_data| { + let torrents = &sync_data.torrents; + + hashes.iter().all(|h| { + torrents.get(h).is_some_and(|t| { + t.tags.as_ref().is_some_and(|t| { + t.split(',') + .map(|s| s.trim()) + .filter(|s| !s.is_empty()) + .collect::>() + .is_superset(&tag_sets) + }) + }) + }) + }, + None, + ) + .await?; + Ok(()) + } + + #[instrument(level = "debug", skip(self))] + pub async fn move_torrents( + &self, + hashes: Vec, + new_path: &str, + ) -> Result<(), DownloaderError> { + self.client + .set_torrent_location(hashes.clone(), new_path) + .await?; + + self.wait_sync_until( + |sync_data| -> bool { + let torrents = &sync_data.torrents; + + hashes.iter().all(|h| { + torrents.get(h).is_some_and(|t| { + t.save_path.as_deref().is_some_and(|p| { + path_equals_as_file_url(p, new_path) + .inspect_err(|error| { + tracing::warn!(name = "path_equals_as_file_url", error = ?error); + }) + .unwrap_or(false) + }) + }) + }) + }, + None, + ) + .await?; + Ok(()) + } + + pub async fn get_torrent_path( + &self, + hashes: String, + ) -> Result, DownloaderError> { + let mut torrent_list = self + .client + .get_torrent_list(GetTorrentListArg { + hashes: Some(hashes), + ..Default::default() + }) + .await?; + let torrent = torrent_list + .first_mut() + .whatever_context::<_, DownloaderError>("No bittorrent found")?; + Ok(torrent.save_path.take()) + } + + #[instrument(level = "debug", skip(self))] + async fn sync_data(&self) -> Result<(), DownloaderError> { + let rid = { self.sync_data.read().await.rid }; + let sync_data_patch = self.client.sync(Some(rid)).await?; + { + let mut sync_data = self.sync_data.write().await; + sync_data.patch(sync_data_patch); + } + let now = Utc::now(); + self.sync_watch.send_replace(now); + Ok(()) + } + + async fn wait_sync_until( + &self, + stop_wait_fn: S, + timeout: Option, + ) -> Result<(), DownloaderError> + where + S: Fn(&QBittorrentSyncData) -> bool, + { + { + let sync_data = &self.sync_data.read().await; + if stop_wait_fn(sync_data) { + return Ok(()); + } + } + + let timeout = timeout.unwrap_or(self.wait_sync_timeout); + let start_time = Utc::now(); + + let mut receiver = self.sync_watch.subscribe(); + + while let Ok(()) = receiver.changed().await { + let has_timeout = { + let sync_time = *receiver.borrow(); + let diff_time = sync_time - start_time; + diff_time.num_milliseconds() > timeout.as_millis() as i64 + }; + if has_timeout { + tracing::warn!(name = "wait_until timeout", timeout = ?timeout); + return Err(DownloaderError::DownloadTimeoutError { + action: Cow::Borrowed("QBittorrentDownloader::wait_unit"), + timeout, + }); + } + { + let sync_data = &self.sync_data.read().await; + if stop_wait_fn(sync_data) { + break; + } + } + } + Ok(()) + } +} + +#[async_trait] +impl DownloaderTrait for QBittorrentDownloader { + type State = QBittorrentState; + type Id = QBittorrentHash; + type Task = QBittorrentTask; + type Creation = QBittorrentCreation; + type Selector = QBittorrentSelector; + + async fn add_downloads( + &self, + creation: Self::Creation, + ) -> Result, DownloaderError> { + let tags = { + let mut tags = vec![TORRENT_TAG_NAME.to_string()]; + tags.extend(creation.tags); + Some(tags.into_iter().filter(|s| !s.is_empty()).join(",")) + }; + + let save_path = Some(creation.save_path.into_string()); + + let sources = creation.sources; + let hashes = HashSet::from_iter(sources.iter().map(|s| s.hash_info().to_string())); + let (urls_source, files_source) = { + let mut urls = vec![]; + let mut files = vec![]; + for s in sources { + match s { + HashTorrentSource::MagnetUrl(MagnetUrlSource { url, .. }) => { + urls.push(Url::parse(&url)?) + } + HashTorrentSource::TorrentFile(TorrentFileSource { + payload, filename, .. + }) => files.push(TorrentFile { + filename, + data: payload.into(), + }), + } + } + ( + if urls.is_empty() { + None + } else { + Some(TorrentSource::Urls { + urls: Sep::from(urls), + }) + }, + if files.is_empty() { + None + } else { + Some(TorrentSource::TorrentFiles { torrents: files }) + }, + ) + }; + + let category = creation.category; + + if let Some(category) = category.as_deref() { + let has_caetgory = { + self.sync_data + .read() + .await + .categories + .contains_key(category) + }; + if !has_caetgory { + self.add_category(category).await?; + } + } + + if let Some(source) = urls_source { + self.client + .add_torrent(AddTorrentArg { + source, + savepath: save_path.clone(), + auto_torrent_management: Some(false), + category: category.clone(), + tags: tags.clone(), + ..Default::default() + }) + .await?; + } + + if let Some(source) = files_source { + self.client + .add_torrent(AddTorrentArg { + source, + savepath: save_path, + auto_torrent_management: Some(false), + category, + tags, + ..Default::default() + }) + .await?; + } + self.wait_sync_until( + |sync_data| { + let torrents = &sync_data.torrents; + hashes.iter().all(|hash| torrents.contains_key(hash)) + }, + None, + ) + .await?; + Ok(hashes) + } + + async fn pause_downloads( + &self, + selector: Self::Selector, + ) -> Result, DownloaderError> { + ::pause_downloads(self, selector).await + } + + async fn resume_downloads( + &self, + selector: Self::Selector, + ) -> Result, DownloaderError> { + ::resume_downloads(self, selector).await + } + + async fn remove_downloads( + &self, + selector: Self::Selector, + ) -> Result, DownloaderError> { + ::remove_downloads(self, selector).await + } + + async fn query_downloads( + &self, + selector: QBittorrentSelector, + ) -> Result, DownloaderError> { + let selector = match selector { + QBittorrentSelector::Hash(h) => h.into(), + QBittorrentSelector::Complex(c) => c, + }; + + let torrent_list = self.client.get_torrent_list(selector.query).await?; + + let torrent_contents = try_join_all(torrent_list.iter().map(|s| async { + if let Some(hash) = &s.hash { + self.client.get_torrent_contents(hash as &str, None).await + } else { + Ok(vec![]) + } + })) + .await?; + + let tasks = torrent_list + .into_iter() + .zip(torrent_contents) + .map(|(t, c)| Self::Task::from_query(t, c)) + .collect::, _>>()?; + Ok(tasks) + } +} + +#[async_trait] +impl TorrentDownloaderTrait for QBittorrentDownloader { + type IdSelector = DownloadIdSelector; + #[instrument(level = "debug", skip(self))] + async fn pause_torrents( + &self, + hashes: Self::IdSelector, + ) -> Result { + self.client.pause_torrents(hashes.clone()).await?; + Ok(hashes) + } + + #[instrument(level = "debug", skip(self))] + async fn resume_torrents( + &self, + hashes: Self::IdSelector, + ) -> Result { + self.client.resume_torrents(hashes.clone()).await?; + Ok(hashes) + } + + #[instrument(level = "debug", skip(self))] + async fn remove_torrents( + &self, + hashes: Self::IdSelector, + ) -> Result { + self.client + .delete_torrents(hashes.clone(), Some(true)) + .await?; + self.wait_sync_until( + |sync_data| -> bool { + let torrents = &sync_data.torrents; + hashes.iter().all(|h| !torrents.contains_key(h)) + }, + None, + ) + .await?; + Ok(hashes) + } +} + +impl Debug for QBittorrentDownloader { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("QBittorrentDownloader") + .field("subscriber_id", &self.subscriber_id) + .field("client", &self.endpoint_url.as_str()) + .finish() + } +} diff --git a/apps/recorder/src/downloader/qbit/mod.rs b/apps/recorder/src/downloader/qbit/mod.rs index caecad8..9ff66ec 100644 --- a/apps/recorder/src/downloader/qbit/mod.rs +++ b/apps/recorder/src/downloader/qbit/mod.rs @@ -1,1123 +1,11 @@ -use std::{ - borrow::Cow, - collections::{HashMap, HashSet}, - fmt::Debug, - sync::{Arc, Weak}, - time::Duration, -}; - -use async_trait::async_trait; -use chrono::{DateTime, Utc}; -use futures::future::try_join_all; -use itertools::Itertools; -use merge_struct::merge; -pub use qbit_rs::model::{ - Torrent as QbitTorrent, TorrentContent as QbitTorrentContent, TorrentFile as QbitTorrentFile, - TorrentFilter as QbitTorrentFilter, TorrentSource as QbitTorrentSource, -}; -use qbit_rs::{ - Qbit, - model::{ - AddTorrentArg, Category, Credential, GetTorrentListArg, NonEmptyStr, Sep, State, SyncData, - TorrentFile, TorrentSource, - }, -}; -use quirks_path::{Path, PathBuf}; -use snafu::prelude::*; -use tokio::{ - sync::{RwLock, watch}, - time::sleep, -}; -use tracing::instrument; -use url::Url; - -use super::{DownloaderError, utils::path_equals_as_file_url}; -use crate::downloader::{ - bittorrent::{ - downloader::TorrentDownloaderTrait, - source::{HashTorrentSource, HashTorrentSourceTrait, MagnetUrlSource, TorrentFileSource}, - task::{ - TORRENT_TAG_NAME, TorrentCreationTrait, TorrentHashTrait, TorrentStateTrait, - TorrentTaskTrait, - }, - }, - core::{ - DownloadCreationTrait, DownloadIdSelector, DownloadIdTrait, DownloadSelectorTrait, - DownloadStateTrait, DownloadTaskTrait, DownloaderTrait, - }, -}; - -pub type QBittorrentHash = String; - -impl DownloadIdTrait for QBittorrentHash {} - -impl TorrentHashTrait for QBittorrentHash {} - -#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] -pub struct QBittorrentState(Option); - -impl DownloadStateTrait for QBittorrentState {} - -impl TorrentStateTrait for QBittorrentState {} - -impl From> for QBittorrentState { - fn from(value: Option) -> Self { - Self(value) - } -} - -#[derive(Debug)] -pub struct QBittorrentTask { - pub hash_info: QBittorrentHash, - pub torrent: QbitTorrent, - pub contents: Vec, - pub state: QBittorrentState, -} - -impl QBittorrentTask { - fn from_query( - torrent: QbitTorrent, - contents: Vec, - ) -> Result { - let hash = torrent - .hash - .clone() - .ok_or_else(|| DownloaderError::TorrentMetaError { - message: "missing hash".to_string(), - source: None.into(), - })?; - let state = QBittorrentState(torrent.state.clone()); - Ok(Self { - hash_info: hash, - contents, - state, - torrent, - }) - } -} - -impl DownloadTaskTrait for QBittorrentTask { - type State = QBittorrentState; - type Id = QBittorrentHash; - - fn id(&self) -> &Self::Id { - &self.hash_info - } - - fn into_id(self) -> Self::Id { - self.hash_info - } - - fn name(&self) -> Cow<'_, str> { - self.torrent - .name - .as_deref() - .map(Cow::Borrowed) - .unwrap_or_else(|| DownloadTaskTrait::name(self)) - } - - fn speed(&self) -> Option { - self.torrent.dlspeed.and_then(|s| u64::try_from(s).ok()) - } - - fn state(&self) -> &Self::State { - &self.state - } - - fn dl_bytes(&self) -> Option { - self.torrent.downloaded.and_then(|v| u64::try_from(v).ok()) - } - - fn total_bytes(&self) -> Option { - self.torrent.size.and_then(|v| u64::try_from(v).ok()) - } - - fn left_bytes(&self) -> Option { - self.torrent.amount_left.and_then(|v| u64::try_from(v).ok()) - } - - fn et(&self) -> Option { - self.torrent - .time_active - .and_then(|v| u64::try_from(v).ok()) - .map(Duration::from_secs) - } - - fn eta(&self) -> Option { - self.torrent - .eta - .and_then(|v| u64::try_from(v).ok()) - .map(Duration::from_secs) - } - - fn progress(&self) -> Option { - self.torrent.progress.as_ref().map(|s| *s as f32) - } -} - -impl TorrentTaskTrait for QBittorrentTask { - fn hash_info(&self) -> &str { - &self.hash_info - } - - fn tags(&self) -> impl Iterator> { - self.torrent - .tags - .as_deref() - .unwrap_or("") - .split(',') - .map(|s| s.trim()) - .filter(|s| !s.is_empty()) - .map(Cow::Borrowed) - } - - fn category(&self) -> Option> { - self.torrent.category.as_deref().map(Cow::Borrowed) - } -} - -#[derive(Debug, Clone, Default)] -pub struct QBittorrentCreation { - pub save_path: PathBuf, - pub tags: Vec, - pub category: Option, - pub sources: Vec, -} - -impl DownloadCreationTrait for QBittorrentCreation { - type Task = QBittorrentTask; -} - -impl TorrentCreationTrait for QBittorrentCreation { - fn save_path(&self) -> &Path { - self.save_path.as_ref() - } - - fn save_path_mut(&mut self) -> &mut PathBuf { - &mut self.save_path - } - - fn sources_mut(&mut self) -> &mut Vec { - &mut self.sources - } -} - -pub struct QBittorrentDownloaderCreation { - pub endpoint: String, - pub username: String, - pub password: String, - pub save_path: String, - pub subscriber_id: i32, - pub downloader_id: i32, - pub wait_sync_timeout: Option, -} - -pub type QBittorrentHashSelector = DownloadIdSelector; - -pub struct QBittorrentComplexSelector { - pub query: GetTorrentListArg, -} - -impl From for QBittorrentComplexSelector { - fn from(value: QBittorrentHashSelector) -> Self { - Self { - query: GetTorrentListArg { - hashes: Some(value.ids.join("|")), - ..Default::default() - }, - } - } -} - -impl DownloadSelectorTrait for QBittorrentComplexSelector { - type Id = QBittorrentHash; - type Task = QBittorrentTask; -} - -pub enum QBittorrentSelector { - Hash(QBittorrentHashSelector), - Complex(QBittorrentComplexSelector), -} - -impl DownloadSelectorTrait for QBittorrentSelector { - type Id = QBittorrentHash; - type Task = QBittorrentTask; - - fn try_into_ids_only(self) -> Result, Self> { - match self { - QBittorrentSelector::Complex(c) => { - c.try_into_ids_only().map_err(QBittorrentSelector::Complex) - } - QBittorrentSelector::Hash(h) => { - let result = h - .try_into_ids_only() - .unwrap_or_else(|_| unreachable!("hash selector must contains hash")) - .into_iter(); - Ok(result.collect_vec()) - } - } - } -} - -#[derive(Default)] -pub struct QBittorrentSyncData { - pub torrents: HashMap, - pub categories: HashMap, - pub tags: HashSet, - pub trackers: HashMap>, - pub server_state: HashMap, - pub rid: i64, -} -impl QBittorrentSyncData { - pub fn patch(&mut self, data: SyncData) { - self.rid = data.rid; - if data.full_update.is_some_and(|s| s) { - self.torrents.clear(); - self.categories.clear(); - self.tags.clear(); - self.trackers.clear(); - } - if let Some(remove_categories) = data.categories_removed { - for c in remove_categories { - self.categories.remove(&c); - } - } - if let Some(add_categories) = data.categories { - self.categories.extend(add_categories); - } - if let Some(remove_tags) = data.tags_removed { - for t in remove_tags { - self.tags.remove(&t); - } - } - if let Some(add_tags) = data.tags { - self.tags.extend(add_tags); - } - if let Some(remove_torrents) = data.torrents_removed { - for t in remove_torrents { - self.torrents.remove(&t); - } - } - if let Some(add_torrents) = data.torrents { - for (hash, torrent_patch) in add_torrents { - if let Some(torrent_full) = self.torrents.get_mut(&hash) { - *torrent_full = merge(torrent_full, &torrent_patch).unwrap_or_else(|_| { - unreachable!("failed to merge torrents, but they are same type") - }); - } else { - self.torrents.insert(hash, torrent_patch); - } - } - } - if let Some(remove_trackers) = data.trackers_removed { - for t in remove_trackers { - self.trackers.remove(&t); - } - } - if let Some(add_trackers) = data.trackers { - self.trackers.extend(add_trackers); - } - if let Some(server_state) = data.server_state { - self.server_state = merge(&self.server_state, &server_state).unwrap_or_else(|_| { - unreachable!("failed to merge server state, but they are same type") - }); - } - } -} - -pub struct QBittorrentDownloader { - pub subscriber_id: i32, - pub downloader_id: i32, - pub endpoint_url: Url, - pub client: Arc, - pub save_path: PathBuf, - pub wait_sync_timeout: Duration, - pub sync_watch: watch::Sender>, - pub sync_data: Arc>, -} - -impl QBittorrentDownloader { - pub async fn from_creation( - creation: QBittorrentDownloaderCreation, - ) -> Result, DownloaderError> { - let endpoint_url = Url::parse(&creation.endpoint)?; - - let credential = Credential::new(creation.username, creation.password); - - let client = Qbit::new(endpoint_url.clone(), credential); - - client.login(false).await?; - - client.sync(None).await?; - - let downloader = Arc::new(Self { - client: Arc::new(client), - endpoint_url, - subscriber_id: creation.subscriber_id, - save_path: creation.save_path.into(), - wait_sync_timeout: creation - .wait_sync_timeout - .unwrap_or(Duration::from_secs(10)), - downloader_id: creation.downloader_id, - sync_watch: watch::channel(Utc::now()).0, - sync_data: Arc::new(RwLock::new(QBittorrentSyncData::default())), - }); - - let event_loop_me = Arc::downgrade(&downloader); - - tokio::spawn(async move { Self::start_event_loop(event_loop_me).await }); - - Ok(downloader) - } - - async fn start_event_loop(me: Weak) { - let mut tick = 0; - - loop { - sleep(Duration::from_millis(100)).await; - if let Some(me) = me.upgrade() { - if tick >= 100 { - let _ = me.sync_data().await.inspect_err(|e| { - tracing::error!(name = "sync_data", error = ?e); - }); - tick = 0; - continue; - } - let count = me.sync_watch.receiver_count(); - if count > 0 && tick >= 10 { - let _ = me.sync_data().await.inspect_err(|e| { - tracing::error!(name = "sync_data", error = ?e); - }); - tick = i32::max(0, tick - 10); - } else { - tick += 1; - } - } - } - } - - #[instrument(level = "debug")] - pub async fn api_version(&self) -> Result { - let result = self.client.get_webapi_version().await?; - Ok(result) - } - - #[instrument(level = "debug", skip(self))] - pub async fn add_category(&self, category: &str) -> Result<(), DownloaderError> { - self.client - .add_category( - NonEmptyStr::new(category) - .whatever_context::<_, DownloaderError>("category can not be empty")?, - self.save_path.as_str(), - ) - .await?; - self.wait_sync_until( - |sync_data| sync_data.categories.contains_key(category), - None, - ) - .await?; - - Ok(()) - } - - #[instrument(level = "debug", skip(self))] - pub async fn check_connection(&self) -> Result<(), DownloaderError> { - self.api_version().await?; - Ok(()) - } - - #[instrument(level = "debug", skip(self))] - pub async fn set_torrents_category( - &self, - hashes: Vec, - category: &str, - ) -> Result<(), DownloaderError> { - { - let category_no_exists = { - let sync_data = self.sync_data.read().await; - !sync_data.categories.contains_key(category) - }; - - if category_no_exists { - self.add_category(category).await?; - } - } - self.client - .set_torrent_category(hashes.clone(), category) - .await?; - self.wait_sync_until( - |sync_data| { - let torrents = &sync_data.torrents; - hashes.iter().all(|h| { - torrents - .get(h) - .is_some_and(|t| t.category.as_deref().is_some_and(|c| c == category)) - }) - }, - None, - ) - .await?; - Ok(()) - } - - pub fn get_save_path(&self, sub_path: &Path) -> PathBuf { - self.save_path.join(sub_path) - } - - #[instrument(level = "debug", skip(self))] - pub async fn add_torrent_tags( - &self, - hashes: Vec, - tags: Vec, - ) -> Result<(), DownloaderError> { - if tags.is_empty() { - whatever!("add bittorrent tags can not be empty"); - } - self.client - .add_torrent_tags(hashes.clone(), tags.clone()) - .await?; - let tag_sets = tags.iter().map(|s| s.as_str()).collect::>(); - self.wait_sync_until( - |sync_data| { - let torrents = &sync_data.torrents; - - hashes.iter().all(|h| { - torrents.get(h).is_some_and(|t| { - t.tags.as_ref().is_some_and(|t| { - t.split(',') - .map(|s| s.trim()) - .filter(|s| !s.is_empty()) - .collect::>() - .is_superset(&tag_sets) - }) - }) - }) - }, - None, - ) - .await?; - Ok(()) - } - - #[instrument(level = "debug", skip(self))] - pub async fn move_torrents( - &self, - hashes: Vec, - new_path: &str, - ) -> Result<(), DownloaderError> { - self.client - .set_torrent_location(hashes.clone(), new_path) - .await?; - - self.wait_sync_until( - |sync_data| -> bool { - let torrents = &sync_data.torrents; - - hashes.iter().all(|h| { - torrents.get(h).is_some_and(|t| { - t.save_path.as_deref().is_some_and(|p| { - path_equals_as_file_url(p, new_path) - .inspect_err(|error| { - tracing::warn!(name = "path_equals_as_file_url", error = ?error); - }) - .unwrap_or(false) - }) - }) - }) - }, - None, - ) - .await?; - Ok(()) - } - - pub async fn get_torrent_path( - &self, - hashes: String, - ) -> Result, DownloaderError> { - let mut torrent_list = self - .client - .get_torrent_list(GetTorrentListArg { - hashes: Some(hashes), - ..Default::default() - }) - .await?; - let torrent = torrent_list - .first_mut() - .whatever_context::<_, DownloaderError>("No bittorrent found")?; - Ok(torrent.save_path.take()) - } - - #[instrument(level = "debug", skip(self))] - async fn sync_data(&self) -> Result<(), DownloaderError> { - let rid = { self.sync_data.read().await.rid }; - let sync_data_patch = self.client.sync(Some(rid)).await?; - { - let mut sync_data = self.sync_data.write().await; - sync_data.patch(sync_data_patch); - } - let now = Utc::now(); - self.sync_watch.send_replace(now); - Ok(()) - } - - async fn wait_sync_until( - &self, - stop_wait_fn: S, - timeout: Option, - ) -> Result<(), DownloaderError> - where - S: Fn(&QBittorrentSyncData) -> bool, - { - { - let sync_data = &self.sync_data.read().await; - if stop_wait_fn(sync_data) { - return Ok(()); - } - } - - let timeout = timeout.unwrap_or(self.wait_sync_timeout); - let start_time = Utc::now(); - - let mut receiver = self.sync_watch.subscribe(); - - while let Ok(()) = receiver.changed().await { - let has_timeout = { - let sync_time = *receiver.borrow(); - let diff_time = sync_time - start_time; - diff_time.num_milliseconds() > timeout.as_millis() as i64 - }; - if has_timeout { - tracing::warn!(name = "wait_until timeout", timeout = ?timeout); - return Err(DownloaderError::DownloadTimeoutError { - action: Cow::Borrowed("QBittorrentDownloader::wait_unit"), - timeout, - }); - } - { - let sync_data = &self.sync_data.read().await; - if stop_wait_fn(sync_data) { - break; - } - } - } - Ok(()) - } -} - -#[async_trait] -impl DownloaderTrait for QBittorrentDownloader { - type State = QBittorrentState; - type Id = QBittorrentHash; - type Task = QBittorrentTask; - type Creation = QBittorrentCreation; - type Selector = QBittorrentSelector; - - async fn add_downloads( - &self, - creation: Self::Creation, - ) -> Result, DownloaderError> { - let tags = { - let mut tags = vec![TORRENT_TAG_NAME.to_string()]; - tags.extend(creation.tags); - Some(tags.into_iter().filter(|s| !s.is_empty()).join(",")) - }; - - let save_path = Some(creation.save_path.into_string()); - - let sources = creation.sources; - let hashes = HashSet::from_iter(sources.iter().map(|s| s.hash_info().to_string())); - let (urls_source, files_source) = { - let mut urls = vec![]; - let mut files = vec![]; - for s in sources { - match s { - HashTorrentSource::MagnetUrl(MagnetUrlSource { url, .. }) => { - urls.push(Url::parse(&url)?) - } - HashTorrentSource::TorrentFile(TorrentFileSource { - payload, filename, .. - }) => files.push(TorrentFile { - filename, - data: payload.into(), - }), - } - } - ( - if urls.is_empty() { - None - } else { - Some(TorrentSource::Urls { - urls: Sep::from(urls), - }) - }, - if files.is_empty() { - None - } else { - Some(TorrentSource::TorrentFiles { torrents: files }) - }, - ) - }; - - let category = creation.category; - - if let Some(category) = category.as_deref() { - let has_caetgory = { - self.sync_data - .read() - .await - .categories - .contains_key(category) - }; - if !has_caetgory { - self.add_category(category).await?; - } - } - - if let Some(source) = urls_source { - self.client - .add_torrent(AddTorrentArg { - source, - savepath: save_path.clone(), - auto_torrent_management: Some(false), - category: category.clone(), - tags: tags.clone(), - ..Default::default() - }) - .await?; - } - - if let Some(source) = files_source { - self.client - .add_torrent(AddTorrentArg { - source, - savepath: save_path, - auto_torrent_management: Some(false), - category, - tags, - ..Default::default() - }) - .await?; - } - self.wait_sync_until( - |sync_data| { - let torrents = &sync_data.torrents; - hashes.iter().all(|hash| torrents.contains_key(hash)) - }, - None, - ) - .await?; - Ok(hashes) - } - - async fn pause_downloads( - &self, - selector: Self::Selector, - ) -> Result, DownloaderError> { - ::pause_downloads(self, selector).await - } - - async fn resume_downloads( - &self, - selector: Self::Selector, - ) -> Result, DownloaderError> { - ::resume_downloads(self, selector).await - } - - async fn remove_downloads( - &self, - selector: Self::Selector, - ) -> Result, DownloaderError> { - ::remove_downloads(self, selector).await - } - - async fn query_downloads( - &self, - selector: QBittorrentSelector, - ) -> Result, DownloaderError> { - let selector = match selector { - QBittorrentSelector::Hash(h) => h.into(), - QBittorrentSelector::Complex(c) => c, - }; - - let torrent_list = self.client.get_torrent_list(selector.query).await?; - - let torrent_contents = try_join_all(torrent_list.iter().map(|s| async { - if let Some(hash) = &s.hash { - self.client.get_torrent_contents(hash as &str, None).await - } else { - Ok(vec![]) - } - })) - .await?; - - let tasks = torrent_list - .into_iter() - .zip(torrent_contents) - .map(|(t, c)| Self::Task::from_query(t, c)) - .collect::, _>>()?; - Ok(tasks) - } -} - -#[async_trait] -impl TorrentDownloaderTrait for QBittorrentDownloader { - type IdSelector = DownloadIdSelector; - #[instrument(level = "debug", skip(self))] - async fn pause_torrents( - &self, - hashes: Self::IdSelector, - ) -> Result { - self.client.pause_torrents(hashes.clone()).await?; - Ok(hashes) - } - - #[instrument(level = "debug", skip(self))] - async fn resume_torrents( - &self, - hashes: Self::IdSelector, - ) -> Result { - self.client.resume_torrents(hashes.clone()).await?; - Ok(hashes) - } - - #[instrument(level = "debug", skip(self))] - async fn remove_torrents( - &self, - hashes: Self::IdSelector, - ) -> Result { - self.client - .delete_torrents(hashes.clone(), Some(true)) - .await?; - self.wait_sync_until( - |sync_data| -> bool { - let torrents = &sync_data.torrents; - hashes.iter().all(|h| !torrents.contains_key(h)) - }, - None, - ) - .await?; - Ok(hashes) - } -} - -impl Debug for QBittorrentDownloader { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("QBittorrentDownloader") - .field("subscriber_id", &self.subscriber_id) - .field("client", &self.endpoint_url.as_str()) - .finish() - } -} +pub mod downloader; +pub mod task; #[cfg(test)] -pub mod tests { - use serde::{Deserialize, Serialize}; +mod test; - use super::*; - use crate::{ - downloader::core::DownloadIdSelectorTrait, - errors::{RError, app_error::RResult}, - test_utils::fetch::build_testing_http_client, - }; - - fn get_tmp_qbit_test_folder() -> &'static str { - if cfg!(all(windows, not(feature = "testcontainers"))) { - "C:\\Windows\\Temp\\konobangu\\qbit" - } else { - "/tmp/konobangu/qbit" - } - } - - #[derive(Serialize)] - struct MockFileItem { - path: String, - size: u64, - } - - #[derive(Serialize)] - #[serde(rename_all = "camelCase")] - struct MockRequest { - id: String, - file_list: Vec, - } - - #[derive(Deserialize)] - #[serde(rename_all = "camelCase")] - #[allow(dead_code)] - pub struct MockResponse { - torrent_url: String, - magnet_url: String, - hash: String, - } - - #[cfg(feature = "testcontainers")] - pub async fn create_torrents_testcontainers() - -> RResult> { - use testcontainers::{ - GenericImage, - core::{ContainerPort, WaitFor}, - }; - use testcontainers_modules::testcontainers::ImageExt; - - use crate::test_utils::testcontainers::ContainerRequestEnhancedExt; - - let container = GenericImage::new("ghcr.io/dumtruck/konobangu-testing-torrents", "latest") - .with_wait_for(WaitFor::message_on_stdout("Listening on")) - .with_mapped_port(6080, ContainerPort::Tcp(6080)) - .with_mapped_port(6081, ContainerPort::Tcp(6081)) - .with_mapped_port(6082, ContainerPort::Tcp(6082)) - // .with_reuse(ReuseDirective::Always) - .with_default_log_consumer() - .with_prune_existed_label("konobangu-testing-torrents", true, true) - .await?; - - Ok(container) - } - - #[cfg(feature = "testcontainers")] - pub async fn create_qbit_testcontainers() - -> RResult> { - use testcontainers::{ - GenericImage, - core::{ - ContainerPort, - // ReuseDirective, - WaitFor, - }, - }; - use testcontainers_modules::testcontainers::ImageExt; - - use crate::test_utils::testcontainers::ContainerRequestEnhancedExt; - - let container = GenericImage::new("linuxserver/qbittorrent", "latest") - .with_wait_for(WaitFor::message_on_stderr("Connection to localhost")) - .with_env_var("WEBUI_PORT", "8080") - .with_env_var("TZ", "Asia/Singapore") - .with_env_var("TORRENTING_PORT", "6881") - .with_mapped_port(6881, ContainerPort::Tcp(6881)) - .with_mapped_port(8080, ContainerPort::Tcp(8080)) - // .with_reuse(ReuseDirective::Always) - .with_default_log_consumer() - .with_prune_existed_label("qbit-downloader", true, true) - .await?; - - Ok(container) - } - - #[cfg(not(feature = "testcontainers"))] - #[tokio::test] - async fn test_qbittorrent_downloader() { - let hash = "47ee2d69e7f19af783ad896541a07b012676f858".to_string(); - let torrent_url = "https://mikanani.me/Download/20240301/{}.torrent"; - let _ = test_qbittorrent_downloader_impl(torrent_url, hash, None, None).await; - } - - #[cfg(feature = "testcontainers")] - #[tokio::test(flavor = "multi_thread")] - async fn test_qbittorrent_downloader() -> RResult<()> { - use testcontainers::runners::AsyncRunner; - use tokio::io::AsyncReadExt; - - tracing_subscriber::fmt() - .with_max_level(tracing::Level::DEBUG) - .with_test_writer() - .init(); - - let torrents_image = create_torrents_testcontainers().await?; - let _torrents_container = torrents_image.start().await?; - - let torrents_req = MockRequest { - id: "f10ebdda-dd2e-43f8-b80c-bf0884d071c4".into(), - file_list: vec![MockFileItem { - path: "[Nekomoe kissaten&LoliHouse] Boku no Kokoro no Yabai Yatsu - 20 [WebRip \ - 1080p HEVC-10bit AAC ASSx2].mkv" - .into(), - size: 1024, - }], - }; - - let torrent_res: MockResponse = reqwest::Client::new() - .post("http://127.0.0.1:6080/api/torrents/mock") - .json(&torrents_req) - .send() - .await? - .json() - .await?; - - let qbit_image = create_qbit_testcontainers().await?; - let qbit_container = qbit_image.start().await?; - - let mut logs = String::new(); - - qbit_container - .stdout(false) - .read_to_string(&mut logs) - .await?; - - let username = logs - .lines() - .find_map(|line| { - if line.contains("The WebUI administrator username is") { - line.split_whitespace().last() - } else { - None - } - }) - .expect("should have username") - .trim(); - - let password = logs - .lines() - .find_map(|line| { - if line.contains("A temporary password is provided for") { - line.split_whitespace().last() - } else { - None - } - }) - .expect("should have password") - .trim(); - - tracing::info!(username, password); - - test_qbittorrent_downloader_impl( - torrent_res.torrent_url, - torrent_res.hash, - Some(username), - Some(password), - ) - .await?; - - Ok(()) - } - - async fn test_qbittorrent_downloader_impl( - torrent_url: String, - torrent_hash: String, - username: Option<&str>, - password: Option<&str>, - ) -> RResult<()> { - let http_client = build_testing_http_client()?; - let base_save_path = Path::new(get_tmp_qbit_test_folder()); - - let downloader = QBittorrentDownloader::from_creation(QBittorrentDownloaderCreation { - endpoint: "http://127.0.0.1:8080".to_string(), - password: password.unwrap_or_default().to_string(), - username: username.unwrap_or_default().to_string(), - subscriber_id: 0, - save_path: base_save_path.to_string(), - downloader_id: 0, - wait_sync_timeout: Some(Duration::from_secs(3)), - }) - .await?; - - downloader.check_connection().await?; - - downloader - .remove_torrents(vec![torrent_hash.clone()].into()) - .await?; - - let torrent_source = - HashTorrentSource::from_url_and_http_client(&http_client, torrent_url).await?; - - let folder_name = format!("torrent_test_{}", Utc::now().timestamp()); - let save_path = base_save_path.join(&folder_name); - - let torrent_creation = QBittorrentCreation { - save_path, - tags: vec![], - sources: vec![torrent_source], - category: None, - }; - - downloader.add_downloads(torrent_creation).await?; - - let get_torrent = async || -> Result { - let torrent_infos = downloader - .query_downloads(QBittorrentSelector::Hash(QBittorrentHashSelector::from_id( - torrent_hash.clone(), - ))) - .await?; - - let result = torrent_infos - .into_iter() - .find(|t| t.hash_info() == torrent_hash) - .whatever_context::<_, DownloaderError>("no bittorrent")?; - - Ok(result) - }; - - let target_torrent = get_torrent().await?; - - let files = target_torrent.contents; - - assert!(!files.is_empty()); - - let first_file = files.first().expect("should have first file"); - assert!( - &first_file.name.ends_with(r#"[Nekomoe kissaten&LoliHouse] Boku no Kokoro no Yabai Yatsu - 20 [WebRip 1080p HEVC-10bit AAC ASSx2].mkv"#) - ); - - let test_tag = "test_tag".to_string(); - - downloader - .add_torrent_tags(vec![torrent_hash.clone()], vec![test_tag.clone()]) - .await?; - - let target_torrent = get_torrent().await?; - - assert!(target_torrent.tags().any(|s| s == test_tag)); - - let test_category = format!("test_category_{}", Utc::now().timestamp()); - - downloader - .set_torrents_category(vec![torrent_hash.clone()], &test_category) - .await?; - - let target_torrent = get_torrent().await?; - - assert_eq!( - Some(test_category.as_str()), - target_torrent.category().as_deref() - ); - - let moved_torrent_path = base_save_path.join(format!("moved_{}", Utc::now().timestamp())); - - downloader - .move_torrents(vec![torrent_hash.clone()], moved_torrent_path.as_str()) - .await?; - - let target_torrent = get_torrent().await?; - - let actual_content_path = &target_torrent - .torrent - .save_path - .expect("failed to get actual save path"); - - assert!( - path_equals_as_file_url(actual_content_path, moved_torrent_path) - .whatever_context::<_, RError>( - "failed to compare actual torrent path and found expected torrent path" - )? - ); - - downloader - .remove_torrents(vec![torrent_hash.clone()].into()) - .await?; - - let torrent_infos1 = downloader - .query_downloads(QBittorrentSelector::Complex(QBittorrentComplexSelector { - query: GetTorrentListArg::builder() - .filter(QbitTorrentFilter::All) - .build(), - })) - .await?; - - assert!(torrent_infos1.is_empty()); - - tracing::info!("test finished"); - - Ok(()) - } -} +pub use downloader::{QBittorrentDownloader, QBittorrentDownloaderCreation, QBittorrentSyncData}; +pub use task::{ + QBittorrentComplexSelector, QBittorrentCreation, QBittorrentHash, QBittorrentHashSelector, + QBittorrentSelector, QBittorrentState, QBittorrentTask, +}; diff --git a/apps/recorder/src/downloader/qbit/task.rs b/apps/recorder/src/downloader/qbit/task.rs new file mode 100644 index 0000000..24e8b23 --- /dev/null +++ b/apps/recorder/src/downloader/qbit/task.rs @@ -0,0 +1,221 @@ +use std::{borrow::Cow, time::Duration}; + +use itertools::Itertools; +use qbit_rs::model::{ + GetTorrentListArg, State, Torrent as QbitTorrent, TorrentContent as QbitTorrentContent, +}; +use quirks_path::{Path, PathBuf}; + +use crate::downloader::{ + DownloaderError, + bittorrent::{ + source::HashTorrentSource, + task::{TorrentCreationTrait, TorrentHashTrait, TorrentStateTrait, TorrentTaskTrait}, + }, + core::{ + DownloadCreationTrait, DownloadIdSelector, DownloadIdTrait, DownloadSelectorTrait, + DownloadStateTrait, DownloadTaskTrait, + }, +}; + +pub type QBittorrentHash = String; + +impl DownloadIdTrait for QBittorrentHash {} + +impl TorrentHashTrait for QBittorrentHash {} + +#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] +pub struct QBittorrentState(Option); + +impl DownloadStateTrait for QBittorrentState {} + +impl TorrentStateTrait for QBittorrentState {} + +impl From> for QBittorrentState { + fn from(value: Option) -> Self { + Self(value) + } +} + +#[derive(Debug)] +pub struct QBittorrentTask { + pub hash_info: QBittorrentHash, + pub torrent: QbitTorrent, + pub contents: Vec, + pub state: QBittorrentState, +} + +impl QBittorrentTask { + pub fn from_query( + torrent: QbitTorrent, + contents: Vec, + ) -> Result { + let hash = torrent + .hash + .clone() + .ok_or_else(|| DownloaderError::TorrentMetaError { + message: "missing hash".to_string(), + source: None.into(), + })?; + let state = QBittorrentState(torrent.state.clone()); + Ok(Self { + hash_info: hash, + contents, + state, + torrent, + }) + } +} + +impl DownloadTaskTrait for QBittorrentTask { + type State = QBittorrentState; + type Id = QBittorrentHash; + + fn id(&self) -> &Self::Id { + &self.hash_info + } + + fn into_id(self) -> Self::Id { + self.hash_info + } + + fn name(&self) -> Cow<'_, str> { + self.torrent + .name + .as_deref() + .map(Cow::Borrowed) + .unwrap_or_else(|| DownloadTaskTrait::name(self)) + } + + fn speed(&self) -> Option { + self.torrent.dlspeed.and_then(|s| u64::try_from(s).ok()) + } + + fn state(&self) -> &Self::State { + &self.state + } + + fn dl_bytes(&self) -> Option { + self.torrent.downloaded.and_then(|v| u64::try_from(v).ok()) + } + + fn total_bytes(&self) -> Option { + self.torrent.size.and_then(|v| u64::try_from(v).ok()) + } + + fn left_bytes(&self) -> Option { + self.torrent.amount_left.and_then(|v| u64::try_from(v).ok()) + } + + fn et(&self) -> Option { + self.torrent + .time_active + .and_then(|v| u64::try_from(v).ok()) + .map(Duration::from_secs) + } + + fn eta(&self) -> Option { + self.torrent + .eta + .and_then(|v| u64::try_from(v).ok()) + .map(Duration::from_secs) + } + + fn progress(&self) -> Option { + self.torrent.progress.as_ref().map(|s| *s as f32) + } +} + +impl TorrentTaskTrait for QBittorrentTask { + fn hash_info(&self) -> &str { + &self.hash_info + } + + fn tags(&self) -> impl Iterator> { + self.torrent + .tags + .as_deref() + .unwrap_or("") + .split(',') + .map(|s| s.trim()) + .filter(|s| !s.is_empty()) + .map(Cow::Borrowed) + } + + fn category(&self) -> Option> { + self.torrent.category.as_deref().map(Cow::Borrowed) + } +} + +#[derive(Debug, Clone, Default)] +pub struct QBittorrentCreation { + pub save_path: PathBuf, + pub tags: Vec, + pub category: Option, + pub sources: Vec, +} + +impl DownloadCreationTrait for QBittorrentCreation { + type Task = QBittorrentTask; +} + +impl TorrentCreationTrait for QBittorrentCreation { + fn save_path(&self) -> &Path { + self.save_path.as_ref() + } + + fn save_path_mut(&mut self) -> &mut PathBuf { + &mut self.save_path + } + + fn sources_mut(&mut self) -> &mut Vec { + &mut self.sources + } +} + +pub type QBittorrentHashSelector = DownloadIdSelector; + +pub struct QBittorrentComplexSelector { + pub query: GetTorrentListArg, +} + +impl From for QBittorrentComplexSelector { + fn from(value: QBittorrentHashSelector) -> Self { + Self { + query: GetTorrentListArg { + hashes: Some(value.ids.join("|")), + ..Default::default() + }, + } + } +} + +impl DownloadSelectorTrait for QBittorrentComplexSelector { + type Id = QBittorrentHash; + type Task = QBittorrentTask; +} + +pub enum QBittorrentSelector { + Hash(QBittorrentHashSelector), + Complex(QBittorrentComplexSelector), +} + +impl DownloadSelectorTrait for QBittorrentSelector { + type Id = QBittorrentHash; + type Task = QBittorrentTask; + + fn try_into_ids_only(self) -> Result, Self> { + match self { + QBittorrentSelector::Complex(c) => { + c.try_into_ids_only().map_err(QBittorrentSelector::Complex) + } + QBittorrentSelector::Hash(h) => { + let result = h + .try_into_ids_only() + .unwrap_or_else(|_| unreachable!("hash selector must contains hash")) + .into_iter(); + Ok(result.collect_vec()) + } + } + } +} diff --git a/apps/recorder/src/downloader/qbit/test.rs b/apps/recorder/src/downloader/qbit/test.rs new file mode 100644 index 0000000..3ab0df6 --- /dev/null +++ b/apps/recorder/src/downloader/qbit/test.rs @@ -0,0 +1,280 @@ +use std::time::Duration; + +use chrono::Utc; +use qbit_rs::model::{GetTorrentListArg, TorrentFilter as QbitTorrentFilter}; +use quirks_path::Path; +use snafu::{OptionExt, ResultExt}; + +use crate::{ + downloader::{ + DownloaderError, + bittorrent::{ + downloader::TorrentDownloaderTrait, source::HashTorrentSource, task::TorrentTaskTrait, + }, + core::{DownloadIdSelectorTrait, DownloaderTrait}, + qbit::{ + QBittorrentDownloader, QBittorrentDownloaderCreation, + task::{ + QBittorrentComplexSelector, QBittorrentCreation, QBittorrentHashSelector, + QBittorrentSelector, QBittorrentTask, + }, + }, + utils::path_equals_as_file_url, + }, + errors::{RError, RResult}, + test_utils::fetch::build_testing_http_client, +}; + +fn get_tmp_qbit_test_folder() -> &'static str { + if cfg!(all(windows, not(feature = "testcontainers"))) { + "C:\\Windows\\Temp\\konobangu\\qbit" + } else { + "/tmp/konobangu/qbit" + } +} + +#[cfg(feature = "testcontainers")] +pub async fn create_qbit_testcontainers() +-> RResult> { + use testcontainers::{ + GenericImage, + core::{ + ContainerPort, + // ReuseDirective, + WaitFor, + }, + }; + use testcontainers_ext::{ImageDefaultLogConsumerExt, ImagePruneExistedLabelExt}; + use testcontainers_modules::testcontainers::ImageExt; + + let container = GenericImage::new("linuxserver/qbittorrent", "latest") + .with_wait_for(WaitFor::message_on_stderr("Connection to localhost")) + .with_env_var("WEBUI_PORT", "8080") + .with_env_var("TZ", "Asia/Singapore") + .with_env_var("TORRENTING_PORT", "6881") + .with_mapped_port(6881, ContainerPort::Tcp(6881)) + .with_mapped_port(8080, ContainerPort::Tcp(8080)) + // .with_reuse(ReuseDirective::Always) + .with_default_log_consumer() + .with_prune_existed_label(env!("CARGO_PKG_NAME"), "qbit-downloader", true, true) + .await?; + + Ok(container) +} + +#[cfg(not(feature = "testcontainers"))] +#[tokio::test] +async fn test_qbittorrent_downloader() { + let hash = "47ee2d69e7f19af783ad896541a07b012676f858".to_string(); + let torrent_url = "https://mikanani.me/Download/20240301/{}.torrent"; + let _ = test_qbittorrent_downloader_impl(torrent_url, hash, None, None).await; +} + +#[cfg(feature = "testcontainers")] +#[tokio::test(flavor = "multi_thread")] +async fn test_qbittorrent_downloader() -> RResult<()> { + use testcontainers::runners::AsyncRunner; + use testing_torrents::{TestTorrentRequest, TestTorrentResponse, TestingTorrentFileItem}; + use tokio::io::AsyncReadExt; + + tracing_subscriber::fmt() + .with_max_level(tracing::Level::DEBUG) + .with_test_writer() + .init(); + + let torrents_image = testing_torrents::create_testcontainers().await?; + let _torrents_container = torrents_image.start().await?; + + let torrents_req = TestTorrentRequest { + id: "f10ebdda-dd2e-43f8-b80c-bf0884d071c4".into(), + file_list: vec![TestingTorrentFileItem { + path: "[Nekomoe kissaten&LoliHouse] Boku no Kokoro no Yabai Yatsu - 20 [WebRip 1080p \ + HEVC-10bit AAC ASSx2].mkv" + .into(), + size: 1024, + }], + }; + + let torrent_res: TestTorrentResponse = reqwest::Client::new() + .post("http://127.0.0.1:6080/api/torrents/mock") + .json(&torrents_req) + .send() + .await? + .json() + .await?; + + let qbit_image = create_qbit_testcontainers().await?; + let qbit_container = qbit_image.start().await?; + + let mut logs = String::new(); + + qbit_container + .stdout(false) + .read_to_string(&mut logs) + .await?; + + let username = logs + .lines() + .find_map(|line| { + if line.contains("The WebUI administrator username is") { + line.split_whitespace().last() + } else { + None + } + }) + .expect("should have username") + .trim(); + + let password = logs + .lines() + .find_map(|line| { + if line.contains("A temporary password is provided for") { + line.split_whitespace().last() + } else { + None + } + }) + .expect("should have password") + .trim(); + + tracing::info!(username, password); + + test_qbittorrent_downloader_impl( + torrent_res.torrent_url, + torrent_res.hash, + Some(username), + Some(password), + ) + .await?; + + Ok(()) +} + +async fn test_qbittorrent_downloader_impl( + torrent_url: String, + torrent_hash: String, + username: Option<&str>, + password: Option<&str>, +) -> RResult<()> { + let http_client = build_testing_http_client()?; + let base_save_path = Path::new(get_tmp_qbit_test_folder()); + + let downloader = QBittorrentDownloader::from_creation(QBittorrentDownloaderCreation { + endpoint: "http://127.0.0.1:8080".to_string(), + password: password.unwrap_or_default().to_string(), + username: username.unwrap_or_default().to_string(), + subscriber_id: 0, + save_path: base_save_path.to_string(), + downloader_id: 0, + wait_sync_timeout: Some(Duration::from_secs(3)), + }) + .await?; + + downloader.check_connection().await?; + + downloader + .remove_torrents(vec![torrent_hash.clone()].into()) + .await?; + + let torrent_source = + HashTorrentSource::from_url_and_http_client(&http_client, torrent_url).await?; + + let folder_name = format!("torrent_test_{}", Utc::now().timestamp()); + let save_path = base_save_path.join(&folder_name); + + let torrent_creation = QBittorrentCreation { + save_path, + tags: vec![], + sources: vec![torrent_source], + category: None, + }; + + downloader.add_downloads(torrent_creation).await?; + + let get_torrent = async || -> Result { + let torrent_infos = downloader + .query_downloads(QBittorrentSelector::Hash(QBittorrentHashSelector::from_id( + torrent_hash.clone(), + ))) + .await?; + + let result = torrent_infos + .into_iter() + .find(|t| t.hash_info() == torrent_hash) + .whatever_context::<_, DownloaderError>("no bittorrent")?; + + Ok(result) + }; + + let target_torrent = get_torrent().await?; + + let files = target_torrent.contents; + + assert!(!files.is_empty()); + + let first_file = files.first().expect("should have first file"); + assert!( + &first_file.name.ends_with(r#"[Nekomoe kissaten&LoliHouse] Boku no Kokoro no Yabai Yatsu - 20 [WebRip 1080p HEVC-10bit AAC ASSx2].mkv"#) + ); + + let test_tag = "test_tag".to_string(); + + downloader + .add_torrent_tags(vec![torrent_hash.clone()], vec![test_tag.clone()]) + .await?; + + let target_torrent = get_torrent().await?; + + assert!(target_torrent.tags().any(|s| s == test_tag)); + + let test_category = format!("test_category_{}", Utc::now().timestamp()); + + downloader + .set_torrents_category(vec![torrent_hash.clone()], &test_category) + .await?; + + let target_torrent = get_torrent().await?; + + assert_eq!( + Some(test_category.as_str()), + target_torrent.category().as_deref() + ); + + let moved_torrent_path = base_save_path.join(format!("moved_{}", Utc::now().timestamp())); + + downloader + .move_torrents(vec![torrent_hash.clone()], moved_torrent_path.as_str()) + .await?; + + let target_torrent = get_torrent().await?; + + let actual_content_path = &target_torrent + .torrent + .save_path + .expect("failed to get actual save path"); + + assert!( + path_equals_as_file_url(actual_content_path, moved_torrent_path) + .whatever_context::<_, RError>( + "failed to compare actual torrent path and found expected torrent path" + )? + ); + + downloader + .remove_torrents(vec![torrent_hash.clone()].into()) + .await?; + + let torrent_infos1 = downloader + .query_downloads(QBittorrentSelector::Complex(QBittorrentComplexSelector { + query: GetTorrentListArg::builder() + .filter(QbitTorrentFilter::All) + .build(), + })) + .await?; + + assert!(torrent_infos1.is_empty()); + + tracing::info!("test finished"); + + Ok(()) +} diff --git a/apps/recorder/src/test_utils/mod.rs b/apps/recorder/src/test_utils/mod.rs index 940f548..1b76b86 100644 --- a/apps/recorder/src/test_utils/mod.rs +++ b/apps/recorder/src/test_utils/mod.rs @@ -1,6 +1,4 @@ pub mod app; pub mod fetch; pub mod mikan; -#[cfg(feature = "testcontainers")] -pub mod testcontainers; pub mod tracing; diff --git a/apps/recorder/src/test_utils/testcontainers.rs b/apps/recorder/src/test_utils/testcontainers.rs deleted file mode 100644 index b081ff5..0000000 --- a/apps/recorder/src/test_utils/testcontainers.rs +++ /dev/null @@ -1,117 +0,0 @@ -use async_trait::async_trait; -use bollard::container::ListContainersOptions; -use itertools::Itertools; -use testcontainers::{ - ContainerRequest, Image, ImageExt, TestcontainersError, - core::logs::consumer::logging_consumer::LoggingConsumer, -}; - -pub const TESTCONTAINERS_PROJECT_KEY: &str = "tech.enfw.testcontainers.project"; -pub const TESTCONTAINERS_CONTAINER_KEY: &str = "tech.enfw.testcontainers.container"; -pub const TESTCONTAINERS_PRUNE_KEY: &str = "tech.enfw.testcontainers.prune"; - -#[async_trait] -pub trait ContainerRequestEnhancedExt: Sized + ImageExt -where - I: Image, -{ - async fn with_prune_existed_label( - self, - container_label: &str, - prune: bool, - force: bool, - ) -> Result; - - fn with_default_log_consumer(self) -> Self; -} - -#[async_trait] -impl ContainerRequestEnhancedExt for ContainerRequest -where - I: Image, -{ - async fn with_prune_existed_label( - self, - container_label: &str, - prune: bool, - force: bool, - ) -> Result { - use std::collections::HashMap; - - use bollard::container::PruneContainersOptions; - use testcontainers::core::client::docker_client_instance; - - if prune { - let client = docker_client_instance().await?; - - let mut filters = HashMap::>::new(); - - filters.insert( - String::from("label"), - vec![ - format!("{TESTCONTAINERS_PRUNE_KEY}=true"), - format!("{}={}", TESTCONTAINERS_PROJECT_KEY, "konobangu"), - format!("{}={}", TESTCONTAINERS_CONTAINER_KEY, container_label), - ], - ); - - if force { - let result = client - .list_containers(Some(ListContainersOptions { - all: false, - filters: filters.clone(), - ..Default::default() - })) - .await - .map_err(|err| TestcontainersError::Other(Box::new(err)))?; - - let remove_containers = result - .iter() - .filter(|c| matches!(c.state.as_deref(), Some("running"))) - .flat_map(|c| c.id.as_deref()) - .collect_vec(); - - futures::future::try_join_all( - remove_containers - .iter() - .map(|c| client.stop_container(c, None)), - ) - .await - .map_err(|error| TestcontainersError::Other(Box::new(error)))?; - - if !remove_containers.is_empty() { - tracing::warn!(name = "stop running containers", result = ?remove_containers); - } - } - - let result = client - .prune_containers(Some(PruneContainersOptions { filters })) - .await - .map_err(|err| TestcontainersError::Other(Box::new(err)))?; - - if result - .containers_deleted - .as_ref() - .is_some_and(|c| !c.is_empty()) - { - tracing::warn!(name = "prune existed containers", result = ?result); - } - } - - let result = self.with_labels([ - (TESTCONTAINERS_PRUNE_KEY, "true"), - (TESTCONTAINERS_PROJECT_KEY, "konobangu"), - (TESTCONTAINERS_CONTAINER_KEY, container_label), - ]); - - Ok(result) - } - - fn with_default_log_consumer(self) -> Self { - self.with_log_consumer( - LoggingConsumer::new() - .with_stdout_level(log::Level::Info) - .with_stderr_level(log::Level::Error), - ) - } -} diff --git a/packages/testing-torrents/Cargo.toml b/packages/testing-torrents/Cargo.toml new file mode 100644 index 0000000..a8f26d3 --- /dev/null +++ b/packages/testing-torrents/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "testing-torrents" +version = "0.1.0" +edition = "2024" + +[dependencies] +serde = { version = "1", features = ["derive"] } +testcontainers = { version = "0.23.3" } +testcontainers-modules = { version = "0.11.4" } +testcontainers-ext = { version = "0.1.0", features = ["tracing"] } diff --git a/packages/testing-torrents/src/lib.rs b/packages/testing-torrents/src/lib.rs new file mode 100644 index 0000000..1fd3851 --- /dev/null +++ b/packages/testing-torrents/src/lib.rs @@ -0,0 +1,44 @@ +use serde::{Deserialize, Serialize}; +use testcontainers::{ + GenericImage, + core::{ContainerPort, WaitFor}, +}; +use testcontainers_ext::{ImageDefaultLogConsumerExt, ImagePruneExistedLabelExt}; +use testcontainers_modules::testcontainers::ImageExt; + +#[derive(Serialize)] +pub struct TestingTorrentFileItem { + pub path: String, + pub size: u64, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct TestTorrentRequest { + pub id: String, + pub file_list: Vec, +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct TestTorrentResponse { + pub torrent_url: String, + pub magnet_url: String, + pub hash: String, +} + +pub async fn create_testcontainers() -> Result< + testcontainers::ContainerRequest, + testcontainers::TestcontainersError, +> { + let container = GenericImage::new("ghcr.io/dumtruck/konobangu-testing-torrents", "latest") + .with_wait_for(WaitFor::message_on_stdout("Listening on")) + .with_mapped_port(6080, ContainerPort::Tcp(6080)) + .with_mapped_port(6081, ContainerPort::Tcp(6081)) + .with_mapped_port(6082, ContainerPort::Tcp(6082)) + .with_default_log_consumer() + .with_prune_existed_label("konobangu", "testing-torrents", true, true) + .await?; + + Ok(container) +}