feat(downloader): add rqbit impl

This commit is contained in:
2025-04-09 02:26:23 +08:00
parent 2686fa1d76
commit 1ff8a311ae
15 changed files with 457 additions and 146 deletions

View File

@@ -42,6 +42,7 @@ librqbit = { version = "8", features = ["async-bt", "watch"] }
util = { workspace = true }
testing-torrents = { workspace = true, optional = true }
fetch = { workspace = true }
dashmap = "6.1.0"
[dev-dependencies]

View File

@@ -24,9 +24,10 @@ where
Self::State: TorrentStateTrait,
Self::Id: TorrentHashTrait,
{
fn hash_info(&self) -> &str;
fn hash_info(&self) -> Cow<'_, str>;
fn name(&self) -> Cow<'_, str> {
Cow::Borrowed(self.hash_info())
self.hash_info()
}
fn tags(&self) -> impl Iterator<Item = Cow<'_, str>>;

View File

@@ -7,7 +7,18 @@ use async_trait::async_trait;
use super::DownloaderError;
pub trait DownloadStateTrait: Sized + Debug {}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DownloadSimpleState {
Paused,
Active,
Completed,
Error,
Unknown,
}
pub trait DownloadStateTrait: Sized + Debug {
fn to_download_state(&self) -> DownloadSimpleState;
}
pub trait DownloadIdTrait: Hash + Sized + Clone + Send + Debug {}

View File

@@ -35,6 +35,11 @@ pub enum DownloaderError {
#[snafu(source(from(Box<dyn std::error::Error + Send + Sync>, OptDynErr::some)))]
source: OptDynErr,
},
#[snafu(display("{source}"))]
RqbitError {
#[snafu(source(from(Box<dyn std::error::Error + Send + Sync>, OptDynErr::some)))]
source: OptDynErr,
},
#[snafu(display("{message}"))]
Whatever {
message: String,

View File

@@ -17,7 +17,7 @@ use qbit_rs::{
Torrent as QbitTorrent, TorrentFile, TorrentSource,
},
};
use quirks_path::{Path, PathBuf};
use quirks_path::PathBuf;
use snafu::{OptionExt, whatever};
use tokio::{
sync::{RwLock, watch},
@@ -26,6 +26,7 @@ use tokio::{
use tracing::instrument;
use url::Url;
use super::QBittorrentHashSelector;
use crate::{
DownloaderError,
bittorrent::{
@@ -33,7 +34,7 @@ use crate::{
source::{HashTorrentSource, HashTorrentSourceTrait, MagnetUrlSource, TorrentFileSource},
task::TORRENT_TAG_NAME,
},
core::{DownloadIdSelector, DownloaderTrait},
core::DownloaderTrait,
qbit::task::{
QBittorrentCreation, QBittorrentHash, QBittorrentSelector, QBittorrentState,
QBittorrentTask,
@@ -41,6 +42,7 @@ use crate::{
utils::path_equals_as_file_url,
};
#[derive(Debug)]
pub struct QBittorrentDownloaderCreation {
pub endpoint: String,
pub username: String,
@@ -130,6 +132,7 @@ pub struct QBittorrentDownloader {
}
impl QBittorrentDownloader {
#[instrument(level = "debug")]
pub async fn from_creation(
creation: QBittorrentDownloaderCreation,
) -> Result<Arc<Self>, DownloaderError> {
@@ -253,10 +256,6 @@ impl QBittorrentDownloader {
Ok(())
}
pub fn get_save_path(&self, sub_path: &Path) -> PathBuf {
self.save_path.join(sub_path)
}
#[instrument(level = "debug", skip(self))]
pub async fn add_torrent_tags(
&self,
@@ -324,6 +323,7 @@ impl QBittorrentDownloader {
Ok(())
}
#[instrument(level = "debug", skip(self))]
pub async fn get_torrent_path(
&self,
hashes: String,
@@ -406,6 +406,7 @@ impl DownloaderTrait for QBittorrentDownloader {
type Creation = QBittorrentCreation;
type Selector = QBittorrentSelector;
#[instrument(level = "debug", skip(self))]
async fn add_downloads(
&self,
creation: <Self as DownloaderTrait>::Creation,
@@ -524,6 +525,7 @@ impl DownloaderTrait for QBittorrentDownloader {
<Self as TorrentDownloaderTrait>::remove_downloads(self, selector).await
}
#[instrument(level = "debug", skip(self))]
async fn query_downloads(
&self,
selector: QBittorrentSelector,
@@ -555,13 +557,13 @@ impl DownloaderTrait for QBittorrentDownloader {
#[async_trait]
impl TorrentDownloaderTrait for QBittorrentDownloader {
type IdSelector = DownloadIdSelector<Self::Task>;
type IdSelector = QBittorrentHashSelector;
#[instrument(level = "debug", skip(self))]
async fn pause_torrents(
&self,
hashes: <Self as TorrentDownloaderTrait>::IdSelector,
) -> Result<<Self as TorrentDownloaderTrait>::IdSelector, DownloaderError> {
) -> Result<Self::IdSelector, DownloaderError> {
self.client.pause_torrents(hashes.clone()).await?;
Ok(hashes)
}
@@ -579,7 +581,7 @@ impl TorrentDownloaderTrait for QBittorrentDownloader {
async fn remove_torrents(
&self,
hashes: <Self as TorrentDownloaderTrait>::IdSelector,
) -> Result<<Self as TorrentDownloaderTrait>::IdSelector, DownloaderError> {
) -> Result<Self::IdSelector, DownloaderError> {
self.client
.delete_torrents(hashes.clone(), Some(true))
.await?;

View File

@@ -13,8 +13,8 @@ use crate::{
task::{SimpleTorrentHash, TorrentCreationTrait, TorrentStateTrait, TorrentTaskTrait},
},
core::{
DownloadCreationTrait, DownloadIdSelector, DownloadSelectorTrait, DownloadStateTrait,
DownloadTaskTrait,
DownloadCreationTrait, DownloadIdSelector, DownloadSelectorTrait, DownloadSimpleState,
DownloadStateTrait, DownloadTaskTrait,
},
};
@@ -35,7 +35,34 @@ impl From<Option<State>> for QBittorrentState {
}
}
impl DownloadStateTrait for QBittorrentState {}
impl DownloadStateTrait for QBittorrentState {
fn to_download_state(&self) -> DownloadSimpleState {
if let Some(ref state) = self.0 {
match state {
State::ForcedUP
| State::Uploading
| State::PausedUP
| State::QueuedUP
| State::StalledUP
| State::CheckingUP => DownloadSimpleState::Completed,
State::Error | State::MissingFiles => DownloadSimpleState::Error,
State::Unknown => DownloadSimpleState::Unknown,
State::PausedDL => DownloadSimpleState::Paused,
State::Allocating
| State::Moving
| State::MetaDL
| State::ForcedDL
| State::CheckingResumeData
| State::QueuedDL
| State::Downloading
| State::StalledDL
| State::CheckingDL => DownloadSimpleState::Active,
}
} else {
DownloadSimpleState::Unknown
}
}
}
impl TorrentStateTrait for QBittorrentState {}
@@ -129,8 +156,8 @@ impl DownloadTaskTrait for QBittorrentTask {
}
impl TorrentTaskTrait for QBittorrentTask {
fn hash_info(&self) -> &str {
&self.hash_info
fn hash_info(&self) -> Cow<'_, str> {
Cow::Borrowed(&self.hash_info)
}
fn tags(&self) -> impl Iterator<Item = Cow<'_, str>> {
@@ -177,6 +204,7 @@ impl TorrentCreationTrait for QBittorrentCreation {
pub type QBittorrentHashSelector = DownloadIdSelector<QBittorrentTask>;
#[derive(Debug)]
pub struct QBittorrentComplexSelector {
pub query: GetTorrentListArg,
}
@@ -197,6 +225,7 @@ impl DownloadSelectorTrait for QBittorrentComplexSelector {
type Task = QBittorrentTask;
}
#[derive(Debug)]
pub enum QBittorrentSelector {
Hash(QBittorrentHashSelector),
Complex(QBittorrentComplexSelector),

View File

@@ -1 +1,278 @@
pub struct RqbitDownloaderCreation {}
use std::{str::FromStr, sync::Arc};
use async_trait::async_trait;
use librqbit::{
AddTorrent, AddTorrentOptions, ManagedTorrent, Session, SessionOptions, api::TorrentIdOrHash,
};
use librqbit_core::Id20;
use snafu::ResultExt;
use tracing::instrument;
use util::errors::AnyhowResultExt;
use super::task::{RqbitCreation, RqbitHash, RqbitSelector, RqbitState, RqbitTask};
use crate::{
DownloaderError,
bittorrent::{
downloader::TorrentDownloaderTrait,
source::{HashTorrentSource, HashTorrentSourceTrait},
},
core::{DownloadIdSelector, DownloaderTrait},
errors::RqbitSnafu,
};
#[derive(Debug)]
pub struct RqbitDownloaderCreation {
pub save_path: String,
pub subscriber_id: i32,
pub downloader_id: i32,
}
impl RqbitDownloaderCreation {}
pub struct RqbitDownloader {
pub save_path: String,
pub subscriber_id: i32,
pub downloader_id: i32,
pub session: Arc<Session>,
}
impl RqbitDownloader {
#[instrument(level = "debug")]
pub async fn from_creation(
creation: RqbitDownloaderCreation,
) -> Result<Arc<Self>, DownloaderError> {
let session_opt = SessionOptions {
..Default::default()
};
let session = Session::new_with_opts(creation.save_path.clone().into(), session_opt)
.await
.to_dyn_boxed()
.context(RqbitSnafu {})?;
Ok(Arc::new(Self {
session,
save_path: creation.save_path,
subscriber_id: creation.subscriber_id,
downloader_id: creation.downloader_id,
}))
}
pub async fn add_torrent(
&self,
source: HashTorrentSource,
opt: Option<AddTorrentOptions>,
) -> Result<RqbitHash, DownloaderError> {
let hash = Id20::from_str(&source.hash_info() as &str)
.to_dyn_boxed()
.context(RqbitSnafu {})?;
let source = match source {
HashTorrentSource::TorrentFile(file) => AddTorrent::TorrentFileBytes(file.payload),
HashTorrentSource::MagnetUrl(magnet) => AddTorrent::Url(magnet.url.into()),
};
let response = self
.session
.add_torrent(source, opt)
.await
.to_dyn_boxed()
.context(RqbitSnafu {})?;
let handle = response
.into_handle()
.ok_or_else(|| anyhow::anyhow!("failed to get handle of add torrent task"))
.to_dyn_boxed()
.context(RqbitSnafu {})?;
handle
.wait_until_initialized()
.await
.to_dyn_boxed()
.context(RqbitSnafu {})?;
Ok(hash)
}
fn query_torrent_impl(&self, hash: RqbitHash) -> Result<Arc<ManagedTorrent>, DownloaderError> {
let torrent = self
.session
.get(TorrentIdOrHash::Hash(hash))
.ok_or_else(|| anyhow::anyhow!("could not find torrent by hash {}", hash.as_string()))
.to_dyn_boxed()
.context(RqbitSnafu {})?;
Ok(torrent)
}
pub fn query_torrent(&self, hash: RqbitHash) -> Result<RqbitTask, DownloaderError> {
let torrent = self.query_torrent_impl(hash)?;
let task = RqbitTask::from_query(torrent)?;
Ok(task)
}
pub async fn pause_torrent(&self, hash: RqbitHash) -> Result<(), DownloaderError> {
let t = self.query_torrent_impl(hash)?;
self.session
.pause(&t)
.await
.to_dyn_boxed()
.context(RqbitSnafu {})?;
Ok(())
}
pub async fn resume_torrent(&self, hash: RqbitHash) -> Result<(), DownloaderError> {
let t = self.query_torrent_impl(hash)?;
self.session
.unpause(&t)
.await
.to_dyn_boxed()
.context(RqbitSnafu {})?;
Ok(())
}
pub async fn delete_torrent(&self, hash: RqbitHash) -> Result<(), DownloaderError> {
self.session
.delete(TorrentIdOrHash::Hash(hash), true)
.await
.to_dyn_boxed()
.context(RqbitSnafu {})?;
Ok(())
}
}
#[async_trait]
impl DownloaderTrait for RqbitDownloader {
type State = RqbitState;
type Id = RqbitHash;
type Task = RqbitTask;
type Creation = RqbitCreation;
type Selector = RqbitSelector;
#[instrument(level = "debug", skip(self))]
async fn add_downloads(
&self,
creation: RqbitCreation,
) -> Result<Vec<<Self as DownloaderTrait>::Id>, DownloaderError> {
let mut sources = creation.sources;
if sources.len() == 1 {
let hash = self
.add_torrent(
sources.pop().unwrap(),
Some(AddTorrentOptions {
paused: false,
output_folder: Some(self.save_path.clone()),
..Default::default()
}),
)
.await?;
Ok(vec![hash])
} else {
let tasks = sources
.into_iter()
.map(|s| {
self.add_torrent(
s,
Some(AddTorrentOptions {
paused: false,
output_folder: Some(self.save_path.clone()),
..Default::default()
}),
)
})
.collect::<Vec<_>>();
let results = futures::future::try_join_all(tasks).await?;
Ok(results)
}
}
async fn pause_downloads(
&self,
selector: <Self as DownloaderTrait>::Selector,
) -> Result<impl IntoIterator<Item = Self::Id>, DownloaderError> {
<Self as TorrentDownloaderTrait>::pause_downloads(self, selector).await
}
async fn resume_downloads(
&self,
selector: <Self as DownloaderTrait>::Selector,
) -> Result<impl IntoIterator<Item = Self::Id>, DownloaderError> {
<Self as TorrentDownloaderTrait>::resume_downloads(self, selector).await
}
async fn remove_downloads(
&self,
selector: <Self as DownloaderTrait>::Selector,
) -> Result<impl IntoIterator<Item = Self::Id>, DownloaderError> {
<Self as TorrentDownloaderTrait>::remove_downloads(self, selector).await
}
#[instrument(level = "debug", skip(self))]
async fn query_downloads(
&self,
selector: RqbitSelector,
) -> Result<Vec<<Self as DownloaderTrait>::Task>, DownloaderError> {
let hashes = selector.into_iter();
let tasks = hashes
.map(|h| self.query_torrent(h))
.collect::<Result<Vec<_>, DownloaderError>>()?;
Ok(tasks)
}
}
#[async_trait]
impl TorrentDownloaderTrait for RqbitDownloader {
type IdSelector = DownloadIdSelector<Self::Task>;
#[instrument(level = "debug", skip(self))]
async fn pause_torrents(
&self,
selector: Self::IdSelector,
) -> Result<Self::IdSelector, DownloaderError> {
let mut hashes: Vec<_> = selector.clone();
if hashes.len() == 1 {
self.pause_torrent(hashes.pop().unwrap()).await?;
} else {
futures::future::try_join_all(hashes.into_iter().map(|h| self.pause_torrent(h)))
.await?;
}
Ok(selector)
}
#[instrument(level = "debug", skip(self))]
async fn resume_torrents(
&self,
selector: Self::IdSelector,
) -> Result<Self::IdSelector, DownloaderError> {
let mut hashes: Vec<_> = selector.clone();
if hashes.len() == 1 {
self.resume_torrent(hashes.pop().unwrap()).await?;
} else {
futures::future::try_join_all(hashes.into_iter().map(|h| self.resume_torrent(h)))
.await?;
}
Ok(selector)
}
#[instrument(level = "debug", skip(self))]
async fn remove_torrents(
&self,
selector: Self::IdSelector,
) -> Result<Self::IdSelector, DownloaderError> {
let mut hashes: Vec<_> = selector.clone();
if hashes.len() == 1 {
self.delete_torrent(hashes.pop().unwrap()).await?;
} else {
futures::future::try_join_all(hashes.into_iter().map(|h| self.delete_torrent(h)))
.await?;
}
Ok(selector)
}
}

View File

@@ -1,68 +1,84 @@
use std::{borrow::Cow, time::Duration};
use std::{borrow::Cow, fmt::Debug, sync::Arc, time::Duration};
use itertools::Itertools;
use qbit_rs::model::{
GetTorrentListArg, State, Torrent as QbitTorrent, TorrentContent as QbitTorrentContent,
};
use librqbit::{ManagedTorrent, ManagedTorrentState, TorrentStats, TorrentStatsState};
use librqbit_core::Id20;
use quirks_path::{Path, PathBuf};
use crate::{
DownloaderError,
bittorrent::{
source::HashTorrentSource,
task::{SimpleTorrentHash, TorrentCreationTrait, TorrentStateTrait, TorrentTaskTrait},
task::{TorrentCreationTrait, TorrentHashTrait, TorrentStateTrait, TorrentTaskTrait},
},
core::{
DownloadCreationTrait, DownloadIdSelector, DownloadSelectorTrait, DownloadStateTrait,
DownloadTaskTrait,
DownloadCreationTrait, DownloadIdSelector, DownloadIdTrait, DownloadSimpleState,
DownloadStateTrait, DownloadTaskTrait,
},
};
pub type RqbitHash = SimpleTorrentHash;
pub type RqbitHash = Id20;
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct RqbitState(Option<State>);
impl DownloadIdTrait for RqbitHash {}
impl DownloadStateTrait for RqbitState {}
impl TorrentHashTrait for RqbitHash {}
#[derive(Debug, Clone)]
pub struct RqbitState(Arc<TorrentStats>);
impl DownloadStateTrait for RqbitState {
fn to_download_state(&self) -> DownloadSimpleState {
match self.0.state {
TorrentStatsState::Error => DownloadSimpleState::Error,
TorrentStatsState::Paused => DownloadSimpleState::Paused,
TorrentStatsState::Live => {
if self.0.finished {
DownloadSimpleState::Completed
} else {
DownloadSimpleState::Active
}
}
TorrentStatsState::Initializing => DownloadSimpleState::Active,
}
}
}
impl TorrentStateTrait for RqbitState {}
impl From<Option<State>> for RqbitState {
fn from(value: Option<State>) -> Self {
impl From<Arc<TorrentStats>> for RqbitState {
fn from(value: Arc<TorrentStats>) -> Self {
Self(value)
}
}
#[derive(Debug)]
pub struct RqbitTask {
pub hash_info: RqbitHash,
pub torrent: QbitTorrent,
pub contents: Vec<QbitTorrentContent>,
pub torrent: Arc<ManagedTorrent>,
pub state: RqbitState,
pub stats: Arc<TorrentStats>,
}
impl RqbitTask {
pub fn from_query(
torrent: QbitTorrent,
contents: Vec<QbitTorrentContent>,
) -> Result<Self, DownloaderError> {
let hash = torrent
.hash
.clone()
.ok_or_else(|| DownloaderError::TorrentMetaError {
message: "missing hash".to_string(),
source: None.into(),
})?;
let state = RqbitState::from(torrent.state.clone());
pub fn from_query(torrent: Arc<ManagedTorrent>) -> Result<Self, DownloaderError> {
let hash = torrent.info_hash();
let stats = Arc::new(torrent.stats());
Ok(Self {
hash_info: hash,
contents,
state,
state: stats.clone().into(),
stats,
torrent,
})
}
}
impl Debug for RqbitTask {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RqbitTask")
.field("hash_info", &self.hash_info)
.field("state", &self.id())
.finish()
}
}
impl DownloadTaskTrait for RqbitTask {
type State = RqbitState;
type Id = RqbitHash;
@@ -77,14 +93,26 @@ impl DownloadTaskTrait for RqbitTask {
fn name(&self) -> Cow<'_, str> {
self.torrent
.name
.as_deref()
.map(Cow::Borrowed)
.metadata
.load_full()
.and_then(|m| m.name.to_owned())
.map(Cow::Owned)
.unwrap_or_else(|| DownloadTaskTrait::name(self))
}
fn speed(&self) -> Option<u64> {
self.torrent.dlspeed.and_then(|s| u64::try_from(s).ok())
self.stats
.live
.as_ref()
.map(|s| s.download_speed.mbps)
.and_then(|u| {
let v = u * 1024f64 * 1024f64;
if v.is_finite() && v > 0.0 && v < u64::MAX as f64 {
Some(v as u64)
} else {
None
}
})
}
fn state(&self) -> &Self::State {
@@ -92,54 +120,41 @@ impl DownloadTaskTrait for RqbitTask {
}
fn dl_bytes(&self) -> Option<u64> {
self.torrent.downloaded.and_then(|v| u64::try_from(v).ok())
Some(self.stats.progress_bytes)
}
fn total_bytes(&self) -> Option<u64> {
self.torrent.size.and_then(|v| u64::try_from(v).ok())
}
fn left_bytes(&self) -> Option<u64> {
self.torrent.amount_left.and_then(|v| u64::try_from(v).ok())
Some(self.stats.total_bytes)
}
fn et(&self) -> Option<Duration> {
self.torrent
.time_active
.and_then(|v| u64::try_from(v).ok())
.map(Duration::from_secs)
self.torrent.with_state(|l| match l {
ManagedTorrentState::Live(l) => Some(Duration::from_millis(
l.stats_snapshot().total_piece_download_ms,
)),
_ => None,
})
}
fn eta(&self) -> Option<Duration> {
self.torrent
.eta
.and_then(|v| u64::try_from(v).ok())
.map(Duration::from_secs)
}
fn progress(&self) -> Option<f32> {
self.torrent.progress.as_ref().map(|s| *s as f32)
self.torrent.with_state(|l| match l {
ManagedTorrentState::Live(l) => l.down_speed_estimator().time_remaining(),
_ => None,
})
}
}
impl TorrentTaskTrait for RqbitTask {
fn hash_info(&self) -> &str {
&self.hash_info
fn hash_info(&self) -> Cow<'_, str> {
Cow::Owned(self.hash_info.as_string())
}
fn tags(&self) -> impl Iterator<Item = Cow<'_, str>> {
self.torrent
.tags
.as_deref()
.unwrap_or("")
.split(',')
.map(|s| s.trim())
.filter(|s| !s.is_empty())
.map(Cow::Borrowed)
std::iter::empty()
}
fn category(&self) -> Option<Cow<'_, str>> {
self.torrent.category.as_deref().map(Cow::Borrowed)
None
}
}
@@ -171,45 +186,4 @@ impl TorrentCreationTrait for RqbitCreation {
pub type RqbitHashSelector = DownloadIdSelector<RqbitTask>;
pub struct RqbitComplexSelector {
pub query: GetTorrentListArg,
}
impl From<RqbitHashSelector> for RqbitComplexSelector {
fn from(value: RqbitHashSelector) -> Self {
Self {
query: GetTorrentListArg {
hashes: Some(value.ids.join("|")),
..Default::default()
},
}
}
}
impl DownloadSelectorTrait for RqbitComplexSelector {
type Id = RqbitHash;
type Task = RqbitTask;
}
pub enum RqbitSelector {
Hash(RqbitHashSelector),
Complex(RqbitComplexSelector),
}
impl DownloadSelectorTrait for RqbitSelector {
type Id = RqbitHash;
type Task = RqbitTask;
fn try_into_ids_only(self) -> Result<Vec<Self::Id>, Self> {
match self {
RqbitSelector::Complex(c) => c.try_into_ids_only().map_err(RqbitSelector::Complex),
RqbitSelector::Hash(h) => {
let result = h
.try_into_ids_only()
.unwrap_or_else(|_| unreachable!("hash selector must contains hash"))
.into_iter();
Ok(result.collect_vec())
}
}
}
}
pub type RqbitSelector = RqbitHashSelector;