diff --git a/apps/recorder/src/extract/mikan/mod.rs b/apps/recorder/src/extract/mikan/mod.rs index 37702ae..4942b77 100644 --- a/apps/recorder/src/extract/mikan/mod.rs +++ b/apps/recorder/src/extract/mikan/mod.rs @@ -23,11 +23,11 @@ pub use subscription::{ MikanBangumiSubscription, MikanSeasonSubscription, MikanSubscriberSubscription, }; pub use web::{ - MikanBangumiHomepageUrlMeta, MikanBangumiIndexHomepageUrlMeta, MikanBangumiIndexMeta, - MikanBangumiMeta, MikanBangumiPosterMeta, MikanEpisodeHomepageUrlMeta, MikanEpisodeMeta, - MikanSeasonFlowUrlMeta, MikanSeasonStr, build_mikan_bangumi_expand_subscribed_url, - build_mikan_bangumi_homepage_url, build_mikan_episode_homepage_url, - build_mikan_season_flow_url, extract_mikan_bangumi_index_meta_list_from_season_flow_fragment, + MikanBangumiHash, MikanBangumiIndexHash, MikanBangumiIndexMeta, MikanBangumiMeta, + MikanBangumiPosterMeta, MikanEpisodeHash, MikanEpisodeMeta, MikanSeasonFlowUrlMeta, + MikanSeasonStr, build_mikan_bangumi_expand_subscribed_url, build_mikan_bangumi_homepage_url, + build_mikan_episode_homepage_url, build_mikan_season_flow_url, + extract_mikan_bangumi_index_meta_list_from_season_flow_fragment, extract_mikan_bangumi_meta_from_expand_subscribed_fragment, extract_mikan_episode_meta_from_episode_homepage_html, scrape_mikan_bangumi_meta_from_bangumi_homepage_url, diff --git a/apps/recorder/src/extract/mikan/rss.rs b/apps/recorder/src/extract/mikan/rss.rs index 5d268a0..8d0e527 100644 --- a/apps/recorder/src/extract/mikan/rss.rs +++ b/apps/recorder/src/extract/mikan/rss.rs @@ -1,18 +1,11 @@ use std::borrow::Cow; -use bytes::Bytes; use chrono::DateTime; use downloader::bittorrent::defs::BITTORRENT_MIME_TYPE; -use fetch::{FetchError, IntoUrl, bytes::fetch_bytes}; -use itertools::Itertools; use serde::{Deserialize, Serialize}; -use tracing::instrument; use url::Url; -use crate::{ - errors::app_error::{RecorderError, RecorderResult}, - extract::mikan::{MikanClient, MikanEpisodeHomepageUrlMeta}, -}; +use crate::{errors::app_error::RecorderError, extract::mikan::MikanEpisodeHash}; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct MikanRssItem { @@ -112,9 +105,10 @@ impl TryFrom for MikanRssItem { RecorderError::from_mikan_rss_invalid_field(Cow::Borrowed("homepage:link")) })?; - let MikanEpisodeHomepageUrlMeta { - mikan_episode_id, .. - } = MikanEpisodeHomepageUrlMeta::parse_url(&homepage).ok_or_else(|| { + let MikanEpisodeHash { + mikan_episode_token: mikan_episode_id, + .. + } = MikanEpisodeHash::from_homepage_url(&homepage).ok_or_else(|| { RecorderError::from_mikan_rss_invalid_field(Cow::Borrowed("mikan_episode_id")) })?; diff --git a/apps/recorder/src/extract/mikan/subscription.rs b/apps/recorder/src/extract/mikan/subscription.rs index d3ed74e..56d45fb 100644 --- a/apps/recorder/src/extract/mikan/subscription.rs +++ b/apps/recorder/src/extract/mikan/subscription.rs @@ -10,7 +10,10 @@ use futures::Stream; use itertools::Itertools; use maplit::hashmap; use scraper::Html; -use sea_orm::{ColumnTrait, EntityTrait, IntoSimpleExpr, QueryFilter, QuerySelect, prelude::Expr}; +use sea_orm::{ + ActiveModelTrait, ActiveValue, ColumnTrait, EntityTrait, IntoSimpleExpr, QueryFilter, + QuerySelect, prelude::Expr, sea_query::OnConflict, +}; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use url::Url; @@ -19,8 +22,8 @@ use crate::{ app::AppContextTrait, errors::{RecorderError, RecorderResult}, extract::mikan::{ - MikanBangumiHomepageUrlMeta, MikanBangumiMeta, MikanBangumiRssUrlMeta, MikanEpisodeMeta, - MikanRssItem, MikanSeasonFlowUrlMeta, MikanSeasonStr, + MikanBangumiHash, MikanBangumiMeta, MikanBangumiRssUrlMeta, MikanEpisodeHash, + MikanEpisodeMeta, MikanRssItem, MikanSeasonFlowUrlMeta, MikanSeasonStr, MikanSubscriberSubscriptionRssUrlMeta, build_mikan_bangumi_expand_subscribed_url, build_mikan_bangumi_subscription_rss_url, build_mikan_season_flow_url, build_mikan_subscriber_subscription_rss_url, @@ -29,7 +32,7 @@ use crate::{ scrape_mikan_episode_meta_from_episode_homepage_url, }, migrations::defs::Bangumi, - models::{bangumi, episodes, subscriptions}, + models::{bangumi, episodes, subscription_bangumi, subscription_episode, subscriptions}, }; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, InputObject, SimpleObject)] @@ -46,137 +49,91 @@ impl MikanSubscriberSubscription { ctx: Arc, ) -> RecorderResult> { let mikan_client = ctx.mikan(); + let db = ctx.db(); - let new_episode_meta_list: Vec = { + let to_insert_episode_meta_list: Vec = { let rss_item_list = self.pull_rss_items(ctx.clone()).await?; - let existed_item_set = episodes::Entity::find() - .select_only() - .column(episodes::Column::MikanEpisodeId) - .filter( - episodes::Column::SubscriberId.eq(self.subscriber_id).add( - episodes::Column::MikanEpisodeId - .is_in(rss_item_list.iter().map(|s| s.mikan_episode_id.clone())), - ), - ) - .into_tuple::<(String,)>() - .all(ctx.db()) - .await? - .into_iter() - .map(|(value,)| value) - .collect::>(); + let existed_episode_token_list = episodes::Model::get_existed_mikan_episode_list( + ctx.as_ref(), + rss_item_list.iter().map(|s| MikanEpisodeHash { + mikan_episode_token: s.mikan_episode_id.clone(), + }), + self.subscriber_id, + self.id, + ) + .await? + .into_iter() + .map(|(id, hash)| (hash.mikan_episode_token, id)) + .collect::>(); - let mut result = vec![]; - for rss_item in rss_item_list - .into_iter() - .filter(|rss_item| !existed_item_set.contains(&rss_item.mikan_episode_id)) - { + let mut to_insert_episode_meta_list = vec![]; + + for to_insert_rss_item in rss_item_list.into_iter().filter(|rss_item| { + !existed_episode_token_list.contains_key(&rss_item.mikan_episode_id) + }) { let episode_meta = scrape_mikan_episode_meta_from_episode_homepage_url( mikan_client, - rss_item.homepage, + to_insert_rss_item.homepage, ) .await?; + to_insert_episode_meta_list.push(episode_meta); } - result + subscription_episode::Model::add_episodes_for_subscription( + ctx.as_ref(), + existed_episode_token_list.into_values(), + self.subscriber_id, + self.id, + ) + .await?; + + to_insert_episode_meta_list }; - { - let mut new_bangumi_hash_map = new_episode_meta_list + let new_episode_meta_bangumi_map = { + let bangumi_hash_map = to_insert_episode_meta_list .iter() - .map(|episode_meta| { - ( - MikanBangumiHomepageUrlMeta { - mikan_bangumi_id: episode_meta.mikan_bangumi_id.clone(), - mikan_fansub_id: episode_meta.mikan_fansub_id.clone(), - }, - episode_meta, - ) - }) + .map(|episode_meta| (episode_meta.bangumi_hash(), episode_meta)) .collect::>(); - let mut new_bangumi_meta_map: HashMap = - hashmap! {}; + let existed_bangumi_set = bangumi::Model::get_existed_mikan_bangumi_list( + ctx.as_ref(), + bangumi_hash_map.keys().cloned(), + self.subscriber_id, + self.id, + ) + .await? + .map(|(_, bangumi_hash)| bangumi_hash) + .collect::>(); - for bangumi_model in bangumi::Entity::find() - .filter({ - Expr::tuple([ - bangumi::Column::MikanBangumiId.into_simple_expr(), - bangumi::Column::MikanFansubId.into_simple_expr(), - bangumi::Column::SubscriberId.into_simple_expr(), - ]) - .in_tuples(new_bangumi_hash_map.iter().map( - |(bangumi_meta, _)| { - ( - bangumi_meta.mikan_bangumi_id.clone(), - bangumi_meta.mikan_fansub_id.clone(), - self.subscriber_id, - ) - }, - )) - }) - .all(ctx.db()) - .await? - { - let bangumi_hash = MikanBangumiHomepageUrlMeta { - mikan_bangumi_id: bangumi_model.mikan_bangumi_id.unwrap(), - mikan_fansub_id: bangumi_model.mikan_fansub_id.unwrap(), - }; - new_bangumi_hash_map.remove(&bangumi_hash); - new_bangumi_meta_map.insert(bangumi_hash, bangumi_model); - } + let mut to_insert_bangumi_list = vec![]; - for (bangumi_hash, episode_meta) in new_bangumi_hash_map { - let bangumi_meta: MikanBangumiMeta = episode_meta.clone().into(); + for (bangumi_hash, episode_meta) in bangumi_hash_map.iter() { + if !existed_bangumi_set.contains(&bangumi_hash) { + let bangumi_meta: MikanBangumiMeta = (*episode_meta).clone().into(); - let bangumi_active_model = bangumi::ActiveModel::from_mikan_bangumi_meta( - ctx.clone(), - bangumi_meta, - self.subscriber_id, - ) - .with_whatever_context::<_, String, RecorderError>(|_| { - format!( - "failed to create bangumi active model from mikan bangumi meta, \ - bangumi_meta = {:?}", - bangumi_meta + let bangumi_active_model = bangumi::ActiveModel::from_mikan_bangumi_meta( + ctx.as_ref(), + bangumi_meta, + self.subscriber_id, + self.id, ) - })?; + .await?; - new_bangumi_meta_map.insert(bangumi_hash, bangumi_active_model); - } - } - - let mut new_bangumi_meta_map: HashMap = { - let mut map = hashmap! {}; - - for bangumi_model in existed_bangumi_list { - let hash = MikanBangumiHomepageUrlMeta { - mikan_bangumi_id: bangumi_model.mikan_bangumi_id.unwrap(), - mikan_fansub_id: bangumi_model.mikan_fansub_id.unwrap(), - }; - new_bangumi_hash_map.remove(&hash); - map.insert(hash, bangumi_model); + to_insert_bangumi_list.push(bangumi_active_model); + } } - map + bangumi::Entity::insert_many(to_insert_bangumi_list) + .on_conflict_do_nothing() + .exec(db) + .await?; + + let mut new_episode_meta_bangumi_map: HashMap = + hashmap! {}; }; - for bangumi_hash in new_bangumi_hash_map { - bangumi::Entity::insert(bangumi::ActiveModel { - raw_name: ActiveValue::Set(bangumi_meta.bangumi_title.clone()), - display_name: ActiveValue::Set(bangumi_meta.bangumi_title.clone()), - ..Default::default() - }); - } - - bangumi::Entity::insert_many(new_bangumi_hash_map.values().map(|bangumi_meta| { - bangumi::ActiveModel { - raw_name: ActiveValue::Set(bangumi_meta.bangumi_title.clone()), - display_name: ActiveValue::Set(bangumi_meta.bangumi_title.clone()), - ..Default::default() - } - })); - todo!() } diff --git a/apps/recorder/src/extract/mikan/web.rs b/apps/recorder/src/extract/mikan/web.rs index e626470..3ad9a61 100644 --- a/apps/recorder/src/extract/mikan/web.rs +++ b/apps/recorder/src/extract/mikan/web.rs @@ -117,6 +117,15 @@ pub struct MikanEpisodeMeta { pub mikan_episode_id: String, } +impl MikanEpisodeMeta { + pub fn bangumi_hash(&self) -> MikanBangumiHash { + MikanBangumiHash { + mikan_bangumi_id: self.mikan_bangumi_id.clone(), + mikan_fansub_id: self.mikan_fansub_id.clone(), + } + } +} + #[derive(Clone, Debug, PartialEq)] pub struct MikanBangumiPosterMeta { pub origin_poster_src: Url, @@ -124,12 +133,12 @@ pub struct MikanBangumiPosterMeta { } #[derive(Clone, Debug, PartialEq)] -pub struct MikanBangumiIndexHomepageUrlMeta { +pub struct MikanBangumiIndexHash { pub mikan_bangumi_id: String, } -impl MikanBangumiIndexHomepageUrlMeta { - pub fn parse_url(url: &Url) -> Option { +impl MikanBangumiIndexHash { + pub fn from_homepage_url(url: &Url) -> Option { if url.path().starts_with("/Home/Bangumi/") { let mikan_bangumi_id = url.path().replace("/Home/Bangumi/", ""); @@ -141,13 +150,13 @@ impl MikanBangumiIndexHomepageUrlMeta { } #[derive(Clone, Debug, PartialEq, Eq, Hash)] -pub struct MikanBangumiHomepageUrlMeta { +pub struct MikanBangumiHash { pub mikan_bangumi_id: String, pub mikan_fansub_id: String, } -impl MikanBangumiHomepageUrlMeta { - pub fn from_url(url: &Url) -> Option { +impl MikanBangumiHash { + pub fn from_homepage_url(url: &Url) -> Option { if url.path().starts_with("/Home/Bangumi/") { let mikan_bangumi_id = url.path().replace("/Home/Bangumi/", ""); @@ -163,16 +172,18 @@ impl MikanBangumiHomepageUrlMeta { } } -#[derive(Clone, Debug, PartialEq)] -pub struct MikanEpisodeHomepageUrlMeta { - pub mikan_episode_id: String, +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct MikanEpisodeHash { + pub mikan_episode_token: String, } -impl MikanEpisodeHomepageUrlMeta { - pub fn parse_url(url: &Url) -> Option { +impl MikanEpisodeHash { + pub fn from_homepage_url(url: &Url) -> Option { if url.path().starts_with("/Home/Episode/") { let mikan_episode_id = url.path().replace("/Home/Episode/", ""); - Some(Self { mikan_episode_id }) + Some(Self { + mikan_episode_token: mikan_episode_id, + }) } else { None } @@ -333,9 +344,10 @@ pub fn extract_mikan_episode_meta_from_episode_homepage_html( RecorderError::from_mikan_meta_missing_field(Cow::Borrowed("episode_title")) })?; - let MikanEpisodeHomepageUrlMeta { - mikan_episode_id, .. - } = MikanEpisodeHomepageUrlMeta::parse_url(&mikan_episode_homepage_url).ok_or_else(|| { + let MikanEpisodeHash { + mikan_episode_token, + .. + } = MikanEpisodeHash::from_homepage_url(&mikan_episode_homepage_url).ok_or_else(|| { RecorderError::from_mikan_meta_missing_field(Cow::Borrowed("mikan_episode_id")) })?; @@ -484,7 +496,7 @@ pub fn extract_mikan_bangumi_meta_from_bangumi_homepage_html( mikan_bangumi_homepage_url: Url, mikan_base_url: &Url, ) -> RecorderResult { - let mikan_fansub_id = MikanBangumiHomepageUrlMeta::from_url(&mikan_bangumi_homepage_url) + let mikan_fansub_id = MikanBangumiHash::from_homepage_url(&mikan_bangumi_homepage_url) .map(|s| s.mikan_fansub_id) .ok_or_else(|| { RecorderError::from_mikan_meta_missing_field(Cow::Borrowed("mikan_fansub_id")) diff --git a/apps/recorder/src/models/bangumi.rs b/apps/recorder/src/models/bangumi.rs index b5ba725..4be1b18 100644 --- a/apps/recorder/src/models/bangumi.rs +++ b/apps/recorder/src/models/bangumi.rs @@ -2,15 +2,22 @@ use std::sync::Arc; use async_graphql::SimpleObject; use async_trait::async_trait; -use sea_orm::{ActiveValue, FromJsonQueryResult, entity::prelude::*, sea_query::OnConflict}; +use sea_orm::{ + ActiveValue, FromJsonQueryResult, FromQueryResult, IntoSimpleExpr, JoinType, QuerySelect, + entity::prelude::*, + sea_query::{IntoCondition, OnConflict}, +}; use serde::{Deserialize, Serialize}; use super::subscription_bangumi; use crate::{ app::AppContextTrait, - errors::RecorderResult, + errors::{RecorderError, RecorderResult}, extract::{ - mikan::{MikanBangumiMeta, build_mikan_bangumi_subscription_rss_url}, + mikan::{ + MikanBangumiHash, MikanBangumiMeta, build_mikan_bangumi_subscription_rss_url, + scrape_mikan_poster_meta_from_image_url, + }, rawname::parse_episode_meta_from_raw_name, }, }; @@ -120,6 +127,59 @@ pub enum RelatedEntity { SubscriptionBangumi, } +impl ActiveModel { + #[tracing::instrument(err, skip_all, fields(mikan_bangumi_id = %meta.mikan_bangumi_id, mikan_fansub_id = %meta.mikan_fansub_id, subscriber_id = %subscriber_id))] + pub async fn from_mikan_bangumi_meta( + ctx: &dyn AppContextTrait, + meta: MikanBangumiMeta, + subscriber_id: i32, + _subscription_id: i32, + ) -> RecorderResult { + let mikan_client = ctx.mikan(); + let storage_service = ctx.storage(); + let mikan_base_url = mikan_client.base_url(); + + let raw_meta = parse_episode_meta_from_raw_name(&meta.bangumi_title)?; + + let rss_url = build_mikan_bangumi_subscription_rss_url( + mikan_base_url.clone(), + &meta.mikan_bangumi_id, + Some(&meta.mikan_fansub_id), + ); + + let poster_link = if let Some(origin_poster_src) = meta.origin_poster_src { + let poster_meta = scrape_mikan_poster_meta_from_image_url( + mikan_client, + storage_service, + origin_poster_src, + subscriber_id, + ) + .await?; + poster_meta.poster_src + } else { + None + }; + + Ok(Self { + mikan_bangumi_id: ActiveValue::Set(Some(meta.mikan_bangumi_id)), + mikan_fansub_id: ActiveValue::Set(Some(meta.mikan_fansub_id)), + subscriber_id: ActiveValue::Set(subscriber_id), + display_name: ActiveValue::Set(meta.bangumi_title.clone()), + raw_name: ActiveValue::Set(meta.bangumi_title), + season: ActiveValue::Set(raw_meta.season), + season_raw: ActiveValue::Set(raw_meta.season_raw), + fansub: ActiveValue::Set(Some(meta.fansub)), + poster_link: ActiveValue::Set(poster_link), + homepage: ActiveValue::Set(Some(meta.homepage.to_string())), + rss_link: ActiveValue::Set(Some(rss_url.to_string())), + ..Default::default() + }) + } +} + +#[async_trait] +impl ActiveModelBehavior for ActiveModel {} + impl Model { pub async fn get_or_insert_from_mikan( ctx: &dyn AppContextTrait, @@ -181,40 +241,44 @@ impl Model { Ok(bgm) } } -} -impl ActiveModel { - pub fn from_mikan_bangumi_meta( - ctx: Arc, - meta: MikanBangumiMeta, + pub async fn get_existed_mikan_bangumi_list( + ctx: &dyn AppContextTrait, + hashes: impl Iterator, subscriber_id: i32, - ) -> RecorderResult { - let mikan_base_url = ctx.mikan().base_url(); - - let raw_meta = parse_episode_meta_from_raw_name(&meta.bangumi_title)?; - - let rss_url = build_mikan_bangumi_subscription_rss_url( - mikan_base_url.clone(), - &meta.mikan_bangumi_id, - Some(&meta.mikan_fansub_id), - ); - - Ok(Self { - mikan_bangumi_id: ActiveValue::Set(Some(meta.mikan_bangumi_id)), - mikan_fansub_id: ActiveValue::Set(Some(meta.mikan_fansub_id)), - subscriber_id: ActiveValue::Set(subscriber_id), - display_name: ActiveValue::Set(meta.bangumi_title.clone()), - raw_name: ActiveValue::Set(meta.bangumi_title), - season: ActiveValue::Set(raw_meta.season), - season_raw: ActiveValue::Set(raw_meta.season_raw), - fansub: ActiveValue::Set(Some(meta.fansub)), - poster_link: ActiveValue::Set(meta.origin_poster_src.map(|url| url.to_string())), - homepage: ActiveValue::Set(Some(meta.homepage.to_string())), - rss_link: ActiveValue::Set(Some(rss_url.to_string())), - ..Default::default() - }) + _subscription_id: i32, + ) -> RecorderResult> { + Ok(Entity::find() + .select_only() + .column(Column::Id) + .column(Column::MikanBangumiId) + .column(Column::MikanFansubId) + .filter( + Expr::tuple([ + Column::MikanBangumiId.into_simple_expr(), + Column::MikanFansubId.into_simple_expr(), + Column::SubscriberId.into_simple_expr(), + ]) + .in_tuples(hashes.map(|hash| { + ( + hash.mikan_bangumi_id.clone(), + hash.mikan_fansub_id.clone(), + subscriber_id, + ) + })), + ) + .into_tuple::<(i32, String, String)>() + .all(ctx.db()) + .await? + .into_iter() + .map(|(bangumi_id, mikan_bangumi_id, mikan_fansub_id)| { + ( + bangumi_id, + MikanBangumiHash { + mikan_bangumi_id, + mikan_fansub_id, + }, + ) + })) } } - -#[async_trait] -impl ActiveModelBehavior for ActiveModel {} diff --git a/apps/recorder/src/models/episodes.rs b/apps/recorder/src/models/episodes.rs index 2d636d3..ba806d5 100644 --- a/apps/recorder/src/models/episodes.rs +++ b/apps/recorder/src/models/episodes.rs @@ -1,15 +1,19 @@ use std::sync::Arc; use async_trait::async_trait; -use sea_orm::{ActiveValue, FromJsonQueryResult, entity::prelude::*, sea_query::OnConflict}; +use sea_orm::{ + ActiveValue, ColumnTrait, FromJsonQueryResult, IntoSimpleExpr, JoinType, QuerySelect, + entity::prelude::*, + sea_query::{Alias, IntoCondition, OnConflict}, +}; use serde::{Deserialize, Serialize}; -use super::{bangumi, query::InsertManyReturningExt, subscription_episode}; +use super::{bangumi, query::InsertManyReturningExt, subscription_bangumi, subscription_episode}; use crate::{ app::AppContextTrait, - errors::RecorderResult, + errors::{RecorderError, RecorderResult}, extract::{ - mikan::{MikanEpisodeMeta, build_mikan_episode_homepage_url}, + mikan::{MikanEpisodeHash, MikanEpisodeMeta, build_mikan_episode_homepage_url}, rawname::parse_episode_meta_from_raw_name, }, }; @@ -134,59 +138,6 @@ pub struct MikanEpsiodeCreation { pub bangumi: Arc, } -impl Model { - pub async fn add_episodes( - ctx: &dyn AppContextTrait, - subscriber_id: i32, - subscription_id: i32, - creations: impl IntoIterator, - ) -> RecorderResult<()> { - let db = ctx.db(); - let new_episode_active_modes = creations - .into_iter() - .map(|cr| ActiveModel::from_mikan_episode_meta(ctx, cr)) - .inspect(|result| { - if let Err(e) = result { - tracing::warn!("Failed to create episode: {:?}", e); - } - }) - .flatten(); - - let inserted_episodes = Entity::insert_many(new_episode_active_modes) - .on_conflict( - OnConflict::columns([Column::BangumiId, Column::MikanEpisodeId]) - .do_nothing() - .to_owned(), - ) - .exec_with_returning_columns(db, [Column::Id]) - .await? - .into_iter() - .flat_map(|r| r.try_get_many_by_index::()); - - let insert_subscription_episode_links = inserted_episodes.into_iter().map(|episode_id| { - subscription_episode::ActiveModel::from_subscription_and_episode( - subscriber_id, - subscription_id, - episode_id, - ) - }); - - subscription_episode::Entity::insert_many(insert_subscription_episode_links) - .on_conflict( - OnConflict::columns([ - subscription_episode::Column::SubscriptionId, - subscription_episode::Column::EpisodeId, - ]) - .do_nothing() - .to_owned(), - ) - .exec(db) - .await?; - - Ok(()) - } -} - impl ActiveModel { pub fn from_mikan_episode_meta( ctx: &dyn AppContextTrait, @@ -239,3 +190,92 @@ impl ActiveModel { #[async_trait] impl ActiveModelBehavior for ActiveModel {} + +impl Model { + pub async fn get_existed_mikan_episode_list( + ctx: &dyn AppContextTrait, + ids: impl Iterator, + subscriber_id: i32, + _subscription_id: i32, + ) -> RecorderResult> { + let db = ctx.db(); + + Ok(Entity::find() + .select_only() + .column(Column::Id) + .column(Column::MikanEpisodeId) + .filter( + Expr::tuple([ + Column::MikanEpisodeId.into_simple_expr(), + Column::SubscriberId.into_simple_expr(), + ]) + .in_tuples( + ids.into_iter() + .map(|id| (id.mikan_episode_token, subscriber_id)), + ), + ) + .into_tuple::<(i32, String)>() + .all(db) + .await? + .into_iter() + .map(|(id, mikan_episode_id)| { + ( + id, + MikanEpisodeHash { + mikan_episode_token: mikan_episode_id, + }, + ) + })) + } + + pub async fn add_episodes( + ctx: &dyn AppContextTrait, + subscriber_id: i32, + subscription_id: i32, + creations: impl IntoIterator, + ) -> RecorderResult<()> { + let db = ctx.db(); + let new_episode_active_modes = creations + .into_iter() + .map(|cr| ActiveModel::from_mikan_episode_meta(ctx, cr)) + .inspect(|result| { + if let Err(e) = result { + tracing::warn!("Failed to create episode: {:?}", e); + } + }) + .flatten(); + + let inserted_episodes = Entity::insert_many(new_episode_active_modes) + .on_conflict( + OnConflict::columns([Column::BangumiId, Column::MikanEpisodeId]) + .do_nothing() + .to_owned(), + ) + .exec_with_returning_columns(db, [Column::Id]) + .await? + .into_iter() + .flat_map(|r| r.try_get_many_by_index::()); + + let insert_subscription_episode_links = inserted_episodes.into_iter().map(|episode_id| { + subscription_episode::ActiveModel::from_subscription_and_episode( + subscriber_id, + subscription_id, + episode_id, + ) + }); + + subscription_episode::Entity::insert_many(insert_subscription_episode_links) + .on_conflict( + OnConflict::columns([ + subscription_episode::Column::SubscriptionId, + subscription_episode::Column::EpisodeId, + ]) + .do_nothing() + .to_owned(), + ) + .exec(db) + .await?; + + Ok(()) + } +} diff --git a/apps/recorder/src/models/subscription_bangumi.rs b/apps/recorder/src/models/subscription_bangumi.rs index a27caf3..f802790 100644 --- a/apps/recorder/src/models/subscription_bangumi.rs +++ b/apps/recorder/src/models/subscription_bangumi.rs @@ -2,6 +2,8 @@ use async_trait::async_trait; use sea_orm::{ActiveValue, entity::prelude::*}; use serde::{Deserialize, Serialize}; +use crate::{app::AppContextTrait, errors::RecorderResult}; + #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "subscription_bangumi")] pub struct Model { @@ -69,3 +71,25 @@ impl ActiveModel { } } } + +impl Model { + pub async fn add_bangumis_for_subscription( + ctx: &dyn AppContextTrait, + bangumi_ids: impl Iterator, + subscriber_id: i32, + subscription_id: i32, + ) -> RecorderResult<()> { + let db = ctx.db(); + Entity::insert_many(bangumi_ids.map(|bangumi_id| ActiveModel { + bangumi_id: ActiveValue::Set(bangumi_id), + subscriber_id: ActiveValue::Set(subscriber_id), + subscription_id: ActiveValue::Set(subscription_id), + ..Default::default() + })) + .on_conflict_do_nothing() + .exec(db) + .await?; + + Ok(()) + } +} diff --git a/apps/recorder/src/models/subscription_episode.rs b/apps/recorder/src/models/subscription_episode.rs index abff5d5..577d1aa 100644 --- a/apps/recorder/src/models/subscription_episode.rs +++ b/apps/recorder/src/models/subscription_episode.rs @@ -2,6 +2,8 @@ use async_trait::async_trait; use sea_orm::{ActiveValue, entity::prelude::*}; use serde::{Deserialize, Serialize}; +use crate::{app::AppContextTrait, errors::RecorderResult}; + #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "subscription_episode")] pub struct Model { @@ -69,3 +71,25 @@ impl ActiveModel { } } } + +impl Model { + pub async fn add_episodes_for_subscription( + ctx: &dyn AppContextTrait, + episode_ids: impl Iterator, + subscriber_id: i32, + subscription_id: i32, + ) -> RecorderResult<()> { + let db = ctx.db(); + Entity::insert_many(episode_ids.map(|episode_id| ActiveModel { + episode_id: ActiveValue::Set(episode_id), + subscription_id: ActiveValue::Set(subscription_id), + subscriber_id: ActiveValue::Set(subscriber_id), + ..Default::default() + })) + .on_conflict_do_nothing() + .exec(db) + .await?; + + Ok(()) + } +}