diff --git a/apps/recorder/src/extract/mikan/mod.rs b/apps/recorder/src/extract/mikan/mod.rs
index ce99fba..37702ae 100644
--- a/apps/recorder/src/extract/mikan/mod.rs
+++ b/apps/recorder/src/extract/mikan/mod.rs
@@ -3,6 +3,7 @@
 mod config;
 mod constants;
 mod credential;
 mod rss;
+mod subscription;
 mod web;
 
 pub use client::MikanClient;
@@ -14,11 +15,12 @@ pub use constants::{
 };
 pub use credential::MikanCredentialForm;
 pub use rss::{
-    MikanBangumiIndexRssChannel, MikanBangumiRssChannel, MikanBangumiRssUrlMeta, MikanRssChannel,
-    MikanRssItem, MikanSubscriberAggregationRssUrlMeta, MikanSubscriberStreamRssChannel,
-    build_mikan_bangumi_rss_url, build_mikan_subscriber_aggregation_rss_url,
-    extract_mikan_bangumi_id_from_rss_url, extract_mikan_rss_channel_from_rss_link,
-    extract_mikan_subscriber_aggregation_id_from_rss_link,
+    MikanBangumiRssChannel, MikanBangumiRssUrlMeta, MikanRssChannel, MikanRssItem,
+    MikanSubscriberRssChannel, MikanSubscriberSubscriptionRssUrlMeta,
+    build_mikan_bangumi_subscription_rss_url, build_mikan_subscriber_subscription_rss_url,
+};
+pub use subscription::{
+    MikanBangumiSubscription, MikanSeasonSubscription, MikanSubscriberSubscription,
 };
 pub use web::{
     MikanBangumiHomepageUrlMeta, MikanBangumiIndexHomepageUrlMeta, MikanBangumiIndexMeta,
@@ -26,10 +28,9 @@ pub use web::{
     MikanSeasonFlowUrlMeta, MikanSeasonStr, build_mikan_bangumi_expand_subscribed_url,
     build_mikan_bangumi_homepage_url, build_mikan_episode_homepage_url,
     build_mikan_season_flow_url, extract_mikan_bangumi_index_meta_list_from_season_flow_fragment,
+    extract_mikan_bangumi_meta_from_expand_subscribed_fragment,
     extract_mikan_episode_meta_from_episode_homepage_html,
     scrape_mikan_bangumi_meta_from_bangumi_homepage_url,
-    scrape_mikan_bangumi_meta_list_from_season_flow_url,
-    scrape_mikan_bangumi_meta_stream_from_season_flow_url,
     scrape_mikan_episode_meta_from_episode_homepage_url, scrape_mikan_poster_data_from_image_url,
     scrape_mikan_poster_meta_from_image_url,
 };
diff --git a/apps/recorder/src/extract/mikan/rss.rs b/apps/recorder/src/extract/mikan/rss.rs
index 88436b3..5d268a0 100644
--- a/apps/recorder/src/extract/mikan/rss.rs
+++ b/apps/recorder/src/extract/mikan/rss.rs
@@ -1,5 +1,6 @@
 use std::borrow::Cow;
 
+use bytes::Bytes;
 use chrono::DateTime;
 use downloader::bittorrent::defs::BITTORRENT_MIME_TYPE;
 use fetch::{FetchError, IntoUrl, bytes::fetch_bytes};
@@ -34,16 +35,8 @@ pub struct MikanBangumiRssChannel {
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
-pub struct MikanBangumiIndexRssChannel {
-    pub name: String,
-    pub url: Url,
-    pub mikan_bangumi_id: String,
-    pub items: Vec<MikanRssItem>,
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
-pub struct MikanSubscriberStreamRssChannel {
-    pub mikan_aggregation_id: String,
+pub struct MikanSubscriberRssChannel {
+    pub mikan_subscription_token: String,
     pub url: Url,
     pub items: Vec<MikanRssItem>,
 }
@@ -51,40 +44,35 @@ pub struct MikanSubscriberStreamRssChannel {
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub enum MikanRssChannel {
     Bangumi(MikanBangumiRssChannel),
-    BangumiIndex(MikanBangumiIndexRssChannel),
-    SubscriberStream(MikanSubscriberStreamRssChannel),
+    Subscriber(MikanSubscriberRssChannel),
 }
 
 impl MikanRssChannel {
     pub fn items(&self) -> &[MikanRssItem] {
         match &self {
             Self::Bangumi(MikanBangumiRssChannel { items, .. })
-            | Self::BangumiIndex(MikanBangumiIndexRssChannel { items, .. })
-            | Self::SubscriberStream(MikanSubscriberStreamRssChannel { items, .. }) => items,
+            | Self::Subscriber(MikanSubscriberRssChannel { items, .. }) => items,
         }
     }
 
     pub fn into_items(self) -> Vec<MikanRssItem> {
         match self {
             Self::Bangumi(MikanBangumiRssChannel { items, .. })
-            | Self::BangumiIndex(MikanBangumiIndexRssChannel { items, .. })
-            | Self::SubscriberStream(MikanSubscriberStreamRssChannel { items, .. }) => items,
+            | Self::Subscriber(MikanSubscriberRssChannel { items, .. }) => items,
         }
     }
 
     pub fn name(&self) -> Option<&str> {
         match &self {
-            Self::Bangumi(MikanBangumiRssChannel { name, .. })
-            | Self::BangumiIndex(MikanBangumiIndexRssChannel { name, .. }) => Some(name.as_str()),
-            Self::SubscriberStream(MikanSubscriberStreamRssChannel { .. }) => None,
+            Self::Bangumi(MikanBangumiRssChannel { name, .. }) => Some(name.as_str()),
+            Self::Subscriber(MikanSubscriberRssChannel { .. }) => None,
         }
     }
 
     pub fn url(&self) -> &Url {
         match &self {
             Self::Bangumi(MikanBangumiRssChannel { url, .. })
-            | Self::BangumiIndex(MikanBangumiIndexRssChannel { url, .. })
-            | Self::SubscriberStream(MikanSubscriberStreamRssChannel { url, .. }) => url,
+            | Self::Subscriber(MikanSubscriberRssChannel { url, .. }) => url,
         }
     }
 }
@@ -148,20 +136,58 @@ impl TryFrom<rss::Item> for MikanRssItem {
 
 #[derive(Debug, Clone)]
 pub struct MikanBangumiRssUrlMeta {
     pub mikan_bangumi_id: String,
-    pub mikan_fansub_id: Option<String>,
+    pub mikan_fansub_id: String,
 }
 
-#[derive(Debug, Clone)]
-pub struct MikanSubscriberAggregationRssUrlMeta {
-    pub mikan_aggregation_id: String,
+impl MikanBangumiRssUrlMeta {
+    pub fn from_url(url: &Url) -> Option<Self> {
+        if url.path() == "/RSS/Bangumi" {
+            if let (Some(mikan_fansub_id), Some(mikan_bangumi_id)) = (
+                url.query_pairs()
+                    .find(|(k, _)| k == "subgroupid")
+                    .map(|(_, v)| v.to_string()),
+                url.query_pairs()
+                    .find(|(k, _)| k == "bangumiId")
+                    .map(|(_, v)| v.to_string()),
+            ) {
+                Some(MikanBangumiRssUrlMeta {
+                    mikan_bangumi_id,
+                    mikan_fansub_id,
+                })
+            } else {
+                None
+            }
+        } else {
+            None
+        }
+    }
 }
 
-pub fn build_mikan_bangumi_rss_url(
-    mikan_base_url: impl IntoUrl,
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub struct MikanSubscriberSubscriptionRssUrlMeta {
+    pub mikan_subscription_token: String,
+}
+
+impl MikanSubscriberSubscriptionRssUrlMeta {
+    pub fn from_url(url: &Url) -> Option<Self> {
+        if url.path() == "/RSS/MyBangumi" {
+            url.query_pairs().find(|(k, _)| k == "token").map(|(_, v)| {
+                MikanSubscriberSubscriptionRssUrlMeta {
+                    mikan_subscription_token: v.to_string(),
+                }
+            })
+        } else {
+            None
+        }
+    }
+}
+
+pub fn build_mikan_bangumi_subscription_rss_url(
+    mikan_base_url: Url,
     mikan_bangumi_id: &str,
     mikan_fansub_id: Option<&str>,
-) -> RecorderResult<Url> {
-    let mut url = mikan_base_url.into_url().map_err(FetchError::from)?;
+) -> Url {
+    let mut url = mikan_base_url;
     url.set_path("/RSS/Bangumi");
     url.query_pairs_mut()
         .append_pair("bangumiId", mikan_bangumi_id);
@@ -169,246 +195,16 @@
         url.query_pairs_mut()
             .append_pair("subgroupid", mikan_fansub_id);
     };
-    Ok(url)
+    url
 }
 
-pub fn build_mikan_subscriber_aggregation_rss_url(
-    mikan_base_url: &str,
-    mikan_aggregation_id: &str,
-) -> RecorderResult<Url> {
-    let mut url = Url::parse(mikan_base_url)?;
+pub fn build_mikan_subscriber_subscription_rss_url(
+    mikan_base_url: Url,
+    mikan_subscription_token: &str,
+) -> Url {
+    let mut url = mikan_base_url;
     url.set_path("/RSS/MyBangumi");
     url.query_pairs_mut()
-        .append_pair("token", mikan_aggregation_id);
-    Ok(url)
-}
-
-pub fn extract_mikan_bangumi_id_from_rss_url(url: &Url) -> Option<MikanBangumiRssUrlMeta> {
-    if url.path() == "/RSS/Bangumi" {
-        url.query_pairs()
-            .find(|(k, _)| k == "bangumiId")
-            .map(|(_, v)| MikanBangumiRssUrlMeta {
-                mikan_bangumi_id: v.to_string(),
-                mikan_fansub_id: url
-                    .query_pairs()
-                    .find(|(k, _)| k == "subgroupid")
-                    .map(|(_, v)| v.to_string()),
-            })
-    } else {
-        None
-    }
-}
-
-pub fn extract_mikan_subscriber_aggregation_id_from_rss_link(
-    url: &Url,
-) -> Option<MikanSubscriberAggregationRssUrlMeta> {
-    if url.path() == "/RSS/MyBangumi" {
-        url.query_pairs().find(|(k, _)| k == "token").map(|(_, v)| {
-            MikanSubscriberAggregationRssUrlMeta {
-                mikan_aggregation_id: v.to_string(),
-            }
-        })
-    } else {
-        None
-    }
-}
-
-#[instrument(skip_all, fields(channel_rss_link = channel_rss_link.as_str()))]
-pub async fn extract_mikan_rss_channel_from_rss_link(
-    http_client: &MikanClient,
-    channel_rss_link: impl IntoUrl,
-) -> RecorderResult<MikanRssChannel> {
-    let bytes = fetch_bytes(http_client, channel_rss_link.as_str()).await?;
-
-    let channel = rss::Channel::read_from(&bytes[..])?;
-
-    let channel_link = Url::parse(channel.link())?;
-
-    if let Some(MikanBangumiRssUrlMeta {
-        mikan_bangumi_id,
-        mikan_fansub_id,
-    }) = extract_mikan_bangumi_id_from_rss_url(&channel_link)
-    {
-        tracing::trace!(
-            mikan_bangumi_id,
-            mikan_fansub_id,
-            "MikanBangumiRssLink extracting..."
-        );
-
-        let channel_name = channel.title().replace("Mikan Project - ", "");
-
-        let items = channel
-            .items
-            .into_iter()
-            .enumerate()
-            .flat_map(|(idx, item)| {
-                MikanRssItem::try_from(item).inspect_err(
-                    |error| tracing::warn!(error = %error, "failed to extract rss item idx = {}", idx),
-                )
-            })
-            .collect_vec();
-
-        if let Some(mikan_fansub_id) = mikan_fansub_id {
-            tracing::trace!(
-                channel_name,
-                channel_link = channel_link.as_str(),
-                mikan_bangumi_id,
-                mikan_fansub_id,
-                "MikanBangumiRssChannel extracted"
-            );
-
-            Ok(MikanRssChannel::Bangumi(MikanBangumiRssChannel {
-                name: channel_name,
-                mikan_bangumi_id,
-                mikan_fansub_id,
-                url: channel_link,
-                items,
-            }))
-        } else {
-            tracing::trace!(
-                channel_name,
-                channel_link = channel_link.as_str(),
-                mikan_bangumi_id,
-                "MikanBangumiIndexRssChannel extracted"
-            );
-
-            Ok(MikanRssChannel::BangumiIndex(MikanBangumiIndexRssChannel {
-                name: channel_name,
-                mikan_bangumi_id,
-                url: channel_link,
-                items,
-            }))
-        }
-    } else if let Some(MikanSubscriberAggregationRssUrlMeta {
-        mikan_aggregation_id,
-        ..
-    }) = extract_mikan_subscriber_aggregation_id_from_rss_link(&channel_link)
-    {
-        tracing::trace!(
-            mikan_aggregation_id,
-            "MikanSubscriberAggregationRssLink extracting..."
-        );
-
-        let items = channel
-            .items
-            .into_iter()
-            .enumerate()
-            .flat_map(|(idx, item)| {
-                MikanRssItem::try_from(item).inspect_err(
-                    |error| tracing::warn!(error = %error, "failed to extract rss item idx = {}", idx),
-                )
-            })
-            .collect_vec();
-
-        tracing::trace!(
-            channel_link = channel_link.as_str(),
-            mikan_aggregation_id,
-            "MikanSubscriberAggregationRssChannel extracted"
-        );
-
-        Ok(MikanRssChannel::SubscriberStream(
-            MikanSubscriberStreamRssChannel {
-                mikan_aggregation_id,
-                items,
-                url: channel_link,
-            },
-        ))
-    } else {
-        Err(RecorderError::MikanRssInvalidFormatError).inspect_err(|error| {
-            tracing::warn!(error = %error);
-        })
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use std::assert_matches::assert_matches;
-
-    use downloader::bittorrent::BITTORRENT_MIME_TYPE;
-    use rstest::rstest;
-    use url::Url;
-
-    use crate::{
-        errors::RecorderResult,
-        extract::mikan::{
-            MikanBangumiIndexRssChannel, MikanBangumiRssChannel, MikanRssChannel,
-            extract_mikan_rss_channel_from_rss_link,
-        },
-        test_utils::mikan::build_testing_mikan_client,
-    };
-
-    #[rstest]
-    #[tokio::test]
-    async fn test_parse_mikan_rss_channel_from_rss_link() -> RecorderResult<()> {
-        let mut mikan_server = mockito::Server::new_async().await;
-
-        let mikan_base_url = Url::parse(&mikan_server.url())?;
-
-        let mikan_client = build_testing_mikan_client(mikan_base_url.clone()).await?;
-
-        {
-            let bangumi_rss_url =
-                mikan_base_url.join("/RSS/Bangumi?bangumiId=3141&subgroupid=370")?;
-            let bangumi_rss_mock = mikan_server
-                .mock("GET", bangumi_rss_url.path())
-                .with_body_from_file("tests/resources/mikan/Bangumi-3141-370.rss")
-                .match_query(mockito::Matcher::Any)
-                .create_async()
-                .await;
-
-            let channel = extract_mikan_rss_channel_from_rss_link(&mikan_client, bangumi_rss_url)
-                .await
-                .expect("should get mikan channel from rss url");
-
-            assert_matches!(
-                &channel,
-                MikanRssChannel::Bangumi(MikanBangumiRssChannel { .. })
-            );
-
-            assert_matches!(&channel.name(), Some("葬送的芙莉莲"));
-
-            let items = channel.items();
-            let first_sub_item = items
-                .first()
-                .expect("mikan subscriptions should have at least one subs");
-
-            assert_eq!(first_sub_item.mime, BITTORRENT_MIME_TYPE);
-
-            assert!(
-                &first_sub_item
-                    .homepage
-                    .as_str()
-                    .starts_with("https://mikanani.me/Home/Episode")
-            );
-
-            let name = first_sub_item.title.as_str();
-            assert!(name.contains("葬送的芙莉莲"));
-
-            bangumi_rss_mock.expect(1);
-        }
-        {
-            let bangumi_rss_url = mikan_base_url.join("/RSS/Bangumi?bangumiId=3416")?;
-
-            let bangumi_rss_mock = mikan_server
-                .mock("GET", bangumi_rss_url.path())
-                .match_query(mockito::Matcher::Any)
-                .with_body_from_file("tests/resources/mikan/Bangumi-3416.rss")
-                .create_async()
-                .await;
-
-            let channel = extract_mikan_rss_channel_from_rss_link(&mikan_client, bangumi_rss_url)
-                .await
-                .expect("should get mikan channel from rss url");
-
-            assert_matches!(
-                &channel,
-                MikanRssChannel::BangumiIndex(MikanBangumiIndexRssChannel { .. })
-            );
-
-            assert_matches!(&channel.name(), Some("叹气的亡灵想隐退"));
-
-            bangumi_rss_mock.expect(1);
-        }
-        Ok(())
-    }
-}
+        .append_pair("token", mikan_subscription_token);
+    url
+}
diff --git a/apps/recorder/src/extract/mikan/subscription.rs b/apps/recorder/src/extract/mikan/subscription.rs
new file mode 100644
index 0000000..d3ed74e
--- /dev/null
+++ b/apps/recorder/src/extract/mikan/subscription.rs
@@ -0,0 +1,483 @@
+use std::{
+    collections::{HashMap, HashSet},
+    sync::Arc,
+};
+
+use async_graphql::{InputObject, SimpleObject};
+use async_stream::try_stream;
+use fetch::{fetch_bytes, fetch_html};
+use futures::Stream;
+use maplit::hashmap;
+use scraper::Html;
+use sea_orm::{ColumnTrait, EntityTrait, IntoSimpleExpr, QueryFilter, QuerySelect, prelude::Expr};
+use serde::{Deserialize, Serialize};
+use snafu::OptionExt;
+use url::Url;
+
+use crate::{
+    app::AppContextTrait,
+    errors::{RecorderError, RecorderResult},
+    extract::mikan::{
+        MikanBangumiHomepageUrlMeta, MikanBangumiMeta, MikanBangumiRssUrlMeta, MikanEpisodeMeta,
+        MikanRssItem, MikanSeasonFlowUrlMeta, MikanSeasonStr,
+        MikanSubscriberSubscriptionRssUrlMeta, build_mikan_bangumi_expand_subscribed_url,
+        build_mikan_bangumi_subscription_rss_url, build_mikan_season_flow_url,
+        build_mikan_subscriber_subscription_rss_url,
+        extract_mikan_bangumi_index_meta_list_from_season_flow_fragment,
+        extract_mikan_bangumi_meta_from_expand_subscribed_fragment,
+        scrape_mikan_episode_meta_from_episode_homepage_url,
+    },
+    models::{bangumi, episodes, subscriptions},
+};
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, InputObject, SimpleObject)]
+pub struct MikanSubscriberSubscription {
+    pub id: i32,
+    pub mikan_subscription_token: String,
+    pub subscriber_id: i32,
+}
+
+impl MikanSubscriberSubscription {
+    #[tracing::instrument(skip(ctx))]
+    pub async fn pull_subscription(
+        &self,
+        ctx: Arc<dyn AppContextTrait>,
+    ) -> RecorderResult<Vec<MikanEpisodeMeta>> {
+        let mikan_client = ctx.mikan();
+
+        let new_episode_meta_list: Vec<MikanEpisodeMeta> = {
+            let rss_item_list = self.pull_rss_items(ctx.clone()).await?;
+
+            // episodes already stored for this subscriber are skipped below
+            let existed_item_set = episodes::Entity::find()
+                .select_only()
+                .column(episodes::Column::MikanEpisodeId)
+                .filter(
+                    episodes::Column::SubscriberId.eq(self.subscriber_id).and(
+                        episodes::Column::MikanEpisodeId
+                            .is_in(rss_item_list.iter().map(|s| s.mikan_episode_id.clone())),
+                    ),
+                )
+                .into_tuple::<(String,)>()
+                .all(ctx.db())
+                .await?
+                .into_iter()
+                .map(|(value,)| value)
+                .collect::<HashSet<String>>();
+
+            let mut result = vec![];
+            for rss_item in rss_item_list
+                .into_iter()
+                .filter(|rss_item| !existed_item_set.contains(&rss_item.mikan_episode_id))
+            {
+                let episode_meta = scrape_mikan_episode_meta_from_episode_homepage_url(
+                    mikan_client,
+                    rss_item.homepage,
+                )
+                .await?;
+                result.push(episode_meta);
+            }
+
+            result
+        };
+
+        {
+            // group the new episodes by their bangumi hash so each bangumi is
+            // looked up or created only once
+            let mut new_bangumi_hash_map = new_episode_meta_list
+                .iter()
+                .map(|episode_meta| {
+                    (
+                        MikanBangumiHomepageUrlMeta {
+                            mikan_bangumi_id: episode_meta.mikan_bangumi_id.clone(),
+                            mikan_fansub_id: episode_meta.mikan_fansub_id.clone(),
+                        },
+                        episode_meta,
+                    )
+                })
+                .collect::<HashMap<_, _>>();
+
+            let mut existed_bangumi_map: HashMap<MikanBangumiHomepageUrlMeta, bangumi::Model> =
+                hashmap! {};
+
+            for bangumi_model in bangumi::Entity::find()
+                .filter({
+                    Expr::tuple([
+                        bangumi::Column::MikanBangumiId.into_simple_expr(),
+                        bangumi::Column::MikanFansubId.into_simple_expr(),
+                        bangumi::Column::SubscriberId.into_simple_expr(),
+                    ])
+                    .in_tuples(new_bangumi_hash_map.iter().map(|(bangumi_meta, _)| {
+                        (
+                            bangumi_meta.mikan_bangumi_id.clone(),
+                            bangumi_meta.mikan_fansub_id.clone(),
+                            self.subscriber_id,
+                        )
+                    }))
+                })
+                .all(ctx.db())
+                .await?
+            {
+                let bangumi_hash = MikanBangumiHomepageUrlMeta {
+                    mikan_bangumi_id: bangumi_model.mikan_bangumi_id.clone().unwrap(),
+                    mikan_fansub_id: bangumi_model.mikan_fansub_id.clone().unwrap(),
+                };
+                new_bangumi_hash_map.remove(&bangumi_hash);
+                existed_bangumi_map.insert(bangumi_hash, bangumi_model);
+            }
+
+            // bangumi that were not found above are created from the scraped episode meta
+            let mut new_bangumi_active_models = vec![];
+            for (_, episode_meta) in new_bangumi_hash_map {
+                let bangumi_meta: MikanBangumiMeta = episode_meta.clone().into();
+
+                let bangumi_active_model = bangumi::ActiveModel::from_mikan_bangumi_meta(
+                    ctx.clone(),
+                    bangumi_meta,
+                    self.subscriber_id,
+                )?;
+
+                new_bangumi_active_models.push(bangumi_active_model);
+            }
+
+            if !new_bangumi_active_models.is_empty() {
+                bangumi::Entity::insert_many(new_bangumi_active_models)
+                    .exec(ctx.db())
+                    .await?;
+            }
+        }
+
+        // episode insertion is not implemented yet
+        todo!()
+    }
+
+    #[tracing::instrument(skip(ctx))]
+    pub async fn pull_rss_items(
+        &self,
+        ctx: Arc<dyn AppContextTrait>,
+    ) -> RecorderResult<Vec<MikanRssItem>> {
+        let mikan_base_url = ctx.mikan().base_url().clone();
+        let rss_url = build_mikan_subscriber_subscription_rss_url(
+            mikan_base_url.clone(),
+            &self.mikan_subscription_token,
+        );
+        let bytes = fetch_bytes(ctx.mikan(), rss_url).await?;
+
+        let channel = rss::Channel::read_from(&bytes[..])?;
+
+        let mut result = vec![];
+        for (idx, item) in channel.items.into_iter().enumerate() {
+            let item = MikanRssItem::try_from(item).inspect_err(
+                |error| tracing::warn!(error = %error, "failed to extract rss item idx = {}", idx),
+            )?;
+            result.push(item);
+        }
+        Ok(result)
+    }
+
+    pub fn try_from_model(model: &subscriptions::Model) -> RecorderResult<Self> {
+        let source_url = Url::parse(&model.source_url)?;
+
+        let meta = MikanSubscriberSubscriptionRssUrlMeta::from_url(&source_url)
+            .with_whatever_context::<_, String, RecorderError>(|| {
+                format!(
+                    "MikanSubscriberSubscription should extract mikan_subscription_token from \
+                     source_url = {}, subscription_id = {}",
+                    source_url, model.id
+                )
+            })?;
+
+        Ok(Self {
+            id: model.id,
+            mikan_subscription_token: meta.mikan_subscription_token,
+            subscriber_id: model.subscriber_id,
+        })
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, InputObject, SimpleObject)]
+pub struct MikanSeasonSubscription {
+    pub id: i32,
+    pub year: i32,
+    pub season_str: MikanSeasonStr,
+    pub credential_id: i32,
+    pub subscriber_id: i32,
+}
+
+impl MikanSeasonSubscription {
+    #[tracing::instrument(skip(ctx))]
+    pub fn pull_bangumi_meta_stream(
+        &self,
+        ctx: Arc<dyn AppContextTrait>,
+    ) -> impl Stream<Item = RecorderResult<MikanBangumiMeta>> {
+        let credential_id = self.credential_id;
+        let year = self.year;
+        let season_str = self.season_str.clone();
+
+        try_stream! {
+            let mikan_base_url = ctx.mikan().base_url().clone();
+
+            let mikan_client = ctx.mikan()
+                .fork_with_credential(ctx.clone(), credential_id)
+                .await?;
+
+            let mikan_season_flow_url = build_mikan_season_flow_url(mikan_base_url.clone(), year, season_str);
+
+            let content = fetch_html(&mikan_client, mikan_season_flow_url.clone()).await?;
+
+            let mut bangumi_indices_meta = {
+                let html = Html::parse_document(&content);
+                extract_mikan_bangumi_index_meta_list_from_season_flow_fragment(&html, &mikan_base_url)
+            };
+
+            // an empty list while logged out is treated as a stale session: log in and retry once
+            if bangumi_indices_meta.is_empty() && !mikan_client.has_login().await? {
+                mikan_client.login().await?;
+                let content = fetch_html(&mikan_client, mikan_season_flow_url).await?;
+                let html = Html::parse_document(&content);
+                bangumi_indices_meta =
+                    extract_mikan_bangumi_index_meta_list_from_season_flow_fragment(&html, &mikan_base_url);
+            }
+
+            mikan_client
+                .sync_credential_cookies(ctx.clone(), credential_id)
+                .await?;
+
+            for bangumi_index in bangumi_indices_meta {
+                let bangumi_title = bangumi_index.bangumi_title.clone();
+                let bangumi_expand_subscribed_fragment_url = build_mikan_bangumi_expand_subscribed_url(
+                    mikan_base_url.clone(),
+                    &bangumi_index.mikan_bangumi_id,
+                );
+                let bangumi_expand_subscribed_fragment =
+                    fetch_html(&mikan_client, bangumi_expand_subscribed_fragment_url).await?;
+
+                let bangumi_meta = {
+                    let html = Html::parse_document(&bangumi_expand_subscribed_fragment);
+
+                    extract_mikan_bangumi_meta_from_expand_subscribed_fragment(
+                        &html,
+                        bangumi_index,
+                        mikan_base_url.clone(),
+                    )
+                    .with_whatever_context::<_, String, RecorderError>(|| {
+                        format!("failed to extract mikan bangumi fansub of title = {bangumi_title}")
+                    })
+                }?;
+
+                yield bangumi_meta;
+            }
+
+            mikan_client
+                .sync_credential_cookies(ctx, credential_id)
+                .await?;
+        }
+    }
+
+    pub fn try_from_model(model: &subscriptions::Model) -> RecorderResult<Self> {
+        let source_url = Url::parse(&model.source_url)?;
+
+        let source_url_meta = MikanSeasonFlowUrlMeta::from_url(&source_url)
+            .with_whatever_context::<_, String, RecorderError>(|| {
+                format!(
+                    "MikanSeasonSubscription should extract season_str and year from source_url, \
+                     source_url = {}, subscription_id = {}",
+                    source_url, model.id
+                )
+            })?;
+
+        let credential_id = model
+            .credential_id
+            .with_whatever_context::<_, String, RecorderError>(|| {
+                format!(
+                    "MikanSeasonSubscription credential_id is required, subscription_id = {}",
+                    model.id
+                )
+            })?;
+
+        Ok(Self {
+            id: model.id,
+            year: source_url_meta.year,
+            season_str: source_url_meta.season_str,
+            credential_id,
+            subscriber_id: model.subscriber_id,
+        })
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, InputObject, SimpleObject)]
+pub struct MikanBangumiSubscription {
+    pub id: i32,
+    pub mikan_bangumi_id: String,
+    pub mikan_fansub_id: String,
+    pub subscriber_id: i32,
+}
+
+impl MikanBangumiSubscription {
+    #[tracing::instrument(skip(ctx))]
+    pub fn pull_rss_items(
+        &self,
+        ctx: Arc<dyn AppContextTrait>,
+    ) -> impl Stream<Item = RecorderResult<MikanRssItem>> {
+        let mikan_bangumi_id = self.mikan_bangumi_id.clone();
+        let mikan_fansub_id = self.mikan_fansub_id.clone();
+
+        try_stream! {
+            let mikan_base_url = ctx.mikan().base_url().clone();
+            let rss_url = build_mikan_bangumi_subscription_rss_url(mikan_base_url.clone(), &mikan_bangumi_id, Some(&mikan_fansub_id));
+            let bytes = fetch_bytes(ctx.mikan(), rss_url).await?;
+
+            let channel = rss::Channel::read_from(&bytes[..])?;
+
+            for (idx, item) in channel.items.into_iter().enumerate() {
+                let item = MikanRssItem::try_from(item).inspect_err(
+                    |error| tracing::warn!(error = %error, "failed to extract rss item idx = {}", idx),
+                )?;
+                yield item
+            }
+        }
+    }
+
+    pub fn try_from_model(model: &subscriptions::Model) -> RecorderResult<Self> {
+        let source_url = Url::parse(&model.source_url)?;
+
+        let meta = MikanBangumiRssUrlMeta::from_url(&source_url)
+            .with_whatever_context::<_, String, RecorderError>(|| {
+                format!(
+                    "MikanBangumiSubscription needs to extract bangumi id and fansub id from \
+                     source_url = {}, subscription_id = {}",
+                    source_url, model.id
+                )
+            })?;
+
+        Ok(Self {
+            id: model.id,
+            mikan_bangumi_id: meta.mikan_bangumi_id,
+            mikan_fansub_id: meta.mikan_fansub_id,
+            subscriber_id: model.subscriber_id,
+        })
+    }
+}
+
+// the old channel-scrape tests relied on extract_mikan_rss_channel_from_rss_link,
+// which this change removes; these unit tests cover the url metas that replaced it
+#[cfg(test)]
+mod tests {
+    use url::Url;
+
+    use crate::{
+        errors::RecorderResult,
+        extract::mikan::{MikanBangumiRssUrlMeta, MikanSubscriberSubscriptionRssUrlMeta},
+    };
+
+    #[test]
+    fn test_mikan_bangumi_rss_url_meta_from_url() -> RecorderResult<()> {
+        let url = Url::parse("https://mikanani.me/RSS/Bangumi?bangumiId=3141&subgroupid=370")?;
+
+        let meta = MikanBangumiRssUrlMeta::from_url(&url)
+            .expect("should extract bangumi id and fansub id from rss url");
+
+        assert_eq!(meta.mikan_bangumi_id, "3141");
+        assert_eq!(meta.mikan_fansub_id, "370");
+
+        // both query params are required now, so an index-style url no longer matches
+        let index_url = Url::parse("https://mikanani.me/RSS/Bangumi?bangumiId=3416")?;
+        assert!(MikanBangumiRssUrlMeta::from_url(&index_url).is_none());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_mikan_subscriber_subscription_rss_url_meta_from_url() -> RecorderResult<()> {
+        let url = Url::parse("https://mikanani.me/RSS/MyBangumi?token=test-token")?;
+
+        let meta = MikanSubscriberSubscriptionRssUrlMeta::from_url(&url)
+            .expect("should extract subscription token from rss url");
+
+        assert_eq!(meta.mikan_subscription_token, "test-token");
+
+        Ok(())
+    }
+}
diff --git a/apps/recorder/src/extract/mikan/web.rs b/apps/recorder/src/extract/mikan/web.rs
index 6934270..e626470 100644
--- a/apps/recorder/src/extract/mikan/web.rs
+++ b/apps/recorder/src/extract/mikan/web.rs
@@ -1,4 +1,4 @@
-use std::{borrow::Cow, fmt, sync::Arc};
+use std::{borrow::Cow, fmt, str::FromStr, sync::Arc};
 
 use async_stream::try_stream;
 use bytes::Bytes;
@@ -7,14 +7,13 @@ use futures::{Stream, TryStreamExt, pin_mut};
 use html_escape::decode_html_entities;
 use scraper::{Html, Selector};
 use serde::{Deserialize, Serialize};
-use snafu::OptionExt;
+use snafu::FromString;
 use tracing::instrument;
 use url::Url;
 
 use super::{
     MIKAN_BANGUMI_EXPAND_SUBSCRIBED_PAGE_PATH, MIKAN_POSTER_BUCKET_KEY,
     MIKAN_SEASON_FLOW_PAGE_PATH, MikanBangumiRssUrlMeta, MikanClient,
-    extract_mikan_bangumi_id_from_rss_url,
 };
 use crate::{
     app::AppContextTrait,
@@ -77,6 +76,19 @@ impl MikanBangumiMeta {
     }
 }
 
+impl From<MikanEpisodeMeta> for MikanBangumiMeta {
+    fn from(episode_meta: MikanEpisodeMeta) -> Self {
+        Self {
+            homepage: episode_meta.homepage,
+            origin_poster_src: episode_meta.origin_poster_src,
+            bangumi_title: episode_meta.bangumi_title,
+            mikan_bangumi_id: episode_meta.mikan_bangumi_id,
+            mikan_fansub_id: episode_meta.mikan_fansub_id,
+            fansub: episode_meta.fansub,
+        }
+    }
+}
+
 impl MikanBangumiMeta {
     pub fn from_bangumi_index_and_fansub_meta(
         bangumi_index_meta: MikanBangumiIndexMeta,
@@ -128,7 +140,7 @@ impl MikanBangumiIndexHomepageUrlMeta {
     }
 }
 
-#[derive(Clone, Debug, PartialEq)]
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
 pub struct MikanBangumiHomepageUrlMeta {
     pub mikan_bangumi_id: String,
     pub mikan_fansub_id: String,
@@ -194,12 +206,49 @@ impl fmt::Display for MikanSeasonStr {
     }
 }
 
+impl FromStr for MikanSeasonStr {
+    type Err = RecorderError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s {
+            "春" => Ok(MikanSeasonStr::Spring),
+            "夏" => Ok(MikanSeasonStr::Summer),
+            "秋" => Ok(MikanSeasonStr::Autumn),
+            "冬" => Ok(MikanSeasonStr::Winter),
+            _ => Err(RecorderError::without_source(format!(
+                "MikanSeasonStr must be one of '春', '夏', '秋', '冬', but got '{}'",
+                s
+            ))),
+        }
+    }
+}
+
 #[derive(Clone, Debug, PartialEq)]
 pub struct MikanSeasonFlowUrlMeta {
     pub year: i32,
     pub season_str: MikanSeasonStr,
 }
 
+impl MikanSeasonFlowUrlMeta {
+    pub fn from_url(url: &Url) -> Option<Self> {
+        if url.path().starts_with(MIKAN_SEASON_FLOW_PAGE_PATH) {
+            if let (Some(year), Some(season_str)) = (
+                url.query_pairs()
+                    .find(|(key, _)| key == "year")
+                    .and_then(|(_, value)| value.parse::<i32>().ok()),
+                url.query_pairs()
+                    .find(|(key, _)| key == "seasonStr")
+                    .and_then(|(_, value)| MikanSeasonStr::from_str(&value).ok()),
+            ) {
+                Some(Self { year, season_str })
+            } else {
+                None
+            }
+        } else {
+            None
+        }
+    }
+}
+
 pub fn build_mikan_bangumi_homepage_url(
     mikan_base_url: Url,
     mikan_bangumi_id: &str,
@@ -271,15 +320,11 @@ pub fn extract_mikan_episode_meta_from_episode_homepage_html(
         .next()
         .and_then(|el| el.value().attr("href"))
         .and_then(|s| mikan_episode_homepage_url.join(s).ok())
-        .and_then(|rss_link_url| extract_mikan_bangumi_id_from_rss_url(&rss_link_url))
+        .and_then(|rss_link_url| MikanBangumiRssUrlMeta::from_url(&rss_link_url))
         .ok_or_else(|| {
             RecorderError::from_mikan_meta_missing_field(Cow::Borrowed("mikan_bangumi_id"))
         })?;
 
-    let mikan_fansub_id = mikan_fansub_id.ok_or_else(|| {
-        RecorderError::from_mikan_meta_missing_field(Cow::Borrowed("mikan_fansub_id"))
-    })?;
-
     let episode_title = html
         .select(&Selector::parse("title").unwrap())
         .next()
@@ -379,7 +424,7 @@ pub fn extract_mikan_bangumi_index_meta_from_bangumi_homepage_html(
         .next()
         .and_then(|el| el.value().attr("href"))
         .and_then(|s| mikan_bangumi_homepage_url.join(s).ok())
-        .and_then(|rss_link_url| extract_mikan_bangumi_id_from_rss_url(&rss_link_url))
+        .and_then(|rss_link_url| MikanBangumiRssUrlMeta::from_url(&rss_link_url))
         .map(
             |MikanBangumiRssUrlMeta {
                  mikan_bangumi_id, ..
@@ -677,87 +722,6 @@ pub fn extract_mikan_bangumi_meta_from_expand_subscribed_fragment(
     }
 }
 
-pub fn scrape_mikan_bangumi_meta_stream_from_season_flow_url(
-    ctx: Arc<dyn AppContextTrait>,
-    mikan_season_flow_url: Url,
-    credential_id: i32,
-) -> impl Stream<Item = RecorderResult<MikanBangumiMeta>> {
-    try_stream! {
-        let mikan_client = ctx.mikan()
-            .fork_with_credential(ctx.clone(), credential_id)
-            .await?;
-
-        let mikan_base_url = mikan_client.base_url();
-        let content = fetch_html(&mikan_client, mikan_season_flow_url.clone()).await?;
-        let mut bangumi_indices_meta = {
-            let html = Html::parse_document(&content);
-            extract_mikan_bangumi_index_meta_list_from_season_flow_fragment(&html, mikan_base_url)
-        };
-
-        if bangumi_indices_meta.is_empty() && !mikan_client.has_login().await? {
-            mikan_client.login().await?;
-            let content = fetch_html(&mikan_client, mikan_season_flow_url).await?;
-            let html = Html::parse_document(&content);
-            bangumi_indices_meta =
-                extract_mikan_bangumi_index_meta_list_from_season_flow_fragment(&html, mikan_base_url);
-        }
-
-
-        mikan_client
-            .sync_credential_cookies(ctx.clone(), credential_id)
-            .await?;
-
-        for bangumi_index in bangumi_indices_meta {
-            let bangumi_title = bangumi_index.bangumi_title.clone();
-            let bangumi_expand_subscribed_fragment_url = build_mikan_bangumi_expand_subscribed_url(
-                mikan_base_url.clone(),
-                &bangumi_index.mikan_bangumi_id,
-            );
-            let bangumi_expand_subscribed_fragment =
-                fetch_html(&mikan_client, bangumi_expand_subscribed_fragment_url).await?;
-
-            let bangumi_meta = {
-                let html = Html::parse_document(&bangumi_expand_subscribed_fragment);
-
-                extract_mikan_bangumi_meta_from_expand_subscribed_fragment(
-                    &html,
-                    bangumi_index,
-                    mikan_base_url.clone(),
-                )
-                .with_whatever_context::<_, String, RecorderError>(|| {
-                    format!("failed to extract mikan bangumi fansub of title = {bangumi_title}")
-                })
-            }?;
-
-            yield bangumi_meta;
-        }
-
-        mikan_client
-            .sync_credential_cookies(ctx, credential_id)
-            .await?;
-    }
-}
-
-#[instrument(err, skip_all, fields(mikan_season_flow_url = mikan_season_flow_url.as_str(), credential_id = credential_id))]
-pub async fn scrape_mikan_bangumi_meta_list_from_season_flow_url(
-    _mikan_client: &MikanClient,
-    ctx: Arc<dyn AppContextTrait>,
-    mikan_season_flow_url: Url,
-    credential_id: i32,
-) -> RecorderResult<Vec<MikanBangumiMeta>> {
-    let stream = scrape_mikan_bangumi_meta_stream_from_season_flow_url(
-        ctx,
-        mikan_season_flow_url,
-        credential_id,
-    );
-
-    pin_mut!(stream);
-
-    let bangumi_metas = stream.try_collect().await?;
-
-    Ok(bangumi_metas)
-}
-
 #[cfg(test)]
 mod test {
     #![allow(unused_variables)]
diff --git a/apps/recorder/src/graphql/mikan/mikan_scrape_season_subscription.rs b/apps/recorder/src/graphql/mikan/mikan_scrape_season_subscription.rs
deleted file mode 100644
index 90058ff..0000000
--- a/apps/recorder/src/graphql/mikan/mikan_scrape_season_subscription.rs
+++ /dev/null
@@ -1,5 +0,0 @@
-use async_graphql::{Context, dynamic::SchemaBuilder};
-
-pub fn register_mikan_scrape_season_subscription_mutation(builder: SchemaBuilder) {
-
-}
diff --git a/apps/recorder/src/graphql/mikan/mod.rs b/apps/recorder/src/graphql/mikan/mod.rs
index cdc8462..8c05bcc 100644
--- a/apps/recorder/src/graphql/mikan/mod.rs
+++ b/apps/recorder/src/graphql/mikan/mod.rs
@@ -1 +1,64 @@
-mod mikan_scrape_season_subscription;
+mod scrape_season_subscription;
+
+use std::sync::Arc;
+
+use async_graphql::{Context, Object, Result as GraphQLResult};
+use snafu::FromString;
+
+use crate::{
+    app::AppContextTrait,
+    auth::AuthUserInfo,
+    errors::RecorderError,
+    extract::mikan::MikanSeasonSubscription,
+    graphql::mikan::scrape_season_subscription::{
+        MikanScrapeSeasonSubscriptionInput, MikanScrapeSeasonSubscriptionOutput,
+    },
+    models::{
+        subscriber_tasks,
+        subscriptions::{self, SubscriptionCategory},
+    },
+    task::{SubscriberTaskPayload, mikan::MikanScrapeSeasonSubscriptionTask},
+};
+
+struct MikanQuery;
+
+struct MikanMutation;
+
+#[Object]
+impl MikanMutation {
+    async fn mikan_scrape_season_subscription(
+        &self,
+        ctx: &Context<'_>,
+        input: MikanScrapeSeasonSubscriptionInput,
+    ) -> GraphQLResult<MikanScrapeSeasonSubscriptionOutput> {
+        let auth_user = ctx.data::<AuthUserInfo>()?;
+        let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
+
+        let subscription =
+            subscriptions::Model::find_by_id(app_ctx.as_ref(), input.subscription_id)
+                .await?
+                .ok_or_else(|| RecorderError::DbError {
+                    source: sea_orm::DbErr::RecordNotFound(String::from("subscription not found")),
+                })?;
+
+        if subscription.category != SubscriptionCategory::MikanSeason {
+            Err(RecorderError::without_source(
+                "subscription must be a mikan season subscription".to_string(),
+            ))?;
+        }
+
+        // derive the task payload (year, season, credential) from the stored subscription
+        let season_subscription = MikanSeasonSubscription::try_from_model(&subscription)?;
+
+        let task = subscriber_tasks::Model::add_subscriber_task(
+            app_ctx.clone(),
+            auth_user.subscriber_auth.subscriber_id,
+            SubscriberTaskPayload::MikanScrapeSeasonSubscription(
+                MikanScrapeSeasonSubscriptionTask {
+                    year: season_subscription.year,
+                    season_str: season_subscription.season_str,
+                    credential_id: season_subscription.credential_id,
+                    subscriber_id: season_subscription.subscriber_id,
+                },
+            ),
+        )
+        .await?;
+
+        Ok(MikanScrapeSeasonSubscriptionOutput { task_id: task.id })
+    }
+}
+
+struct MikanSubscription;
diff --git a/apps/recorder/src/graphql/mikan/scrape_season_subscription.rs b/apps/recorder/src/graphql/mikan/scrape_season_subscription.rs
new file mode 100644
index 0000000..4503ab8
--- /dev/null
+++ b/apps/recorder/src/graphql/mikan/scrape_season_subscription.rs
@@ -0,0 +1,12 @@
+use async_graphql::{InputObject, SimpleObject};
+use serde::{Deserialize, Serialize};
+
+#[derive(InputObject, Serialize, Deserialize)]
+pub struct MikanScrapeSeasonSubscriptionInput {
+    pub subscription_id: i32,
+}
+
+#[derive(SimpleObject, Serialize, Deserialize)]
+pub struct MikanScrapeSeasonSubscriptionOutput {
+    pub task_id: i32,
+}
diff --git a/apps/recorder/src/migrations/m20220101_000001_init.rs b/apps/recorder/src/migrations/m20220101_000001_init.rs
index d4e087b..bf13f15 100644
--- a/apps/recorder/src/migrations/m20220101_000001_init.rs
+++ b/apps/recorder/src/migrations/m20220101_000001_init.rs
@@ -50,7 +50,9 @@ impl MigrationTrait for Migration {
         create_postgres_enum_for_active_enum!(
             manager,
             subscriptions::SubscriptionCategoryEnum,
-            subscriptions::SubscriptionCategory::Mikan,
+            subscriptions::SubscriptionCategory::MikanSubscriber,
+            subscriptions::SubscriptionCategory::MikanBangumi,
+            subscriptions::SubscriptionCategory::MikanSeason,
             subscriptions::SubscriptionCategory::Manual
         )
         .await?;
diff --git a/apps/recorder/src/models/bangumi.rs b/apps/recorder/src/models/bangumi.rs
index 7fdee2c..b5ba725 100644
--- a/apps/recorder/src/models/bangumi.rs
+++ b/apps/recorder/src/models/bangumi.rs
@@ -1,10 +1,19 @@
+use std::sync::Arc;
+
 use async_graphql::SimpleObject;
 use async_trait::async_trait;
 use sea_orm::{ActiveValue, FromJsonQueryResult, entity::prelude::*, sea_query::OnConflict};
 use serde::{Deserialize, Serialize};
 
 use super::subscription_bangumi;
-use crate::{app::AppContextTrait, errors::RecorderResult};
+use crate::{
+    app::AppContextTrait,
+    errors::RecorderResult,
+    extract::{
+        mikan::{MikanBangumiMeta, build_mikan_bangumi_subscription_rss_url},
+        rawname::parse_episode_meta_from_raw_name,
+    },
+};
 
 #[derive(
     Clone, Debug, PartialEq, Eq, Serialize, Deserialize, FromJsonQueryResult, SimpleObject,
@@ -174,5 +183,38 @@ impl Model {
     }
 }
 
+impl ActiveModel {
+    pub fn from_mikan_bangumi_meta(
+        ctx: Arc<dyn AppContextTrait>,
+        meta: MikanBangumiMeta,
+        subscriber_id: i32,
+    ) -> RecorderResult<Self> {
+        let mikan_base_url = ctx.mikan().base_url();
+
+        let raw_meta = parse_episode_meta_from_raw_name(&meta.bangumi_title)?;
+
+        let rss_url = build_mikan_bangumi_subscription_rss_url(
+            mikan_base_url.clone(),
+            &meta.mikan_bangumi_id,
+            Some(&meta.mikan_fansub_id),
+        );
+
+        Ok(Self {
+            mikan_bangumi_id: ActiveValue::Set(Some(meta.mikan_bangumi_id)),
+            mikan_fansub_id: ActiveValue::Set(Some(meta.mikan_fansub_id)),
+            subscriber_id: ActiveValue::Set(subscriber_id),
+            display_name: ActiveValue::Set(meta.bangumi_title.clone()),
+            raw_name: ActiveValue::Set(meta.bangumi_title),
+            season: ActiveValue::Set(raw_meta.season),
+            season_raw: ActiveValue::Set(raw_meta.season_raw),
+            fansub: ActiveValue::Set(Some(meta.fansub)),
+            poster_link: ActiveValue::Set(meta.origin_poster_src.map(|url| url.to_string())),
+            homepage: ActiveValue::Set(Some(meta.homepage.to_string())),
+            rss_link: ActiveValue::Set(Some(rss_url.to_string())),
+            ..Default::default()
+        })
+    }
+}
+
 #[async_trait]
 impl ActiveModelBehavior for ActiveModel {}
diff --git a/apps/recorder/src/models/query/mod.rs b/apps/recorder/src/models/query/mod.rs
index 5ffbcc8..e6d87a3 100644
--- a/apps/recorder/src/models/query/mod.rs
+++ b/apps/recorder/src/models/query/mod.rs
@@ -1,9 +1,9 @@
 use async_trait::async_trait;
 use sea_orm::{
-    prelude::Expr,
-    sea_query::{Alias, IntoColumnRef, IntoTableRef, Query, SelectStatement},
     ActiveModelTrait, ColumnTrait, ConnectionTrait, DbErr, EntityTrait, Insert, IntoActiveModel,
     Iterable, QueryResult, QueryTrait, SelectModel, SelectorRaw, Value,
+    prelude::Expr,
+    sea_query::{Alias, IntoColumnRef, IntoTableRef, Query, SelectStatement},
 };
 
 pub fn filter_values_in<
@@ -17,12 +17,9 @@
     values: I,
 ) -> SelectStatement {
     Query::select()
-        .expr(Expr::col((Alias::new("t"), Alias::new("column1"))))
-        .from_values(values, Alias::new("t"))
-        .left_join(
-            tbl_ref,
-            Expr::col((Alias::new("t"), Alias::new("column1"))).equals(col_ref),
-        )
+        .expr(Expr::col(("t", "column1")))
+        .from_values(values, "t")
+        .left_join(tbl_ref, Expr::col(("t", "column1")).equals(col_ref))
         .and_where(Expr::col(col_ref).is_not_null())
         .to_owned()
 }
diff --git a/apps/recorder/src/models/subscriber_tasks.rs b/apps/recorder/src/models/subscriber_tasks.rs
index 793debc..abbd7be 100644
--- a/apps/recorder/src/models/subscriber_tasks.rs
+++ b/apps/recorder/src/models/subscriber_tasks.rs
@@ -4,7 +4,11 @@ use sea_orm::{ActiveValue, FromJsonQueryResult, JsonValue, TryIntoModel, prelude::*};
 use serde::{Deserialize, Serialize};
 
 pub use crate::task::{SubscriberTaskType, SubscriberTaskTypeEnum};
-use crate::{app::AppContextTrait, errors::RecorderResult, task::SubscriberTask};
+use crate::{
+    app::AppContextTrait,
+    errors::RecorderResult,
+    task::{SubscriberTask, SubscriberTaskPayload},
+};
 
 #[derive(Debug, Clone, Serialize, Deserialize, FromJsonQueryResult, PartialEq, Eq)]
 pub struct SubscriberTaskErrorSnapshot {
@@ -125,9 +129,11 @@ impl Model {
     pub async fn add_subscriber_task(
         ctx: Arc<dyn AppContextTrait>,
         subscriber_id: i32,
-        task_type: SubscriberTaskType,
-        request: JsonValue,
-    ) -> RecorderResult<Model> {
+        payload: SubscriberTaskPayload,
+    ) -> RecorderResult<SubscriberTask> {
+        let task_type = payload.task_type();
+        let request: JsonValue = payload.clone().try_into()?;
+
         let am = ActiveModel {
             subscriber_id: ActiveValue::Set(subscriber_id),
             task_type: ActiveValue::Set(task_type.clone()),
@@ -137,17 +143,18 @@ impl Model {
 
         let db = ctx.db();
 
-        let model = am.insert(db).await?.try_into_model()?;
+        let task_id = Entity::insert(am).exec(db).await?.last_insert_id;
 
-        let task_value: SubscriberTask = serde_json::from_value(serde_json::json!({
-            "id": model.id,
-            "subscriber_id": model.subscriber_id.clone(),
-            "task_type": model.task_type.clone(),
-            "request": model.request.clone(),
-        }))?;
+        let subscriber_task = SubscriberTask {
+            id: task_id,
+            subscriber_id,
+            payload,
+        };
 
-        ctx.task().add_subscriber_task(task_value).await?;
+        ctx.task()
+            .add_subscriber_task(subscriber_task.clone())
+            .await?;
 
-        Ok(model)
+        Ok(subscriber_task)
     }
 }
diff --git a/apps/recorder/src/models/subscriptions.rs b/apps/recorder/src/models/subscriptions.rs
index 3c5179f..419405b 100644
--- a/apps/recorder/src/models/subscriptions.rs
+++ b/apps/recorder/src/models/subscriptions.rs
@@ -8,11 +8,12 @@ use serde::{Deserialize, Serialize};
 use super::{bangumi, episodes, query::filter_values_in};
 use crate::{
     app::AppContextTrait,
-    errors::RecorderResult,
+    errors::{RecorderError, RecorderResult},
     extract::{
         mikan::{
-            MikanBangumiPosterMeta, build_mikan_bangumi_homepage_url, build_mikan_bangumi_rss_url,
-            extract_mikan_rss_channel_from_rss_link,
+            MikanBangumiPosterMeta, MikanBangumiSubscription, MikanSeasonSubscription,
+            MikanSubscriberSubscription, build_mikan_bangumi_homepage_url,
+            build_mikan_bangumi_subscription_rss_url,
             scrape_mikan_bangumi_meta_from_bangumi_homepage_url,
             scrape_mikan_episode_meta_from_episode_homepage_url,
             scrape_mikan_poster_meta_from_image_url,
@@ -32,12 +33,55 @@ use crate::{
 )]
 #[serde(rename_all = "snake_case")]
 pub enum SubscriptionCategory {
-    #[sea_orm(string_value = "mikan")]
-    Mikan,
+    #[sea_orm(string_value = "mikan_subscriber")]
+    MikanSubscriber,
+    #[sea_orm(string_value = "mikan_season")]
+    MikanSeason,
+    #[sea_orm(string_value = "mikan_bangumi")]
+    MikanBangumi,
     #[sea_orm(string_value = "manual")]
     Manual,
 }
 
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(tag = "category")]
+pub enum SubscriptionPayload {
+    #[serde(rename = "mikan_subscriber")]
+    MikanSubscriber(MikanSubscriberSubscription),
+    #[serde(rename = "mikan_season")]
+    MikanSeason(MikanSeasonSubscription),
+    #[serde(rename = "mikan_bangumi")]
+    MikanBangumi(MikanBangumiSubscription),
+    #[serde(rename = "manual")]
+    Manual,
+}
+
+impl SubscriptionPayload {
+    pub fn category(&self) -> SubscriptionCategory {
+        match self {
+            Self::MikanSubscriber(_) => SubscriptionCategory::MikanSubscriber,
+            Self::MikanSeason(_) => SubscriptionCategory::MikanSeason,
+            Self::MikanBangumi(_) => SubscriptionCategory::MikanBangumi,
+            Self::Manual => SubscriptionCategory::Manual,
+        }
+    }
+
+    pub fn try_from_model(model: &Model) -> RecorderResult<Self> {
+        Ok(match model.category {
+            SubscriptionCategory::MikanSubscriber => {
+                Self::MikanSubscriber(MikanSubscriberSubscription::try_from_model(model)?)
+            }
+            SubscriptionCategory::MikanSeason => {
+                Self::MikanSeason(MikanSeasonSubscription::try_from_model(model)?)
+            }
+            SubscriptionCategory::MikanBangumi => {
+                Self::MikanBangumi(MikanBangumiSubscription::try_from_model(model)?)
+            }
+            SubscriptionCategory::Manual => Self::Manual,
+        })
+    }
+}
+
 #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
 #[sea_orm(table_name = "subscriptions")]
 pub struct Model {
@@ -149,57 +193,15 @@ pub enum RelatedEntity {
     SubscriptionBangumi,
 }
 
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
-pub struct SubscriptionCreateFromRssDto {
-    pub rss_link: String,
-    pub display_name: String,
-    pub enabled: Option<bool>,
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
-#[serde(tag = "category")]
-pub enum SubscriptionCreateDto {
-    Mikan(SubscriptionCreateFromRssDto),
-}
-
 #[async_trait]
 impl ActiveModelBehavior for ActiveModel {}
 
-impl ActiveModel {
-    pub fn from_create_dto(create_dto: SubscriptionCreateDto, subscriber_id: i32) -> Self {
-        match create_dto {
-            SubscriptionCreateDto::Mikan(create_dto) => {
-                Self::from_rss_create_dto(SubscriptionCategory::Mikan, create_dto, subscriber_id)
-            }
-        }
-    }
-
-    fn from_rss_create_dto(
-        category: SubscriptionCategory,
-        create_dto: SubscriptionCreateFromRssDto,
-        subscriber_id: i32,
-    ) -> Self {
-        Self {
-            display_name: ActiveValue::Set(create_dto.display_name),
-            enabled: ActiveValue::Set(create_dto.enabled.unwrap_or(false)),
-            subscriber_id: ActiveValue::Set(subscriber_id),
-            category: ActiveValue::Set(category),
-            source_url: ActiveValue::Set(create_dto.rss_link),
-            ..Default::default()
-        }
-    }
-}
+impl ActiveModel {}
 
 impl Model {
-    pub async fn add_subscription(
-        ctx: &dyn AppContextTrait,
-        create_dto: SubscriptionCreateDto,
-        subscriber_id: i32,
-    ) -> RecorderResult<Model> {
+    pub async fn find_by_id(ctx: &dyn AppContextTrait, id: i32) -> RecorderResult<Option<Self>> {
         let db = ctx.db();
 
-        let subscription = ActiveModel::from_create_dto(create_dto, subscriber_id);
-
-        Ok(subscription.insert(db).await?)
+        Ok(Entity::find_by_id(id).one(db).await?)
     }
 
     pub async fn toggle_with_ids(
@@ -229,8 +231,8 @@ impl Model {
     }
 
     pub async fn pull_subscription(&self, ctx: &dyn AppContextTrait) -> RecorderResult<()> {
-        match &self.category {
-            SubscriptionCategory::Mikan => {
+        let payload = SubscriptionPayload::try_from_model(self)?;
+
+        match payload {
+            SubscriptionPayload::MikanSubscriber(payload) => {
                 let mikan_client = ctx.mikan();
                 let channel =
                     extract_mikan_rss_channel_from_rss_link(mikan_client, &self.source_url).await?;
@@ -288,7 +290,7 @@ impl Model {
                     &mikan_bangumi_id,
                     Some(&mikan_fansub_id),
                 );
-                let bgm_rss_link = build_mikan_bangumi_rss_url(
+                let bgm_rss_link = build_mikan_bangumi_subscription_rss_url(
                     mikan_base_url.clone(),
                     &mikan_bangumi_id,
                     Some(&mikan_fansub_id),
diff --git a/apps/recorder/src/task/core.rs b/apps/recorder/src/task/core.rs
index e4dbc66..e8e6a1c 100644
--- a/apps/recorder/src/task/core.rs
+++ b/apps/recorder/src/task/core.rs
@@ -48,10 +48,11 @@ pub trait SubscriberStreamTaskTrait: Serialize + DeserializeOwned + Sized {
     fn run_stream(
         self,
         ctx: Arc<dyn AppContextTrait>,
+        id: i32,
     ) -> impl Stream<Item = RecorderResult<Self::Yield>> + Send;
 
     async fn run(self, ctx: Arc<dyn AppContextTrait>, id: i32) -> RecorderResult<()> {
-        let stream = self.run_stream(ctx.clone());
+        let stream = self.run_stream(ctx.clone(), id);
 
         pin_mut!(stream);
diff --git a/apps/recorder/src/task/mikan/scrape_season_subscription.rs b/apps/recorder/src/task/mikan/scrape_season_subscription.rs
index 72a7af9..8714244 100644
--- a/apps/recorder/src/task/mikan/scrape_season_subscription.rs
+++ b/apps/recorder/src/task/mikan/scrape_season_subscription.rs
@@ -7,20 +7,15 @@ use serde::{Deserialize, Serialize};
 use crate::{
     app::AppContextTrait,
     errors::RecorderResult,
-    extract::mikan::{
-        MikanBangumiMeta, MikanSeasonStr, build_mikan_season_flow_url,
-        scrape_mikan_bangumi_meta_stream_from_season_flow_url,
-    },
+    extract::mikan::{MikanBangumiMeta, MikanSeasonStr, MikanSeasonSubscription},
     task::SubscriberStreamTaskTrait,
 };
 
 #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, FromJsonQueryResult)]
 pub struct MikanScrapeSeasonSubscriptionTask {
-    pub task_id: i32,
     pub year: i32,
     pub season_str: MikanSeasonStr,
     pub credential_id: i32,
     pub subscriber_id: i32,
 }
 
 #[async_trait::async_trait]
@@ -30,16 +25,15 @@ impl SubscriberStreamTaskTrait for MikanScrapeSeasonSubscriptionTask {
     fn run_stream(
         self,
         ctx: Arc<dyn AppContextTrait>,
+        id: i32,
     ) -> impl Stream<Item = RecorderResult<MikanBangumiMeta>> {
-        let mikan_base_url = ctx.mikan().base_url().clone();
+        let task = Arc::new(MikanSeasonSubscription {
+            id,
+            year: self.year,
+            season_str: self.season_str,
+            credential_id: self.credential_id,
+            subscriber_id: self.subscriber_id,
+        });
 
-        let mikan_season_flow_url =
-            build_mikan_season_flow_url(mikan_base_url, self.year, self.season_str);
-
-        scrape_mikan_bangumi_meta_stream_from_season_flow_url(
-            ctx.clone(),
-            mikan_season_flow_url,
-            self.credential_id,
-        )
+        task.pull_bangumi_meta_stream(ctx)
     }
 }
diff --git a/apps/recorder/src/task/registry.rs b/apps/recorder/src/task/registry.rs
index 8165d0c..d6b19e1 100644
--- a/apps/recorder/src/task/registry.rs
+++ b/apps/recorder/src/task/registry.rs
@@ -2,6 +2,7 @@ use sea_orm::{DeriveActiveEnum, DeriveDisplay, prelude::*};
 use serde::{Deserialize, Serialize};
 
 use super::mikan::MikanScrapeSeasonSubscriptionTask;
+use crate::errors::RecorderError;
 
 #[derive(
     Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, DeriveDisplay, Serialize, Deserialize,
@@ -24,6 +25,33 @@ pub enum SubscriberTaskPayload {
     MikanScrapeSeasonSubscription(MikanScrapeSeasonSubscriptionTask),
 }
 
+impl SubscriberTaskPayload {
+    pub fn task_type(&self) -> SubscriberTaskType {
+        match self {
+            Self::MikanScrapeSeasonSubscription(_) => {
+                SubscriberTaskType::MikanScrapeSeasonSubscription
+            }
+        }
+    }
+}
+
+impl TryFrom<SubscriberTaskPayload> for serde_json::Value {
+    type Error = RecorderError;
+
+    fn try_from(value: SubscriberTaskPayload) -> Result<Self, Self::Error> {
+        let json_value = serde_json::to_value(&value)?;
+        Ok(match json_value {
+            serde_json::Value::Object(mut map) => {
+                map.remove("task_type");
+                serde_json::Value::Object(map)
+            }
+            _ => {
+                unreachable!("payload must be a json object");
+            }
+        })
+    }
+}
+
 #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
 pub struct SubscriberTask {
     pub id: i32,