From 8a03dc28a25963ef173309514615594ca481efc4 Mon Sep 17 00:00:00 2001 From: lonelyhentxi Date: Wed, 27 Mar 2024 01:16:00 +0800 Subject: [PATCH] feat: add api client --- crates/recorder/src/downloaders/bytes.rs | 12 --- crates/recorder/src/downloaders/defs.rs | 85 ++++++++++++++++++- crates/recorder/src/downloaders/html.rs | 11 --- crates/recorder/src/downloaders/image.rs | 8 -- crates/recorder/src/downloaders/mod.rs | 3 - crates/recorder/src/lib.rs | 2 + .../src/migrations/m20220101_000001_init.rs | 2 +- crates/recorder/src/models/bangumi.rs | 6 ++ crates/recorder/src/models/db_utils.rs | 83 ++++++++++++++++++ crates/recorder/src/models/downloads.rs | 77 ++++++----------- .../recorder/src/models/entities/downloads.rs | 2 +- .../src/models/entities/subscriptions.rs | 4 +- crates/recorder/src/models/episodes.rs | 33 ++++++- crates/recorder/src/models/mod.rs | 1 + .../src/parsers/mikan/mikan_client.rs | 31 +++++++ .../src/parsers/mikan/mikan_ep_parser.rs | 18 ++-- .../src/parsers/mikan/mikan_rss_parser.rs | 16 ++-- crates/recorder/src/parsers/mikan/mod.rs | 1 + crates/recorder/src/parsers/mod.rs | 1 - crates/recorder/src/parsers/rss/mod.rs | 15 ---- .../src/parsers/tmdb/tmdb_bgm_parser.rs | 14 +-- .../recorder/src/parsers/tmdb/tmdb_client.rs | 59 ++++++------- .../src/parsers/tmdb/tmdb_list_parser.rs | 4 +- .../src/parsers/torrent/torrent_ep_parser.rs | 2 +- crates/recorder/src/subscribe/mod.rs | 1 + 25 files changed, 328 insertions(+), 163 deletions(-) delete mode 100644 crates/recorder/src/downloaders/bytes.rs delete mode 100644 crates/recorder/src/downloaders/html.rs delete mode 100644 crates/recorder/src/downloaders/image.rs create mode 100644 crates/recorder/src/models/db_utils.rs create mode 100644 crates/recorder/src/parsers/mikan/mikan_client.rs delete mode 100644 crates/recorder/src/parsers/rss/mod.rs create mode 100644 crates/recorder/src/subscribe/mod.rs diff --git a/crates/recorder/src/downloaders/bytes.rs b/crates/recorder/src/downloaders/bytes.rs deleted file mode 100644 index 56fbfbf..0000000 --- a/crates/recorder/src/downloaders/bytes.rs +++ /dev/null @@ -1,12 +0,0 @@ -use bytes::Bytes; -use reqwest::IntoUrl; - -use super::defs::DEFAULT_USER_AGENT; - -pub async fn download_bytes(url: T) -> eyre::Result { - let request_client = reqwest::Client::builder() - .user_agent(DEFAULT_USER_AGENT) - .build()?; - let bytes = request_client.get(url).send().await?.bytes().await?; - Ok(bytes) -} diff --git a/crates/recorder/src/downloaders/defs.rs b/crates/recorder/src/downloaders/defs.rs index bb0cd8f..eb87f9c 100644 --- a/crates/recorder/src/downloaders/defs.rs +++ b/crates/recorder/src/downloaders/defs.rs @@ -1,3 +1,4 @@ +use bytes::Bytes; use itertools::Itertools; use lazy_static::lazy_static; use librqbit_core::{ @@ -9,10 +10,20 @@ pub use qbit_rs::model::{ TorrentFilter as QbitTorrentFilter, TorrentSource as QbitTorrentSource, }; use regex::Regex; -use serde::{Deserialize, Serialize}; +use reqwest::{header::HeaderMap, IntoUrl}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use tokio_utils::RateLimiter; use url::Url; -use crate::downloaders::{bytes::download_bytes, error::DownloaderError}; +use super::error::DownloaderError; + +async fn download_bytes(url: T) -> eyre::Result { + let request_client = reqwest::Client::builder() + .user_agent(DEFAULT_USER_AGENT) + .build()?; + let bytes = request_client.get(url).send().await?.bytes().await?; + Ok(bytes) +} pub const BITTORRENT_MIME_TYPE: &str = "application/x-bittorrent"; pub const MAGNET_SCHEMA: &str = "magnet"; @@ -247,3 +258,73 @@ impl Torrent { } } } + +pub struct ApiClient { + headers: HeaderMap, + rate_limiter: RateLimiter, + fetch_client: reqwest::Client, +} + +impl ApiClient { + pub fn new( + throttle_duration: std::time::Duration, + override_headers: Option, + ) -> eyre::Result { + Ok(Self { + headers: override_headers.unwrap_or_else(HeaderMap::new), + rate_limiter: RateLimiter::new(throttle_duration), + fetch_client: reqwest::Client::builder() + .user_agent(DEFAULT_USER_AGENT) + .build()?, + }) + } + + pub async fn fetch_json(&self, f: F) -> Result + where + F: FnOnce(&reqwest::Client) -> reqwest::RequestBuilder, + R: DeserializeOwned, + { + self.rate_limiter + .throttle(|| async { + f(&self.fetch_client) + .headers(self.headers.clone()) + .send() + .await? + .json::() + .await + }) + .await + } + + pub async fn fetch_bytes(&self, f: F) -> Result + where + F: FnOnce(&reqwest::Client) -> reqwest::RequestBuilder, + { + self.rate_limiter + .throttle(|| async { + f(&self.fetch_client) + .headers(self.headers.clone()) + .send() + .await? + .bytes() + .await + }) + .await + } + + pub async fn fetch_text(&self, f: F) -> Result + where + F: FnOnce(&reqwest::Client) -> reqwest::RequestBuilder, + { + self.rate_limiter + .throttle(|| async { + f(&self.fetch_client) + .headers(self.headers.clone()) + .send() + .await? + .text() + .await + }) + .await + } +} diff --git a/crates/recorder/src/downloaders/html.rs b/crates/recorder/src/downloaders/html.rs deleted file mode 100644 index d1eed9e..0000000 --- a/crates/recorder/src/downloaders/html.rs +++ /dev/null @@ -1,11 +0,0 @@ -use reqwest::IntoUrl; - -use super::defs::DEFAULT_USER_AGENT; - -pub async fn download_html(url: U) -> eyre::Result { - let request_client = reqwest::Client::builder() - .user_agent(DEFAULT_USER_AGENT) - .build()?; - let content = request_client.get(url).send().await?.text().await?; - Ok(content) -} diff --git a/crates/recorder/src/downloaders/image.rs b/crates/recorder/src/downloaders/image.rs deleted file mode 100644 index 5316090..0000000 --- a/crates/recorder/src/downloaders/image.rs +++ /dev/null @@ -1,8 +0,0 @@ -use bytes::Bytes; -use reqwest::IntoUrl; - -use super::bytes::download_bytes; - -pub async fn download_image(url: U) -> eyre::Result { - download_bytes(url).await -} diff --git a/crates/recorder/src/downloaders/mod.rs b/crates/recorder/src/downloaders/mod.rs index 26dd556..a229823 100644 --- a/crates/recorder/src/downloaders/mod.rs +++ b/crates/recorder/src/downloaders/mod.rs @@ -1,7 +1,4 @@ -pub mod bytes; pub mod defs; pub mod error; -pub mod html; pub mod qbitorrent; pub mod torrent_downloader; -pub mod image; diff --git a/crates/recorder/src/lib.rs b/crates/recorder/src/lib.rs index 9788cfa..7702581 100644 --- a/crates/recorder/src/lib.rs +++ b/crates/recorder/src/lib.rs @@ -15,3 +15,5 @@ pub mod views; pub mod workers; pub mod i18n; + +pub mod subscribe; diff --git a/crates/recorder/src/migrations/m20220101_000001_init.rs b/crates/recorder/src/migrations/m20220101_000001_init.rs index ac12ab2..21a6632 100644 --- a/crates/recorder/src/migrations/m20220101_000001_init.rs +++ b/crates/recorder/src/migrations/m20220101_000001_init.rs @@ -42,7 +42,7 @@ impl MigrationTrait for Migration { subscriptions::SubscriptionCategoryEnum, &[ subscriptions::SubscriptionCategory::Mikan, - subscriptions::SubscriptionCategory::Manual, + subscriptions::SubscriptionCategory::Tmdb, ], ) .await?; diff --git a/crates/recorder/src/models/bangumi.rs b/crates/recorder/src/models/bangumi.rs index 17f203f..b28d9a3 100644 --- a/crates/recorder/src/models/bangumi.rs +++ b/crates/recorder/src/models/bangumi.rs @@ -2,6 +2,7 @@ use regex::Regex; use sea_orm::entity::prelude::*; pub use super::entities::bangumi::*; +use crate::models::downloads; #[async_trait::async_trait] impl ActiveModelBehavior for ActiveModel {} @@ -23,3 +24,8 @@ impl BangumiFilter { Ok(false) } } + +impl Model { + pub async fn search_all() {} + pub async fn match_list(dnlds: Vec) {} +} diff --git a/crates/recorder/src/models/db_utils.rs b/crates/recorder/src/models/db_utils.rs new file mode 100644 index 0000000..3e06a53 --- /dev/null +++ b/crates/recorder/src/models/db_utils.rs @@ -0,0 +1,83 @@ +use sea_orm::{ + sea_query::{Expr, InsertStatement, IntoColumnRef, Query, SimpleExpr}, + ActiveModelTrait, ActiveValue, ColumnTrait, ConnectionTrait, EntityName, EntityTrait, + FromQueryResult, Iterable, SelectModel, SelectorRaw, TryGetable, +}; + +#[derive(FromQueryResult)] +pub(crate) struct OnlyIdsModel +where + Id: TryGetable, +{ + pub id: Id, +} + +pub(crate) async fn insert_many_with_returning_columns( + db: &D, + insert_values: impl IntoIterator, + returning_columns: impl IntoIterator, + extra_config: F, +) -> eyre::Result> +where + D: ConnectionTrait, + V: ActiveModelTrait, + T: Into, + F: FnOnce(&mut InsertStatement), + M: FromQueryResult, +{ + let db_backend = db.get_database_backend(); + assert!( + db_backend.support_returning(), + "db backend must support returning!" + ); + let ent = V::Entity::default(); + let mut insert = Query::insert(); + let mut insert_statement = insert + .into_table(ent.table_ref()) + .returning(Query::returning().exprs(returning_columns)); + + { + extra_config(&mut insert_statement); + } + + let mut columns = vec![]; + + for new_item in insert_values { + let mut values = vec![]; + for c in ::Column::iter() { + if let ActiveValue::Set(value) = new_item.get(c.clone()) { + columns.push(c); + values.push(SimpleExpr::Value(value)); + } + } + insert_statement.values(values)?; + } + insert_statement.columns(columns); + + let result = SelectorRaw::>::from_statement(db_backend.build(insert_statement)) + .all(db) + .await?; + + Ok(result) +} + +pub(crate) async fn insert_many_with_returning_all( + db: &D, + insert_values: impl IntoIterator, + extra_config: F, +) -> eyre::Result::Model>> +where + D: ConnectionTrait, + V: ActiveModelTrait, + F: FnOnce(&mut InsertStatement), +{ + let result: Vec<::Model> = insert_many_with_returning_columns( + db, + insert_values, + ::Column::iter().map(|c| c.select_as(Expr::col(c))), + extra_config, + ) + .await?; + + Ok(result) +} diff --git a/crates/recorder/src/models/downloads.rs b/crates/recorder/src/models/downloads.rs index c82ae89..b2a22a8 100644 --- a/crates/recorder/src/models/downloads.rs +++ b/crates/recorder/src/models/downloads.rs @@ -1,10 +1,19 @@ +use itertools::Itertools; use loco_rs::app::AppContext; -use sea_orm::{prelude::*, sea_query::OnConflict, ActiveValue, Condition, QueryOrder, QuerySelect}; +use sea_orm::{ + prelude::*, + sea_query::{InsertStatement, OnConflict}, +}; pub use crate::models::entities::downloads::*; use crate::{ - models::subscriptions::{self, SubscriptionCategory}, - parsers::mikan::{parse_mikan_rss_items_from_rss_link, MikanRssItem}, + models::{ + db_utils::insert_many_with_returning_all, + subscriptions::{self, SubscriptionCategory}, + }, + parsers::mikan::{ + mikan_client::MikanClient, parse_mikan_rss_items_from_rss_link, MikanRssItem, + }, }; #[async_trait::async_trait] @@ -12,18 +21,6 @@ impl ActiveModelBehavior for ActiveModel {} impl ActiveModel { pub fn from_mikan_rss_item(m: MikanRssItem, subscription_id: i32) -> Self { - let _ = Self { - origin_name: ActiveValue::Set(m.title.clone()), - display_name: ActiveValue::Set(m.title), - subscription_id: ActiveValue::Set(subscription_id), - status: ActiveValue::Set(DownloadStatus::Pending), - mime: ActiveValue::Set(DownloadMime::BitTorrent), - url: ActiveValue::Set(m.url), - curr_size: ActiveValue::Set(m.content_length.as_ref().map(|_| 0)), - all_size: ActiveValue::Set(m.content_length), - homepage: ActiveValue::Set(m.homepage), - ..Default::default() - }; todo!() } } @@ -31,56 +28,34 @@ impl ActiveModel { impl Model { pub async fn pull_subscription( ctx: AppContext, - item: &subscriptions::Model, - ) -> eyre::Result> { + subscription: &subscriptions::Model, + ) -> eyre::Result> { let db = &ctx.db; - match &item.category { + match &subscription.category { SubscriptionCategory::Mikan => { - let items = parse_mikan_rss_items_from_rss_link(&item.source_url).await?; + let subscriber_id = subscription.subscriber_id; + let client = MikanClient::new(subscriber_id).await?; + let items = + parse_mikan_rss_items_from_rss_link(&client, &subscription.source_url).await?; let all_items = items.collect::>(); - let last_old_id = { - Entity::find() - .select_only() - .column(Column::Id) - .order_by_desc(Column::Id) - .filter(Column::SubscriptionId.eq(item.id)) - .one(db) - .await? - } - .map(|i| i.id); - if all_items.is_empty() { return Ok(vec![]); } let new_items = all_items .into_iter() - .map(|i| ActiveModel::from_mikan_rss_item(i, item.id)); + .map(|i| ActiveModel::from_mikan_rss_item(i, subscription.id)) + .collect_vec(); - let insert_result = Entity::insert_many(new_items) - .on_conflict(OnConflict::column(Column::Url).do_nothing().to_owned()) - .exec(db) - .await?; - - let insert_ids = Entity::find() - .select_only() - .column(Column::Id) - .filter({ - let mut cond = Condition::all() - .add(Column::SubscriptionId.eq(item.id)) - .add(Column::Id.lte(insert_result.last_insert_id)); - - if let Some(last_old_id) = last_old_id { - cond = cond.add(Column::Id.gt(last_old_id)) - } - - cond + // insert and filter out duplicated items + let new_items: Vec = + insert_many_with_returning_all(db, new_items, |stat: &mut InsertStatement| { + stat.on_conflict(OnConflict::column(Column::Url).do_nothing().to_owned()); }) - .all(db) .await?; - Ok(insert_ids.into_iter().map(|i| i.id).collect::>()) + Ok(new_items) } _ => { todo!("other subscription categories") diff --git a/crates/recorder/src/models/entities/downloads.rs b/crates/recorder/src/models/entities/downloads.rs index dd34bce..c01a701 100644 --- a/crates/recorder/src/models/entities/downloads.rs +++ b/crates/recorder/src/models/entities/downloads.rs @@ -1,4 +1,4 @@ -use sea_orm::entity::prelude::*; +use sea_orm::{entity::prelude::*, FromJsonQueryResult}; use serde::{Deserialize, Serialize}; #[derive( diff --git a/crates/recorder/src/models/entities/subscriptions.rs b/crates/recorder/src/models/entities/subscriptions.rs index 8fee5f2..cb2c12b 100644 --- a/crates/recorder/src/models/entities/subscriptions.rs +++ b/crates/recorder/src/models/entities/subscriptions.rs @@ -13,8 +13,8 @@ use serde::{Deserialize, Serialize}; pub enum SubscriptionCategory { #[sea_orm(string_value = "mikan")] Mikan, - #[sea_orm(string_value = "manual")] - Manual, + #[sea_orm(string_value = "tmdb")] + Tmdb, } #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] diff --git a/crates/recorder/src/models/episodes.rs b/crates/recorder/src/models/episodes.rs index 00c1435..7d9e9dc 100644 --- a/crates/recorder/src/models/episodes.rs +++ b/crates/recorder/src/models/episodes.rs @@ -1,6 +1,37 @@ -use sea_orm::entity::prelude::*; +use sea_orm::{entity::prelude::*, ActiveValue}; pub use super::entities::episodes::*; +use crate::models::{bangumi, downloads}; #[async_trait::async_trait] impl ActiveModelBehavior for ActiveModel {} + +impl ActiveModel { + pub async fn from_mikan_rss_item(dl: &downloads::Model, bgm: &bangumi::Model) -> Self { + let _ = Self { + raw_name: ActiveValue::Set(dl.origin_name.clone()), + official_title: ActiveValue::Set(bgm.official_title.clone()), + display_name: ActiveValue::Set(bgm.display_name.clone()), + name_zh: Default::default(), + name_jp: Default::default(), + name_en: Default::default(), + s_name_zh: Default::default(), + s_name_jp: Default::default(), + s_name_en: Default::default(), + bangumi_id: Default::default(), + download_id: Default::default(), + save_path: Default::default(), + resolution: Default::default(), + season: Default::default(), + season_raw: Default::default(), + fansub: Default::default(), + poster_link: Default::default(), + home_page: Default::default(), + subtitle: Default::default(), + deleted: Default::default(), + source: Default::default(), + ..Default::default() + }; + todo!() + } +} diff --git a/crates/recorder/src/models/mod.rs b/crates/recorder/src/models/mod.rs index 9c2dc99..ef0183b 100644 --- a/crates/recorder/src/models/mod.rs +++ b/crates/recorder/src/models/mod.rs @@ -1,4 +1,5 @@ pub mod bangumi; +pub(crate) mod db_utils; pub mod downloaders; pub mod downloads; pub mod entities; diff --git a/crates/recorder/src/parsers/mikan/mikan_client.rs b/crates/recorder/src/parsers/mikan/mikan_client.rs new file mode 100644 index 0000000..e705feb --- /dev/null +++ b/crates/recorder/src/parsers/mikan/mikan_client.rs @@ -0,0 +1,31 @@ +use std::{ops::Deref, sync::Arc}; + +use tokio::sync::OnceCell; + +use crate::downloaders::defs::ApiClient; + +pub struct MikanClient { + api_client: ApiClient, +} + +static MIKAN_CLIENT: OnceCell> = OnceCell::const_new(); + +impl MikanClient { + pub async fn new(_subscriber_id: i32) -> eyre::Result> { + let res = MIKAN_CLIENT + .get_or_try_init(|| async { + ApiClient::new(std::time::Duration::from_millis(50), None) + .map(|api_client| Arc::new(Self { api_client })) + }) + .await?; + Ok(res.clone()) + } +} + +impl Deref for MikanClient { + type Target = ApiClient; + + fn deref(&self) -> &Self::Target { + &self.api_client + } +} diff --git a/crates/recorder/src/parsers/mikan/mikan_ep_parser.rs b/crates/recorder/src/parsers/mikan/mikan_ep_parser.rs index cc955af..855debd 100644 --- a/crates/recorder/src/parsers/mikan/mikan_ep_parser.rs +++ b/crates/recorder/src/parsers/mikan/mikan_ep_parser.rs @@ -5,9 +5,9 @@ use lightningcss::{properties::Property, values::image::Image}; use regex::Regex; use url::Url; -use crate::{ - downloaders::{html::download_html, image::download_image}, - parsers::html::{get_tag_style, query_selector_first_tag}, +use crate::parsers::{ + html::{get_tag_style, query_selector_first_tag}, + mikan::mikan_client::MikanClient, }; pub struct MikanEpisodeMeta { @@ -22,10 +22,11 @@ lazy_static! { } pub async fn parse_episode_meta_from_mikan_homepage( + client: &MikanClient, url: Url, ) -> eyre::Result> { let url_host = url.origin().unicode_serialization(); - let content = download_html(url.as_str()).await?; + let content = client.fetch_text(|f| f.get(url.as_str())).await?; let dom = tl::parse(&content, tl::ParserOptions::default())?; let parser = dom.parser(); let poster_node = query_selector_first_tag(&dom, r"div.bangumi-poster", parser); @@ -62,7 +63,7 @@ pub async fn parse_episode_meta_from_mikan_homepage( p }); let poster_data = if let Some(p) = origin_poster_src.as_ref() { - download_image(p.clone()).await.ok() + client.fetch_bytes(|f| f.get(p.clone())).await.ok() } else { None }; @@ -93,6 +94,7 @@ mod test { use url::Url; use super::parse_episode_meta_from_mikan_homepage; + use crate::parsers::mikan::mikan_client::MikanClient; #[tokio::test] async fn test_parse_mikan() { @@ -101,7 +103,11 @@ mod test { "https://mikanani.me/Home/Episode/475184dce83ea2b82902592a5ac3343f6d54b36a"; let url = Url::parse(url_str)?; - if let Some(ep_meta) = parse_episode_meta_from_mikan_homepage(url.clone()).await? { + let client = MikanClient::new(0).await.expect("should get mikan client"); + + if let Some(ep_meta) = + parse_episode_meta_from_mikan_homepage(&client, url.clone()).await? + { assert_eq!(ep_meta.homepage, url); assert_eq!(ep_meta.official_title, "葬送的芙莉莲"); assert_eq!( diff --git a/crates/recorder/src/parsers/mikan/mikan_rss_parser.rs b/crates/recorder/src/parsers/mikan/mikan_rss_parser.rs index 5175809..a062529 100644 --- a/crates/recorder/src/parsers/mikan/mikan_rss_parser.rs +++ b/crates/recorder/src/parsers/mikan/mikan_rss_parser.rs @@ -3,8 +3,8 @@ use reqwest::IntoUrl; use serde::{Deserialize, Serialize}; use crate::{ - downloaders::{bytes::download_bytes, defs::BITTORRENT_MIME_TYPE}, - parsers::errors::ParseError, + downloaders::defs::BITTORRENT_MIME_TYPE, + parsers::{errors::ParseError, mikan::mikan_client::MikanClient}, }; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -50,9 +50,10 @@ impl TryFrom for MikanRssItem { } pub async fn parse_mikan_rss_items_from_rss_link( + client: &MikanClient, url: impl IntoUrl, ) -> eyre::Result> { - let bytes = download_bytes(url).await?; + let bytes = client.fetch_bytes(|f| f.get(url)).await?; let channel = rss::Channel::read_from(&bytes[..])?; @@ -62,14 +63,17 @@ pub async fn parse_mikan_rss_items_from_rss_link( #[cfg(test)] mod tests { use super::parse_mikan_rss_items_from_rss_link; - use crate::downloaders::defs::BITTORRENT_MIME_TYPE; + use crate::{ + downloaders::defs::BITTORRENT_MIME_TYPE, parsers::mikan::mikan_client::MikanClient, + }; #[tokio::test] pub async fn test_mikan_subscription_items_from_rss_url() { let url = "https://mikanani.me/RSS/Bangumi?bangumiId=3141&subgroupid=370"; - let items = parse_mikan_rss_items_from_rss_link(url) + let client = MikanClient::new(0).await.expect("should get mikan client"); + let items = parse_mikan_rss_items_from_rss_link(&client, url) .await - .expect("should get subscription items from rss url") + .expect("should get subscription items from subscription url") .collect::>(); let first_sub_item = items diff --git a/crates/recorder/src/parsers/mikan/mod.rs b/crates/recorder/src/parsers/mikan/mod.rs index 831a0e8..bb6736d 100644 --- a/crates/recorder/src/parsers/mikan/mod.rs +++ b/crates/recorder/src/parsers/mikan/mod.rs @@ -1,3 +1,4 @@ +pub mod mikan_client; pub mod mikan_ep_parser; pub mod mikan_rss_parser; diff --git a/crates/recorder/src/parsers/mod.rs b/crates/recorder/src/parsers/mod.rs index 43c3b1f..5914cb0 100644 --- a/crates/recorder/src/parsers/mod.rs +++ b/crates/recorder/src/parsers/mod.rs @@ -3,6 +3,5 @@ pub mod errors; pub mod html; pub mod mikan; pub mod raw; -pub mod rss; pub mod tmdb; pub mod torrent; diff --git a/crates/recorder/src/parsers/rss/mod.rs b/crates/recorder/src/parsers/rss/mod.rs deleted file mode 100644 index 2de3653..0000000 --- a/crates/recorder/src/parsers/rss/mod.rs +++ /dev/null @@ -1,15 +0,0 @@ -use crate::{ - models::entities::subscriptions, - parsers::mikan::{parse_episode_meta_from_mikan_homepage, MikanRssItem}, -}; - -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum RssItem { - Mikan(MikanRssItem), -} - -// pub async fn parse_official_title_from_rss_item (rss: &subscriptions::Model) -// -> String { if rss.category == subscriptions::SubscriptionCategory::Mikan -// { let res = parse_episode_meta_from_mikan_homepage(rss.source_url) -// } -// } diff --git a/crates/recorder/src/parsers/tmdb/tmdb_bgm_parser.rs b/crates/recorder/src/parsers/tmdb/tmdb_bgm_parser.rs index 8adf856..aaf1bea 100644 --- a/crates/recorder/src/parsers/tmdb/tmdb_bgm_parser.rs +++ b/crates/recorder/src/parsers/tmdb/tmdb_bgm_parser.rs @@ -86,14 +86,17 @@ pub async fn search_tmdb_items_from_title_and_lang( let mut items = vec![]; let page_num = { let search_url = build_tmdb_search_api_url(title, lang, 1); - let first_page: TmdbSearchMultiPageDto = - tmdb_client.fetch(|fetch| fetch.get(search_url)).await?; + let first_page: TmdbSearchMultiPageDto = tmdb_client + .fetch_json(|fetch| fetch.get(search_url)) + .await?; items.extend(first_page.results); first_page.total_pages }; for i in 2..=page_num { let search_url = build_tmdb_search_api_url(title, lang, i); - let page: TmdbSearchMultiPageDto = tmdb_client.fetch(|fetch| fetch.get(search_url)).await?; + let page: TmdbSearchMultiPageDto = tmdb_client + .fetch_json(|fetch| fetch.get(search_url)) + .await?; items.extend(page.results); } Ok(items) @@ -107,11 +110,12 @@ pub async fn get_tmdb_info_from_id_lang_and_distribution( ) -> eyre::Result { let info_url = build_tmdb_info_api_url(id, lang, distribution); let info = if distribution == &BangumiDistribution::Movie { - let info: Box = tmdb_client.fetch(|fetch| fetch.get(info_url)).await?; + let info: Box = + tmdb_client.fetch_json(|fetch| fetch.get(info_url)).await?; TmdbMediaDetailDto::Movie(info) } else { let info: Box = - tmdb_client.fetch(|fetch| fetch.get(info_url)).await?; + tmdb_client.fetch_json(|fetch| fetch.get(info_url)).await?; TmdbMediaDetailDto::Tv(info) }; Ok(info) diff --git a/crates/recorder/src/parsers/tmdb/tmdb_client.rs b/crates/recorder/src/parsers/tmdb/tmdb_client.rs index 940d569..9bc1ec1 100644 --- a/crates/recorder/src/parsers/tmdb/tmdb_client.rs +++ b/crates/recorder/src/parsers/tmdb/tmdb_client.rs @@ -1,21 +1,20 @@ -use std::sync::{Arc, Weak}; +use std::{ + ops::Deref, + sync::{Arc, Weak}, +}; use lazy_static::lazy_static; use reqwest::header::{HeaderMap, HeaderValue, ACCEPT, AUTHORIZATION}; -use serde::de::DeserializeOwned; use tokio::sync::RwLock; -use tokio_utils::RateLimiter; use weak_table::WeakValueHashMap; -use crate::downloaders::defs::DEFAULT_USER_AGENT; +use crate::downloaders::defs::{ApiClient, DEFAULT_USER_AGENT}; pub(crate) const TMDB_API_ORIGIN: &str = "https://api.themoviedb.org"; pub struct TmdbApiClient { api_token: String, - rate_limiter: RateLimiter, - fetch_client: reqwest::Client, - headers: HeaderMap, + api_client: ApiClient, } lazy_static! { @@ -34,19 +33,18 @@ impl TmdbApiClient { } let client = Arc::new(TmdbApiClient { api_token: api_token.to_string(), - rate_limiter: RateLimiter::new(std::time::Duration::from_millis(50)), - fetch_client: reqwest::Client::builder() - .user_agent(DEFAULT_USER_AGENT) - .build()?, - headers: { - let mut header_map = HeaderMap::new(); - header_map.insert(ACCEPT, HeaderValue::from_static("application/json")); - header_map.insert( - AUTHORIZATION, - HeaderValue::from_str(&format!("Bearer {api_token}"))?, - ); - header_map - }, + api_client: ApiClient::new( + std::time::Duration::from_millis(50), + Some({ + let mut header_map = HeaderMap::new(); + header_map.insert(ACCEPT, HeaderValue::from_static("application/json")); + header_map.insert( + AUTHORIZATION, + HeaderValue::from_str(&format!("Bearer {api_token}"))?, + ); + header_map + }), + )?, }); { let mut map_write = TMDB_API_CLIENT_MAP.write().await; @@ -58,22 +56,13 @@ impl TmdbApiClient { pub fn get_api_token(&self) -> &str { &self.api_token } +} - pub async fn fetch(&self, f: F) -> Result - where - F: FnOnce(&reqwest::Client) -> reqwest::RequestBuilder, - R: DeserializeOwned, - { - self.rate_limiter - .throttle(|| async { - f(&self.fetch_client) - .headers(self.headers.clone()) - .send() - .await? - .json::() - .await - }) - .await +impl Deref for TmdbApiClient { + type Target = ApiClient; + + fn deref(&self) -> &Self::Target { + &self.api_client } } diff --git a/crates/recorder/src/parsers/tmdb/tmdb_list_parser.rs b/crates/recorder/src/parsers/tmdb/tmdb_list_parser.rs index 84a1e43..7fefb42 100644 --- a/crates/recorder/src/parsers/tmdb/tmdb_list_parser.rs +++ b/crates/recorder/src/parsers/tmdb/tmdb_list_parser.rs @@ -27,7 +27,7 @@ pub async fn parse_tmdb_list_items_from_list_api( let page_num = { let first_page: TmdbListPageDto = tmdb_client - .fetch(|fetch| fetch.get(build_tmdb_list_api_url(list_id, lang, 1))) + .fetch_json(|fetch| fetch.get(build_tmdb_list_api_url(list_id, lang, 1))) .await?; items.extend(first_page.results); @@ -37,7 +37,7 @@ pub async fn parse_tmdb_list_items_from_list_api( for i in 2..=page_num { let page: TmdbListPageDto = tmdb_client - .fetch(|fetch| fetch.get(build_tmdb_list_api_url(list_id, lang, i))) + .fetch_json(|fetch| fetch.get(build_tmdb_list_api_url(list_id, lang, i))) .await?; items.extend(page.results); diff --git a/crates/recorder/src/parsers/torrent/torrent_ep_parser.rs b/crates/recorder/src/parsers/torrent/torrent_ep_parser.rs index 4b94d4c..2f1f4e8 100644 --- a/crates/recorder/src/parsers/torrent/torrent_ep_parser.rs +++ b/crates/recorder/src/parsers/torrent/torrent_ep_parser.rs @@ -52,7 +52,7 @@ fn get_fansub(group_and_title: &str) -> (Option<&str>, &str) { .filter(|s| !s.is_empty()) .collect::>(); - match (n.get(0), n.get(1)) { + match (n.first(), n.get(1)) { (None, None) => (None, ""), (Some(n0), None) => (None, *n0), (Some(n0), Some(n1)) => { diff --git a/crates/recorder/src/subscribe/mod.rs b/crates/recorder/src/subscribe/mod.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/crates/recorder/src/subscribe/mod.rs @@ -0,0 +1 @@ +