diff --git a/apps/recorder/src/auth/errors.rs b/apps/recorder/src/auth/errors.rs index bcf82d7..e661231 100644 --- a/apps/recorder/src/auth/errors.rs +++ b/apps/recorder/src/auth/errors.rs @@ -11,6 +11,7 @@ use openidconnect::{ }; use serde::{Deserialize, Serialize}; use snafu::prelude::*; +use util::OptDynErr; use crate::models::auth::AuthType; @@ -87,23 +88,29 @@ pub enum AuthError { (if column.is_empty() { "" } else { "." }), source.message ))] - GraphQLPermissionError { + GraphqlDynamicPermissionError { #[snafu(source(false))] source: Box, field: String, column: String, context_path: String, }, + #[snafu(display("GraphQL permission denied since {field}"))] + GraphqlStaticPermissionError { + #[snafu(source)] + source: OptDynErr, + field: String, + }, } impl AuthError { - pub fn from_graphql_subscribe_id_guard( + pub fn from_graphql_dynamic_subscribe_id_guard( source: async_graphql::Error, context: &ResolverContext, field_name: &str, column_name: &str, ) -> AuthError { - AuthError::GraphQLPermissionError { + AuthError::GraphqlDynamicPermissionError { source: Box::new(source), field: field_name.to_string(), column: column_name.to_string(), diff --git a/apps/recorder/src/extract/mikan/mod.rs b/apps/recorder/src/extract/mikan/mod.rs index 4942b77..20b158d 100644 --- a/apps/recorder/src/extract/mikan/mod.rs +++ b/apps/recorder/src/extract/mikan/mod.rs @@ -2,7 +2,6 @@ mod client; mod config; mod constants; mod credential; -mod rss; mod subscription; mod web; @@ -14,23 +13,22 @@ pub use constants::{ MIKAN_SEASON_FLOW_PAGE_PATH, MIKAN_UNKNOWN_FANSUB_ID, MIKAN_UNKNOWN_FANSUB_NAME, }; pub use credential::MikanCredentialForm; -pub use rss::{ - MikanBangumiRssChannel, MikanBangumiRssUrlMeta, MikanRssChannel, MikanRssItem, - MikanSubscriberRssChannel, MikanSubscriberSubscriptionRssUrlMeta, - build_mikan_bangumi_subscription_rss_url, build_mikan_subscriber_subscription_rss_url, -}; pub use subscription::{ MikanBangumiSubscription, MikanSeasonSubscription, MikanSubscriberSubscription, }; pub use web::{ MikanBangumiHash, MikanBangumiIndexHash, MikanBangumiIndexMeta, MikanBangumiMeta, - MikanBangumiPosterMeta, MikanEpisodeHash, MikanEpisodeMeta, MikanSeasonFlowUrlMeta, - MikanSeasonStr, build_mikan_bangumi_expand_subscribed_url, build_mikan_bangumi_homepage_url, - build_mikan_episode_homepage_url, build_mikan_season_flow_url, + MikanBangumiPosterMeta, MikanEpisodeHash, MikanEpisodeMeta, MikanRssItem, + MikanSeasonFlowUrlMeta, MikanSeasonStr, MikanSubscriberSubscriptionRssUrlMeta, + build_mikan_bangumi_expand_subscribed_url, build_mikan_bangumi_homepage_url, + build_mikan_bangumi_subscription_rss_url, build_mikan_episode_homepage_url, + build_mikan_season_flow_url, build_mikan_subscriber_subscription_rss_url, extract_mikan_bangumi_index_meta_list_from_season_flow_fragment, extract_mikan_bangumi_meta_from_expand_subscribed_fragment, extract_mikan_episode_meta_from_episode_homepage_html, scrape_mikan_bangumi_meta_from_bangumi_homepage_url, + scrape_mikan_bangumi_meta_list_from_season_flow_url, + scrape_mikan_bangumi_meta_stream_from_season_flow_url, scrape_mikan_episode_meta_from_episode_homepage_url, scrape_mikan_poster_data_from_image_url, scrape_mikan_poster_meta_from_image_url, }; diff --git a/apps/recorder/src/extract/mikan/rss.rs b/apps/recorder/src/extract/mikan/rss.rs deleted file mode 100644 index 8d0e527..0000000 --- a/apps/recorder/src/extract/mikan/rss.rs +++ /dev/null @@ -1,204 +0,0 @@ -use std::borrow::Cow; - -use chrono::DateTime; -use downloader::bittorrent::defs::BITTORRENT_MIME_TYPE; -use serde::{Deserialize, Serialize}; -use url::Url; - -use crate::{errors::app_error::RecorderError, extract::mikan::MikanEpisodeHash}; - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct MikanRssItem { - pub title: String, - pub homepage: Url, - pub url: Url, - pub content_length: Option, - pub mime: String, - pub pub_date: Option, - pub mikan_episode_id: String, -} - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct MikanBangumiRssChannel { - pub name: String, - pub url: Url, - pub mikan_bangumi_id: String, - pub mikan_fansub_id: String, - pub items: Vec, -} - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct MikanSubscriberRssChannel { - pub mikan_subscription_token: String, - pub url: Url, - pub items: Vec, -} - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub enum MikanRssChannel { - Bangumi(MikanBangumiRssChannel), - Subscriber(MikanSubscriberRssChannel), -} - -impl MikanRssChannel { - pub fn items(&self) -> &[MikanRssItem] { - match &self { - Self::Bangumi(MikanBangumiRssChannel { items, .. }) - | Self::Subscriber(MikanSubscriberRssChannel { items, .. }) => items, - } - } - - pub fn into_items(self) -> Vec { - match self { - Self::Bangumi(MikanBangumiRssChannel { items, .. }) - | Self::Subscriber(MikanSubscriberRssChannel { items, .. }) => items, - } - } - - pub fn name(&self) -> Option<&str> { - match &self { - Self::Bangumi(MikanBangumiRssChannel { name, .. }) => Some(name.as_str()), - Self::Subscriber(MikanSubscriberRssChannel { .. }) => None, - } - } - - pub fn url(&self) -> &Url { - match &self { - Self::Bangumi(MikanBangumiRssChannel { url, .. }) - | Self::Subscriber(MikanSubscriberRssChannel { url, .. }) => url, - } - } -} - -impl TryFrom for MikanRssItem { - type Error = RecorderError; - - fn try_from(item: rss::Item) -> Result { - let enclosure = item.enclosure.ok_or_else(|| { - RecorderError::from_mikan_rss_invalid_field(Cow::Borrowed("enclosure")) - })?; - - let mime_type = enclosure.mime_type; - if mime_type != BITTORRENT_MIME_TYPE { - return Err(RecorderError::MimeError { - expected: String::from(BITTORRENT_MIME_TYPE), - found: mime_type.to_string(), - desc: String::from("MikanRssItem"), - }); - } - - let title = item.title.ok_or_else(|| { - RecorderError::from_mikan_rss_invalid_field(Cow::Borrowed("title:title")) - })?; - - let enclosure_url = Url::parse(&enclosure.url).map_err(|err| { - RecorderError::from_mikan_rss_invalid_field_and_source( - "enclosure_url:enclosure.link".into(), - err, - ) - })?; - - let homepage = item - .link - .and_then(|link| Url::parse(&link).ok()) - .ok_or_else(|| { - RecorderError::from_mikan_rss_invalid_field(Cow::Borrowed("homepage:link")) - })?; - - let MikanEpisodeHash { - mikan_episode_token: mikan_episode_id, - .. - } = MikanEpisodeHash::from_homepage_url(&homepage).ok_or_else(|| { - RecorderError::from_mikan_rss_invalid_field(Cow::Borrowed("mikan_episode_id")) - })?; - - Ok(MikanRssItem { - title, - homepage, - url: enclosure_url, - content_length: enclosure.length.parse().ok(), - mime: mime_type, - pub_date: item - .pub_date - .and_then(|s| DateTime::parse_from_rfc2822(&s).ok()) - .map(|s| s.timestamp_millis()), - mikan_episode_id, - }) - } -} - -#[derive(Debug, Clone)] -pub struct MikanBangumiRssUrlMeta { - pub mikan_bangumi_id: String, - pub mikan_fansub_id: String, -} - -impl MikanBangumiRssUrlMeta { - pub fn from_url(url: &Url) -> Option { - if url.path() == "/RSS/Bangumi" { - if let (Some(mikan_fansub_id), Some(mikan_bangumi_id)) = ( - url.query_pairs() - .find(|(k, _)| k == "subgroupid") - .map(|(_, v)| v.to_string()), - url.query_pairs() - .find(|(k, _)| k == "bangumiId") - .map(|(_, v)| v.to_string()), - ) { - Some(MikanBangumiRssUrlMeta { - mikan_bangumi_id, - mikan_fansub_id, - }) - } else { - None - } - } else { - None - } - } -} - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct MikanSubscriberSubscriptionRssUrlMeta { - pub mikan_subscription_token: String, -} - -impl MikanSubscriberSubscriptionRssUrlMeta { - pub fn from_url(url: &Url) -> Option { - if url.path() == "/RSS/MyBangumi" { - url.query_pairs().find(|(k, _)| k == "token").map(|(_, v)| { - MikanSubscriberSubscriptionRssUrlMeta { - mikan_subscription_token: v.to_string(), - } - }) - } else { - None - } - } -} - -pub fn build_mikan_bangumi_subscription_rss_url( - mikan_base_url: Url, - mikan_bangumi_id: &str, - mikan_fansub_id: Option<&str>, -) -> Url { - let mut url = mikan_base_url; - url.set_path("/RSS/Bangumi"); - url.query_pairs_mut() - .append_pair("bangumiId", mikan_bangumi_id); - if let Some(mikan_fansub_id) = mikan_fansub_id { - url.query_pairs_mut() - .append_pair("subgroupid", mikan_fansub_id); - }; - url -} - -pub fn build_mikan_subscriber_subscription_rss_url( - mikan_base_url: Url, - mikan_subscription_token: &str, -) -> Url { - let mut url = mikan_base_url; - url.set_path("/RSS/MyBangumi"); - url.query_pairs_mut() - .append_pair("token", mikan_subscription_token); - url -} diff --git a/apps/recorder/src/extract/mikan/subscription.rs b/apps/recorder/src/extract/mikan/subscription.rs index 56d45fb..6cffaec 100644 --- a/apps/recorder/src/extract/mikan/subscription.rs +++ b/apps/recorder/src/extract/mikan/subscription.rs @@ -1,146 +1,206 @@ use std::{ collections::{HashMap, HashSet}, + fmt::Debug, sync::Arc, }; use async_graphql::{InputObject, SimpleObject}; -use async_stream::try_stream; -use fetch::{fetch_bytes, fetch_html}; -use futures::Stream; +use fetch::fetch_bytes; +use futures::try_join; use itertools::Itertools; use maplit::hashmap; -use scraper::Html; use sea_orm::{ - ActiveModelTrait, ActiveValue, ColumnTrait, EntityTrait, IntoSimpleExpr, QueryFilter, - QuerySelect, prelude::Expr, sea_query::OnConflict, + ActiveValue::Set, ColumnTrait, Condition, EntityTrait, JoinType, QueryFilter, QuerySelect, + RelationTrait, }; use serde::{Deserialize, Serialize}; -use snafu::{OptionExt, ResultExt}; +use snafu::OptionExt; use url::Url; +use super::scrape_mikan_bangumi_meta_list_from_season_flow_url; use crate::{ app::AppContextTrait, errors::{RecorderError, RecorderResult}, extract::mikan::{ - MikanBangumiHash, MikanBangumiMeta, MikanBangumiRssUrlMeta, MikanEpisodeHash, - MikanEpisodeMeta, MikanRssItem, MikanSeasonFlowUrlMeta, MikanSeasonStr, - MikanSubscriberSubscriptionRssUrlMeta, build_mikan_bangumi_expand_subscribed_url, + MikanBangumiHash, MikanBangumiMeta, MikanEpisodeHash, MikanEpisodeMeta, MikanRssItem, + MikanSeasonFlowUrlMeta, MikanSeasonStr, MikanSubscriberSubscriptionRssUrlMeta, build_mikan_bangumi_subscription_rss_url, build_mikan_season_flow_url, build_mikan_subscriber_subscription_rss_url, - extract_mikan_bangumi_index_meta_list_from_season_flow_fragment, - extract_mikan_bangumi_meta_from_expand_subscribed_fragment, scrape_mikan_episode_meta_from_episode_homepage_url, }, - migrations::defs::Bangumi, - models::{bangumi, episodes, subscription_bangumi, subscription_episode, subscriptions}, + models::{ + bangumi, episodes, subscription_bangumi, subscription_episode, + subscriptions::{self, SubscriptionTrait}, + }, }; -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, InputObject, SimpleObject)] +#[tracing::instrument(err, skip(ctx, rss_item_list))] +async fn sync_mikan_feeds_from_rss_item_list( + ctx: &dyn AppContextTrait, + rss_item_list: Vec, + subscriber_id: i32, + subscription_id: i32, +) -> RecorderResult<()> { + let (new_episode_meta_list, existed_episode_hash2id_map) = { + let existed_episode_hash2id_map = episodes::Model::get_existed_mikan_episode_list( + ctx, + rss_item_list.iter().map(|s| MikanEpisodeHash { + mikan_episode_id: s.mikan_episode_id.clone(), + }), + subscriber_id, + subscription_id, + ) + .await? + .map(|(episode_id, hash, bangumi_id)| (hash.mikan_episode_id, (episode_id, bangumi_id))) + .collect::>(); + + let mut new_episode_meta_list: Vec = vec![]; + + let mikan_client = ctx.mikan(); + for to_insert_rss_item in rss_item_list.into_iter().filter(|rss_item| { + !existed_episode_hash2id_map.contains_key(&rss_item.mikan_episode_id) + }) { + let episode_meta = scrape_mikan_episode_meta_from_episode_homepage_url( + mikan_client, + to_insert_rss_item.homepage, + ) + .await?; + new_episode_meta_list.push(episode_meta); + } + + (new_episode_meta_list, existed_episode_hash2id_map) + }; + + // subscribe existed but not subscribed episode and bangumi + let (existed_episode_id_list, existed_episode_bangumi_id_set): (Vec, HashSet) = + existed_episode_hash2id_map.into_values().unzip(); + + try_join!( + subscription_episode::Model::add_episodes_for_subscription( + ctx, + existed_episode_id_list.into_iter(), + subscriber_id, + subscription_id, + ), + subscription_bangumi::Model::add_bangumis_for_subscription( + ctx, + existed_episode_bangumi_id_set.into_iter(), + subscriber_id, + subscription_id, + ), + )?; + + let new_episode_meta_list_group_by_bangumi_hash: HashMap< + MikanBangumiHash, + Vec, + > = { + let mut m = hashmap! {}; + for episode_meta in new_episode_meta_list { + let bangumi_hash = episode_meta.bangumi_hash(); + + m.entry(bangumi_hash) + .or_insert_with(Vec::new) + .push(episode_meta); + } + m + }; + + for (group_bangumi_hash, group_episode_meta_list) in new_episode_meta_list_group_by_bangumi_hash + { + let first_episode_meta = group_episode_meta_list.first().unwrap(); + let group_bangumi_model = bangumi::Model::get_or_insert_from_mikan( + ctx, + group_bangumi_hash, + subscriber_id, + subscription_id, + async || { + let bangumi_meta: MikanBangumiMeta = first_episode_meta.clone().into(); + let bangumi_am = bangumi::ActiveModel::from_mikan_bangumi_meta( + ctx, + bangumi_meta, + subscriber_id, + subscription_id, + ) + .await?; + Ok(bangumi_am) + }, + ) + .await?; + let group_episode_creation_list = group_episode_meta_list + .into_iter() + .map(|episode_meta| (&group_bangumi_model, episode_meta)); + + episodes::Model::add_mikan_episodes_for_subscription( + ctx, + group_episode_creation_list.into_iter(), + subscriber_id, + subscription_id, + ) + .await?; + } + Ok(()) +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct MikanSubscriberSubscription { pub id: i32, pub mikan_subscription_token: String, pub subscriber_id: i32, } -impl MikanSubscriberSubscription { - #[tracing::instrument(skip(ctx))] - pub async fn pull_subscription( - &self, - ctx: Arc, - ) -> RecorderResult> { - let mikan_client = ctx.mikan(); - let db = ctx.db(); - - let to_insert_episode_meta_list: Vec = { - let rss_item_list = self.pull_rss_items(ctx.clone()).await?; - - let existed_episode_token_list = episodes::Model::get_existed_mikan_episode_list( - ctx.as_ref(), - rss_item_list.iter().map(|s| MikanEpisodeHash { - mikan_episode_token: s.mikan_episode_id.clone(), - }), - self.subscriber_id, - self.id, - ) - .await? - .into_iter() - .map(|(id, hash)| (hash.mikan_episode_token, id)) - .collect::>(); - - let mut to_insert_episode_meta_list = vec![]; - - for to_insert_rss_item in rss_item_list.into_iter().filter(|rss_item| { - !existed_episode_token_list.contains_key(&rss_item.mikan_episode_id) - }) { - let episode_meta = scrape_mikan_episode_meta_from_episode_homepage_url( - mikan_client, - to_insert_rss_item.homepage, - ) - .await?; - to_insert_episode_meta_list.push(episode_meta); - } - - subscription_episode::Model::add_episodes_for_subscription( - ctx.as_ref(), - existed_episode_token_list.into_values(), - self.subscriber_id, - self.id, - ) - .await?; - - to_insert_episode_meta_list - }; - - let new_episode_meta_bangumi_map = { - let bangumi_hash_map = to_insert_episode_meta_list - .iter() - .map(|episode_meta| (episode_meta.bangumi_hash(), episode_meta)) - .collect::>(); - - let existed_bangumi_set = bangumi::Model::get_existed_mikan_bangumi_list( - ctx.as_ref(), - bangumi_hash_map.keys().cloned(), - self.subscriber_id, - self.id, - ) - .await? - .map(|(_, bangumi_hash)| bangumi_hash) - .collect::>(); - - let mut to_insert_bangumi_list = vec![]; - - for (bangumi_hash, episode_meta) in bangumi_hash_map.iter() { - if !existed_bangumi_set.contains(&bangumi_hash) { - let bangumi_meta: MikanBangumiMeta = (*episode_meta).clone().into(); - - let bangumi_active_model = bangumi::ActiveModel::from_mikan_bangumi_meta( - ctx.as_ref(), - bangumi_meta, - self.subscriber_id, - self.id, - ) - .await?; - - to_insert_bangumi_list.push(bangumi_active_model); - } - } - - bangumi::Entity::insert_many(to_insert_bangumi_list) - .on_conflict_do_nothing() - .exec(db) - .await?; - - let mut new_episode_meta_bangumi_map: HashMap = - hashmap! {}; - }; - - todo!() +#[async_trait::async_trait] +impl SubscriptionTrait for MikanSubscriberSubscription { + fn get_subscriber_id(&self) -> i32 { + self.subscriber_id } - #[tracing::instrument(skip(ctx))] - pub async fn pull_rss_items( + fn get_subscription_id(&self) -> i32 { + self.id + } + + async fn sync_feeds(&self, ctx: Arc) -> RecorderResult<()> { + let rss_item_list = self.get_rss_item_list(ctx.as_ref()).await?; + + sync_mikan_feeds_from_rss_item_list( + ctx.as_ref(), + rss_item_list, + self.get_subscriber_id(), + self.get_subscription_id(), + ) + .await?; + + Ok(()) + } + + async fn sync_sources(&self, _ctx: Arc) -> RecorderResult<()> { + Ok(()) + } + + fn try_from_model(model: &subscriptions::Model) -> RecorderResult { + let source_url = Url::parse(&model.source_url)?; + + let meta = MikanSubscriberSubscriptionRssUrlMeta::from_rss_url(&source_url) + .with_whatever_context::<_, String, RecorderError>(|| { + format!( + "MikanSubscriberSubscription should extract mikan_subscription_token from \ + source_url = {}, subscription_id = {}", + source_url, model.id + ) + })?; + + Ok(Self { + id: model.id, + mikan_subscription_token: meta.mikan_subscription_token, + subscriber_id: model.subscriber_id, + }) + } +} + +impl MikanSubscriberSubscription { + #[tracing::instrument(err, skip(ctx))] + async fn get_rss_item_list( &self, - ctx: Arc, + ctx: &dyn AppContextTrait, ) -> RecorderResult> { let mikan_base_url = ctx.mikan().base_url().clone(); let rss_url = build_mikan_subscriber_subscription_rss_url( @@ -160,25 +220,6 @@ impl MikanSubscriberSubscription { } Ok(result) } - - pub fn try_from_model(model: &subscriptions::Model) -> RecorderResult { - let source_url = Url::parse(&model.source_url)?; - - let meta = MikanSubscriberSubscriptionRssUrlMeta::from_url(&source_url) - .with_whatever_context::<_, String, RecorderError>(|| { - format!( - "MikanSubscriberSubscription should extract mikan_subscription_token from \ - source_url = {}, subscription_id = {}", - source_url, model.id - ) - })?; - - Ok(Self { - id: model.id, - mikan_subscription_token: meta.mikan_subscription_token, - subscriber_id: model.subscriber_id, - }) - } } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, InputObject, SimpleObject)] @@ -190,77 +231,60 @@ pub struct MikanSeasonSubscription { pub subscriber_id: i32, } -impl MikanSeasonSubscription { - #[tracing::instrument] - pub fn pull_bangumi_meta_stream( - &self, - ctx: Arc, - ) -> impl Stream> { - let credential_id = self.credential_id; - let year = self.year; - let season_str = self.season_str.clone(); - - try_stream! { - let mikan_base_url = ctx.mikan().base_url().clone(); - - let mikan_client = ctx.mikan() - .fork_with_credential(ctx.clone(), credential_id) - .await?; - - let mikan_season_flow_url = build_mikan_season_flow_url(mikan_base_url.clone(), year, season_str); - - let content = fetch_html(&mikan_client, mikan_season_flow_url.clone()).await?; - - let mut bangumi_indices_meta = { - let html = Html::parse_document(&content); - extract_mikan_bangumi_index_meta_list_from_season_flow_fragment(&html, &mikan_base_url) - }; - - if bangumi_indices_meta.is_empty() && !mikan_client.has_login().await? { - mikan_client.login().await?; - let content = fetch_html(&mikan_client, mikan_season_flow_url).await?; - let html = Html::parse_document(&content); - bangumi_indices_meta = - extract_mikan_bangumi_index_meta_list_from_season_flow_fragment(&html, &mikan_base_url); - } - - - mikan_client - .sync_credential_cookies(ctx.clone(), credential_id) - .await?; - - for bangumi_index in bangumi_indices_meta { - let bangumi_title = bangumi_index.bangumi_title.clone(); - let bangumi_expand_subscribed_fragment_url = build_mikan_bangumi_expand_subscribed_url( - mikan_base_url.clone(), - &bangumi_index.mikan_bangumi_id, - ); - let bangumi_expand_subscribed_fragment = - fetch_html(&mikan_client, bangumi_expand_subscribed_fragment_url).await?; - - let bangumi_meta = { - let html = Html::parse_document(&bangumi_expand_subscribed_fragment); - - extract_mikan_bangumi_meta_from_expand_subscribed_fragment( - &html, - bangumi_index, - mikan_base_url.clone(), - ) - .with_whatever_context::<_, String, RecorderError>(|| { - format!("failed to extract mikan bangumi fansub of title = {bangumi_title}") - }) - }?; - - yield bangumi_meta; - } - - mikan_client - .sync_credential_cookies(ctx, credential_id) - .await?; - } +#[async_trait::async_trait] +impl SubscriptionTrait for MikanSeasonSubscription { + fn get_subscriber_id(&self) -> i32 { + self.subscriber_id } - pub fn try_from_model(model: &subscriptions::Model) -> RecorderResult { + fn get_subscription_id(&self) -> i32 { + self.id + } + + async fn sync_feeds(&self, ctx: Arc) -> RecorderResult<()> { + let rss_item_list = self.get_rss_item_list(ctx.as_ref()).await?; + + sync_mikan_feeds_from_rss_item_list( + ctx.as_ref(), + rss_item_list, + self.get_subscriber_id(), + self.get_subscription_id(), + ) + .await?; + + Ok(()) + } + + async fn sync_sources(&self, ctx: Arc) -> RecorderResult<()> { + let bangumi_meta_list = self.get_bangumi_meta_list(ctx.clone()).await?; + + let mikan_base_url = ctx.mikan().base_url(); + + let rss_link_list = bangumi_meta_list + .into_iter() + .map(|bangumi_meta| { + build_mikan_bangumi_subscription_rss_url( + mikan_base_url.clone(), + &bangumi_meta.mikan_bangumi_id, + Some(&bangumi_meta.mikan_fansub_id), + ) + .to_string() + }) + .collect_vec(); + + subscriptions::Entity::update_many() + .set(subscriptions::ActiveModel { + source_urls: Set(Some(rss_link_list)), + ..Default::default() + }) + .filter(subscription_bangumi::Column::SubscriptionId.eq(self.id)) + .exec(ctx.db()) + .await?; + + Ok(()) + } + + fn try_from_model(model: &subscriptions::Model) -> RecorderResult { let source_url = Url::parse(&model.source_url)?; let source_url_meta = MikanSeasonFlowUrlMeta::from_url(&source_url) @@ -291,6 +315,68 @@ impl MikanSeasonSubscription { } } +impl MikanSeasonSubscription { + #[tracing::instrument(err, skip(ctx))] + async fn get_bangumi_meta_list( + &self, + ctx: Arc, + ) -> RecorderResult> { + let credential_id = self.credential_id; + let year = self.year; + let season_str = self.season_str; + + let mikan_base_url = ctx.mikan().base_url().clone(); + let mikan_season_flow_url = build_mikan_season_flow_url(mikan_base_url, year, season_str); + + scrape_mikan_bangumi_meta_list_from_season_flow_url( + ctx, + mikan_season_flow_url, + credential_id, + ) + .await + } + + #[tracing::instrument(err, skip(ctx))] + async fn get_rss_item_list( + &self, + ctx: &dyn AppContextTrait, + ) -> RecorderResult> { + let db = ctx.db(); + + let subscribed_bangumi_list = bangumi::Entity::find() + .filter(Condition::all().add(subscription_bangumi::Column::SubscriptionId.eq(self.id))) + .join_rev( + JoinType::InnerJoin, + subscription_bangumi::Relation::Bangumi.def(), + ) + .all(db) + .await?; + + let mut rss_item_list = vec![]; + for subscribed_bangumi in subscribed_bangumi_list { + let rss_url = subscribed_bangumi + .rss_link + .with_whatever_context::<_, String, RecorderError>(|| { + format!( + "MikanSeasonSubscription rss_link is required, subscription_id = {}", + self.id + ) + })?; + let bytes = fetch_bytes(ctx.mikan(), rss_url).await?; + + let channel = rss::Channel::read_from(&bytes[..])?; + + for (idx, item) in channel.items.into_iter().enumerate() { + let item = MikanRssItem::try_from(item).inspect_err( + |error| tracing::warn!(error = %error, "failed to extract rss item idx = {}", idx), + )?; + rss_item_list.push(item); + } + } + Ok(rss_item_list) + } +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, InputObject, SimpleObject)] pub struct MikanBangumiSubscription { pub id: i32, @@ -299,35 +385,38 @@ pub struct MikanBangumiSubscription { pub subscriber_id: i32, } -impl MikanBangumiSubscription { - #[tracing::instrument] - pub fn pull_rss_items( - &self, - ctx: Arc, - ) -> impl Stream> { - let mikan_bangumi_id = self.mikan_bangumi_id.clone(); - let mikan_fansub_id = self.mikan_fansub_id.clone(); - - try_stream! { - let mikan_base_url = ctx.mikan().base_url().clone(); - let rss_url = build_mikan_bangumi_subscription_rss_url(mikan_base_url.clone(), &mikan_bangumi_id, Some(&mikan_fansub_id)); - let bytes = fetch_bytes(ctx.mikan(), rss_url).await?; - - let channel = rss::Channel::read_from(&bytes[..])?; - - for (idx, item) in channel.items.into_iter().enumerate() { - let item = MikanRssItem::try_from(item).inspect_err( - |error| tracing::warn!(error = %error, "failed to extract rss item idx = {}", idx), - )?; - yield item - } - } +#[async_trait::async_trait] +impl SubscriptionTrait for MikanBangumiSubscription { + fn get_subscriber_id(&self) -> i32 { + self.subscriber_id } - pub fn try_from_model(model: &subscriptions::Model) -> RecorderResult { + fn get_subscription_id(&self) -> i32 { + self.id + } + + async fn sync_feeds(&self, ctx: Arc) -> RecorderResult<()> { + let rss_item_list = self.get_rss_item_list(ctx.as_ref()).await?; + + sync_mikan_feeds_from_rss_item_list( + ctx.as_ref(), + rss_item_list, + ::get_subscriber_id(self), + ::get_subscription_id(self), + ) + .await?; + + Ok(()) + } + + async fn sync_sources(&self, _ctx: Arc) -> RecorderResult<()> { + Ok(()) + } + + fn try_from_model(model: &subscriptions::Model) -> RecorderResult { let source_url = Url::parse(&model.source_url)?; - let meta = MikanBangumiRssUrlMeta::from_url(&source_url) + let meta = MikanBangumiHash::from_rss_url(&source_url) .with_whatever_context::<_, String, RecorderError>(|| { format!( "MikanBangumiSubscription need to extract bangumi id and fansub id from \ @@ -345,96 +434,133 @@ impl MikanBangumiSubscription { } } -#[cfg(test)] -mod tests { - use std::assert_matches::assert_matches; +impl MikanBangumiSubscription { + #[tracing::instrument(err, skip(ctx))] + async fn get_rss_item_list( + &self, + ctx: &dyn AppContextTrait, + ) -> RecorderResult> { + let mikan_base_url = ctx.mikan().base_url().clone(); + let rss_url = build_mikan_bangumi_subscription_rss_url( + mikan_base_url.clone(), + &self.mikan_bangumi_id, + Some(&self.mikan_fansub_id), + ); + let bytes = fetch_bytes(ctx.mikan(), rss_url).await?; - use downloader::bittorrent::BITTORRENT_MIME_TYPE; - use rstest::rstest; - use url::Url; + let channel = rss::Channel::read_from(&bytes[..])?; - use crate::{ - errors::RecorderResult, - extract::mikan::{ - MikanBangumiIndexRssChannel, MikanBangumiRssChannel, MikanRssChannel, - extract_mikan_rss_channel_from_rss_link, - }, - test_utils::mikan::build_testing_mikan_client, - }; - - #[rstest] - #[tokio::test] - async fn test_parse_mikan_rss_channel_from_rss_link() -> RecorderResult<()> { - let mut mikan_server = mockito::Server::new_async().await; - - let mikan_base_url = Url::parse(&mikan_server.url())?; - - let mikan_client = build_testing_mikan_client(mikan_base_url.clone()).await?; - - { - let bangumi_rss_url = - mikan_base_url.join("/RSS/Bangumi?bangumiId=3141&subgroupid=370")?; - - let bangumi_rss_mock = mikan_server - .mock("GET", bangumi_rss_url.path()) - .with_body_from_file("tests/resources/mikan/Bangumi-3141-370.rss") - .match_query(mockito::Matcher::Any) - .create_async() - .await; - - let channel = scrape_mikan_rss_channel_from_rss_link(&mikan_client, bangumi_rss_url) - .await - .expect("should get mikan channel from rss url"); - - assert_matches!( - &channel, - MikanRssChannel::Bangumi(MikanBangumiRssChannel { .. }) - ); - - assert_matches!(&channel.name(), Some("葬送的芙莉莲")); - - let items = channel.items(); - let first_sub_item = items - .first() - .expect("mikan subscriptions should have at least one subs"); - - assert_eq!(first_sub_item.mime, BITTORRENT_MIME_TYPE); - - assert!( - &first_sub_item - .homepage - .as_str() - .starts_with("https://mikanani.me/Home/Episode") - ); - - let name = first_sub_item.title.as_str(); - assert!(name.contains("葬送的芙莉莲")); - - bangumi_rss_mock.expect(1); + let mut result = vec![]; + for (idx, item) in channel.items.into_iter().enumerate() { + let item = MikanRssItem::try_from(item).inspect_err( + |error| tracing::warn!(error = %error, "failed to extract rss item idx = {}", idx), + )?; + result.push(item); } - { - let bangumi_rss_url = mikan_base_url.join("/RSS/Bangumi?bangumiId=3416")?; - - let bangumi_rss_mock = mikan_server - .mock("GET", bangumi_rss_url.path()) - .match_query(mockito::Matcher::Any) - .with_body_from_file("tests/resources/mikan/Bangumi-3416.rss") - .create_async() - .await; - - let channel = scrape_mikan_rss_channel_from_rss_link(&mikan_client, bangumi_rss_url) - .await - .expect("should get mikan channel from rss url"); - - assert_matches!( - &channel, - MikanRssChannel::BangumiIndex(MikanBangumiIndexRssChannel { .. }) - ); - - assert_matches!(&channel.name(), Some("叹气的亡灵想隐退")); - - bangumi_rss_mock.expect(1); - } - Ok(()) + Ok(result) } } + +// #[cfg(test)] +// mod tests { +// use std::assert_matches::assert_matches; + +// use downloader::bittorrent::BITTORRENT_MIME_TYPE; +// use rstest::rstest; +// use url::Url; + +// use crate::{ +// errors::RecorderResult, +// extract::mikan::{ +// MikanBangumiIndexRssChannel, MikanBangumiRssChannel, +// MikanRssChannel, build_mikan_bangumi_subscription_rss_url, +// extract_mikan_rss_channel_from_rss_link, }, +// test_utils::mikan::build_testing_mikan_client, +// }; + +// #[rstest] +// #[tokio::test] +// async fn test_parse_mikan_rss_channel_from_rss_link() -> +// RecorderResult<()> { let mut mikan_server = +// mockito::Server::new_async().await; + +// let mikan_base_url = Url::parse(&mikan_server.url())?; + +// let mikan_client = +// build_testing_mikan_client(mikan_base_url.clone()).await?; + +// { +// let bangumi_rss_url = build_mikan_bangumi_subscription_rss_url( +// mikan_base_url.clone(), +// "3141", +// Some("370"), +// ); + +// let bangumi_rss_mock = mikan_server +// .mock("GET", bangumi_rss_url.path()) +// +// .with_body_from_file("tests/resources/mikan/Bangumi-3141-370.rss") +// .match_query(mockito::Matcher::Any) +// .create_async() +// .await; + +// let channel = +// scrape_mikan_rss_channel_from_rss_link(&mikan_client, bangumi_rss_url) +// .await +// .expect("should get mikan channel from rss url"); + +// assert_matches!( +// &channel, +// MikanRssChannel::Bangumi(MikanBangumiRssChannel { .. }) +// ); + +// assert_matches!(&channel.name(), Some("葬送的芙莉莲")); + +// let items = channel.items(); +// let first_sub_item = items +// .first() +// .expect("mikan subscriptions should have at least one subs"); + +// assert_eq!(first_sub_item.mime, BITTORRENT_MIME_TYPE); + +// assert!( +// &first_sub_item +// .homepage +// .as_str() +// .starts_with("https://mikanani.me/Home/Episode") +// ); + +// let name = first_sub_item.title.as_str(); +// assert!(name.contains("葬送的芙莉莲")); + +// bangumi_rss_mock.expect(1); +// } +// { +// let bangumi_rss_url = +// mikan_base_url.join("/RSS/Bangumi?bangumiId=3416")?; + +// let bangumi_rss_mock = mikan_server +// .mock("GET", bangumi_rss_url.path()) +// .match_query(mockito::Matcher::Any) +// +// .with_body_from_file("tests/resources/mikan/Bangumi-3416.rss") +// .create_async() +// .await; + +// let channel = +// scrape_mikan_rss_channel_from_rss_link(&mikan_client, bangumi_rss_url) +// .await +// .expect("should get mikan channel from rss url"); + +// assert_matches!( +// &channel, +// MikanRssChannel::BangumiIndex(MikanBangumiIndexRssChannel { +// .. }) ); + +// assert_matches!(&channel.name(), Some("叹气的亡灵想隐退")); + +// bangumi_rss_mock.expect(1); +// } +// Ok(()) +// } +// } diff --git a/apps/recorder/src/extract/mikan/web.rs b/apps/recorder/src/extract/mikan/web.rs index 3ad9a61..1b56855 100644 --- a/apps/recorder/src/extract/mikan/web.rs +++ b/apps/recorder/src/extract/mikan/web.rs @@ -2,29 +2,132 @@ use std::{borrow::Cow, fmt, str::FromStr, sync::Arc}; use async_stream::try_stream; use bytes::Bytes; +use chrono::DateTime; +use downloader::bittorrent::defs::BITTORRENT_MIME_TYPE; use fetch::{html::fetch_html, image::fetch_image}; use futures::{Stream, TryStreamExt, pin_mut}; use html_escape::decode_html_entities; use scraper::{Html, Selector}; use serde::{Deserialize, Serialize}; -use snafu::FromString; +use snafu::{FromString, OptionExt}; use tracing::instrument; use url::Url; -use super::{ - MIKAN_BANGUMI_EXPAND_SUBSCRIBED_PAGE_PATH, MIKAN_POSTER_BUCKET_KEY, - MIKAN_SEASON_FLOW_PAGE_PATH, MikanBangumiRssUrlMeta, MikanClient, -}; use crate::{ app::AppContextTrait, errors::app_error::{RecorderError, RecorderResult}, extract::{ html::{extract_background_image_src_from_style_attr, extract_inner_text_from_element_ref}, media::extract_image_src_from_str, + mikan::{ + MIKAN_BANGUMI_EXPAND_SUBSCRIBED_PAGE_PATH, MIKAN_POSTER_BUCKET_KEY, + MIKAN_SEASON_FLOW_PAGE_PATH, MikanClient, + }, }, storage::{StorageContentCategory, StorageServiceTrait}, }; +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct MikanRssItem { + pub title: String, + pub homepage: Url, + pub url: Url, + pub content_length: Option, + pub mime: String, + pub pub_date: Option, + pub mikan_episode_id: String, +} + +impl TryFrom for MikanRssItem { + type Error = RecorderError; + + fn try_from(item: rss::Item) -> Result { + let enclosure = item.enclosure.ok_or_else(|| { + RecorderError::from_mikan_rss_invalid_field(Cow::Borrowed("enclosure")) + })?; + + let mime_type = enclosure.mime_type; + if mime_type != BITTORRENT_MIME_TYPE { + return Err(RecorderError::MimeError { + expected: String::from(BITTORRENT_MIME_TYPE), + found: mime_type.to_string(), + desc: String::from("MikanRssItem"), + }); + } + + let title = item.title.ok_or_else(|| { + RecorderError::from_mikan_rss_invalid_field(Cow::Borrowed("title:title")) + })?; + + let enclosure_url = Url::parse(&enclosure.url).map_err(|err| { + RecorderError::from_mikan_rss_invalid_field_and_source( + "enclosure_url:enclosure.link".into(), + err, + ) + })?; + + let homepage = item + .link + .and_then(|link| Url::parse(&link).ok()) + .ok_or_else(|| { + RecorderError::from_mikan_rss_invalid_field(Cow::Borrowed("homepage:link")) + })?; + + let MikanEpisodeHash { + mikan_episode_id, .. + } = MikanEpisodeHash::from_homepage_url(&homepage).ok_or_else(|| { + RecorderError::from_mikan_rss_invalid_field(Cow::Borrowed("mikan_episode_id")) + })?; + + Ok(MikanRssItem { + title, + homepage, + url: enclosure_url, + content_length: enclosure.length.parse().ok(), + mime: mime_type, + pub_date: item + .pub_date + .and_then(|s| DateTime::parse_from_rfc2822(&s).ok()) + .map(|s| s.timestamp_millis()), + mikan_episode_id, + }) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct MikanSubscriberSubscriptionRssUrlMeta { + pub mikan_subscription_token: String, +} + +impl MikanSubscriberSubscriptionRssUrlMeta { + pub fn from_rss_url(url: &Url) -> Option { + if url.path() == "/RSS/MyBangumi" { + url.query_pairs().find(|(k, _)| k == "token").map(|(_, v)| { + MikanSubscriberSubscriptionRssUrlMeta { + mikan_subscription_token: v.to_string(), + } + }) + } else { + None + } + } + + pub fn build_rss_url(self, mikan_base_url: Url) -> Url { + build_mikan_subscriber_subscription_rss_url(mikan_base_url, &self.mikan_subscription_token) + } +} + +pub fn build_mikan_subscriber_subscription_rss_url( + mikan_base_url: Url, + mikan_subscription_token: &str, +) -> Url { + let mut url = mikan_base_url; + url.set_path("/RSS/MyBangumi"); + url.query_pairs_mut() + .append_pair("token", mikan_subscription_token); + url +} + #[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Eq)] pub struct MikanBangumiIndexMeta { pub homepage: Url, @@ -147,6 +250,26 @@ impl MikanBangumiIndexHash { None } } + + pub fn build_homepage_url(self, mikan_base_url: Url) -> Url { + build_mikan_bangumi_homepage_url(mikan_base_url, &self.mikan_bangumi_id, None) + } +} + +pub fn build_mikan_bangumi_subscription_rss_url( + mikan_base_url: Url, + mikan_bangumi_id: &str, + mikan_fansub_id: Option<&str>, +) -> Url { + let mut url = mikan_base_url; + url.set_path("/RSS/Bangumi"); + url.query_pairs_mut() + .append_pair("bangumiId", mikan_bangumi_id); + if let Some(mikan_fansub_id) = mikan_fansub_id { + url.query_pairs_mut() + .append_pair("subgroupid", mikan_fansub_id); + }; + url } #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -170,24 +293,70 @@ impl MikanBangumiHash { None } } + + pub fn from_rss_url(url: &Url) -> Option { + if url.path() == "/RSS/Bangumi" { + if let (Some(mikan_fansub_id), Some(mikan_bangumi_id)) = ( + url.query_pairs() + .find(|(k, _)| k == "subgroupid") + .map(|(_, v)| v.to_string()), + url.query_pairs() + .find(|(k, _)| k == "bangumiId") + .map(|(_, v)| v.to_string()), + ) { + Some(Self { + mikan_bangumi_id, + mikan_fansub_id, + }) + } else { + None + } + } else { + None + } + } + + pub fn build_rss_url(self, mikan_base_url: Url) -> Url { + build_mikan_bangumi_subscription_rss_url( + mikan_base_url, + &self.mikan_bangumi_id, + Some(&self.mikan_fansub_id), + ) + } + + pub fn build_homepage_url(self, mikan_base_url: Url) -> Url { + build_mikan_bangumi_homepage_url( + mikan_base_url, + &self.mikan_bangumi_id, + Some(&self.mikan_fansub_id), + ) + } +} + +pub fn build_mikan_episode_homepage_url(mikan_base_url: Url, mikan_episode_id: &str) -> Url { + let mut url = mikan_base_url; + url.set_path(&format!("/Home/Episode/{mikan_episode_id}")); + url } #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct MikanEpisodeHash { - pub mikan_episode_token: String, + pub mikan_episode_id: String, } impl MikanEpisodeHash { pub fn from_homepage_url(url: &Url) -> Option { if url.path().starts_with("/Home/Episode/") { let mikan_episode_id = url.path().replace("/Home/Episode/", ""); - Some(Self { - mikan_episode_token: mikan_episode_id, - }) + Some(Self { mikan_episode_id }) } else { None } } + + pub fn build_homepage_url(self, mikan_base_url: Url) -> Url { + build_mikan_episode_homepage_url(mikan_base_url, &self.mikan_episode_id) + } } #[derive(async_graphql::Enum, Clone, Debug, Copy, Serialize, Deserialize, PartialEq, Eq)] @@ -227,8 +396,7 @@ impl FromStr for MikanSeasonStr { "秋" => Ok(MikanSeasonStr::Autumn), "冬" => Ok(MikanSeasonStr::Winter), _ => Err(RecorderError::without_source(format!( - "MikanSeasonStr must be one of '春', '夏', '秋', '冬', but got '{}'", - s + "MikanSeasonStr must be one of '春', '夏', '秋', '冬', but got '{s}'" ))), } } @@ -284,12 +452,6 @@ pub fn build_mikan_season_flow_url( url } -pub fn build_mikan_episode_homepage_url(mikan_base_url: Url, mikan_episode_id: &str) -> Url { - let mut url = mikan_base_url; - url.set_path(&format!("/Home/Episode/{mikan_episode_id}")); - url -} - pub fn build_mikan_bangumi_expand_subscribed_url( mikan_base_url: Url, mikan_bangumi_id: &str, @@ -322,7 +484,7 @@ pub fn extract_mikan_episode_meta_from_episode_homepage_html( RecorderError::from_mikan_meta_missing_field(Cow::Borrowed("bangumi_title")) })?; - let MikanBangumiRssUrlMeta { + let MikanBangumiHash { mikan_bangumi_id, mikan_fansub_id, .. @@ -331,7 +493,7 @@ pub fn extract_mikan_episode_meta_from_episode_homepage_html( .next() .and_then(|el| el.value().attr("href")) .and_then(|s| mikan_episode_homepage_url.join(s).ok()) - .and_then(|rss_link_url| MikanBangumiRssUrlMeta::from_url(&rss_link_url)) + .and_then(|rss_link_url| MikanBangumiHash::from_rss_url(&rss_link_url)) .ok_or_else(|| { RecorderError::from_mikan_meta_missing_field(Cow::Borrowed("mikan_bangumi_id")) })?; @@ -345,8 +507,7 @@ pub fn extract_mikan_episode_meta_from_episode_homepage_html( })?; let MikanEpisodeHash { - mikan_episode_token, - .. + mikan_episode_id, .. } = MikanEpisodeHash::from_homepage_url(&mikan_episode_homepage_url).ok_or_else(|| { RecorderError::from_mikan_meta_missing_field(Cow::Borrowed("mikan_episode_id")) })?; @@ -436,9 +597,9 @@ pub fn extract_mikan_bangumi_index_meta_from_bangumi_homepage_html( .next() .and_then(|el| el.value().attr("href")) .and_then(|s| mikan_bangumi_homepage_url.join(s).ok()) - .and_then(|rss_link_url| MikanBangumiRssUrlMeta::from_url(&rss_link_url)) + .and_then(|rss_link_url| MikanBangumiHash::from_rss_url(&rss_link_url)) .map( - |MikanBangumiRssUrlMeta { + |MikanBangumiHash { mikan_bangumi_id, .. }| mikan_bangumi_id, ) @@ -734,10 +895,86 @@ pub fn extract_mikan_bangumi_meta_from_expand_subscribed_fragment( } } +pub fn scrape_mikan_bangumi_meta_stream_from_season_flow_url( + ctx: Arc, + mikan_season_flow_url: Url, + credential_id: i32, +) -> impl Stream> { + try_stream! { + let mikan_base_url = ctx.mikan().base_url().clone(); + let mikan_client = ctx.mikan().fork_with_credential(ctx.clone(), credential_id).await?; + + let content = fetch_html(&mikan_client, mikan_season_flow_url.clone()).await?; + + let mut bangumi_indices_meta = { + let html = Html::parse_document(&content); + extract_mikan_bangumi_index_meta_list_from_season_flow_fragment(&html, &mikan_base_url) + }; + + if bangumi_indices_meta.is_empty() && !mikan_client.has_login().await? { + mikan_client.login().await?; + let content = fetch_html(&mikan_client, mikan_season_flow_url).await?; + let html = Html::parse_document(&content); + bangumi_indices_meta = + extract_mikan_bangumi_index_meta_list_from_season_flow_fragment(&html, &mikan_base_url); + } + + + mikan_client + .sync_credential_cookies(ctx.clone(), credential_id) + .await?; + + for bangumi_index in bangumi_indices_meta { + let bangumi_title = bangumi_index.bangumi_title.clone(); + let bangumi_expand_subscribed_fragment_url = build_mikan_bangumi_expand_subscribed_url( + mikan_base_url.clone(), + &bangumi_index.mikan_bangumi_id, + ); + let bangumi_expand_subscribed_fragment = + fetch_html(&mikan_client, bangumi_expand_subscribed_fragment_url).await?; + + let bangumi_meta = { + let html = Html::parse_document(&bangumi_expand_subscribed_fragment); + + extract_mikan_bangumi_meta_from_expand_subscribed_fragment( + &html, + bangumi_index, + mikan_base_url.clone(), + ) + .with_whatever_context::<_, String, RecorderError>(|| { + format!("failed to extract mikan bangumi fansub of title = {bangumi_title}") + }) + }?; + + yield bangumi_meta; + } + + mikan_client + .sync_credential_cookies(ctx, credential_id) + .await?; + } +} + +pub async fn scrape_mikan_bangumi_meta_list_from_season_flow_url( + ctx: Arc, + mikan_season_flow_url: Url, + credential_id: i32, +) -> RecorderResult> { + let stream = scrape_mikan_bangumi_meta_stream_from_season_flow_url( + ctx, + mikan_season_flow_url, + credential_id, + ); + + pin_mut!(stream); + + stream.try_collect().await +} + #[cfg(test)] mod test { #![allow(unused_variables)] - use std::fs; + use std::{fs, sync::Arc}; use rstest::{fixture, rstest}; use tracing::Level; @@ -1035,7 +1272,6 @@ mod test { build_mikan_season_flow_url(mikan_base_url.clone(), 2025, MikanSeasonStr::Spring); let bangumi_meta_list = scrape_mikan_bangumi_meta_list_from_season_flow_url( - mikan_client, app_ctx.clone(), mikan_season_flow_url, credential.id, diff --git a/apps/recorder/src/graphql/infra/guard.rs b/apps/recorder/src/graphql/infra/guard.rs index 14a1ece..a9a4529 100644 --- a/apps/recorder/src/graphql/infra/guard.rs +++ b/apps/recorder/src/graphql/infra/guard.rs @@ -4,8 +4,10 @@ use async_graphql::dynamic::{ResolverContext, ValueAccessor}; use sea_orm::EntityTrait; use seaography::{BuilderContext, FnGuard, GuardAction}; -use super::util::{get_column_key, get_entity_key}; -use crate::auth::{AuthError, AuthUserInfo}; +use crate::{ + auth::{AuthError, AuthUserInfo}, + graphql::infra::util::{get_column_key, get_entity_key}, +}; fn guard_data_object_accessor_with_subscriber_id( value: ValueAccessor<'_>, @@ -108,7 +110,7 @@ where subscriber_id, ) .map_err(|inner_error| { - AuthError::from_graphql_subscribe_id_guard( + AuthError::from_graphql_dynamic_subscribe_id_guard( inner_error, context, &entity_create_one_mutation_data_field_name, @@ -136,7 +138,7 @@ where }) }) .map_err(|inner_error| { - AuthError::from_graphql_subscribe_id_guard( + AuthError::from_graphql_dynamic_subscribe_id_guard( inner_error, context, &entity_create_batch_mutation_data_field_name, @@ -157,7 +159,7 @@ where subscriber_id, ) .map_err(|inner_error| { - AuthError::from_graphql_subscribe_id_guard( + AuthError::from_graphql_dynamic_subscribe_id_guard( inner_error, context, &entity_update_mutation_data_field_name, diff --git a/apps/recorder/src/graphql/mikan/mod.rs b/apps/recorder/src/graphql/mikan/mod.rs deleted file mode 100644 index 8c05bcc..0000000 --- a/apps/recorder/src/graphql/mikan/mod.rs +++ /dev/null @@ -1,64 +0,0 @@ -mod scrape_season_subscription; - -use std::sync::Arc; - -use async_graphql::{Context, Object, Result as GraphQLResult}; -use snafu::FromString; - -use crate::{ - app::AppContextTrait, - auth::AuthUserInfo, - errors::RecorderError, - graphql::mikan::scrape_season_subscription::{ - MikanScrapeSeasonSubscriptionInput, MikanScrapeSeasonSubscriptionOutput, - }, - models::{ - subscriber_tasks, - subscriptions::{self, SubscriptionCategory}, - }, - task::{SubscriberTaskPayload, mikan::MikanScrapeSeasonSubscriptionTask}, -}; - -struct MikanQuery; - -struct MikanMutation; - -#[Object] -impl MikanMutation { - async fn mikan_scrape_season_subscription( - &self, - ctx: &Context<'_>, - input: MikanScrapeSeasonSubscriptionInput, - ) -> GraphQLResult { - let auth_user = ctx.data::()?; - let app_ctx = ctx.data::>()?; - - let subscription = - subscriptions::Model::find_by_id(app_ctx.as_ref(), input.subscription_id) - .await? - .ok_or_else(|| RecorderError::DbError { - source: sea_orm::DbErr::RecordNotFound(String::from("subscription not found")), - })?; - - if subscription.category != SubscriptionCategory::MikanSeason { - Err(RecorderError::without_source( - "subscription must be a mikan season subscription".to_string(), - ))?; - } - - let credential_id = subscription.credential_id.ok_or_else(|| { - RecorderError::without_source("subscription must have a credential".to_string()) - })?; - - let task = subscriber_tasks::Model::add_subscriber_task( - app_ctx.clone(), - auth_user.subscriber_auth.subscriber_id, - SubscriberTaskPayload::MikanScrapeSeasonSubscription(todo!()), - ) - .await?; - - Ok(MikanScrapeSeasonSubscriptionOutput { task_id: 1 }) - } -} - -struct MikanSubscription; diff --git a/apps/recorder/src/graphql/mikan/scrape_season_subscription.rs b/apps/recorder/src/graphql/mikan/scrape_season_subscription.rs deleted file mode 100644 index 4503ab8..0000000 --- a/apps/recorder/src/graphql/mikan/scrape_season_subscription.rs +++ /dev/null @@ -1,12 +0,0 @@ -use async_graphql::{InputObject, SimpleObject}; -use serde::{Deserialize, Serialize}; - -#[derive(InputObject, Serialize, Deserialize)] -pub struct MikanScrapeSeasonSubscriptionInput { - pub subscription_id: i32, -} - -#[derive(SimpleObject, Serialize, Deserialize)] -pub struct MikanScrapeSeasonSubscriptionOutput { - pub task_id: i32, -} diff --git a/apps/recorder/src/graphql/mod.rs b/apps/recorder/src/graphql/mod.rs index 216dda3..74632ec 100644 --- a/apps/recorder/src/graphql/mod.rs +++ b/apps/recorder/src/graphql/mod.rs @@ -1,8 +1,8 @@ pub mod config; pub mod infra; -pub mod mikan; pub mod schema_root; pub mod service; +pub mod views; pub use config::GraphQLConfig; pub use schema_root::schema; diff --git a/apps/recorder/src/graphql/schema_root.rs b/apps/recorder/src/graphql/schema_root.rs index f57600d..c9d3314 100644 --- a/apps/recorder/src/graphql/schema_root.rs +++ b/apps/recorder/src/graphql/schema_root.rs @@ -111,10 +111,6 @@ pub fn schema( &mut context, &subscription_episode::Column::SubscriberId, ); - restrict_subscriber_for_entity::( - &mut context, - &subscriber_tasks::Column::SubscriberId, - ); for column in subscribers::Column::iter() { if !matches!(column, subscribers::Column::Id) { restrict_filter_input_for_entity::( @@ -156,7 +152,6 @@ pub fn schema( subscription_bangumi, subscription_episode, subscriptions, - subscriber_tasks, ] ); @@ -165,7 +160,6 @@ pub fn schema( builder.register_enumeration::(); builder.register_enumeration::(); builder.register_enumeration::(); - builder.register_enumeration::(); } let schema = builder.schema_builder(); diff --git a/apps/recorder/src/graphql/views/mod.rs b/apps/recorder/src/graphql/views/mod.rs new file mode 100644 index 0000000..0ed6393 --- /dev/null +++ b/apps/recorder/src/graphql/views/mod.rs @@ -0,0 +1 @@ +mod subscription; diff --git a/apps/recorder/src/graphql/views/subscription.rs b/apps/recorder/src/graphql/views/subscription.rs new file mode 100644 index 0000000..6bd6e20 --- /dev/null +++ b/apps/recorder/src/graphql/views/subscription.rs @@ -0,0 +1,117 @@ +use std::sync::Arc; + +use async_graphql::{Context, InputObject, Object, Result as GraphQLResult, SimpleObject}; +use sea_orm::{DbErr, EntityTrait}; + +use crate::{ + app::AppContextTrait, + auth::AuthUserInfo, + errors::RecorderError, + models::subscriptions::{self, SubscriptionTrait}, + task::SubscriberTaskPayload, +}; + +pub struct SubscriptionMutation; + +#[derive(InputObject)] +struct SyncOneSubscriptionFilterInput { + pub subscription_id: i32, +} + +#[derive(SimpleObject)] +struct SyncOneSubscriptionTaskOutput { + pub task_id: String, +} + +#[Object] +impl SubscriptionMutation { + async fn sync_one_subscription_feeds( + &self, + ctx: &Context<'_>, + input: SyncOneSubscriptionFilterInput, + ) -> GraphQLResult { + let auth_user_info = ctx.data::()?; + + let app_ctx = ctx.data::>()?; + let subscriber_id = auth_user_info.subscriber_auth.subscriber_id; + + let subscription_model = subscriptions::Entity::find_by_id(input.subscription_id) + .one(app_ctx.db()) + .await? + .ok_or_else(|| RecorderError::DbError { + source: DbErr::RecordNotFound(format!( + "Subscription id = {} not found", + input.subscription_id + )), + })?; + + if subscription_model.subscriber_id != subscriber_id { + Err(RecorderError::DbError { + source: DbErr::RecordNotFound(format!( + "Subscription id = {} not found", + input.subscription_id + )), + })?; + } + + let subscription = subscriptions::Subscription::try_from_model(&subscription_model)?; + + let task_service = app_ctx.task(); + + let task_id = task_service + .add_subscriber_task( + auth_user_info.subscriber_auth.subscriber_id, + SubscriberTaskPayload::SyncOneSubscriptionFeeds(subscription.into()), + ) + .await?; + + Ok(SyncOneSubscriptionTaskOutput { + task_id: task_id.to_string(), + }) + } + + async fn sync_one_subscription_sources( + &self, + ctx: &Context<'_>, + input: SyncOneSubscriptionFilterInput, + ) -> GraphQLResult { + let auth_user_info = ctx.data::()?; + + let app_ctx = ctx.data::>()?; + let subscriber_id = auth_user_info.subscriber_auth.subscriber_id; + + let subscription_model = subscriptions::Entity::find_by_id(input.subscription_id) + .one(app_ctx.db()) + .await? + .ok_or_else(|| RecorderError::DbError { + source: DbErr::RecordNotFound(format!( + "Subscription id = {} not found", + input.subscription_id + )), + })?; + + if subscription_model.subscriber_id != subscriber_id { + Err(RecorderError::DbError { + source: DbErr::RecordNotFound(format!( + "Subscription id = {} not found", + input.subscription_id + )), + })?; + } + + let subscription = subscriptions::Subscription::try_from_model(&subscription_model)?; + + let task_service = app_ctx.task(); + + let task_id = task_service + .add_subscriber_task( + auth_user_info.subscriber_auth.subscriber_id, + SubscriberTaskPayload::SyncOneSubscriptionSources(subscription.into()), + ) + .await?; + + Ok(SyncOneSubscriptionTaskOutput { + task_id: task_id.to_string(), + }) + } +} diff --git a/apps/recorder/src/migrations/defs.rs b/apps/recorder/src/migrations/defs.rs index dd53b7c..61650b3 100644 --- a/apps/recorder/src/migrations/defs.rs +++ b/apps/recorder/src/migrations/defs.rs @@ -32,6 +32,7 @@ pub enum Subscriptions { SubscriberId, Category, SourceUrl, + SourceUrls, Enabled, CredentialId, } @@ -52,7 +53,6 @@ pub enum Bangumi { RssLink, PosterLink, SavePath, - Deleted, Homepage, Extra, } @@ -85,7 +85,6 @@ pub enum Episodes { EpisodeIndex, Homepage, Subtitle, - Deleted, Source, Extra, } @@ -150,18 +149,6 @@ pub enum Credential3rd { UserAgent, } -#[derive(DeriveIden)] -pub enum SubscriberTasks { - Table, - Id, - SubscriberId, - TaskType, - Request, - Result, - Error, - Yields, -} - macro_rules! create_postgres_enum_for_active_enum { ($manager: expr, $active_enum: expr, $($enum_value:expr),+) => { { diff --git a/apps/recorder/src/migrations/m20220101_000001_init.rs b/apps/recorder/src/migrations/m20220101_000001_init.rs index bf13f15..3e3c3df 100644 --- a/apps/recorder/src/migrations/m20220101_000001_init.rs +++ b/apps/recorder/src/migrations/m20220101_000001_init.rs @@ -64,6 +64,10 @@ impl MigrationTrait for Migration { .col(string(Subscriptions::DisplayName)) .col(integer(Subscriptions::SubscriberId)) .col(text(Subscriptions::SourceUrl)) + .col(array_null( + Subscriptions::SourceUrls, + ColumnType::String(StringLen::None), + )) .col(boolean(Subscriptions::Enabled)) .col(enumeration( Subscriptions::Category, @@ -105,7 +109,6 @@ impl MigrationTrait for Migration { .col(text_null(Bangumi::RssLink)) .col(text_null(Bangumi::PosterLink)) .col(text_null(Bangumi::SavePath)) - .col(boolean(Bangumi::Deleted).default(false)) .col(text_null(Bangumi::Homepage)) .col(json_binary_null(Bangumi::Extra)) .foreign_key( @@ -224,7 +227,6 @@ impl MigrationTrait for Migration { .col(integer(Episodes::EpisodeIndex)) .col(text_null(Episodes::Homepage)) .col(text_null(Episodes::Subtitle)) - .col(boolean(Episodes::Deleted).default(false)) .col(text_null(Episodes::Source)) .col(json_binary_null(Episodes::Extra)) .foreign_key( diff --git a/apps/recorder/src/migrations/m20250508_022044_subscriber_tasks.rs b/apps/recorder/src/migrations/m20250508_022044_subscriber_tasks.rs deleted file mode 100644 index 4d59b98..0000000 --- a/apps/recorder/src/migrations/m20250508_022044_subscriber_tasks.rs +++ /dev/null @@ -1,81 +0,0 @@ -use sea_orm_migration::{ - prelude::*, - schema::{array, enumeration, integer, json_binary, json_binary_null, pk_auto}, -}; - -use super::defs::{SubscriberTasks, Subscribers, table_auto_z}; -use crate::models::subscriber_tasks::{SubscriberTaskType, SubscriberTaskTypeEnum}; - -#[derive(DeriveMigrationName)] -pub struct Migration; - -#[async_trait::async_trait] -impl MigrationTrait for Migration { - async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { - manager - .create_table( - table_auto_z(SubscriberTasks::Table) - .col(pk_auto(SubscriberTasks::Id)) - .col(integer(SubscriberTasks::SubscriberId)) - .col(enumeration( - SubscriberTasks::TaskType, - SubscriberTaskTypeEnum, - SubscriberTaskType::iden_values(), - )) - .col(json_binary(SubscriberTasks::Request)) - .col(json_binary_null(SubscriberTasks::Result)) - .col(json_binary_null(SubscriberTasks::Error)) - .col( - array(SubscriberTasks::Yields, ColumnType::JsonBinary) - .default(SimpleExpr::Custom(String::from("ARRAY[]::jsonb[]"))), - ) - .foreign_key( - ForeignKey::create() - .name("fk_subscriber_tasks_subscriber_id") - .from_tbl(SubscriberTasks::Table) - .from_col(SubscriberTasks::SubscriberId) - .to_tbl(Subscribers::Table) - .to_col(Subscribers::Id) - .on_delete(ForeignKeyAction::Cascade) - .on_update(ForeignKeyAction::Cascade), - ) - .to_owned(), - ) - .await?; - - manager - .create_index( - Index::create() - .if_not_exists() - .name("idx_subscriber_tasks_task_type") - .table(SubscriberTasks::Table) - .col(SubscriberTasks::TaskType) - .to_owned(), - ) - .await?; - Ok(()) - } - - async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { - manager - .drop_index( - Index::drop() - .if_exists() - .name("idx_subscriber_tasks_task_type") - .table(SubscriberTasks::Table) - .to_owned(), - ) - .await?; - - manager - .drop_table( - Table::drop() - .if_exists() - .table(SubscriberTasks::Table) - .to_owned(), - ) - .await?; - - Ok(()) - } -} diff --git a/apps/recorder/src/migrations/mod.rs b/apps/recorder/src/migrations/mod.rs index daca141..6d31a78 100644 --- a/apps/recorder/src/migrations/mod.rs +++ b/apps/recorder/src/migrations/mod.rs @@ -7,7 +7,6 @@ pub mod m20220101_000001_init; pub mod m20240224_082543_add_downloads; pub mod m20241231_000001_auth; pub mod m20250501_021523_credential_3rd; -pub mod m20250508_022044_subscriber_tasks; pub struct Migrator; @@ -19,7 +18,6 @@ impl MigratorTrait for Migrator { Box::new(m20240224_082543_add_downloads::Migration), Box::new(m20241231_000001_auth::Migration), Box::new(m20250501_021523_credential_3rd::Migration), - Box::new(m20250508_022044_subscriber_tasks::Migration), ] } } diff --git a/apps/recorder/src/models/bangumi.rs b/apps/recorder/src/models/bangumi.rs index 4be1b18..1c82e0c 100644 --- a/apps/recorder/src/models/bangumi.rs +++ b/apps/recorder/src/models/bangumi.rs @@ -1,18 +1,17 @@ -use std::sync::Arc; - use async_graphql::SimpleObject; use async_trait::async_trait; use sea_orm::{ - ActiveValue, FromJsonQueryResult, FromQueryResult, IntoSimpleExpr, JoinType, QuerySelect, + ActiveValue, Condition, FromJsonQueryResult, FromQueryResult, IntoSimpleExpr, JoinType, + QuerySelect, entity::prelude::*, - sea_query::{IntoCondition, OnConflict}, + sea_query::{Alias, IntoCondition, OnConflict}, }; use serde::{Deserialize, Serialize}; use super::subscription_bangumi; use crate::{ app::AppContextTrait, - errors::{RecorderError, RecorderResult}, + errors::RecorderResult, extract::{ mikan::{ MikanBangumiHash, MikanBangumiMeta, build_mikan_bangumi_subscription_rss_url, @@ -63,8 +62,6 @@ pub struct Model { pub rss_link: Option, pub poster_link: Option, pub save_path: Option, - #[sea_orm(default = "false")] - pub deleted: bool, pub homepage: Option, pub extra: Option, } @@ -139,7 +136,7 @@ impl ActiveModel { let storage_service = ctx.storage(); let mikan_base_url = mikan_client.base_url(); - let raw_meta = parse_episode_meta_from_raw_name(&meta.bangumi_title)?; + let rawname_meta = parse_episode_meta_from_raw_name(&meta.bangumi_title)?; let rss_url = build_mikan_bangumi_subscription_rss_url( mikan_base_url.clone(), @@ -166,12 +163,20 @@ impl ActiveModel { subscriber_id: ActiveValue::Set(subscriber_id), display_name: ActiveValue::Set(meta.bangumi_title.clone()), raw_name: ActiveValue::Set(meta.bangumi_title), - season: ActiveValue::Set(raw_meta.season), - season_raw: ActiveValue::Set(raw_meta.season_raw), + season: ActiveValue::Set(rawname_meta.season), + season_raw: ActiveValue::Set(rawname_meta.season_raw), fansub: ActiveValue::Set(Some(meta.fansub)), poster_link: ActiveValue::Set(poster_link), homepage: ActiveValue::Set(Some(meta.homepage.to_string())), rss_link: ActiveValue::Set(Some(rss_url.to_string())), + extra: ActiveValue::Set(Some(BangumiExtra { + name_zh: rawname_meta.name_zh, + name_en: rawname_meta.name_en, + name_jp: rawname_meta.name_jp, + s_name_en: rawname_meta.name_en_no_season, + s_name_jp: rawname_meta.name_jp_no_season, + s_name_zh: rawname_meta.name_zh_no_season, + })), ..Default::default() }) } @@ -183,35 +188,60 @@ impl ActiveModelBehavior for ActiveModel {} impl Model { pub async fn get_or_insert_from_mikan( ctx: &dyn AppContextTrait, + hash: MikanBangumiHash, subscriber_id: i32, subscription_id: i32, - mikan_bangumi_id: String, - mikan_fansub_id: String, - f: F, - ) -> RecorderResult + create_bangumi_fn: F, + ) -> RecorderResult where - F: AsyncFnOnce(&mut ActiveModel) -> RecorderResult<()>, + F: AsyncFnOnce() -> RecorderResult, { + #[derive(FromQueryResult)] + struct ModelWithIsSubscribed { + #[sea_orm(nested)] + bangumi: Model, + is_subscribed: bool, + } + let db = ctx.db(); - if let Some(existed) = Entity::find() + + let subscription_bangumi_alias = Alias::new("sb"); + let mut is_subscribed = false; + let new_bangumi_model = if let Some(existed) = Entity::find() .filter( - Column::MikanBangumiId - .eq(Some(mikan_bangumi_id.clone())) - .and(Column::MikanFansubId.eq(Some(mikan_fansub_id.clone()))), + Condition::all() + .add(Column::MikanBangumiId.eq(Some(hash.mikan_bangumi_id))) + .add(Column::MikanFansubId.eq(Some(hash.mikan_fansub_id))) + .add(Column::SubscriberId.eq(subscriber_id)), ) + .column_as( + Expr::col(( + subscription_bangumi_alias.clone(), + subscription_bangumi::Column::SubscriptionId, + )), + "is_subscribed", + ) + .join_as_rev( + JoinType::LeftJoin, + subscription_bangumi::Relation::Bangumi + .def() + .on_condition(move |_left, right| { + Expr::col((right, subscription_bangumi::Column::SubscriptionId)) + .eq(subscription_id) + .into_condition() + }), + subscription_bangumi_alias.clone(), + ) + .into_model::() .one(db) .await? { - Ok(existed) + is_subscribed = existed.is_subscribed; + existed.bangumi } else { - let mut bgm = ActiveModel { - mikan_bangumi_id: ActiveValue::Set(Some(mikan_bangumi_id)), - mikan_fansub_id: ActiveValue::Set(Some(mikan_fansub_id)), - subscriber_id: ActiveValue::Set(subscriber_id), - ..Default::default() - }; - f(&mut bgm).await?; - let bgm = Entity::insert(bgm) + let new_bangumi_active_model = create_bangumi_fn().await?; + + Entity::insert(new_bangumi_active_model) .on_conflict( OnConflict::columns([ Column::MikanBangumiId, @@ -220,26 +250,30 @@ impl Model { ]) .update_columns([ Column::RawName, - Column::Extra, Column::Fansub, Column::PosterLink, Column::Season, Column::SeasonRaw, + Column::RssLink, + Column::Homepage, ]) .to_owned(), ) .exec_with_returning(db) - .await?; + .await? + }; + if !is_subscribed { subscription_bangumi::Entity::insert(subscription_bangumi::ActiveModel { subscription_id: ActiveValue::Set(subscription_id), - bangumi_id: ActiveValue::Set(bgm.id), + bangumi_id: ActiveValue::Set(new_bangumi_model.id), + subscriber_id: ActiveValue::Set(subscriber_id), ..Default::default() }) .on_conflict_do_nothing() .exec(db) .await?; - Ok(bgm) } + Ok(new_bangumi_model) } pub async fn get_existed_mikan_bangumi_list( diff --git a/apps/recorder/src/models/episodes.rs b/apps/recorder/src/models/episodes.rs index ba806d5..4fe756f 100644 --- a/apps/recorder/src/models/episodes.rs +++ b/apps/recorder/src/models/episodes.rs @@ -1,17 +1,14 @@ -use std::sync::Arc; - use async_trait::async_trait; use sea_orm::{ - ActiveValue, ColumnTrait, FromJsonQueryResult, IntoSimpleExpr, JoinType, QuerySelect, - entity::prelude::*, - sea_query::{Alias, IntoCondition, OnConflict}, + ActiveValue, FromJsonQueryResult, IntoSimpleExpr, QuerySelect, entity::prelude::*, + sea_query::OnConflict, }; use serde::{Deserialize, Serialize}; -use super::{bangumi, query::InsertManyReturningExt, subscription_bangumi, subscription_episode}; +use super::{bangumi, query::InsertManyReturningExt, subscription_episode}; use crate::{ app::AppContextTrait, - errors::{RecorderError, RecorderResult}, + errors::RecorderResult, extract::{ mikan::{MikanEpisodeHash, MikanEpisodeMeta, build_mikan_episode_homepage_url}, rawname::parse_episode_meta_from_raw_name, @@ -52,8 +49,6 @@ pub struct Model { pub episode_index: i32, pub homepage: Option, pub subtitle: Option, - #[sea_orm(default = "false")] - pub deleted: bool, pub source: Option, pub extra: EpisodeExtra, } @@ -132,56 +127,47 @@ pub enum RelatedEntity { SubscriptionEpisode, } -#[derive(Clone, Debug, PartialEq)] -pub struct MikanEpsiodeCreation { - pub episode: MikanEpisodeMeta, - pub bangumi: Arc, -} - impl ActiveModel { - pub fn from_mikan_episode_meta( + #[tracing::instrument(err, skip(ctx), fields(bangumi_id = ?bangumi.id, mikan_episode_id = ?episode.mikan_episode_id))] + pub fn from_mikan_bangumi_and_episode_meta( ctx: &dyn AppContextTrait, - creation: MikanEpsiodeCreation, + bangumi: &bangumi::Model, + episode: MikanEpisodeMeta, ) -> RecorderResult { - let item = creation.episode; - let bgm = creation.bangumi; - let raw_meta = parse_episode_meta_from_raw_name(&item.episode_title) - .inspect_err(|e| { - tracing::warn!("Failed to parse episode meta: {:?}", e); - }) - .ok() - .unwrap_or_default(); - let homepage = build_mikan_episode_homepage_url( - ctx.mikan().base_url().clone(), - &item.mikan_episode_id, - ); + let mikan_base_url = ctx.mikan().base_url().clone(); + let rawname_meta = parse_episode_meta_from_raw_name(&episode.episode_title)?; + let homepage = build_mikan_episode_homepage_url(mikan_base_url, &episode.mikan_episode_id); Ok(Self { - mikan_episode_id: ActiveValue::Set(Some(item.mikan_episode_id)), - raw_name: ActiveValue::Set(item.episode_title.clone()), - display_name: ActiveValue::Set(item.episode_title.clone()), - bangumi_id: ActiveValue::Set(bgm.id), - subscriber_id: ActiveValue::Set(bgm.subscriber_id), - resolution: ActiveValue::Set(raw_meta.resolution), - season: ActiveValue::Set(if raw_meta.season > 0 { - raw_meta.season + mikan_episode_id: ActiveValue::Set(Some(episode.mikan_episode_id)), + raw_name: ActiveValue::Set(episode.episode_title.clone()), + display_name: ActiveValue::Set(episode.episode_title.clone()), + bangumi_id: ActiveValue::Set(bangumi.id), + subscriber_id: ActiveValue::Set(bangumi.subscriber_id), + resolution: ActiveValue::Set(rawname_meta.resolution), + season: ActiveValue::Set(if rawname_meta.season > 0 { + rawname_meta.season } else { - bgm.season + bangumi.season }), - season_raw: ActiveValue::Set(raw_meta.season_raw.or_else(|| bgm.season_raw.clone())), - fansub: ActiveValue::Set(raw_meta.fansub.or_else(|| bgm.fansub.clone())), - poster_link: ActiveValue::Set(bgm.poster_link.clone()), - episode_index: ActiveValue::Set(raw_meta.episode_index), + season_raw: ActiveValue::Set( + rawname_meta + .season_raw + .or_else(|| bangumi.season_raw.clone()), + ), + fansub: ActiveValue::Set(rawname_meta.fansub.or_else(|| bangumi.fansub.clone())), + poster_link: ActiveValue::Set(bangumi.poster_link.clone()), + episode_index: ActiveValue::Set(rawname_meta.episode_index), homepage: ActiveValue::Set(Some(homepage.to_string())), - subtitle: ActiveValue::Set(raw_meta.subtitle), - source: ActiveValue::Set(raw_meta.source), + subtitle: ActiveValue::Set(rawname_meta.subtitle), + source: ActiveValue::Set(rawname_meta.source), extra: ActiveValue::Set(EpisodeExtra { - name_zh: raw_meta.name_zh, - name_en: raw_meta.name_en, - name_jp: raw_meta.name_jp, - s_name_en: raw_meta.name_en_no_season, - s_name_jp: raw_meta.name_jp_no_season, - s_name_zh: raw_meta.name_zh_no_season, + name_zh: rawname_meta.name_zh, + name_en: rawname_meta.name_en, + name_jp: rawname_meta.name_jp, + s_name_en: rawname_meta.name_en_no_season, + s_name_jp: rawname_meta.name_jp_no_season, + s_name_zh: rawname_meta.name_zh_no_season, }), ..Default::default() }) @@ -197,13 +183,14 @@ impl Model { ids: impl Iterator, subscriber_id: i32, _subscription_id: i32, - ) -> RecorderResult> { + ) -> RecorderResult> { let db = ctx.db(); Ok(Entity::find() .select_only() .column(Column::Id) .column(Column::MikanEpisodeId) + .column(Column::BangumiId) .filter( Expr::tuple([ Column::MikanEpisodeId.into_simple_expr(), @@ -211,44 +198,39 @@ impl Model { ]) .in_tuples( ids.into_iter() - .map(|id| (id.mikan_episode_token, subscriber_id)), + .map(|id| (id.mikan_episode_id, subscriber_id)), ), ) - .into_tuple::<(i32, String)>() + .into_tuple::<(i32, String, i32)>() .all(db) .await? .into_iter() - .map(|(id, mikan_episode_id)| { + .map(|(episode_id, mikan_episode_id, bangumi_id)| { ( - id, - MikanEpisodeHash { - mikan_episode_token: mikan_episode_id, - }, + episode_id, + MikanEpisodeHash { mikan_episode_id }, + bangumi_id, ) })) } - pub async fn add_episodes( + pub async fn add_mikan_episodes_for_subscription( ctx: &dyn AppContextTrait, + creations: impl Iterator, subscriber_id: i32, subscription_id: i32, - creations: impl IntoIterator, ) -> RecorderResult<()> { let db = ctx.db(); - let new_episode_active_modes = creations - .into_iter() - .map(|cr| ActiveModel::from_mikan_episode_meta(ctx, cr)) - .inspect(|result| { - if let Err(e) = result { - tracing::warn!("Failed to create episode: {:?}", e); - } + let new_episode_active_modes: Vec = creations + .map(|(bangumi, episode_meta)| { + ActiveModel::from_mikan_bangumi_and_episode_meta(ctx, bangumi, episode_meta) }) - .flatten(); + .collect::>()?; - let inserted_episodes = Entity::insert_many(new_episode_active_modes) + let new_episode_ids = Entity::insert_many(new_episode_active_modes) .on_conflict( - OnConflict::columns([Column::BangumiId, Column::MikanEpisodeId]) - .do_nothing() + OnConflict::columns([Column::MikanEpisodeId, Column::SubscriberId]) + .update_columns([Column::RawName, Column::PosterLink, Column::Homepage]) .to_owned(), ) .exec_with_returning_columns(db, [Column::Id]) @@ -256,25 +238,13 @@ impl Model { .into_iter() .flat_map(|r| r.try_get_many_by_index::()); - let insert_subscription_episode_links = inserted_episodes.into_iter().map(|episode_id| { - subscription_episode::ActiveModel::from_subscription_and_episode( - subscriber_id, - subscription_id, - episode_id, - ) - }); - - subscription_episode::Entity::insert_many(insert_subscription_episode_links) - .on_conflict( - OnConflict::columns([ - subscription_episode::Column::SubscriptionId, - subscription_episode::Column::EpisodeId, - ]) - .do_nothing() - .to_owned(), - ) - .exec(db) - .await?; + subscription_episode::Model::add_episodes_for_subscription( + ctx, + new_episode_ids, + subscriber_id, + subscription_id, + ) + .await?; Ok(()) } diff --git a/apps/recorder/src/models/mod.rs b/apps/recorder/src/models/mod.rs index 3ef7b58..4fd1ce7 100644 --- a/apps/recorder/src/models/mod.rs +++ b/apps/recorder/src/models/mod.rs @@ -5,7 +5,6 @@ pub mod downloaders; pub mod downloads; pub mod episodes; pub mod query; -pub mod subscriber_tasks; pub mod subscribers; pub mod subscription_bangumi; pub mod subscription_episode; diff --git a/apps/recorder/src/models/query/mod.rs b/apps/recorder/src/models/query/mod.rs index e6d87a3..ce7fe5a 100644 --- a/apps/recorder/src/models/query/mod.rs +++ b/apps/recorder/src/models/query/mod.rs @@ -1,29 +1,9 @@ use async_trait::async_trait; use sea_orm::{ ActiveModelTrait, ColumnTrait, ConnectionTrait, DbErr, EntityTrait, Insert, IntoActiveModel, - Iterable, QueryResult, QueryTrait, SelectModel, SelectorRaw, Value, - prelude::Expr, - sea_query::{Alias, IntoColumnRef, IntoTableRef, Query, SelectStatement}, + Iterable, QueryResult, QueryTrait, SelectModel, SelectorRaw, sea_query::Query, }; -pub fn filter_values_in< - I: IntoIterator, - T: Into, - R: IntoTableRef, - C: IntoColumnRef + Copy, ->( - tbl_ref: R, - col_ref: C, - values: I, -) -> SelectStatement { - Query::select() - .expr(Expr::col(("t", "column1"))) - .from_values(values, "t") - .left_join(tbl_ref, Expr::col(("t", "column1")).equals(col_ref)) - .and_where(Expr::col(col_ref).is_not_null()) - .to_owned() -} - #[async_trait] pub trait InsertManyReturningExt: Sized where diff --git a/apps/recorder/src/models/subscriber_tasks.rs b/apps/recorder/src/models/subscriber_tasks.rs deleted file mode 100644 index abbd7be..0000000 --- a/apps/recorder/src/models/subscriber_tasks.rs +++ /dev/null @@ -1,160 +0,0 @@ -use std::sync::Arc; - -use sea_orm::{ActiveValue, FromJsonQueryResult, JsonValue, TryIntoModel, prelude::*}; -use serde::{Deserialize, Serialize}; - -pub use crate::task::{SubscriberTaskType, SubscriberTaskTypeEnum}; -use crate::{ - app::AppContextTrait, - errors::RecorderResult, - task::{SubscriberTask, SubscriberTaskPayload}, -}; - -#[derive(Debug, Clone, Serialize, Deserialize, FromJsonQueryResult, PartialEq, Eq)] -pub struct SubscriberTaskErrorSnapshot { - pub message: String, -} - -#[derive(Debug, Clone, Serialize, Deserialize, DeriveEntityModel, PartialEq, Eq)] -#[sea_orm(table_name = "subscriber_tasks")] -pub struct Model { - #[sea_orm(default_expr = "Expr::current_timestamp()")] - pub created_at: DateTimeUtc, - #[sea_orm(default_expr = "Expr::current_timestamp()")] - pub updated_at: DateTimeUtc, - #[sea_orm(primary_key)] - pub id: i32, - pub subscriber_id: i32, - pub task_type: SubscriberTaskType, - pub request: JsonValue, - pub yields: Vec, - pub result: Option, - pub error: Option, -} - -#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] -pub enum Relation { - #[sea_orm( - belongs_to = "super::subscribers::Entity", - from = "Column::SubscriberId", - to = "super::subscribers::Column::Id" - )] - Subscriber, -} - -impl Related for Entity { - fn to() -> RelationDef { - Relation::Subscriber.def() - } -} - -#[derive(Copy, Clone, Debug, EnumIter, DeriveRelatedEntity)] -pub enum RelatedEntity { - #[sea_orm(entity = "super::subscribers::Entity")] - Subscriber, -} - -impl ActiveModelBehavior for ActiveModel {} - -impl Model { - pub async fn update_result( - ctx: Arc, - task_id: i32, - result: R, - ) -> RecorderResult<()> - where - R: Serialize, - { - let db = ctx.db(); - - let result_value = serde_json::to_value(result)?; - - Entity::update_many() - .filter(Column::Id.eq(task_id)) - .set(ActiveModel { - result: ActiveValue::Set(Some(result_value)), - ..Default::default() - }) - .exec(db) - .await?; - - Ok(()) - } - - pub async fn update_error( - ctx: Arc, - task_id: i32, - error: SubscriberTaskErrorSnapshot, - ) -> RecorderResult<()> { - let db = ctx.db(); - - let error_value = serde_json::to_value(&error)?; - - Entity::update_many() - .filter(Column::Id.eq(task_id)) - .set(ActiveModel { - error: ActiveValue::Set(Some(error_value)), - ..Default::default() - }) - .exec(db) - .await?; - - Ok(()) - } - - pub async fn append_yield( - ctx: Arc, - task_id: i32, - item: Y, - ) -> RecorderResult<()> - where - Y: Serialize, - { - let db = ctx.db(); - - let yield_value = serde_json::to_value(item)?; - - Entity::update_many() - .filter(Column::Id.eq(task_id)) - .col_expr( - Column::Yields, - Expr::cust_with_values("array_append($1)", [yield_value]), - ) - .exec(db) - .await?; - - Ok(()) - } - - pub async fn add_subscriber_task( - ctx: Arc, - subscriber_id: i32, - payload: SubscriberTaskPayload, - ) -> RecorderResult { - let task_type = payload.task_type(); - let request: JsonValue = payload.clone().try_into()?; - - let am = ActiveModel { - subscriber_id: ActiveValue::Set(subscriber_id), - task_type: ActiveValue::Set(task_type.clone()), - request: ActiveValue::Set(request.clone()), - ..Default::default() - }; - - let db = ctx.db(); - - let task_id = Entity::insert(am).exec(db).await?.last_insert_id; - - let subscriber_task = SubscriberTask { - id: task_id, - subscriber_id, - payload, - }; - - ctx.task() - .add_subscriber_task(subscriber_task.clone()) - .await?; - - Ok(subscriber_task) - } -} diff --git a/apps/recorder/src/models/subscribers.rs b/apps/recorder/src/models/subscribers.rs index ee5d62d..843c8a9 100644 --- a/apps/recorder/src/models/subscribers.rs +++ b/apps/recorder/src/models/subscribers.rs @@ -39,8 +39,6 @@ pub enum Relation { Episode, #[sea_orm(has_many = "super::auth::Entity")] Auth, - #[sea_orm(has_many = "super::subscriber_tasks::Entity")] - SubscriberTasks, } impl Related for Entity { @@ -73,12 +71,6 @@ impl Related for Entity { } } -impl Related for Entity { - fn to() -> RelationDef { - Relation::SubscriberTasks.def() - } -} - #[derive(Copy, Clone, Debug, EnumIter, DeriveRelatedEntity)] pub enum RelatedEntity { #[sea_orm(entity = "super::subscriptions::Entity")] @@ -89,8 +81,6 @@ pub enum RelatedEntity { Bangumi, #[sea_orm(entity = "super::episodes::Entity")] Episode, - #[sea_orm(entity = "super::subscriber_tasks::Entity")] - SubscriberTasks, } #[derive(Debug, Deserialize, Serialize)] diff --git a/apps/recorder/src/models/subscription_episode.rs b/apps/recorder/src/models/subscription_episode.rs index 577d1aa..f32e2c1 100644 --- a/apps/recorder/src/models/subscription_episode.rs +++ b/apps/recorder/src/models/subscription_episode.rs @@ -57,21 +57,6 @@ pub enum RelatedEntity { #[async_trait] impl ActiveModelBehavior for ActiveModel {} -impl ActiveModel { - pub fn from_subscription_and_episode( - subscriber_id: i32, - subscription_id: i32, - episode_id: i32, - ) -> Self { - Self { - subscriber_id: ActiveValue::Set(subscriber_id), - subscription_id: ActiveValue::Set(subscription_id), - episode_id: ActiveValue::Set(episode_id), - ..Default::default() - } - } -} - impl Model { pub async fn add_episodes_for_subscription( ctx: &dyn AppContextTrait, diff --git a/apps/recorder/src/models/subscriptions.rs b/apps/recorder/src/models/subscriptions.rs index 419405b..a462f5a 100644 --- a/apps/recorder/src/models/subscriptions.rs +++ b/apps/recorder/src/models/subscriptions.rs @@ -1,26 +1,15 @@ -use std::{collections::HashSet, sync::Arc}; +use std::{fmt::Debug, sync::Arc}; use async_trait::async_trait; -use itertools::Itertools; -use sea_orm::{ActiveValue, entity::prelude::*}; +use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; -use super::{bangumi, episodes, query::filter_values_in}; use crate::{ app::AppContextTrait, errors::{RecorderError, RecorderResult}, - extract::{ - mikan::{ - MikanBangumiPosterMeta, MikanBangumiSubscription, MikanSeasonSubscription, - MikanSubscriberSubscription, build_mikan_bangumi_homepage_url, - build_mikan_bangumi_subscription_rss_url, - scrape_mikan_bangumi_meta_from_bangumi_homepage_url, - scrape_mikan_episode_meta_from_episode_homepage_url, - scrape_mikan_poster_meta_from_image_url, - }, - rawname::extract_season_from_title_body, + extract::mikan::{ + MikanBangumiSubscription, MikanSeasonSubscription, MikanSubscriberSubscription, }, - models::episodes::MikanEpsiodeCreation, }; #[derive( @@ -43,45 +32,6 @@ pub enum SubscriptionCategory { Manual, } -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -#[serde(tag = "category")] -pub enum SubscriptionPayload { - #[serde(rename = "mikan_subscriber")] - MikanSubscriber(MikanSubscriberSubscription), - #[serde(rename = "mikan_season")] - MikanSeason(MikanSeasonSubscription), - #[serde(rename = "mikan_bangumi")] - MikanBangumi(MikanBangumiSubscription), - #[serde(rename = "manual")] - Manual, -} - -impl SubscriptionPayload { - pub fn category(&self) -> SubscriptionCategory { - match self { - Self::MikanSubscriber(_) => SubscriptionCategory::MikanSubscriber, - Self::MikanSeason(_) => SubscriptionCategory::MikanSeason, - Self::MikanBangumi(_) => SubscriptionCategory::MikanBangumi, - Self::Manual => SubscriptionCategory::Manual, - } - } - - pub fn try_from_model(model: &Model) -> RecorderResult { - Ok(match model.category { - SubscriptionCategory::MikanSubscriber => { - Self::MikanSubscriber(MikanSubscriberSubscription::try_from_model(model)?) - } - SubscriptionCategory::MikanSeason => { - Self::MikanSeason(MikanSeasonSubscription::try_from_model(model)?) - } - SubscriptionCategory::MikanBangumi => { - Self::MikanBangumi(MikanBangumiSubscription::try_from_model(model)?) - } - SubscriptionCategory::Manual => Self::Manual, - }) - } -} - #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "subscriptions")] pub struct Model { @@ -95,6 +45,7 @@ pub struct Model { pub subscriber_id: i32, pub category: SubscriptionCategory, pub source_url: String, + pub source_urls: Option>, pub enabled: bool, pub credential_id: Option, } @@ -199,11 +150,6 @@ impl ActiveModelBehavior for ActiveModel {} impl ActiveModel {} impl Model { - pub async fn find_by_id(ctx: &dyn AppContextTrait, id: i32) -> RecorderResult> { - let db = ctx.db(); - Ok(Entity::find_by_id(id).one(db).await?) - } - pub async fn toggle_with_ids( ctx: &dyn AppContextTrait, ids: impl Iterator, @@ -230,127 +176,112 @@ impl Model { Ok(()) } - pub async fn pull_subscription(&self, ctx: &dyn AppContextTrait) -> RecorderResult<()> { - match payload { - SubscriptionPayload::MikanSubscriber(payload) => { - let mikan_client = ctx.mikan(); - let channel = - extract_mikan_rss_channel_from_rss_link(mikan_client, &self.source_url).await?; - - let items = channel.into_items(); - - let db = ctx.db(); - let items = items.into_iter().collect_vec(); - - let mut stmt = filter_values_in( - episodes::Entity, - episodes::Column::MikanEpisodeId, - items - .iter() - .map(|s| Value::from(s.mikan_episode_id.clone())), - ); - stmt.and_where(Expr::col(episodes::Column::SubscriberId).eq(self.subscriber_id)); - - let builder = &db.get_database_backend(); - - let old_rss_item_mikan_episode_ids_set = db - .query_all(builder.build(&stmt)) - .await? - .into_iter() - .flat_map(|qs| qs.try_get_by_index(0)) - .collect::>(); - - let new_rss_items = items - .into_iter() - .filter(|item| { - !old_rss_item_mikan_episode_ids_set.contains(&item.mikan_episode_id) - }) - .collect_vec(); - - let mut new_metas = vec![]; - for new_rss_item in new_rss_items.iter() { - new_metas.push( - scrape_mikan_episode_meta_from_episode_homepage_url( - mikan_client, - new_rss_item.homepage.clone(), - ) - .await?, - ); - } - - let new_mikan_bangumi_groups = new_metas - .into_iter() - .into_group_map_by(|s| (s.mikan_bangumi_id.clone(), s.mikan_fansub_id.clone())); - - for ((mikan_bangumi_id, mikan_fansub_id), new_ep_metas) in new_mikan_bangumi_groups - { - let mikan_base_url = ctx.mikan().base_url(); - let bgm_homepage = build_mikan_bangumi_homepage_url( - mikan_base_url.clone(), - &mikan_bangumi_id, - Some(&mikan_fansub_id), - ); - let bgm_rss_link = build_mikan_bangumi_subscription_rss_url( - mikan_base_url.clone(), - &mikan_bangumi_id, - Some(&mikan_fansub_id), - )?; - let bgm = Arc::new( - bangumi::Model::get_or_insert_from_mikan( - ctx, - self.subscriber_id, - self.id, - mikan_bangumi_id.to_string(), - mikan_fansub_id.to_string(), - async |am| -> RecorderResult<()> { - let bgm_meta = scrape_mikan_bangumi_meta_from_bangumi_homepage_url( - mikan_client, - bgm_homepage.clone(), - ) - .await?; - let bgm_name = bgm_meta.bangumi_title; - let (_, bgm_season_raw, bgm_season) = - extract_season_from_title_body(&bgm_name); - am.raw_name = ActiveValue::Set(bgm_name.clone()); - am.display_name = ActiveValue::Set(bgm_name); - am.season = ActiveValue::Set(bgm_season); - am.season_raw = ActiveValue::Set(bgm_season_raw); - am.rss_link = ActiveValue::Set(Some(bgm_rss_link.to_string())); - am.homepage = ActiveValue::Set(Some(bgm_homepage.to_string())); - am.fansub = ActiveValue::Set(Some(bgm_meta.fansub)); - if let Some(origin_poster_src) = bgm_meta.origin_poster_src - && let MikanBangumiPosterMeta { - poster_src: Some(poster_src), - .. - } = scrape_mikan_poster_meta_from_image_url( - mikan_client, - ctx.storage(), - origin_poster_src, - self.subscriber_id, - ) - .await? - { - am.poster_link = ActiveValue::Set(Some(poster_src)) - } - Ok(()) - }, - ) - .await?, - ); - episodes::Model::add_episodes( - ctx, - self.subscriber_id, - self.id, - new_ep_metas.into_iter().map(|item| MikanEpsiodeCreation { - episode: item, - bangumi: bgm.clone(), - }), - ) - .await?; - } - Ok(()) - } - _ => todo!(), + pub async fn sync_feeds(&self, ctx: Arc) -> RecorderResult<()> { + let subscription = self.try_into()?; + match subscription { + Subscription::MikanSubscriber(subscription) => subscription.sync_feeds(ctx).await, + Subscription::MikanSeason(subscription) => subscription.sync_feeds(ctx).await, + Subscription::MikanBangumi(subscription) => subscription.sync_feeds(ctx).await, + Subscription::Manual => Ok(()), } } } + +#[async_trait] +pub trait SubscriptionTrait: Sized + Debug { + fn get_subscriber_id(&self) -> i32; + + fn get_subscription_id(&self) -> i32; + + async fn sync_feeds(&self, ctx: Arc) -> RecorderResult<()>; + + async fn sync_sources(&self, ctx: Arc) -> RecorderResult<()>; + + fn try_from_model(model: &Model) -> RecorderResult; +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(tag = "category")] +pub enum Subscription { + #[serde(rename = "mikan_subscriber")] + MikanSubscriber(MikanSubscriberSubscription), + #[serde(rename = "mikan_season")] + MikanSeason(MikanSeasonSubscription), + #[serde(rename = "mikan_bangumi")] + MikanBangumi(MikanBangumiSubscription), + #[serde(rename = "manual")] + Manual, +} + +impl Subscription { + pub fn category(&self) -> SubscriptionCategory { + match self { + Self::MikanSubscriber(_) => SubscriptionCategory::MikanSubscriber, + Self::MikanSeason(_) => SubscriptionCategory::MikanSeason, + Self::MikanBangumi(_) => SubscriptionCategory::MikanBangumi, + Self::Manual => SubscriptionCategory::Manual, + } + } +} + +#[async_trait] +impl SubscriptionTrait for Subscription { + fn get_subscriber_id(&self) -> i32 { + match self { + Self::MikanSubscriber(subscription) => subscription.get_subscriber_id(), + Self::MikanSeason(subscription) => subscription.get_subscriber_id(), + Self::MikanBangumi(subscription) => subscription.get_subscriber_id(), + Self::Manual => unreachable!(), + } + } + + fn get_subscription_id(&self) -> i32 { + match self { + Self::MikanSubscriber(subscription) => subscription.get_subscription_id(), + Self::MikanSeason(subscription) => subscription.get_subscription_id(), + Self::MikanBangumi(subscription) => subscription.get_subscription_id(), + Self::Manual => unreachable!(), + } + } + + async fn sync_feeds(&self, ctx: Arc) -> RecorderResult<()> { + match self { + Self::MikanSubscriber(subscription) => subscription.sync_feeds(ctx).await, + Self::MikanSeason(subscription) => subscription.sync_feeds(ctx).await, + Self::MikanBangumi(subscription) => subscription.sync_feeds(ctx).await, + Self::Manual => Ok(()), + } + } + + async fn sync_sources(&self, ctx: Arc) -> RecorderResult<()> { + match self { + Self::MikanSubscriber(subscription) => subscription.sync_sources(ctx).await, + Self::MikanSeason(subscription) => subscription.sync_sources(ctx).await, + Self::MikanBangumi(subscription) => subscription.sync_sources(ctx).await, + Self::Manual => Ok(()), + } + } + + fn try_from_model(model: &Model) -> RecorderResult { + match model.category { + SubscriptionCategory::MikanSubscriber => { + MikanSubscriberSubscription::try_from_model(model).map(Self::MikanSubscriber) + } + SubscriptionCategory::MikanSeason => { + MikanSeasonSubscription::try_from_model(model).map(Self::MikanSeason) + } + SubscriptionCategory::MikanBangumi => { + MikanBangumiSubscription::try_from_model(model).map(Self::MikanBangumi) + } + SubscriptionCategory::Manual => Ok(Self::Manual), + } + } +} + +impl TryFrom<&Model> for Subscription { + type Error = RecorderError; + + fn try_from(model: &Model) -> Result { + Self::try_from_model(model) + } +} diff --git a/apps/recorder/src/task/core.rs b/apps/recorder/src/task/core.rs index e8e6a1c..4a8bec0 100644 --- a/apps/recorder/src/task/core.rs +++ b/apps/recorder/src/task/core.rs @@ -1,41 +1,18 @@ use std::sync::Arc; -use futures::{Stream, TryStreamExt, pin_mut}; +use futures::Stream; use serde::{Serialize, de::DeserializeOwned}; -use crate::{ - app::AppContextTrait, - errors::RecorderResult, - models::subscriber_tasks::{self, SubscriberTaskErrorSnapshot}, -}; +use crate::{app::AppContextTrait, errors::RecorderResult}; pub const SUBSCRIBER_TASK_APALIS_NAME: &str = "subscriber_task"; #[async_trait::async_trait] pub trait SubscriberAsyncTaskTrait: Serialize + DeserializeOwned + Sized { - type Result: Serialize + DeserializeOwned + Send; + async fn run_async(self, ctx: Arc) -> RecorderResult<()>; - async fn run_async( - self, - ctx: Arc, - id: i32, - ) -> RecorderResult; - - async fn run(self, ctx: Arc, id: i32) -> RecorderResult<()> { - match self.run_async(ctx.clone(), id).await { - Ok(result) => { - subscriber_tasks::Model::update_result(ctx, id, result).await?; - } - Err(e) => { - let error_snapshot = SubscriberTaskErrorSnapshot { - message: e.to_string(), - }; - - subscriber_tasks::Model::update_error(ctx, id, error_snapshot).await?; - - return Err(e); - } - } + async fn run(self, ctx: Arc) -> RecorderResult<()> { + self.run_async(ctx).await?; Ok(()) } @@ -48,35 +25,9 @@ pub trait SubscriberStreamTaskTrait: Serialize + DeserializeOwned + Sized { fn run_stream( self, ctx: Arc, - id: i32, ) -> impl Stream> + Send; - async fn run(self, ctx: Arc, id: i32) -> RecorderResult<()> { - let stream = self.run_stream(ctx.clone(), id); - - pin_mut!(stream); - - loop { - match stream.try_next().await { - Ok(Some(result)) => { - subscriber_tasks::Model::append_yield(ctx.clone(), id, result).await?; - } - Ok(None) => { - subscriber_tasks::Model::update_result(ctx, id, ()).await?; - break; - } - Err(e) => { - let error_snapshot = SubscriberTaskErrorSnapshot { - message: e.to_string(), - }; - - subscriber_tasks::Model::update_error(ctx, id, error_snapshot).await?; - - return Err(e); - } - } - } - - Ok(()) + async fn run(self, _ctx: Arc) -> RecorderResult<()> { + unimplemented!() } } diff --git a/apps/recorder/src/task/mikan/mod.rs b/apps/recorder/src/task/mikan/mod.rs deleted file mode 100644 index 4b407eb..0000000 --- a/apps/recorder/src/task/mikan/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod scrape_season_subscription; - -pub use scrape_season_subscription::MikanScrapeSeasonSubscriptionTask; diff --git a/apps/recorder/src/task/mikan/scrape_season_subscription.rs b/apps/recorder/src/task/mikan/scrape_season_subscription.rs deleted file mode 100644 index 8714244..0000000 --- a/apps/recorder/src/task/mikan/scrape_season_subscription.rs +++ /dev/null @@ -1,39 +0,0 @@ -use std::sync::Arc; - -use futures::Stream; -use sea_orm::FromJsonQueryResult; -use serde::{Deserialize, Serialize}; - -use crate::{ - app::AppContextTrait, - errors::RecorderResult, - extract::mikan::{MikanBangumiMeta, MikanSeasonStr, MikanSeasonSubscription}, - task::SubscriberStreamTaskTrait, -}; - -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, FromJsonQueryResult)] -pub struct MikanScrapeSeasonSubscriptionTask { - pub year: i32, - pub season_str: MikanSeasonStr, - pub credential_id: i32, -} - -#[async_trait::async_trait] -impl SubscriberStreamTaskTrait for MikanScrapeSeasonSubscriptionTask { - type Yield = MikanBangumiMeta; - - fn run_stream( - self, - ctx: Arc, - id: i32, - ) -> impl Stream> { - let task = Arc::new(MikanSeasonSubscription { - id, - year: self.year, - season_str: self.season_str, - credential_id: self.credential_id, - }); - - task.pull_bangumi_meta_stream(ctx) - } -} diff --git a/apps/recorder/src/task/mod.rs b/apps/recorder/src/task/mod.rs index 6b891b2..82e174d 100644 --- a/apps/recorder/src/task/mod.rs +++ b/apps/recorder/src/task/mod.rs @@ -1,6 +1,5 @@ mod config; mod core; -pub mod mikan; mod registry; mod service; @@ -8,6 +7,7 @@ pub use core::{SUBSCRIBER_TASK_APALIS_NAME, SubscriberAsyncTaskTrait, Subscriber pub use config::TaskConfig; pub use registry::{ - SubscriberTask, SubscriberTaskPayload, SubscriberTaskType, SubscriberTaskTypeEnum, + SubscriberTask, SubscriberTaskPayload, SyncOneSubscriptionFeedsTask, + SyncOneSubscriptionSourcesTask, }; pub use service::TaskService; diff --git a/apps/recorder/src/task/registry.rs b/apps/recorder/src/task/registry.rs deleted file mode 100644 index d6b19e1..0000000 --- a/apps/recorder/src/task/registry.rs +++ /dev/null @@ -1,61 +0,0 @@ -use sea_orm::{DeriveActiveEnum, DeriveDisplay, prelude::*}; -use serde::{Deserialize, Serialize}; - -use super::mikan::MikanScrapeSeasonSubscriptionTask; -use crate::errors::RecorderError; - -#[derive( - Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, DeriveDisplay, Serialize, Deserialize, -)] -#[sea_orm( - rs_type = "String", - db_type = "String(StringLen::None)", - enum_name = "subscriber_task_type" -)] -#[serde(rename_all = "snake_case")] -pub enum SubscriberTaskType { - #[sea_orm(string_value = "mikan_scrape_season_subscription")] - MikanScrapeSeasonSubscription, -} - -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] -#[serde(tag = "task_type")] -pub enum SubscriberTaskPayload { - #[serde(rename = "mikan_scrape_season_subscription")] - MikanScrapeSeasonSubscription(MikanScrapeSeasonSubscriptionTask), -} - -impl SubscriberTaskPayload { - pub fn task_type(&self) -> SubscriberTaskType { - match self { - Self::MikanScrapeSeasonSubscription(_) => { - SubscriberTaskType::MikanScrapeSeasonSubscription - } - } - } -} - -impl TryFrom for serde_json::Value { - type Error = RecorderError; - - fn try_from(value: SubscriberTaskPayload) -> Result { - let json_value = serde_json::to_value(&value)?; - Ok(match json_value { - serde_json::Value::Object(mut map) => { - map.remove("task_type"); - serde_json::Value::Object(map) - } - _ => { - unreachable!("payload must be an json object"); - } - }) - } -} - -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] -pub struct SubscriberTask { - pub id: i32, - pub subscriber_id: i32, - #[serde(flatten)] - pub payload: SubscriberTaskPayload, -} diff --git a/apps/recorder/src/task/registry/mod.rs b/apps/recorder/src/task/registry/mod.rs new file mode 100644 index 0000000..7b1aa72 --- /dev/null +++ b/apps/recorder/src/task/registry/mod.rs @@ -0,0 +1,53 @@ +mod subscription; +use std::sync::Arc; + +use serde::{Deserialize, Serialize}; +pub use subscription::{SyncOneSubscriptionFeedsTask, SyncOneSubscriptionSourcesTask}; + +use super::SubscriberAsyncTaskTrait; +use crate::{ + app::AppContextTrait, + errors::{RecorderError, RecorderResult}, +}; + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +#[serde(tag = "task_type")] +pub enum SubscriberTaskPayload { + #[serde(rename = "sync_one_subscription_feeds")] + SyncOneSubscriptionFeeds(SyncOneSubscriptionFeedsTask), + #[serde(rename = "sync_one_subscription_sources")] + SyncOneSubscriptionSources(SyncOneSubscriptionSourcesTask), +} + +impl SubscriberTaskPayload { + pub async fn run(self, ctx: Arc) -> RecorderResult<()> { + match self { + Self::SyncOneSubscriptionFeeds(task) => task.run(ctx).await, + Self::SyncOneSubscriptionSources(task) => task.run(ctx).await, + } + } +} + +impl TryFrom<&SubscriberTaskPayload> for serde_json::Value { + type Error = RecorderError; + + fn try_from(value: &SubscriberTaskPayload) -> Result { + let json_value = serde_json::to_value(value)?; + Ok(match json_value { + serde_json::Value::Object(mut map) => { + map.remove("task_type"); + serde_json::Value::Object(map) + } + _ => { + unreachable!("subscriber task payload must be an json object"); + } + }) + } +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct SubscriberTask { + pub subscriber_id: i32, + #[serde(flatten)] + pub payload: SubscriberTaskPayload, +} diff --git a/apps/recorder/src/task/registry/subscription.rs b/apps/recorder/src/task/registry/subscription.rs new file mode 100644 index 0000000..7e77f17 --- /dev/null +++ b/apps/recorder/src/task/registry/subscription.rs @@ -0,0 +1,45 @@ +use std::sync::Arc; + +use sea_orm::prelude::*; +use serde::{Deserialize, Serialize}; + +use crate::{ + app::AppContextTrait, + errors::RecorderResult, + models::subscriptions::{self, SubscriptionTrait}, + task::SubscriberAsyncTaskTrait, +}; + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct SyncOneSubscriptionFeedsTask(pub subscriptions::Subscription); + +impl From for SyncOneSubscriptionFeedsTask { + fn from(subscription: subscriptions::Subscription) -> Self { + Self(subscription) + } +} + +#[async_trait::async_trait] +impl SubscriberAsyncTaskTrait for SyncOneSubscriptionFeedsTask { + async fn run_async(self, ctx: Arc) -> RecorderResult<()> { + self.0.sync_feeds(ctx).await?; + Ok(()) + } +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct SyncOneSubscriptionSourcesTask(pub subscriptions::Subscription); + +#[async_trait::async_trait] +impl SubscriberAsyncTaskTrait for SyncOneSubscriptionSourcesTask { + async fn run_async(self, ctx: Arc) -> RecorderResult<()> { + self.0.sync_sources(ctx).await?; + Ok(()) + } +} + +impl From for SyncOneSubscriptionSourcesTask { + fn from(subscription: subscriptions::Subscription) -> Self { + Self(subscription) + } +} diff --git a/apps/recorder/src/task/service.rs b/apps/recorder/src/task/service.rs index d834dbe..7433adf 100644 --- a/apps/recorder/src/task/service.rs +++ b/apps/recorder/src/task/service.rs @@ -7,10 +7,7 @@ use tokio::sync::RwLock; use crate::{ app::AppContextTrait, errors::RecorderResult, - task::{ - SUBSCRIBER_TASK_APALIS_NAME, SubscriberStreamTaskTrait, SubscriberTask, - SubscriberTaskPayload, TaskConfig, - }, + task::{SUBSCRIBER_TASK_APALIS_NAME, SubscriberTask, SubscriberTaskPayload, TaskConfig}, }; pub struct TaskService { @@ -40,20 +37,25 @@ impl TaskService { ) -> RecorderResult<()> { let ctx = data.deref().clone(); - match job.payload { - SubscriberTaskPayload::MikanScrapeSeasonSubscription(task) => { - task.run(ctx, job.id).await - } - } + job.payload.run(ctx).await } - pub async fn add_subscriber_task(&self, job: SubscriberTask) -> RecorderResult<()> { - { - let mut storage = self.subscriber_task_storage.write().await; - storage.push(job).await?; - } + pub async fn add_subscriber_task( + &self, + subscriber_id: i32, + task_payload: SubscriberTaskPayload, + ) -> RecorderResult { + let subscriber_task = SubscriberTask { + subscriber_id, + payload: task_payload, + }; - Ok(()) + let task_id = { + let mut storage = self.subscriber_task_storage.write().await; + storage.push(subscriber_task).await?.task_id + }; + + Ok(task_id) } pub async fn setup(&self) -> RecorderResult<()> { diff --git a/apps/webui/src/presentation/routes/_app/subscriptions/-defs.ts b/apps/webui/src/presentation/routes/_app/subscriptions/-defs.ts index 028981e..ae01e9d 100644 --- a/apps/webui/src/presentation/routes/_app/subscriptions/-defs.ts +++ b/apps/webui/src/presentation/routes/_app/subscriptions/-defs.ts @@ -92,7 +92,6 @@ query GetSubscriptionDetail ($id: Int!) { rssLink posterLink savePath - deleted homepage } } diff --git a/rust-toolchain.toml b/rust-toolchain.toml index f644ff5..52a85d0 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,4 +1,4 @@ [toolchain] -channel = "nightly-2025-05-14" +channel = "nightly-2025-05-20" components = ["rustfmt", "clippy"] profile = "default"