refactor: split modules

This commit is contained in:
2025-04-08 02:12:06 +08:00
parent 376d2b28d3
commit 2686fa1d76
94 changed files with 1125 additions and 580 deletions

View File

@@ -0,0 +1,49 @@
# Manifest for the `downloader` crate: downloader/bittorrent abstractions with
# qBittorrent (qbit-rs) and rqbit backends.
[package]
name = "downloader"
version = "0.1.0"
edition = "2024"
[features]
default = []
# Container-backed integration tests; pulls in the testcontainers stack and the
# in-repo torrent fixture generator.
testcontainers = [
    "dep:testcontainers",
    "dep:testcontainers-modules",
    "dep:testcontainers-ext",
    "dep:testing-torrents",
]
[dependencies]
futures = { workspace = true }
testcontainers = { workspace = true, optional = true }
testcontainers-modules = { workspace = true, optional = true }
testcontainers-ext = { workspace = true, optional = true }
tokio = { workspace = true }
serde_json = { workspace = true }
async-trait = { workspace = true }
tracing = { workspace = true }
snafu = { workspace = true }
url = { workspace = true }
serde = { workspace = true }
anyhow = { workspace = true }
quirks_path = { workspace = true }
itertools = { workspace = true }
chrono = { workspace = true }
bytes = { workspace = true }
serde-value = "0.7"
# Pinned to a fork revision; "default" + "builder" enable the typed builder API.
qbit-rs = { git = "https://github.com/lonelyhentxi/qbit.git", rev = "72d53138ebe", features = [
    "default",
    "builder",
] }
merge-struct = "0.1"
librqbit-core = "4"
librqbit = { version = "8", features = ["async-bt", "watch"] }
util = { workspace = true }
testing-torrents = { workspace = true, optional = true }
fetch = { workspace = true }
[dev-dependencies]
reqwest = { workspace = true }
tracing-subscriber = { workspace = true }

View File

@@ -0,0 +1,2 @@
/// MIME type used for `.torrent` file payloads.
pub const BITTORRENT_MIME_TYPE: &str = "application/x-bittorrent";
/// URL scheme identifying magnet links (`magnet:?xt=...`).
pub const MAGNET_SCHEMA: &str = "magnet";

View File

@@ -0,0 +1,74 @@
use async_trait::async_trait;
use crate::{
DownloaderError,
bittorrent::task::{
TorrentCreationTrait, TorrentHashTrait, TorrentStateTrait, TorrentTaskTrait,
},
core::{DownloadIdSelectorTrait, DownloadSelectorTrait, DownloadTaskTrait, DownloaderTrait},
};
/// Torrent-specific downloader extension: adapts the generic
/// [`DownloaderTrait`] verbs onto hash-addressed torrent operations.
///
/// Implementors only provide the three `*_torrents` hooks; the selector-based
/// entry points are derived from them via [`query_torrent_hashes`].
#[async_trait]
pub trait TorrentDownloaderTrait: DownloaderTrait
where
    Self::State: TorrentStateTrait,
    Self::Id: TorrentHashTrait,
    Self::Task: TorrentTaskTrait<State = Self::State, Id = Self::Id>,
    Self::Creation: TorrentCreationTrait<Task = Self::Task>,
    Self::Selector: DownloadSelectorTrait<Task = Self::Task, Id = Self::Id>,
{
    /// Id-only selector type used to address torrents purely by hash.
    type IdSelector: DownloadIdSelectorTrait<Task = Self::Task, Id = Self::Id>;

    /// Pause every torrent matched by `selector`; returns the affected hashes.
    async fn pause_downloads(
        &self,
        selector: <Self as DownloaderTrait>::Selector,
    ) -> Result<Self::IdSelector, DownloaderError> {
        let hashes = <Self as TorrentDownloaderTrait>::query_torrent_hashes(self, selector).await?;
        self.pause_torrents(hashes).await
    }

    /// Resume every torrent matched by `selector`; returns the affected hashes.
    async fn resume_downloads(
        &self,
        selector: <Self as DownloaderTrait>::Selector,
    ) -> Result<Self::IdSelector, DownloaderError> {
        let hashes = <Self as TorrentDownloaderTrait>::query_torrent_hashes(self, selector).await?;
        self.resume_torrents(hashes).await
    }

    /// Remove every torrent matched by `selector`; returns the affected hashes.
    async fn remove_downloads(
        &self,
        selector: <Self as DownloaderTrait>::Selector,
    ) -> Result<Self::IdSelector, DownloaderError> {
        let hashes = <Self as TorrentDownloaderTrait>::query_torrent_hashes(self, selector).await?;
        self.remove_torrents(hashes).await
    }

    /// Resolve a selector to plain hashes. Id-based selectors are converted
    /// directly; otherwise the backend is queried and task ids are collected.
    async fn query_torrent_hashes(
        &self,
        selector: <Self as DownloaderTrait>::Selector,
    ) -> Result<Self::IdSelector, DownloaderError> {
        let hashes = match selector.try_into_ids_only() {
            Ok(hashes) => Self::IdSelector::from_iter(hashes),
            Err(selector) => {
                let tasks = self.query_downloads(selector).await?;
                Self::IdSelector::from_iter(tasks.into_iter().map(|s| s.into_id()))
            }
        };
        Ok(hashes)
    }

    /// Backend hook: pause the given hashes.
    async fn pause_torrents(
        &self,
        hashes: Self::IdSelector,
    ) -> Result<Self::IdSelector, DownloaderError>;

    /// Backend hook: resume the given hashes.
    async fn resume_torrents(
        &self,
        hashes: Self::IdSelector,
    ) -> Result<Self::IdSelector, DownloaderError>;

    /// Backend hook: remove the given hashes.
    async fn remove_torrents(
        &self,
        hashes: Self::IdSelector,
    ) -> Result<Self::IdSelector, DownloaderError>;
}

View File

@@ -0,0 +1,6 @@
/// Shared bittorrent constants (MIME type, magnet schema).
pub mod defs;
/// Torrent-specific downloader trait layered on the generic core.
pub mod downloader;
/// Torrent source types (magnet links, torrent URLs, torrent files).
pub mod source;
/// Torrent task/creation/hash trait definitions.
pub mod task;
pub use defs::{BITTORRENT_MIME_TYPE, MAGNET_SCHEMA};

View File

@@ -0,0 +1,224 @@
use std::{
borrow::Cow,
fmt::{Debug, Formatter},
};
use bytes::Bytes;
use fetch::{bytes::fetch_bytes, client::core::HttpClientTrait};
use librqbit_core::{magnet::Magnet, torrent_metainfo, torrent_metainfo::TorrentMetaV1Owned};
use snafu::ResultExt;
use url::Url;
use util::errors::AnyhowResultExt;
use super::defs::MAGNET_SCHEMA;
use crate::errors::{DownloadFetchSnafu, DownloaderError, MagnetFormatSnafu, TorrentMetaSnafu};
/// A torrent source whose info-hash can be derived locally, without I/O.
pub trait HashTorrentSourceTrait: Sized {
    /// Info-hash string identifying the torrent.
    fn hash_info(&self) -> Cow<'_, str>;
}
/// A torrent addressed by a magnet link.
pub struct MagnetUrlSource {
    // Parsed form; kept alongside the raw url because `Magnet` is not `Clone`.
    pub magnet: Magnet,
    // Original magnet URL string, used for equality and re-parsing on clone.
    pub url: String,
}
impl MagnetUrlSource {
    /// Parse `url` as a magnet link.
    ///
    /// # Errors
    /// Returns [`DownloaderError::MagnetFormatError`] (with the offending url
    /// as the message) when the string is not a valid magnet link.
    pub fn from_url(url: String) -> Result<Self, DownloaderError> {
        let magnet = Magnet::parse(&url)
            .to_dyn_boxed()
            .context(MagnetFormatSnafu {
                message: url.clone(),
            })?;
        Ok(Self { magnet, url })
    }
}
impl HashTorrentSourceTrait for MagnetUrlSource {
    fn hash_info(&self) -> Cow<'_, str> {
        // Prefer the v2 (id32) hash, fall back to v1 (id20); a parsed magnet
        // is guaranteed to carry at least one of them.
        let hash_info = self
            .magnet
            .as_id32()
            .map(|s| s.as_string())
            .or_else(|| self.magnet.as_id20().map(|s| s.as_string()))
            .unwrap_or_else(|| unreachable!("hash of magnet must existed"));
        hash_info.into()
    }
}
impl Debug for MagnetUrlSource {
    // Only the url is printed; the parsed `Magnet` is derived from it anyway.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MagnetUrlSource")
            .field("url", &self.url)
            .finish()
    }
}
impl Clone for MagnetUrlSource {
    /// `Magnet` does not implement `Clone`, so the stored URL is re-parsed.
    fn clone(&self) -> Self {
        Self {
            // `self.url` was validated by `Magnet::parse` in `from_url`, so
            // re-parsing cannot fail; state the invariant instead of a bare unwrap.
            magnet: Magnet::parse(&self.url)
                .expect("url was validated as a magnet link at construction"),
            url: self.url.clone(),
        }
    }
}
impl PartialEq for MagnetUrlSource {
    // Equality is defined on the raw url; the parsed magnet is derived state.
    fn eq(&self, other: &Self) -> bool {
        self.url == other.url
    }
}
impl Eq for MagnetUrlSource {}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TorrentUrlSource {
pub url: String,
}
impl TorrentUrlSource {
pub fn from_url(url: String) -> Result<Self, DownloaderError> {
Ok(Self { url })
}
}
/// A torrent provided as raw `.torrent` file bytes, with parsed metainfo.
#[derive(Clone)]
pub struct TorrentFileSource {
    // Original download url, when the bytes were fetched over HTTP.
    pub url: Option<String>,
    // Raw `.torrent` file contents.
    pub payload: Bytes,
    // Parsed metainfo; boxed because the owned metainfo struct is large.
    pub meta: Box<TorrentMetaV1Owned>,
    // File name used when handing the payload to a downloader backend.
    pub filename: String,
}
impl TorrentFileSource {
    /// Build a source from raw `.torrent` bytes, parsing the metainfo.
    ///
    /// # Errors
    /// Returns [`DownloaderError::TorrentMetaError`] when the bytes are not a
    /// valid torrent file; the message records the filename and url.
    pub fn from_bytes(
        filename: String,
        bytes: Bytes,
        url: Option<String>,
    ) -> Result<Self, DownloaderError> {
        let meta = torrent_metainfo::torrent_from_bytes(bytes.as_ref())
            .to_dyn_boxed()
            .with_context(|_| TorrentMetaSnafu {
                message: format!(
                    "filename = {}, url = {}",
                    filename,
                    url.as_deref().unwrap_or_default()
                ),
            })?
            .to_owned();
        Ok(TorrentFileSource {
            url,
            payload: bytes,
            meta: Box::new(meta),
            filename,
        })
    }

    /// Download a `.torrent` file over HTTP and build a source from it.
    ///
    /// The filename is taken from the last path segment of `url`.
    ///
    /// # Errors
    /// Returns [`DownloaderError::DownloadFetchError`] when the fetch fails or
    /// the url has no usable path segment, and propagates `from_bytes` errors.
    pub async fn from_url_and_http_client(
        client: &impl HttpClientTrait,
        url: String,
    ) -> Result<TorrentFileSource, DownloaderError> {
        let payload = fetch_bytes(client, &url)
            .await
            .boxed()
            .with_context(|_| DownloadFetchSnafu { url: url.clone() })?;
        let filename = Url::parse(&url)
            .boxed()
            .and_then(|s| {
                s.path_segments()
                    .and_then(|mut p| p.next_back())
                    .map(String::from)
                    .ok_or_else(|| anyhow::anyhow!("invalid url"))
                    .to_dyn_boxed()
            })
            .with_context(|_| DownloadFetchSnafu { url: url.clone() })?;
        Self::from_bytes(filename, payload, Some(url))
    }
}
impl HashTorrentSourceTrait for TorrentFileSource {
    // The info-hash comes straight from the parsed metainfo.
    fn hash_info(&self) -> Cow<'_, str> {
        self.meta.info_hash.as_string().into()
    }
}
impl Debug for TorrentFileSource {
    // Print only the hash; the payload bytes would be noise.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TorrentFileSource")
            .field("hash", &self.meta.info_hash.as_string())
            .finish()
    }
}
/// A torrent source still in URL form: either a magnet link or a link to a
/// `.torrent` file (no network access performed yet).
#[derive(Clone, Debug)]
pub enum UrlTorrentSource {
    MagnetUrl(MagnetUrlSource),
    TorrentUrl(TorrentUrlSource),
}

impl UrlTorrentSource {
    /// Classify `url` by scheme: `magnet:` becomes a magnet source, anything
    /// else a torrent-file url source.
    ///
    /// # Errors
    /// Fails when `url` is not a valid URL, or is a malformed magnet link.
    pub fn from_url(url: String) -> Result<Self, DownloaderError> {
        let parsed = Url::parse(&url)?;
        if parsed.scheme() == MAGNET_SCHEMA {
            Self::from_magnet_url(url)
        } else {
            Self::from_torrent_url(url)
        }
    }

    /// Wrap a magnet link, validating its format.
    pub fn from_magnet_url(url: String) -> Result<Self, DownloaderError> {
        MagnetUrlSource::from_url(url).map(Self::MagnetUrl)
    }

    /// Wrap a `.torrent` file url (no validation beyond construction).
    pub fn from_torrent_url(url: String) -> Result<Self, DownloaderError> {
        TorrentUrlSource::from_url(url).map(Self::TorrentUrl)
    }
}
/// A torrent source whose info-hash is already known locally: a parsed magnet
/// link or a fully fetched torrent file.
#[derive(Debug, Clone)]
pub enum HashTorrentSource {
    MagnetUrl(MagnetUrlSource),
    TorrentFile(TorrentFileSource),
}
impl HashTorrentSource {
    /// Resolve `url` into a hash-bearing source, fetching the `.torrent` file
    /// over HTTP when the scheme is not `magnet`.
    pub async fn from_url_and_http_client(
        client: &impl HttpClientTrait,
        url: String,
    ) -> Result<Self, DownloaderError> {
        let url_ = Url::parse(&url)?;
        let source = if url_.scheme() == MAGNET_SCHEMA {
            Self::from_magnet_url(url)?
        } else {
            Self::from_torrent_url_and_http_client(client, url).await?
        };
        Ok(source)
    }
    /// Wrap a magnet link, validating its format.
    pub fn from_magnet_url(url: String) -> Result<Self, DownloaderError> {
        let magnet_source = MagnetUrlSource::from_url(url)?;
        Ok(Self::MagnetUrl(magnet_source))
    }
    /// Fetch a `.torrent` file over HTTP and wrap it.
    pub async fn from_torrent_url_and_http_client(
        client: &impl HttpClientTrait,
        url: String,
    ) -> Result<Self, DownloaderError> {
        let torrent_source = TorrentFileSource::from_url_and_http_client(client, url).await?;
        Ok(Self::TorrentFile(torrent_source))
    }
}
impl HashTorrentSourceTrait for HashTorrentSource {
    // Delegate to whichever variant is held.
    fn hash_info(&self) -> Cow<'_, str> {
        match self {
            HashTorrentSource::MagnetUrl(m) => m.hash_info(),
            HashTorrentSource::TorrentFile(t) => t.hash_info(),
        }
    }
}

View File

@@ -0,0 +1,43 @@
use std::{borrow::Cow, hash::Hash};
use quirks_path::{Path, PathBuf};
use crate::{
bittorrent::source::HashTorrentSource,
core::{DownloadCreationTrait, DownloadIdTrait, DownloadStateTrait, DownloadTaskTrait},
};
/// Tag applied to every torrent this application manages, so app-owned
/// torrents can be distinguished from others in a shared client.
pub const TORRENT_TAG_NAME: &str = "konobangu";
/// Marker trait for torrent identifiers (info-hashes).
pub trait TorrentHashTrait: DownloadIdTrait + Send + Hash {}
/// Plain string info-hash — the common representation across backends.
pub type SimpleTorrentHash = String;
impl DownloadIdTrait for SimpleTorrentHash {}
impl TorrentHashTrait for SimpleTorrentHash {}
/// Marker trait for torrent-capable download states.
pub trait TorrentStateTrait: DownloadStateTrait {}
/// A download task that is specifically a torrent.
pub trait TorrentTaskTrait: DownloadTaskTrait
where
    Self::State: TorrentStateTrait,
    Self::Id: TorrentHashTrait,
{
    /// Info-hash of the torrent.
    fn hash_info(&self) -> &str;
    /// Display name; defaults to the info-hash when no better name exists.
    fn name(&self) -> Cow<'_, str> {
        Cow::Borrowed(self.hash_info())
    }
    /// Tags attached to the torrent.
    fn tags(&self) -> impl Iterator<Item = Cow<'_, str>>;
    /// Category of the torrent, if any.
    fn category(&self) -> Option<Cow<'_, str>>;
}
/// Creation request for torrent downloads: a save path plus hash-bearing
/// sources, both mutable so callers can adjust a request before submitting.
pub trait TorrentCreationTrait: DownloadCreationTrait {
    /// Target directory for downloaded files.
    fn save_path(&self) -> &Path;
    /// Mutable access to the target directory.
    fn save_path_mut(&mut self) -> &mut PathBuf;
    /// Mutable access to the torrent sources to add.
    fn sources_mut(&mut self) -> &mut Vec<HashTorrentSource>;
}

View File

@@ -0,0 +1,218 @@
use std::{
any::Any, borrow::Cow, fmt::Debug, hash::Hash, marker::PhantomData, ops::Deref, time::Duration,
vec::IntoIter,
};
use async_trait::async_trait;
use super::DownloaderError;
/// Marker trait for a backend-specific download state.
pub trait DownloadStateTrait: Sized + Debug {}

/// Marker trait for a backend-specific download identifier.
pub trait DownloadIdTrait: Hash + Sized + Clone + Send + Debug {}

/// A single download job as reported by a downloader backend.
///
/// Backends implement the accessors; the derived metrics (`left_bytes`,
/// `eta`, `average_speed`, `progress`) have default implementations that
/// return `None` whenever their inputs are unknown.
pub trait DownloadTaskTrait: Sized + Send + Debug {
    type State: DownloadStateTrait;
    type Id: DownloadIdTrait;

    /// Backend identifier of this task.
    fn id(&self) -> &Self::Id;
    /// Consume the task, yielding its identifier.
    fn into_id(self) -> Self::Id;
    /// Human-readable task name.
    fn name(&self) -> Cow<'_, str>;
    /// Current download speed in bytes per second, if known.
    fn speed(&self) -> Option<u64>;
    /// Current backend state.
    fn state(&self) -> &Self::State;
    /// Bytes downloaded so far, if known.
    fn dl_bytes(&self) -> Option<u64>;
    /// Total size in bytes, if known.
    fn total_bytes(&self) -> Option<u64>;

    /// Bytes still to download; `None` when either input is unknown or the
    /// subtraction would underflow.
    fn left_bytes(&self) -> Option<u64> {
        self.total_bytes()
            .zip(self.dl_bytes())
            .and_then(|(total, done)| total.checked_sub(done))
    }

    /// Elapsed active time of the task, if known.
    fn et(&self) -> Option<Duration>;

    /// Estimated time to completion from `left_bytes` and `speed`;
    /// `None` when unknown or the speed is zero.
    fn eta(&self) -> Option<Duration> {
        self.left_bytes().zip(self.speed()).and_then(|(left, speed)| {
            (speed > 0).then(|| Duration::from_secs_f64(left as f64 / speed as f64))
        })
    }

    /// Mean download speed over the elapsed time; `None` when unknown or no
    /// time has elapsed.
    fn average_speed(&self) -> Option<f64> {
        self.et().zip(self.dl_bytes()).and_then(|(elapsed, done)| {
            let secs = elapsed.as_secs_f64();
            (secs > 0.0).then(|| done as f64 / secs)
        })
    }

    /// Completion ratio in `[0, 1]`. Zero downloaded bytes reports
    /// `Some(0.0)` even when the total is unknown to be positive; a positive
    /// download against a zero total is unreportable (`None`).
    fn progress(&self) -> Option<f32> {
        match (self.dl_bytes(), self.total_bytes()) {
            (Some(0), Some(_)) => Some(0.0),
            (Some(done), Some(total)) if total > 0 => Some(done as f32 / total as f32),
            _ => None,
        }
    }
}
/// A request describing downloads to be created for a given task type.
pub trait DownloadCreationTrait: Sized {
    type Task: DownloadTaskTrait;
}
/// A query describing which downloads an operation applies to.
pub trait DownloadSelectorTrait: Sized + Any + Send {
    type Id: DownloadIdTrait;
    type Task: DownloadTaskTrait<Id = Self::Id>;
    /// Try to reduce the selector to a plain id list; the default refuses,
    /// returning the selector unchanged so callers fall back to querying.
    fn try_into_ids_only(self) -> Result<Vec<Self::Id>, Self> {
        Err(self)
    }
}
/// A selector that is nothing but a collection of ids, convertible to and
/// from `Vec<Id>` and iterable in both directions.
// NOTE(review): `try_into_ids_only` here shares its name with
// `DownloadSelectorTrait::try_into_ids_only` but is a distinct method — which
// one a call resolves to depends on the bound in scope; consider renaming to
// avoid ambiguity.
pub trait DownloadIdSelectorTrait:
    DownloadSelectorTrait
    + IntoIterator<Item = Self::Id>
    + FromIterator<Self::Id>
    + Into<Vec<Self::Id>>
    + From<Vec<Self::Id>>
{
    /// Always succeeds: an id selector is already just ids.
    fn try_into_ids_only(self) -> Result<Vec<Self::Id>, Self> {
        Ok(Vec::from_iter(self))
    }
    /// Build a selector addressing a single id.
    fn from_id(id: Self::Id) -> Self;
}
/// Default id-list selector implementation, generic over the task type.
#[derive(Debug)]
pub struct DownloadIdSelector<Task>
where
    Task: DownloadTaskTrait,
{
    pub ids: Vec<Task::Id>,
    // Ties the selector to its task type without storing one.
    pub marker: PhantomData<Task>,
}
// Borrow the selector as a plain id slice/vec.
impl<Task> Deref for DownloadIdSelector<Task>
where
    Task: DownloadTaskTrait,
{
    type Target = Vec<Task::Id>;
    fn deref(&self) -> &Self::Target {
        &self.ids
    }
}
// Consume the selector, yielding its ids.
impl<Task> IntoIterator for DownloadIdSelector<Task>
where
    Task: DownloadTaskTrait,
{
    type Item = Task::Id;
    type IntoIter = IntoIter<Task::Id>;
    fn into_iter(self) -> Self::IntoIter {
        self.ids.into_iter()
    }
}
// Collect ids into a selector.
impl<Task> FromIterator<Task::Id> for DownloadIdSelector<Task>
where
    Task: DownloadTaskTrait,
{
    fn from_iter<T: IntoIterator<Item = Task::Id>>(iter: T) -> Self {
        Self {
            ids: Vec::from_iter(iter),
            marker: PhantomData,
        }
    }
}
impl<Task> DownloadSelectorTrait for DownloadIdSelector<Task>
where
    Task: DownloadTaskTrait + 'static,
{
    type Id = Task::Id;
    type Task = Task;
}
// Conversions to/from `Vec<Id>` required by `DownloadIdSelectorTrait`.
impl<Task> From<Vec<Task::Id>> for DownloadIdSelector<Task>
where
    Task: DownloadTaskTrait + 'static,
{
    fn from(value: Vec<Task::Id>) -> Self {
        Self {
            ids: value,
            marker: PhantomData,
        }
    }
}
impl<Task> From<DownloadIdSelector<Task>> for Vec<Task::Id>
where
    Task: DownloadTaskTrait + 'static,
{
    fn from(value: DownloadIdSelector<Task>) -> Self {
        value.ids
    }
}
impl<Task> DownloadIdSelectorTrait for DownloadIdSelector<Task>
where
    Task: DownloadTaskTrait + 'static,
{
    // Overrides the default to move the ids out without re-collecting.
    fn try_into_ids_only(self) -> Result<Vec<Self::Id>, Self> {
        Ok(self.ids)
    }
    fn from_id(id: Self::Id) -> Self {
        Self {
            ids: vec![id],
            marker: PhantomData,
        }
    }
}
/// Core downloader interface: CRUD-style operations over download tasks,
/// parameterized by backend-specific state/id/task/creation/selector types.
#[async_trait]
pub trait DownloaderTrait {
    type State: DownloadStateTrait;
    type Id: DownloadIdTrait;
    type Task: DownloadTaskTrait<State = Self::State, Id = Self::Id>;
    type Creation: DownloadCreationTrait<Task = Self::Task>;
    type Selector: DownloadSelectorTrait<Task = Self::Task>;
    /// Submit new downloads; returns the ids of everything added.
    async fn add_downloads(
        &self,
        creation: Self::Creation,
    ) -> Result<impl IntoIterator<Item = Self::Id>, DownloaderError>;
    /// Pause the downloads matched by `selector`; returns the affected ids.
    async fn pause_downloads(
        &self,
        selector: Self::Selector,
    ) -> Result<impl IntoIterator<Item = Self::Id>, DownloaderError>;
    /// Resume the downloads matched by `selector`; returns the affected ids.
    async fn resume_downloads(
        &self,
        selector: Self::Selector,
    ) -> Result<impl IntoIterator<Item = Self::Id>, DownloaderError>;
    /// Remove the downloads matched by `selector`; returns the affected ids.
    async fn remove_downloads(
        &self,
        selector: Self::Selector,
    ) -> Result<impl IntoIterator<Item = Self::Id>, DownloaderError>;
    /// Fetch the tasks matched by `selector`.
    async fn query_downloads(
        &self,
        selector: Self::Selector,
    ) -> Result<impl IntoIterator<Item = Self::Task>, DownloaderError>;
}

View File

@@ -0,0 +1,62 @@
use std::{borrow::Cow, time::Duration};
use snafu::prelude::*;
use util::errors::OptDynErr;
/// All errors produced by the downloader crate.
///
/// Transparent variants forward third-party errors unchanged; the remaining
/// variants carry a message and an optional boxed source (`OptDynErr`).
#[derive(Snafu, Debug)]
#[snafu(visibility(pub(crate)))]
pub enum DownloaderError {
    /// Invalid endpoint/torrent url.
    #[snafu(transparent)]
    DownloadUrlParseError { source: url::ParseError },
    /// Error returned by the qBittorrent Web API client.
    #[snafu(transparent)]
    QBitAPIError { source: qbit_rs::Error },
    /// Underlying I/O failure.
    #[snafu(transparent)]
    DownloaderIOError { source: std::io::Error },
    /// An operation did not observe its expected state in time.
    #[snafu(display("Timeout error (action = {action}, timeout = {timeout:?})"))]
    DownloadTimeoutError {
        action: Cow<'static, str>,
        timeout: Duration,
    },
    /// A string could not be parsed as a magnet link.
    #[snafu(display("Invalid magnet format ({message})"))]
    MagnetFormatError {
        message: String,
        #[snafu(source(from(Box<dyn std::error::Error + Send + Sync>, OptDynErr::some)))]
        source: OptDynErr,
    },
    /// Bytes could not be parsed as torrent metainfo.
    #[snafu(display("Invalid torrent meta format ({message})"))]
    TorrentMetaError {
        message: String,
        #[snafu(source(from(Box<dyn std::error::Error + Send + Sync>, OptDynErr::some)))]
        source: OptDynErr,
    },
    /// HTTP fetch of a torrent file failed.
    #[snafu(display("Failed to fetch: {source}"))]
    DownloadFetchError {
        url: String,
        #[snafu(source(from(Box<dyn std::error::Error + Send + Sync>, OptDynErr::some)))]
        source: OptDynErr,
    },
    /// Free-form error used by `whatever!`/`whatever_context`.
    #[snafu(display("{message}"))]
    Whatever {
        message: String,
        #[snafu(source(from(Box<dyn std::error::Error + Send + Sync>, OptDynErr::some)))]
        source: OptDynErr,
    },
}
// Lets snafu's `whatever!` macro and `whatever_context` build the `Whatever`
// variant from a bare message or a message plus source.
impl snafu::FromString for DownloaderError {
    type Source = Box<dyn std::error::Error + Send + Sync>;
    fn without_source(message: String) -> Self {
        Self::Whatever {
            message,
            source: OptDynErr::none(),
        }
    }
    fn with_source(source: Self::Source, message: String) -> Self {
        Self::Whatever {
            message,
            source: OptDynErr::some(source),
        }
    }
}

View File

@@ -0,0 +1,8 @@
/// Torrent-specific traits, sources, and constants.
pub mod bittorrent;
/// Backend-agnostic downloader trait hierarchy.
pub mod core;
/// Crate-wide error type.
pub mod errors;
/// qBittorrent Web API backend.
pub mod qbit;
/// Embedded librqbit backend.
pub mod rqbit;
/// Shared helpers (path comparison, etc.).
pub mod utils;
pub use errors::DownloaderError;

View File

@@ -0,0 +1,605 @@
use std::{
borrow::Cow,
collections::{HashMap, HashSet},
fmt::Debug,
sync::{Arc, Weak},
time::Duration,
};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use itertools::Itertools;
use merge_struct::merge;
use qbit_rs::{
Qbit,
model::{
AddTorrentArg, Category, Credential, GetTorrentListArg, NonEmptyStr, Sep, SyncData,
Torrent as QbitTorrent, TorrentFile, TorrentSource,
},
};
use quirks_path::{Path, PathBuf};
use snafu::{OptionExt, whatever};
use tokio::{
sync::{RwLock, watch},
time::sleep,
};
use tracing::instrument;
use url::Url;
use crate::{
DownloaderError,
bittorrent::{
downloader::TorrentDownloaderTrait,
source::{HashTorrentSource, HashTorrentSourceTrait, MagnetUrlSource, TorrentFileSource},
task::TORRENT_TAG_NAME,
},
core::{DownloadIdSelector, DownloaderTrait},
qbit::task::{
QBittorrentCreation, QBittorrentHash, QBittorrentSelector, QBittorrentState,
QBittorrentTask,
},
utils::path_equals_as_file_url,
};
/// Configuration for connecting a [`QBittorrentDownloader`].
pub struct QBittorrentDownloaderCreation {
    // qBittorrent Web UI base url.
    pub endpoint: String,
    pub username: String,
    pub password: String,
    // Root directory torrents are saved under.
    pub save_path: String,
    pub subscriber_id: i32,
    pub downloader_id: i32,
    // How long `wait_sync_until` waits before timing out (default 10s).
    pub wait_sync_timeout: Option<Duration>,
}
/// Locally mirrored qBittorrent state, maintained incrementally from the
/// Web API `sync` endpoint (`rid` is the server's sync cursor).
#[derive(Default)]
pub struct QBittorrentSyncData {
    pub torrents: HashMap<String, QbitTorrent>,
    pub categories: HashMap<String, Category>,
    pub tags: HashSet<String>,
    pub trackers: HashMap<String, Vec<String>>,
    pub server_state: HashMap<String, serde_value::Value>,
    pub rid: i64,
}
impl QBittorrentSyncData {
    /// Apply one incremental `SyncData` payload from the qBittorrent `sync`
    /// API onto the local mirror.
    ///
    /// Order matters: on `full_update` the mirror is cleared first, then
    /// removals are applied before additions, and torrent patches are merged
    /// field-by-field into any existing entry.
    pub fn patch(&mut self, data: SyncData) {
        self.rid = data.rid;
        // A full update means the server restarted its sync session; drop
        // everything and rebuild from this payload.
        if data.full_update.is_some_and(|s| s) {
            self.torrents.clear();
            self.categories.clear();
            self.tags.clear();
            self.trackers.clear();
        }
        if let Some(remove_categories) = data.categories_removed {
            for c in remove_categories {
                self.categories.remove(&c);
            }
        }
        if let Some(add_categories) = data.categories {
            self.categories.extend(add_categories);
        }
        if let Some(remove_tags) = data.tags_removed {
            for t in remove_tags {
                self.tags.remove(&t);
            }
        }
        if let Some(add_tags) = data.tags {
            self.tags.extend(add_tags);
        }
        if let Some(remove_torrents) = data.torrents_removed {
            for t in remove_torrents {
                self.torrents.remove(&t);
            }
        }
        if let Some(add_torrents) = data.torrents {
            for (hash, torrent_patch) in add_torrents {
                if let Some(torrent_full) = self.torrents.get_mut(&hash) {
                    // Partial patch for a known torrent: merge the set fields
                    // over the stored record.
                    *torrent_full = merge(torrent_full, &torrent_patch).unwrap_or_else(|_| {
                        unreachable!("failed to merge torrents, but they are same type")
                    });
                } else {
                    self.torrents.insert(hash, torrent_patch);
                }
            }
        }
        if let Some(remove_trackers) = data.trackers_removed {
            for t in remove_trackers {
                self.trackers.remove(&t);
            }
        }
        if let Some(add_trackers) = data.trackers {
            self.trackers.extend(add_trackers);
        }
        if let Some(server_state) = data.server_state {
            self.server_state = merge(&self.server_state, &server_state).unwrap_or_else(|_| {
                unreachable!("failed to merge server state, but they are same type")
            });
        }
    }
}
/// Downloader backed by a remote qBittorrent instance via its Web API.
///
/// Keeps a locally mirrored `QBittorrentSyncData` refreshed by a background
/// event loop; `sync_watch` broadcasts the timestamp of each refresh so
/// waiters can block until a condition becomes visible.
pub struct QBittorrentDownloader {
    pub subscriber_id: i32,
    pub downloader_id: i32,
    pub endpoint_url: Url,
    pub client: Arc<Qbit>,
    // Root save directory; per-task paths are joined under it.
    pub save_path: PathBuf,
    pub wait_sync_timeout: Duration,
    pub sync_watch: watch::Sender<DateTime<Utc>>,
    pub sync_data: Arc<RwLock<QBittorrentSyncData>>,
}
impl QBittorrentDownloader {
    /// Connect to a qBittorrent endpoint, log in, perform an initial sync,
    /// and spawn the background sync event loop.
    ///
    /// # Errors
    /// Fails on an invalid endpoint url or when login / initial sync fails.
    pub async fn from_creation(
        creation: QBittorrentDownloaderCreation,
    ) -> Result<Arc<Self>, DownloaderError> {
        let endpoint_url = Url::parse(&creation.endpoint)?;
        let credential = Credential::new(creation.username, creation.password);
        let client = Qbit::new(endpoint_url.clone(), credential);
        client.login(false).await?;
        client.sync(None).await?;
        let downloader = Arc::new(Self {
            client: Arc::new(client),
            endpoint_url,
            subscriber_id: creation.subscriber_id,
            save_path: creation.save_path.into(),
            wait_sync_timeout: creation
                .wait_sync_timeout
                .unwrap_or(Duration::from_secs(10)),
            downloader_id: creation.downloader_id,
            sync_watch: watch::channel(Utc::now()).0,
            sync_data: Arc::new(RwLock::new(QBittorrentSyncData::default())),
        });
        // The loop holds only a Weak so it does not keep the downloader alive.
        let event_loop_me = Arc::downgrade(&downloader);
        tokio::spawn(async move { Self::start_event_loop(event_loop_me).await });
        Ok(downloader)
    }

    /// Background loop, polling every 100ms: syncs roughly every 10s
    /// (tick >= 100), or about every second while at least one waiter is
    /// subscribed to `sync_watch` (tick >= 10).
    // NOTE(review): when the downloader is dropped, `me.upgrade()` returns
    // None but the loop keeps sleeping forever — consider breaking instead.
    async fn start_event_loop(me: Weak<Self>) {
        let mut tick = 0;
        loop {
            sleep(Duration::from_millis(100)).await;
            if let Some(me) = me.upgrade() {
                if tick >= 100 {
                    let _ = me.sync_data().await.inspect_err(|e| {
                        tracing::error!(name = "sync_data", error = ?e);
                    });
                    tick = 0;
                    continue;
                }
                let count = me.sync_watch.receiver_count();
                if count > 0 && tick >= 10 {
                    let _ = me.sync_data().await.inspect_err(|e| {
                        tracing::error!(name = "sync_data", error = ?e);
                    });
                    tick = i32::max(0, tick - 10);
                } else {
                    tick += 1;
                }
            }
        }
    }

    /// Web API version reported by the server.
    #[instrument(level = "debug")]
    pub async fn api_version(&self) -> Result<String, DownloaderError> {
        let result = self.client.get_webapi_version().await?;
        Ok(result)
    }

    /// Create `category` (rooted at `save_path`) and wait until it appears in
    /// the synced state.
    #[instrument(level = "debug", skip(self))]
    pub async fn add_category(&self, category: &str) -> Result<(), DownloaderError> {
        self.client
            .add_category(
                NonEmptyStr::new(category)
                    .whatever_context::<_, DownloaderError>("category can not be empty")?,
                self.save_path.as_str(),
            )
            .await?;
        self.wait_sync_until(
            |sync_data| sync_data.categories.contains_key(category),
            None,
        )
        .await?;
        Ok(())
    }

    /// Cheap connectivity probe (fetches the API version and discards it).
    #[instrument(level = "debug", skip(self))]
    pub async fn check_connection(&self) -> Result<(), DownloaderError> {
        self.api_version().await?;
        Ok(())
    }

    /// Assign `category` to `hashes`, creating the category first if the
    /// synced state does not know it, then wait until the change is visible.
    #[instrument(level = "debug", skip(self))]
    pub async fn set_torrents_category(
        &self,
        hashes: Vec<String>,
        category: &str,
    ) -> Result<(), DownloaderError> {
        {
            let category_no_exists = {
                let sync_data = self.sync_data.read().await;
                !sync_data.categories.contains_key(category)
            };
            if category_no_exists {
                self.add_category(category).await?;
            }
        }
        self.client
            .set_torrent_category(hashes.clone(), category)
            .await?;
        self.wait_sync_until(
            |sync_data| {
                let torrents = &sync_data.torrents;
                hashes.iter().all(|h| {
                    torrents
                        .get(h)
                        .is_some_and(|t| t.category.as_deref().is_some_and(|c| c == category))
                })
            },
            None,
        )
        .await?;
        Ok(())
    }

    /// Resolve `sub_path` under the configured save root.
    pub fn get_save_path(&self, sub_path: &Path) -> PathBuf {
        self.save_path.join(sub_path)
    }

    /// Add `tags` to `hashes` and wait until every torrent's synced tag set
    /// contains all of them.
    ///
    /// # Errors
    /// Fails when `tags` is empty, on API errors, or on sync timeout.
    #[instrument(level = "debug", skip(self))]
    pub async fn add_torrent_tags(
        &self,
        hashes: Vec<String>,
        tags: Vec<String>,
    ) -> Result<(), DownloaderError> {
        if tags.is_empty() {
            whatever!("add bittorrent tags can not be empty");
        }
        self.client
            .add_torrent_tags(hashes.clone(), tags.clone())
            .await?;
        let tag_sets = tags.iter().map(|s| s.as_str()).collect::<HashSet<&str>>();
        self.wait_sync_until(
            |sync_data| {
                let torrents = &sync_data.torrents;
                hashes.iter().all(|h| {
                    torrents.get(h).is_some_and(|t| {
                        // qBittorrent reports tags as one comma-separated string.
                        t.tags.as_ref().is_some_and(|t| {
                            t.split(',')
                                .map(|s| s.trim())
                                .filter(|s| !s.is_empty())
                                .collect::<HashSet<&str>>()
                                .is_superset(&tag_sets)
                        })
                    })
                })
            },
            None,
        )
        .await?;
        Ok(())
    }

    /// Move `hashes` to `new_path` and wait until the synced save paths match
    /// (compared as file urls to tolerate formatting differences).
    #[instrument(level = "debug", skip(self))]
    pub async fn move_torrents(
        &self,
        hashes: Vec<String>,
        new_path: &str,
    ) -> Result<(), DownloaderError> {
        self.client
            .set_torrent_location(hashes.clone(), new_path)
            .await?;
        self.wait_sync_until(
            |sync_data| -> bool {
                let torrents = &sync_data.torrents;
                hashes.iter().all(|h| {
                    torrents.get(h).is_some_and(|t| {
                        t.save_path.as_deref().is_some_and(|p| {
                            path_equals_as_file_url(p, new_path)
                                .inspect_err(|error| {
                                    tracing::warn!(name = "path_equals_as_file_url", error = ?error);
                                })
                                .unwrap_or(false)
                        })
                    })
                })
            },
            None,
        )
        .await?;
        Ok(())
    }

    /// Look up the save path of the first torrent matching `hashes`.
    ///
    /// # Errors
    /// Fails when no torrent matches.
    pub async fn get_torrent_path(
        &self,
        hashes: String,
    ) -> Result<Option<String>, DownloaderError> {
        let mut torrent_list = self
            .client
            .get_torrent_list(GetTorrentListArg {
                hashes: Some(hashes),
                ..Default::default()
            })
            .await?;
        let torrent = torrent_list
            .first_mut()
            .whatever_context::<_, DownloaderError>("No bittorrent found")?;
        Ok(torrent.save_path.take())
    }

    /// Fetch one incremental sync payload, patch the mirror, and notify
    /// everyone waiting on `sync_watch`.
    #[instrument(level = "debug", skip(self))]
    async fn sync_data(&self) -> Result<(), DownloaderError> {
        let rid = { self.sync_data.read().await.rid };
        let sync_data_patch = self.client.sync(Some(rid)).await?;
        {
            let mut sync_data = self.sync_data.write().await;
            sync_data.patch(sync_data_patch);
        }
        let now = Utc::now();
        self.sync_watch.send_replace(now);
        Ok(())
    }

    /// Block until `stop_wait_fn` holds on the synced state, re-checking after
    /// every sync notification.
    ///
    /// # Errors
    /// Returns [`DownloaderError::DownloadTimeoutError`] after `timeout`
    /// (default: `wait_sync_timeout`) without the condition becoming true.
    async fn wait_sync_until<S>(
        &self,
        stop_wait_fn: S,
        timeout: Option<Duration>,
    ) -> Result<(), DownloaderError>
    where
        S: Fn(&QBittorrentSyncData) -> bool,
    {
        // Fast path: the condition may already hold.
        {
            let sync_data = &self.sync_data.read().await;
            if stop_wait_fn(sync_data) {
                return Ok(());
            }
        }
        let timeout = timeout.unwrap_or(self.wait_sync_timeout);
        let start_time = Utc::now();
        // Subscribing also raises the event loop's sync frequency.
        let mut receiver = self.sync_watch.subscribe();
        while let Ok(()) = receiver.changed().await {
            let has_timeout = {
                let sync_time = *receiver.borrow();
                let diff_time = sync_time - start_time;
                diff_time.num_milliseconds() > timeout.as_millis() as i64
            };
            if has_timeout {
                tracing::warn!(name = "wait_until timeout", timeout = ?timeout);
                return Err(DownloaderError::DownloadTimeoutError {
                    action: Cow::Borrowed("QBittorrentDownloader::wait_unit"),
                    timeout,
                });
            }
            {
                let sync_data = &self.sync_data.read().await;
                if stop_wait_fn(sync_data) {
                    break;
                }
            }
        }
        Ok(())
    }
}
#[async_trait]
impl DownloaderTrait for QBittorrentDownloader {
type State = QBittorrentState;
type Id = QBittorrentHash;
type Task = QBittorrentTask;
type Creation = QBittorrentCreation;
type Selector = QBittorrentSelector;
async fn add_downloads(
&self,
creation: <Self as DownloaderTrait>::Creation,
) -> Result<HashSet<<Self as DownloaderTrait>::Id>, DownloaderError> {
let tags = {
let mut tags = vec![TORRENT_TAG_NAME.to_string()];
tags.extend(creation.tags);
Some(tags.into_iter().filter(|s| !s.is_empty()).join(","))
};
let save_path = Some(creation.save_path.into_string());
let sources = creation.sources;
let hashes = HashSet::from_iter(sources.iter().map(|s| s.hash_info().to_string()));
let (urls_source, files_source) = {
let mut urls = vec![];
let mut files = vec![];
for s in sources {
match s {
HashTorrentSource::MagnetUrl(MagnetUrlSource { url, .. }) => {
urls.push(Url::parse(&url)?)
}
HashTorrentSource::TorrentFile(TorrentFileSource {
payload, filename, ..
}) => files.push(TorrentFile {
filename,
data: payload.into(),
}),
}
}
(
if urls.is_empty() {
None
} else {
Some(TorrentSource::Urls {
urls: Sep::from(urls),
})
},
if files.is_empty() {
None
} else {
Some(TorrentSource::TorrentFiles { torrents: files })
},
)
};
let category = creation.category;
if let Some(category) = category.as_deref() {
let has_caetgory = {
self.sync_data
.read()
.await
.categories
.contains_key(category)
};
if !has_caetgory {
self.add_category(category).await?;
}
}
if let Some(source) = urls_source {
self.client
.add_torrent(AddTorrentArg {
source,
savepath: save_path.clone(),
auto_torrent_management: Some(false),
category: category.clone(),
tags: tags.clone(),
..Default::default()
})
.await?;
}
if let Some(source) = files_source {
self.client
.add_torrent(AddTorrentArg {
source,
savepath: save_path,
auto_torrent_management: Some(false),
category,
tags,
..Default::default()
})
.await?;
}
self.wait_sync_until(
|sync_data| {
let torrents = &sync_data.torrents;
hashes.iter().all(|hash| torrents.contains_key(hash))
},
None,
)
.await?;
Ok(hashes)
}
async fn pause_downloads(
&self,
selector: <Self as DownloaderTrait>::Selector,
) -> Result<impl IntoIterator<Item = Self::Id>, DownloaderError> {
<Self as TorrentDownloaderTrait>::pause_downloads(self, selector).await
}
async fn resume_downloads(
&self,
selector: <Self as DownloaderTrait>::Selector,
) -> Result<impl IntoIterator<Item = Self::Id>, DownloaderError> {
<Self as TorrentDownloaderTrait>::resume_downloads(self, selector).await
}
async fn remove_downloads(
&self,
selector: <Self as DownloaderTrait>::Selector,
) -> Result<impl IntoIterator<Item = Self::Id>, DownloaderError> {
<Self as TorrentDownloaderTrait>::remove_downloads(self, selector).await
}
async fn query_downloads(
&self,
selector: QBittorrentSelector,
) -> Result<Vec<<Self as DownloaderTrait>::Task>, DownloaderError> {
let selector = match selector {
QBittorrentSelector::Hash(h) => h.into(),
QBittorrentSelector::Complex(c) => c,
};
let torrent_list = self.client.get_torrent_list(selector.query).await?;
let torrent_contents = futures::future::try_join_all(torrent_list.iter().map(|s| async {
if let Some(hash) = &s.hash {
self.client.get_torrent_contents(hash as &str, None).await
} else {
Ok(vec![])
}
}))
.await?;
let tasks = torrent_list
.into_iter()
.zip(torrent_contents)
.map(|(t, c)| Self::Task::from_query(t, c))
.collect::<Result<Vec<Self::Task>, _>>()?;
Ok(tasks)
}
}
#[async_trait]
impl TorrentDownloaderTrait for QBittorrentDownloader {
    type IdSelector = DownloadIdSelector<Self::Task>;

    /// Pause the given hashes via the Web API.
    #[instrument(level = "debug", skip(self))]
    async fn pause_torrents(
        &self,
        hashes: <Self as TorrentDownloaderTrait>::IdSelector,
    ) -> Result<<Self as TorrentDownloaderTrait>::IdSelector, DownloaderError> {
        self.client.pause_torrents(hashes.clone()).await?;
        Ok(hashes)
    }

    /// Resume the given hashes via the Web API.
    #[instrument(level = "debug", skip(self))]
    async fn resume_torrents(
        &self,
        hashes: <Self as TorrentDownloaderTrait>::IdSelector,
    ) -> Result<<Self as TorrentDownloaderTrait>::IdSelector, DownloaderError> {
        self.client.resume_torrents(hashes.clone()).await?;
        Ok(hashes)
    }

    /// Delete the given hashes (including downloaded files — `Some(true)`),
    /// then wait until they disappear from the synced state.
    #[instrument(level = "debug", skip(self))]
    async fn remove_torrents(
        &self,
        hashes: <Self as TorrentDownloaderTrait>::IdSelector,
    ) -> Result<<Self as TorrentDownloaderTrait>::IdSelector, DownloaderError> {
        self.client
            .delete_torrents(hashes.clone(), Some(true))
            .await?;
        self.wait_sync_until(
            |sync_data| -> bool {
                let torrents = &sync_data.torrents;
                hashes.iter().all(|h| !torrents.contains_key(h))
            },
            None,
        )
        .await?;
        Ok(hashes)
    }
}
impl Debug for QBittorrentDownloader {
    // Compact form: identifies the instance without dumping synced state or
    // leaking credentials.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("QBittorrentDownloader")
            .field("subscriber_id", &self.subscriber_id)
            .field("client", &self.endpoint_url.as_str())
            .finish()
    }
}

View File

@@ -0,0 +1,11 @@
/// qBittorrent downloader implementation.
pub mod downloader;
/// qBittorrent task/selector/state types.
pub mod task;
#[cfg(test)]
mod test;
pub use downloader::{QBittorrentDownloader, QBittorrentDownloaderCreation, QBittorrentSyncData};
pub use task::{
    QBittorrentComplexSelector, QBittorrentCreation, QBittorrentHash, QBittorrentHashSelector,
    QBittorrentSelector, QBittorrentState, QBittorrentTask,
};

View File

@@ -0,0 +1,223 @@
use std::{borrow::Cow, time::Duration};
use itertools::Itertools;
use qbit_rs::model::{
GetTorrentListArg, State, Torrent as QbitTorrent, TorrentContent as QbitTorrentContent,
};
use quirks_path::{Path, PathBuf};
use crate::{
DownloaderError,
bittorrent::{
source::HashTorrentSource,
task::{SimpleTorrentHash, TorrentCreationTrait, TorrentStateTrait, TorrentTaskTrait},
},
core::{
DownloadCreationTrait, DownloadIdSelector, DownloadSelectorTrait, DownloadStateTrait,
DownloadTaskTrait,
},
};
/// qBittorrent identifies torrents by their plain string info-hash.
pub type QBittorrentHash = SimpleTorrentHash;
/// Wrapper over qbit-rs's `State`; `None` means the API omitted the state.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct QBittorrentState(Option<State>);
impl From<State> for QBittorrentState {
    fn from(value: State) -> Self {
        Self(Some(value))
    }
}
impl From<Option<State>> for QBittorrentState {
    fn from(value: Option<State>) -> Self {
        Self(value)
    }
}
impl DownloadStateTrait for QBittorrentState {}
impl TorrentStateTrait for QBittorrentState {}
/// One torrent as reported by qBittorrent, with its file contents listing.
#[derive(Debug)]
pub struct QBittorrentTask {
    pub hash_info: QBittorrentHash,
    pub torrent: QbitTorrent,
    pub contents: Vec<QbitTorrentContent>,
    pub state: QBittorrentState,
}
impl QBittorrentTask {
    /// Build a task from a torrent-list entry plus its contents listing.
    ///
    /// # Errors
    /// Returns [`DownloaderError::TorrentMetaError`] when the API entry has
    /// no hash (the hash is the task id, so it is mandatory).
    pub fn from_query(
        torrent: QbitTorrent,
        contents: Vec<QbitTorrentContent>,
    ) -> Result<Self, DownloaderError> {
        let hash = torrent
            .hash
            .clone()
            .ok_or_else(|| DownloaderError::TorrentMetaError {
                message: "missing hash".to_string(),
                source: None.into(),
            })?;
        let state = QBittorrentState(torrent.state.clone());
        Ok(Self {
            hash_info: hash,
            contents,
            state,
            torrent,
        })
    }
}
impl DownloadTaskTrait for QBittorrentTask {
    type State = QBittorrentState;
    type Id = QBittorrentHash;
    fn id(&self) -> &Self::Id {
        &self.hash_info
    }
    fn into_id(self) -> Self::Id {
        self.hash_info
    }
    // Torrent name when present, otherwise the trait default (the hash).
    fn name(&self) -> Cow<'_, str> {
        self.torrent
            .name
            .as_deref()
            .map(Cow::Borrowed)
            .unwrap_or_else(|| DownloadTaskTrait::name(self))
    }
    // qbit-rs reports counters as signed ints; negative sentinel values
    // (e.g. "unknown") deliberately map to None via the failed try_from.
    fn speed(&self) -> Option<u64> {
        self.torrent.dlspeed.and_then(|s| u64::try_from(s).ok())
    }
    fn state(&self) -> &Self::State {
        &self.state
    }
    fn dl_bytes(&self) -> Option<u64> {
        self.torrent.downloaded.and_then(|v| u64::try_from(v).ok())
    }
    fn total_bytes(&self) -> Option<u64> {
        self.torrent.size.and_then(|v| u64::try_from(v).ok())
    }
    // Override: qBittorrent reports the remainder directly.
    fn left_bytes(&self) -> Option<u64> {
        self.torrent.amount_left.and_then(|v| u64::try_from(v).ok())
    }
    fn et(&self) -> Option<Duration> {
        self.torrent
            .time_active
            .and_then(|v| u64::try_from(v).ok())
            .map(Duration::from_secs)
    }
    // Override: qBittorrent reports its own ETA in seconds.
    fn eta(&self) -> Option<Duration> {
        self.torrent
            .eta
            .and_then(|v| u64::try_from(v).ok())
            .map(Duration::from_secs)
    }
    // Override: qBittorrent reports progress as a ready-made ratio.
    fn progress(&self) -> Option<f32> {
        self.torrent.progress.as_ref().map(|s| *s as f32)
    }
}
impl TorrentTaskTrait for QBittorrentTask {
    /// Info-hash identifying this torrent.
    fn hash_info(&self) -> &str {
        &self.hash_info
    }
    /// Iterates the torrent's tags (comma separated upstream), trimmed and with
    /// empty entries skipped.
    fn tags(&self) -> impl Iterator<Item = Cow<'_, str>> {
        let raw = self.torrent.tags.as_deref().unwrap_or_default();
        raw.split(',').filter_map(|tag| {
            let tag = tag.trim();
            (!tag.is_empty()).then_some(Cow::Borrowed(tag))
        })
    }
    /// Category assigned to the torrent, if any.
    fn category(&self) -> Option<Cow<'_, str>> {
        self.torrent.category.as_ref().map(|c| Cow::Borrowed(c.as_str()))
    }
}
/// Parameters for adding torrents to qBittorrent: destination folder, optional
/// tags/category, and one or more torrent sources (files, magnets, URLs).
#[derive(Debug, Clone, Default)]
pub struct QBittorrentCreation {
    pub save_path: PathBuf,
    pub tags: Vec<String>,
    pub category: Option<String>,
    pub sources: Vec<HashTorrentSource>,
}
impl DownloadCreationTrait for QBittorrentCreation {
    type Task = QBittorrentTask;
}
// Mutable accessors used by the generic torrent-creation machinery.
impl TorrentCreationTrait for QBittorrentCreation {
    fn save_path(&self) -> &Path {
        self.save_path.as_ref()
    }
    fn save_path_mut(&mut self) -> &mut PathBuf {
        &mut self.save_path
    }
    fn sources_mut(&mut self) -> &mut Vec<HashTorrentSource> {
        &mut self.sources
    }
}
/// Selector addressing torrents by a plain list of info-hashes.
pub type QBittorrentHashSelector = DownloadIdSelector<QBittorrentTask>;
/// Selector carrying a full qBittorrent torrent-list query (filter, category,
/// hashes, ...).
pub struct QBittorrentComplexSelector {
    pub query: GetTorrentListArg,
}
impl From<QBittorrentHashSelector> for QBittorrentComplexSelector {
    fn from(value: QBittorrentHashSelector) -> Self {
        Self {
            query: GetTorrentListArg {
                // qBittorrent's API expects multiple hashes joined with '|'.
                hashes: Some(value.ids.join("|")),
                ..Default::default()
            },
        }
    }
}
impl DownloadSelectorTrait for QBittorrentComplexSelector {
    type Id = QBittorrentHash;
    type Task = QBittorrentTask;
}
/// Selector accepted by the qBittorrent backend: either a plain hash list or a
/// full torrent-list query.
pub enum QBittorrentSelector {
    Hash(QBittorrentHashSelector),
    Complex(QBittorrentComplexSelector),
}
impl DownloadSelectorTrait for QBittorrentSelector {
    type Id = QBittorrentHash;
    type Task = QBittorrentTask;
    /// Extracts the plain id list when possible; a complex query that cannot be
    /// reduced to ids is handed back unchanged in the `Err` variant.
    fn try_into_ids_only(self) -> Result<Vec<Self::Id>, Self> {
        match self {
            QBittorrentSelector::Complex(c) => {
                c.try_into_ids_only().map_err(QBittorrentSelector::Complex)
            }
            // A hash selector is ids-only by construction, so this cannot fail.
            // (Dropped the previous needless `into_iter().collect_vec()`
            // round-trip, which rebuilt the Vec it already had.)
            QBittorrentSelector::Hash(h) => Ok(h
                .try_into_ids_only()
                .unwrap_or_else(|_| unreachable!("hash selector must contains hash"))),
        }
    }
}

View File

@@ -0,0 +1,274 @@
use std::time::Duration;
use chrono::Utc;
use qbit_rs::model::{GetTorrentListArg, TorrentFilter as QbitTorrentFilter};
use quirks_path::Path;
use snafu::OptionExt;
use crate::{
DownloaderError,
bittorrent::{
downloader::TorrentDownloaderTrait, source::HashTorrentSource, task::TorrentTaskTrait,
},
core::{DownloadIdSelectorTrait, DownloaderTrait},
qbit::{
QBittorrentDownloader, QBittorrentDownloaderCreation,
task::{
QBittorrentComplexSelector, QBittorrentCreation, QBittorrentHashSelector,
QBittorrentSelector, QBittorrentTask,
},
},
utils::path_equals_as_file_url,
};
/// Temporary directory used by the qBittorrent download tests.
///
/// A native Windows path is used only when running on Windows without the
/// `testcontainers` feature (where the daemon runs in a Linux container);
/// everything else gets a Unix-style `/tmp` path.
fn get_tmp_qbit_test_folder() -> &'static str {
    let native_windows = cfg!(all(windows, not(feature = "testcontainers")));
    if native_windows {
        r"C:\Windows\Temp\konobangu\qbit"
    } else {
        "/tmp/konobangu/qbit"
    }
}
/// Builds a container request for `linuxserver/qbittorrent` with the WebUI on
/// 8080 and the torrenting port on 6881, waiting for the daemon's startup log
/// line; stale containers carrying our label are pruned first.
#[cfg(feature = "testcontainers")]
pub async fn create_qbit_testcontainers()
-> anyhow::Result<testcontainers::ContainerRequest<testcontainers::GenericImage>> {
    use testcontainers::{
        GenericImage,
        core::{
            ContainerPort,
            // ReuseDirective,
            WaitFor,
        },
    };
    use testcontainers_ext::{ImageDefaultLogConsumerExt, ImagePruneExistedLabelExt};
    use testcontainers_modules::testcontainers::ImageExt;
    let container = GenericImage::new("linuxserver/qbittorrent", "latest")
        // The image logs this line on stderr once the daemon is reachable.
        .with_wait_for(WaitFor::message_on_stderr("Connection to localhost"))
        .with_env_var("WEBUI_PORT", "8080")
        .with_env_var("TZ", "Asia/Singapore")
        .with_env_var("TORRENTING_PORT", "6881")
        .with_mapped_port(6881, ContainerPort::Tcp(6881))
        .with_mapped_port(8080, ContainerPort::Tcp(8080))
        // .with_reuse(ReuseDirective::Always)
        .with_default_log_consumer()
        // Remove leftovers from previous runs tagged with this crate's label.
        .with_prune_existed_label(env!("CARGO_PKG_NAME"), "qbit-downloader", true, true)
        .await?;
    Ok(container)
}
/// Smoke test against a locally running qBittorrent instance using a real
/// Mikan torrent URL (only built without the `testcontainers` feature).
/// NOTE(review): the result is discarded via `let _ =`, so a failure never
/// fails the test — presumably deliberate because it needs network access and
/// a local daemon, but confirm this best-effort behavior is intended.
#[cfg(not(feature = "testcontainers"))]
#[tokio::test]
async fn test_qbittorrent_downloader() {
    let hash = "47ee2d69e7f19af783ad896541a07b012676f858".to_string();
    let torrent_url = format!("https://mikanani.me/Download/20240301/{}.torrent", hash);
    let _ = test_qbittorrent_downloader_impl(torrent_url, hash, None, None).await;
}
/// End-to-end test using testcontainers: starts a mock torrent provider and a
/// qBittorrent container, scrapes the generated WebUI credentials from the
/// container logs, then runs the shared downloader exercise.
#[cfg(feature = "testcontainers")]
#[tokio::test(flavor = "multi_thread")]
async fn test_qbittorrent_downloader() -> anyhow::Result<()> {
    use testcontainers::runners::AsyncRunner;
    use testing_torrents::{TestTorrentRequest, TestTorrentResponse, TestingTorrentFileItem};
    use tokio::io::AsyncReadExt;
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .with_test_writer()
        .init();
    // Mock torrent provider serving one fake 1 KiB file.
    let torrents_image = testing_torrents::create_testcontainers().await?;
    let _torrents_container = torrents_image.start().await?;
    let torrents_req = TestTorrentRequest {
        id: "f10ebdda-dd2e-43f8-b80c-bf0884d071c4".into(),
        file_list: vec![TestingTorrentFileItem {
            path: "[Nekomoe kissaten&LoliHouse] Boku no Kokoro no Yabai Yatsu - 20 [WebRip 1080p \
                   HEVC-10bit AAC ASSx2].mkv"
                .into(),
            size: 1024,
        }],
    };
    // Ask the provider to mint a .torrent + magnet for the mock file list.
    let torrent_res: TestTorrentResponse = reqwest::Client::new()
        .post("http://127.0.0.1:6080/api/torrents/mock")
        .json(&torrents_req)
        .send()
        .await?
        .json()
        .await?;
    // Start qBittorrent and capture its startup logs.
    let qbit_image = create_qbit_testcontainers().await?;
    let qbit_container = qbit_image.start().await?;
    let mut logs = String::new();
    qbit_container
        .stdout(false)
        .read_to_string(&mut logs)
        .await?;
    // linuxserver/qbittorrent prints auto-generated WebUI credentials to its
    // log on first start; parse them out.
    let username = logs
        .lines()
        .find_map(|line| {
            if line.contains("The WebUI administrator username is") {
                line.split_whitespace().last()
            } else {
                None
            }
        })
        .expect("should have username")
        .trim();
    let password = logs
        .lines()
        .find_map(|line| {
            if line.contains("A temporary password is provided for") {
                line.split_whitespace().last()
            } else {
                None
            }
        })
        .expect("should have password")
        .trim();
    tracing::info!(username, password);
    test_qbittorrent_downloader_impl(
        torrent_res.torrent_url,
        torrent_res.hash,
        Some(username),
        Some(password),
    )
    .await?;
    Ok(())
}
/// Shared end-to-end exercise of the qBittorrent downloader: connects, adds a
/// torrent, then verifies tagging, categorization, moving and removal.
/// Missing credentials default to empty strings.
async fn test_qbittorrent_downloader_impl(
    torrent_url: String,
    torrent_hash: String,
    username: Option<&str>,
    password: Option<&str>,
) -> anyhow::Result<()> {
    let http_client = fetch::test_util::build_testing_http_client()?;
    let base_save_path = Path::new(get_tmp_qbit_test_folder());
    // Downloader pointed at the local WebUI; sync waits give up after 3 s.
    let downloader = QBittorrentDownloader::from_creation(QBittorrentDownloaderCreation {
        endpoint: "http://127.0.0.1:8080".to_string(),
        password: password.unwrap_or_default().to_string(),
        username: username.unwrap_or_default().to_string(),
        subscriber_id: 0,
        save_path: base_save_path.to_string(),
        downloader_id: 0,
        wait_sync_timeout: Some(Duration::from_secs(3)),
    })
    .await?;
    downloader.check_connection().await?;
    // Remove any leftover torrent from a previous run so the add is clean.
    downloader
        .remove_torrents(vec![torrent_hash.clone()].into())
        .await?;
    let torrent_source =
        HashTorrentSource::from_url_and_http_client(&http_client, torrent_url).await?;
    // Timestamped folder keeps repeated runs isolated from each other.
    let folder_name = format!("torrent_test_{}", Utc::now().timestamp());
    let save_path = base_save_path.join(&folder_name);
    let torrent_creation = QBittorrentCreation {
        save_path,
        tags: vec![],
        sources: vec![torrent_source],
        category: None,
    };
    downloader.add_downloads(torrent_creation).await?;
    // Helper: re-query our torrent by hash; errors if it is not present.
    let get_torrent = async || -> Result<QBittorrentTask, DownloaderError> {
        let torrent_infos = downloader
            .query_downloads(QBittorrentSelector::Hash(QBittorrentHashSelector::from_id(
                torrent_hash.clone(),
            )))
            .await?;
        let result = torrent_infos
            .into_iter()
            .find(|t| t.hash_info() == torrent_hash)
            .whatever_context::<_, DownloaderError>("no bittorrent")?;
        Ok(result)
    };
    // The torrent should expose its file list, ending with the expected name.
    let target_torrent = get_torrent().await?;
    let files = target_torrent.contents;
    assert!(!files.is_empty());
    let first_file = files.first().expect("should have first file");
    assert!(
        &first_file.name.ends_with(r#"[Nekomoe kissaten&LoliHouse] Boku no Kokoro no Yabai Yatsu - 20 [WebRip 1080p HEVC-10bit AAC ASSx2].mkv"#)
    );
    // Tagging round-trip.
    let test_tag = "test_tag".to_string();
    downloader
        .add_torrent_tags(vec![torrent_hash.clone()], vec![test_tag.clone()])
        .await?;
    let target_torrent = get_torrent().await?;
    assert!(target_torrent.tags().any(|s| s == test_tag));
    // Category round-trip.
    let test_category = format!("test_category_{}", Utc::now().timestamp());
    downloader
        .set_torrents_category(vec![torrent_hash.clone()], &test_category)
        .await?;
    let target_torrent = get_torrent().await?;
    assert_eq!(
        Some(test_category.as_str()),
        target_torrent.category().as_deref()
    );
    // Move round-trip; compare as file URLs to normalize path separators.
    let moved_torrent_path = base_save_path.join(format!("moved_{}", Utc::now().timestamp()));
    downloader
        .move_torrents(vec![torrent_hash.clone()], moved_torrent_path.as_str())
        .await?;
    let target_torrent = get_torrent().await?;
    let actual_content_path = &target_torrent
        .torrent
        .save_path
        .expect("failed to get actual save path");
    assert!(
        path_equals_as_file_url(actual_content_path, moved_torrent_path)
            .expect("failed to compare actual torrent path and found expected torrent path")
    );
    // Removal leaves the torrent list empty again.
    downloader
        .remove_torrents(vec![torrent_hash.clone()].into())
        .await?;
    let torrent_infos1 = downloader
        .query_downloads(QBittorrentSelector::Complex(QBittorrentComplexSelector {
            query: GetTorrentListArg::builder()
                .filter(QbitTorrentFilter::All)
                .build(),
        }))
        .await?;
    assert!(torrent_infos1.is_empty());
    tracing::info!("test finished");
    Ok(())
}

View File

@@ -0,0 +1 @@
pub struct RqbitDownloaderCreation {}

View File

@@ -0,0 +1,4 @@
// rqbit downloader backend modules.
// NOTE(review): unlike the qbit module there are no `pub use` re-exports here;
// confirm whether that asymmetry is intentional.
pub mod downloader;
pub mod task;
#[cfg(test)]
mod test;

View File

@@ -0,0 +1,215 @@
use std::{borrow::Cow, time::Duration};
use itertools::Itertools;
use qbit_rs::model::{
GetTorrentListArg, State, Torrent as QbitTorrent, TorrentContent as QbitTorrentContent,
};
use quirks_path::{Path, PathBuf};
use crate::{
DownloaderError,
bittorrent::{
source::HashTorrentSource,
task::{SimpleTorrentHash, TorrentCreationTrait, TorrentStateTrait, TorrentTaskTrait},
},
core::{
DownloadCreationTrait, DownloadIdSelector, DownloadSelectorTrait, DownloadStateTrait,
DownloadTaskTrait,
},
};
/// Torrent identifier for the rqbit backend (the info-hash string).
pub type RqbitHash = SimpleTorrentHash;
/// State snapshot of an rqbit torrent.
/// NOTE(review): this wraps `qbit_rs::model::State` — for an rqbit backend a
/// librqbit state type would be expected; looks like a copy-paste placeholder
/// from the qbit module, confirm.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct RqbitState(Option<State>);
impl DownloadStateTrait for RqbitState {}
impl TorrentStateTrait for RqbitState {}
impl From<Option<State>> for RqbitState {
    fn from(value: Option<State>) -> Self {
        Self(value)
    }
}
/// A torrent tracked by the rqbit backend, with its file list and state.
/// NOTE(review): uses `qbit_rs` torrent/content types — presumably a
/// placeholder mirroring the qbit module until real librqbit types are wired
/// in; confirm.
#[derive(Debug)]
pub struct RqbitTask {
    pub hash_info: RqbitHash,
    pub torrent: QbitTorrent,
    pub contents: Vec<QbitTorrentContent>,
    pub state: RqbitState,
}
impl RqbitTask {
    /// Builds a task from a torrent query result plus its content files.
    ///
    /// # Errors
    /// Returns `DownloaderError::TorrentMetaError` when the response carries no
    /// hash — the hash is this task's id, so it is mandatory.
    pub fn from_query(
        torrent: QbitTorrent,
        contents: Vec<QbitTorrentContent>,
    ) -> Result<Self, DownloaderError> {
        let hash = torrent
            .hash
            .clone()
            .ok_or_else(|| DownloaderError::TorrentMetaError {
                message: "missing hash".to_string(),
                source: None.into(),
            })?;
        let state = RqbitState::from(torrent.state.clone());
        Ok(Self {
            hash_info: hash,
            contents,
            state,
            torrent,
        })
    }
}
// Generic task accessors, reading straight off the underlying torrent fields.
// Signed values convert via `u64::try_from(..).ok()`, so negative sentinels
// surface as `None`.
impl DownloadTaskTrait for RqbitTask {
    type State = RqbitState;
    type Id = RqbitHash;
    fn id(&self) -> &Self::Id {
        &self.hash_info
    }
    fn into_id(self) -> Self::Id {
        self.hash_info
    }
    // NOTE(review): when `torrent.name` is `None` this calls
    // `DownloadTaskTrait::name(self)`, which resolves back to this very impl —
    // possible infinite recursion; confirm against the trait's default.
    fn name(&self) -> Cow<'_, str> {
        self.torrent
            .name
            .as_deref()
            .map(Cow::Borrowed)
            .unwrap_or_else(|| DownloadTaskTrait::name(self))
    }
    fn speed(&self) -> Option<u64> {
        self.torrent.dlspeed.and_then(|s| u64::try_from(s).ok())
    }
    fn state(&self) -> &Self::State {
        &self.state
    }
    fn dl_bytes(&self) -> Option<u64> {
        self.torrent.downloaded.and_then(|v| u64::try_from(v).ok())
    }
    fn total_bytes(&self) -> Option<u64> {
        self.torrent.size.and_then(|v| u64::try_from(v).ok())
    }
    fn left_bytes(&self) -> Option<u64> {
        self.torrent.amount_left.and_then(|v| u64::try_from(v).ok())
    }
    // Elapsed (active) time of the torrent.
    fn et(&self) -> Option<Duration> {
        self.torrent
            .time_active
            .and_then(|v| u64::try_from(v).ok())
            .map(Duration::from_secs)
    }
    // Estimated time remaining.
    fn eta(&self) -> Option<Duration> {
        self.torrent
            .eta
            .and_then(|v| u64::try_from(v).ok())
            .map(Duration::from_secs)
    }
    fn progress(&self) -> Option<f32> {
        self.torrent.progress.as_ref().map(|s| *s as f32)
    }
}
impl TorrentTaskTrait for RqbitTask {
    /// Info-hash identifying this torrent.
    fn hash_info(&self) -> &str {
        &self.hash_info
    }
    /// Iterates the torrent's tags (comma separated upstream), trimmed and with
    /// empty entries skipped.
    fn tags(&self) -> impl Iterator<Item = Cow<'_, str>> {
        let raw = self.torrent.tags.as_deref().unwrap_or_default();
        raw.split(',').filter_map(|tag| {
            let tag = tag.trim();
            (!tag.is_empty()).then_some(Cow::Borrowed(tag))
        })
    }
    /// Category assigned to the torrent, if any.
    fn category(&self) -> Option<Cow<'_, str>> {
        self.torrent.category.as_ref().map(|c| Cow::Borrowed(c.as_str()))
    }
}
/// Parameters for adding torrents via the rqbit backend: destination folder,
/// optional tags/category, and one or more torrent sources.
#[derive(Debug, Clone, Default)]
pub struct RqbitCreation {
    pub save_path: PathBuf,
    pub tags: Vec<String>,
    pub category: Option<String>,
    pub sources: Vec<HashTorrentSource>,
}
impl DownloadCreationTrait for RqbitCreation {
    type Task = RqbitTask;
}
// Mutable accessors used by the generic torrent-creation machinery.
impl TorrentCreationTrait for RqbitCreation {
    fn save_path(&self) -> &Path {
        self.save_path.as_ref()
    }
    fn save_path_mut(&mut self) -> &mut PathBuf {
        &mut self.save_path
    }
    fn sources_mut(&mut self) -> &mut Vec<HashTorrentSource> {
        &mut self.sources
    }
}
/// Selector addressing torrents by a plain list of info-hashes.
pub type RqbitHashSelector = DownloadIdSelector<RqbitTask>;
/// Selector carrying a full torrent-list query.
pub struct RqbitComplexSelector {
    pub query: GetTorrentListArg,
}
impl From<RqbitHashSelector> for RqbitComplexSelector {
    fn from(value: RqbitHashSelector) -> Self {
        Self {
            query: GetTorrentListArg {
                // Multiple hashes are joined with '|' in the query format.
                hashes: Some(value.ids.join("|")),
                ..Default::default()
            },
        }
    }
}
impl DownloadSelectorTrait for RqbitComplexSelector {
    type Id = RqbitHash;
    type Task = RqbitTask;
}
/// Selector accepted by the rqbit backend: either a plain hash list or a full
/// torrent-list query.
pub enum RqbitSelector {
    Hash(RqbitHashSelector),
    Complex(RqbitComplexSelector),
}
impl DownloadSelectorTrait for RqbitSelector {
    type Id = RqbitHash;
    type Task = RqbitTask;
    /// Extracts the plain id list when possible; a complex query that cannot be
    /// reduced to ids is handed back unchanged in the `Err` variant.
    fn try_into_ids_only(self) -> Result<Vec<Self::Id>, Self> {
        match self {
            RqbitSelector::Complex(c) => c.try_into_ids_only().map_err(RqbitSelector::Complex),
            // A hash selector is ids-only by construction, so this cannot fail.
            // (Dropped the previous needless `into_iter().collect_vec()`
            // round-trip, which rebuilt the Vec it already had.)
            RqbitSelector::Hash(h) => Ok(h
                .try_into_ids_only()
                .unwrap_or_else(|_| unreachable!("hash selector must contains hash"))),
        }
    }
}

View File

View File

@@ -0,0 +1,11 @@
use quirks_path::{Path, PathToUrlError};
/// Compares two paths by converting each to a `file://` URL and checking the
/// URL strings for equality, which normalizes separators and encoding.
///
/// # Errors
/// Fails when either path cannot be represented as a file URL.
pub fn path_equals_as_file_url<A: AsRef<Path>, B: AsRef<Path>>(
    a: A,
    b: B,
) -> Result<bool, PathToUrlError> {
    let left = a.as_ref().to_file_url()?;
    let right = b.as_ref().to_file_url()?;
    let equal = left.as_str() == right.as_str();
    Ok(equal)
}

35
packages/fetch/Cargo.toml Normal file
View File

@@ -0,0 +1,35 @@
[package]
name = "fetch"
version = "0.1.0"
edition = "2024"
[dependencies]
snafu = { workspace = true }
bytes = { workspace = true }
url = { workspace = true }
serde = { workspace = true }
serde_with = { workspace = true }
lazy_static = { workspace = true }
serde_json = { workspace = true }
axum = { workspace = true }
axum-extra = { workspace = true }
async-trait = { workspace = true }
moka = { workspace = true }
reqwest = { workspace = true }
leaky-bucket = "1.1"
cookie = "0.18"
http-cache-reqwest = { version = "0.15", features = [
"manager-cacache",
"manager-moka",
] }
http-cache-semantics = "2.1"
fastrand = "2.3"
reqwest-middleware = "0.4"
reqwest-retry = "0.7"
reqwest-tracing = "0.5"
http-cache = { version = "0.20", features = [
"cacache-tokio",
"manager-cacache",
"manager-moka",
], default-features = false }

View File

@@ -0,0 +1,19 @@
use bytes::Bytes;
use reqwest::IntoUrl;
use super::client::HttpClientTrait;
use crate::FetchError;
/// Fetches `url` with a GET request and returns the raw response body.
///
/// # Errors
/// Fails on transport errors or non-success HTTP status codes.
pub async fn fetch_bytes<T: IntoUrl, H: HttpClientTrait>(
    client: &H,
    url: T,
) -> Result<Bytes, FetchError> {
    let response = client.get(url).send().await?;
    let response = response.error_for_status()?;
    Ok(response.bytes().await?)
}

View File

@@ -0,0 +1,322 @@
use std::{fmt::Debug, ops::Deref, sync::Arc, time::Duration};
use async_trait::async_trait;
use axum::http::{self, Extensions};
use http_cache_reqwest::{
Cache, CacheManager, CacheMode, HttpCache, HttpCacheOptions, MokaManager,
};
use leaky_bucket::RateLimiter;
use reqwest::{ClientBuilder, Request, Response};
use reqwest_middleware::{
ClientBuilder as ClientWithMiddlewareBuilder, ClientWithMiddleware, Middleware, Next,
};
use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff};
use reqwest_tracing::TracingMiddleware;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use snafu::Snafu;
use super::HttpClientSecrecyDataTrait;
use crate::get_random_mobile_ua;
/// reqwest middleware that awaits a leaky-bucket token before each request,
/// enforcing a client-wide request rate.
pub struct RateLimiterMiddleware {
    rate_limiter: RateLimiter,
}
#[async_trait]
impl Middleware for RateLimiterMiddleware {
    async fn handle(
        &self,
        req: Request,
        extensions: &'_ mut Extensions,
        next: Next<'_>,
    ) -> reqwest_middleware::Result<Response> {
        // Block until a token is available, then forward the request unchanged.
        self.rate_limiter.acquire_one().await;
        next.run(req, extensions).await
    }
}
/// Backend used to store cached HTTP responses.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "type")]
pub enum HttpClientCacheBackendConfig {
    // In-memory moka cache holding up to `cache_size` entries.
    Moka { cache_size: u64 },
}
/// Cache policy preset applied to the HTTP cache middleware.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum HttpClientCachePresetConfig {
    // Standard HTTP caching semantics per RFC 7234.
    #[serde(rename = "rfc7234")]
    RFC7234,
}
impl Default for HttpClientCachePresetConfig {
    fn default() -> Self {
        Self::RFC7234
    }
}
/// Declarative configuration for [`HttpClient`]: each `Option` field enables
/// the corresponding middleware (retry, rate limiting, caching) when set.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct HttpClientConfig {
    // Max retry attempts for transient failures; `None` disables retries.
    pub exponential_backoff_max_retries: Option<u32>,
    // Leaky-bucket rate limiter knobs; any `Some` enables the limiter.
    pub leaky_bucket_max_tokens: Option<u32>,
    pub leaky_bucket_initial_tokens: Option<u32>,
    pub leaky_bucket_refill_tokens: Option<u32>,
    // Serialized as milliseconds.
    #[serde_as(as = "Option<serde_with::DurationMilliSeconds>")]
    pub leaky_bucket_refill_interval: Option<Duration>,
    // Overrides the randomly chosen user agent when set.
    pub user_agent: Option<String>,
    // Either field enables the HTTP cache middleware.
    pub cache_backend: Option<HttpClientCacheBackendConfig>,
    pub cache_preset: Option<HttpClientCachePresetConfig>,
}
/// Type-erased wrapper over any [`CacheManager`], letting the cache backend be
/// chosen at runtime from configuration.
pub(crate) struct CacheBackend(Box<dyn CacheManager>);
impl CacheBackend {
    pub(crate) fn new<T: CacheManager>(backend: T) -> Self {
        Self(Box::new(backend))
    }
}
// Delegates every CacheManager operation to the boxed backend.
#[async_trait::async_trait]
impl CacheManager for CacheBackend {
    async fn get(
        &self,
        cache_key: &str,
    ) -> http_cache::Result<Option<(http_cache::HttpResponse, http_cache_semantics::CachePolicy)>>
    {
        self.0.get(cache_key).await
    }
    /// Attempts to cache a response and related policy.
    async fn put(
        &self,
        cache_key: String,
        res: http_cache::HttpResponse,
        policy: http_cache_semantics::CachePolicy,
    ) -> http_cache::Result<http_cache::HttpResponse> {
        self.0.put(cache_key, res, policy).await
    }
    /// Attempts to remove a record from cache.
    async fn delete(&self, cache_key: &str) -> http_cache::Result<()> {
        self.0.delete(cache_key).await
    }
}
/// Errors produced while building or using an [`HttpClient`]; each variant
/// transparently wraps the underlying library error.
#[derive(Debug, Snafu)]
pub enum HttpClientError {
    #[snafu(transparent)]
    ReqwestError { source: reqwest::Error },
    #[snafu(transparent)]
    ReqwestMiddlewareError { source: reqwest_middleware::Error },
    #[snafu(transparent)]
    HttpError { source: http::Error },
}
/// Marker trait for types usable as an HTTP client: anything that derefs to a
/// ready-made `ClientWithMiddleware`.
pub trait HttpClientTrait: Deref<Target = ClientWithMiddleware> + Debug {}
/// Intermediate state for rebuilding an [`HttpClient`] with extra secret
/// material attached: a fresh client builder plus the parent's middleware
/// stack and config.
pub struct HttpClientFork {
    pub client_builder: ClientBuilder,
    pub middleware_stack: Vec<Arc<dyn Middleware>>,
    pub config: HttpClientConfig,
}
impl HttpClientFork {
    /// Applies `secrecy` (cookies, auth headers, ...) to the pending client
    /// builder and returns the updated fork.
    pub fn attach_secrecy<S: HttpClientSecrecyDataTrait>(mut self, secrecy: S) -> Self {
        self.client_builder = secrecy.attach_secrecy_to_client(self.client_builder);
        self
    }
}
/// Middleware-stacked HTTP client (tracing, optional retry / rate limiting /
/// caching) built from an [`HttpClientConfig`]. The middleware stack is kept
/// alongside the client so [`HttpClient::fork`] can reuse it.
pub struct HttpClient {
    client: ClientWithMiddleware,
    middleware_stack: Vec<Arc<dyn Middleware>>,
    pub config: HttpClientConfig,
}
impl Debug for HttpClient {
    // Only the config is printed; the client and middleware stack do not
    // implement Debug.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HttpClient")
            .field("config", &self.config)
            .finish()
    }
}
impl From<HttpClient> for ClientWithMiddleware {
    fn from(val: HttpClient) -> Self {
        val.client
    }
}
// Deref lets callers use the full reqwest-middleware request API directly.
impl Deref for HttpClient {
    type Target = ClientWithMiddleware;
    fn deref(&self) -> &Self::Target {
        &self.client
    }
}
impl HttpClient {
pub fn from_config(config: HttpClientConfig) -> Result<Self, HttpClientError> {
let mut middleware_stack: Vec<Arc<dyn Middleware>> = vec![];
let reqwest_client_builder = ClientBuilder::new().user_agent(
config
.user_agent
.as_deref()
.unwrap_or_else(|| get_random_mobile_ua()),
);
#[cfg(not(target_arch = "wasm32"))]
let reqwest_client_builder =
reqwest_client_builder.redirect(reqwest::redirect::Policy::none());
let reqwest_client = reqwest_client_builder.build()?;
let mut reqwest_with_middleware_builder = ClientWithMiddlewareBuilder::new(reqwest_client);
{
let tracing_middleware = Arc::new(TracingMiddleware::default());
middleware_stack.push(tracing_middleware.clone());
reqwest_with_middleware_builder =
reqwest_with_middleware_builder.with_arc(tracing_middleware)
}
{
if let Some(ref x) = config.exponential_backoff_max_retries {
let retry_policy = ExponentialBackoff::builder().build_with_max_retries(*x);
let retry_transient_middleware =
Arc::new(RetryTransientMiddleware::new_with_policy(retry_policy));
middleware_stack.push(retry_transient_middleware.clone());
reqwest_with_middleware_builder =
reqwest_with_middleware_builder.with_arc(retry_transient_middleware);
}
}
{
if let (None, None, None, None) = (
config.leaky_bucket_initial_tokens.as_ref(),
config.leaky_bucket_refill_tokens.as_ref(),
config.leaky_bucket_refill_interval.as_ref(),
config.leaky_bucket_max_tokens.as_ref(),
) {
} else {
let mut rate_limiter_builder = RateLimiter::builder();
if let Some(ref x) = config.leaky_bucket_max_tokens {
rate_limiter_builder.max(*x as usize);
}
if let Some(ref x) = config.leaky_bucket_initial_tokens {
rate_limiter_builder.initial(*x as usize);
}
if let Some(ref x) = config.leaky_bucket_refill_tokens {
rate_limiter_builder.refill(*x as usize);
}
if let Some(ref x) = config.leaky_bucket_refill_interval {
rate_limiter_builder.interval(*x);
}
let rate_limiter = rate_limiter_builder.build();
let rate_limiter_middleware = Arc::new(RateLimiterMiddleware { rate_limiter });
middleware_stack.push(rate_limiter_middleware.clone());
reqwest_with_middleware_builder =
reqwest_with_middleware_builder.with_arc(rate_limiter_middleware);
}
}
{
if let (None, None) = (config.cache_backend.as_ref(), config.cache_preset.as_ref()) {
} else {
let cache_preset = config.cache_preset.as_ref().cloned().unwrap_or_default();
let cache_backend = config
.cache_backend
.as_ref()
.map(|b| match b {
HttpClientCacheBackendConfig::Moka { cache_size } => {
CacheBackend::new(MokaManager {
cache: Arc::new(moka::future::Cache::new(*cache_size)),
})
}
})
.unwrap_or_else(|| CacheBackend::new(MokaManager::default()));
let http_cache = match cache_preset {
HttpClientCachePresetConfig::RFC7234 => HttpCache {
mode: CacheMode::Default,
manager: cache_backend,
options: HttpCacheOptions::default(),
},
};
let http_cache_middleware = Arc::new(Cache(http_cache));
middleware_stack.push(http_cache_middleware.clone());
reqwest_with_middleware_builder =
reqwest_with_middleware_builder.with_arc(http_cache_middleware);
}
}
let reqwest_with_middleware = reqwest_with_middleware_builder.build();
Ok(Self {
client: reqwest_with_middleware,
middleware_stack,
config,
})
}
pub fn fork(&self) -> HttpClientFork {
let reqwest_client_builder = ClientBuilder::new().user_agent(
self.config
.user_agent
.as_deref()
.unwrap_or_else(|| get_random_mobile_ua()),
);
#[cfg(not(target_arch = "wasm32"))]
let reqwest_client_builder =
reqwest_client_builder.redirect(reqwest::redirect::Policy::none());
HttpClientFork {
client_builder: reqwest_client_builder,
middleware_stack: self.middleware_stack.clone(),
config: self.config.clone(),
}
}
pub fn from_fork(fork: HttpClientFork) -> Result<Self, HttpClientError> {
let HttpClientFork {
client_builder,
middleware_stack,
config,
} = fork;
let reqwest_client = client_builder.build()?;
let mut reqwest_with_middleware_builder = ClientWithMiddlewareBuilder::new(reqwest_client);
for m in &middleware_stack {
reqwest_with_middleware_builder = reqwest_with_middleware_builder.with_arc(m.clone());
}
let reqwest_with_middleware = reqwest_with_middleware_builder.build();
Ok(Self {
client: reqwest_with_middleware,
middleware_stack,
config,
})
}
}
impl Default for HttpClient {
    /// Builds a client from the default (all-disabled) config.
    ///
    /// # Panics
    /// Panics if the underlying reqwest client cannot be constructed.
    fn default() -> Self {
        HttpClient::from_config(Default::default()).expect("Failed to create default HttpClient")
    }
}
impl HttpClientTrait for HttpClient {}

View File

@@ -0,0 +1,9 @@
// HTTP client submodule: middleware-stacked client plus secrecy helpers.
pub mod core;
pub mod secrecy;
// NOTE(review): `pub use core::…` may be ambiguous between this module's
// `core` and the built-in `core` crate on some toolchains; `self::core::…`
// would be unambiguous — confirm this resolves as intended.
pub use core::{
    HttpClient, HttpClientCacheBackendConfig, HttpClientCachePresetConfig, HttpClientConfig,
    HttpClientError, HttpClientTrait,
};
pub use secrecy::{HttpClientCookiesAuth, HttpClientSecrecyDataTrait};

View File

@@ -0,0 +1,47 @@
use std::sync::Arc;
use cookie::Cookie;
use reqwest::{ClientBuilder, cookie::Jar};
use url::Url;
use crate::FetchError;
/// Hook for attaching per-client secret material (cookies, auth headers, user
/// agent) to a reqwest `ClientBuilder`; the default is a no-op pass-through.
pub trait HttpClientSecrecyDataTrait {
    fn attach_secrecy_to_client(&self, client_builder: ClientBuilder) -> ClientBuilder {
        client_builder
    }
}
#[derive(Default)]
pub struct HttpClientCookiesAuth {
pub cookie_jar: Arc<Jar>,
pub user_agent: Option<String>,
}
impl HttpClientCookiesAuth {
pub fn from_cookies(
cookies: &str,
url: &Url,
user_agent: Option<String>,
) -> Result<Self, FetchError> {
let cookie_jar = Arc::new(Jar::default());
for cookie in Cookie::split_parse(cookies).collect::<Result<Vec<Cookie<'_>>, _>>()? {
cookie_jar.add_cookie_str(&cookie.to_string(), url);
}
Ok(Self {
cookie_jar,
user_agent,
})
}
}
impl HttpClientSecrecyDataTrait for HttpClientCookiesAuth {
    // Installs the shared cookie jar and, when present, the impersonated
    // user agent on the client builder.
    fn attach_secrecy_to_client(&self, client_builder: ClientBuilder) -> ClientBuilder {
        let mut client_builder = client_builder.cookie_provider(self.cookie_jar.clone());
        if let Some(ref user_agent) = self.user_agent {
            client_builder = client_builder.user_agent(user_agent);
        }
        client_builder
    }
}

View File

@@ -0,0 +1,11 @@
use lazy_static::lazy_static;
lazy_static! {
static ref DEFAULT_HTTP_CLIENT_USER_AGENT: Vec<String> =
serde_json::from_str::<Vec<String>>(include_str!("./ua.json")).unwrap();
}
pub fn get_random_mobile_ua() -> &'static str {
DEFAULT_HTTP_CLIENT_USER_AGENT[fastrand::usize(0..DEFAULT_HTTP_CLIENT_USER_AGENT.len())]
.as_str()
}

View File

@@ -0,0 +1,12 @@
use snafu::Snafu;
/// Crate-wide error type for fetch operations; each variant transparently
/// wraps the originating library error.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum FetchError {
    #[snafu(transparent)]
    CookieParseError { source: cookie::ParseError },
    #[snafu(transparent)]
    ReqwestError { source: reqwest::Error },
    #[snafu(transparent)]
    RequestMiddlewareError { source: reqwest_middleware::Error },
}

View File

@@ -0,0 +1,19 @@
use reqwest::IntoUrl;
use super::client::HttpClientTrait;
use crate::FetchError;
/// Fetches `url` with a GET request and returns the response body decoded as
/// text.
///
/// # Errors
/// Fails on transport errors or non-success HTTP status codes.
pub async fn fetch_html<T: IntoUrl, H: HttpClientTrait>(
    client: &H,
    url: T,
) -> Result<String, FetchError> {
    let response = client.get(url).send().await?.error_for_status()?;
    let body = response.text().await?;
    Ok(body)
}

View File

@@ -0,0 +1,12 @@
use bytes::Bytes;
use reqwest::IntoUrl;
use super::{bytes::fetch_bytes, client::HttpClientTrait};
use crate::FetchError;
/// Fetches an image as raw bytes; currently a thin alias for [`fetch_bytes`],
/// kept separate so image-specific handling can be added later.
pub async fn fetch_image<T: IntoUrl, H: HttpClientTrait>(
    client: &H,
    url: T,
) -> Result<Bytes, FetchError> {
    fetch_bytes(client, url).await
}

20
packages/fetch/src/lib.rs Normal file
View File

@@ -0,0 +1,20 @@
// Crate root for `fetch`: middleware-stacked HTTP client, helpers for
// fetching bytes/HTML/images, and the shared error type.
pub mod bytes;
pub mod client;
pub mod core;
pub mod errors;
pub mod html;
pub mod image;
pub mod test_util;
// NOTE(review): `pub use core::…` may be ambiguous between this crate's
// `core` module and the built-in `core` crate on some toolchains;
// `self::core::…` would be unambiguous — confirm this resolves as intended.
pub use core::get_random_mobile_ua;
pub use bytes::fetch_bytes;
pub use client::{
    HttpClient, HttpClientConfig, HttpClientCookiesAuth, HttpClientError,
    HttpClientSecrecyDataTrait, HttpClientTrait,
};
pub use errors::FetchError;
pub use html::fetch_html;
pub use image::fetch_image;
pub use reqwest::{self, IntoUrl};
pub use reqwest_middleware;

View File

@@ -0,0 +1,6 @@
use crate::{FetchError, HttpClient};
/// Builds the plain default [`HttpClient`] used by integration tests.
///
/// # Errors
/// Currently infallible in practice; the `Result` keeps the signature stable
/// if test clients gain fallible configuration later.
pub fn build_testing_http_client() -> Result<HttpClient, FetchError> {
    Ok(HttpClient::default())
}

View File

@@ -0,0 +1,15 @@
[
  "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.6 Safari/605.1.15",
  "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
  "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36",
  "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:133.0) Gecko/20100101 Firefox/133.0",
  "Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:109.0) Gecko/20100101 Firefox/115.0",
  "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0",
  "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Herring/97.1.8280.88",
  "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
  "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 OPR/115.0.0.0",
  "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 AtContent/95.5.5462.54",
  "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
  "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36 OPR/114.0.0.0",
  "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
]

View File

@@ -4,7 +4,7 @@ version = "0.1.0"
edition = "2024"
[dependencies]
serde = { version = "1", features = ["derive"] }
testcontainers = { version = "0.23.3" }
testcontainers-modules = { version = "0.11.4" }
testcontainers-ext = { version = "0.1.0", features = ["tracing"] }
testcontainers = { workspace = true }
testcontainers-modules = { workspace = true }
testcontainers-ext = { workspace = true }
serde = { workspace = true }

11
packages/util/Cargo.toml Normal file
View File

@@ -0,0 +1,11 @@
[package]
name = "util"
version = "0.1.0"
edition = "2024"
[dependencies]
futures = { workspace = true }
quirks_path = { workspace = true }
snafu = { workspace = true }
anyhow = { workspace = true }
bytes = { workspace = true }

View File

@@ -0,0 +1,65 @@
use std::fmt::Display;
/// Optional boxed dynamic error: a `snafu`-compatible error source that may be
/// absent (used where an error context sometimes has no underlying cause).
#[derive(Debug)]
pub struct OptDynErr(Option<Box<dyn std::error::Error + Send + Sync>>);
impl AsRef<dyn snafu::Error> for OptDynErr {
    fn as_ref(&self) -> &(dyn snafu::Error + 'static) {
        self
    }
}
impl OptDynErr {
    /// Wraps a concrete error, boxing it.
    pub fn some_boxed<E: std::error::Error + Send + Sync + 'static>(e: E) -> Self {
        Self(Some(Box::new(e)))
    }
    /// Wraps an already-boxed error.
    pub fn some(e: Box<dyn std::error::Error + Send + Sync>) -> Self {
        Self(Some(e))
    }
    /// The no-cause value.
    pub fn none() -> Self {
        Self(None)
    }
}
impl Display for OptDynErr {
    // Delegates to the inner error; prints the literal "None" when absent.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match &self.0 {
            Some(e) => e.fmt(f),
            None => write!(f, "None"),
        }
    }
}
impl snafu::Error for OptDynErr {
    // NOTE(review): `source()` returns `None` even when an inner error is
    // present, so the wrapped error appears only via `Display` and never in
    // the error-source chain — confirm this suppression is intended (it may
    // be meant to avoid double-printing by chain-walking reporters).
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        None
    }
    fn cause(&self) -> Option<&dyn std::error::Error> {
        self.source()
    }
}
impl From<Option<Box<dyn std::error::Error + Send + Sync>>> for OptDynErr {
    fn from(value: Option<Box<dyn std::error::Error + Send + Sync>>) -> Self {
        Self(value)
    }
}
impl From<Box<dyn std::error::Error + Send + Sync>> for OptDynErr {
    fn from(value: Box<dyn std::error::Error + Send + Sync>) -> Self {
        Self::some(value)
    }
}
/// Extension for `Result<T, anyhow::Error>`: converts the error side into a
/// boxed dynamic error for APIs that expect `Box<dyn Error + Send + Sync>`.
pub trait AnyhowResultExt<T>: snafu::ResultExt<T, anyhow::Error> {
    fn to_dyn_boxed(self) -> Result<T, Box<dyn std::error::Error + Send + Sync>>;
}
impl<T> AnyhowResultExt<T> for Result<T, anyhow::Error> {
    fn to_dyn_boxed(self) -> Result<T, Box<dyn std::error::Error + Send + Sync>> {
        self.map_err(|e| e.into())
    }
}

3
packages/util/src/lib.rs Normal file
View File

@@ -0,0 +1,3 @@
// Crate root for `util`: shared error helpers.
pub mod errors;
pub use errors::OptDynErr;