feat: add api client

This commit is contained in:
master 2024-03-27 01:16:00 +08:00
parent 5e51b2752d
commit 8a03dc28a2
25 changed files with 328 additions and 163 deletions

View File

@ -1,12 +0,0 @@
use bytes::Bytes;
use reqwest::IntoUrl;
use super::defs::DEFAULT_USER_AGENT;
/// Fetch the raw response body of `url` as `Bytes`.
///
/// Builds a one-off `reqwest::Client` carrying the crate's default
/// user agent; client-construction and request errors are both
/// propagated through `eyre`.
pub async fn download_bytes<T: IntoUrl>(url: T) -> eyre::Result<Bytes> {
    let client = reqwest::Client::builder()
        .user_agent(DEFAULT_USER_AGENT)
        .build()?;
    let response = client.get(url).send().await?;
    Ok(response.bytes().await?)
}

View File

@ -1,3 +1,4 @@
use bytes::Bytes;
use itertools::Itertools;
use lazy_static::lazy_static;
use librqbit_core::{
@ -9,10 +10,20 @@ pub use qbit_rs::model::{
TorrentFilter as QbitTorrentFilter, TorrentSource as QbitTorrentSource,
};
use regex::Regex;
use serde::{Deserialize, Serialize};
use reqwest::{header::HeaderMap, IntoUrl};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tokio_utils::RateLimiter;
use url::Url;
use crate::downloaders::{bytes::download_bytes, error::DownloaderError};
use super::error::DownloaderError;
/// Fetch `url` and return the raw body bytes, using a freshly built
/// client configured with the crate's default user agent.
async fn download_bytes<T: IntoUrl>(url: T) -> eyre::Result<Bytes> {
    let client = reqwest::Client::builder()
        .user_agent(DEFAULT_USER_AGENT)
        .build()?;
    let response = client.get(url).send().await?;
    Ok(response.bytes().await?)
}
pub const BITTORRENT_MIME_TYPE: &str = "application/x-bittorrent";
pub const MAGNET_SCHEMA: &str = "magnet";
@ -247,3 +258,73 @@ impl Torrent {
}
}
}
/// Reusable HTTP client wrapper: a `reqwest::Client` plus a set of
/// default headers attached to every request and a rate limiter that
/// throttles outgoing requests.
pub struct ApiClient {
    // Headers cloned onto every request issued through `fetch_*`.
    headers: HeaderMap,
    // Every `fetch_*` call goes through `throttle` on this limiter.
    rate_limiter: RateLimiter,
    // Underlying client, built with the crate's default user agent.
    fetch_client: reqwest::Client,
}
impl ApiClient {
    /// Create a client that throttles successive requests by
    /// `throttle_duration` and attaches `override_headers` (if any)
    /// to every request it sends.
    ///
    /// # Errors
    /// Fails if the underlying `reqwest::Client` cannot be built.
    pub fn new(
        throttle_duration: std::time::Duration,
        override_headers: Option<HeaderMap>,
    ) -> eyre::Result<Self> {
        Ok(Self {
            // idiomatic form of `unwrap_or_else(HeaderMap::new)`
            headers: override_headers.unwrap_or_default(),
            rate_limiter: RateLimiter::new(throttle_duration),
            fetch_client: reqwest::Client::builder()
                .user_agent(DEFAULT_USER_AGENT)
                .build()?,
        })
    }

    /// Send the request built by `f` (rate-limited, with the default
    /// headers applied) and deserialize the JSON response body as `R`.
    pub async fn fetch_json<R, F>(&self, f: F) -> Result<R, reqwest::Error>
    where
        F: FnOnce(&reqwest::Client) -> reqwest::RequestBuilder,
        R: DeserializeOwned,
    {
        self.rate_limiter
            .throttle(|| async {
                f(&self.fetch_client)
                    .headers(self.headers.clone())
                    .send()
                    .await?
                    .json::<R>()
                    .await
            })
            .await
    }

    /// Send the request built by `f` (rate-limited, with the default
    /// headers applied) and return the raw response body as `Bytes`.
    pub async fn fetch_bytes<F>(&self, f: F) -> Result<Bytes, reqwest::Error>
    where
        F: FnOnce(&reqwest::Client) -> reqwest::RequestBuilder,
    {
        self.rate_limiter
            .throttle(|| async {
                f(&self.fetch_client)
                    .headers(self.headers.clone())
                    .send()
                    .await?
                    .bytes()
                    .await
            })
            .await
    }

    /// Send the request built by `f` (rate-limited, with the default
    /// headers applied) and return the response body decoded as text.
    pub async fn fetch_text<F>(&self, f: F) -> Result<String, reqwest::Error>
    where
        F: FnOnce(&reqwest::Client) -> reqwest::RequestBuilder,
    {
        self.rate_limiter
            .throttle(|| async {
                f(&self.fetch_client)
                    .headers(self.headers.clone())
                    .send()
                    .await?
                    .text()
                    .await
            })
            .await
    }
}

View File

@ -1,11 +0,0 @@
use reqwest::IntoUrl;
use super::defs::DEFAULT_USER_AGENT;
/// Fetch the body of `url` decoded as text (e.g. an HTML page),
/// using a one-off client with the crate's default user agent.
pub async fn download_html<U: IntoUrl>(url: U) -> eyre::Result<String> {
    let client = reqwest::Client::builder()
        .user_agent(DEFAULT_USER_AGENT)
        .build()?;
    let response = client.get(url).send().await?;
    Ok(response.text().await?)
}

View File

@ -1,8 +0,0 @@
use bytes::Bytes;
use reqwest::IntoUrl;
use super::bytes::download_bytes;
/// Fetch an image at `url`; a thin wrapper that delegates straight
/// to [`download_bytes`].
pub async fn download_image<U: IntoUrl>(url: U) -> eyre::Result<Bytes> {
    let bytes = download_bytes(url).await?;
    Ok(bytes)
}

View File

@ -1,7 +1,4 @@
pub mod bytes;
pub mod defs;
pub mod error;
pub mod html;
pub mod qbitorrent;
pub mod torrent_downloader;
pub mod image;

View File

@ -15,3 +15,5 @@ pub mod views;
pub mod workers;
pub mod i18n;
pub mod subscribe;

View File

@ -42,7 +42,7 @@ impl MigrationTrait for Migration {
subscriptions::SubscriptionCategoryEnum,
&[
subscriptions::SubscriptionCategory::Mikan,
subscriptions::SubscriptionCategory::Manual,
subscriptions::SubscriptionCategory::Tmdb,
],
)
.await?;

View File

@ -2,6 +2,7 @@ use regex::Regex;
use sea_orm::entity::prelude::*;
pub use super::entities::bangumi::*;
use crate::models::downloads;
#[async_trait::async_trait]
impl ActiveModelBehavior for ActiveModel {}
@ -23,3 +24,8 @@ impl BangumiFilter {
Ok(false)
}
}
impl Model {
    // TODO(stub): search all bangumi — body is empty, not implemented.
    pub async fn search_all() {}
    // TODO(stub): match the given download records against bangumi;
    // `dnlds` is currently unused and the body is empty.
    pub async fn match_list(dnlds: Vec<downloads::Model>) {}
}

View File

@ -0,0 +1,83 @@
use sea_orm::{
sea_query::{Expr, InsertStatement, IntoColumnRef, Query, SimpleExpr},
ActiveModelTrait, ActiveValue, ColumnTrait, ConnectionTrait, EntityName, EntityTrait,
FromQueryResult, Iterable, SelectModel, SelectorRaw, TryGetable,
};
/// Query-result helper that captures only the `id` column of a row,
/// for statements where nothing but the ids is needed.
#[derive(FromQueryResult)]
pub(crate) struct OnlyIdsModel<Id>
where
    Id: TryGetable,
{
    // The row's `id` value; `Id` is whatever type the column decodes to.
    pub id: Id,
}
/// Insert all `insert_values` in a single statement and return the
/// requested `returning_columns` for each inserted row, relying on
/// the backend's `RETURNING` support.
///
/// All items are assumed to have the same set of `Set` columns; the
/// column list is recorded from the first item only. `extra_config`
/// lets the caller tweak the statement (e.g. add `ON CONFLICT`).
///
/// # Panics
/// Panics if the database backend does not support `RETURNING`.
///
/// # Errors
/// Propagates value-binding and query-execution errors.
pub(crate) async fn insert_many_with_returning_columns<M, D, V, T, F>(
    db: &D,
    insert_values: impl IntoIterator<Item = V>,
    returning_columns: impl IntoIterator<Item = T>,
    extra_config: F,
) -> eyre::Result<Vec<M>>
where
    D: ConnectionTrait,
    V: ActiveModelTrait,
    T: Into<SimpleExpr>,
    F: FnOnce(&mut InsertStatement),
    M: FromQueryResult,
{
    let db_backend = db.get_database_backend();
    assert!(
        db_backend.support_returning(),
        "db backend must support returning!"
    );
    let ent = V::Entity::default();
    let mut insert = Query::insert();
    let mut insert_statement = insert
        .into_table(ent.table_ref())
        .returning(Query::returning().exprs(returning_columns));
    {
        extra_config(&mut insert_statement);
    }
    // BUGFIX: record the column list only once (from the first item).
    // Previously `columns.push(c)` ran for every item, so inserting N
    // rows produced an N-times-duplicated column list.
    let mut columns = vec![];
    let mut columns_recorded = false;
    for new_item in insert_values {
        let mut values = vec![];
        for c in <V::Entity as EntityTrait>::Column::iter() {
            if let ActiveValue::Set(value) = new_item.get(c.clone()) {
                if !columns_recorded {
                    columns.push(c);
                }
                values.push(SimpleExpr::Value(value));
            }
        }
        columns_recorded = true;
        insert_statement.values(values)?;
    }
    insert_statement.columns(columns);
    let result = SelectorRaw::<SelectModel<M>>::from_statement(db_backend.build(insert_statement))
        .all(db)
        .await?;
    Ok(result)
}
/// Insert all `insert_values` in a single statement and return the
/// complete model for every inserted row.
///
/// Thin wrapper over [`insert_many_with_returning_columns`] that asks
/// it to return every column of the entity.
pub(crate) async fn insert_many_with_returning_all<D, V, F>(
    db: &D,
    insert_values: impl IntoIterator<Item = V>,
    extra_config: F,
) -> eyre::Result<Vec<<V::Entity as EntityTrait>::Model>>
where
    D: ConnectionTrait,
    V: ActiveModelTrait,
    F: FnOnce(&mut InsertStatement),
{
    // Select every column back so each row decodes into a full model.
    let all_columns =
        <V::Entity as EntityTrait>::Column::iter().map(|c| c.select_as(Expr::col(c)));
    insert_many_with_returning_columns(db, insert_values, all_columns, extra_config).await
}

View File

@ -1,10 +1,19 @@
use itertools::Itertools;
use loco_rs::app::AppContext;
use sea_orm::{prelude::*, sea_query::OnConflict, ActiveValue, Condition, QueryOrder, QuerySelect};
use sea_orm::{
prelude::*,
sea_query::{InsertStatement, OnConflict},
};
pub use crate::models::entities::downloads::*;
use crate::{
models::subscriptions::{self, SubscriptionCategory},
parsers::mikan::{parse_mikan_rss_items_from_rss_link, MikanRssItem},
models::{
db_utils::insert_many_with_returning_all,
subscriptions::{self, SubscriptionCategory},
},
parsers::mikan::{
mikan_client::MikanClient, parse_mikan_rss_items_from_rss_link, MikanRssItem,
},
};
#[async_trait::async_trait]
@ -12,18 +21,6 @@ impl ActiveModelBehavior for ActiveModel {}
impl ActiveModel {
pub fn from_mikan_rss_item(m: MikanRssItem, subscription_id: i32) -> Self {
let _ = Self {
origin_name: ActiveValue::Set(m.title.clone()),
display_name: ActiveValue::Set(m.title),
subscription_id: ActiveValue::Set(subscription_id),
status: ActiveValue::Set(DownloadStatus::Pending),
mime: ActiveValue::Set(DownloadMime::BitTorrent),
url: ActiveValue::Set(m.url),
curr_size: ActiveValue::Set(m.content_length.as_ref().map(|_| 0)),
all_size: ActiveValue::Set(m.content_length),
homepage: ActiveValue::Set(m.homepage),
..Default::default()
};
todo!()
}
}
@ -31,56 +28,34 @@ impl ActiveModel {
impl Model {
pub async fn pull_subscription(
ctx: AppContext,
item: &subscriptions::Model,
) -> eyre::Result<Vec<i32>> {
subscription: &subscriptions::Model,
) -> eyre::Result<Vec<Model>> {
let db = &ctx.db;
match &item.category {
match &subscription.category {
SubscriptionCategory::Mikan => {
let items = parse_mikan_rss_items_from_rss_link(&item.source_url).await?;
let subscriber_id = subscription.subscriber_id;
let client = MikanClient::new(subscriber_id).await?;
let items =
parse_mikan_rss_items_from_rss_link(&client, &subscription.source_url).await?;
let all_items = items.collect::<Vec<_>>();
let last_old_id = {
Entity::find()
.select_only()
.column(Column::Id)
.order_by_desc(Column::Id)
.filter(Column::SubscriptionId.eq(item.id))
.one(db)
.await?
}
.map(|i| i.id);
if all_items.is_empty() {
return Ok(vec![]);
}
let new_items = all_items
.into_iter()
.map(|i| ActiveModel::from_mikan_rss_item(i, item.id));
.map(|i| ActiveModel::from_mikan_rss_item(i, subscription.id))
.collect_vec();
let insert_result = Entity::insert_many(new_items)
.on_conflict(OnConflict::column(Column::Url).do_nothing().to_owned())
.exec(db)
.await?;
let insert_ids = Entity::find()
.select_only()
.column(Column::Id)
.filter({
let mut cond = Condition::all()
.add(Column::SubscriptionId.eq(item.id))
.add(Column::Id.lte(insert_result.last_insert_id));
if let Some(last_old_id) = last_old_id {
cond = cond.add(Column::Id.gt(last_old_id))
}
cond
// insert and filter out duplicated items
let new_items: Vec<Model> =
insert_many_with_returning_all(db, new_items, |stat: &mut InsertStatement| {
stat.on_conflict(OnConflict::column(Column::Url).do_nothing().to_owned());
})
.all(db)
.await?;
Ok(insert_ids.into_iter().map(|i| i.id).collect::<Vec<_>>())
Ok(new_items)
}
_ => {
todo!("other subscription categories")

View File

@ -1,4 +1,4 @@
use sea_orm::entity::prelude::*;
use sea_orm::{entity::prelude::*, FromJsonQueryResult};
use serde::{Deserialize, Serialize};
#[derive(

View File

@ -13,8 +13,8 @@ use serde::{Deserialize, Serialize};
pub enum SubscriptionCategory {
#[sea_orm(string_value = "mikan")]
Mikan,
#[sea_orm(string_value = "manual")]
Manual,
#[sea_orm(string_value = "tmdb")]
Tmdb,
}
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]

View File

@ -1,6 +1,37 @@
use sea_orm::entity::prelude::*;
use sea_orm::{entity::prelude::*, ActiveValue};
pub use super::entities::episodes::*;
use crate::models::{bangumi, downloads};
#[async_trait::async_trait]
impl ActiveModelBehavior for ActiveModel {}
impl ActiveModel {
    /// Build an episode active model from a download record and its
    /// bangumi. Currently a stub: the constructed value is discarded
    /// (`let _ = ...`) and the function always panics via `todo!()`.
    // NOTE(review): kept as a sketch of the intended field mapping —
    // only `raw_name`, `official_title` and `display_name` are filled
    // from the inputs; everything else is defaulted.
    pub async fn from_mikan_rss_item(dl: &downloads::Model, bgm: &bangumi::Model) -> Self {
        let _ = Self {
            raw_name: ActiveValue::Set(dl.origin_name.clone()),
            official_title: ActiveValue::Set(bgm.official_title.clone()),
            display_name: ActiveValue::Set(bgm.display_name.clone()),
            name_zh: Default::default(),
            name_jp: Default::default(),
            name_en: Default::default(),
            s_name_zh: Default::default(),
            s_name_jp: Default::default(),
            s_name_en: Default::default(),
            bangumi_id: Default::default(),
            download_id: Default::default(),
            save_path: Default::default(),
            resolution: Default::default(),
            season: Default::default(),
            season_raw: Default::default(),
            fansub: Default::default(),
            poster_link: Default::default(),
            home_page: Default::default(),
            subtitle: Default::default(),
            deleted: Default::default(),
            source: Default::default(),
            ..Default::default()
        };
        todo!()
    }
}

View File

@ -1,4 +1,5 @@
pub mod bangumi;
pub(crate) mod db_utils;
pub mod downloaders;
pub mod downloads;
pub mod entities;

View File

@ -0,0 +1,31 @@
use std::{ops::Deref, sync::Arc};
use tokio::sync::OnceCell;
use crate::downloaders::defs::ApiClient;
/// Mikan-site HTTP client: wraps the shared [`ApiClient`] and derefs
/// to it, so its `fetch_*` methods are available directly.
pub struct MikanClient {
    api_client: ApiClient,
}

// Process-wide singleton; every `MikanClient::new` call yields the
// same shared instance.
static MIKAN_CLIENT: OnceCell<Arc<MikanClient>> = OnceCell::const_new();

impl MikanClient {
    /// Return the shared Mikan client, creating it on first use with
    /// a 50 ms request throttle and no extra headers.
    ///
    /// NOTE(review): `_subscriber_id` is currently ignored — all
    /// subscribers share one client; confirm this is intentional.
    pub async fn new(_subscriber_id: i32) -> eyre::Result<Arc<Self>> {
        let res = MIKAN_CLIENT
            .get_or_try_init(|| async {
                ApiClient::new(std::time::Duration::from_millis(50), None)
                    .map(|api_client| Arc::new(Self { api_client }))
            })
            .await?;
        Ok(res.clone())
    }
}

impl Deref for MikanClient {
    type Target = ApiClient;

    fn deref(&self) -> &Self::Target {
        &self.api_client
    }
}

View File

@ -5,9 +5,9 @@ use lightningcss::{properties::Property, values::image::Image};
use regex::Regex;
use url::Url;
use crate::{
downloaders::{html::download_html, image::download_image},
parsers::html::{get_tag_style, query_selector_first_tag},
use crate::parsers::{
html::{get_tag_style, query_selector_first_tag},
mikan::mikan_client::MikanClient,
};
pub struct MikanEpisodeMeta {
@ -22,10 +22,11 @@ lazy_static! {
}
pub async fn parse_episode_meta_from_mikan_homepage(
client: &MikanClient,
url: Url,
) -> eyre::Result<Option<MikanEpisodeMeta>> {
let url_host = url.origin().unicode_serialization();
let content = download_html(url.as_str()).await?;
let content = client.fetch_text(|f| f.get(url.as_str())).await?;
let dom = tl::parse(&content, tl::ParserOptions::default())?;
let parser = dom.parser();
let poster_node = query_selector_first_tag(&dom, r"div.bangumi-poster", parser);
@ -62,7 +63,7 @@ pub async fn parse_episode_meta_from_mikan_homepage(
p
});
let poster_data = if let Some(p) = origin_poster_src.as_ref() {
download_image(p.clone()).await.ok()
client.fetch_bytes(|f| f.get(p.clone())).await.ok()
} else {
None
};
@ -93,6 +94,7 @@ mod test {
use url::Url;
use super::parse_episode_meta_from_mikan_homepage;
use crate::parsers::mikan::mikan_client::MikanClient;
#[tokio::test]
async fn test_parse_mikan() {
@ -101,7 +103,11 @@ mod test {
"https://mikanani.me/Home/Episode/475184dce83ea2b82902592a5ac3343f6d54b36a";
let url = Url::parse(url_str)?;
if let Some(ep_meta) = parse_episode_meta_from_mikan_homepage(url.clone()).await? {
let client = MikanClient::new(0).await.expect("should get mikan client");
if let Some(ep_meta) =
parse_episode_meta_from_mikan_homepage(&client, url.clone()).await?
{
assert_eq!(ep_meta.homepage, url);
assert_eq!(ep_meta.official_title, "葬送的芙莉莲");
assert_eq!(

View File

@ -3,8 +3,8 @@ use reqwest::IntoUrl;
use serde::{Deserialize, Serialize};
use crate::{
downloaders::{bytes::download_bytes, defs::BITTORRENT_MIME_TYPE},
parsers::errors::ParseError,
downloaders::defs::BITTORRENT_MIME_TYPE,
parsers::{errors::ParseError, mikan::mikan_client::MikanClient},
};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
@ -50,9 +50,10 @@ impl TryFrom<rss::Item> for MikanRssItem {
}
pub async fn parse_mikan_rss_items_from_rss_link(
client: &MikanClient,
url: impl IntoUrl,
) -> eyre::Result<impl Iterator<Item = MikanRssItem>> {
let bytes = download_bytes(url).await?;
let bytes = client.fetch_bytes(|f| f.get(url)).await?;
let channel = rss::Channel::read_from(&bytes[..])?;
@ -62,14 +63,17 @@ pub async fn parse_mikan_rss_items_from_rss_link(
#[cfg(test)]
mod tests {
use super::parse_mikan_rss_items_from_rss_link;
use crate::downloaders::defs::BITTORRENT_MIME_TYPE;
use crate::{
downloaders::defs::BITTORRENT_MIME_TYPE, parsers::mikan::mikan_client::MikanClient,
};
#[tokio::test]
pub async fn test_mikan_subscription_items_from_rss_url() {
let url = "https://mikanani.me/RSS/Bangumi?bangumiId=3141&subgroupid=370";
let items = parse_mikan_rss_items_from_rss_link(url)
let client = MikanClient::new(0).await.expect("should get mikan client");
let items = parse_mikan_rss_items_from_rss_link(&client, url)
.await
.expect("should get subscription items from rss url")
.expect("should get subscription items from subscription url")
.collect::<Vec<_>>();
let first_sub_item = items

View File

@ -1,3 +1,4 @@
pub mod mikan_client;
pub mod mikan_ep_parser;
pub mod mikan_rss_parser;

View File

@ -3,6 +3,5 @@ pub mod errors;
pub mod html;
pub mod mikan;
pub mod raw;
pub mod rss;
pub mod tmdb;
pub mod torrent;

View File

@ -1,15 +0,0 @@
use crate::{
models::entities::subscriptions,
parsers::mikan::{parse_episode_meta_from_mikan_homepage, MikanRssItem},
};
/// A parsed RSS feed entry, tagged by the source site it came from.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RssItem {
    // Entry originating from mikanani.me.
    Mikan(MikanRssItem),
}
// pub async fn parse_official_title_from_rss_item (rss: &subscriptions::Model)
// -> String { if rss.category == subscriptions::SubscriptionCategory::Mikan
// { let res = parse_episode_meta_from_mikan_homepage(rss.source_url)
// }
// }

View File

@ -86,14 +86,17 @@ pub async fn search_tmdb_items_from_title_and_lang(
let mut items = vec![];
let page_num = {
let search_url = build_tmdb_search_api_url(title, lang, 1);
let first_page: TmdbSearchMultiPageDto =
tmdb_client.fetch(|fetch| fetch.get(search_url)).await?;
let first_page: TmdbSearchMultiPageDto = tmdb_client
.fetch_json(|fetch| fetch.get(search_url))
.await?;
items.extend(first_page.results);
first_page.total_pages
};
for i in 2..=page_num {
let search_url = build_tmdb_search_api_url(title, lang, i);
let page: TmdbSearchMultiPageDto = tmdb_client.fetch(|fetch| fetch.get(search_url)).await?;
let page: TmdbSearchMultiPageDto = tmdb_client
.fetch_json(|fetch| fetch.get(search_url))
.await?;
items.extend(page.results);
}
Ok(items)
@ -107,11 +110,12 @@ pub async fn get_tmdb_info_from_id_lang_and_distribution(
) -> eyre::Result<TmdbMediaDetailDto> {
let info_url = build_tmdb_info_api_url(id, lang, distribution);
let info = if distribution == &BangumiDistribution::Movie {
let info: Box<TmdbMovieDetailDto> = tmdb_client.fetch(|fetch| fetch.get(info_url)).await?;
let info: Box<TmdbMovieDetailDto> =
tmdb_client.fetch_json(|fetch| fetch.get(info_url)).await?;
TmdbMediaDetailDto::Movie(info)
} else {
let info: Box<TmdbTvSeriesDetailDto> =
tmdb_client.fetch(|fetch| fetch.get(info_url)).await?;
tmdb_client.fetch_json(|fetch| fetch.get(info_url)).await?;
TmdbMediaDetailDto::Tv(info)
};
Ok(info)

View File

@ -1,21 +1,20 @@
use std::sync::{Arc, Weak};
use std::{
ops::Deref,
sync::{Arc, Weak},
};
use lazy_static::lazy_static;
use reqwest::header::{HeaderMap, HeaderValue, ACCEPT, AUTHORIZATION};
use serde::de::DeserializeOwned;
use tokio::sync::RwLock;
use tokio_utils::RateLimiter;
use weak_table::WeakValueHashMap;
use crate::downloaders::defs::DEFAULT_USER_AGENT;
use crate::downloaders::defs::{ApiClient, DEFAULT_USER_AGENT};
pub(crate) const TMDB_API_ORIGIN: &str = "https://api.themoviedb.org";
pub struct TmdbApiClient {
api_token: String,
rate_limiter: RateLimiter,
fetch_client: reqwest::Client,
headers: HeaderMap,
api_client: ApiClient,
}
lazy_static! {
@ -34,11 +33,9 @@ impl TmdbApiClient {
}
let client = Arc::new(TmdbApiClient {
api_token: api_token.to_string(),
rate_limiter: RateLimiter::new(std::time::Duration::from_millis(50)),
fetch_client: reqwest::Client::builder()
.user_agent(DEFAULT_USER_AGENT)
.build()?,
headers: {
api_client: ApiClient::new(
std::time::Duration::from_millis(50),
Some({
let mut header_map = HeaderMap::new();
header_map.insert(ACCEPT, HeaderValue::from_static("application/json"));
header_map.insert(
@ -46,7 +43,8 @@ impl TmdbApiClient {
HeaderValue::from_str(&format!("Bearer {api_token}"))?,
);
header_map
},
}),
)?,
});
{
let mut map_write = TMDB_API_CLIENT_MAP.write().await;
@ -58,22 +56,13 @@ impl TmdbApiClient {
pub fn get_api_token(&self) -> &str {
&self.api_token
}
}
pub async fn fetch<R, F>(&self, f: F) -> Result<R, reqwest::Error>
where
F: FnOnce(&reqwest::Client) -> reqwest::RequestBuilder,
R: DeserializeOwned,
{
self.rate_limiter
.throttle(|| async {
f(&self.fetch_client)
.headers(self.headers.clone())
.send()
.await?
.json::<R>()
.await
})
.await
// Deref to the wrapped `ApiClient` so callers can invoke its
// `fetch_*` methods directly on a `TmdbApiClient`.
impl Deref for TmdbApiClient {
    type Target = ApiClient;

    fn deref(&self) -> &Self::Target {
        &self.api_client
    }
}

View File

@ -27,7 +27,7 @@ pub async fn parse_tmdb_list_items_from_list_api(
let page_num = {
let first_page: TmdbListPageDto = tmdb_client
.fetch(|fetch| fetch.get(build_tmdb_list_api_url(list_id, lang, 1)))
.fetch_json(|fetch| fetch.get(build_tmdb_list_api_url(list_id, lang, 1)))
.await?;
items.extend(first_page.results);
@ -37,7 +37,7 @@ pub async fn parse_tmdb_list_items_from_list_api(
for i in 2..=page_num {
let page: TmdbListPageDto = tmdb_client
.fetch(|fetch| fetch.get(build_tmdb_list_api_url(list_id, lang, i)))
.fetch_json(|fetch| fetch.get(build_tmdb_list_api_url(list_id, lang, i)))
.await?;
items.extend(page.results);

View File

@ -52,7 +52,7 @@ fn get_fansub(group_and_title: &str) -> (Option<&str>, &str) {
.filter(|s| !s.is_empty())
.collect::<Vec<_>>();
match (n.get(0), n.get(1)) {
match (n.first(), n.get(1)) {
(None, None) => (None, ""),
(Some(n0), None) => (None, *n0),
(Some(n0), Some(n1)) => {

View File

@ -0,0 +1 @@