refactor: continue

This commit is contained in:
2025-05-12 08:11:11 +08:00
parent ed2c1038e6
commit 760cb2344e
8 changed files with 351 additions and 236 deletions

View File

@@ -2,15 +2,22 @@ use std::sync::Arc;
use async_graphql::SimpleObject;
use async_trait::async_trait;
use sea_orm::{ActiveValue, FromJsonQueryResult, entity::prelude::*, sea_query::OnConflict};
use sea_orm::{
ActiveValue, FromJsonQueryResult, FromQueryResult, IntoSimpleExpr, JoinType, QuerySelect,
entity::prelude::*,
sea_query::{IntoCondition, OnConflict},
};
use serde::{Deserialize, Serialize};
use super::subscription_bangumi;
use crate::{
app::AppContextTrait,
errors::RecorderResult,
errors::{RecorderError, RecorderResult},
extract::{
mikan::{MikanBangumiMeta, build_mikan_bangumi_subscription_rss_url},
mikan::{
MikanBangumiHash, MikanBangumiMeta, build_mikan_bangumi_subscription_rss_url,
scrape_mikan_poster_meta_from_image_url,
},
rawname::parse_episode_meta_from_raw_name,
},
};
@@ -120,6 +127,59 @@ pub enum RelatedEntity {
SubscriptionBangumi,
}
impl ActiveModel {
#[tracing::instrument(err, skip_all, fields(mikan_bangumi_id = %meta.mikan_bangumi_id, mikan_fansub_id = %meta.mikan_fansub_id, subscriber_id = %subscriber_id))]
pub async fn from_mikan_bangumi_meta(
ctx: &dyn AppContextTrait,
meta: MikanBangumiMeta,
subscriber_id: i32,
_subscription_id: i32,
) -> RecorderResult<Self> {
let mikan_client = ctx.mikan();
let storage_service = ctx.storage();
let mikan_base_url = mikan_client.base_url();
let raw_meta = parse_episode_meta_from_raw_name(&meta.bangumi_title)?;
let rss_url = build_mikan_bangumi_subscription_rss_url(
mikan_base_url.clone(),
&meta.mikan_bangumi_id,
Some(&meta.mikan_fansub_id),
);
let poster_link = if let Some(origin_poster_src) = meta.origin_poster_src {
let poster_meta = scrape_mikan_poster_meta_from_image_url(
mikan_client,
storage_service,
origin_poster_src,
subscriber_id,
)
.await?;
poster_meta.poster_src
} else {
None
};
Ok(Self {
mikan_bangumi_id: ActiveValue::Set(Some(meta.mikan_bangumi_id)),
mikan_fansub_id: ActiveValue::Set(Some(meta.mikan_fansub_id)),
subscriber_id: ActiveValue::Set(subscriber_id),
display_name: ActiveValue::Set(meta.bangumi_title.clone()),
raw_name: ActiveValue::Set(meta.bangumi_title),
season: ActiveValue::Set(raw_meta.season),
season_raw: ActiveValue::Set(raw_meta.season_raw),
fansub: ActiveValue::Set(Some(meta.fansub)),
poster_link: ActiveValue::Set(poster_link),
homepage: ActiveValue::Set(Some(meta.homepage.to_string())),
rss_link: ActiveValue::Set(Some(rss_url.to_string())),
..Default::default()
})
}
}
#[async_trait]
impl ActiveModelBehavior for ActiveModel {}
impl Model {
pub async fn get_or_insert_from_mikan<F>(
ctx: &dyn AppContextTrait,
@@ -181,40 +241,44 @@ impl Model {
Ok(bgm)
}
}
}
impl ActiveModel {
pub fn from_mikan_bangumi_meta(
ctx: Arc<dyn AppContextTrait>,
meta: MikanBangumiMeta,
pub async fn get_existed_mikan_bangumi_list(
ctx: &dyn AppContextTrait,
hashes: impl Iterator<Item = MikanBangumiHash>,
subscriber_id: i32,
) -> RecorderResult<Self> {
let mikan_base_url = ctx.mikan().base_url();
let raw_meta = parse_episode_meta_from_raw_name(&meta.bangumi_title)?;
let rss_url = build_mikan_bangumi_subscription_rss_url(
mikan_base_url.clone(),
&meta.mikan_bangumi_id,
Some(&meta.mikan_fansub_id),
);
Ok(Self {
mikan_bangumi_id: ActiveValue::Set(Some(meta.mikan_bangumi_id)),
mikan_fansub_id: ActiveValue::Set(Some(meta.mikan_fansub_id)),
subscriber_id: ActiveValue::Set(subscriber_id),
display_name: ActiveValue::Set(meta.bangumi_title.clone()),
raw_name: ActiveValue::Set(meta.bangumi_title),
season: ActiveValue::Set(raw_meta.season),
season_raw: ActiveValue::Set(raw_meta.season_raw),
fansub: ActiveValue::Set(Some(meta.fansub)),
poster_link: ActiveValue::Set(meta.origin_poster_src.map(|url| url.to_string())),
homepage: ActiveValue::Set(Some(meta.homepage.to_string())),
rss_link: ActiveValue::Set(Some(rss_url.to_string())),
..Default::default()
})
_subscription_id: i32,
) -> RecorderResult<impl Iterator<Item = (i32, MikanBangumiHash)>> {
Ok(Entity::find()
.select_only()
.column(Column::Id)
.column(Column::MikanBangumiId)
.column(Column::MikanFansubId)
.filter(
Expr::tuple([
Column::MikanBangumiId.into_simple_expr(),
Column::MikanFansubId.into_simple_expr(),
Column::SubscriberId.into_simple_expr(),
])
.in_tuples(hashes.map(|hash| {
(
hash.mikan_bangumi_id.clone(),
hash.mikan_fansub_id.clone(),
subscriber_id,
)
})),
)
.into_tuple::<(i32, String, String)>()
.all(ctx.db())
.await?
.into_iter()
.map(|(bangumi_id, mikan_bangumi_id, mikan_fansub_id)| {
(
bangumi_id,
MikanBangumiHash {
mikan_bangumi_id,
mikan_fansub_id,
},
)
}))
}
}
#[async_trait]
impl ActiveModelBehavior for ActiveModel {}

View File

@@ -1,15 +1,19 @@
use std::sync::Arc;
use async_trait::async_trait;
use sea_orm::{ActiveValue, FromJsonQueryResult, entity::prelude::*, sea_query::OnConflict};
use sea_orm::{
ActiveValue, ColumnTrait, FromJsonQueryResult, IntoSimpleExpr, JoinType, QuerySelect,
entity::prelude::*,
sea_query::{Alias, IntoCondition, OnConflict},
};
use serde::{Deserialize, Serialize};
use super::{bangumi, query::InsertManyReturningExt, subscription_episode};
use super::{bangumi, query::InsertManyReturningExt, subscription_bangumi, subscription_episode};
use crate::{
app::AppContextTrait,
errors::RecorderResult,
errors::{RecorderError, RecorderResult},
extract::{
mikan::{MikanEpisodeMeta, build_mikan_episode_homepage_url},
mikan::{MikanEpisodeHash, MikanEpisodeMeta, build_mikan_episode_homepage_url},
rawname::parse_episode_meta_from_raw_name,
},
};
@@ -134,59 +138,6 @@ pub struct MikanEpsiodeCreation {
pub bangumi: Arc<bangumi::Model>,
}
impl Model {
pub async fn add_episodes(
ctx: &dyn AppContextTrait,
subscriber_id: i32,
subscription_id: i32,
creations: impl IntoIterator<Item = MikanEpsiodeCreation>,
) -> RecorderResult<()> {
let db = ctx.db();
let new_episode_active_modes = creations
.into_iter()
.map(|cr| ActiveModel::from_mikan_episode_meta(ctx, cr))
.inspect(|result| {
if let Err(e) = result {
tracing::warn!("Failed to create episode: {:?}", e);
}
})
.flatten();
let inserted_episodes = Entity::insert_many(new_episode_active_modes)
.on_conflict(
OnConflict::columns([Column::BangumiId, Column::MikanEpisodeId])
.do_nothing()
.to_owned(),
)
.exec_with_returning_columns(db, [Column::Id])
.await?
.into_iter()
.flat_map(|r| r.try_get_many_by_index::<i32>());
let insert_subscription_episode_links = inserted_episodes.into_iter().map(|episode_id| {
subscription_episode::ActiveModel::from_subscription_and_episode(
subscriber_id,
subscription_id,
episode_id,
)
});
subscription_episode::Entity::insert_many(insert_subscription_episode_links)
.on_conflict(
OnConflict::columns([
subscription_episode::Column::SubscriptionId,
subscription_episode::Column::EpisodeId,
])
.do_nothing()
.to_owned(),
)
.exec(db)
.await?;
Ok(())
}
}
impl ActiveModel {
pub fn from_mikan_episode_meta(
ctx: &dyn AppContextTrait,
@@ -239,3 +190,92 @@ impl ActiveModel {
#[async_trait]
impl ActiveModelBehavior for ActiveModel {}
impl Model {
pub async fn get_existed_mikan_episode_list(
ctx: &dyn AppContextTrait,
ids: impl Iterator<Item = MikanEpisodeHash>,
subscriber_id: i32,
_subscription_id: i32,
) -> RecorderResult<impl Iterator<Item = (i32, MikanEpisodeHash)>> {
let db = ctx.db();
Ok(Entity::find()
.select_only()
.column(Column::Id)
.column(Column::MikanEpisodeId)
.filter(
Expr::tuple([
Column::MikanEpisodeId.into_simple_expr(),
Column::SubscriberId.into_simple_expr(),
])
.in_tuples(
ids.into_iter()
.map(|id| (id.mikan_episode_token, subscriber_id)),
),
)
.into_tuple::<(i32, String)>()
.all(db)
.await?
.into_iter()
.map(|(id, mikan_episode_id)| {
(
id,
MikanEpisodeHash {
mikan_episode_token: mikan_episode_id,
},
)
}))
}
pub async fn add_episodes(
ctx: &dyn AppContextTrait,
subscriber_id: i32,
subscription_id: i32,
creations: impl IntoIterator<Item = MikanEpsiodeCreation>,
) -> RecorderResult<()> {
let db = ctx.db();
let new_episode_active_modes = creations
.into_iter()
.map(|cr| ActiveModel::from_mikan_episode_meta(ctx, cr))
.inspect(|result| {
if let Err(e) = result {
tracing::warn!("Failed to create episode: {:?}", e);
}
})
.flatten();
let inserted_episodes = Entity::insert_many(new_episode_active_modes)
.on_conflict(
OnConflict::columns([Column::BangumiId, Column::MikanEpisodeId])
.do_nothing()
.to_owned(),
)
.exec_with_returning_columns(db, [Column::Id])
.await?
.into_iter()
.flat_map(|r| r.try_get_many_by_index::<i32>());
let insert_subscription_episode_links = inserted_episodes.into_iter().map(|episode_id| {
subscription_episode::ActiveModel::from_subscription_and_episode(
subscriber_id,
subscription_id,
episode_id,
)
});
subscription_episode::Entity::insert_many(insert_subscription_episode_links)
.on_conflict(
OnConflict::columns([
subscription_episode::Column::SubscriptionId,
subscription_episode::Column::EpisodeId,
])
.do_nothing()
.to_owned(),
)
.exec(db)
.await?;
Ok(())
}
}

View File

@@ -2,6 +2,8 @@ use async_trait::async_trait;
use sea_orm::{ActiveValue, entity::prelude::*};
use serde::{Deserialize, Serialize};
use crate::{app::AppContextTrait, errors::RecorderResult};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "subscription_bangumi")]
pub struct Model {
@@ -69,3 +71,25 @@ impl ActiveModel {
}
}
}
impl Model {
pub async fn add_bangumis_for_subscription(
ctx: &dyn AppContextTrait,
bangumi_ids: impl Iterator<Item = i32>,
subscriber_id: i32,
subscription_id: i32,
) -> RecorderResult<()> {
let db = ctx.db();
Entity::insert_many(bangumi_ids.map(|bangumi_id| ActiveModel {
bangumi_id: ActiveValue::Set(bangumi_id),
subscriber_id: ActiveValue::Set(subscriber_id),
subscription_id: ActiveValue::Set(subscription_id),
..Default::default()
}))
.on_conflict_do_nothing()
.exec(db)
.await?;
Ok(())
}
}

View File

@@ -2,6 +2,8 @@ use async_trait::async_trait;
use sea_orm::{ActiveValue, entity::prelude::*};
use serde::{Deserialize, Serialize};
use crate::{app::AppContextTrait, errors::RecorderResult};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "subscription_episode")]
pub struct Model {
@@ -69,3 +71,25 @@ impl ActiveModel {
}
}
}
impl Model {
pub async fn add_episodes_for_subscription(
ctx: &dyn AppContextTrait,
episode_ids: impl Iterator<Item = i32>,
subscriber_id: i32,
subscription_id: i32,
) -> RecorderResult<()> {
let db = ctx.db();
Entity::insert_many(episode_ids.map(|episode_id| ActiveModel {
episode_id: ActiveValue::Set(episode_id),
subscription_id: ActiveValue::Set(subscription_id),
subscriber_id: ActiveValue::Set(subscriber_id),
..Default::default()
}))
.on_conflict_do_nothing()
.exec(db)
.await?;
Ok(())
}
}