diff --git a/Cargo.lock b/Cargo.lock index 2980d02..ab4933b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4780,6 +4780,7 @@ dependencies = [ name = "recorder" version = "0.1.0" dependencies = [ + "anyhow", "async-graphql", "async-graphql-axum", "async-stream", @@ -4793,6 +4794,7 @@ dependencies = [ "clap", "cookie", "ctor", + "dashmap 6.1.0", "dotenv", "fancy-regex", "fastrand", diff --git a/apps/recorder/.gitignore b/apps/recorder/.gitignore index cacf1e1..b1a4dba 100644 --- a/apps/recorder/.gitignore +++ b/apps/recorder/.gitignore @@ -25,3 +25,4 @@ Cargo.lock # Dist node_modules dist/ +temp/ diff --git a/apps/recorder/Cargo.toml b/apps/recorder/Cargo.toml index 640a745..a4c229c 100644 --- a/apps/recorder/Cargo.toml +++ b/apps/recorder/Cargo.toml @@ -131,6 +131,8 @@ ctor = "0.4.0" librqbit = "8.0.0" typed-builder = "0.21.0" snafu = { version = "0.8.5", features = ["futures"] } +anyhow = "1.0.97" +dashmap = "6.1.0" [dev-dependencies] serial_test = "3" insta = { version = "1", features = ["redactions", "yaml", "filters"] } diff --git a/apps/recorder/examples/playground.rs b/apps/recorder/examples/playground.rs index 7da4124..e78a529 100644 --- a/apps/recorder/examples/playground.rs +++ b/apps/recorder/examples/playground.rs @@ -1,4 +1,4 @@ -use recorder::errors::RResult; +use recorder::errors::app_error::RResult; // #![allow(unused_imports)] // use recorder::{ // app::{AppContext, AppContextTrait}, diff --git a/apps/recorder/public/.gitkeep b/apps/recorder/public/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/apps/recorder/public/assets/404.html b/apps/recorder/public/assets/404.html deleted file mode 100644 index dbd7df4..0000000 --- a/apps/recorder/public/assets/404.html +++ /dev/null @@ -1,7 +0,0 @@ - - - - not found :-( - - - \ No newline at end of file diff --git a/apps/recorder/src/app/builder.rs b/apps/recorder/src/app/builder.rs index bd97546..e185e27 100644 --- a/apps/recorder/src/app/builder.rs +++ b/apps/recorder/src/app/builder.rs 
@@ -3,7 +3,7 @@ use std::sync::Arc; use clap::{Parser, command}; use super::{AppContext, core::App, env::Environment}; -use crate::{app::config::AppConfig, errors::RResult}; +use crate::{app::config::AppConfig, errors::app_error::RResult}; #[derive(Parser, Debug)] #[command(version, about, long_about = None)] diff --git a/apps/recorder/src/app/config/mod.rs b/apps/recorder/src/app/config/mod.rs index a996cea..5615841 100644 --- a/apps/recorder/src/app/config/mod.rs +++ b/apps/recorder/src/app/config/mod.rs @@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize}; use super::env::Environment; use crate::{ - auth::AuthConfig, cache::CacheConfig, database::DatabaseConfig, errors::RResult, + auth::AuthConfig, cache::CacheConfig, database::DatabaseConfig, errors::app_error::RResult, extract::mikan::MikanConfig, graphql::GraphQLConfig, logger::LoggerConfig, storage::StorageConfig, web::WebServerConfig, }; diff --git a/apps/recorder/src/app/context.rs b/apps/recorder/src/app/context.rs index 8a8d479..991f635 100644 --- a/apps/recorder/src/app/context.rs +++ b/apps/recorder/src/app/context.rs @@ -1,6 +1,6 @@ use super::{Environment, config::AppConfig}; use crate::{ - auth::AuthService, cache::CacheService, database::DatabaseService, errors::RResult, + auth::AuthService, cache::CacheService, database::DatabaseService, errors::app_error::RResult, extract::mikan::MikanClient, graphql::GraphQLService, logger::LoggerService, storage::StorageService, }; diff --git a/apps/recorder/src/app/core.rs b/apps/recorder/src/app/core.rs index 5b2abc0..f6ce2f7 100644 --- a/apps/recorder/src/app/core.rs +++ b/apps/recorder/src/app/core.rs @@ -6,7 +6,7 @@ use tokio::signal; use super::{builder::AppBuilder, context::AppContextTrait}; use crate::{ - errors::RResult, + errors::app_error::RResult, web::{ controller::{self, core::ControllerTrait}, middleware::default_middleware_stack, diff --git a/apps/recorder/src/auth/oidc.rs b/apps/recorder/src/auth/oidc.rs index 36b0472..12eb420 100644 --- 
a/apps/recorder/src/auth/oidc.rs +++ b/apps/recorder/src/auth/oidc.rs @@ -24,7 +24,9 @@ use super::{ errors::{AuthError, OidcProviderUrlSnafu, OidcRequestRedirectUriSnafu}, service::{AuthServiceTrait, AuthUserInfo}, }; -use crate::{app::AppContextTrait, errors::RError, fetch::HttpClient, models::auth::AuthType}; +use crate::{ + app::AppContextTrait, errors::app_error::RError, fetch::HttpClient, models::auth::AuthType, +}; #[derive(Deserialize, Serialize, Clone, Debug)] pub struct OidcAuthClaims { diff --git a/apps/recorder/src/bin/main.rs b/apps/recorder/src/bin/main.rs index d2f87a1..3130dec 100644 --- a/apps/recorder/src/bin/main.rs +++ b/apps/recorder/src/bin/main.rs @@ -1,4 +1,4 @@ -use recorder::{app::AppBuilder, errors::RResult}; +use recorder::{app::AppBuilder, errors::app_error::RResult}; #[tokio::main] async fn main() -> RResult<()> { diff --git a/apps/recorder/src/cache/service.rs b/apps/recorder/src/cache/service.rs index 717299c..0be477f 100644 --- a/apps/recorder/src/cache/service.rs +++ b/apps/recorder/src/cache/service.rs @@ -1,5 +1,5 @@ use super::CacheConfig; -use crate::errors::RResult; +use crate::errors::app_error::RResult; pub struct CacheService {} diff --git a/apps/recorder/src/database/service.rs b/apps/recorder/src/database/service.rs index ebdde18..520e6e6 100644 --- a/apps/recorder/src/database/service.rs +++ b/apps/recorder/src/database/service.rs @@ -7,7 +7,7 @@ use sea_orm::{ use sea_orm_migration::MigratorTrait; use super::DatabaseConfig; -use crate::{errors::RResult, migrations::Migrator}; +use crate::{errors::app_error::RResult, migrations::Migrator}; pub struct DatabaseService { connection: DatabaseConnection, diff --git a/apps/recorder/src/downloader/bittorrent/downloader.rs b/apps/recorder/src/downloader/bittorrent/downloader.rs new file mode 100644 index 0000000..0744305 --- /dev/null +++ b/apps/recorder/src/downloader/bittorrent/downloader.rs @@ -0,0 +1,77 @@ +use async_trait::async_trait; + +use crate::downloader::{ + 
DownloaderError, + bittorrent::task::{ + TorrentCreationTrait, TorrentHashTrait, TorrentStateTrait, TorrentTaskTrait, + }, + core::{DownloadIdSelectorTrait, DownloadSelectorTrait, DownloadTaskTrait, DownloaderTrait}, +}; + +#[async_trait] +pub trait TorrentDownloaderTrait: DownloaderTrait +where + Self::State: TorrentStateTrait, + Self::Id: TorrentHashTrait, + Self::Task: TorrentTaskTrait, + Self::Creation: TorrentCreationTrait, + Self::Selector: DownloadSelectorTrait, +{ + type IdSelector: DownloadIdSelectorTrait; + + async fn pause_downloads( + &self, + selector: Self::Selector, + ) -> Result { + let hashes = + ::query_torrent_hashes(&self, selector).await?; + self.pause_torrents(hashes).await + } + + async fn resume_downloads( + &self, + selector: Self::Selector, + ) -> Result { + let hashes = + ::query_torrent_hashes(&self, selector).await?; + self.resume_torrents(hashes).await + } + async fn remove_downloads( + &self, + selector: Self::Selector, + ) -> Result { + let hashes = + ::query_torrent_hashes(&self, selector).await?; + self.remove_torrents(hashes).await + } + + async fn query_torrent_hashes( + &self, + selector: Self::Selector, + ) -> Result { + let hashes = match selector.try_into_ids_only() { + Ok(hashes) => Self::IdSelector::from_iter(hashes), + Err(selector) => { + let tasks = self.query_downloads(selector).await?; + + Self::IdSelector::from_iter(tasks.into_iter().map(|s| s.into_id())) + } + }; + Ok(hashes) + } + + async fn pause_torrents( + &self, + hashes: Self::IdSelector, + ) -> Result; + + async fn resume_torrents( + &self, + hashes: Self::IdSelector, + ) -> Result; + + async fn remove_torrents( + &self, + hashes: Self::IdSelector, + ) -> Result; +} diff --git a/apps/recorder/src/downloader/bittorrent/mod.rs b/apps/recorder/src/downloader/bittorrent/mod.rs new file mode 100644 index 0000000..05b1c62 --- /dev/null +++ b/apps/recorder/src/downloader/bittorrent/mod.rs @@ -0,0 +1,3 @@ +pub mod downloader; +pub mod source; +pub mod task; diff --git 
a/apps/recorder/src/downloader/bittorrent/source.rs b/apps/recorder/src/downloader/bittorrent/source.rs new file mode 100644 index 0000000..9c96fa7 --- /dev/null +++ b/apps/recorder/src/downloader/bittorrent/source.rs @@ -0,0 +1,228 @@ +use std::{ + borrow::Cow, + fmt::{Debug, Formatter}, +}; + +use bytes::Bytes; +use librqbit_core::{magnet::Magnet, torrent_metainfo, torrent_metainfo::TorrentMetaV1Owned}; +use snafu::ResultExt; +use url::Url; + +use crate::{ + downloader::errors::{ + DownloadFetchSnafu, DownloaderError, MagnetFormatSnafu, TorrentMetaSnafu, + }, + errors::RAnyhowResultExt, + extract::bittorrent::core::MAGNET_SCHEMA, + fetch::{bytes::fetch_bytes, client::core::HttpClientTrait}, +}; + +pub trait HashTorrentSourceTrait: Sized { + fn hash_info(&self) -> Cow<'_, str>; +} + +pub struct MagnetUrlSource { + pub magnet: Magnet, + pub url: String, +} + +impl MagnetUrlSource { + pub fn from_url(url: String) -> Result { + let magnet = Magnet::parse(&url) + .to_dyn_boxed() + .context(MagnetFormatSnafu { + message: url.clone(), + })?; + + Ok(Self { magnet, url }) + } +} + +impl HashTorrentSourceTrait for MagnetUrlSource { + fn hash_info(&self) -> Cow<'_, str> { + let hash_info = self + .magnet + .as_id32() + .map(|s| s.as_string()) + .or_else(|| self.magnet.as_id20().map(|s| s.as_string())) + .unwrap_or_else(|| unreachable!("hash of magnet must existed")); + hash_info.into() + } +} + +impl Debug for MagnetUrlSource { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MagnetUrlSource") + .field("url", &self.url) + .finish() + } +} + +impl Clone for MagnetUrlSource { + fn clone(&self) -> Self { + Self { + magnet: Magnet::parse(&self.url).unwrap(), + url: self.url.clone(), + } + } +} + +impl PartialEq for MagnetUrlSource { + fn eq(&self, other: &Self) -> bool { + self.url == other.url + } +} + +impl Eq for MagnetUrlSource {} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TorrentUrlSource { + pub url: String, +} + +impl 
TorrentUrlSource { + pub fn from_url(url: String) -> Result { + Ok(Self { url }) + } +} + +#[derive(Clone)] +pub struct TorrentFileSource { + pub url: Option, + pub payload: Bytes, + pub meta: TorrentMetaV1Owned, + pub filename: String, +} + +impl TorrentFileSource { + pub fn from_bytes( + filename: String, + bytes: Bytes, + url: Option, + ) -> Result { + let meta = torrent_metainfo::torrent_from_bytes(bytes.as_ref()) + .to_dyn_boxed() + .with_context(|_| TorrentMetaSnafu { + message: format!( + "filename = {}, url = {}", + filename, + url.as_deref().unwrap_or_default() + ), + })? + .to_owned(); + + Ok(TorrentFileSource { + url, + payload: bytes, + meta, + filename, + }) + } + pub async fn from_url_and_http_client( + client: &impl HttpClientTrait, + url: String, + ) -> Result { + let payload = fetch_bytes(client, &url) + .await + .boxed() + .with_context(|_| DownloadFetchSnafu { url: url.clone() })?; + + let filename = Url::parse(&url) + .boxed() + .and_then(|s| { + s.path_segments() + .and_then(|p| p.last()) + .map(String::from) + .ok_or_else(|| anyhow::anyhow!("invalid url")) + .to_dyn_boxed() + }) + .with_context(|_| DownloadFetchSnafu { url: url.clone() })?; + + Self::from_bytes(filename, payload, Some(url)) + } +} + +impl HashTorrentSourceTrait for TorrentFileSource { + fn hash_info(&self) -> Cow<'_, str> { + self.meta.info_hash.as_string().into() + } +} + +impl Debug for TorrentFileSource { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TorrentFileSource") + .field("hash", &self.meta.info_hash.as_string()) + .finish() + } +} + +#[derive(Clone, Debug)] +pub enum UrlTorrentSource { + MagnetUrl(MagnetUrlSource), + TorrentUrl(TorrentUrlSource), +} + +impl UrlTorrentSource { + pub fn from_url(url: String) -> Result { + let url_ = Url::parse(&url)?; + let source = if url_.scheme() == MAGNET_SCHEMA { + Self::from_magnet_url(url)? + } else { + Self::from_torrent_url(url)? 
+ }; + Ok(source) + } + + pub fn from_magnet_url(url: String) -> Result { + let magnet_source = MagnetUrlSource::from_url(url)?; + Ok(Self::MagnetUrl(magnet_source)) + } + + pub fn from_torrent_url(url: String) -> Result { + let torrent_source = TorrentUrlSource::from_url(url)?; + Ok(Self::TorrentUrl(torrent_source)) + } +} + +#[derive(Debug, Clone)] +pub enum HashTorrentSource { + MagnetUrl(MagnetUrlSource), + TorrentFile(TorrentFileSource), +} + +impl HashTorrentSource { + pub async fn from_url_and_http_client( + client: &impl HttpClientTrait, + url: String, + ) -> Result { + let url_ = Url::parse(&url)?; + let source = if url_.scheme() == MAGNET_SCHEMA { + Self::from_magnet_url(url)? + } else { + Self::from_torrent_url_and_http_client(client, url).await? + }; + Ok(source) + } + + pub fn from_magnet_url(url: String) -> Result { + let magnet_source = MagnetUrlSource::from_url(url)?; + Ok(Self::MagnetUrl(magnet_source)) + } + + pub async fn from_torrent_url_and_http_client( + client: &impl HttpClientTrait, + url: String, + ) -> Result { + let torrent_source = TorrentFileSource::from_url_and_http_client(client, url).await?; + Ok(Self::TorrentFile(torrent_source)) + } +} + +impl HashTorrentSourceTrait for HashTorrentSource { + fn hash_info(&self) -> Cow<'_, str> { + match self { + HashTorrentSource::MagnetUrl(m) => m.hash_info(), + HashTorrentSource::TorrentFile(t) => t.hash_info(), + } + } +} diff --git a/apps/recorder/src/downloader/bittorrent/task.rs b/apps/recorder/src/downloader/bittorrent/task.rs new file mode 100644 index 0000000..4da7a15 --- /dev/null +++ b/apps/recorder/src/downloader/bittorrent/task.rs @@ -0,0 +1,37 @@ +use std::{borrow::Cow, hash::Hash}; + +use quirks_path::{Path, PathBuf}; + +use crate::downloader::{ + bittorrent::source::HashTorrentSource, + core::{DownloadCreationTrait, DownloadIdTrait, DownloadStateTrait, DownloadTaskTrait}, +}; + +pub const TORRENT_TAG_NAME: &str = "konobangu"; + +pub trait TorrentHashTrait: DownloadIdTrait + Send + 
Hash {} + +pub trait TorrentStateTrait: DownloadStateTrait {} + +pub trait TorrentTaskTrait: DownloadTaskTrait +where + Self::State: TorrentStateTrait, + Self::Id: TorrentHashTrait, +{ + fn hash_info(&self) -> &str; + fn name(&self) -> Cow<'_, str> { + Cow::Borrowed(self.hash_info()) + } + + fn tags(&self) -> impl Iterator>; + + fn category(&self) -> Option>; +} + +pub trait TorrentCreationTrait: DownloadCreationTrait { + fn save_path(&self) -> &Path; + + fn save_path_mut(&mut self) -> &mut PathBuf; + + fn sources_mut(&mut self) -> &mut Vec; +} diff --git a/apps/recorder/src/downloader/core.rs b/apps/recorder/src/downloader/core.rs index b32f837..8c07d8e 100644 --- a/apps/recorder/src/downloader/core.rs +++ b/apps/recorder/src/downloader/core.rs @@ -1,297 +1,218 @@ -use std::fmt::Debug; +use std::{ + any::Any, borrow::Cow, fmt::Debug, hash::Hash, marker::PhantomData, ops::Deref, time::Duration, + vec::IntoIter, +}; use async_trait::async_trait; -use itertools::Itertools; -use lazy_static::lazy_static; -use librqbit_core::{ - magnet::Magnet, - torrent_metainfo::{TorrentMetaV1Owned, torrent_from_bytes}, -}; -use quirks_path::{Path, PathBuf}; -use regex::Regex; -use serde::{Deserialize, Serialize}; -use snafu::prelude::*; -use url::Url; -use super::{DownloaderError, QbitTorrent, QbitTorrentContent, errors::DownloadFetchSnafu}; -use crate::fetch::{HttpClientTrait, fetch_bytes}; +use super::DownloaderError; -pub const BITTORRENT_MIME_TYPE: &str = "application/x-bittorrent"; -pub const MAGNET_SCHEMA: &str = "magnet"; +pub trait DownloadStateTrait: Sized + Debug {} -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub enum TorrentFilter { - All, - Downloading, - Completed, - Paused, - Active, - Inactive, - Resumed, - Stalled, - StalledUploading, - StalledDownloading, - Errored, -} +pub trait DownloadIdTrait: Hash + Sized + Clone + Send + Debug {} -lazy_static! 
{ - static ref TORRENT_HASH_RE: Regex = Regex::new(r"[a-fA-F0-9]{40}").unwrap(); - static ref TORRENT_EXT_RE: Regex = Regex::new(r"\.torrent$").unwrap(); -} +pub trait DownloadTaskTrait: Sized + Send + Debug { + type State: DownloadStateTrait; + type Id: DownloadIdTrait; -#[derive(Clone, PartialEq, Eq)] -pub enum TorrentSource { - MagnetUrl { - url: Url, - hash: String, - }, - TorrentUrl { - url: Url, - hash: String, - }, - TorrentFile { - torrent: Vec, - hash: String, - name: Option, - }, -} - -impl TorrentSource { - pub async fn parse(client: &H, url: &str) -> Result { - let url = Url::parse(url)?; - let source = if url.scheme() == MAGNET_SCHEMA { - TorrentSource::from_magnet_url(url)? - } else if let Some(basename) = url - .clone() - .path_segments() - .and_then(|mut segments| segments.next_back()) - { - if let (Some(match_hash), true) = ( - TORRENT_HASH_RE.find(basename), - TORRENT_EXT_RE.is_match(basename), - ) { - TorrentSource::from_torrent_url(url, match_hash.as_str().to_string())? + fn id(&self) -> &Self::Id; + fn into_id(self) -> Self::Id; + fn name(&self) -> Cow<'_, str>; + fn speed(&self) -> Option; + fn state(&self) -> &Self::State; + fn dl_bytes(&self) -> Option; + fn total_bytes(&self) -> Option; + fn left_bytes(&self) -> Option { + if let (Some(tt), Some(dl)) = (self.total_bytes(), self.dl_bytes()) { + tt.checked_sub(dl) + } else { + None + } + } + fn et(&self) -> Option; + fn eta(&self) -> Option { + if let (Some(left_bytes), Some(speed)) = (self.left_bytes(), self.speed()) { + if speed > 0 { + Some(Duration::from_secs_f64(left_bytes as f64 / speed as f64)) } else { - let contents = fetch_bytes(client, url) - .await - .boxed() - .context(DownloadFetchSnafu)?; - TorrentSource::from_torrent_file(contents.to_vec(), Some(basename.to_string()))? + None } } else { - let contents = fetch_bytes(client, url) - .await - .boxed() - .context(DownloadFetchSnafu)?; - TorrentSource::from_torrent_file(contents.to_vec(), None)? 
- }; - Ok(source) + None + } } + fn average_speed(&self) -> Option { + if let (Some(et), Some(dl_bytes)) = (self.et(), self.dl_bytes()) { + let secs = et.as_secs_f64(); - pub fn from_torrent_file(file: Vec, name: Option) -> Result { - let torrent: TorrentMetaV1Owned = - torrent_from_bytes(&file).map_err(|_| DownloaderError::TorrentFileFormatError)?; - let hash = torrent.info_hash.as_string(); - Ok(TorrentSource::TorrentFile { - torrent: file, - hash, - name, - }) - } - - pub fn from_magnet_url(url: Url) -> Result { - if url.scheme() != MAGNET_SCHEMA { - Err(DownloaderError::DownloadSchemaError { - found: url.scheme().to_string(), - expected: MAGNET_SCHEMA.to_string(), - }) + if secs > 0.0 { + Some(dl_bytes as f64 / secs) + } else { + None + } } else { - let magnet = - Magnet::parse(url.as_str()).map_err(|_| DownloaderError::MagnetFormatError { - url: url.as_str().to_string(), - })?; - - let hash = magnet - .as_id20() - .ok_or_else(|| DownloaderError::MagnetFormatError { - url: url.as_str().to_string(), - })? - .as_string(); - Ok(TorrentSource::MagnetUrl { url, hash }) + None } } - - pub fn from_torrent_url(url: Url, hash: String) -> Result { - Ok(TorrentSource::TorrentUrl { url, hash }) - } - - pub fn hash(&self) -> &str { - match self { - TorrentSource::MagnetUrl { hash, .. } => hash, - TorrentSource::TorrentUrl { hash, .. } => hash, - TorrentSource::TorrentFile { hash, .. } => hash, - } - } -} - -impl Debug for TorrentSource { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - TorrentSource::MagnetUrl { url, .. } => { - write!(f, "MagnetUrl {{ url: {} }}", url.as_str()) + fn progress(&self) -> Option { + if let (Some(dl), Some(tt)) = (self.dl_bytes(), self.total_bytes()) { + if dl > 0 { + if tt > 0 { + Some(dl as f32 / tt as f32) + } else { + None + } + } else { + Some(0.0) } - TorrentSource::TorrentUrl { url, .. } => { - write!(f, "TorrentUrl {{ url: {} }}", url.as_str()) - } - TorrentSource::TorrentFile { name, hash, .. 
} => write!( - f, - "TorrentFile {{ name: \"{}\", hash: \"{hash}\" }}", - name.as_deref().unwrap_or_default() - ), + } else { + None } } } -pub trait TorrentContent { - fn get_name(&self) -> &str; - - fn get_all_size(&self) -> u64; - - fn get_progress(&self) -> f64; - - fn get_curr_size(&self) -> u64; +pub trait DownloadCreationTrait: Sized { + type Task: DownloadTaskTrait; } -impl TorrentContent for QbitTorrentContent { - fn get_name(&self) -> &str { - self.name.as_str() - } +pub trait DownloadSelectorTrait: Sized + Any + Send { + type Id: DownloadIdTrait; + type Task: DownloadTaskTrait; - fn get_all_size(&self) -> u64 { - self.size - } - - fn get_progress(&self) -> f64 { - self.progress - } - - fn get_curr_size(&self) -> u64 { - u64::clamp( - f64::round(self.get_all_size() as f64 * self.get_progress()) as u64, - 0, - self.get_all_size(), - ) + fn try_into_ids_only(self) -> Result, Self> { + Err(self) } } -#[derive(Debug, Clone)] -pub enum Torrent { - Qbit { - torrent: QbitTorrent, - contents: Vec, - }, +pub trait DownloadIdSelectorTrait: + DownloadSelectorTrait + + IntoIterator + + FromIterator + + Into> + + From> +{ + fn try_into_ids_only(self) -> Result, Self> { + Ok(Vec::from_iter(self)) + } + + fn from_id(id: Self::Id) -> Self; } -impl Torrent { - pub fn iter_files(&self) -> impl Iterator { - match self { - Torrent::Qbit { contents, .. 
} => { - contents.iter().map(|item| item as &dyn TorrentContent) - } +#[derive(Debug)] +pub struct DownloadIdSelector +where + Task: DownloadTaskTrait, +{ + pub ids: Vec, + pub marker: PhantomData, +} + +impl Deref for DownloadIdSelector +where + Task: DownloadTaskTrait, +{ + type Target = Vec; + + fn deref(&self) -> &Self::Target { + &self.ids + } +} + +impl IntoIterator for DownloadIdSelector +where + Task: DownloadTaskTrait, +{ + type Item = Task::Id; + type IntoIter = IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.ids.into_iter() + } +} + +impl FromIterator for DownloadIdSelector +where + Task: DownloadTaskTrait, +{ + fn from_iter>(iter: T) -> Self { + Self { + ids: Vec::from_iter(iter), + marker: PhantomData, } } +} - pub fn get_name(&self) -> Option<&str> { - match self { - Torrent::Qbit { torrent, .. } => torrent.name.as_deref(), +impl DownloadSelectorTrait for DownloadIdSelector +where + Task: DownloadTaskTrait + 'static, +{ + type Id = Task::Id; + type Task = Task; +} + +impl From> for DownloadIdSelector +where + Task: DownloadTaskTrait + 'static, +{ + fn from(value: Vec) -> Self { + Self { + ids: value, + marker: PhantomData, } } +} - pub fn get_hash(&self) -> Option<&str> { - match self { - Torrent::Qbit { torrent, .. } => torrent.hash.as_deref(), - } +impl From> for Vec +where + Task: DownloadTaskTrait + 'static, +{ + fn from(value: DownloadIdSelector) -> Self { + value.ids + } +} + +impl DownloadIdSelectorTrait for DownloadIdSelector +where + Task: DownloadTaskTrait + 'static, +{ + fn try_into_ids_only(self) -> Result, Self> { + Ok(self.ids) } - pub fn get_save_path(&self) -> Option<&str> { - match self { - Torrent::Qbit { torrent, .. } => torrent.save_path.as_deref(), - } - } - - pub fn get_content_path(&self) -> Option<&str> { - match self { - Torrent::Qbit { torrent, .. } => torrent.content_path.as_deref(), - } - } - - pub fn get_tags(&self) -> Vec<&str> { - match self { - Torrent::Qbit { torrent, .. 
} => torrent.tags.as_deref().map_or_else(Vec::new, |s| { - s.split(',') - .map(|s| s.trim()) - .filter(|s| !s.is_empty()) - .collect_vec() - }), - } - } - - pub fn get_category(&self) -> Option<&str> { - match self { - Torrent::Qbit { torrent, .. } => torrent.category.as_deref(), + fn from_id(id: Self::Id) -> Self { + Self { + ids: vec![id], + marker: PhantomData, } } } #[async_trait] -pub trait TorrentDownloader { - async fn get_torrents_info( +pub trait DownloaderTrait { + type State: DownloadStateTrait; + type Id: DownloadIdTrait; + type Task: DownloadTaskTrait; + type Creation: DownloadCreationTrait; + type Selector: DownloadSelectorTrait; + + async fn add_downloads( &self, - status_filter: TorrentFilter, - category: Option, - tag: Option, - ) -> Result, DownloaderError>; - - async fn add_torrents( + creation: Self::Creation, + ) -> Result, DownloaderError>; + async fn pause_downloads( &self, - source: TorrentSource, - save_path: String, - category: Option<&str>, - ) -> Result<(), DownloaderError>; - - async fn delete_torrents(&self, hashes: Vec) -> Result<(), DownloaderError>; - - async fn rename_torrent_file( + selector: Self::Selector, + ) -> Result, DownloaderError>; + async fn resume_downloads( &self, - hash: &str, - old_path: &str, - new_path: &str, - ) -> Result<(), DownloaderError>; - - async fn move_torrents( + selector: Self::Selector, + ) -> Result, DownloaderError>; + async fn remove_downloads( &self, - hashes: Vec, - new_path: &str, - ) -> Result<(), DownloaderError>; - - async fn get_torrent_path(&self, hashes: String) -> Result, DownloaderError>; - - async fn check_connection(&self) -> Result<(), DownloaderError>; - - async fn set_torrents_category( + selector: Self::Selector, + ) -> Result, DownloaderError>; + async fn query_downloads( &self, - hashes: Vec, - category: &str, - ) -> Result<(), DownloaderError>; - - async fn add_torrent_tags( - &self, - hashes: Vec, - tags: Vec, - ) -> Result<(), DownloaderError>; - - async fn add_category(&self, 
category: &str) -> Result<(), DownloaderError>; - - fn get_save_path(&self, sub_path: &Path) -> PathBuf; + selector: Self::Selector, + ) -> Result, DownloaderError>; } diff --git a/apps/recorder/src/downloader/errors.rs b/apps/recorder/src/downloader/errors.rs index 7c21337..cd6397f 100644 --- a/apps/recorder/src/downloader/errors.rs +++ b/apps/recorder/src/downloader/errors.rs @@ -2,40 +2,45 @@ use std::{borrow::Cow, time::Duration}; use snafu::prelude::*; -use crate::errors::OptionWhateverAsync; +use crate::errors::OptDynErr; #[derive(Snafu, Debug)] #[snafu(visibility(pub(crate)))] pub enum DownloaderError { - #[snafu(display("Invalid mime (expected {expected:?}, got {found:?})"))] - DownloadMimeError { expected: String, found: String }, - #[snafu(display("Invalid url schema (expected {expected:?}, got {found:?})"))] - DownloadSchemaError { expected: String, found: String }, #[snafu(transparent)] DownloadUrlParseError { source: url::ParseError }, - #[snafu(display("Invalid url format: {reason}"))] - DownloadUrlFormatError { reason: Cow<'static, str> }, #[snafu(transparent)] QBitAPIError { source: qbit_rs::Error }, + #[snafu(transparent)] + DownloaderIOError { source: std::io::Error }, #[snafu(display("Timeout error (action = {action}, timeout = {timeout:?})"))] DownloadTimeoutError { action: Cow<'static, str>, timeout: Duration, }, - #[snafu(display("Invalid torrent file format"))] - TorrentFileFormatError, - #[snafu(display("Invalid magnet format (url = {url})"))] - MagnetFormatError { url: String }, + #[snafu(display("Invalid magnet format ({message})"))] + MagnetFormatError { + message: String, + #[snafu(source(from(Box, OptDynErr::some)))] + source: OptDynErr, + }, + #[snafu(display("Invalid torrent meta format ({message})"))] + TorrentMetaError { + message: String, + #[snafu(source(from(Box, OptDynErr::some)))] + source: OptDynErr, + }, #[snafu(display("Failed to fetch: {source}"))] DownloadFetchError { - #[snafu(source)] - source: Box, + url: String, + 
#[snafu(source(from(Box, OptDynErr::some)))] + source: OptDynErr, }, #[snafu(display("{message}"))] Whatever { message: String, - #[snafu(source(from(Box, OptionWhateverAsync::some)))] - source: OptionWhateverAsync, + #[snafu(source(from(Box, OptDynErr::some)))] + source: OptDynErr, }, } @@ -45,14 +50,14 @@ impl snafu::FromString for DownloaderError { fn without_source(message: String) -> Self { Self::Whatever { message, - source: OptionWhateverAsync::none(), + source: OptDynErr::none(), } } fn with_source(source: Self::Source, message: String) -> Self { Self::Whatever { message, - source: OptionWhateverAsync::some(source), + source: OptDynErr::some(source), } } } diff --git a/apps/recorder/src/downloader/mod.rs b/apps/recorder/src/downloader/mod.rs index 7409a19..38a22c8 100644 --- a/apps/recorder/src/downloader/mod.rs +++ b/apps/recorder/src/downloader/mod.rs @@ -1,14 +1,10 @@ +pub mod bittorrent; pub mod core; pub mod errors; pub mod qbit; pub mod rqbit; pub mod utils; -pub use core::{ - Torrent, TorrentContent, TorrentDownloader, TorrentFilter, TorrentSource, BITTORRENT_MIME_TYPE, - MAGNET_SCHEMA, -}; - pub use errors::DownloaderError; pub use qbit::{ QBittorrentDownloader, QBittorrentDownloaderCreation, QbitTorrent, QbitTorrentContent, diff --git a/apps/recorder/src/downloader/qbit/mod.rs b/apps/recorder/src/downloader/qbit/mod.rs index 177fa28..bc3baab 100644 --- a/apps/recorder/src/downloader/qbit/mod.rs +++ b/apps/recorder/src/downloader/qbit/mod.rs @@ -1,8 +1,14 @@ use std::{ - borrow::Cow, collections::HashSet, fmt::Debug, future::Future, sync::Arc, time::Duration, + borrow::Cow, + collections::{HashMap, HashSet}, + fmt::Debug, + io, + sync::Arc, + time::Duration, }; use async_trait::async_trait; +use chrono::{DateTime, Utc}; use futures::future::try_join_all; pub use qbit_rs::model::{ Torrent as QbitTorrent, TorrentContent as QbitTorrentContent, TorrentFile as QbitTorrentFile, @@ -10,57 +16,185 @@ pub use qbit_rs::model::{ }; use qbit_rs::{ Qbit, - 
model::{AddTorrentArg, Credential, GetTorrentListArg, NonEmptyStr, SyncData}, + model::{ + AddTorrentArg, Credential, GetTorrentListArg, NonEmptyStr, Sep, State, TorrentFile, + TorrentSource, + }, }; use quirks_path::{Path, PathBuf}; +use seaography::itertools::Itertools; use snafu::prelude::*; -use tokio::time::sleep; +use tokio::sync::watch; use tracing::instrument; use url::Url; -use super::{ - DownloaderError, Torrent, TorrentDownloader, TorrentFilter, TorrentSource, - utils::path_equals_as_file_url, +use super::{DownloaderError, utils::path_equals_as_file_url}; +use crate::downloader::{ + bittorrent::{ + downloader::TorrentDownloaderTrait, + source::{HashTorrentSource, HashTorrentSourceTrait, MagnetUrlSource, TorrentFileSource}, + task::{ + TORRENT_TAG_NAME, TorrentCreationTrait, TorrentHashTrait, TorrentStateTrait, + TorrentTaskTrait, + }, + }, + core::{ + DownloadCreationTrait, DownloadIdSelector, DownloadIdTrait, DownloadSelectorTrait, + DownloadStateTrait, DownloadTaskTrait, DownloaderTrait, + }, }; -impl From for QbitTorrentSource { - fn from(value: TorrentSource) -> Self { - match value { - TorrentSource::MagnetUrl { url, .. } => QbitTorrentSource::Urls { - urls: qbit_rs::model::Sep::from([url]), - }, - TorrentSource::TorrentUrl { url, .. } => QbitTorrentSource::Urls { - urls: qbit_rs::model::Sep::from([url]), - }, - TorrentSource::TorrentFile { - torrent: torrents, - name, - .. 
- } => QbitTorrentSource::TorrentFiles { - torrents: vec![QbitTorrentFile { - filename: name.unwrap_or_default(), - data: torrents, - }], - }, - } +pub type QBittorrentHash = String; + +impl DownloadIdTrait for QBittorrentHash {} + +impl TorrentHashTrait for QBittorrentHash {} + +#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] +pub struct QBittorrentState(Option); + +impl DownloadStateTrait for QBittorrentState {} + +impl TorrentStateTrait for QBittorrentState {} + +impl From> for QBittorrentState { + fn from(value: Option) -> Self { + Self(value) } } -impl From for QbitTorrentFilter { - fn from(val: TorrentFilter) -> Self { - match val { - TorrentFilter::All => QbitTorrentFilter::All, - TorrentFilter::Downloading => QbitTorrentFilter::Downloading, - TorrentFilter::Completed => QbitTorrentFilter::Completed, - TorrentFilter::Paused => QbitTorrentFilter::Paused, - TorrentFilter::Active => QbitTorrentFilter::Active, - TorrentFilter::Inactive => QbitTorrentFilter::Inactive, - TorrentFilter::Resumed => QbitTorrentFilter::Resumed, - TorrentFilter::Stalled => QbitTorrentFilter::Stalled, - TorrentFilter::StalledUploading => QbitTorrentFilter::StalledUploading, - TorrentFilter::StalledDownloading => QbitTorrentFilter::StalledDownloading, - TorrentFilter::Errored => QbitTorrentFilter::Errored, - } +#[derive(Debug)] +pub struct QBittorrentTask { + pub hash_info: QBittorrentHash, + pub torrent: QbitTorrent, + pub contents: Vec, + pub state: QBittorrentState, +} + +impl QBittorrentTask { + fn from_query( + torrent: QbitTorrent, + contents: Vec, + ) -> Result { + let hash = torrent + .hash + .clone() + .ok_or_else(|| DownloaderError::TorrentMetaError { + message: "missing hash".to_string(), + source: None.into(), + })?; + let state = QBittorrentState(torrent.state.clone()); + Ok(Self { + hash_info: hash, + contents, + state, + torrent, + }) + } +} + +impl DownloadTaskTrait for QBittorrentTask { + type State = QBittorrentState; + type Id = 
QBittorrentHash; + + fn id(&self) -> &Self::Id { + &self.hash_info + } + + fn into_id(self) -> Self::Id { + self.hash_info + } + + fn name(&self) -> Cow<'_, str> { + self.torrent + .name + .as_deref() + .map(Cow::Borrowed) + .unwrap_or_else(|| DownloadTaskTrait::name(self)) + } + + fn speed(&self) -> Option { + self.torrent.dlspeed.and_then(|s| u64::try_from(s).ok()) + } + + fn state(&self) -> &Self::State { + &self.state + } + + fn dl_bytes(&self) -> Option { + self.torrent.downloaded.and_then(|v| u64::try_from(v).ok()) + } + + fn total_bytes(&self) -> Option { + self.torrent.size.and_then(|v| u64::try_from(v).ok()) + } + + fn left_bytes(&self) -> Option { + self.torrent.amount_left.and_then(|v| u64::try_from(v).ok()) + } + + fn et(&self) -> Option { + self.torrent + .time_active + .and_then(|v| u64::try_from(v).ok()) + .map(Duration::from_secs) + } + + fn eta(&self) -> Option { + self.torrent + .eta + .and_then(|v| u64::try_from(v).ok()) + .map(Duration::from_secs) + } + + fn progress(&self) -> Option { + self.torrent.progress.as_ref().map(|s| *s as f32) + } +} + +impl TorrentTaskTrait for QBittorrentTask { + fn hash_info(&self) -> &str { + &self.hash_info + } + + fn tags(&self) -> impl Iterator> { + self.torrent + .tags + .as_deref() + .unwrap_or("") + .split(',') + .filter(|s| !s.is_empty()) + .map(Cow::Borrowed) + } + + fn category(&self) -> Option> { + self.torrent.category.as_deref().map(Cow::Borrowed) + } +} + +#[derive(Debug, Clone, Default)] +pub struct QBittorrentCreation { + pub save_path: PathBuf, + pub tags: Vec, + pub category: Option, + pub sources: Vec, +} + +impl DownloadCreationTrait for QBittorrentCreation { + type Task = QBittorrentTask; +} + +impl TorrentCreationTrait for QBittorrentCreation { + fn save_path(&self) -> &Path { + self.save_path.as_ref() + } + + fn save_path_mut(&mut self) -> &mut PathBuf { + &mut self.save_path + } + + fn sources_mut(&mut self) -> &mut Vec { + &mut self.sources } } @@ -70,14 +204,72 @@ pub struct 
QBittorrentDownloaderCreation { pub password: String, pub save_path: String, pub subscriber_id: i32, + pub downloader_id: i32, +} + +pub type QBittorrentHashSelector = DownloadIdSelector; + +pub struct QBittorrentComplexSelector { + pub query: GetTorrentListArg, +} + +impl From for QBittorrentComplexSelector { + fn from(value: QBittorrentHashSelector) -> Self { + Self { + query: GetTorrentListArg { + hashes: Some(value.ids.join("|")), + ..Default::default() + }, + } + } +} + +impl DownloadSelectorTrait for QBittorrentComplexSelector { + type Id = QBittorrentHash; + type Task = QBittorrentTask; +} + +pub enum QBittorrentSelector { + Hash(QBittorrentHashSelector), + Complex(QBittorrentComplexSelector), +} + +impl DownloadSelectorTrait for QBittorrentSelector { + type Id = QBittorrentHash; + type Task = QBittorrentTask; + + fn try_into_ids_only(self) -> Result, Self> { + match self { + QBittorrentSelector::Complex(c) => { + c.try_into_ids_only().map_err(QBittorrentSelector::Complex) + } + QBittorrentSelector::Hash(h) => { + let result = h + .try_into_ids_only() + .unwrap_or_else(|_| unreachable!("hash selector must contains hash")) + .into_iter(); + Ok(result.collect_vec()) + } + } + } +} + +#[derive(Default)] +pub struct QBittorrentSyncData { + pub torrents: HashMap, + pub categories: HashSet, + pub tags: HashSet, } pub struct QBittorrentDownloader { pub subscriber_id: i32, + pub downloader_id: i32, pub endpoint_url: Url, pub client: Arc, pub save_path: PathBuf, pub wait_sync_timeout: Duration, + pub sync_watch: watch::Sender>, + pub sync_data: QBittorrentSyncData, } impl QBittorrentDownloader { @@ -100,6 +292,9 @@ impl QBittorrentDownloader { subscriber_id: creation.subscriber_id, save_path: creation.save_path.into(), wait_sync_timeout: Duration::from_millis(10000), + downloader_id: creation.downloader_id, + sync_watch: watch::channel(Utc::now()).0, + sync_data: QBittorrentSyncData::default(), }) } @@ -109,302 +304,79 @@ impl QBittorrentDownloader { Ok(result) } - 
pub async fn wait_until( + pub async fn wait_sync_until( &self, - capture_fn: H, - fetch_data_fn: G, - mut stop_wait_fn: F, + stop_wait_fn: S, timeout: Option, ) -> Result<(), DownloaderError> where - H: FnOnce() -> E, - G: Fn(Arc, E) -> Fut, - Fut: Future>, - F: FnMut(&D) -> bool, - E: Clone, - D: Debug + serde::Serialize, + S: Fn(&QBittorrentSyncData) -> bool, { - let mut next_wait_ms = 32u64; - let mut all_wait_ms = 0u64; + let mut receiver = self.sync_watch.subscribe(); let timeout = timeout.unwrap_or(self.wait_sync_timeout); - let env = capture_fn(); - loop { - sleep(Duration::from_millis(next_wait_ms)).await; - all_wait_ms += next_wait_ms; - if all_wait_ms >= timeout.as_millis() as u64 { - // full update - let sync_data = fetch_data_fn(self.client.clone(), env.clone()).await?; - if stop_wait_fn(&sync_data) { - break; - } else { - tracing::warn!(name = "wait_until timeout", sync_data = serde_json::to_string(&sync_data).unwrap(), timeout = ?timeout); - return Err(DownloaderError::DownloadTimeoutError { - action: Cow::Borrowed("QBittorrentDownloader::wait_unit"), - timeout, - }); - } + let start_time = Utc::now(); + + while let Ok(()) = receiver.changed().await { + let sync_time = receiver.borrow(); + if sync_time + .signed_duration_since(start_time) + .num_milliseconds() + > timeout.as_millis() as i64 + { + tracing::warn!(name = "wait_until timeout", timeout = ?timeout); + return Err(DownloaderError::DownloadTimeoutError { + action: Cow::Borrowed("QBittorrentDownloader::wait_unit"), + timeout, + }); } - let sync_data = fetch_data_fn(self.client.clone(), env.clone()).await?; - if stop_wait_fn(&sync_data) { + if stop_wait_fn(&self.sync_data) { break; } - next_wait_ms *= 2; } Ok(()) } - #[instrument(level = "trace", skip(self, stop_wait_fn))] - pub async fn wait_torrents_until( - &self, - arg: GetTorrentListArg, - stop_wait_fn: F, - timeout: Option, - ) -> Result<(), DownloaderError> - where - F: FnMut(&Vec) -> bool, - { - self.wait_until( - || arg, - async move 
|client: Arc, - arg: GetTorrentListArg| - -> Result, DownloaderError> { - let data = client.get_torrent_list(arg).await?; - Ok(data) - }, - stop_wait_fn, - timeout, - ) - .await - } - - #[instrument(level = "debug", skip(self, stop_wait_fn))] - pub async fn wait_sync_until bool>( - &self, - stop_wait_fn: F, - timeout: Option, - ) -> Result<(), DownloaderError> { - self.wait_until( - || (), - async move |client: Arc, _| -> Result { - let data = client.sync(None).await?; - Ok(data) - }, - stop_wait_fn, - timeout, - ) - .await - } - - #[instrument(level = "debug", skip(self, stop_wait_fn))] - async fn wait_torrent_contents_until) -> bool>( - &self, - hash: &str, - stop_wait_fn: F, - timeout: Option, - ) -> Result<(), DownloaderError> { - self.wait_until( - || Arc::new(hash.to_string()), - async move |client: Arc, - hash_arc: Arc| - -> Result, DownloaderError> { - let data = client.get_torrent_contents(hash_arc.as_str(), None).await?; - Ok(data) - }, - stop_wait_fn, - timeout, - ) - .await - } -} - -#[async_trait] -impl TorrentDownloader for QBittorrentDownloader { #[instrument(level = "debug", skip(self))] - async fn get_torrents_info( - &self, - status_filter: TorrentFilter, - category: Option, - tag: Option, - ) -> Result, DownloaderError> { - let arg = GetTorrentListArg { - filter: Some(status_filter.into()), - category, - tag, - ..Default::default() - }; - let torrent_list = self.client.get_torrent_list(arg).await?; - let torrent_contents = try_join_all(torrent_list.iter().map(|s| async { - if let Some(hash) = &s.hash { - self.client.get_torrent_contents(hash as &str, None).await - } else { - Ok(vec![]) - } - })) - .await?; - Ok(torrent_list - .into_iter() - .zip(torrent_contents) - .map(|(torrent, contents)| Torrent::Qbit { torrent, contents }) - .collect::>()) - } - - #[instrument(level = "debug", skip(self))] - async fn add_torrents( - &self, - source: TorrentSource, - save_path: String, - category: Option<&str>, - ) -> Result<(), DownloaderError> { - let arg = 
AddTorrentArg { - source: source.clone().into(), - savepath: Some(save_path), - category: category.map(String::from), - auto_torrent_management: Some(false), - ..Default::default() - }; - let add_result = self.client.add_torrent(arg.clone()).await; - if let ( - Err(qbit_rs::Error::ApiError(qbit_rs::ApiError::CategoryNotFound)), - Some(category), - ) = (&add_result, category) - { - self.add_category(category).await?; - self.client.add_torrent(arg).await?; - } else { - add_result?; - } - let source_hash = source.hash(); - self.wait_sync_until( - |sync_data| { - sync_data - .torrents - .as_ref() - .is_some_and(|t| t.contains_key(source_hash)) - }, - None, - ) - .await?; - Ok(()) - } - - #[instrument(level = "debug", skip(self))] - async fn delete_torrents(&self, hashes: Vec) -> Result<(), DownloaderError> { + pub async fn add_category(&self, category: &str) -> Result<(), DownloaderError> { self.client - .delete_torrents(hashes.clone(), Some(true)) + .add_category( + NonEmptyStr::new(category) + .whatever_context::<_, DownloaderError>("category can not be empty")?, + self.save_path.as_str(), + ) .await?; - self.wait_torrents_until( - GetTorrentListArg::builder() - .hashes(hashes.join("|")) - .build(), - |torrents| -> bool { torrents.is_empty() }, - None, - ) - .await?; + self.wait_sync_until(|sync_data| sync_data.categories.contains(category), None) + .await?; + Ok(()) } #[instrument(level = "debug", skip(self))] - async fn rename_torrent_file( - &self, - hash: &str, - old_path: &str, - new_path: &str, - ) -> Result<(), DownloaderError> { - self.client.rename_file(hash, old_path, new_path).await?; - let new_path = self.save_path.join(new_path); - let save_path = self.save_path.as_path(); - self.wait_torrent_contents_until( - hash, - |contents| -> bool { - contents.iter().any(|c| { - path_equals_as_file_url(save_path.join(&c.name), &new_path) - .inspect_err(|error| { - tracing::warn!(name = "path_equals_as_file_url", error = ?error); - }) - .unwrap_or(false) - }) - }, - 
None, - ) - .await?; - Ok(()) - } - - #[instrument(level = "debug", skip(self))] - async fn move_torrents( - &self, - hashes: Vec, - new_path: &str, - ) -> Result<(), DownloaderError> { - self.client - .set_torrent_location(hashes.clone(), new_path) - .await?; - - self.wait_torrents_until( - GetTorrentListArg::builder() - .hashes(hashes.join("|")) - .build(), - |torrents| -> bool { - torrents.iter().flat_map(|t| t.save_path.as_ref()).any(|p| { - path_equals_as_file_url(p, new_path) - .inspect_err(|error| { - tracing::warn!(name = "path_equals_as_file_url", error = ?error); - }) - .unwrap_or(false) - }) - }, - None, - ) - .await?; - Ok(()) - } - - async fn get_torrent_path(&self, hashes: String) -> Result, DownloaderError> { - let mut torrent_list = self - .client - .get_torrent_list(GetTorrentListArg { - hashes: Some(hashes), - ..Default::default() - }) - .await?; - let torrent = torrent_list - .first_mut() - .whatever_context::<_, DownloaderError>("No torrent found")?; - Ok(torrent.save_path.take()) - } - - #[instrument(level = "debug", skip(self))] - async fn check_connection(&self) -> Result<(), DownloaderError> { + pub async fn check_connection(&self) -> Result<(), DownloaderError> { self.api_version().await?; Ok(()) } #[instrument(level = "debug", skip(self))] - async fn set_torrents_category( + pub async fn set_torrents_category( &self, hashes: Vec, category: &str, ) -> Result<(), DownloaderError> { - let result = self - .client - .set_torrent_category(hashes.clone(), category) - .await; - if let Err(qbit_rs::Error::ApiError(qbit_rs::ApiError::CategoryNotFound)) = &result { + if !self.sync_data.categories.contains(category) { self.add_category(category).await?; - self.client - .set_torrent_category(hashes.clone(), category) - .await?; - } else { - result?; } - self.wait_torrents_until( - GetTorrentListArg::builder() - .hashes(hashes.join("|")) - .build(), - |torrents| { - torrents - .iter() - .all(|t| t.category.as_ref().is_some_and(|c| c == category)) + 
self.client + .set_torrent_category(hashes.clone(), category) + .await?; + self.wait_sync_until( + |sync_data| { + let torrents = &sync_data.torrents; + hashes.iter().all(|h| { + torrents + .get(h) + .is_some_and(|t| t.category.as_deref().is_some_and(|c| c == category)) + }) }, None, ) @@ -412,31 +384,77 @@ impl TorrentDownloader for QBittorrentDownloader { Ok(()) } + pub fn get_save_path(&self, sub_path: &Path) -> PathBuf { + self.save_path.join(sub_path) + } + #[instrument(level = "debug", skip(self))] - async fn add_torrent_tags( + pub async fn add_torrent_tags( &self, hashes: Vec, tags: Vec, ) -> Result<(), DownloaderError> { if tags.is_empty() { - whatever!("add torrent tags can not be empty"); + whatever!("add bittorrent tags can not be empty"); } self.client .add_torrent_tags(hashes.clone(), tags.clone()) .await?; let tag_sets = tags.iter().map(|s| s.as_str()).collect::>(); - self.wait_torrents_until( - GetTorrentListArg::builder() - .hashes(hashes.join("|")) - .build(), - |torrents| { - torrents.iter().all(|t| { - t.tags.as_ref().is_some_and(|t| { - t.split(',') - .map(|s| s.trim()) - .filter(|s| !s.is_empty()) - .collect::>() - .is_superset(&tag_sets) + self.wait_sync_until( + |sync_data| { + let torrents = &sync_data.torrents; + + hashes.iter().all(|h| { + torrents.get(h).is_some_and(|t| { + t.tags.as_ref().is_some_and(|t| { + t.split(',') + .map(|s| s.trim()) + .filter(|s| !s.is_empty()) + .collect::>() + .is_superset(&tag_sets) + }) + }) + }) + }, + None, + ) + .await?; + Ok(()) + } + + #[instrument(level = "debug", skip(self, replacer))] + pub async fn move_torrent_contents String>( + &self, + hash: &str, + replacer: F, + ) -> Result<(), DownloaderError> { + let old_path = self + .sync_data + .torrents + .get(hash) + .and_then(|t| t.content_path.as_deref()) + .ok_or_else(|| { + io::Error::new( + io::ErrorKind::NotFound, + "no torrent or torrent does not contain content path", + ) + })? 
+ .to_string(); + let new_path = replacer(old_path.clone()); + self.client + .rename_file(hash, old_path.clone(), new_path.to_string()) + .await?; + self.wait_sync_until( + |sync_data| { + let torrents = &sync_data.torrents; + torrents.get(hash).is_some_and(|t| { + t.content_path.as_deref().is_some_and(|p| { + path_equals_as_file_url(p, &new_path) + .inspect_err(|error| { + tracing::warn!(name = "path_equals_as_file_url", error = ?error); + }) + .unwrap_or(false) }) }) }, @@ -447,30 +465,234 @@ impl TorrentDownloader for QBittorrentDownloader { } #[instrument(level = "debug", skip(self))] - async fn add_category(&self, category: &str) -> Result<(), DownloaderError> { + pub async fn move_torrents( + &self, + hashes: Vec, + new_path: &str, + ) -> Result<(), DownloaderError> { self.client - .add_category( - NonEmptyStr::new(category) - .whatever_context::<_, DownloaderError>("category can not be empty")?, - self.save_path.as_str(), - ) + .set_torrent_location(hashes.clone(), new_path) .await?; + self.wait_sync_until( - |sync_data| { - sync_data - .categories - .as_ref() - .is_some_and(|s| s.contains_key(category)) + |sync_data| -> bool { + let torrents = &sync_data.torrents; + + hashes.iter().all(|h| { + torrents.get(h).is_some_and(|t| { + t.save_path.as_deref().is_some_and(|p| { + path_equals_as_file_url(p, new_path) + .inspect_err(|error| { + tracing::warn!(name = "path_equals_as_file_url", error = ?error); + }) + .unwrap_or(false) + }) + }) + }) }, None, ) .await?; - Ok(()) } - fn get_save_path(&self, sub_path: &Path) -> PathBuf { - self.save_path.join(sub_path) + pub async fn get_torrent_path( + &self, + hashes: String, + ) -> Result, DownloaderError> { + let mut torrent_list = self + .client + .get_torrent_list(GetTorrentListArg { + hashes: Some(hashes), + ..Default::default() + }) + .await?; + let torrent = torrent_list + .first_mut() + .whatever_context::<_, DownloaderError>("No bittorrent found")?; + Ok(torrent.save_path.take()) + } +} + +#[async_trait] +impl 
DownloaderTrait for QBittorrentDownloader { + type State = QBittorrentState; + type Id = QBittorrentHash; + type Task = QBittorrentTask; + type Creation = QBittorrentCreation; + type Selector = QBittorrentSelector; + + async fn add_downloads( + &self, + creation: Self::Creation, + ) -> Result, DownloaderError> { + let tag = { + let mut tags = vec![TORRENT_TAG_NAME.to_string()]; + tags.extend(creation.tags); + Some(tags.into_iter().filter(|s| !s.is_empty()).join(",")) + }; + + let save_path = Some(creation.save_path.into_string()); + + let sources = creation.sources; + let ids = HashSet::from_iter(sources.iter().map(|s| s.hash_info().to_string())); + let (urls_source, files_source) = { + let mut urls = vec![]; + let mut files = vec![]; + for s in sources { + match s { + HashTorrentSource::MagnetUrl(MagnetUrlSource { url, .. }) => { + urls.push(Url::parse(&url)?) + } + HashTorrentSource::TorrentFile(TorrentFileSource { + payload, filename, .. + }) => files.push(TorrentFile { + filename, + data: payload.into(), + }), + } + } + ( + if urls.is_empty() { + None + } else { + Some(TorrentSource::Urls { + urls: Sep::from(urls), + }) + }, + if files.is_empty() { + None + } else { + Some(TorrentSource::TorrentFiles { torrents: files }) + }, + ) + }; + + let category = TORRENT_TAG_NAME.to_string(); + + if let Some(source) = urls_source { + self.client + .add_torrent(AddTorrentArg { + source, + savepath: save_path.clone(), + auto_torrent_management: Some(false), + category: Some(category.clone()), + tags: tag.clone(), + ..Default::default() + }) + .await?; + } + + if let Some(source) = files_source { + self.client + .add_torrent(AddTorrentArg { + source, + savepath: save_path.clone(), + auto_torrent_management: Some(false), + category: Some(category.clone()), + tags: tag, + ..Default::default() + }) + .await?; + } + self.wait_sync_until( + |sync_data| { + let torrents = &sync_data.torrents; + ids.iter().all(|id| torrents.contains_key(id)) + }, + None, + ) + .await?; + Ok(ids) + 
} + + async fn pause_downloads( + &self, + selector: Self::Selector, + ) -> Result, DownloaderError> { + ::pause_downloads(self, selector).await + } + + async fn resume_downloads( + &self, + selector: Self::Selector, + ) -> Result, DownloaderError> { + ::resume_downloads(self, selector).await + } + + async fn remove_downloads( + &self, + selector: Self::Selector, + ) -> Result, DownloaderError> { + ::remove_downloads(self, selector).await + } + + async fn query_downloads( + &self, + selector: QBittorrentSelector, + ) -> Result, DownloaderError> { + let selector = match selector { + QBittorrentSelector::Hash(h) => h.into(), + QBittorrentSelector::Complex(c) => c, + }; + + let torrent_list = self.client.get_torrent_list(selector.query).await?; + + let torrent_contents = try_join_all(torrent_list.iter().map(|s| async { + if let Some(hash) = &s.hash { + self.client.get_torrent_contents(hash as &str, None).await + } else { + Ok(vec![]) + } + })) + .await?; + let tasks = torrent_list + .into_iter() + .zip(torrent_contents) + .map(|(t, c)| Self::Task::from_query(t, c)) + .collect::, _>>()?; + Ok(tasks) + } +} + +#[async_trait] +impl TorrentDownloaderTrait for QBittorrentDownloader { + type IdSelector = DownloadIdSelector; + #[instrument(level = "debug", skip(self))] + async fn pause_torrents( + &self, + hashes: Self::IdSelector, + ) -> Result { + self.client.pause_torrents(hashes.clone()).await?; + Ok(hashes) + } + + #[instrument(level = "debug", skip(self))] + async fn resume_torrents( + &self, + hashes: Self::IdSelector, + ) -> Result { + self.client.resume_torrents(hashes.clone()).await?; + Ok(hashes) + } + + #[instrument(level = "debug", skip(self))] + async fn remove_torrents( + &self, + hashes: Self::IdSelector, + ) -> Result { + self.client + .delete_torrents(hashes.clone(), Some(true)) + .await?; + self.wait_sync_until( + |sync_data| -> bool { + let torrents = &sync_data.torrents; + hashes.iter().all(|h| !torrents.contains_key(h)) + }, + None, + ) + .await?; + 
Ok(hashes) } } @@ -485,10 +707,12 @@ impl Debug for QBittorrentDownloader { #[cfg(test)] pub mod tests { - use itertools::Itertools; - use super::*; - use crate::{errors::RResult, test_utils::fetch::build_testing_http_client}; + use crate::{ + downloader::core::DownloadIdSelectorTrait, + errors::{RError, app_error::RResult}, + test_utils::fetch::build_testing_http_client, + }; fn get_tmp_qbit_test_folder() -> &'static str { if cfg!(all(windows, not(feature = "testcontainers"))) { @@ -531,7 +755,7 @@ pub mod tests { #[cfg(not(feature = "testcontainers"))] #[tokio::test] async fn test_qbittorrent_downloader() { - test_qbittorrent_downloader_impl(None, None).await; + let _ = test_qbittorrent_downloader_impl(None, None).await; } #[cfg(feature = "testcontainers")] @@ -591,12 +815,15 @@ pub mod tests { let http_client = build_testing_http_client()?; let base_save_path = Path::new(get_tmp_qbit_test_folder()); + let hash = "47ee2d69e7f19af783ad896541a07b012676f858".to_string(); + let mut downloader = QBittorrentDownloader::from_creation(QBittorrentDownloaderCreation { endpoint: "http://127.0.0.1:8080".to_string(), password: password.unwrap_or_default().to_string(), username: username.unwrap_or_default().to_string(), subscriber_id: 0, save_path: base_save_path.to_string(), + downloader_id: 0, }) .await?; @@ -605,111 +832,129 @@ pub mod tests { downloader.check_connection().await?; downloader - .delete_torrents(vec!["47ee2d69e7f19af783ad896541a07b012676f858".to_string()]) + .remove_torrents(vec![hash.clone()].into()) .await?; - let torrent_source = TorrentSource::parse( + let torrent_source = HashTorrentSource::from_url_and_http_client( &http_client, - "https://mikanani.me/Download/20240301/47ee2d69e7f19af783ad896541a07b012676f858.torrent" - ).await?; + format!("https://mikanani.me/Download/20240301/{}.torrent", &hash), + ) + .await?; - let save_path = base_save_path.join(format!( - "test_add_torrents_{}", - chrono::Utc::now().timestamp() - )); + let folder_name = 
format!("torrent_test_{}", Utc::now().timestamp()); + let save_path = base_save_path.join(&folder_name); - downloader - .add_torrents(torrent_source, save_path.to_string(), Some("bangumi")) - .await?; + let torrent_creation = QBittorrentCreation { + save_path, + tags: vec![], + sources: vec![torrent_source], + category: None, + }; - let get_torrent = async || -> Result { + downloader.add_downloads(torrent_creation).await?; + + let get_torrent = async || -> Result { let torrent_infos = downloader - .get_torrents_info(TorrentFilter::All, None, None) + .query_downloads(QBittorrentSelector::Hash(QBittorrentHashSelector::from_id( + hash.clone(), + ))) .await?; let result = torrent_infos .into_iter() - .find(|t| (t.get_hash() == Some("47ee2d69e7f19af783ad896541a07b012676f858"))) - .whatever_context::<_, DownloaderError>("no torrent")?; + .find(|t| t.hash_info() == hash) + .whatever_context::<_, DownloaderError>("no bittorrent")?; Ok(result) }; let target_torrent = get_torrent().await?; - let files = target_torrent.iter_files().collect_vec(); + let files = target_torrent.contents; assert!(!files.is_empty()); - let first_file = files[0]; + let first_file = files.first().expect("should have first file"); assert_eq!( - first_file.get_name(), + &first_file.name, r#"[Nekomoe kissaten&LoliHouse] Boku no Kokoro no Yabai Yatsu - 20 [WebRip 1080p HEVC-10bit AAC ASSx2].mkv"# ); - let test_tag = format!("test_tag_{}", chrono::Utc::now().timestamp()); + let test_tag = format!("test_tag_{}", Utc::now().timestamp()); downloader - .add_torrent_tags( - vec!["47ee2d69e7f19af783ad896541a07b012676f858".to_string()], - vec![test_tag.clone()], - ) + .add_torrent_tags(vec![hash.clone()], vec![test_tag.clone()]) .await?; let target_torrent = get_torrent().await?; - assert!(target_torrent.get_tags().iter().any(|s| s == &test_tag)); + assert!(target_torrent.tags().any(|s| s == test_tag)); - let test_category = format!("test_category_{}", chrono::Utc::now().timestamp()); + let test_category = 
format!("test_category_{}", Utc::now().timestamp()); downloader - .set_torrents_category( - vec!["47ee2d69e7f19af783ad896541a07b012676f858".to_string()], - &test_category, - ) + .set_torrents_category(vec![hash.clone()], &test_category) .await?; let target_torrent = get_torrent().await?; - assert_eq!(Some(test_category.as_str()), target_torrent.get_category()); + assert_eq!( + Some(test_category.as_str()), + target_torrent.category().as_deref() + ); - let moved_save_path = base_save_path.join(format!( - "moved_test_add_torrents_{}", - chrono::Utc::now().timestamp() - )); + let moved_torrent_path = base_save_path.join(format!("moved_{}", Utc::now().timestamp())); downloader - .move_torrents( - vec!["47ee2d69e7f19af783ad896541a07b012676f858".to_string()], - moved_save_path.as_str(), - ) + .move_torrents(vec![hash.clone()], moved_torrent_path.as_str()) .await?; let target_torrent = get_torrent().await?; - let content_path = target_torrent.iter_files().next().unwrap().get_name(); + let actual_content_path = &target_torrent + .torrent + .save_path + .expect("failed to get actual save path"); - let new_content_path = &format!("new_{}", content_path); + assert!( + path_equals_as_file_url(actual_content_path, moved_torrent_path) + .whatever_context::<_, RError>( + "failed to compare actual torrent path and found expected torrent path" + )? 
+ ); downloader - .rename_torrent_file( - "47ee2d69e7f19af783ad896541a07b012676f858", - content_path, - new_content_path, - ) + .move_torrent_contents(&hash, |f| { + f.replace(&folder_name, &format!("moved_{}", &folder_name)) + }) .await?; let target_torrent = get_torrent().await?; - let content_path = target_torrent.iter_files().next().unwrap().get_name(); + let actual_content_path = &target_torrent + .torrent + .content_path + .expect("failed to get actual content path"); - assert_eq!(content_path, new_content_path); + assert!( + path_equals_as_file_url( + actual_content_path, + base_save_path.join(actual_content_path) + ) + .whatever_context::<_, RError>( + "failed to compare actual content path and found expected content path" + )? + ); downloader - .delete_torrents(vec!["47ee2d69e7f19af783ad896541a07b012676f858".to_string()]) + .remove_torrents(vec![hash.clone()].into()) .await?; let torrent_infos1 = downloader - .get_torrents_info(TorrentFilter::All, None, None) + .query_downloads(QBittorrentSelector::Complex(QBittorrentComplexSelector { + query: GetTorrentListArg::builder() + .filter(QbitTorrentFilter::All) + .build(), + })) .await?; assert!(torrent_infos1.is_empty()); diff --git a/apps/recorder/src/errors/whatever.rs b/apps/recorder/src/errors/alias.rs similarity index 74% rename from apps/recorder/src/errors/whatever.rs rename to apps/recorder/src/errors/alias.rs index 2736a63..206839c 100644 --- a/apps/recorder/src/errors/whatever.rs +++ b/apps/recorder/src/errors/alias.rs @@ -1,15 +1,15 @@ use std::fmt::Display; #[derive(Debug)] -pub struct OptionWhateverAsync(Option>); +pub struct OptDynErr(Option>); -impl AsRef for OptionWhateverAsync { +impl AsRef for OptDynErr { fn as_ref(&self) -> &(dyn snafu::Error + 'static) { self } } -impl OptionWhateverAsync { +impl OptDynErr { pub fn some_boxed(e: E) -> Self { Self(Some(Box::new(e))) } @@ -23,7 +23,7 @@ impl OptionWhateverAsync { } } -impl Display for OptionWhateverAsync { +impl Display for OptDynErr { fn 
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match &self.0 { Some(e) => e.fmt(f), @@ -32,7 +32,7 @@ impl Display for OptionWhateverAsync { } } -impl snafu::Error for OptionWhateverAsync { +impl snafu::Error for OptDynErr { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { None } @@ -42,13 +42,13 @@ impl snafu::Error for OptionWhateverAsync { } } -impl From>> for OptionWhateverAsync { +impl From>> for OptDynErr { fn from(value: Option>) -> Self { Self(value) } } -impl From> for OptionWhateverAsync { +impl From> for OptDynErr { fn from(value: Box) -> Self { Self::some(value) } diff --git a/apps/recorder/src/errors/app_error.rs b/apps/recorder/src/errors/app_error.rs new file mode 100644 index 0000000..3826bb5 --- /dev/null +++ b/apps/recorder/src/errors/app_error.rs @@ -0,0 +1,202 @@ +use std::borrow::Cow; + +use axum::{ + Json, + response::{IntoResponse, Response}, +}; +use http::StatusCode; +use serde::{Deserialize, Deserializer, Serialize}; +use snafu::Snafu; + +use crate::{ + auth::AuthError, + downloader::DownloaderError, + errors::{OptDynErr, response::StandardErrorResponse}, + fetch::HttpClientError, +}; + +#[derive(Snafu, Debug)] +#[snafu(visibility(pub(crate)))] +pub enum RError { + #[snafu(transparent, context(false))] + FancyRegexError { + #[snafu(source(from(fancy_regex::Error, Box::new)))] + source: Box, + }, + #[snafu(transparent)] + RegexError { source: regex::Error }, + #[snafu(transparent)] + InvalidMethodError { source: http::method::InvalidMethod }, + #[snafu(transparent)] + InvalidHeaderNameError { + source: http::header::InvalidHeaderName, + }, + #[snafu(transparent)] + TracingAppenderInitError { + source: tracing_appender::rolling::InitError, + }, + #[snafu(transparent)] + GraphQLSchemaError { + source: async_graphql::dynamic::SchemaError, + }, + #[snafu(transparent)] + AuthError { source: AuthError }, + #[snafu(transparent)] + DownloadError { source: DownloaderError }, + #[snafu(transparent)] + RSSError { 
source: rss::Error }, + #[snafu(transparent)] + DotEnvError { source: dotenv::Error }, + #[snafu(transparent)] + TeraError { source: tera::Error }, + #[snafu(transparent)] + IOError { source: std::io::Error }, + #[snafu(transparent)] + DbError { source: sea_orm::DbErr }, + #[snafu(transparent)] + CookieParseError { source: cookie::ParseError }, + #[snafu(transparent, context(false))] + FigmentError { + #[snafu(source(from(figment::Error, Box::new)))] + source: Box, + }, + #[snafu(transparent)] + SerdeJsonError { source: serde_json::Error }, + #[snafu(transparent)] + ReqwestMiddlewareError { source: reqwest_middleware::Error }, + #[snafu(transparent)] + ReqwestError { source: reqwest::Error }, + #[snafu(transparent)] + ParseUrlError { source: url::ParseError }, + #[snafu(display("{source}"), context(false))] + OpenDALError { + #[snafu(source(from(opendal::Error, Box::new)))] + source: Box, + }, + #[snafu(transparent)] + InvalidHeaderValueError { + source: http::header::InvalidHeaderValue, + }, + #[snafu(transparent)] + HttpClientError { source: HttpClientError }, + #[cfg(all(feature = "testcontainers", test))] + #[snafu(transparent)] + TestcontainersError { + source: testcontainers::TestcontainersError, + }, + #[snafu(display("Extract {desc} with mime error, expected {expected}, but got {found}"))] + MimeError { + desc: String, + expected: String, + found: String, + }, + #[snafu(display("Invalid or unknown format in extracting mikan rss"))] + MikanRssInvalidFormatError, + #[snafu(display("Invalid field {field} in extracting mikan rss"))] + MikanRssInvalidFieldError { + field: Cow<'static, str>, + #[snafu(source(from(Box, OptDynErr::some)))] + source: OptDynErr, + }, + #[snafu(display("Missing field {field} in extracting mikan meta"))] + MikanMetaMissingFieldError { + field: Cow<'static, str>, + #[snafu(source(from(Box, OptDynErr::some)))] + source: OptDynErr, + }, + #[snafu(display("Model Entity {entity} not found"))] + ModelEntityNotFound { entity: Cow<'static, 
str> }, + #[snafu(display("{message}"))] + Whatever { + message: String, + #[snafu(source(from(Box, OptDynErr::some)))] + source: OptDynErr, + }, +} + +impl RError { + pub fn from_mikan_meta_missing_field(field: Cow<'static, str>) -> Self { + Self::MikanMetaMissingFieldError { + field, + source: None.into(), + } + } + + pub fn from_mikan_rss_invalid_field(field: Cow<'static, str>) -> Self { + Self::MikanRssInvalidFieldError { + field, + source: None.into(), + } + } + + pub fn from_mikan_rss_invalid_field_and_source( + field: Cow<'static, str>, + source: impl std::error::Error + Send + Sync + 'static, + ) -> Self { + Self::MikanRssInvalidFieldError { + field, + source: OptDynErr::some_boxed(source), + } + } + + pub fn from_db_record_not_found(detail: T) -> Self { + Self::DbError { + source: sea_orm::DbErr::RecordNotFound(detail.to_string()), + } + } +} + +impl snafu::FromString for RError { + type Source = Box; + + fn without_source(message: String) -> Self { + Self::Whatever { + message, + source: OptDynErr::none(), + } + } + + fn with_source(source: Self::Source, message: String) -> Self { + Self::Whatever { + message, + source: OptDynErr::some(source), + } + } +} + +impl IntoResponse for RError { + fn into_response(self) -> Response { + match self { + Self::AuthError { source: auth_error } => auth_error.into_response(), + err => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json::(StandardErrorResponse::from(err.to_string())), + ) + .into_response(), + } + } +} + +impl Serialize for RError { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_str(&self.to_string()) + } +} + +impl<'de> Deserialize<'de> for RError { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + Ok(Self::Whatever { + message: s, + source: None.into(), + }) + } +} + +pub type RResult = Result; diff --git a/apps/recorder/src/errors/ext.rs b/apps/recorder/src/errors/ext.rs 
new file mode 100644 index 0000000..1012d37 --- /dev/null +++ b/apps/recorder/src/errors/ext.rs @@ -0,0 +1,9 @@ +pub trait RAnyhowResultExt: snafu::ResultExt { + fn to_dyn_boxed(self) -> Result>; +} + +impl RAnyhowResultExt for Result { + fn to_dyn_boxed(self) -> Result> { + self.map_err(|e| e.into()) + } +} diff --git a/apps/recorder/src/errors/mod.rs b/apps/recorder/src/errors/mod.rs index 0f0a741..e9e3d21 100644 --- a/apps/recorder/src/errors/mod.rs +++ b/apps/recorder/src/errors/mod.rs @@ -1,217 +1,9 @@ -pub mod whatever; -use std::borrow::Cow; +pub mod alias; +pub mod app_error; +pub mod ext; +pub mod response; -use axum::{ - Json, - response::{IntoResponse, Response}, -}; -use http::StatusCode; -use serde::{Deserialize, Deserializer, Serialize}; -use snafu::prelude::*; -pub use whatever::OptionWhateverAsync; - -use crate::{auth::AuthError, downloader::DownloaderError, fetch::HttpClientError}; - -#[derive(Snafu, Debug)] -#[snafu(visibility(pub(crate)))] -pub enum RError { - #[snafu(transparent, context(false))] - FancyRegexError { - #[snafu(source(from(fancy_regex::Error, Box::new)))] - source: Box, - }, - #[snafu(transparent)] - RegexError { source: regex::Error }, - #[snafu(transparent)] - InvalidMethodError { source: http::method::InvalidMethod }, - #[snafu(transparent)] - InvalidHeaderNameError { - source: http::header::InvalidHeaderName, - }, - #[snafu(transparent)] - TracingAppenderInitError { - source: tracing_appender::rolling::InitError, - }, - #[snafu(transparent)] - GraphQLSchemaError { - source: async_graphql::dynamic::SchemaError, - }, - #[snafu(transparent)] - AuthError { source: AuthError }, - #[snafu(transparent)] - DownloadError { source: DownloaderError }, - #[snafu(transparent)] - RSSError { source: rss::Error }, - #[snafu(transparent)] - DotEnvError { source: dotenv::Error }, - #[snafu(transparent)] - TeraError { source: tera::Error }, - #[snafu(transparent)] - IOError { source: std::io::Error }, - #[snafu(transparent)] - DbError { source: 
sea_orm::DbErr }, - #[snafu(transparent)] - CookieParseError { source: cookie::ParseError }, - #[snafu(transparent, context(false))] - FigmentError { - #[snafu(source(from(figment::Error, Box::new)))] - source: Box, - }, - #[snafu(transparent)] - SerdeJsonError { source: serde_json::Error }, - #[snafu(transparent)] - ReqwestMiddlewareError { source: reqwest_middleware::Error }, - #[snafu(transparent)] - ReqwestError { source: reqwest::Error }, - #[snafu(transparent)] - ParseUrlError { source: url::ParseError }, - #[snafu(display("{source}"), context(false))] - OpenDALError { - #[snafu(source(from(opendal::Error, Box::new)))] - source: Box, - }, - #[snafu(transparent)] - InvalidHeaderValueError { - source: http::header::InvalidHeaderValue, - }, - #[snafu(transparent)] - HttpClientError { source: HttpClientError }, - #[cfg(all(feature = "testcontainers", test))] - #[snafu(transparent)] - TestcontainersError { - source: testcontainers::TestcontainersError, - }, - #[snafu(display("Extract {desc} with mime error, expected {expected}, but got {found}"))] - MimeError { - desc: String, - expected: String, - found: String, - }, - #[snafu(display("Invalid or unknown format in extracting mikan rss"))] - MikanRssInvalidFormatError, - #[snafu(display("Invalid field {field} in extracting mikan rss"))] - MikanRssInvalidFieldError { - field: Cow<'static, str>, - #[snafu(source(from(Box, OptionWhateverAsync::some)))] - source: OptionWhateverAsync, - }, - #[snafu(display("Missing field {field} in extracting mikan meta"))] - MikanMetaMissingFieldError { - field: Cow<'static, str>, - #[snafu(source(from(Box, OptionWhateverAsync::some)))] - source: OptionWhateverAsync, - }, - #[snafu(display("Model Entity {entity} not found"))] - ModelEntityNotFound { entity: Cow<'static, str> }, - #[snafu(display("{message}"))] - Whatever { - message: String, - #[snafu(source(from(Box, OptionWhateverAsync::some)))] - source: OptionWhateverAsync, - }, -} - -impl RError { - pub fn 
from_mikan_meta_missing_field(field: Cow<'static, str>) -> Self { - Self::MikanMetaMissingFieldError { - field, - source: None.into(), - } - } - - pub fn from_mikan_rss_invalid_field(field: Cow<'static, str>) -> Self { - Self::MikanRssInvalidFieldError { - field, - source: None.into(), - } - } - - pub fn from_mikan_rss_invalid_field_and_source( - field: Cow<'static, str>, - source: impl std::error::Error + Send + Sync + 'static, - ) -> Self { - Self::MikanRssInvalidFieldError { - field, - source: OptionWhateverAsync::some_boxed(source), - } - } - - pub fn from_db_record_not_found(detail: T) -> Self { - Self::DbError { - source: sea_orm::DbErr::RecordNotFound(detail.to_string()), - } - } -} - -impl snafu::FromString for RError { - type Source = Box; - - fn without_source(message: String) -> Self { - Self::Whatever { - message, - source: OptionWhateverAsync::none(), - } - } - - fn with_source(source: Self::Source, message: String) -> Self { - Self::Whatever { - message, - source: OptionWhateverAsync::some(source), - } - } -} - -#[derive(Serialize, Debug, Clone)] -pub struct StandardErrorResponse { - pub success: bool, - pub message: String, - #[serde(skip_serializing_if = "Option::is_none")] - pub result: Option, -} - -impl From for StandardErrorResponse { - fn from(value: String) -> Self { - StandardErrorResponse { - success: false, - message: value, - result: None, - } - } -} - -impl IntoResponse for RError { - fn into_response(self) -> Response { - match self { - Self::AuthError { source: auth_error } => auth_error.into_response(), - err => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json::(StandardErrorResponse::from(err.to_string())), - ) - .into_response(), - } - } -} - -impl Serialize for RError { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - serializer.serialize_str(&self.to_string()) - } -} - -impl<'de> Deserialize<'de> for RError { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let s = 
String::deserialize(deserializer)?; - Ok(Self::Whatever { - message: s, - source: None.into(), - }) - } -} - -pub type RResult = Result; +pub use alias::OptDynErr; +pub use app_error::*; +pub use ext::RAnyhowResultExt; +pub use response::StandardErrorResponse; diff --git a/apps/recorder/src/errors/response.rs b/apps/recorder/src/errors/response.rs new file mode 100644 index 0000000..304a8fe --- /dev/null +++ b/apps/recorder/src/errors/response.rs @@ -0,0 +1,19 @@ +use serde::Serialize; + +#[derive(Serialize, Debug, Clone)] +pub struct StandardErrorResponse { + pub success: bool, + pub message: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub result: Option, +} + +impl From for StandardErrorResponse { + fn from(value: String) -> Self { + StandardErrorResponse { + success: false, + message: value, + result: None, + } + } +} diff --git a/apps/recorder/src/extract/bittorrent/core.rs b/apps/recorder/src/extract/bittorrent/core.rs new file mode 100644 index 0000000..eb68e65 --- /dev/null +++ b/apps/recorder/src/extract/bittorrent/core.rs @@ -0,0 +1,2 @@ +pub const BITTORRENT_MIME_TYPE: &str = "application/x-bittorrent"; +pub const MAGNET_SCHEMA: &str = "magnet"; \ No newline at end of file diff --git a/apps/recorder/src/extract/torrent/parser.rs b/apps/recorder/src/extract/bittorrent/extract.rs similarity index 99% rename from apps/recorder/src/extract/torrent/parser.rs rename to apps/recorder/src/extract/bittorrent/extract.rs index 05b225b..74d1346 100644 --- a/apps/recorder/src/extract/torrent/parser.rs +++ b/apps/recorder/src/extract/bittorrent/extract.rs @@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize}; use snafu::{OptionExt, whatever}; use crate::{ - errors::{RError, RResult}, + errors::app_error::{RError, RResult}, extract::defs::SUBTITLE_LANG, }; diff --git a/apps/recorder/src/extract/bittorrent/mod.rs b/apps/recorder/src/extract/bittorrent/mod.rs new file mode 100644 index 0000000..5e3877f --- /dev/null +++ 
b/apps/recorder/src/extract/bittorrent/mod.rs @@ -0,0 +1,6 @@ +pub mod core; +pub mod extract; + +pub use core::{BITTORRENT_MIME_TYPE, MAGNET_SCHEMA}; + +pub use extract::*; diff --git a/apps/recorder/src/extract/mikan/client.rs b/apps/recorder/src/extract/mikan/client.rs index 52d83d1..f219991 100644 --- a/apps/recorder/src/extract/mikan/client.rs +++ b/apps/recorder/src/extract/mikan/client.rs @@ -6,7 +6,7 @@ use url::Url; use super::MikanConfig; use crate::{ - errors::RError, + errors::app_error::RError, fetch::{HttpClient, HttpClientTrait, client::HttpClientCookiesAuth}, }; diff --git a/apps/recorder/src/extract/mikan/rss_extract.rs b/apps/recorder/src/extract/mikan/rss_extract.rs index bc0de72..5b1573e 100644 --- a/apps/recorder/src/extract/mikan/rss_extract.rs +++ b/apps/recorder/src/extract/mikan/rss_extract.rs @@ -8,11 +8,13 @@ use tracing::instrument; use url::Url; use crate::{ - downloader::core::BITTORRENT_MIME_TYPE, - errors::{RError, RResult}, - extract::mikan::{ - MikanClient, - web_extract::{MikanEpisodeHomepage, extract_mikan_episode_id_from_homepage}, + errors::app_error::{RError, RResult}, + extract::{ + bittorrent::BITTORRENT_MIME_TYPE, + mikan::{ + MikanClient, + web_extract::{MikanEpisodeHomepage, extract_mikan_episode_id_from_homepage}, + }, }, fetch::bytes::fetch_bytes, }; @@ -338,11 +340,13 @@ mod tests { use url::Url; use crate::{ - downloader::core::BITTORRENT_MIME_TYPE, - errors::RResult, - extract::mikan::{ - MikanBangumiAggregationRssChannel, MikanBangumiRssChannel, MikanRssChannel, - extract_mikan_rss_channel_from_rss_link, + errors::app_error::RResult, + extract::{ + bittorrent::BITTORRENT_MIME_TYPE, + mikan::{ + MikanBangumiAggregationRssChannel, MikanBangumiRssChannel, MikanRssChannel, + extract_mikan_rss_channel_from_rss_link, + }, }, test_utils::mikan::build_testing_mikan_client, }; diff --git a/apps/recorder/src/extract/mikan/web_extract.rs b/apps/recorder/src/extract/mikan/web_extract.rs index 62b463a..1599cbb 100644 --- 
a/apps/recorder/src/extract/mikan/web_extract.rs +++ b/apps/recorder/src/extract/mikan/web_extract.rs @@ -15,7 +15,7 @@ use super::{ }; use crate::{ app::AppContextTrait, - errors::{RError, RResult}, + errors::app_error::{RError, RResult}, extract::{ html::{extract_background_image_src_from_style_attr, extract_inner_text_from_element_ref}, media::extract_image_src_from_str, diff --git a/apps/recorder/src/extract/mod.rs b/apps/recorder/src/extract/mod.rs index 269a242..4e0edf7 100644 --- a/apps/recorder/src/extract/mod.rs +++ b/apps/recorder/src/extract/mod.rs @@ -4,4 +4,4 @@ pub mod http; pub mod media; pub mod mikan; pub mod rawname; -pub mod torrent; +pub mod bittorrent; diff --git a/apps/recorder/src/extract/rawname/parser.rs b/apps/recorder/src/extract/rawname/parser.rs index 38475af..146fc66 100644 --- a/apps/recorder/src/extract/rawname/parser.rs +++ b/apps/recorder/src/extract/rawname/parser.rs @@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize}; use snafu::whatever; use crate::{ - errors::RResult, + errors::app_error::RResult, extract::defs::{DIGIT_1PLUS_REG, ZH_NUM_MAP, ZH_NUM_RE}, }; diff --git a/apps/recorder/src/extract/torrent/mod.rs b/apps/recorder/src/extract/torrent/mod.rs deleted file mode 100644 index 5feedaf..0000000 --- a/apps/recorder/src/extract/torrent/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod parser; - -pub use parser::*; diff --git a/apps/recorder/src/fetch/bytes.rs b/apps/recorder/src/fetch/bytes.rs index 500b882..99b630d 100644 --- a/apps/recorder/src/fetch/bytes.rs +++ b/apps/recorder/src/fetch/bytes.rs @@ -2,7 +2,7 @@ use bytes::Bytes; use reqwest::IntoUrl; use super::client::HttpClientTrait; -use crate::errors::RError; +use crate::errors::app_error::RError; pub async fn fetch_bytes( client: &H, diff --git a/apps/recorder/src/fetch/client/secrecy.rs b/apps/recorder/src/fetch/client/secrecy.rs index 2df2606..98292d3 100644 --- a/apps/recorder/src/fetch/client/secrecy.rs +++ b/apps/recorder/src/fetch/client/secrecy.rs @@ -4,7 +4,7 @@ 
use cookie::Cookie; use reqwest::{ClientBuilder, cookie::Jar}; use url::Url; -use crate::errors::RError; +use crate::errors::app_error::RError; pub trait HttpClientSecrecyDataTrait { fn attach_secrecy_to_client(&self, client_builder: ClientBuilder) -> ClientBuilder { diff --git a/apps/recorder/src/fetch/html.rs b/apps/recorder/src/fetch/html.rs index fa6bcbb..460bc3a 100644 --- a/apps/recorder/src/fetch/html.rs +++ b/apps/recorder/src/fetch/html.rs @@ -1,7 +1,7 @@ use reqwest::IntoUrl; use super::client::HttpClientTrait; -use crate::errors::RError; +use crate::errors::app_error::RError; pub async fn fetch_html( client: &H, diff --git a/apps/recorder/src/fetch/image.rs b/apps/recorder/src/fetch/image.rs index 69f54f9..c9917e7 100644 --- a/apps/recorder/src/fetch/image.rs +++ b/apps/recorder/src/fetch/image.rs @@ -2,7 +2,7 @@ use bytes::Bytes; use reqwest::IntoUrl; use super::{bytes::fetch_bytes, client::HttpClientTrait}; -use crate::errors::RError; +use crate::errors::app_error::RError; pub async fn fetch_image( client: &H, diff --git a/apps/recorder/src/graphql/service.rs b/apps/recorder/src/graphql/service.rs index f658366..76ef3a2 100644 --- a/apps/recorder/src/graphql/service.rs +++ b/apps/recorder/src/graphql/service.rs @@ -2,7 +2,7 @@ use async_graphql::dynamic::Schema; use sea_orm::DatabaseConnection; use super::{config::GraphQLConfig, schema_root}; -use crate::errors::RResult; +use crate::errors::app_error::RResult; #[derive(Debug)] pub struct GraphQLService { diff --git a/apps/recorder/src/lib.rs b/apps/recorder/src/lib.rs index cf7bd39..1edcde9 100644 --- a/apps/recorder/src/lib.rs +++ b/apps/recorder/src/lib.rs @@ -8,6 +8,7 @@ let_chains, error_generic_member_access )] +#![feature(associated_type_defaults)] pub mod app; pub mod auth; diff --git a/apps/recorder/src/logger/service.rs b/apps/recorder/src/logger/service.rs index 50a6ea6..1c0f7c9 100644 --- a/apps/recorder/src/logger/service.rs +++ b/apps/recorder/src/logger/service.rs @@ -10,7 +10,7 @@ use 
tracing_subscriber::{ }; use super::{LogFormat, LogLevel, LogRotation, LoggerConfig}; -use crate::errors::RResult; +use crate::errors::app_error::RResult; // Function to initialize the logger based on the provided configuration const MODULE_WHITELIST: &[&str] = &["sea_orm_migration", "tower_http", "sqlx::query", "sidekiq"]; diff --git a/apps/recorder/src/models/auth.rs b/apps/recorder/src/models/auth.rs index 3952160..b5fc6d2 100644 --- a/apps/recorder/src/models/auth.rs +++ b/apps/recorder/src/models/auth.rs @@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize}; use super::subscribers::{self, SEED_SUBSCRIBER}; use crate::{ app::AppContextTrait, - errors::{RError, RResult}, + errors::app_error::{RError, RResult}, }; #[derive( diff --git a/apps/recorder/src/models/bangumi.rs b/apps/recorder/src/models/bangumi.rs index cf910c4..c7643a8 100644 --- a/apps/recorder/src/models/bangumi.rs +++ b/apps/recorder/src/models/bangumi.rs @@ -4,7 +4,7 @@ use sea_orm::{ActiveValue, FromJsonQueryResult, entity::prelude::*, sea_query::O use serde::{Deserialize, Serialize}; use super::subscription_bangumi; -use crate::{app::AppContextTrait, errors::RResult}; +use crate::{app::AppContextTrait, errors::app_error::RResult}; #[derive( Clone, Debug, PartialEq, Eq, Serialize, Deserialize, FromJsonQueryResult, SimpleObject, diff --git a/apps/recorder/src/models/episodes.rs b/apps/recorder/src/models/episodes.rs index cb96ddc..756093a 100644 --- a/apps/recorder/src/models/episodes.rs +++ b/apps/recorder/src/models/episodes.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; use super::{bangumi, query::InsertManyReturningExt, subscription_episode}; use crate::{ app::AppContextTrait, - errors::RResult, + errors::app_error::RResult, extract::{ mikan::{MikanEpisodeMeta, build_mikan_episode_homepage}, rawname::parse_episode_meta_from_raw_name, diff --git a/apps/recorder/src/models/subscribers.rs b/apps/recorder/src/models/subscribers.rs index 658f5cc..9a5a058 100644 --- 
a/apps/recorder/src/models/subscribers.rs +++ b/apps/recorder/src/models/subscribers.rs @@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize}; use crate::{ app::AppContextTrait, - errors::{RError, RResult}, + errors::app_error::{RError, RResult}, }; pub const SEED_SUBSCRIBER: &str = "konobangu"; diff --git a/apps/recorder/src/models/subscriptions.rs b/apps/recorder/src/models/subscriptions.rs index 7033140..e3878d2 100644 --- a/apps/recorder/src/models/subscriptions.rs +++ b/apps/recorder/src/models/subscriptions.rs @@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize}; use super::{bangumi, episodes, query::filter_values_in}; use crate::{ app::AppContextTrait, - errors::RResult, + errors::app_error::RResult, extract::{ mikan::{ build_mikan_bangumi_homepage, build_mikan_bangumi_rss_link, diff --git a/apps/recorder/src/models/tasks.rs b/apps/recorder/src/models/tasks.rs index bd62d45..ed6c0c4 100644 --- a/apps/recorder/src/models/tasks.rs +++ b/apps/recorder/src/models/tasks.rs @@ -2,7 +2,7 @@ use async_trait::async_trait; use sea_orm::{QuerySelect, entity::prelude::*}; use serde::{Deserialize, Serialize}; -use crate::{app::AppContextTrait, errors::RResult}; +use crate::{app::AppContextTrait, errors::app_error::RResult}; #[derive( Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, DeriveDisplay, Serialize, Deserialize, diff --git a/apps/recorder/src/storage/client.rs b/apps/recorder/src/storage/client.rs index 22a017f..5131d97 100644 --- a/apps/recorder/src/storage/client.rs +++ b/apps/recorder/src/storage/client.rs @@ -8,7 +8,7 @@ use url::Url; use uuid::Uuid; use super::StorageConfig; -use crate::errors::{RError, RResult}; +use crate::errors::app_error::{RError, RResult}; #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] diff --git a/apps/recorder/src/tasks/core.rs b/apps/recorder/src/tasks/core.rs index 0c25694..913ea59 100644 --- a/apps/recorder/src/tasks/core.rs +++ b/apps/recorder/src/tasks/core.rs @@ -7,7 +7,7 
@@ use tokio::sync::{RwLock, mpsc}; use crate::{ app::AppContextTrait, - errors::{RError, RResult}, + errors::app_error::{RError, RResult}, models, }; diff --git a/apps/recorder/src/tasks/mikan/extract_mikan_bangumis_meta_from_my_bangumi.rs b/apps/recorder/src/tasks/mikan/extract_mikan_bangumis_meta_from_my_bangumi.rs index dd37b9d..7d712dd 100644 --- a/apps/recorder/src/tasks/mikan/extract_mikan_bangumis_meta_from_my_bangumi.rs +++ b/apps/recorder/src/tasks/mikan/extract_mikan_bangumis_meta_from_my_bangumi.rs @@ -6,7 +6,7 @@ use url::Url; use crate::{ app::AppContextTrait, - errors::RResult, + errors::app_error::RResult, extract::mikan::{MikanAuthSecrecy, MikanBangumiMeta, web_extract}, tasks::core::{StandardStreamTaskReplayLayout, StreamTaskRunnerTrait}, }; diff --git a/apps/recorder/src/test_utils/fetch.rs b/apps/recorder/src/test_utils/fetch.rs index 0e57f3c..6ea0908 100644 --- a/apps/recorder/src/test_utils/fetch.rs +++ b/apps/recorder/src/test_utils/fetch.rs @@ -1,4 +1,4 @@ -use crate::{errors::RResult, fetch::HttpClient}; +use crate::{errors::app_error::RResult, fetch::HttpClient}; pub fn build_testing_http_client() -> RResult { let mikan_client = HttpClient::default(); diff --git a/apps/recorder/src/test_utils/mikan.rs b/apps/recorder/src/test_utils/mikan.rs index 09803ba..7ff8d9f 100644 --- a/apps/recorder/src/test_utils/mikan.rs +++ b/apps/recorder/src/test_utils/mikan.rs @@ -1,7 +1,7 @@ use reqwest::IntoUrl; use crate::{ - errors::RResult, + errors::app_error::RResult, extract::mikan::{MikanClient, MikanConfig}, fetch::HttpClientConfig, }; diff --git a/apps/recorder/src/web/controller/graphql/mod.rs b/apps/recorder/src/web/controller/graphql/mod.rs index 8fdd5be..8adcb8d 100644 --- a/apps/recorder/src/web/controller/graphql/mod.rs +++ b/apps/recorder/src/web/controller/graphql/mod.rs @@ -7,7 +7,7 @@ use super::core::Controller; use crate::{ app::AppContextTrait, auth::{AuthUserInfo, header_www_authenticate_middleware}, - errors::RResult, + 
errors::app_error::RResult, }; pub const CONTROLLER_PREFIX: &str = "/api/graphql"; diff --git a/apps/recorder/src/web/controller/metadata/mod.rs b/apps/recorder/src/web/controller/metadata/mod.rs index 6c95dd7..0486a9a 100644 --- a/apps/recorder/src/web/controller/metadata/mod.rs +++ b/apps/recorder/src/web/controller/metadata/mod.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use axum::{Json, Router, extract::State, routing::get}; use serde::Serialize; -use crate::{app::AppContextTrait, errors::RResult, web::controller::Controller}; +use crate::{app::AppContextTrait, errors::app_error::RResult, web::controller::Controller}; pub const CONTROLLER_PREFIX: &str = "/api/metadata"; diff --git a/apps/recorder/src/web/controller/oidc/mod.rs b/apps/recorder/src/web/controller/oidc/mod.rs index b8d4b35..b0497e0 100644 --- a/apps/recorder/src/web/controller/oidc/mod.rs +++ b/apps/recorder/src/web/controller/oidc/mod.rs @@ -16,7 +16,7 @@ use crate::{ errors::OidcRequestRedirectUriSnafu, oidc::{OidcAuthCallbackPayload, OidcAuthCallbackQuery, OidcAuthRequest}, }, - errors::RResult, + errors::app_error::RResult, extract::http::ForwardedRelatedInfo, models::auth::AuthType, }; diff --git a/apps/recorder/src/web/middleware/catch_panic.rs b/apps/recorder/src/web/middleware/catch_panic.rs index 21108f7..83f649a 100644 --- a/apps/recorder/src/web/middleware/catch_panic.rs +++ b/apps/recorder/src/web/middleware/catch_panic.rs @@ -12,7 +12,7 @@ use http::StatusCode; use serde::{Deserialize, Serialize}; use tower_http::catch_panic::CatchPanicLayer; -use crate::{app::AppContextTrait, errors::RResult, web::middleware::MiddlewareLayer}; +use crate::{app::AppContextTrait, errors::app_error::RResult, web::middleware::MiddlewareLayer}; #[derive(Debug, Clone, Deserialize, Serialize)] pub struct CatchPanic { diff --git a/apps/recorder/src/web/middleware/compression.rs b/apps/recorder/src/web/middleware/compression.rs index a9d539f..e9daf29 100644 --- a/apps/recorder/src/web/middleware/compression.rs +++ 
b/apps/recorder/src/web/middleware/compression.rs @@ -11,7 +11,7 @@ use axum::Router; use serde::{Deserialize, Serialize}; use tower_http::compression::CompressionLayer; -use crate::{app::AppContextTrait, errors::RResult, web::middleware::MiddlewareLayer}; +use crate::{app::AppContextTrait, errors::app_error::RResult, web::middleware::MiddlewareLayer}; #[derive(Debug, Clone, Deserialize, Serialize)] pub struct Compression { diff --git a/apps/recorder/src/web/middleware/cors.rs b/apps/recorder/src/web/middleware/cors.rs index 565b0f9..6a9aa97 100644 --- a/apps/recorder/src/web/middleware/cors.rs +++ b/apps/recorder/src/web/middleware/cors.rs @@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize}; use serde_json::json; use tower_http::cors::{self, Any}; -use crate::{app::AppContextTrait, web::middleware::MiddlewareLayer, errors::RResult}; +use crate::{app::AppContextTrait, errors::app_error::RResult, web::middleware::MiddlewareLayer}; /// CORS middleware configuration #[derive(Debug, Clone, Deserialize, Serialize)] @@ -157,7 +157,10 @@ impl MiddlewareLayer for Cors { } /// Applies the CORS middleware layer to the Axum router. 
- fn apply(&self, app: Router>) -> RResult>> { + fn apply( + &self, + app: Router>, + ) -> RResult>> { Ok(app.layer(self.cors()?)) } } diff --git a/apps/recorder/src/web/middleware/etag.rs b/apps/recorder/src/web/middleware/etag.rs index f61ace3..7360be1 100644 --- a/apps/recorder/src/web/middleware/etag.rs +++ b/apps/recorder/src/web/middleware/etag.rs @@ -25,7 +25,7 @@ use futures_util::future::BoxFuture; use serde::{Deserialize, Serialize}; use tower::{Layer, Service}; -use crate::{app::AppContextTrait, errors::RResult, web::middleware::MiddlewareLayer}; +use crate::{app::AppContextTrait, errors::app_error::RResult, web::middleware::MiddlewareLayer}; #[derive(Debug, Clone, Deserialize, Serialize)] pub struct Etag { diff --git a/apps/recorder/src/web/middleware/format.rs b/apps/recorder/src/web/middleware/format.rs index 0a7c5a0..02c0434 100644 --- a/apps/recorder/src/web/middleware/format.rs +++ b/apps/recorder/src/web/middleware/format.rs @@ -8,7 +8,7 @@ use axum::{ }; use serde::{Deserialize, Serialize}; -use crate::errors::RError as Error; +use crate::errors::app_error::RError as Error; #[derive(Debug, Deserialize, Serialize)] pub struct Format(pub RespondTo); diff --git a/apps/recorder/src/web/middleware/logger.rs b/apps/recorder/src/web/middleware/logger.rs index d900e20..e1f54eb 100644 --- a/apps/recorder/src/web/middleware/logger.rs +++ b/apps/recorder/src/web/middleware/logger.rs @@ -15,7 +15,7 @@ use tower_http::{add_extension::AddExtensionLayer, trace::TraceLayer}; use crate::{ app::{AppContextTrait, Environment}, - errors::RResult, + errors::app_error::RResult, web::middleware::{MiddlewareLayer, request_id::LocoRequestId}, }; diff --git a/apps/recorder/src/web/middleware/mod.rs b/apps/recorder/src/web/middleware/mod.rs index 165aad9..bd7418b 100644 --- a/apps/recorder/src/web/middleware/mod.rs +++ b/apps/recorder/src/web/middleware/mod.rs @@ -14,7 +14,7 @@ use std::sync::Arc; use axum::Router; use serde::{Deserialize, Serialize}; -use 
crate::{app::AppContextTrait, errors::RResult}; +use crate::{app::AppContextTrait, errors::app_error::RResult}; /// Trait representing the behavior of middleware components in the application. /// When implementing a new middleware, make sure to go over this checklist: diff --git a/apps/recorder/src/web/middleware/remote_ip.rs b/apps/recorder/src/web/middleware/remote_ip.rs index d7f474f..45af9d4 100644 --- a/apps/recorder/src/web/middleware/remote_ip.rs +++ b/apps/recorder/src/web/middleware/remote_ip.rs @@ -33,7 +33,7 @@ use tracing::error; use crate::{ app::AppContextTrait, - errors::{RError, RResult}, + errors::app_error::{RError, RResult}, web::middleware::MiddlewareLayer, }; diff --git a/apps/recorder/src/web/middleware/request_id.rs b/apps/recorder/src/web/middleware/request_id.rs index 240af4e..d2299f8 100644 --- a/apps/recorder/src/web/middleware/request_id.rs +++ b/apps/recorder/src/web/middleware/request_id.rs @@ -11,13 +11,15 @@ use regex::Regex; use serde::{Deserialize, Serialize}; use uuid::Uuid; -use crate::{app::AppContextTrait, errors::RResult, web::middleware::MiddlewareLayer}; +use crate::{app::AppContextTrait, web::middleware::MiddlewareLayer}; const X_REQUEST_ID: &str = "x-request-id"; const MAX_LEN: usize = 255; use std::sync::{Arc, OnceLock}; +use crate::errors::app_error::RResult; + static ID_CLEANUP: OnceLock = OnceLock::new(); fn get_id_cleanup() -> &'static Regex { diff --git a/apps/recorder/src/web/middleware/secure_headers.rs b/apps/recorder/src/web/middleware/secure_headers.rs index 8139c5f..b405b7e 100644 --- a/apps/recorder/src/web/middleware/secure_headers.rs +++ b/apps/recorder/src/web/middleware/secure_headers.rs @@ -21,7 +21,7 @@ use serde_json::{self, json}; use snafu::whatever; use tower::{Layer, Service}; -use crate::{app::AppContextTrait, errors::RResult, web::middleware::MiddlewareLayer}; +use crate::{app::AppContextTrait, errors::app_error::RResult, web::middleware::MiddlewareLayer}; static PRESETS: OnceLock>> = 
OnceLock::new(); fn get_presets() -> &'static HashMap> { diff --git a/apps/recorder/src/web/middleware/timeout.rs b/apps/recorder/src/web/middleware/timeout.rs index 69b9f36..d0f8285 100644 --- a/apps/recorder/src/web/middleware/timeout.rs +++ b/apps/recorder/src/web/middleware/timeout.rs @@ -16,7 +16,7 @@ use serde::{Deserialize, Serialize}; use serde_json::json; use tower_http::timeout::TimeoutLayer; -use crate::{app::AppContextTrait, errors::RResult, web::middleware::MiddlewareLayer}; +use crate::{app::AppContextTrait, errors::app_error::RResult, web::middleware::MiddlewareLayer}; /// Timeout middleware configuration #[derive(Debug, Clone, Deserialize, Serialize)] diff --git a/apps/recorder/temp/clippy/rustrover/.rustc_info.json b/apps/recorder/temp/clippy/rustrover/.rustc_info.json deleted file mode 100644 index 8d7e647..0000000 --- a/apps/recorder/temp/clippy/rustrover/.rustc_info.json +++ /dev/null @@ -1 +0,0 @@ -{"rustc_fingerprint":12631718921104437280,"outputs":{"9566862992471862046":{"success":true,"status":"","code":0,"stdout":"___.exe\nlib___.rlib\n___.dll\n___.dll\n___.lib\n___.dll\nC:\\code\\scoop\\persist\\rustup\\.rustup\\toolchains\\nightly-x86_64-pc-windows-msvc\npacked\n___\ndebug_assertions\nfmt_debug=\"full\"\noverflow_checks\npanic=\"unwind\"\nproc_macro\nrelocation_model=\"pic\"\ntarget_abi=\"\"\ntarget_arch=\"x86_64\"\ntarget_endian=\"little\"\ntarget_env=\"msvc\"\ntarget_family=\"windows\"\ntarget_feature=\"cmpxchg16b\"\ntarget_feature=\"fxsr\"\ntarget_feature=\"lahfsahf\"\ntarget_feature=\"sse\"\ntarget_feature=\"sse2\"\ntarget_feature=\"sse3\"\ntarget_feature=\"x87\"\ntarget_has_atomic\ntarget_has_atomic=\"128\"\ntarget_has_atomic=\"16\"\ntarget_has_atomic=\"32\"\ntarget_has_atomic=\"64\"\ntarget_has_atomic=\"8\"\ntarget_has_atomic=\"ptr\"\ntarget_has_atomic_equal_alignment=\"128\"\ntarget_has_atomic_equal_alignment=\"16\"\ntarget_has_atomic_equal_alignment=\"32\"\ntarget_has_atomic_equal_alignment=\"64\"\ntarget_has_atomic_equal_alignment
=\"8\"\ntarget_has_atomic_equal_alignment=\"ptr\"\ntarget_has_atomic_load_store\ntarget_has_atomic_load_store=\"128\"\ntarget_has_atomic_load_store=\"16\"\ntarget_has_atomic_load_store=\"32\"\ntarget_has_atomic_load_store=\"64\"\ntarget_has_atomic_load_store=\"8\"\ntarget_has_atomic_load_store=\"ptr\"\ntarget_os=\"windows\"\ntarget_pointer_width=\"64\"\ntarget_thread_local\ntarget_vendor=\"pc\"\nub_checks\nwindows\n","stderr":""},"5537925964935398022":{"success":true,"status":"","code":0,"stdout":"rustc 1.86.0-nightly (43ca9d18e 2025-02-08)\nbinary: rustc\ncommit-hash: 43ca9d18e333797f0aa3b525501a7cec8d61a96b\ncommit-date: 2025-02-08\nhost: x86_64-pc-windows-msvc\nrelease: 1.86.0-nightly\nLLVM version: 19.1.7\n","stderr":""}},"successes":{}} \ No newline at end of file diff --git a/apps/recorder/temp/clippy/rustrover/CACHEDIR.TAG b/apps/recorder/temp/clippy/rustrover/CACHEDIR.TAG deleted file mode 100644 index 20d7c31..0000000 --- a/apps/recorder/temp/clippy/rustrover/CACHEDIR.TAG +++ /dev/null @@ -1,3 +0,0 @@ -Signature: 8a477f597d28d172789f06886806bc55 -# This file is a cache directory tag created by cargo. 
-# For information about cache directory tags see https://bford.info/cachedir/ diff --git a/.devcontainer.bk/Dockerfile b/archive/.devcontainer/Dockerfile similarity index 100% rename from .devcontainer.bk/Dockerfile rename to archive/.devcontainer/Dockerfile diff --git a/.devcontainer.bk/devcontainer.json b/archive/.devcontainer/devcontainer.json similarity index 100% rename from .devcontainer.bk/devcontainer.json rename to archive/.devcontainer/devcontainer.json diff --git a/.devcontainer.bk/docker-compose.yml b/archive/.devcontainer/docker-compose.yml similarity index 100% rename from .devcontainer.bk/docker-compose.yml rename to archive/.devcontainer/docker-compose.yml diff --git a/packages/testing-torrents/main.ts b/packages/testing-torrents/main.ts index b5e0784..a96404f 100644 --- a/packages/testing-torrents/main.ts +++ b/packages/testing-torrents/main.ts @@ -84,7 +84,7 @@ async function generateMockFile(filePath: string, size: number) { await fsp.truncate(filePath, size); } -// Generate torrent file +// Generate bittorrent file function generateTorrent(folderPath: string, torrentPath: string) { return new Promise((resolve, reject) => { createTorrent( @@ -113,7 +113,7 @@ }); } -// Add torrent and seed +// Add bittorrent and seed async function seedTorrent(torrentPath: string): Promise { return new Promise((resolve) => { const torrent = webTorrent.seed( diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index ffee463..6f78665 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -5515,7 +5515,7 @@ packages: engines: {node: '>=6'} terminal-link@2.1.1: - resolution: {integrity: sha512-un0FmiRUQNr5PJqy9kP7c40F5BOfpGlYTrxonDChEZB7pzZxRNp/bt+ymiy9/npwXya9KH99nJ/GXFIiUkYGFQ==} + resolution: {integrity: sha512-un0FmiRUQNr5PJqy9kP7c40F5BOfpGlYTrxonDChEZB7pzZxRNp/bt+ymiy9/npwXya9KH99nJ/GXFIiUkYGFQ==} engines: {node: '>=8'} terser-webpack-plugin@5.3.13: diff --git a/rustfmt.toml b/rustfmt.toml index
fe826bc..7ee641f 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -6,3 +6,4 @@ use_small_heuristics = "Default" group_imports = "StdExternalCrate" format_strings = true tab_spaces = 4 +reorder_imports = true