refactor: switch error handle to snafu

This commit is contained in:
2025-04-02 00:22:52 +08:00
parent 011f62829a
commit 234441e6a3
32 changed files with 549 additions and 436 deletions

View File

@@ -0,0 +1,297 @@
use std::fmt::Debug;
use async_trait::async_trait;
use itertools::Itertools;
use lazy_static::lazy_static;
use librqbit_core::{
magnet::Magnet,
torrent_metainfo::{TorrentMetaV1Owned, torrent_from_bytes},
};
use quirks_path::{Path, PathBuf};
use regex::Regex;
use serde::{Deserialize, Serialize};
use snafu::prelude::*;
use url::Url;
use super::{DownloaderError, QbitTorrent, QbitTorrentContent, errors::DownloadFetchSnafu};
use crate::fetch::{HttpClientTrait, fetch_bytes};
pub const BITTORRENT_MIME_TYPE: &str = "application/x-bittorrent";
pub const MAGNET_SCHEMA: &str = "magnet";
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TorrentFilter {
All,
Downloading,
Completed,
Paused,
Active,
Inactive,
Resumed,
Stalled,
StalledUploading,
StalledDownloading,
Errored,
}
lazy_static! {
static ref TORRENT_HASH_RE: Regex = Regex::new(r"[a-fA-F0-9]{40}").unwrap();
static ref TORRENT_EXT_RE: Regex = Regex::new(r"\.torrent$").unwrap();
}
#[derive(Clone, PartialEq, Eq)]
pub enum TorrentSource {
MagnetUrl {
url: Url,
hash: String,
},
TorrentUrl {
url: Url,
hash: String,
},
TorrentFile {
torrent: Vec<u8>,
hash: String,
name: Option<String>,
},
}
impl TorrentSource {
pub async fn parse<H: HttpClientTrait>(client: &H, url: &str) -> Result<Self, DownloaderError> {
let url = Url::parse(url)?;
let source = if url.scheme() == MAGNET_SCHEMA {
TorrentSource::from_magnet_url(url)?
} else if let Some(basename) = url
.clone()
.path_segments()
.and_then(|mut segments| segments.next_back())
{
if let (Some(match_hash), true) = (
TORRENT_HASH_RE.find(basename),
TORRENT_EXT_RE.is_match(basename),
) {
TorrentSource::from_torrent_url(url, match_hash.as_str().to_string())?
} else {
let contents = fetch_bytes(client, url)
.await
.boxed()
.context(DownloadFetchSnafu)?;
TorrentSource::from_torrent_file(contents.to_vec(), Some(basename.to_string()))?
}
} else {
let contents = fetch_bytes(client, url)
.await
.boxed()
.context(DownloadFetchSnafu)?;
TorrentSource::from_torrent_file(contents.to_vec(), None)?
};
Ok(source)
}
pub fn from_torrent_file(file: Vec<u8>, name: Option<String>) -> Result<Self, DownloaderError> {
let torrent: TorrentMetaV1Owned =
torrent_from_bytes(&file).map_err(|_| DownloaderError::TorrentFileFormatError)?;
let hash = torrent.info_hash.as_string();
Ok(TorrentSource::TorrentFile {
torrent: file,
hash,
name,
})
}
pub fn from_magnet_url(url: Url) -> Result<Self, DownloaderError> {
if url.scheme() != MAGNET_SCHEMA {
Err(DownloaderError::DownloadSchemaError {
found: url.scheme().to_string(),
expected: MAGNET_SCHEMA.to_string(),
})
} else {
let magnet =
Magnet::parse(url.as_str()).map_err(|_| DownloaderError::MagnetFormatError {
url: url.as_str().to_string(),
})?;
let hash = magnet
.as_id20()
.ok_or_else(|| DownloaderError::MagnetFormatError {
url: url.as_str().to_string(),
})?
.as_string();
Ok(TorrentSource::MagnetUrl { url, hash })
}
}
pub fn from_torrent_url(url: Url, hash: String) -> Result<Self, DownloaderError> {
Ok(TorrentSource::TorrentUrl { url, hash })
}
pub fn hash(&self) -> &str {
match self {
TorrentSource::MagnetUrl { hash, .. } => hash,
TorrentSource::TorrentUrl { hash, .. } => hash,
TorrentSource::TorrentFile { hash, .. } => hash,
}
}
}
impl Debug for TorrentSource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TorrentSource::MagnetUrl { url, .. } => {
write!(f, "MagnetUrl {{ url: {} }}", url.as_str())
}
TorrentSource::TorrentUrl { url, .. } => {
write!(f, "TorrentUrl {{ url: {} }}", url.as_str())
}
TorrentSource::TorrentFile { name, hash, .. } => write!(
f,
"TorrentFile {{ name: \"{}\", hash: \"{hash}\" }}",
name.as_deref().unwrap_or_default()
),
}
}
}
pub trait TorrentContent {
fn get_name(&self) -> &str;
fn get_all_size(&self) -> u64;
fn get_progress(&self) -> f64;
fn get_curr_size(&self) -> u64;
}
impl TorrentContent for QbitTorrentContent {
fn get_name(&self) -> &str {
self.name.as_str()
}
fn get_all_size(&self) -> u64 {
self.size
}
fn get_progress(&self) -> f64 {
self.progress
}
fn get_curr_size(&self) -> u64 {
u64::clamp(
f64::round(self.get_all_size() as f64 * self.get_progress()) as u64,
0,
self.get_all_size(),
)
}
}
#[derive(Debug, Clone)]
pub enum Torrent {
Qbit {
torrent: QbitTorrent,
contents: Vec<QbitTorrentContent>,
},
}
impl Torrent {
pub fn iter_files(&self) -> impl Iterator<Item = &dyn TorrentContent> {
match self {
Torrent::Qbit { contents, .. } => {
contents.iter().map(|item| item as &dyn TorrentContent)
}
}
}
pub fn get_name(&self) -> Option<&str> {
match self {
Torrent::Qbit { torrent, .. } => torrent.name.as_deref(),
}
}
pub fn get_hash(&self) -> Option<&str> {
match self {
Torrent::Qbit { torrent, .. } => torrent.hash.as_deref(),
}
}
pub fn get_save_path(&self) -> Option<&str> {
match self {
Torrent::Qbit { torrent, .. } => torrent.save_path.as_deref(),
}
}
pub fn get_content_path(&self) -> Option<&str> {
match self {
Torrent::Qbit { torrent, .. } => torrent.content_path.as_deref(),
}
}
pub fn get_tags(&self) -> Vec<&str> {
match self {
Torrent::Qbit { torrent, .. } => torrent.tags.as_deref().map_or_else(Vec::new, |s| {
s.split(',')
.map(|s| s.trim())
.filter(|s| !s.is_empty())
.collect_vec()
}),
}
}
pub fn get_category(&self) -> Option<&str> {
match self {
Torrent::Qbit { torrent, .. } => torrent.category.as_deref(),
}
}
}
#[async_trait]
pub trait TorrentDownloader {
async fn get_torrents_info(
&self,
status_filter: TorrentFilter,
category: Option<String>,
tag: Option<String>,
) -> Result<Vec<Torrent>, DownloaderError>;
async fn add_torrents(
&self,
source: TorrentSource,
save_path: String,
category: Option<&str>,
) -> Result<(), DownloaderError>;
async fn delete_torrents(&self, hashes: Vec<String>) -> Result<(), DownloaderError>;
async fn rename_torrent_file(
&self,
hash: &str,
old_path: &str,
new_path: &str,
) -> Result<(), DownloaderError>;
async fn move_torrents(
&self,
hashes: Vec<String>,
new_path: &str,
) -> Result<(), DownloaderError>;
async fn get_torrent_path(&self, hashes: String) -> Result<Option<String>, DownloaderError>;
async fn check_connection(&self) -> Result<(), DownloaderError>;
async fn set_torrents_category(
&self,
hashes: Vec<String>,
category: &str,
) -> Result<(), DownloaderError>;
async fn add_torrent_tags(
&self,
hashes: Vec<String>,
tags: Vec<String>,
) -> Result<(), DownloaderError>;
async fn add_category(&self, category: &str) -> Result<(), DownloaderError>;
fn get_save_path(&self, sub_path: &Path) -> PathBuf;
}

View File

@@ -0,0 +1,58 @@
use std::{borrow::Cow, time::Duration};
use snafu::prelude::*;
use crate::errors::OptionWhateverAsync;
#[derive(Snafu, Debug)]
#[snafu(visibility(pub(crate)))]
pub enum DownloaderError {
#[snafu(display("Invalid mime (expected {expected:?}, got {found:?})"))]
DownloadMimeError { expected: String, found: String },
#[snafu(display("Invalid url schema (expected {expected:?}, got {found:?})"))]
DownloadSchemaError { expected: String, found: String },
#[snafu(transparent)]
DownloadUrlParseError { source: url::ParseError },
#[snafu(display("Invalid url format: {reason}"))]
DownloadUrlFormatError { reason: Cow<'static, str> },
#[snafu(transparent)]
QBitAPIError { source: qbit_rs::Error },
#[snafu(display("Timeout error (action = {action}, timeout = {timeout:?})"))]
DownloadTimeoutError {
action: Cow<'static, str>,
timeout: Duration,
},
#[snafu(display("Invalid torrent file format"))]
TorrentFileFormatError,
#[snafu(display("Invalid magnet format (url = {url})"))]
MagnetFormatError { url: String },
#[snafu(display("Failed to fetch: {source}"))]
DownloadFetchError {
#[snafu(source)]
source: Box<dyn snafu::Error + Send + Sync>,
},
#[snafu(display("{message}"))]
Whatever {
message: String,
#[snafu(source(from(Box<dyn std::error::Error + Send + Sync>, OptionWhateverAsync::some)))]
source: OptionWhateverAsync,
},
}
impl snafu::FromString for DownloaderError {
type Source = Box<dyn std::error::Error + Send + Sync>;
fn without_source(message: String) -> Self {
Self::Whatever {
message,
source: OptionWhateverAsync::none(),
}
}
fn with_source(source: Self::Source, message: String) -> Self {
Self::Whatever {
message,
source: OptionWhateverAsync::some(source),
}
}
}

View File

@@ -0,0 +1,16 @@
pub mod core;
pub mod errors;
pub mod qbit;
pub mod rqbit;
pub mod utils;
pub use core::{
Torrent, TorrentContent, TorrentDownloader, TorrentFilter, TorrentSource, BITTORRENT_MIME_TYPE,
MAGNET_SCHEMA,
};
pub use errors::DownloaderError;
pub use qbit::{
QBittorrentDownloader, QBittorrentDownloaderCreation, QbitTorrent, QbitTorrentContent,
QbitTorrentFile, QbitTorrentFilter, QbitTorrentSource,
};

View File

@@ -0,0 +1,719 @@
use std::{
borrow::Cow, collections::HashSet, fmt::Debug, future::Future, sync::Arc, time::Duration,
};
use async_trait::async_trait;
use futures::future::try_join_all;
pub use qbit_rs::model::{
Torrent as QbitTorrent, TorrentContent as QbitTorrentContent, TorrentFile as QbitTorrentFile,
TorrentFilter as QbitTorrentFilter, TorrentSource as QbitTorrentSource,
};
use qbit_rs::{
Qbit,
model::{AddTorrentArg, Credential, GetTorrentListArg, NonEmptyStr, SyncData},
};
use quirks_path::{Path, PathBuf};
use snafu::prelude::*;
use tokio::time::sleep;
use tracing::instrument;
use url::Url;
use super::{
DownloaderError, Torrent, TorrentDownloader, TorrentFilter, TorrentSource,
utils::path_equals_as_file_url,
};
impl From<TorrentSource> for QbitTorrentSource {
fn from(value: TorrentSource) -> Self {
match value {
TorrentSource::MagnetUrl { url, .. } => QbitTorrentSource::Urls {
urls: qbit_rs::model::Sep::from([url]),
},
TorrentSource::TorrentUrl { url, .. } => QbitTorrentSource::Urls {
urls: qbit_rs::model::Sep::from([url]),
},
TorrentSource::TorrentFile {
torrent: torrents,
name,
..
} => QbitTorrentSource::TorrentFiles {
torrents: vec![QbitTorrentFile {
filename: name.unwrap_or_default(),
data: torrents,
}],
},
}
}
}
impl From<TorrentFilter> for QbitTorrentFilter {
fn from(val: TorrentFilter) -> Self {
match val {
TorrentFilter::All => QbitTorrentFilter::All,
TorrentFilter::Downloading => QbitTorrentFilter::Downloading,
TorrentFilter::Completed => QbitTorrentFilter::Completed,
TorrentFilter::Paused => QbitTorrentFilter::Paused,
TorrentFilter::Active => QbitTorrentFilter::Active,
TorrentFilter::Inactive => QbitTorrentFilter::Inactive,
TorrentFilter::Resumed => QbitTorrentFilter::Resumed,
TorrentFilter::Stalled => QbitTorrentFilter::Stalled,
TorrentFilter::StalledUploading => QbitTorrentFilter::StalledUploading,
TorrentFilter::StalledDownloading => QbitTorrentFilter::StalledDownloading,
TorrentFilter::Errored => QbitTorrentFilter::Errored,
}
}
}
pub struct QBittorrentDownloaderCreation {
pub endpoint: String,
pub username: String,
pub password: String,
pub save_path: String,
pub subscriber_id: i32,
}
pub struct QBittorrentDownloader {
pub subscriber_id: i32,
pub endpoint_url: Url,
pub client: Arc<Qbit>,
pub save_path: PathBuf,
pub wait_sync_timeout: Duration,
}
impl QBittorrentDownloader {
pub async fn from_creation(
creation: QBittorrentDownloaderCreation,
) -> Result<Self, DownloaderError> {
let endpoint_url = Url::parse(&creation.endpoint)?;
let credential = Credential::new(creation.username, creation.password);
let client = Qbit::new(endpoint_url.clone(), credential);
client.login(false).await?;
client.sync(None).await?;
Ok(Self {
client: Arc::new(client),
endpoint_url,
subscriber_id: creation.subscriber_id,
save_path: creation.save_path.into(),
wait_sync_timeout: Duration::from_millis(10000),
})
}
#[instrument(level = "debug")]
pub async fn api_version(&self) -> Result<String, DownloaderError> {
let result = self.client.get_webapi_version().await?;
Ok(result)
}
pub async fn wait_until<G, Fut, F, D, H, E>(
&self,
capture_fn: H,
fetch_data_fn: G,
mut stop_wait_fn: F,
timeout: Option<Duration>,
) -> Result<(), DownloaderError>
where
H: FnOnce() -> E,
G: Fn(Arc<Qbit>, E) -> Fut,
Fut: Future<Output = Result<D, DownloaderError>>,
F: FnMut(&D) -> bool,
E: Clone,
D: Debug + serde::Serialize,
{
let mut next_wait_ms = 32u64;
let mut all_wait_ms = 0u64;
let timeout = timeout.unwrap_or(self.wait_sync_timeout);
let env = capture_fn();
loop {
sleep(Duration::from_millis(next_wait_ms)).await;
all_wait_ms += next_wait_ms;
if all_wait_ms >= timeout.as_millis() as u64 {
// full update
let sync_data = fetch_data_fn(self.client.clone(), env.clone()).await?;
if stop_wait_fn(&sync_data) {
break;
} else {
tracing::warn!(name = "wait_until timeout", sync_data = serde_json::to_string(&sync_data).unwrap(), timeout = ?timeout);
return Err(DownloaderError::DownloadTimeoutError {
action: Cow::Borrowed("QBittorrentDownloader::wait_unit"),
timeout,
});
}
}
let sync_data = fetch_data_fn(self.client.clone(), env.clone()).await?;
if stop_wait_fn(&sync_data) {
break;
}
next_wait_ms *= 2;
}
Ok(())
}
#[instrument(level = "trace", skip(self, stop_wait_fn))]
pub async fn wait_torrents_until<F>(
&self,
arg: GetTorrentListArg,
stop_wait_fn: F,
timeout: Option<Duration>,
) -> Result<(), DownloaderError>
where
F: FnMut(&Vec<QbitTorrent>) -> bool,
{
self.wait_until(
|| arg,
async move |client: Arc<Qbit>,
arg: GetTorrentListArg|
-> Result<Vec<QbitTorrent>, DownloaderError> {
let data = client.get_torrent_list(arg).await?;
Ok(data)
},
stop_wait_fn,
timeout,
)
.await
}
#[instrument(level = "debug", skip(self, stop_wait_fn))]
pub async fn wait_sync_until<F: FnMut(&SyncData) -> bool>(
&self,
stop_wait_fn: F,
timeout: Option<Duration>,
) -> Result<(), DownloaderError> {
self.wait_until(
|| (),
async move |client: Arc<Qbit>, _| -> Result<SyncData, DownloaderError> {
let data = client.sync(None).await?;
Ok(data)
},
stop_wait_fn,
timeout,
)
.await
}
#[instrument(level = "debug", skip(self, stop_wait_fn))]
async fn wait_torrent_contents_until<F: FnMut(&Vec<QbitTorrentContent>) -> bool>(
&self,
hash: &str,
stop_wait_fn: F,
timeout: Option<Duration>,
) -> Result<(), DownloaderError> {
self.wait_until(
|| Arc::new(hash.to_string()),
async move |client: Arc<Qbit>,
hash_arc: Arc<String>|
-> Result<Vec<QbitTorrentContent>, DownloaderError> {
let data = client.get_torrent_contents(hash_arc.as_str(), None).await?;
Ok(data)
},
stop_wait_fn,
timeout,
)
.await
}
}
#[async_trait]
impl TorrentDownloader for QBittorrentDownloader {
#[instrument(level = "debug", skip(self))]
async fn get_torrents_info(
&self,
status_filter: TorrentFilter,
category: Option<String>,
tag: Option<String>,
) -> Result<Vec<Torrent>, DownloaderError> {
let arg = GetTorrentListArg {
filter: Some(status_filter.into()),
category,
tag,
..Default::default()
};
let torrent_list = self.client.get_torrent_list(arg).await?;
let torrent_contents = try_join_all(torrent_list.iter().map(|s| async {
if let Some(hash) = &s.hash {
self.client.get_torrent_contents(hash as &str, None).await
} else {
Ok(vec![])
}
}))
.await?;
Ok(torrent_list
.into_iter()
.zip(torrent_contents)
.map(|(torrent, contents)| Torrent::Qbit { torrent, contents })
.collect::<Vec<_>>())
}
#[instrument(level = "debug", skip(self))]
async fn add_torrents(
&self,
source: TorrentSource,
save_path: String,
category: Option<&str>,
) -> Result<(), DownloaderError> {
let arg = AddTorrentArg {
source: source.clone().into(),
savepath: Some(save_path),
category: category.map(String::from),
auto_torrent_management: Some(false),
..Default::default()
};
let add_result = self.client.add_torrent(arg.clone()).await;
if let (
Err(qbit_rs::Error::ApiError(qbit_rs::ApiError::CategoryNotFound)),
Some(category),
) = (&add_result, category)
{
self.add_category(category).await?;
self.client.add_torrent(arg).await?;
} else {
add_result?;
}
let source_hash = source.hash();
self.wait_sync_until(
|sync_data| {
sync_data
.torrents
.as_ref()
.is_some_and(|t| t.contains_key(source_hash))
},
None,
)
.await?;
Ok(())
}
#[instrument(level = "debug", skip(self))]
async fn delete_torrents(&self, hashes: Vec<String>) -> Result<(), DownloaderError> {
self.client
.delete_torrents(hashes.clone(), Some(true))
.await?;
self.wait_torrents_until(
GetTorrentListArg::builder()
.hashes(hashes.join("|"))
.build(),
|torrents| -> bool { torrents.is_empty() },
None,
)
.await?;
Ok(())
}
#[instrument(level = "debug", skip(self))]
async fn rename_torrent_file(
&self,
hash: &str,
old_path: &str,
new_path: &str,
) -> Result<(), DownloaderError> {
self.client.rename_file(hash, old_path, new_path).await?;
let new_path = self.save_path.join(new_path);
let save_path = self.save_path.as_path();
self.wait_torrent_contents_until(
hash,
|contents| -> bool {
contents.iter().any(|c| {
path_equals_as_file_url(save_path.join(&c.name), &new_path)
.inspect_err(|error| {
tracing::warn!(name = "path_equals_as_file_url", error = ?error);
})
.unwrap_or(false)
})
},
None,
)
.await?;
Ok(())
}
#[instrument(level = "debug", skip(self))]
async fn move_torrents(
&self,
hashes: Vec<String>,
new_path: &str,
) -> Result<(), DownloaderError> {
self.client
.set_torrent_location(hashes.clone(), new_path)
.await?;
self.wait_torrents_until(
GetTorrentListArg::builder()
.hashes(hashes.join("|"))
.build(),
|torrents| -> bool {
torrents.iter().flat_map(|t| t.save_path.as_ref()).any(|p| {
path_equals_as_file_url(p, new_path)
.inspect_err(|error| {
tracing::warn!(name = "path_equals_as_file_url", error = ?error);
})
.unwrap_or(false)
})
},
None,
)
.await?;
Ok(())
}
async fn get_torrent_path(&self, hashes: String) -> Result<Option<String>, DownloaderError> {
let mut torrent_list = self
.client
.get_torrent_list(GetTorrentListArg {
hashes: Some(hashes),
..Default::default()
})
.await?;
let torrent = torrent_list
.first_mut()
.whatever_context::<_, DownloaderError>("No torrent found")?;
Ok(torrent.save_path.take())
}
#[instrument(level = "debug", skip(self))]
async fn check_connection(&self) -> Result<(), DownloaderError> {
self.api_version().await?;
Ok(())
}
#[instrument(level = "debug", skip(self))]
async fn set_torrents_category(
&self,
hashes: Vec<String>,
category: &str,
) -> Result<(), DownloaderError> {
let result = self
.client
.set_torrent_category(hashes.clone(), category)
.await;
if let Err(qbit_rs::Error::ApiError(qbit_rs::ApiError::CategoryNotFound)) = &result {
self.add_category(category).await?;
self.client
.set_torrent_category(hashes.clone(), category)
.await?;
} else {
result?;
}
self.wait_torrents_until(
GetTorrentListArg::builder()
.hashes(hashes.join("|"))
.build(),
|torrents| {
torrents
.iter()
.all(|t| t.category.as_ref().is_some_and(|c| c == category))
},
None,
)
.await?;
Ok(())
}
#[instrument(level = "debug", skip(self))]
async fn add_torrent_tags(
&self,
hashes: Vec<String>,
tags: Vec<String>,
) -> Result<(), DownloaderError> {
if tags.is_empty() {
whatever!("add torrent tags can not be empty");
}
self.client
.add_torrent_tags(hashes.clone(), tags.clone())
.await?;
let tag_sets = tags.iter().map(|s| s.as_str()).collect::<HashSet<&str>>();
self.wait_torrents_until(
GetTorrentListArg::builder()
.hashes(hashes.join("|"))
.build(),
|torrents| {
torrents.iter().all(|t| {
t.tags.as_ref().is_some_and(|t| {
t.split(',')
.map(|s| s.trim())
.filter(|s| !s.is_empty())
.collect::<HashSet<&str>>()
.is_superset(&tag_sets)
})
})
},
None,
)
.await?;
Ok(())
}
#[instrument(level = "debug", skip(self))]
async fn add_category(&self, category: &str) -> Result<(), DownloaderError> {
self.client
.add_category(
NonEmptyStr::new(category)
.whatever_context::<_, DownloaderError>("category can not be empty")?,
self.save_path.as_str(),
)
.await?;
self.wait_sync_until(
|sync_data| {
sync_data
.categories
.as_ref()
.is_some_and(|s| s.contains_key(category))
},
None,
)
.await?;
Ok(())
}
fn get_save_path(&self, sub_path: &Path) -> PathBuf {
self.save_path.join(sub_path)
}
}
impl Debug for QBittorrentDownloader {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("QBittorrentDownloader")
.field("subscriber_id", &self.subscriber_id)
.field("client", &self.endpoint_url.as_str())
.finish()
}
}
#[cfg(test)]
pub mod tests {
use itertools::Itertools;
use super::*;
use crate::{errors::RResult, test_utils::fetch::build_testing_http_client};
fn get_tmp_qbit_test_folder() -> &'static str {
if cfg!(all(windows, not(feature = "testcontainers"))) {
"C:\\Windows\\Temp\\konobangu\\qbit"
} else {
"/tmp/konobangu/qbit"
}
}
#[cfg(feature = "testcontainers")]
pub async fn create_qbit_testcontainer()
-> RResult<testcontainers::ContainerRequest<testcontainers::GenericImage>> {
use testcontainers::{
GenericImage,
core::{
ContainerPort,
// ReuseDirective,
WaitFor,
},
};
use testcontainers_modules::testcontainers::ImageExt;
use crate::test_utils::testcontainers::ContainerRequestEnhancedExt;
let container = GenericImage::new("linuxserver/qbittorrent", "latest")
.with_wait_for(WaitFor::message_on_stderr("Connection to localhost"))
.with_env_var("WEBUI_PORT", "8080")
.with_env_var("TZ", "Asia/Singapore")
.with_env_var("TORRENTING_PORT", "6881")
.with_mapped_port(6881, ContainerPort::Tcp(6881))
.with_mapped_port(8080, ContainerPort::Tcp(8080))
// .with_reuse(ReuseDirective::Always)
.with_default_log_consumer()
.with_prune_existed_label("qbit-downloader", true, true)
.await?;
Ok(container)
}
#[cfg(not(feature = "testcontainers"))]
#[tokio::test]
async fn test_qbittorrent_downloader() {
test_qbittorrent_downloader_impl(None, None).await;
}
#[cfg(feature = "testcontainers")]
#[tokio::test(flavor = "multi_thread")]
async fn test_qbittorrent_downloader() -> RResult<()> {
use testcontainers::runners::AsyncRunner;
use tokio::io::AsyncReadExt;
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.with_test_writer()
.init();
let image = create_qbit_testcontainer().await?;
let container = image.start().await?;
let mut logs = String::new();
container.stdout(false).read_to_string(&mut logs).await?;
let username = logs
.lines()
.find_map(|line| {
if line.contains("The WebUI administrator username is") {
line.split_whitespace().last()
} else {
None
}
})
.expect("should have username")
.trim();
let password = logs
.lines()
.find_map(|line| {
if line.contains("A temporary password is provided for this session") {
line.split_whitespace().last()
} else {
None
}
})
.expect("should have password")
.trim();
tracing::info!(username, password);
test_qbittorrent_downloader_impl(Some(username), Some(password)).await?;
Ok(())
}
async fn test_qbittorrent_downloader_impl(
username: Option<&str>,
password: Option<&str>,
) -> RResult<()> {
let http_client = build_testing_http_client()?;
let base_save_path = Path::new(get_tmp_qbit_test_folder());
let mut downloader = QBittorrentDownloader::from_creation(QBittorrentDownloaderCreation {
endpoint: "http://127.0.0.1:8080".to_string(),
password: password.unwrap_or_default().to_string(),
username: username.unwrap_or_default().to_string(),
subscriber_id: 0,
save_path: base_save_path.to_string(),
})
.await?;
downloader.wait_sync_timeout = Duration::from_secs(3);
downloader.check_connection().await?;
downloader
.delete_torrents(vec!["47ee2d69e7f19af783ad896541a07b012676f858".to_string()])
.await?;
let torrent_source = TorrentSource::parse(
&http_client,
"https://mikanani.me/Download/20240301/47ee2d69e7f19af783ad896541a07b012676f858.torrent"
).await?;
let save_path = base_save_path.join(format!(
"test_add_torrents_{}",
chrono::Utc::now().timestamp()
));
downloader
.add_torrents(torrent_source, save_path.to_string(), Some("bangumi"))
.await?;
let get_torrent = async || -> Result<Torrent, DownloaderError> {
let torrent_infos = downloader
.get_torrents_info(TorrentFilter::All, None, None)
.await?;
let result = torrent_infos
.into_iter()
.find(|t| (t.get_hash() == Some("47ee2d69e7f19af783ad896541a07b012676f858")))
.whatever_context::<_, DownloaderError>("no torrent")?;
Ok(result)
};
let target_torrent = get_torrent().await?;
let files = target_torrent.iter_files().collect_vec();
assert!(!files.is_empty());
let first_file = files[0];
assert_eq!(
first_file.get_name(),
r#"[Nekomoe kissaten&LoliHouse] Boku no Kokoro no Yabai Yatsu - 20 [WebRip 1080p HEVC-10bit AAC ASSx2].mkv"#
);
let test_tag = format!("test_tag_{}", chrono::Utc::now().timestamp());
downloader
.add_torrent_tags(
vec!["47ee2d69e7f19af783ad896541a07b012676f858".to_string()],
vec![test_tag.clone()],
)
.await?;
let target_torrent = get_torrent().await?;
assert!(target_torrent.get_tags().iter().any(|s| s == &test_tag));
let test_category = format!("test_category_{}", chrono::Utc::now().timestamp());
downloader
.set_torrents_category(
vec!["47ee2d69e7f19af783ad896541a07b012676f858".to_string()],
&test_category,
)
.await?;
let target_torrent = get_torrent().await?;
assert_eq!(Some(test_category.as_str()), target_torrent.get_category());
let moved_save_path = base_save_path.join(format!(
"moved_test_add_torrents_{}",
chrono::Utc::now().timestamp()
));
downloader
.move_torrents(
vec!["47ee2d69e7f19af783ad896541a07b012676f858".to_string()],
moved_save_path.as_str(),
)
.await?;
let target_torrent = get_torrent().await?;
let content_path = target_torrent.iter_files().next().unwrap().get_name();
let new_content_path = &format!("new_{}", content_path);
downloader
.rename_torrent_file(
"47ee2d69e7f19af783ad896541a07b012676f858",
content_path,
new_content_path,
)
.await?;
let target_torrent = get_torrent().await?;
let content_path = target_torrent.iter_files().next().unwrap().get_name();
assert_eq!(content_path, new_content_path);
downloader
.delete_torrents(vec!["47ee2d69e7f19af783ad896541a07b012676f858".to_string()])
.await?;
let torrent_infos1 = downloader
.get_torrents_info(TorrentFilter::All, None, None)
.await?;
assert!(torrent_infos1.is_empty());
Ok(())
}
}

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,11 @@
use quirks_path::{Path, PathToUrlError};
pub fn path_equals_as_file_url<A: AsRef<Path>, B: AsRef<Path>>(
a: A,
b: B,
) -> Result<bool, PathToUrlError> {
let u1 = a.as_ref().to_file_url()?;
let u2 = b.as_ref().to_file_url()?;
Ok(u1.as_str() == u2.as_str())
}