refactor: continue

This commit is contained in:
master 2025-05-13 01:23:59 +08:00
parent 760cb2344e
commit bf270e4e87
34 changed files with 1210 additions and 1427 deletions

View File

@ -11,6 +11,7 @@ use openidconnect::{
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use snafu::prelude::*; use snafu::prelude::*;
use util::OptDynErr;
use crate::models::auth::AuthType; use crate::models::auth::AuthType;
@ -87,23 +88,29 @@ pub enum AuthError {
(if column.is_empty() { "" } else { "." }), (if column.is_empty() { "" } else { "." }),
source.message source.message
))] ))]
GraphQLPermissionError { GraphqlDynamicPermissionError {
#[snafu(source(false))] #[snafu(source(false))]
source: Box<async_graphql::Error>, source: Box<async_graphql::Error>,
field: String, field: String,
column: String, column: String,
context_path: String, context_path: String,
}, },
#[snafu(display("GraphQL permission denied since {field}"))]
GraphqlStaticPermissionError {
#[snafu(source)]
source: OptDynErr,
field: String,
},
} }
impl AuthError { impl AuthError {
pub fn from_graphql_subscribe_id_guard( pub fn from_graphql_dynamic_subscribe_id_guard(
source: async_graphql::Error, source: async_graphql::Error,
context: &ResolverContext, context: &ResolverContext,
field_name: &str, field_name: &str,
column_name: &str, column_name: &str,
) -> AuthError { ) -> AuthError {
AuthError::GraphQLPermissionError { AuthError::GraphqlDynamicPermissionError {
source: Box::new(source), source: Box::new(source),
field: field_name.to_string(), field: field_name.to_string(),
column: column_name.to_string(), column: column_name.to_string(),

View File

@ -2,7 +2,6 @@ mod client;
mod config; mod config;
mod constants; mod constants;
mod credential; mod credential;
mod rss;
mod subscription; mod subscription;
mod web; mod web;
@ -14,23 +13,22 @@ pub use constants::{
MIKAN_SEASON_FLOW_PAGE_PATH, MIKAN_UNKNOWN_FANSUB_ID, MIKAN_UNKNOWN_FANSUB_NAME, MIKAN_SEASON_FLOW_PAGE_PATH, MIKAN_UNKNOWN_FANSUB_ID, MIKAN_UNKNOWN_FANSUB_NAME,
}; };
pub use credential::MikanCredentialForm; pub use credential::MikanCredentialForm;
pub use rss::{
MikanBangumiRssChannel, MikanBangumiRssUrlMeta, MikanRssChannel, MikanRssItem,
MikanSubscriberRssChannel, MikanSubscriberSubscriptionRssUrlMeta,
build_mikan_bangumi_subscription_rss_url, build_mikan_subscriber_subscription_rss_url,
};
pub use subscription::{ pub use subscription::{
MikanBangumiSubscription, MikanSeasonSubscription, MikanSubscriberSubscription, MikanBangumiSubscription, MikanSeasonSubscription, MikanSubscriberSubscription,
}; };
pub use web::{ pub use web::{
MikanBangumiHash, MikanBangumiIndexHash, MikanBangumiIndexMeta, MikanBangumiMeta, MikanBangumiHash, MikanBangumiIndexHash, MikanBangumiIndexMeta, MikanBangumiMeta,
MikanBangumiPosterMeta, MikanEpisodeHash, MikanEpisodeMeta, MikanSeasonFlowUrlMeta, MikanBangumiPosterMeta, MikanEpisodeHash, MikanEpisodeMeta, MikanRssItem,
MikanSeasonStr, build_mikan_bangumi_expand_subscribed_url, build_mikan_bangumi_homepage_url, MikanSeasonFlowUrlMeta, MikanSeasonStr, MikanSubscriberSubscriptionRssUrlMeta,
build_mikan_episode_homepage_url, build_mikan_season_flow_url, build_mikan_bangumi_expand_subscribed_url, build_mikan_bangumi_homepage_url,
build_mikan_bangumi_subscription_rss_url, build_mikan_episode_homepage_url,
build_mikan_season_flow_url, build_mikan_subscriber_subscription_rss_url,
extract_mikan_bangumi_index_meta_list_from_season_flow_fragment, extract_mikan_bangumi_index_meta_list_from_season_flow_fragment,
extract_mikan_bangumi_meta_from_expand_subscribed_fragment, extract_mikan_bangumi_meta_from_expand_subscribed_fragment,
extract_mikan_episode_meta_from_episode_homepage_html, extract_mikan_episode_meta_from_episode_homepage_html,
scrape_mikan_bangumi_meta_from_bangumi_homepage_url, scrape_mikan_bangumi_meta_from_bangumi_homepage_url,
scrape_mikan_bangumi_meta_list_from_season_flow_url,
scrape_mikan_bangumi_meta_stream_from_season_flow_url,
scrape_mikan_episode_meta_from_episode_homepage_url, scrape_mikan_poster_data_from_image_url, scrape_mikan_episode_meta_from_episode_homepage_url, scrape_mikan_poster_data_from_image_url,
scrape_mikan_poster_meta_from_image_url, scrape_mikan_poster_meta_from_image_url,
}; };

View File

@ -1,204 +0,0 @@
use std::borrow::Cow;
use chrono::DateTime;
use downloader::bittorrent::defs::BITTORRENT_MIME_TYPE;
use serde::{Deserialize, Serialize};
use url::Url;
use crate::{errors::app_error::RecorderError, extract::mikan::MikanEpisodeHash};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct MikanRssItem {
pub title: String,
pub homepage: Url,
pub url: Url,
pub content_length: Option<u64>,
pub mime: String,
pub pub_date: Option<i64>,
pub mikan_episode_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct MikanBangumiRssChannel {
pub name: String,
pub url: Url,
pub mikan_bangumi_id: String,
pub mikan_fansub_id: String,
pub items: Vec<MikanRssItem>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct MikanSubscriberRssChannel {
pub mikan_subscription_token: String,
pub url: Url,
pub items: Vec<MikanRssItem>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum MikanRssChannel {
Bangumi(MikanBangumiRssChannel),
Subscriber(MikanSubscriberRssChannel),
}
impl MikanRssChannel {
pub fn items(&self) -> &[MikanRssItem] {
match &self {
Self::Bangumi(MikanBangumiRssChannel { items, .. })
| Self::Subscriber(MikanSubscriberRssChannel { items, .. }) => items,
}
}
pub fn into_items(self) -> Vec<MikanRssItem> {
match self {
Self::Bangumi(MikanBangumiRssChannel { items, .. })
| Self::Subscriber(MikanSubscriberRssChannel { items, .. }) => items,
}
}
pub fn name(&self) -> Option<&str> {
match &self {
Self::Bangumi(MikanBangumiRssChannel { name, .. }) => Some(name.as_str()),
Self::Subscriber(MikanSubscriberRssChannel { .. }) => None,
}
}
pub fn url(&self) -> &Url {
match &self {
Self::Bangumi(MikanBangumiRssChannel { url, .. })
| Self::Subscriber(MikanSubscriberRssChannel { url, .. }) => url,
}
}
}
impl TryFrom<rss::Item> for MikanRssItem {
type Error = RecorderError;
fn try_from(item: rss::Item) -> Result<Self, Self::Error> {
let enclosure = item.enclosure.ok_or_else(|| {
RecorderError::from_mikan_rss_invalid_field(Cow::Borrowed("enclosure"))
})?;
let mime_type = enclosure.mime_type;
if mime_type != BITTORRENT_MIME_TYPE {
return Err(RecorderError::MimeError {
expected: String::from(BITTORRENT_MIME_TYPE),
found: mime_type.to_string(),
desc: String::from("MikanRssItem"),
});
}
let title = item.title.ok_or_else(|| {
RecorderError::from_mikan_rss_invalid_field(Cow::Borrowed("title:title"))
})?;
let enclosure_url = Url::parse(&enclosure.url).map_err(|err| {
RecorderError::from_mikan_rss_invalid_field_and_source(
"enclosure_url:enclosure.link".into(),
err,
)
})?;
let homepage = item
.link
.and_then(|link| Url::parse(&link).ok())
.ok_or_else(|| {
RecorderError::from_mikan_rss_invalid_field(Cow::Borrowed("homepage:link"))
})?;
let MikanEpisodeHash {
mikan_episode_token: mikan_episode_id,
..
} = MikanEpisodeHash::from_homepage_url(&homepage).ok_or_else(|| {
RecorderError::from_mikan_rss_invalid_field(Cow::Borrowed("mikan_episode_id"))
})?;
Ok(MikanRssItem {
title,
homepage,
url: enclosure_url,
content_length: enclosure.length.parse().ok(),
mime: mime_type,
pub_date: item
.pub_date
.and_then(|s| DateTime::parse_from_rfc2822(&s).ok())
.map(|s| s.timestamp_millis()),
mikan_episode_id,
})
}
}
#[derive(Debug, Clone)]
pub struct MikanBangumiRssUrlMeta {
pub mikan_bangumi_id: String,
pub mikan_fansub_id: String,
}
impl MikanBangumiRssUrlMeta {
pub fn from_url(url: &Url) -> Option<Self> {
if url.path() == "/RSS/Bangumi" {
if let (Some(mikan_fansub_id), Some(mikan_bangumi_id)) = (
url.query_pairs()
.find(|(k, _)| k == "subgroupid")
.map(|(_, v)| v.to_string()),
url.query_pairs()
.find(|(k, _)| k == "bangumiId")
.map(|(_, v)| v.to_string()),
) {
Some(MikanBangumiRssUrlMeta {
mikan_bangumi_id,
mikan_fansub_id,
})
} else {
None
}
} else {
None
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct MikanSubscriberSubscriptionRssUrlMeta {
pub mikan_subscription_token: String,
}
impl MikanSubscriberSubscriptionRssUrlMeta {
pub fn from_url(url: &Url) -> Option<Self> {
if url.path() == "/RSS/MyBangumi" {
url.query_pairs().find(|(k, _)| k == "token").map(|(_, v)| {
MikanSubscriberSubscriptionRssUrlMeta {
mikan_subscription_token: v.to_string(),
}
})
} else {
None
}
}
}
pub fn build_mikan_bangumi_subscription_rss_url(
mikan_base_url: Url,
mikan_bangumi_id: &str,
mikan_fansub_id: Option<&str>,
) -> Url {
let mut url = mikan_base_url;
url.set_path("/RSS/Bangumi");
url.query_pairs_mut()
.append_pair("bangumiId", mikan_bangumi_id);
if let Some(mikan_fansub_id) = mikan_fansub_id {
url.query_pairs_mut()
.append_pair("subgroupid", mikan_fansub_id);
};
url
}
pub fn build_mikan_subscriber_subscription_rss_url(
mikan_base_url: Url,
mikan_subscription_token: &str,
) -> Url {
let mut url = mikan_base_url;
url.set_path("/RSS/MyBangumi");
url.query_pairs_mut()
.append_pair("token", mikan_subscription_token);
url
}

View File

@ -1,146 +1,206 @@
use std::{ use std::{
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
fmt::Debug,
sync::Arc, sync::Arc,
}; };
use async_graphql::{InputObject, SimpleObject}; use async_graphql::{InputObject, SimpleObject};
use async_stream::try_stream; use fetch::fetch_bytes;
use fetch::{fetch_bytes, fetch_html}; use futures::try_join;
use futures::Stream;
use itertools::Itertools; use itertools::Itertools;
use maplit::hashmap; use maplit::hashmap;
use scraper::Html;
use sea_orm::{ use sea_orm::{
ActiveModelTrait, ActiveValue, ColumnTrait, EntityTrait, IntoSimpleExpr, QueryFilter, ActiveValue::Set, ColumnTrait, Condition, EntityTrait, JoinType, QueryFilter, QuerySelect,
QuerySelect, prelude::Expr, sea_query::OnConflict, RelationTrait,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt}; use snafu::OptionExt;
use url::Url; use url::Url;
use super::scrape_mikan_bangumi_meta_list_from_season_flow_url;
use crate::{ use crate::{
app::AppContextTrait, app::AppContextTrait,
errors::{RecorderError, RecorderResult}, errors::{RecorderError, RecorderResult},
extract::mikan::{ extract::mikan::{
MikanBangumiHash, MikanBangumiMeta, MikanBangumiRssUrlMeta, MikanEpisodeHash, MikanBangumiHash, MikanBangumiMeta, MikanEpisodeHash, MikanEpisodeMeta, MikanRssItem,
MikanEpisodeMeta, MikanRssItem, MikanSeasonFlowUrlMeta, MikanSeasonStr, MikanSeasonFlowUrlMeta, MikanSeasonStr, MikanSubscriberSubscriptionRssUrlMeta,
MikanSubscriberSubscriptionRssUrlMeta, build_mikan_bangumi_expand_subscribed_url,
build_mikan_bangumi_subscription_rss_url, build_mikan_season_flow_url, build_mikan_bangumi_subscription_rss_url, build_mikan_season_flow_url,
build_mikan_subscriber_subscription_rss_url, build_mikan_subscriber_subscription_rss_url,
extract_mikan_bangumi_index_meta_list_from_season_flow_fragment,
extract_mikan_bangumi_meta_from_expand_subscribed_fragment,
scrape_mikan_episode_meta_from_episode_homepage_url, scrape_mikan_episode_meta_from_episode_homepage_url,
}, },
migrations::defs::Bangumi, models::{
models::{bangumi, episodes, subscription_bangumi, subscription_episode, subscriptions}, bangumi, episodes, subscription_bangumi, subscription_episode,
subscriptions::{self, SubscriptionTrait},
},
}; };
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, InputObject, SimpleObject)] #[tracing::instrument(err, skip(ctx, rss_item_list))]
pub struct MikanSubscriberSubscription { async fn sync_mikan_feeds_from_rss_item_list(
pub id: i32, ctx: &dyn AppContextTrait,
pub mikan_subscription_token: String, rss_item_list: Vec<MikanRssItem>,
pub subscriber_id: i32, subscriber_id: i32,
} subscription_id: i32,
) -> RecorderResult<()> {
impl MikanSubscriberSubscription { let (new_episode_meta_list, existed_episode_hash2id_map) = {
#[tracing::instrument(skip(ctx))] let existed_episode_hash2id_map = episodes::Model::get_existed_mikan_episode_list(
pub async fn pull_subscription( ctx,
&self,
ctx: Arc<dyn AppContextTrait>,
) -> RecorderResult<Vec<MikanBangumiMeta>> {
let mikan_client = ctx.mikan();
let db = ctx.db();
let to_insert_episode_meta_list: Vec<MikanEpisodeMeta> = {
let rss_item_list = self.pull_rss_items(ctx.clone()).await?;
let existed_episode_token_list = episodes::Model::get_existed_mikan_episode_list(
ctx.as_ref(),
rss_item_list.iter().map(|s| MikanEpisodeHash { rss_item_list.iter().map(|s| MikanEpisodeHash {
mikan_episode_token: s.mikan_episode_id.clone(), mikan_episode_id: s.mikan_episode_id.clone(),
}), }),
self.subscriber_id, subscriber_id,
self.id, subscription_id,
) )
.await? .await?
.into_iter() .map(|(episode_id, hash, bangumi_id)| (hash.mikan_episode_id, (episode_id, bangumi_id)))
.map(|(id, hash)| (hash.mikan_episode_token, id))
.collect::<HashMap<_, _>>(); .collect::<HashMap<_, _>>();
let mut to_insert_episode_meta_list = vec![]; let mut new_episode_meta_list: Vec<MikanEpisodeMeta> = vec![];
let mikan_client = ctx.mikan();
for to_insert_rss_item in rss_item_list.into_iter().filter(|rss_item| { for to_insert_rss_item in rss_item_list.into_iter().filter(|rss_item| {
!existed_episode_token_list.contains_key(&rss_item.mikan_episode_id) !existed_episode_hash2id_map.contains_key(&rss_item.mikan_episode_id)
}) { }) {
let episode_meta = scrape_mikan_episode_meta_from_episode_homepage_url( let episode_meta = scrape_mikan_episode_meta_from_episode_homepage_url(
mikan_client, mikan_client,
to_insert_rss_item.homepage, to_insert_rss_item.homepage,
) )
.await?; .await?;
to_insert_episode_meta_list.push(episode_meta); new_episode_meta_list.push(episode_meta);
} }
(new_episode_meta_list, existed_episode_hash2id_map)
};
// subscribe existed but not subscribed episode and bangumi
let (existed_episode_id_list, existed_episode_bangumi_id_set): (Vec<i32>, HashSet<i32>) =
existed_episode_hash2id_map.into_values().unzip();
try_join!(
subscription_episode::Model::add_episodes_for_subscription( subscription_episode::Model::add_episodes_for_subscription(
ctx.as_ref(), ctx,
existed_episode_token_list.into_values(), existed_episode_id_list.into_iter(),
self.subscriber_id, subscriber_id,
self.id, subscription_id,
) ),
.await?; subscription_bangumi::Model::add_bangumis_for_subscription(
ctx,
existed_episode_bangumi_id_set.into_iter(),
subscriber_id,
subscription_id,
),
)?;
to_insert_episode_meta_list let new_episode_meta_list_group_by_bangumi_hash: HashMap<
MikanBangumiHash,
Vec<MikanEpisodeMeta>,
> = {
let mut m = hashmap! {};
for episode_meta in new_episode_meta_list {
let bangumi_hash = episode_meta.bangumi_hash();
m.entry(bangumi_hash)
.or_insert_with(Vec::new)
.push(episode_meta);
}
m
}; };
let new_episode_meta_bangumi_map = { for (group_bangumi_hash, group_episode_meta_list) in new_episode_meta_list_group_by_bangumi_hash
let bangumi_hash_map = to_insert_episode_meta_list {
.iter() let first_episode_meta = group_episode_meta_list.first().unwrap();
.map(|episode_meta| (episode_meta.bangumi_hash(), episode_meta)) let group_bangumi_model = bangumi::Model::get_or_insert_from_mikan(
.collect::<HashMap<_, _>>(); ctx,
group_bangumi_hash,
let existed_bangumi_set = bangumi::Model::get_existed_mikan_bangumi_list( subscriber_id,
ctx.as_ref(), subscription_id,
bangumi_hash_map.keys().cloned(), async || {
self.subscriber_id, let bangumi_meta: MikanBangumiMeta = first_episode_meta.clone().into();
self.id, let bangumi_am = bangumi::ActiveModel::from_mikan_bangumi_meta(
) ctx,
.await?
.map(|(_, bangumi_hash)| bangumi_hash)
.collect::<HashSet<_>>();
let mut to_insert_bangumi_list = vec![];
for (bangumi_hash, episode_meta) in bangumi_hash_map.iter() {
if !existed_bangumi_set.contains(&bangumi_hash) {
let bangumi_meta: MikanBangumiMeta = (*episode_meta).clone().into();
let bangumi_active_model = bangumi::ActiveModel::from_mikan_bangumi_meta(
ctx.as_ref(),
bangumi_meta, bangumi_meta,
self.subscriber_id, subscriber_id,
self.id, subscription_id,
)
.await?;
Ok(bangumi_am)
},
)
.await?;
let group_episode_creation_list = group_episode_meta_list
.into_iter()
.map(|episode_meta| (&group_bangumi_model, episode_meta));
episodes::Model::add_mikan_episodes_for_subscription(
ctx,
group_episode_creation_list.into_iter(),
subscriber_id,
subscription_id,
)
.await?;
}
Ok(())
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct MikanSubscriberSubscription {
pub id: i32,
pub mikan_subscription_token: String,
pub subscriber_id: i32,
}
#[async_trait::async_trait]
impl SubscriptionTrait for MikanSubscriberSubscription {
fn get_subscriber_id(&self) -> i32 {
self.subscriber_id
}
fn get_subscription_id(&self) -> i32 {
self.id
}
async fn sync_feeds(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
let rss_item_list = self.get_rss_item_list(ctx.as_ref()).await?;
sync_mikan_feeds_from_rss_item_list(
ctx.as_ref(),
rss_item_list,
self.get_subscriber_id(),
self.get_subscription_id(),
) )
.await?; .await?;
to_insert_bangumi_list.push(bangumi_active_model); Ok(())
}
async fn sync_sources(&self, _ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
Ok(())
}
fn try_from_model(model: &subscriptions::Model) -> RecorderResult<Self> {
let source_url = Url::parse(&model.source_url)?;
let meta = MikanSubscriberSubscriptionRssUrlMeta::from_rss_url(&source_url)
.with_whatever_context::<_, String, RecorderError>(|| {
format!(
"MikanSubscriberSubscription should extract mikan_subscription_token from \
source_url = {}, subscription_id = {}",
source_url, model.id
)
})?;
Ok(Self {
id: model.id,
mikan_subscription_token: meta.mikan_subscription_token,
subscriber_id: model.subscriber_id,
})
} }
} }
bangumi::Entity::insert_many(to_insert_bangumi_list) impl MikanSubscriberSubscription {
.on_conflict_do_nothing() #[tracing::instrument(err, skip(ctx))]
.exec(db) async fn get_rss_item_list(
.await?;
let mut new_episode_meta_bangumi_map: HashMap<MikanBangumiHash, bangumi::Model> =
hashmap! {};
};
todo!()
}
#[tracing::instrument(skip(ctx))]
pub async fn pull_rss_items(
&self, &self,
ctx: Arc<dyn AppContextTrait>, ctx: &dyn AppContextTrait,
) -> RecorderResult<Vec<MikanRssItem>> { ) -> RecorderResult<Vec<MikanRssItem>> {
let mikan_base_url = ctx.mikan().base_url().clone(); let mikan_base_url = ctx.mikan().base_url().clone();
let rss_url = build_mikan_subscriber_subscription_rss_url( let rss_url = build_mikan_subscriber_subscription_rss_url(
@ -160,25 +220,6 @@ impl MikanSubscriberSubscription {
} }
Ok(result) Ok(result)
} }
pub fn try_from_model(model: &subscriptions::Model) -> RecorderResult<Self> {
let source_url = Url::parse(&model.source_url)?;
let meta = MikanSubscriberSubscriptionRssUrlMeta::from_url(&source_url)
.with_whatever_context::<_, String, RecorderError>(|| {
format!(
"MikanSubscriberSubscription should extract mikan_subscription_token from \
source_url = {}, subscription_id = {}",
source_url, model.id
)
})?;
Ok(Self {
id: model.id,
mikan_subscription_token: meta.mikan_subscription_token,
subscriber_id: model.subscriber_id,
})
}
} }
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, InputObject, SimpleObject)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, InputObject, SimpleObject)]
@ -190,77 +231,60 @@ pub struct MikanSeasonSubscription {
pub subscriber_id: i32, pub subscriber_id: i32,
} }
impl MikanSeasonSubscription { #[async_trait::async_trait]
#[tracing::instrument] impl SubscriptionTrait for MikanSeasonSubscription {
pub fn pull_bangumi_meta_stream( fn get_subscriber_id(&self) -> i32 {
&self, self.subscriber_id
ctx: Arc<dyn AppContextTrait>,
) -> impl Stream<Item = RecorderResult<MikanBangumiMeta>> {
let credential_id = self.credential_id;
let year = self.year;
let season_str = self.season_str.clone();
try_stream! {
let mikan_base_url = ctx.mikan().base_url().clone();
let mikan_client = ctx.mikan()
.fork_with_credential(ctx.clone(), credential_id)
.await?;
let mikan_season_flow_url = build_mikan_season_flow_url(mikan_base_url.clone(), year, season_str);
let content = fetch_html(&mikan_client, mikan_season_flow_url.clone()).await?;
let mut bangumi_indices_meta = {
let html = Html::parse_document(&content);
extract_mikan_bangumi_index_meta_list_from_season_flow_fragment(&html, &mikan_base_url)
};
if bangumi_indices_meta.is_empty() && !mikan_client.has_login().await? {
mikan_client.login().await?;
let content = fetch_html(&mikan_client, mikan_season_flow_url).await?;
let html = Html::parse_document(&content);
bangumi_indices_meta =
extract_mikan_bangumi_index_meta_list_from_season_flow_fragment(&html, &mikan_base_url);
} }
fn get_subscription_id(&self) -> i32 {
self.id
}
mikan_client async fn sync_feeds(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
.sync_credential_cookies(ctx.clone(), credential_id) let rss_item_list = self.get_rss_item_list(ctx.as_ref()).await?;
.await?;
for bangumi_index in bangumi_indices_meta { sync_mikan_feeds_from_rss_item_list(
let bangumi_title = bangumi_index.bangumi_title.clone(); ctx.as_ref(),
let bangumi_expand_subscribed_fragment_url = build_mikan_bangumi_expand_subscribed_url( rss_item_list,
mikan_base_url.clone(), self.get_subscriber_id(),
&bangumi_index.mikan_bangumi_id, self.get_subscription_id(),
);
let bangumi_expand_subscribed_fragment =
fetch_html(&mikan_client, bangumi_expand_subscribed_fragment_url).await?;
let bangumi_meta = {
let html = Html::parse_document(&bangumi_expand_subscribed_fragment);
extract_mikan_bangumi_meta_from_expand_subscribed_fragment(
&html,
bangumi_index,
mikan_base_url.clone(),
) )
.with_whatever_context::<_, String, RecorderError>(|| {
format!("failed to extract mikan bangumi fansub of title = {bangumi_title}")
})
}?;
yield bangumi_meta;
}
mikan_client
.sync_credential_cookies(ctx, credential_id)
.await?; .await?;
}
Ok(())
} }
pub fn try_from_model(model: &subscriptions::Model) -> RecorderResult<Self> { async fn sync_sources(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
let bangumi_meta_list = self.get_bangumi_meta_list(ctx.clone()).await?;
let mikan_base_url = ctx.mikan().base_url();
let rss_link_list = bangumi_meta_list
.into_iter()
.map(|bangumi_meta| {
build_mikan_bangumi_subscription_rss_url(
mikan_base_url.clone(),
&bangumi_meta.mikan_bangumi_id,
Some(&bangumi_meta.mikan_fansub_id),
)
.to_string()
})
.collect_vec();
subscriptions::Entity::update_many()
.set(subscriptions::ActiveModel {
source_urls: Set(Some(rss_link_list)),
..Default::default()
})
.filter(subscription_bangumi::Column::SubscriptionId.eq(self.id))
.exec(ctx.db())
.await?;
Ok(())
}
fn try_from_model(model: &subscriptions::Model) -> RecorderResult<Self> {
let source_url = Url::parse(&model.source_url)?; let source_url = Url::parse(&model.source_url)?;
let source_url_meta = MikanSeasonFlowUrlMeta::from_url(&source_url) let source_url_meta = MikanSeasonFlowUrlMeta::from_url(&source_url)
@ -291,26 +315,53 @@ impl MikanSeasonSubscription {
} }
} }
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, InputObject, SimpleObject)] impl MikanSeasonSubscription {
pub struct MikanBangumiSubscription { #[tracing::instrument(err, skip(ctx))]
pub id: i32, async fn get_bangumi_meta_list(
pub mikan_bangumi_id: String,
pub mikan_fansub_id: String,
pub subscriber_id: i32,
}
impl MikanBangumiSubscription {
#[tracing::instrument]
pub fn pull_rss_items(
&self, &self,
ctx: Arc<dyn AppContextTrait>, ctx: Arc<dyn AppContextTrait>,
) -> impl Stream<Item = RecorderResult<MikanRssItem>> { ) -> RecorderResult<Vec<MikanBangumiMeta>> {
let mikan_bangumi_id = self.mikan_bangumi_id.clone(); let credential_id = self.credential_id;
let mikan_fansub_id = self.mikan_fansub_id.clone(); let year = self.year;
let season_str = self.season_str;
try_stream! {
let mikan_base_url = ctx.mikan().base_url().clone(); let mikan_base_url = ctx.mikan().base_url().clone();
let rss_url = build_mikan_bangumi_subscription_rss_url(mikan_base_url.clone(), &mikan_bangumi_id, Some(&mikan_fansub_id)); let mikan_season_flow_url = build_mikan_season_flow_url(mikan_base_url, year, season_str);
scrape_mikan_bangumi_meta_list_from_season_flow_url(
ctx,
mikan_season_flow_url,
credential_id,
)
.await
}
#[tracing::instrument(err, skip(ctx))]
async fn get_rss_item_list(
&self,
ctx: &dyn AppContextTrait,
) -> RecorderResult<Vec<MikanRssItem>> {
let db = ctx.db();
let subscribed_bangumi_list = bangumi::Entity::find()
.filter(Condition::all().add(subscription_bangumi::Column::SubscriptionId.eq(self.id)))
.join_rev(
JoinType::InnerJoin,
subscription_bangumi::Relation::Bangumi.def(),
)
.all(db)
.await?;
let mut rss_item_list = vec![];
for subscribed_bangumi in subscribed_bangumi_list {
let rss_url = subscribed_bangumi
.rss_link
.with_whatever_context::<_, String, RecorderError>(|| {
format!(
"MikanSeasonSubscription rss_link is required, subscription_id = {}",
self.id
)
})?;
let bytes = fetch_bytes(ctx.mikan(), rss_url).await?; let bytes = fetch_bytes(ctx.mikan(), rss_url).await?;
let channel = rss::Channel::read_from(&bytes[..])?; let channel = rss::Channel::read_from(&bytes[..])?;
@ -319,15 +370,53 @@ impl MikanBangumiSubscription {
let item = MikanRssItem::try_from(item).inspect_err( let item = MikanRssItem::try_from(item).inspect_err(
|error| tracing::warn!(error = %error, "failed to extract rss item idx = {}", idx), |error| tracing::warn!(error = %error, "failed to extract rss item idx = {}", idx),
)?; )?;
yield item rss_item_list.push(item);
} }
} }
Ok(rss_item_list)
}
} }
pub fn try_from_model(model: &subscriptions::Model) -> RecorderResult<Self> { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, InputObject, SimpleObject)]
pub struct MikanBangumiSubscription {
pub id: i32,
pub mikan_bangumi_id: String,
pub mikan_fansub_id: String,
pub subscriber_id: i32,
}
#[async_trait::async_trait]
impl SubscriptionTrait for MikanBangumiSubscription {
fn get_subscriber_id(&self) -> i32 {
self.subscriber_id
}
fn get_subscription_id(&self) -> i32 {
self.id
}
async fn sync_feeds(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
let rss_item_list = self.get_rss_item_list(ctx.as_ref()).await?;
sync_mikan_feeds_from_rss_item_list(
ctx.as_ref(),
rss_item_list,
<Self as SubscriptionTrait>::get_subscriber_id(self),
<Self as SubscriptionTrait>::get_subscription_id(self),
)
.await?;
Ok(())
}
async fn sync_sources(&self, _ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
Ok(())
}
fn try_from_model(model: &subscriptions::Model) -> RecorderResult<Self> {
let source_url = Url::parse(&model.source_url)?; let source_url = Url::parse(&model.source_url)?;
let meta = MikanBangumiRssUrlMeta::from_url(&source_url) let meta = MikanBangumiHash::from_rss_url(&source_url)
.with_whatever_context::<_, String, RecorderError>(|| { .with_whatever_context::<_, String, RecorderError>(|| {
format!( format!(
"MikanBangumiSubscription need to extract bangumi id and fansub id from \ "MikanBangumiSubscription need to extract bangumi id and fansub id from \
@ -345,96 +434,133 @@ impl MikanBangumiSubscription {
} }
} }
#[cfg(test)] impl MikanBangumiSubscription {
mod tests { #[tracing::instrument(err, skip(ctx))]
use std::assert_matches::assert_matches; async fn get_rss_item_list(
&self,
use downloader::bittorrent::BITTORRENT_MIME_TYPE; ctx: &dyn AppContextTrait,
use rstest::rstest; ) -> RecorderResult<Vec<MikanRssItem>> {
use url::Url; let mikan_base_url = ctx.mikan().base_url().clone();
let rss_url = build_mikan_bangumi_subscription_rss_url(
use crate::{ mikan_base_url.clone(),
errors::RecorderResult, &self.mikan_bangumi_id,
extract::mikan::{ Some(&self.mikan_fansub_id),
MikanBangumiIndexRssChannel, MikanBangumiRssChannel, MikanRssChannel,
extract_mikan_rss_channel_from_rss_link,
},
test_utils::mikan::build_testing_mikan_client,
};
#[rstest]
#[tokio::test]
async fn test_parse_mikan_rss_channel_from_rss_link() -> RecorderResult<()> {
let mut mikan_server = mockito::Server::new_async().await;
let mikan_base_url = Url::parse(&mikan_server.url())?;
let mikan_client = build_testing_mikan_client(mikan_base_url.clone()).await?;
{
let bangumi_rss_url =
mikan_base_url.join("/RSS/Bangumi?bangumiId=3141&subgroupid=370")?;
let bangumi_rss_mock = mikan_server
.mock("GET", bangumi_rss_url.path())
.with_body_from_file("tests/resources/mikan/Bangumi-3141-370.rss")
.match_query(mockito::Matcher::Any)
.create_async()
.await;
let channel = scrape_mikan_rss_channel_from_rss_link(&mikan_client, bangumi_rss_url)
.await
.expect("should get mikan channel from rss url");
assert_matches!(
&channel,
MikanRssChannel::Bangumi(MikanBangumiRssChannel { .. })
); );
let bytes = fetch_bytes(ctx.mikan(), rss_url).await?;
assert_matches!(&channel.name(), Some("葬送的芙莉莲")); let channel = rss::Channel::read_from(&bytes[..])?;
let items = channel.items(); let mut result = vec![];
let first_sub_item = items for (idx, item) in channel.items.into_iter().enumerate() {
.first() let item = MikanRssItem::try_from(item).inspect_err(
.expect("mikan subscriptions should have at least one subs"); |error| tracing::warn!(error = %error, "failed to extract rss item idx = {}", idx),
)?;
assert_eq!(first_sub_item.mime, BITTORRENT_MIME_TYPE); result.push(item);
assert!(
&first_sub_item
.homepage
.as_str()
.starts_with("https://mikanani.me/Home/Episode")
);
let name = first_sub_item.title.as_str();
assert!(name.contains("葬送的芙莉莲"));
bangumi_rss_mock.expect(1);
} }
{ Ok(result)
let bangumi_rss_url = mikan_base_url.join("/RSS/Bangumi?bangumiId=3416")?;
let bangumi_rss_mock = mikan_server
.mock("GET", bangumi_rss_url.path())
.match_query(mockito::Matcher::Any)
.with_body_from_file("tests/resources/mikan/Bangumi-3416.rss")
.create_async()
.await;
let channel = scrape_mikan_rss_channel_from_rss_link(&mikan_client, bangumi_rss_url)
.await
.expect("should get mikan channel from rss url");
assert_matches!(
&channel,
MikanRssChannel::BangumiIndex(MikanBangumiIndexRssChannel { .. })
);
assert_matches!(&channel.name(), Some("叹气的亡灵想隐退"));
bangumi_rss_mock.expect(1);
}
Ok(())
} }
} }
// #[cfg(test)]
// mod tests {
// use std::assert_matches::assert_matches;
// use downloader::bittorrent::BITTORRENT_MIME_TYPE;
// use rstest::rstest;
// use url::Url;
// use crate::{
// errors::RecorderResult,
// extract::mikan::{
// MikanBangumiIndexRssChannel, MikanBangumiRssChannel,
// MikanRssChannel, build_mikan_bangumi_subscription_rss_url,
// extract_mikan_rss_channel_from_rss_link, },
// test_utils::mikan::build_testing_mikan_client,
// };
// #[rstest]
// #[tokio::test]
// async fn test_parse_mikan_rss_channel_from_rss_link() ->
// RecorderResult<()> { let mut mikan_server =
// mockito::Server::new_async().await;
// let mikan_base_url = Url::parse(&mikan_server.url())?;
// let mikan_client =
// build_testing_mikan_client(mikan_base_url.clone()).await?;
// {
// let bangumi_rss_url = build_mikan_bangumi_subscription_rss_url(
// mikan_base_url.clone(),
// "3141",
// Some("370"),
// );
// let bangumi_rss_mock = mikan_server
// .mock("GET", bangumi_rss_url.path())
//
// .with_body_from_file("tests/resources/mikan/Bangumi-3141-370.rss")
// .match_query(mockito::Matcher::Any)
// .create_async()
// .await;
// let channel =
// scrape_mikan_rss_channel_from_rss_link(&mikan_client, bangumi_rss_url)
// .await
// .expect("should get mikan channel from rss url");
// assert_matches!(
// &channel,
// MikanRssChannel::Bangumi(MikanBangumiRssChannel { .. })
// );
// assert_matches!(&channel.name(), Some("葬送的芙莉莲"));
// let items = channel.items();
// let first_sub_item = items
// .first()
// .expect("mikan subscriptions should have at least one subs");
// assert_eq!(first_sub_item.mime, BITTORRENT_MIME_TYPE);
// assert!(
// &first_sub_item
// .homepage
// .as_str()
// .starts_with("https://mikanani.me/Home/Episode")
// );
// let name = first_sub_item.title.as_str();
// assert!(name.contains("葬送的芙莉莲"));
// bangumi_rss_mock.expect(1);
// }
// {
// let bangumi_rss_url =
// mikan_base_url.join("/RSS/Bangumi?bangumiId=3416")?;
// let bangumi_rss_mock = mikan_server
// .mock("GET", bangumi_rss_url.path())
// .match_query(mockito::Matcher::Any)
//
// .with_body_from_file("tests/resources/mikan/Bangumi-3416.rss")
// .create_async()
// .await;
// let channel =
// scrape_mikan_rss_channel_from_rss_link(&mikan_client, bangumi_rss_url)
// .await
// .expect("should get mikan channel from rss url");
// assert_matches!(
// &channel,
// MikanRssChannel::BangumiIndex(MikanBangumiIndexRssChannel {
// .. }) );
// assert_matches!(&channel.name(), Some("叹气的亡灵想隐退"));
// bangumi_rss_mock.expect(1);
// }
// Ok(())
// }
// }

View File

@ -2,29 +2,132 @@ use std::{borrow::Cow, fmt, str::FromStr, sync::Arc};
use async_stream::try_stream; use async_stream::try_stream;
use bytes::Bytes; use bytes::Bytes;
use chrono::DateTime;
use downloader::bittorrent::defs::BITTORRENT_MIME_TYPE;
use fetch::{html::fetch_html, image::fetch_image}; use fetch::{html::fetch_html, image::fetch_image};
use futures::{Stream, TryStreamExt, pin_mut}; use futures::{Stream, TryStreamExt, pin_mut};
use html_escape::decode_html_entities; use html_escape::decode_html_entities;
use scraper::{Html, Selector}; use scraper::{Html, Selector};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use snafu::FromString; use snafu::{FromString, OptionExt};
use tracing::instrument; use tracing::instrument;
use url::Url; use url::Url;
use super::{
MIKAN_BANGUMI_EXPAND_SUBSCRIBED_PAGE_PATH, MIKAN_POSTER_BUCKET_KEY,
MIKAN_SEASON_FLOW_PAGE_PATH, MikanBangumiRssUrlMeta, MikanClient,
};
use crate::{ use crate::{
app::AppContextTrait, app::AppContextTrait,
errors::app_error::{RecorderError, RecorderResult}, errors::app_error::{RecorderError, RecorderResult},
extract::{ extract::{
html::{extract_background_image_src_from_style_attr, extract_inner_text_from_element_ref}, html::{extract_background_image_src_from_style_attr, extract_inner_text_from_element_ref},
media::extract_image_src_from_str, media::extract_image_src_from_str,
mikan::{
MIKAN_BANGUMI_EXPAND_SUBSCRIBED_PAGE_PATH, MIKAN_POSTER_BUCKET_KEY,
MIKAN_SEASON_FLOW_PAGE_PATH, MikanClient,
},
}, },
storage::{StorageContentCategory, StorageServiceTrait}, storage::{StorageContentCategory, StorageServiceTrait},
}; };
/// One torrent entry parsed and validated from a Mikan RSS feed item.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct MikanRssItem {
    /// Raw episode title exactly as published in the feed.
    pub title: String,
    /// Episode homepage on Mikan (a `/Home/Episode/{id}` URL).
    pub homepage: Url,
    /// Torrent enclosure URL.
    pub url: Url,
    /// Enclosure length in bytes, when the feed supplied a parsable value.
    pub content_length: Option<u64>,
    /// Enclosure MIME type; after `TryFrom<rss::Item>` validation this is
    /// always the BitTorrent MIME type.
    pub mime: String,
    /// Publication time in milliseconds since the Unix epoch, when the
    /// feed's RFC 2822 date was parsable.
    pub pub_date: Option<i64>,
    /// Mikan episode id extracted from the homepage URL path.
    pub mikan_episode_id: String,
}
impl TryFrom<rss::Item> for MikanRssItem {
    type Error = RecorderError;

    /// Validate a raw RSS `<item>` and turn it into a [`MikanRssItem`].
    ///
    /// Fails when the enclosure is absent or not BitTorrent, when the title or
    /// homepage link is missing/unparsable, or when the homepage URL does not
    /// encode a Mikan episode id.
    fn try_from(item: rss::Item) -> Result<Self, Self::Error> {
        // Helper for the common "required field missing" error shape.
        let missing =
            |field: &'static str| RecorderError::from_mikan_rss_invalid_field(Cow::Borrowed(field));

        // Without an enclosure there is no torrent, so the item is useless.
        let torrent = item.enclosure.ok_or_else(|| missing("enclosure"))?;

        // Reject anything that is not a BitTorrent enclosure.
        let mime = torrent.mime_type;
        if mime != BITTORRENT_MIME_TYPE {
            return Err(RecorderError::MimeError {
                expected: String::from(BITTORRENT_MIME_TYPE),
                found: mime.to_string(),
                desc: String::from("MikanRssItem"),
            });
        }

        let title = item.title.ok_or_else(|| missing("title:title"))?;

        let torrent_url = Url::parse(&torrent.url).map_err(|err| {
            RecorderError::from_mikan_rss_invalid_field_and_source(
                "enclosure_url:enclosure.link".into(),
                err,
            )
        })?;

        let homepage = match item.link.as_deref().map(Url::parse) {
            Some(Ok(parsed)) => parsed,
            _ => return Err(missing("homepage:link")),
        };

        // The episode id is carried in the homepage path.
        let MikanEpisodeHash {
            mikan_episode_id, ..
        } = MikanEpisodeHash::from_homepage_url(&homepage)
            .ok_or_else(|| missing("mikan_episode_id"))?;

        Ok(MikanRssItem {
            title,
            homepage,
            url: torrent_url,
            content_length: torrent.length.parse().ok(),
            mime,
            pub_date: item
                .pub_date
                .as_deref()
                .and_then(|raw| DateTime::parse_from_rfc2822(raw).ok())
                .map(|parsed| parsed.timestamp_millis()),
            mikan_episode_id,
        })
    }
}
/// Identifying parts of a subscriber's personal Mikan RSS feed URL
/// (`/RSS/MyBangumi?token=…`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct MikanSubscriberSubscriptionRssUrlMeta {
    // Value of the `token` query parameter identifying the subscriber's feed.
    pub mikan_subscription_token: String,
}
impl MikanSubscriberSubscriptionRssUrlMeta {
    /// Extract the subscriber token from a `/RSS/MyBangumi` feed URL.
    ///
    /// Returns `None` for any other path, or when the `token` query
    /// parameter is absent.
    pub fn from_rss_url(url: &Url) -> Option<Self> {
        if url.path() != "/RSS/MyBangumi" {
            return None;
        }
        url.query_pairs().find_map(|(key, value)| {
            (key == "token").then(|| MikanSubscriberSubscriptionRssUrlMeta {
                mikan_subscription_token: value.to_string(),
            })
        })
    }

    /// Rebuild the feed URL this metadata was extracted from.
    pub fn build_rss_url(self, mikan_base_url: Url) -> Url {
        build_mikan_subscriber_subscription_rss_url(mikan_base_url, &self.mikan_subscription_token)
    }
}
/// Construct a subscriber's personal Mikan feed URL
/// (`/RSS/MyBangumi?token=…`) on top of the given base URL.
pub fn build_mikan_subscriber_subscription_rss_url(
    mikan_base_url: Url,
    mikan_subscription_token: &str,
) -> Url {
    let mut rss_url = mikan_base_url;
    rss_url.set_path("/RSS/MyBangumi");
    {
        // Scope the mutable query-pairs borrow before returning the URL.
        let mut query = rss_url.query_pairs_mut();
        query.append_pair("token", mikan_subscription_token);
    }
    rss_url
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Eq)] #[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Eq)]
pub struct MikanBangumiIndexMeta { pub struct MikanBangumiIndexMeta {
pub homepage: Url, pub homepage: Url,
@ -147,6 +250,26 @@ impl MikanBangumiIndexHash {
None None
} }
} }
pub fn build_homepage_url(self, mikan_base_url: Url) -> Url {
build_mikan_bangumi_homepage_url(mikan_base_url, &self.mikan_bangumi_id, None)
}
}
pub fn build_mikan_bangumi_subscription_rss_url(
mikan_base_url: Url,
mikan_bangumi_id: &str,
mikan_fansub_id: Option<&str>,
) -> Url {
let mut url = mikan_base_url;
url.set_path("/RSS/Bangumi");
url.query_pairs_mut()
.append_pair("bangumiId", mikan_bangumi_id);
if let Some(mikan_fansub_id) = mikan_fansub_id {
url.query_pairs_mut()
.append_pair("subgroupid", mikan_fansub_id);
};
url
} }
#[derive(Clone, Debug, PartialEq, Eq, Hash)] #[derive(Clone, Debug, PartialEq, Eq, Hash)]
@ -170,24 +293,70 @@ impl MikanBangumiHash {
None None
} }
} }
/// Parse a Mikan bangumi feed URL (`/RSS/Bangumi?bangumiId=…&subgroupid=…`)
/// into its bangumi/fansub id pair.
///
/// Returns `None` when the path is not the bangumi feed or either query
/// parameter is absent.
pub fn from_rss_url(url: &Url) -> Option<Self> {
    if url.path() != "/RSS/Bangumi" {
        return None;
    }
    let query_value = |name: &str| -> Option<String> {
        url.query_pairs()
            .find(|(key, _)| key == name)
            .map(|(_, value)| value.into_owned())
    };
    let mikan_fansub_id = query_value("subgroupid")?;
    let mikan_bangumi_id = query_value("bangumiId")?;
    Some(Self {
        mikan_bangumi_id,
        mikan_fansub_id,
    })
}
/// Build the RSS feed URL (`/RSS/Bangumi?bangumiId=…&subgroupid=…`) for this
/// bangumi/fansub pair on the given Mikan base URL.
pub fn build_rss_url(self, mikan_base_url: Url) -> Url {
    build_mikan_bangumi_subscription_rss_url(
        mikan_base_url,
        &self.mikan_bangumi_id,
        Some(&self.mikan_fansub_id),
    )
}

/// Build the bangumi homepage URL for this bangumi/fansub pair on the given
/// Mikan base URL.
pub fn build_homepage_url(self, mikan_base_url: Url) -> Url {
    build_mikan_bangumi_homepage_url(
        mikan_base_url,
        &self.mikan_bangumi_id,
        Some(&self.mikan_fansub_id),
    )
}
}
pub fn build_mikan_episode_homepage_url(mikan_base_url: Url, mikan_episode_id: &str) -> Url {
let mut url = mikan_base_url;
url.set_path(&format!("/Home/Episode/{mikan_episode_id}"));
url
} }
#[derive(Clone, Debug, PartialEq, Eq, Hash)] #[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct MikanEpisodeHash { pub struct MikanEpisodeHash {
pub mikan_episode_token: String, pub mikan_episode_id: String,
} }
impl MikanEpisodeHash { impl MikanEpisodeHash {
pub fn from_homepage_url(url: &Url) -> Option<Self> { pub fn from_homepage_url(url: &Url) -> Option<Self> {
if url.path().starts_with("/Home/Episode/") { if url.path().starts_with("/Home/Episode/") {
let mikan_episode_id = url.path().replace("/Home/Episode/", ""); let mikan_episode_id = url.path().replace("/Home/Episode/", "");
Some(Self { Some(Self { mikan_episode_id })
mikan_episode_token: mikan_episode_id,
})
} else { } else {
None None
} }
} }
pub fn build_homepage_url(self, mikan_base_url: Url) -> Url {
build_mikan_episode_homepage_url(mikan_base_url, &self.mikan_episode_id)
}
} }
#[derive(async_graphql::Enum, Clone, Debug, Copy, Serialize, Deserialize, PartialEq, Eq)] #[derive(async_graphql::Enum, Clone, Debug, Copy, Serialize, Deserialize, PartialEq, Eq)]
@ -227,8 +396,7 @@ impl FromStr for MikanSeasonStr {
"" => Ok(MikanSeasonStr::Autumn), "" => Ok(MikanSeasonStr::Autumn),
"" => Ok(MikanSeasonStr::Winter), "" => Ok(MikanSeasonStr::Winter),
_ => Err(RecorderError::without_source(format!( _ => Err(RecorderError::without_source(format!(
"MikanSeasonStr must be one of '春', '夏', '秋', '冬', but got '{}'", "MikanSeasonStr must be one of '春', '夏', '秋', '冬', but got '{s}'"
s
))), ))),
} }
} }
@ -284,12 +452,6 @@ pub fn build_mikan_season_flow_url(
url url
} }
pub fn build_mikan_episode_homepage_url(mikan_base_url: Url, mikan_episode_id: &str) -> Url {
let mut url = mikan_base_url;
url.set_path(&format!("/Home/Episode/{mikan_episode_id}"));
url
}
pub fn build_mikan_bangumi_expand_subscribed_url( pub fn build_mikan_bangumi_expand_subscribed_url(
mikan_base_url: Url, mikan_base_url: Url,
mikan_bangumi_id: &str, mikan_bangumi_id: &str,
@ -322,7 +484,7 @@ pub fn extract_mikan_episode_meta_from_episode_homepage_html(
RecorderError::from_mikan_meta_missing_field(Cow::Borrowed("bangumi_title")) RecorderError::from_mikan_meta_missing_field(Cow::Borrowed("bangumi_title"))
})?; })?;
let MikanBangumiRssUrlMeta { let MikanBangumiHash {
mikan_bangumi_id, mikan_bangumi_id,
mikan_fansub_id, mikan_fansub_id,
.. ..
@ -331,7 +493,7 @@ pub fn extract_mikan_episode_meta_from_episode_homepage_html(
.next() .next()
.and_then(|el| el.value().attr("href")) .and_then(|el| el.value().attr("href"))
.and_then(|s| mikan_episode_homepage_url.join(s).ok()) .and_then(|s| mikan_episode_homepage_url.join(s).ok())
.and_then(|rss_link_url| MikanBangumiRssUrlMeta::from_url(&rss_link_url)) .and_then(|rss_link_url| MikanBangumiHash::from_rss_url(&rss_link_url))
.ok_or_else(|| { .ok_or_else(|| {
RecorderError::from_mikan_meta_missing_field(Cow::Borrowed("mikan_bangumi_id")) RecorderError::from_mikan_meta_missing_field(Cow::Borrowed("mikan_bangumi_id"))
})?; })?;
@ -345,8 +507,7 @@ pub fn extract_mikan_episode_meta_from_episode_homepage_html(
})?; })?;
let MikanEpisodeHash { let MikanEpisodeHash {
mikan_episode_token, mikan_episode_id, ..
..
} = MikanEpisodeHash::from_homepage_url(&mikan_episode_homepage_url).ok_or_else(|| { } = MikanEpisodeHash::from_homepage_url(&mikan_episode_homepage_url).ok_or_else(|| {
RecorderError::from_mikan_meta_missing_field(Cow::Borrowed("mikan_episode_id")) RecorderError::from_mikan_meta_missing_field(Cow::Borrowed("mikan_episode_id"))
})?; })?;
@ -436,9 +597,9 @@ pub fn extract_mikan_bangumi_index_meta_from_bangumi_homepage_html(
.next() .next()
.and_then(|el| el.value().attr("href")) .and_then(|el| el.value().attr("href"))
.and_then(|s| mikan_bangumi_homepage_url.join(s).ok()) .and_then(|s| mikan_bangumi_homepage_url.join(s).ok())
.and_then(|rss_link_url| MikanBangumiRssUrlMeta::from_url(&rss_link_url)) .and_then(|rss_link_url| MikanBangumiHash::from_rss_url(&rss_link_url))
.map( .map(
|MikanBangumiRssUrlMeta { |MikanBangumiHash {
mikan_bangumi_id, .. mikan_bangumi_id, ..
}| mikan_bangumi_id, }| mikan_bangumi_id,
) )
@ -734,10 +895,86 @@ pub fn extract_mikan_bangumi_meta_from_expand_subscribed_fragment(
} }
} }
/// Stream bangumi metadata scraped from a Mikan season flow page, using the
/// credential identified by `credential_id`.
///
/// Flow: fetch the season page; if it yields no bangumi entries and the client
/// is not logged in, log in and refetch once; then for each bangumi index,
/// fetch its "expand subscribed" fragment and yield the extracted metadata.
/// Credential cookies are synced back after the initial fetch and again after
/// the loop, so refreshed session state is persisted even on early errors in
/// the per-bangumi phase.
pub fn scrape_mikan_bangumi_meta_stream_from_season_flow_url(
    ctx: Arc<dyn AppContextTrait>,
    mikan_season_flow_url: Url,
    credential_id: i32,
) -> impl Stream<Item = RecorderResult<MikanBangumiMeta>> {
    try_stream! {
        let mikan_base_url = ctx.mikan().base_url().clone();
        // Fork a client bound to the stored credential so cookies are isolated.
        let mikan_client = ctx.mikan().fork_with_credential(ctx.clone(), credential_id).await?;

        let content = fetch_html(&mikan_client, mikan_season_flow_url.clone()).await?;

        // `Html` is parsed in an inner scope: it is not `Send`, so it must be
        // dropped before the next `.await` point.
        let mut bangumi_indices_meta = {
            let html = Html::parse_document(&content);
            extract_mikan_bangumi_index_meta_list_from_season_flow_fragment(&html, &mikan_base_url)
        };

        // An empty list may just mean we were served the anonymous page;
        // retry exactly once after logging in.
        if bangumi_indices_meta.is_empty() && !mikan_client.has_login().await? {
            mikan_client.login().await?;
            let content = fetch_html(&mikan_client, mikan_season_flow_url).await?;
            let html = Html::parse_document(&content);
            bangumi_indices_meta =
                extract_mikan_bangumi_index_meta_list_from_season_flow_fragment(&html, &mikan_base_url);
        }

        // Persist any cookies refreshed by the fetch/login above.
        mikan_client
            .sync_credential_cookies(ctx.clone(), credential_id)
            .await?;

        for bangumi_index in bangumi_indices_meta {
            let bangumi_title = bangumi_index.bangumi_title.clone();
            let bangumi_expand_subscribed_fragment_url = build_mikan_bangumi_expand_subscribed_url(
                mikan_base_url.clone(),
                &bangumi_index.mikan_bangumi_id,
            );
            let bangumi_expand_subscribed_fragment =
                fetch_html(&mikan_client, bangumi_expand_subscribed_fragment_url).await?;

            // Same non-`Send` `Html` scoping as above.
            let bangumi_meta = {
                let html = Html::parse_document(&bangumi_expand_subscribed_fragment);

                extract_mikan_bangumi_meta_from_expand_subscribed_fragment(
                    &html,
                    bangumi_index,
                    mikan_base_url.clone(),
                )
                .with_whatever_context::<_, String, RecorderError>(|| {
                    format!("failed to extract mikan bangumi fansub of title = {bangumi_title}")
                })
            }?;

            yield bangumi_meta;
        }

        // Final cookie sync; consumes `ctx` as this is the last use.
        mikan_client
            .sync_credential_cookies(ctx, credential_id)
            .await?;
    }
}
/// Collect the full list of bangumi metadata from a Mikan season flow page.
///
/// Convenience wrapper around
/// [`scrape_mikan_bangumi_meta_stream_from_season_flow_url`] that drains the
/// stream into a `Vec`, stopping at the first error.
pub async fn scrape_mikan_bangumi_meta_list_from_season_flow_url(
    ctx: Arc<dyn AppContextTrait>,
    mikan_season_flow_url: Url,
    credential_id: i32,
) -> RecorderResult<Vec<MikanBangumiMeta>> {
    let meta_stream = scrape_mikan_bangumi_meta_stream_from_season_flow_url(
        ctx,
        mikan_season_flow_url,
        credential_id,
    );
    pin_mut!(meta_stream);
    meta_stream.try_collect::<Vec<_>>().await
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
#![allow(unused_variables)] #![allow(unused_variables)]
use std::fs; use std::{fs, sync::Arc};
use rstest::{fixture, rstest}; use rstest::{fixture, rstest};
use tracing::Level; use tracing::Level;
@ -1035,7 +1272,6 @@ mod test {
build_mikan_season_flow_url(mikan_base_url.clone(), 2025, MikanSeasonStr::Spring); build_mikan_season_flow_url(mikan_base_url.clone(), 2025, MikanSeasonStr::Spring);
let bangumi_meta_list = scrape_mikan_bangumi_meta_list_from_season_flow_url( let bangumi_meta_list = scrape_mikan_bangumi_meta_list_from_season_flow_url(
mikan_client,
app_ctx.clone(), app_ctx.clone(),
mikan_season_flow_url, mikan_season_flow_url,
credential.id, credential.id,

View File

@ -4,8 +4,10 @@ use async_graphql::dynamic::{ResolverContext, ValueAccessor};
use sea_orm::EntityTrait; use sea_orm::EntityTrait;
use seaography::{BuilderContext, FnGuard, GuardAction}; use seaography::{BuilderContext, FnGuard, GuardAction};
use super::util::{get_column_key, get_entity_key}; use crate::{
use crate::auth::{AuthError, AuthUserInfo}; auth::{AuthError, AuthUserInfo},
graphql::infra::util::{get_column_key, get_entity_key},
};
fn guard_data_object_accessor_with_subscriber_id( fn guard_data_object_accessor_with_subscriber_id(
value: ValueAccessor<'_>, value: ValueAccessor<'_>,
@ -108,7 +110,7 @@ where
subscriber_id, subscriber_id,
) )
.map_err(|inner_error| { .map_err(|inner_error| {
AuthError::from_graphql_subscribe_id_guard( AuthError::from_graphql_dynamic_subscribe_id_guard(
inner_error, inner_error,
context, context,
&entity_create_one_mutation_data_field_name, &entity_create_one_mutation_data_field_name,
@ -136,7 +138,7 @@ where
}) })
}) })
.map_err(|inner_error| { .map_err(|inner_error| {
AuthError::from_graphql_subscribe_id_guard( AuthError::from_graphql_dynamic_subscribe_id_guard(
inner_error, inner_error,
context, context,
&entity_create_batch_mutation_data_field_name, &entity_create_batch_mutation_data_field_name,
@ -157,7 +159,7 @@ where
subscriber_id, subscriber_id,
) )
.map_err(|inner_error| { .map_err(|inner_error| {
AuthError::from_graphql_subscribe_id_guard( AuthError::from_graphql_dynamic_subscribe_id_guard(
inner_error, inner_error,
context, context,
&entity_update_mutation_data_field_name, &entity_update_mutation_data_field_name,

View File

@ -1,64 +0,0 @@
mod scrape_season_subscription;
use std::sync::Arc;
use async_graphql::{Context, Object, Result as GraphQLResult};
use snafu::FromString;
use crate::{
app::AppContextTrait,
auth::AuthUserInfo,
errors::RecorderError,
graphql::mikan::scrape_season_subscription::{
MikanScrapeSeasonSubscriptionInput, MikanScrapeSeasonSubscriptionOutput,
},
models::{
subscriber_tasks,
subscriptions::{self, SubscriptionCategory},
},
task::{SubscriberTaskPayload, mikan::MikanScrapeSeasonSubscriptionTask},
};
struct MikanQuery;
struct MikanMutation;
#[Object]
impl MikanMutation {
    /// Enqueue a scrape task for a Mikan season subscription owned by the
    /// authenticated subscriber.
    ///
    /// Validates that the subscription exists, is of category `MikanSeason`,
    /// and has an attached credential before queuing the task.
    ///
    /// NOTE(review): the task payload is `todo!()`, so reaching this mutation
    /// panics at runtime — the payload construction must be finished before
    /// this is exposed.
    async fn mikan_scrape_season_subscription(
        &self,
        ctx: &Context<'_>,
        input: MikanScrapeSeasonSubscriptionInput,
    ) -> GraphQLResult<MikanScrapeSeasonSubscriptionOutput> {
        let auth_user = ctx.data::<AuthUserInfo>()?;
        let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;

        let subscription =
            subscriptions::Model::find_by_id(app_ctx.as_ref(), input.subscription_id)
                .await?
                .ok_or_else(|| RecorderError::DbError {
                    source: sea_orm::DbErr::RecordNotFound(String::from("subscription not found")),
                })?;

        // Only season-style subscriptions can be scraped by this task.
        if subscription.category != SubscriptionCategory::MikanSeason {
            Err(RecorderError::without_source(
                "subscription must be a mikan season subscription".to_string(),
            ))?;
        }

        // NOTE(review): `credential_id` is validated but never used below —
        // presumably it should be passed into the scrape task.
        let credential_id = subscription.credential_id.ok_or_else(|| {
            RecorderError::without_source("subscription must have a credential".to_string())
        })?;

        let task = subscriber_tasks::Model::add_subscriber_task(
            app_ctx.clone(),
            auth_user.subscriber_auth.subscriber_id,
            SubscriberTaskPayload::MikanScrapeSeasonSubscription(todo!()),
        )
        .await?;

        // NOTE(review): `task` is unused and the returned id is hardcoded to
        // 1 — this should presumably return the created task's real id.
        Ok(MikanScrapeSeasonSubscriptionOutput { task_id: 1 })
    }
}
struct MikanSubscription;

View File

@ -1,12 +0,0 @@
use async_graphql::{InputObject, SimpleObject};
use serde::{Deserialize, Serialize};
/// GraphQL input for the `mikanScrapeSeasonSubscription` mutation.
#[derive(InputObject, Serialize, Deserialize)]
pub struct MikanScrapeSeasonSubscriptionInput {
    // Id of the subscription to scrape.
    pub subscription_id: i32,
}

/// GraphQL output for the `mikanScrapeSeasonSubscription` mutation.
#[derive(SimpleObject, Serialize, Deserialize)]
pub struct MikanScrapeSeasonSubscriptionOutput {
    // Id of the queued scrape task.
    pub task_id: i32,
}

View File

@ -1,8 +1,8 @@
pub mod config; pub mod config;
pub mod infra; pub mod infra;
pub mod mikan;
pub mod schema_root; pub mod schema_root;
pub mod service; pub mod service;
pub mod views;
pub use config::GraphQLConfig; pub use config::GraphQLConfig;
pub use schema_root::schema; pub use schema_root::schema;

View File

@ -111,10 +111,6 @@ pub fn schema(
&mut context, &mut context,
&subscription_episode::Column::SubscriberId, &subscription_episode::Column::SubscriberId,
); );
restrict_subscriber_for_entity::<subscriber_tasks::Entity>(
&mut context,
&subscriber_tasks::Column::SubscriberId,
);
for column in subscribers::Column::iter() { for column in subscribers::Column::iter() {
if !matches!(column, subscribers::Column::Id) { if !matches!(column, subscribers::Column::Id) {
restrict_filter_input_for_entity::<subscribers::Entity>( restrict_filter_input_for_entity::<subscribers::Entity>(
@ -156,7 +152,6 @@ pub fn schema(
subscription_bangumi, subscription_bangumi,
subscription_episode, subscription_episode,
subscriptions, subscriptions,
subscriber_tasks,
] ]
); );
@ -165,7 +160,6 @@ pub fn schema(
builder.register_enumeration::<subscriptions::SubscriptionCategory>(); builder.register_enumeration::<subscriptions::SubscriptionCategory>();
builder.register_enumeration::<downloaders::DownloaderCategory>(); builder.register_enumeration::<downloaders::DownloaderCategory>();
builder.register_enumeration::<downloads::DownloadMime>(); builder.register_enumeration::<downloads::DownloadMime>();
builder.register_enumeration::<subscriber_tasks::SubscriberTaskType>();
} }
let schema = builder.schema_builder(); let schema = builder.schema_builder();

View File

@ -0,0 +1 @@
mod subscription;

View File

@ -0,0 +1,117 @@
use std::sync::Arc;
use async_graphql::{Context, InputObject, Object, Result as GraphQLResult, SimpleObject};
use sea_orm::{DbErr, EntityTrait};
use crate::{
app::AppContextTrait,
auth::AuthUserInfo,
errors::RecorderError,
models::subscriptions::{self, SubscriptionTrait},
task::SubscriberTaskPayload,
};
/// GraphQL mutation root for subscription-related task dispatching.
pub struct SubscriptionMutation;

/// Input selecting the subscription a sync mutation should act on.
#[derive(InputObject)]
struct SyncOneSubscriptionFilterInput {
    // Id of the subscription to sync.
    pub subscription_id: i32,
}

/// Output of a sync mutation: the queued task's id, stringified.
#[derive(SimpleObject)]
struct SyncOneSubscriptionTaskOutput {
    // Task id rendered as a string.
    pub task_id: String,
}
#[Object]
impl SubscriptionMutation {
async fn sync_one_subscription_feeds(
&self,
ctx: &Context<'_>,
input: SyncOneSubscriptionFilterInput,
) -> GraphQLResult<SyncOneSubscriptionTaskOutput> {
let auth_user_info = ctx.data::<AuthUserInfo>()?;
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
let subscriber_id = auth_user_info.subscriber_auth.subscriber_id;
let subscription_model = subscriptions::Entity::find_by_id(input.subscription_id)
.one(app_ctx.db())
.await?
.ok_or_else(|| RecorderError::DbError {
source: DbErr::RecordNotFound(format!(
"Subscription id = {} not found",
input.subscription_id
)),
})?;
if subscription_model.subscriber_id != subscriber_id {
Err(RecorderError::DbError {
source: DbErr::RecordNotFound(format!(
"Subscription id = {} not found",
input.subscription_id
)),
})?;
}
let subscription = subscriptions::Subscription::try_from_model(&subscription_model)?;
let task_service = app_ctx.task();
let task_id = task_service
.add_subscriber_task(
auth_user_info.subscriber_auth.subscriber_id,
SubscriberTaskPayload::SyncOneSubscriptionFeeds(subscription.into()),
)
.await?;
Ok(SyncOneSubscriptionTaskOutput {
task_id: task_id.to_string(),
})
}
async fn sync_one_subscription_sources(
&self,
ctx: &Context<'_>,
input: SyncOneSubscriptionFilterInput,
) -> GraphQLResult<SyncOneSubscriptionTaskOutput> {
let auth_user_info = ctx.data::<AuthUserInfo>()?;
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
let subscriber_id = auth_user_info.subscriber_auth.subscriber_id;
let subscription_model = subscriptions::Entity::find_by_id(input.subscription_id)
.one(app_ctx.db())
.await?
.ok_or_else(|| RecorderError::DbError {
source: DbErr::RecordNotFound(format!(
"Subscription id = {} not found",
input.subscription_id
)),
})?;
if subscription_model.subscriber_id != subscriber_id {
Err(RecorderError::DbError {
source: DbErr::RecordNotFound(format!(
"Subscription id = {} not found",
input.subscription_id
)),
})?;
}
let subscription = subscriptions::Subscription::try_from_model(&subscription_model)?;
let task_service = app_ctx.task();
let task_id = task_service
.add_subscriber_task(
auth_user_info.subscriber_auth.subscriber_id,
SubscriberTaskPayload::SyncOneSubscriptionSources(subscription.into()),
)
.await?;
Ok(SyncOneSubscriptionTaskOutput {
task_id: task_id.to_string(),
})
}
}

View File

@ -32,6 +32,7 @@ pub enum Subscriptions {
SubscriberId, SubscriberId,
Category, Category,
SourceUrl, SourceUrl,
SourceUrls,
Enabled, Enabled,
CredentialId, CredentialId,
} }
@ -52,7 +53,6 @@ pub enum Bangumi {
RssLink, RssLink,
PosterLink, PosterLink,
SavePath, SavePath,
Deleted,
Homepage, Homepage,
Extra, Extra,
} }
@ -85,7 +85,6 @@ pub enum Episodes {
EpisodeIndex, EpisodeIndex,
Homepage, Homepage,
Subtitle, Subtitle,
Deleted,
Source, Source,
Extra, Extra,
} }
@ -150,18 +149,6 @@ pub enum Credential3rd {
UserAgent, UserAgent,
} }
/// SQL identifiers (table and column names) for the `subscriber_tasks`
/// table, used by the migration DDL builders below.
#[derive(DeriveIden)]
pub enum SubscriberTasks {
    Table,
    Id,
    SubscriberId,
    TaskType,
    Request,
    Result,
    Error,
    Yields,
}
macro_rules! create_postgres_enum_for_active_enum { macro_rules! create_postgres_enum_for_active_enum {
($manager: expr, $active_enum: expr, $($enum_value:expr),+) => { ($manager: expr, $active_enum: expr, $($enum_value:expr),+) => {
{ {

View File

@ -64,6 +64,10 @@ impl MigrationTrait for Migration {
.col(string(Subscriptions::DisplayName)) .col(string(Subscriptions::DisplayName))
.col(integer(Subscriptions::SubscriberId)) .col(integer(Subscriptions::SubscriberId))
.col(text(Subscriptions::SourceUrl)) .col(text(Subscriptions::SourceUrl))
.col(array_null(
Subscriptions::SourceUrls,
ColumnType::String(StringLen::None),
))
.col(boolean(Subscriptions::Enabled)) .col(boolean(Subscriptions::Enabled))
.col(enumeration( .col(enumeration(
Subscriptions::Category, Subscriptions::Category,
@ -105,7 +109,6 @@ impl MigrationTrait for Migration {
.col(text_null(Bangumi::RssLink)) .col(text_null(Bangumi::RssLink))
.col(text_null(Bangumi::PosterLink)) .col(text_null(Bangumi::PosterLink))
.col(text_null(Bangumi::SavePath)) .col(text_null(Bangumi::SavePath))
.col(boolean(Bangumi::Deleted).default(false))
.col(text_null(Bangumi::Homepage)) .col(text_null(Bangumi::Homepage))
.col(json_binary_null(Bangumi::Extra)) .col(json_binary_null(Bangumi::Extra))
.foreign_key( .foreign_key(
@ -224,7 +227,6 @@ impl MigrationTrait for Migration {
.col(integer(Episodes::EpisodeIndex)) .col(integer(Episodes::EpisodeIndex))
.col(text_null(Episodes::Homepage)) .col(text_null(Episodes::Homepage))
.col(text_null(Episodes::Subtitle)) .col(text_null(Episodes::Subtitle))
.col(boolean(Episodes::Deleted).default(false))
.col(text_null(Episodes::Source)) .col(text_null(Episodes::Source))
.col(json_binary_null(Episodes::Extra)) .col(json_binary_null(Episodes::Extra))
.foreign_key( .foreign_key(

View File

@ -1,81 +0,0 @@
use sea_orm_migration::{
prelude::*,
schema::{array, enumeration, integer, json_binary, json_binary_null, pk_auto},
};
use super::defs::{SubscriberTasks, Subscribers, table_auto_z};
use crate::models::subscriber_tasks::{SubscriberTaskType, SubscriberTaskTypeEnum};
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
    /// Create the `subscriber_tasks` table — with a cascading foreign key to
    /// `subscribers` — plus an index over its `task_type` column.
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .create_table(
                table_auto_z(SubscriberTasks::Table)
                    .col(pk_auto(SubscriberTasks::Id))
                    .col(integer(SubscriberTasks::SubscriberId))
                    .col(enumeration(
                        SubscriberTasks::TaskType,
                        SubscriberTaskTypeEnum,
                        SubscriberTaskType::iden_values(),
                    ))
                    .col(json_binary(SubscriberTasks::Request))
                    .col(json_binary_null(SubscriberTasks::Result))
                    .col(json_binary_null(SubscriberTasks::Error))
                    .col(
                        // Defaults to an empty jsonb array; `ARRAY[]::jsonb[]`
                        // is Postgres-specific syntax.
                        array(SubscriberTasks::Yields, ColumnType::JsonBinary)
                            .default(SimpleExpr::Custom(String::from("ARRAY[]::jsonb[]"))),
                    )
                    .foreign_key(
                        // Deleting/updating a subscriber cascades to its tasks.
                        ForeignKey::create()
                            .name("fk_subscriber_tasks_subscriber_id")
                            .from_tbl(SubscriberTasks::Table)
                            .from_col(SubscriberTasks::SubscriberId)
                            .to_tbl(Subscribers::Table)
                            .to_col(Subscribers::Id)
                            .on_delete(ForeignKeyAction::Cascade)
                            .on_update(ForeignKeyAction::Cascade),
                    )
                    .to_owned(),
            )
            .await?;

        manager
            .create_index(
                Index::create()
                    .if_not_exists()
                    .name("idx_subscriber_tasks_task_type")
                    .table(SubscriberTasks::Table)
                    .col(SubscriberTasks::TaskType)
                    .to_owned(),
            )
            .await?;

        Ok(())
    }

    /// Reverse of [`Self::up`]: drop the index first, then the table; both
    /// use `if_exists` so the rollback is tolerant of partial application.
    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .drop_index(
                Index::drop()
                    .if_exists()
                    .name("idx_subscriber_tasks_task_type")
                    .table(SubscriberTasks::Table)
                    .to_owned(),
            )
            .await?;

        manager
            .drop_table(
                Table::drop()
                    .if_exists()
                    .table(SubscriberTasks::Table)
                    .to_owned(),
            )
            .await?;

        Ok(())
    }
}

View File

@ -7,7 +7,6 @@ pub mod m20220101_000001_init;
pub mod m20240224_082543_add_downloads; pub mod m20240224_082543_add_downloads;
pub mod m20241231_000001_auth; pub mod m20241231_000001_auth;
pub mod m20250501_021523_credential_3rd; pub mod m20250501_021523_credential_3rd;
pub mod m20250508_022044_subscriber_tasks;
pub struct Migrator; pub struct Migrator;
@ -19,7 +18,6 @@ impl MigratorTrait for Migrator {
Box::new(m20240224_082543_add_downloads::Migration), Box::new(m20240224_082543_add_downloads::Migration),
Box::new(m20241231_000001_auth::Migration), Box::new(m20241231_000001_auth::Migration),
Box::new(m20250501_021523_credential_3rd::Migration), Box::new(m20250501_021523_credential_3rd::Migration),
Box::new(m20250508_022044_subscriber_tasks::Migration),
] ]
} }
} }

View File

@ -1,18 +1,17 @@
use std::sync::Arc;
use async_graphql::SimpleObject; use async_graphql::SimpleObject;
use async_trait::async_trait; use async_trait::async_trait;
use sea_orm::{ use sea_orm::{
ActiveValue, FromJsonQueryResult, FromQueryResult, IntoSimpleExpr, JoinType, QuerySelect, ActiveValue, Condition, FromJsonQueryResult, FromQueryResult, IntoSimpleExpr, JoinType,
QuerySelect,
entity::prelude::*, entity::prelude::*,
sea_query::{IntoCondition, OnConflict}, sea_query::{Alias, IntoCondition, OnConflict},
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use super::subscription_bangumi; use super::subscription_bangumi;
use crate::{ use crate::{
app::AppContextTrait, app::AppContextTrait,
errors::{RecorderError, RecorderResult}, errors::RecorderResult,
extract::{ extract::{
mikan::{ mikan::{
MikanBangumiHash, MikanBangumiMeta, build_mikan_bangumi_subscription_rss_url, MikanBangumiHash, MikanBangumiMeta, build_mikan_bangumi_subscription_rss_url,
@ -63,8 +62,6 @@ pub struct Model {
pub rss_link: Option<String>, pub rss_link: Option<String>,
pub poster_link: Option<String>, pub poster_link: Option<String>,
pub save_path: Option<String>, pub save_path: Option<String>,
#[sea_orm(default = "false")]
pub deleted: bool,
pub homepage: Option<String>, pub homepage: Option<String>,
pub extra: Option<BangumiExtra>, pub extra: Option<BangumiExtra>,
} }
@ -139,7 +136,7 @@ impl ActiveModel {
let storage_service = ctx.storage(); let storage_service = ctx.storage();
let mikan_base_url = mikan_client.base_url(); let mikan_base_url = mikan_client.base_url();
let raw_meta = parse_episode_meta_from_raw_name(&meta.bangumi_title)?; let rawname_meta = parse_episode_meta_from_raw_name(&meta.bangumi_title)?;
let rss_url = build_mikan_bangumi_subscription_rss_url( let rss_url = build_mikan_bangumi_subscription_rss_url(
mikan_base_url.clone(), mikan_base_url.clone(),
@ -166,12 +163,20 @@ impl ActiveModel {
subscriber_id: ActiveValue::Set(subscriber_id), subscriber_id: ActiveValue::Set(subscriber_id),
display_name: ActiveValue::Set(meta.bangumi_title.clone()), display_name: ActiveValue::Set(meta.bangumi_title.clone()),
raw_name: ActiveValue::Set(meta.bangumi_title), raw_name: ActiveValue::Set(meta.bangumi_title),
season: ActiveValue::Set(raw_meta.season), season: ActiveValue::Set(rawname_meta.season),
season_raw: ActiveValue::Set(raw_meta.season_raw), season_raw: ActiveValue::Set(rawname_meta.season_raw),
fansub: ActiveValue::Set(Some(meta.fansub)), fansub: ActiveValue::Set(Some(meta.fansub)),
poster_link: ActiveValue::Set(poster_link), poster_link: ActiveValue::Set(poster_link),
homepage: ActiveValue::Set(Some(meta.homepage.to_string())), homepage: ActiveValue::Set(Some(meta.homepage.to_string())),
rss_link: ActiveValue::Set(Some(rss_url.to_string())), rss_link: ActiveValue::Set(Some(rss_url.to_string())),
extra: ActiveValue::Set(Some(BangumiExtra {
name_zh: rawname_meta.name_zh,
name_en: rawname_meta.name_en,
name_jp: rawname_meta.name_jp,
s_name_en: rawname_meta.name_en_no_season,
s_name_jp: rawname_meta.name_jp_no_season,
s_name_zh: rawname_meta.name_zh_no_season,
})),
..Default::default() ..Default::default()
}) })
} }
@ -183,35 +188,60 @@ impl ActiveModelBehavior for ActiveModel {}
impl Model { impl Model {
pub async fn get_or_insert_from_mikan<F>( pub async fn get_or_insert_from_mikan<F>(
ctx: &dyn AppContextTrait, ctx: &dyn AppContextTrait,
hash: MikanBangumiHash,
subscriber_id: i32, subscriber_id: i32,
subscription_id: i32, subscription_id: i32,
mikan_bangumi_id: String, create_bangumi_fn: F,
mikan_fansub_id: String, ) -> RecorderResult<Self>
f: F,
) -> RecorderResult<Model>
where where
F: AsyncFnOnce(&mut ActiveModel) -> RecorderResult<()>, F: AsyncFnOnce() -> RecorderResult<ActiveModel>,
{ {
#[derive(FromQueryResult)]
struct ModelWithIsSubscribed {
#[sea_orm(nested)]
bangumi: Model,
is_subscribed: bool,
}
let db = ctx.db(); let db = ctx.db();
if let Some(existed) = Entity::find()
let subscription_bangumi_alias = Alias::new("sb");
let mut is_subscribed = false;
let new_bangumi_model = if let Some(existed) = Entity::find()
.filter( .filter(
Column::MikanBangumiId Condition::all()
.eq(Some(mikan_bangumi_id.clone())) .add(Column::MikanBangumiId.eq(Some(hash.mikan_bangumi_id)))
.and(Column::MikanFansubId.eq(Some(mikan_fansub_id.clone()))), .add(Column::MikanFansubId.eq(Some(hash.mikan_fansub_id)))
.add(Column::SubscriberId.eq(subscriber_id)),
) )
.column_as(
Expr::col((
subscription_bangumi_alias.clone(),
subscription_bangumi::Column::SubscriptionId,
)),
"is_subscribed",
)
.join_as_rev(
JoinType::LeftJoin,
subscription_bangumi::Relation::Bangumi
.def()
.on_condition(move |_left, right| {
Expr::col((right, subscription_bangumi::Column::SubscriptionId))
.eq(subscription_id)
.into_condition()
}),
subscription_bangumi_alias.clone(),
)
.into_model::<ModelWithIsSubscribed>()
.one(db) .one(db)
.await? .await?
{ {
Ok(existed) is_subscribed = existed.is_subscribed;
existed.bangumi
} else { } else {
let mut bgm = ActiveModel { let new_bangumi_active_model = create_bangumi_fn().await?;
mikan_bangumi_id: ActiveValue::Set(Some(mikan_bangumi_id)),
mikan_fansub_id: ActiveValue::Set(Some(mikan_fansub_id)), Entity::insert(new_bangumi_active_model)
subscriber_id: ActiveValue::Set(subscriber_id),
..Default::default()
};
f(&mut bgm).await?;
let bgm = Entity::insert(bgm)
.on_conflict( .on_conflict(
OnConflict::columns([ OnConflict::columns([
Column::MikanBangumiId, Column::MikanBangumiId,
@ -220,26 +250,30 @@ impl Model {
]) ])
.update_columns([ .update_columns([
Column::RawName, Column::RawName,
Column::Extra,
Column::Fansub, Column::Fansub,
Column::PosterLink, Column::PosterLink,
Column::Season, Column::Season,
Column::SeasonRaw, Column::SeasonRaw,
Column::RssLink,
Column::Homepage,
]) ])
.to_owned(), .to_owned(),
) )
.exec_with_returning(db) .exec_with_returning(db)
.await?; .await?
};
if !is_subscribed {
subscription_bangumi::Entity::insert(subscription_bangumi::ActiveModel { subscription_bangumi::Entity::insert(subscription_bangumi::ActiveModel {
subscription_id: ActiveValue::Set(subscription_id), subscription_id: ActiveValue::Set(subscription_id),
bangumi_id: ActiveValue::Set(bgm.id), bangumi_id: ActiveValue::Set(new_bangumi_model.id),
subscriber_id: ActiveValue::Set(subscriber_id),
..Default::default() ..Default::default()
}) })
.on_conflict_do_nothing() .on_conflict_do_nothing()
.exec(db) .exec(db)
.await?; .await?;
Ok(bgm)
} }
Ok(new_bangumi_model)
} }
pub async fn get_existed_mikan_bangumi_list( pub async fn get_existed_mikan_bangumi_list(

View File

@ -1,17 +1,14 @@
use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use sea_orm::{ use sea_orm::{
ActiveValue, ColumnTrait, FromJsonQueryResult, IntoSimpleExpr, JoinType, QuerySelect, ActiveValue, FromJsonQueryResult, IntoSimpleExpr, QuerySelect, entity::prelude::*,
entity::prelude::*, sea_query::OnConflict,
sea_query::{Alias, IntoCondition, OnConflict},
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use super::{bangumi, query::InsertManyReturningExt, subscription_bangumi, subscription_episode}; use super::{bangumi, query::InsertManyReturningExt, subscription_episode};
use crate::{ use crate::{
app::AppContextTrait, app::AppContextTrait,
errors::{RecorderError, RecorderResult}, errors::RecorderResult,
extract::{ extract::{
mikan::{MikanEpisodeHash, MikanEpisodeMeta, build_mikan_episode_homepage_url}, mikan::{MikanEpisodeHash, MikanEpisodeMeta, build_mikan_episode_homepage_url},
rawname::parse_episode_meta_from_raw_name, rawname::parse_episode_meta_from_raw_name,
@ -52,8 +49,6 @@ pub struct Model {
pub episode_index: i32, pub episode_index: i32,
pub homepage: Option<String>, pub homepage: Option<String>,
pub subtitle: Option<String>, pub subtitle: Option<String>,
#[sea_orm(default = "false")]
pub deleted: bool,
pub source: Option<String>, pub source: Option<String>,
pub extra: EpisodeExtra, pub extra: EpisodeExtra,
} }
@ -132,56 +127,47 @@ pub enum RelatedEntity {
SubscriptionEpisode, SubscriptionEpisode,
} }
#[derive(Clone, Debug, PartialEq)]
pub struct MikanEpsiodeCreation {
pub episode: MikanEpisodeMeta,
pub bangumi: Arc<bangumi::Model>,
}
impl ActiveModel { impl ActiveModel {
pub fn from_mikan_episode_meta( #[tracing::instrument(err, skip(ctx), fields(bangumi_id = ?bangumi.id, mikan_episode_id = ?episode.mikan_episode_id))]
pub fn from_mikan_bangumi_and_episode_meta(
ctx: &dyn AppContextTrait, ctx: &dyn AppContextTrait,
creation: MikanEpsiodeCreation, bangumi: &bangumi::Model,
episode: MikanEpisodeMeta,
) -> RecorderResult<Self> { ) -> RecorderResult<Self> {
let item = creation.episode; let mikan_base_url = ctx.mikan().base_url().clone();
let bgm = creation.bangumi; let rawname_meta = parse_episode_meta_from_raw_name(&episode.episode_title)?;
let raw_meta = parse_episode_meta_from_raw_name(&item.episode_title) let homepage = build_mikan_episode_homepage_url(mikan_base_url, &episode.mikan_episode_id);
.inspect_err(|e| {
tracing::warn!("Failed to parse episode meta: {:?}", e);
})
.ok()
.unwrap_or_default();
let homepage = build_mikan_episode_homepage_url(
ctx.mikan().base_url().clone(),
&item.mikan_episode_id,
);
Ok(Self { Ok(Self {
mikan_episode_id: ActiveValue::Set(Some(item.mikan_episode_id)), mikan_episode_id: ActiveValue::Set(Some(episode.mikan_episode_id)),
raw_name: ActiveValue::Set(item.episode_title.clone()), raw_name: ActiveValue::Set(episode.episode_title.clone()),
display_name: ActiveValue::Set(item.episode_title.clone()), display_name: ActiveValue::Set(episode.episode_title.clone()),
bangumi_id: ActiveValue::Set(bgm.id), bangumi_id: ActiveValue::Set(bangumi.id),
subscriber_id: ActiveValue::Set(bgm.subscriber_id), subscriber_id: ActiveValue::Set(bangumi.subscriber_id),
resolution: ActiveValue::Set(raw_meta.resolution), resolution: ActiveValue::Set(rawname_meta.resolution),
season: ActiveValue::Set(if raw_meta.season > 0 { season: ActiveValue::Set(if rawname_meta.season > 0 {
raw_meta.season rawname_meta.season
} else { } else {
bgm.season bangumi.season
}), }),
season_raw: ActiveValue::Set(raw_meta.season_raw.or_else(|| bgm.season_raw.clone())), season_raw: ActiveValue::Set(
fansub: ActiveValue::Set(raw_meta.fansub.or_else(|| bgm.fansub.clone())), rawname_meta
poster_link: ActiveValue::Set(bgm.poster_link.clone()), .season_raw
episode_index: ActiveValue::Set(raw_meta.episode_index), .or_else(|| bangumi.season_raw.clone()),
),
fansub: ActiveValue::Set(rawname_meta.fansub.or_else(|| bangumi.fansub.clone())),
poster_link: ActiveValue::Set(bangumi.poster_link.clone()),
episode_index: ActiveValue::Set(rawname_meta.episode_index),
homepage: ActiveValue::Set(Some(homepage.to_string())), homepage: ActiveValue::Set(Some(homepage.to_string())),
subtitle: ActiveValue::Set(raw_meta.subtitle), subtitle: ActiveValue::Set(rawname_meta.subtitle),
source: ActiveValue::Set(raw_meta.source), source: ActiveValue::Set(rawname_meta.source),
extra: ActiveValue::Set(EpisodeExtra { extra: ActiveValue::Set(EpisodeExtra {
name_zh: raw_meta.name_zh, name_zh: rawname_meta.name_zh,
name_en: raw_meta.name_en, name_en: rawname_meta.name_en,
name_jp: raw_meta.name_jp, name_jp: rawname_meta.name_jp,
s_name_en: raw_meta.name_en_no_season, s_name_en: rawname_meta.name_en_no_season,
s_name_jp: raw_meta.name_jp_no_season, s_name_jp: rawname_meta.name_jp_no_season,
s_name_zh: raw_meta.name_zh_no_season, s_name_zh: rawname_meta.name_zh_no_season,
}), }),
..Default::default() ..Default::default()
}) })
@ -197,13 +183,14 @@ impl Model {
ids: impl Iterator<Item = MikanEpisodeHash>, ids: impl Iterator<Item = MikanEpisodeHash>,
subscriber_id: i32, subscriber_id: i32,
_subscription_id: i32, _subscription_id: i32,
) -> RecorderResult<impl Iterator<Item = (i32, MikanEpisodeHash)>> { ) -> RecorderResult<impl Iterator<Item = (i32, MikanEpisodeHash, i32)>> {
let db = ctx.db(); let db = ctx.db();
Ok(Entity::find() Ok(Entity::find()
.select_only() .select_only()
.column(Column::Id) .column(Column::Id)
.column(Column::MikanEpisodeId) .column(Column::MikanEpisodeId)
.column(Column::BangumiId)
.filter( .filter(
Expr::tuple([ Expr::tuple([
Column::MikanEpisodeId.into_simple_expr(), Column::MikanEpisodeId.into_simple_expr(),
@ -211,44 +198,39 @@ impl Model {
]) ])
.in_tuples( .in_tuples(
ids.into_iter() ids.into_iter()
.map(|id| (id.mikan_episode_token, subscriber_id)), .map(|id| (id.mikan_episode_id, subscriber_id)),
), ),
) )
.into_tuple::<(i32, String)>() .into_tuple::<(i32, String, i32)>()
.all(db) .all(db)
.await? .await?
.into_iter() .into_iter()
.map(|(id, mikan_episode_id)| { .map(|(episode_id, mikan_episode_id, bangumi_id)| {
( (
id, episode_id,
MikanEpisodeHash { MikanEpisodeHash { mikan_episode_id },
mikan_episode_token: mikan_episode_id, bangumi_id,
},
) )
})) }))
} }
pub async fn add_episodes( pub async fn add_mikan_episodes_for_subscription(
ctx: &dyn AppContextTrait, ctx: &dyn AppContextTrait,
creations: impl Iterator<Item = (&bangumi::Model, MikanEpisodeMeta)>,
subscriber_id: i32, subscriber_id: i32,
subscription_id: i32, subscription_id: i32,
creations: impl IntoIterator<Item = MikanEpsiodeCreation>,
) -> RecorderResult<()> { ) -> RecorderResult<()> {
let db = ctx.db(); let db = ctx.db();
let new_episode_active_modes = creations let new_episode_active_modes: Vec<ActiveModel> = creations
.into_iter() .map(|(bangumi, episode_meta)| {
.map(|cr| ActiveModel::from_mikan_episode_meta(ctx, cr)) ActiveModel::from_mikan_bangumi_and_episode_meta(ctx, bangumi, episode_meta)
.inspect(|result| {
if let Err(e) = result {
tracing::warn!("Failed to create episode: {:?}", e);
}
}) })
.flatten(); .collect::<Result<_, _>>()?;
let inserted_episodes = Entity::insert_many(new_episode_active_modes) let new_episode_ids = Entity::insert_many(new_episode_active_modes)
.on_conflict( .on_conflict(
OnConflict::columns([Column::BangumiId, Column::MikanEpisodeId]) OnConflict::columns([Column::MikanEpisodeId, Column::SubscriberId])
.do_nothing() .update_columns([Column::RawName, Column::PosterLink, Column::Homepage])
.to_owned(), .to_owned(),
) )
.exec_with_returning_columns(db, [Column::Id]) .exec_with_returning_columns(db, [Column::Id])
@ -256,24 +238,12 @@ impl Model {
.into_iter() .into_iter()
.flat_map(|r| r.try_get_many_by_index::<i32>()); .flat_map(|r| r.try_get_many_by_index::<i32>());
let insert_subscription_episode_links = inserted_episodes.into_iter().map(|episode_id| { subscription_episode::Model::add_episodes_for_subscription(
subscription_episode::ActiveModel::from_subscription_and_episode( ctx,
new_episode_ids,
subscriber_id, subscriber_id,
subscription_id, subscription_id,
episode_id,
) )
});
subscription_episode::Entity::insert_many(insert_subscription_episode_links)
.on_conflict(
OnConflict::columns([
subscription_episode::Column::SubscriptionId,
subscription_episode::Column::EpisodeId,
])
.do_nothing()
.to_owned(),
)
.exec(db)
.await?; .await?;
Ok(()) Ok(())

View File

@ -5,7 +5,6 @@ pub mod downloaders;
pub mod downloads; pub mod downloads;
pub mod episodes; pub mod episodes;
pub mod query; pub mod query;
pub mod subscriber_tasks;
pub mod subscribers; pub mod subscribers;
pub mod subscription_bangumi; pub mod subscription_bangumi;
pub mod subscription_episode; pub mod subscription_episode;

View File

@ -1,29 +1,9 @@
use async_trait::async_trait; use async_trait::async_trait;
use sea_orm::{ use sea_orm::{
ActiveModelTrait, ColumnTrait, ConnectionTrait, DbErr, EntityTrait, Insert, IntoActiveModel, ActiveModelTrait, ColumnTrait, ConnectionTrait, DbErr, EntityTrait, Insert, IntoActiveModel,
Iterable, QueryResult, QueryTrait, SelectModel, SelectorRaw, Value, Iterable, QueryResult, QueryTrait, SelectModel, SelectorRaw, sea_query::Query,
prelude::Expr,
sea_query::{Alias, IntoColumnRef, IntoTableRef, Query, SelectStatement},
}; };
pub fn filter_values_in<
I: IntoIterator<Item = T>,
T: Into<Value>,
R: IntoTableRef,
C: IntoColumnRef + Copy,
>(
tbl_ref: R,
col_ref: C,
values: I,
) -> SelectStatement {
Query::select()
.expr(Expr::col(("t", "column1")))
.from_values(values, "t")
.left_join(tbl_ref, Expr::col(("t", "column1")).equals(col_ref))
.and_where(Expr::col(col_ref).is_not_null())
.to_owned()
}
#[async_trait] #[async_trait]
pub trait InsertManyReturningExt<A>: Sized pub trait InsertManyReturningExt<A>: Sized
where where

View File

@ -1,160 +0,0 @@
use std::sync::Arc;
use sea_orm::{ActiveValue, FromJsonQueryResult, JsonValue, TryIntoModel, prelude::*};
use serde::{Deserialize, Serialize};
pub use crate::task::{SubscriberTaskType, SubscriberTaskTypeEnum};
use crate::{
app::AppContextTrait,
errors::RecorderResult,
task::{SubscriberTask, SubscriberTaskPayload},
};
#[derive(Debug, Clone, Serialize, Deserialize, FromJsonQueryResult, PartialEq, Eq)]
pub struct SubscriberTaskErrorSnapshot {
pub message: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, DeriveEntityModel, PartialEq, Eq)]
#[sea_orm(table_name = "subscriber_tasks")]
pub struct Model {
#[sea_orm(default_expr = "Expr::current_timestamp()")]
pub created_at: DateTimeUtc,
#[sea_orm(default_expr = "Expr::current_timestamp()")]
pub updated_at: DateTimeUtc,
#[sea_orm(primary_key)]
pub id: i32,
pub subscriber_id: i32,
pub task_type: SubscriberTaskType,
pub request: JsonValue,
pub yields: Vec<JsonValue>,
pub result: Option<JsonValue>,
pub error: Option<JsonValue>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::subscribers::Entity",
from = "Column::SubscriberId",
to = "super::subscribers::Column::Id"
)]
Subscriber,
}
impl Related<super::subscribers::Entity> for Entity {
fn to() -> RelationDef {
Relation::Subscriber.def()
}
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelatedEntity)]
pub enum RelatedEntity {
#[sea_orm(entity = "super::subscribers::Entity")]
Subscriber,
}
impl ActiveModelBehavior for ActiveModel {}
impl Model {
pub async fn update_result<R>(
ctx: Arc<dyn AppContextTrait>,
task_id: i32,
result: R,
) -> RecorderResult<()>
where
R: Serialize,
{
let db = ctx.db();
let result_value = serde_json::to_value(result)?;
Entity::update_many()
.filter(Column::Id.eq(task_id))
.set(ActiveModel {
result: ActiveValue::Set(Some(result_value)),
..Default::default()
})
.exec(db)
.await?;
Ok(())
}
pub async fn update_error(
ctx: Arc<dyn AppContextTrait>,
task_id: i32,
error: SubscriberTaskErrorSnapshot,
) -> RecorderResult<()> {
let db = ctx.db();
let error_value = serde_json::to_value(&error)?;
Entity::update_many()
.filter(Column::Id.eq(task_id))
.set(ActiveModel {
error: ActiveValue::Set(Some(error_value)),
..Default::default()
})
.exec(db)
.await?;
Ok(())
}
pub async fn append_yield<Y>(
ctx: Arc<dyn AppContextTrait>,
task_id: i32,
item: Y,
) -> RecorderResult<()>
where
Y: Serialize,
{
let db = ctx.db();
let yield_value = serde_json::to_value(item)?;
Entity::update_many()
.filter(Column::Id.eq(task_id))
.col_expr(
Column::Yields,
Expr::cust_with_values("array_append($1)", [yield_value]),
)
.exec(db)
.await?;
Ok(())
}
pub async fn add_subscriber_task(
ctx: Arc<dyn AppContextTrait>,
subscriber_id: i32,
payload: SubscriberTaskPayload,
) -> RecorderResult<SubscriberTask> {
let task_type = payload.task_type();
let request: JsonValue = payload.clone().try_into()?;
let am = ActiveModel {
subscriber_id: ActiveValue::Set(subscriber_id),
task_type: ActiveValue::Set(task_type.clone()),
request: ActiveValue::Set(request.clone()),
..Default::default()
};
let db = ctx.db();
let task_id = Entity::insert(am).exec(db).await?.last_insert_id;
let subscriber_task = SubscriberTask {
id: task_id,
subscriber_id,
payload,
};
ctx.task()
.add_subscriber_task(subscriber_task.clone())
.await?;
Ok(subscriber_task)
}
}

View File

@ -39,8 +39,6 @@ pub enum Relation {
Episode, Episode,
#[sea_orm(has_many = "super::auth::Entity")] #[sea_orm(has_many = "super::auth::Entity")]
Auth, Auth,
#[sea_orm(has_many = "super::subscriber_tasks::Entity")]
SubscriberTasks,
} }
impl Related<super::subscriptions::Entity> for Entity { impl Related<super::subscriptions::Entity> for Entity {
@ -73,12 +71,6 @@ impl Related<super::auth::Entity> for Entity {
} }
} }
impl Related<super::subscriber_tasks::Entity> for Entity {
fn to() -> RelationDef {
Relation::SubscriberTasks.def()
}
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelatedEntity)] #[derive(Copy, Clone, Debug, EnumIter, DeriveRelatedEntity)]
pub enum RelatedEntity { pub enum RelatedEntity {
#[sea_orm(entity = "super::subscriptions::Entity")] #[sea_orm(entity = "super::subscriptions::Entity")]
@ -89,8 +81,6 @@ pub enum RelatedEntity {
Bangumi, Bangumi,
#[sea_orm(entity = "super::episodes::Entity")] #[sea_orm(entity = "super::episodes::Entity")]
Episode, Episode,
#[sea_orm(entity = "super::subscriber_tasks::Entity")]
SubscriberTasks,
} }
#[derive(Debug, Deserialize, Serialize)] #[derive(Debug, Deserialize, Serialize)]

View File

@ -57,21 +57,6 @@ pub enum RelatedEntity {
#[async_trait] #[async_trait]
impl ActiveModelBehavior for ActiveModel {} impl ActiveModelBehavior for ActiveModel {}
impl ActiveModel {
pub fn from_subscription_and_episode(
subscriber_id: i32,
subscription_id: i32,
episode_id: i32,
) -> Self {
Self {
subscriber_id: ActiveValue::Set(subscriber_id),
subscription_id: ActiveValue::Set(subscription_id),
episode_id: ActiveValue::Set(episode_id),
..Default::default()
}
}
}
impl Model { impl Model {
pub async fn add_episodes_for_subscription( pub async fn add_episodes_for_subscription(
ctx: &dyn AppContextTrait, ctx: &dyn AppContextTrait,

View File

@ -1,26 +1,15 @@
use std::{collections::HashSet, sync::Arc}; use std::{fmt::Debug, sync::Arc};
use async_trait::async_trait; use async_trait::async_trait;
use itertools::Itertools; use sea_orm::entity::prelude::*;
use sea_orm::{ActiveValue, entity::prelude::*};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use super::{bangumi, episodes, query::filter_values_in};
use crate::{ use crate::{
app::AppContextTrait, app::AppContextTrait,
errors::{RecorderError, RecorderResult}, errors::{RecorderError, RecorderResult},
extract::{ extract::mikan::{
mikan::{ MikanBangumiSubscription, MikanSeasonSubscription, MikanSubscriberSubscription,
MikanBangumiPosterMeta, MikanBangumiSubscription, MikanSeasonSubscription,
MikanSubscriberSubscription, build_mikan_bangumi_homepage_url,
build_mikan_bangumi_subscription_rss_url,
scrape_mikan_bangumi_meta_from_bangumi_homepage_url,
scrape_mikan_episode_meta_from_episode_homepage_url,
scrape_mikan_poster_meta_from_image_url,
}, },
rawname::extract_season_from_title_body,
},
models::episodes::MikanEpsiodeCreation,
}; };
#[derive( #[derive(
@ -43,45 +32,6 @@ pub enum SubscriptionCategory {
Manual, Manual,
} }
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "category")]
pub enum SubscriptionPayload {
#[serde(rename = "mikan_subscriber")]
MikanSubscriber(MikanSubscriberSubscription),
#[serde(rename = "mikan_season")]
MikanSeason(MikanSeasonSubscription),
#[serde(rename = "mikan_bangumi")]
MikanBangumi(MikanBangumiSubscription),
#[serde(rename = "manual")]
Manual,
}
impl SubscriptionPayload {
pub fn category(&self) -> SubscriptionCategory {
match self {
Self::MikanSubscriber(_) => SubscriptionCategory::MikanSubscriber,
Self::MikanSeason(_) => SubscriptionCategory::MikanSeason,
Self::MikanBangumi(_) => SubscriptionCategory::MikanBangumi,
Self::Manual => SubscriptionCategory::Manual,
}
}
pub fn try_from_model(model: &Model) -> RecorderResult<Self> {
Ok(match model.category {
SubscriptionCategory::MikanSubscriber => {
Self::MikanSubscriber(MikanSubscriberSubscription::try_from_model(model)?)
}
SubscriptionCategory::MikanSeason => {
Self::MikanSeason(MikanSeasonSubscription::try_from_model(model)?)
}
SubscriptionCategory::MikanBangumi => {
Self::MikanBangumi(MikanBangumiSubscription::try_from_model(model)?)
}
SubscriptionCategory::Manual => Self::Manual,
})
}
}
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "subscriptions")] #[sea_orm(table_name = "subscriptions")]
pub struct Model { pub struct Model {
@ -95,6 +45,7 @@ pub struct Model {
pub subscriber_id: i32, pub subscriber_id: i32,
pub category: SubscriptionCategory, pub category: SubscriptionCategory,
pub source_url: String, pub source_url: String,
pub source_urls: Option<Vec<String>>,
pub enabled: bool, pub enabled: bool,
pub credential_id: Option<i32>, pub credential_id: Option<i32>,
} }
@ -199,11 +150,6 @@ impl ActiveModelBehavior for ActiveModel {}
impl ActiveModel {} impl ActiveModel {}
impl Model { impl Model {
pub async fn find_by_id(ctx: &dyn AppContextTrait, id: i32) -> RecorderResult<Option<Self>> {
let db = ctx.db();
Ok(Entity::find_by_id(id).one(db).await?)
}
pub async fn toggle_with_ids( pub async fn toggle_with_ids(
ctx: &dyn AppContextTrait, ctx: &dyn AppContextTrait,
ids: impl Iterator<Item = i32>, ids: impl Iterator<Item = i32>,
@ -230,127 +176,112 @@ impl Model {
Ok(()) Ok(())
} }
pub async fn pull_subscription(&self, ctx: &dyn AppContextTrait) -> RecorderResult<()> { pub async fn sync_feeds(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
match payload { let subscription = self.try_into()?;
SubscriptionPayload::MikanSubscriber(payload) => { match subscription {
let mikan_client = ctx.mikan(); Subscription::MikanSubscriber(subscription) => subscription.sync_feeds(ctx).await,
let channel = Subscription::MikanSeason(subscription) => subscription.sync_feeds(ctx).await,
extract_mikan_rss_channel_from_rss_link(mikan_client, &self.source_url).await?; Subscription::MikanBangumi(subscription) => subscription.sync_feeds(ctx).await,
Subscription::Manual => Ok(()),
let items = channel.into_items();
let db = ctx.db();
let items = items.into_iter().collect_vec();
let mut stmt = filter_values_in(
episodes::Entity,
episodes::Column::MikanEpisodeId,
items
.iter()
.map(|s| Value::from(s.mikan_episode_id.clone())),
);
stmt.and_where(Expr::col(episodes::Column::SubscriberId).eq(self.subscriber_id));
let builder = &db.get_database_backend();
let old_rss_item_mikan_episode_ids_set = db
.query_all(builder.build(&stmt))
.await?
.into_iter()
.flat_map(|qs| qs.try_get_by_index(0))
.collect::<HashSet<String>>();
let new_rss_items = items
.into_iter()
.filter(|item| {
!old_rss_item_mikan_episode_ids_set.contains(&item.mikan_episode_id)
})
.collect_vec();
let mut new_metas = vec![];
for new_rss_item in new_rss_items.iter() {
new_metas.push(
scrape_mikan_episode_meta_from_episode_homepage_url(
mikan_client,
new_rss_item.homepage.clone(),
)
.await?,
);
}
let new_mikan_bangumi_groups = new_metas
.into_iter()
.into_group_map_by(|s| (s.mikan_bangumi_id.clone(), s.mikan_fansub_id.clone()));
for ((mikan_bangumi_id, mikan_fansub_id), new_ep_metas) in new_mikan_bangumi_groups
{
let mikan_base_url = ctx.mikan().base_url();
let bgm_homepage = build_mikan_bangumi_homepage_url(
mikan_base_url.clone(),
&mikan_bangumi_id,
Some(&mikan_fansub_id),
);
let bgm_rss_link = build_mikan_bangumi_subscription_rss_url(
mikan_base_url.clone(),
&mikan_bangumi_id,
Some(&mikan_fansub_id),
)?;
let bgm = Arc::new(
bangumi::Model::get_or_insert_from_mikan(
ctx,
self.subscriber_id,
self.id,
mikan_bangumi_id.to_string(),
mikan_fansub_id.to_string(),
async |am| -> RecorderResult<()> {
let bgm_meta = scrape_mikan_bangumi_meta_from_bangumi_homepage_url(
mikan_client,
bgm_homepage.clone(),
)
.await?;
let bgm_name = bgm_meta.bangumi_title;
let (_, bgm_season_raw, bgm_season) =
extract_season_from_title_body(&bgm_name);
am.raw_name = ActiveValue::Set(bgm_name.clone());
am.display_name = ActiveValue::Set(bgm_name);
am.season = ActiveValue::Set(bgm_season);
am.season_raw = ActiveValue::Set(bgm_season_raw);
am.rss_link = ActiveValue::Set(Some(bgm_rss_link.to_string()));
am.homepage = ActiveValue::Set(Some(bgm_homepage.to_string()));
am.fansub = ActiveValue::Set(Some(bgm_meta.fansub));
if let Some(origin_poster_src) = bgm_meta.origin_poster_src
&& let MikanBangumiPosterMeta {
poster_src: Some(poster_src),
..
} = scrape_mikan_poster_meta_from_image_url(
mikan_client,
ctx.storage(),
origin_poster_src,
self.subscriber_id,
)
.await?
{
am.poster_link = ActiveValue::Set(Some(poster_src))
}
Ok(())
},
)
.await?,
);
episodes::Model::add_episodes(
ctx,
self.subscriber_id,
self.id,
new_ep_metas.into_iter().map(|item| MikanEpsiodeCreation {
episode: item,
bangumi: bgm.clone(),
}),
)
.await?;
}
Ok(())
}
_ => todo!(),
} }
} }
} }
#[async_trait]
pub trait SubscriptionTrait: Sized + Debug {
fn get_subscriber_id(&self) -> i32;
fn get_subscription_id(&self) -> i32;
async fn sync_feeds(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()>;
async fn sync_sources(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()>;
fn try_from_model(model: &Model) -> RecorderResult<Self>;
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "category")]
pub enum Subscription {
#[serde(rename = "mikan_subscriber")]
MikanSubscriber(MikanSubscriberSubscription),
#[serde(rename = "mikan_season")]
MikanSeason(MikanSeasonSubscription),
#[serde(rename = "mikan_bangumi")]
MikanBangumi(MikanBangumiSubscription),
#[serde(rename = "manual")]
Manual,
}
impl Subscription {
pub fn category(&self) -> SubscriptionCategory {
match self {
Self::MikanSubscriber(_) => SubscriptionCategory::MikanSubscriber,
Self::MikanSeason(_) => SubscriptionCategory::MikanSeason,
Self::MikanBangumi(_) => SubscriptionCategory::MikanBangumi,
Self::Manual => SubscriptionCategory::Manual,
}
}
}
#[async_trait]
impl SubscriptionTrait for Subscription {
fn get_subscriber_id(&self) -> i32 {
match self {
Self::MikanSubscriber(subscription) => subscription.get_subscriber_id(),
Self::MikanSeason(subscription) => subscription.get_subscriber_id(),
Self::MikanBangumi(subscription) => subscription.get_subscriber_id(),
Self::Manual => unreachable!(),
}
}
fn get_subscription_id(&self) -> i32 {
match self {
Self::MikanSubscriber(subscription) => subscription.get_subscription_id(),
Self::MikanSeason(subscription) => subscription.get_subscription_id(),
Self::MikanBangumi(subscription) => subscription.get_subscription_id(),
Self::Manual => unreachable!(),
}
}
async fn sync_feeds(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
match self {
Self::MikanSubscriber(subscription) => subscription.sync_feeds(ctx).await,
Self::MikanSeason(subscription) => subscription.sync_feeds(ctx).await,
Self::MikanBangumi(subscription) => subscription.sync_feeds(ctx).await,
Self::Manual => Ok(()),
}
}
async fn sync_sources(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
match self {
Self::MikanSubscriber(subscription) => subscription.sync_sources(ctx).await,
Self::MikanSeason(subscription) => subscription.sync_sources(ctx).await,
Self::MikanBangumi(subscription) => subscription.sync_sources(ctx).await,
Self::Manual => Ok(()),
}
}
fn try_from_model(model: &Model) -> RecorderResult<Self> {
match model.category {
SubscriptionCategory::MikanSubscriber => {
MikanSubscriberSubscription::try_from_model(model).map(Self::MikanSubscriber)
}
SubscriptionCategory::MikanSeason => {
MikanSeasonSubscription::try_from_model(model).map(Self::MikanSeason)
}
SubscriptionCategory::MikanBangumi => {
MikanBangumiSubscription::try_from_model(model).map(Self::MikanBangumi)
}
SubscriptionCategory::Manual => Ok(Self::Manual),
}
}
}
impl TryFrom<&Model> for Subscription {
type Error = RecorderError;
fn try_from(model: &Model) -> Result<Self, Self::Error> {
Self::try_from_model(model)
}
}

View File

@ -1,41 +1,18 @@
use std::sync::Arc; use std::sync::Arc;
use futures::{Stream, TryStreamExt, pin_mut}; use futures::Stream;
use serde::{Serialize, de::DeserializeOwned}; use serde::{Serialize, de::DeserializeOwned};
use crate::{ use crate::{app::AppContextTrait, errors::RecorderResult};
app::AppContextTrait,
errors::RecorderResult,
models::subscriber_tasks::{self, SubscriberTaskErrorSnapshot},
};
pub const SUBSCRIBER_TASK_APALIS_NAME: &str = "subscriber_task"; pub const SUBSCRIBER_TASK_APALIS_NAME: &str = "subscriber_task";
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait SubscriberAsyncTaskTrait: Serialize + DeserializeOwned + Sized { pub trait SubscriberAsyncTaskTrait: Serialize + DeserializeOwned + Sized {
type Result: Serialize + DeserializeOwned + Send; async fn run_async(self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()>;
async fn run_async( async fn run(self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
self, self.run_async(ctx).await?;
ctx: Arc<dyn AppContextTrait>,
id: i32,
) -> RecorderResult<Self::Result>;
async fn run(self, ctx: Arc<dyn AppContextTrait>, id: i32) -> RecorderResult<()> {
match self.run_async(ctx.clone(), id).await {
Ok(result) => {
subscriber_tasks::Model::update_result(ctx, id, result).await?;
}
Err(e) => {
let error_snapshot = SubscriberTaskErrorSnapshot {
message: e.to_string(),
};
subscriber_tasks::Model::update_error(ctx, id, error_snapshot).await?;
return Err(e);
}
}
Ok(()) Ok(())
} }
@ -48,35 +25,9 @@ pub trait SubscriberStreamTaskTrait: Serialize + DeserializeOwned + Sized {
fn run_stream( fn run_stream(
self, self,
ctx: Arc<dyn AppContextTrait>, ctx: Arc<dyn AppContextTrait>,
id: i32,
) -> impl Stream<Item = RecorderResult<Self::Yield>> + Send; ) -> impl Stream<Item = RecorderResult<Self::Yield>> + Send;
async fn run(self, ctx: Arc<dyn AppContextTrait>, id: i32) -> RecorderResult<()> { async fn run(self, _ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
let stream = self.run_stream(ctx.clone(), id); unimplemented!()
pin_mut!(stream);
loop {
match stream.try_next().await {
Ok(Some(result)) => {
subscriber_tasks::Model::append_yield(ctx.clone(), id, result).await?;
}
Ok(None) => {
subscriber_tasks::Model::update_result(ctx, id, ()).await?;
break;
}
Err(e) => {
let error_snapshot = SubscriberTaskErrorSnapshot {
message: e.to_string(),
};
subscriber_tasks::Model::update_error(ctx, id, error_snapshot).await?;
return Err(e);
}
}
}
Ok(())
} }
} }

View File

@ -1,3 +0,0 @@
mod scrape_season_subscription;
pub use scrape_season_subscription::MikanScrapeSeasonSubscriptionTask;

View File

@ -1,39 +0,0 @@
use std::sync::Arc;
use futures::Stream;
use sea_orm::FromJsonQueryResult;
use serde::{Deserialize, Serialize};
use crate::{
app::AppContextTrait,
errors::RecorderResult,
extract::mikan::{MikanBangumiMeta, MikanSeasonStr, MikanSeasonSubscription},
task::SubscriberStreamTaskTrait,
};
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, FromJsonQueryResult)]
pub struct MikanScrapeSeasonSubscriptionTask {
pub year: i32,
pub season_str: MikanSeasonStr,
pub credential_id: i32,
}
#[async_trait::async_trait]
impl SubscriberStreamTaskTrait for MikanScrapeSeasonSubscriptionTask {
type Yield = MikanBangumiMeta;
fn run_stream(
self,
ctx: Arc<dyn AppContextTrait>,
id: i32,
) -> impl Stream<Item = RecorderResult<Self::Yield>> {
let task = Arc::new(MikanSeasonSubscription {
id,
year: self.year,
season_str: self.season_str,
credential_id: self.credential_id,
});
task.pull_bangumi_meta_stream(ctx)
}
}

View File

@ -1,6 +1,5 @@
mod config; mod config;
mod core; mod core;
pub mod mikan;
mod registry; mod registry;
mod service; mod service;
@ -8,6 +7,7 @@ pub use core::{SUBSCRIBER_TASK_APALIS_NAME, SubscriberAsyncTaskTrait, Subscriber
pub use config::TaskConfig; pub use config::TaskConfig;
pub use registry::{ pub use registry::{
SubscriberTask, SubscriberTaskPayload, SubscriberTaskType, SubscriberTaskTypeEnum, SubscriberTask, SubscriberTaskPayload, SyncOneSubscriptionFeedsTask,
SyncOneSubscriptionSourcesTask,
}; };
pub use service::TaskService; pub use service::TaskService;

View File

@ -1,61 +0,0 @@
use sea_orm::{DeriveActiveEnum, DeriveDisplay, prelude::*};
use serde::{Deserialize, Serialize};
use super::mikan::MikanScrapeSeasonSubscriptionTask;
use crate::errors::RecorderError;
#[derive(
Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, DeriveDisplay, Serialize, Deserialize,
)]
#[sea_orm(
rs_type = "String",
db_type = "String(StringLen::None)",
enum_name = "subscriber_task_type"
)]
#[serde(rename_all = "snake_case")]
pub enum SubscriberTaskType {
#[sea_orm(string_value = "mikan_scrape_season_subscription")]
MikanScrapeSeasonSubscription,
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "task_type")]
pub enum SubscriberTaskPayload {
#[serde(rename = "mikan_scrape_season_subscription")]
MikanScrapeSeasonSubscription(MikanScrapeSeasonSubscriptionTask),
}
impl SubscriberTaskPayload {
pub fn task_type(&self) -> SubscriberTaskType {
match self {
Self::MikanScrapeSeasonSubscription(_) => {
SubscriberTaskType::MikanScrapeSeasonSubscription
}
}
}
}
impl TryFrom<SubscriberTaskPayload> for serde_json::Value {
    type Error = RecorderError;

    /// Serialize the payload to a JSON object with the serde `task_type` tag
    /// removed, leaving only the task-specific fields.
    fn try_from(value: SubscriberTaskPayload) -> Result<Self, Self::Error> {
        let serde_json::Value::Object(mut map) = serde_json::to_value(&value)? else {
            // An internally-tagged enum always serializes to an object.
            unreachable!("payload must be an json object");
        };
        // Drop the internal tag; it is bookkeeping, not payload data.
        map.remove("task_type");
        Ok(serde_json::Value::Object(map))
    }
}
/// A unit of background work owned by a subscriber.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct SubscriberTask {
    /// Identifier of this task.
    pub id: i32,
    /// Identifier of the owning subscriber.
    pub subscriber_id: i32,
    /// Task-specific data; `flatten` merges its fields (including the
    /// `task_type` tag) into this struct's JSON object.
    #[serde(flatten)]
    pub payload: SubscriberTaskPayload,
}

View File

@ -0,0 +1,53 @@
mod subscription;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
pub use subscription::{SyncOneSubscriptionFeedsTask, SyncOneSubscriptionSourcesTask};
use super::SubscriberAsyncTaskTrait;
use crate::{
app::AppContextTrait,
errors::{RecorderError, RecorderResult},
};
/// Task-specific data carried by a [`SubscriberTask`]; serialized with an
/// internal `task_type` tag (serde internally-tagged representation).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "task_type")]
pub enum SubscriberTaskPayload {
    /// Synchronize the feeds of a single subscription.
    #[serde(rename = "sync_one_subscription_feeds")]
    SyncOneSubscriptionFeeds(SyncOneSubscriptionFeedsTask),
    /// Synchronize the sources of a single subscription.
    #[serde(rename = "sync_one_subscription_sources")]
    SyncOneSubscriptionSources(SyncOneSubscriptionSourcesTask),
}
impl SubscriberTaskPayload {
pub async fn run(self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
match self {
Self::SyncOneSubscriptionFeeds(task) => task.run(ctx).await,
Self::SyncOneSubscriptionSources(task) => task.run(ctx).await,
}
}
}
impl TryFrom<&SubscriberTaskPayload> for serde_json::Value {
    type Error = RecorderError;

    /// Serialize the payload to a JSON object with the serde `task_type` tag
    /// removed, leaving only the task-specific fields.
    fn try_from(value: &SubscriberTaskPayload) -> Result<Self, Self::Error> {
        let serde_json::Value::Object(mut map) = serde_json::to_value(value)? else {
            // An internally-tagged enum always serializes to an object.
            unreachable!("subscriber task payload must be an json object");
        };
        // Drop the internal tag; it is bookkeeping, not payload data.
        map.remove("task_type");
        Ok(serde_json::Value::Object(map))
    }
}
/// A unit of background work queued on behalf of a subscriber.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct SubscriberTask {
    /// Identifier of the owning subscriber.
    pub subscriber_id: i32,
    /// Task-specific data; `flatten` merges its fields (including the
    /// `task_type` tag) into this struct's JSON object.
    #[serde(flatten)]
    pub payload: SubscriberTaskPayload,
}

View File

@ -0,0 +1,45 @@
use std::sync::Arc;
use sea_orm::prelude::*;
use serde::{Deserialize, Serialize};
use crate::{
app::AppContextTrait,
errors::RecorderResult,
models::subscriptions::{self, SubscriptionTrait},
task::SubscriberAsyncTaskTrait,
};
/// Newtype task wrapping a subscription; running it syncs that
/// subscription's feeds (see its `SubscriberAsyncTaskTrait` impl).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct SyncOneSubscriptionFeedsTask(pub subscriptions::Subscription);
impl From<subscriptions::Subscription> for SyncOneSubscriptionFeedsTask {
fn from(subscription: subscriptions::Subscription) -> Self {
Self(subscription)
}
}
#[async_trait::async_trait]
impl SubscriberAsyncTaskTrait for SyncOneSubscriptionFeedsTask {
    /// Delegate to the wrapped subscription's feed synchronization.
    async fn run_async(self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
        let Self(subscription) = self;
        subscription.sync_feeds(ctx).await?;
        Ok(())
    }
}
/// Newtype task wrapping a subscription; running it syncs that
/// subscription's sources (see its `SubscriberAsyncTaskTrait` impl).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct SyncOneSubscriptionSourcesTask(pub subscriptions::Subscription);
#[async_trait::async_trait]
impl SubscriberAsyncTaskTrait for SyncOneSubscriptionSourcesTask {
    /// Delegate to the wrapped subscription's source synchronization.
    async fn run_async(self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
        let Self(subscription) = self;
        subscription.sync_sources(ctx).await?;
        Ok(())
    }
}
impl From<subscriptions::Subscription> for SyncOneSubscriptionSourcesTask {
fn from(subscription: subscriptions::Subscription) -> Self {
Self(subscription)
}
}

View File

@ -7,10 +7,7 @@ use tokio::sync::RwLock;
use crate::{ use crate::{
app::AppContextTrait, app::AppContextTrait,
errors::RecorderResult, errors::RecorderResult,
task::{ task::{SUBSCRIBER_TASK_APALIS_NAME, SubscriberTask, SubscriberTaskPayload, TaskConfig},
SUBSCRIBER_TASK_APALIS_NAME, SubscriberStreamTaskTrait, SubscriberTask,
SubscriberTaskPayload, TaskConfig,
},
}; };
pub struct TaskService { pub struct TaskService {
@ -40,20 +37,25 @@ impl TaskService {
) -> RecorderResult<()> { ) -> RecorderResult<()> {
let ctx = data.deref().clone(); let ctx = data.deref().clone();
match job.payload { job.payload.run(ctx).await
SubscriberTaskPayload::MikanScrapeSeasonSubscription(task) => {
task.run(ctx, job.id).await
}
}
} }
pub async fn add_subscriber_task(&self, job: SubscriberTask) -> RecorderResult<()> { pub async fn add_subscriber_task(
{ &self,
subscriber_id: i32,
task_payload: SubscriberTaskPayload,
) -> RecorderResult<TaskId> {
let subscriber_task = SubscriberTask {
subscriber_id,
payload: task_payload,
};
let task_id = {
let mut storage = self.subscriber_task_storage.write().await; let mut storage = self.subscriber_task_storage.write().await;
storage.push(job).await?; storage.push(subscriber_task).await?.task_id
} };
Ok(()) Ok(task_id)
} }
pub async fn setup(&self) -> RecorderResult<()> { pub async fn setup(&self) -> RecorderResult<()> {

View File

@ -92,7 +92,6 @@ query GetSubscriptionDetail ($id: Int!) {
rssLink rssLink
posterLink posterLink
savePath savePath
deleted
homepage homepage
} }
} }

View File

@ -1,4 +1,4 @@
[toolchain] [toolchain]
channel = "nightly-2025-05-14" channel = "nightly-2025-05-20"
components = ["rustfmt", "clippy"] components = ["rustfmt", "clippy"]
profile = "default" profile = "default"