fix: fix subscription and mikan doppel

This commit is contained in:
2025-05-11 03:41:02 +08:00
parent 8144986a48
commit 0df371adb7
61 changed files with 10241 additions and 47 deletions

View File

@@ -540,7 +540,7 @@ mod tests {
MikanSubscriberSubscriptionRssUrlMeta,
},
models::{
bangumi,
bangumi, episodes,
subscriptions::{self, SubscriptionTrait},
},
test_utils::{
@@ -655,9 +655,7 @@ mod tests {
#[rstest]
#[tokio::test]
async fn test_mikan_subscriber_subscription_sync_feeds_incremental(
before_each: (),
) -> RecorderResult<()> {
async fn test_mikan_subscriber_subscription_sync_feeds(before_each: ()) -> RecorderResult<()> {
let TestingResources {
app_ctx,
mut mikan_server,
@@ -675,7 +673,7 @@ mod tests {
category: ActiveValue::Set(subscriptions::SubscriptionCategory::MikanSubscriber),
source_url: ActiveValue::Set(
MikanSubscriberSubscriptionRssUrlMeta {
mikan_subscription_token: "123".into(),
mikan_subscription_token: "test".into(),
}
.build_rss_url(mikan_server.base_url().clone())
.to_string(),
@@ -686,11 +684,38 @@ mod tests {
let subscription_model = subscription_am.insert(app_ctx.db()).await?;
let subscription_task = subscriptions::Subscription::try_from_model(&subscription_model)?;
let subscription = subscriptions::Subscription::try_from_model(&subscription_model)?;
subscription_task
.sync_feeds_incremental(app_ctx.clone())
.await?;
let (incremental_bangumi_list, incremental_episode_list) = {
subscription.sync_feeds_incremental(app_ctx.clone()).await?;
let bangumi_list = bangumi::Entity::find().all(app_ctx.db()).await?;
assert!(!bangumi_list.is_empty());
let episode_list = episodes::Entity::find().all(app_ctx.db()).await?;
assert!(!episode_list.is_empty());
(bangumi_list, episode_list)
};
let (full_bangumi_list, full_episode_list) = {
subscription.sync_feeds_full(app_ctx.clone()).await?;
let bangumi_list = bangumi::Entity::find().all(app_ctx.db()).await?;
assert!(!bangumi_list.is_empty());
let episode_list = episodes::Entity::find().all(app_ctx.db()).await?;
assert!(!episode_list.is_empty());
(bangumi_list, episode_list)
};
assert_eq!(incremental_bangumi_list.len(), full_bangumi_list.len());
assert!(incremental_episode_list.len() < full_episode_list.len());
Ok(())
}
@@ -727,11 +752,21 @@ mod tests {
let subscription_model = subscription_am.insert(app_ctx.db()).await?;
let subscription_task = subscriptions::Subscription::try_from_model(&subscription_model)?;
let subscription = subscriptions::Subscription::try_from_model(&subscription_model)?;
subscription_task
.sync_feeds_incremental(app_ctx.clone())
.await?;
{
subscription.sync_feeds_incremental(app_ctx.clone()).await?;
let bangumi_list = bangumi::Entity::find().all(app_ctx.db()).await?;
assert!(!bangumi_list.is_empty());
};
{
subscription.sync_feeds_full(app_ctx.clone()).await?;
let bangumi_list = bangumi::Entity::find().all(app_ctx.db()).await?;
assert!(!bangumi_list.is_empty());
}
Ok(())
}

View File

@@ -35,14 +35,14 @@ AND jsonb_path_exists(job, '$.task_type ? (@.type() == "string")')"#,
))
.await?;
// db.execute_unprepared(&format!(
// r#"CREATE INDEX IF NOT EXISTS idx_apalis_jobs_subscriber_id
// ON apalis.jobs (((job -> 'subscriber_id')::integer))
// WHERE job_type = '{SUBSCRIBER_TASK_APALIS_NAME}'
// AND jsonb_path_exists(job, '$.subscriber_id ? (@.type() == "number")')
// AND jsonb_path_exists(job, '$.task_type ? (@.type() == "string")')"#
// ))
// .await?;
db.execute_unprepared(&format!(
r#"CREATE INDEX IF NOT EXISTS idx_apalis_jobs_subscriber_id
ON apalis.jobs (((job -> 'subscriber_id')::integer))
WHERE job_type = '{SUBSCRIBER_TASK_APALIS_NAME}'
AND jsonb_path_exists(job, '$.subscriber_id ? (@.type() == "number")')
AND jsonb_path_exists(job, '$.task_type ? (@.type() == "string")')"#
))
.await?;
Ok(())
}

View File

@@ -248,8 +248,15 @@ impl Model {
subscriber_id: ActiveValue::Set(subscriber_id),
..Default::default()
})
.on_conflict_do_nothing()
.exec(db)
.on_conflict(
OnConflict::columns([
subscription_bangumi::Column::SubscriptionId,
subscription_bangumi::Column::BangumiId,
])
.do_nothing()
.to_owned(),
)
.exec_without_returning(db)
.await?;
}
Ok(new_bangumi_model)

View File

@@ -224,6 +224,10 @@ impl Model {
})
.collect::<Result<_, _>>()?;
if new_episode_active_modes.is_empty() {
return Ok(());
}
let new_episode_ids = Entity::insert_many(new_episode_active_modes)
.on_conflict(
OnConflict::columns([Column::MikanEpisodeId, Column::SubscriberId])

View File

@@ -1,5 +1,5 @@
use async_trait::async_trait;
use sea_orm::{ActiveValue, entity::prelude::*};
use sea_orm::{ActiveValue, entity::prelude::*, sea_query::OnConflict};
use serde::{Deserialize, Serialize};
use crate::{app::AppContextTrait, errors::RecorderResult};
@@ -96,15 +96,29 @@ impl Model {
subscription_id: i32,
) -> RecorderResult<()> {
let db = ctx.db();
Entity::insert_many(bangumi_ids.map(|bangumi_id| ActiveModel {
bangumi_id: ActiveValue::Set(bangumi_id),
subscriber_id: ActiveValue::Set(subscriber_id),
subscription_id: ActiveValue::Set(subscription_id),
..Default::default()
}))
.on_conflict_do_nothing()
.exec(db)
.await?;
let active_models = bangumi_ids
.map(|bangumi_id| {
ActiveModel::from_subscription_and_bangumi(
subscriber_id,
subscription_id,
bangumi_id,
)
})
.collect::<Vec<_>>();
if active_models.is_empty() {
return Ok(());
}
Entity::insert_many(active_models)
.on_conflict(
OnConflict::columns([Column::SubscriptionId, Column::BangumiId])
.do_nothing()
.to_owned(),
)
.exec_without_returning(db)
.await?;
Ok(())
}

View File

@@ -1,5 +1,5 @@
use async_trait::async_trait;
use sea_orm::{ActiveValue, entity::prelude::*};
use sea_orm::{ActiveValue, entity::prelude::*, sea_query::OnConflict};
use serde::{Deserialize, Serialize};
use crate::{app::AppContextTrait, errors::RecorderResult};
@@ -81,15 +81,28 @@ impl Model {
subscription_id: i32,
) -> RecorderResult<()> {
let db = ctx.db();
Entity::insert_many(episode_ids.map(|episode_id| ActiveModel {
episode_id: ActiveValue::Set(episode_id),
subscription_id: ActiveValue::Set(subscription_id),
subscriber_id: ActiveValue::Set(subscriber_id),
..Default::default()
}))
.on_conflict_do_nothing()
.exec(db)
.await?;
let active_models = episode_ids
.map(|episode_id| ActiveModel {
episode_id: ActiveValue::Set(episode_id),
subscription_id: ActiveValue::Set(subscription_id),
subscriber_id: ActiveValue::Set(subscriber_id),
..Default::default()
})
.collect::<Vec<_>>();
if active_models.is_empty() {
return Ok(());
}
Entity::insert_many(active_models)
.on_conflict(
OnConflict::columns([Column::SubscriptionId, Column::EpisodeId])
.do_nothing()
.to_owned(),
)
.exec_without_returning(db)
.await?;
Ok(())
}

View File

@@ -18,7 +18,8 @@ use crate::{
MIKAN_ACCOUNT_MANAGE_PAGE_PATH, MIKAN_BANGUMI_EXPAND_SUBSCRIBED_PAGE_PATH,
MIKAN_BANGUMI_HOMEPAGE_PATH, MIKAN_BANGUMI_POSTER_PATH, MIKAN_BANGUMI_RSS_PATH,
MIKAN_EPISODE_HOMEPAGE_PATH, MIKAN_EPISODE_TORRENT_PATH, MIKAN_LOGIN_PAGE_PATH,
MIKAN_SEASON_FLOW_PAGE_PATH, MikanClient, MikanConfig, MikanCredentialForm,
MIKAN_SEASON_FLOW_PAGE_PATH, MIKAN_SUBSCRIBER_SUBSCRIPTION_RSS_PATH, MikanClient,
MikanConfig, MikanCredentialForm,
},
};
@@ -382,6 +383,7 @@ impl MikanMockServer {
if !path.starts_with(MIKAN_BANGUMI_EXPAND_SUBSCRIBED_PAGE_PATH)
&& !path.starts_with(MIKAN_SEASON_FLOW_PAGE_PATH)
&& (path.starts_with(MIKAN_BANGUMI_RSS_PATH)
|| path.starts_with(MIKAN_SUBSCRIBER_SUBSCRIPTION_RSS_PATH)
|| path.starts_with(MIKAN_BANGUMI_HOMEPAGE_PATH)
|| path.starts_with(MIKAN_EPISODE_HOMEPAGE_PATH)
|| path.starts_with(MIKAN_BANGUMI_POSTER_PATH)
@@ -420,6 +422,7 @@ impl MikanMockServer {
if !path.starts_with(MIKAN_BANGUMI_EXPAND_SUBSCRIBED_PAGE_PATH)
&& !path.starts_with(MIKAN_SEASON_FLOW_PAGE_PATH)
&& (path.starts_with(MIKAN_BANGUMI_RSS_PATH)
|| path.starts_with(MIKAN_SUBSCRIBER_SUBSCRIPTION_RSS_PATH)
|| path.starts_with(MIKAN_BANGUMI_HOMEPAGE_PATH)
|| path.starts_with(MIKAN_EPISODE_HOMEPAGE_PATH)
|| path.starts_with(MIKAN_BANGUMI_POSTER_PATH)

View File

@@ -8,9 +8,8 @@ pub fn try_init_testing_tracing(level: Level) {
let level = level.as_str().to_lowercase();
let mut filter = EnvFilter::new(format!("{crate_name}[]={level}"));
let mut modules = vec![];
let mut modules = vec!["mockito"];
modules.extend(MODULE_WHITELIST.iter());
modules.push("mockito");
for module in modules {
filter = filter.add_directive(format!("{module}[]={level}").parse().unwrap());
}