From b2f327d48fc95c8f54e9ac504f80458af2198fb4 Mon Sep 17 00:00:00 2001 From: lonelyhentxi Date: Tue, 20 May 2025 01:23:13 +0800 Subject: [PATCH] feat: refactor tasks --- .vscode/settings.json | 13 +++- apps/recorder/Cargo.toml | 5 +- apps/recorder/examples/playground.rs | 77 +++++++------------ apps/recorder/src/database/service.rs | 9 ++- apps/recorder/src/errors/app_error.rs | 2 +- apps/recorder/src/extract/mikan/client.rs | 6 +- apps/recorder/src/extract/mikan/web.rs | 6 +- .../recorder/src/graphql/infra/filter/json.rs | 74 ++++++++++++++---- apps/recorder/src/graphql/infra/filter/mod.rs | 69 +++-------------- .../src/graphql/infra/filter/subscriber.rs | 39 ++++++++++ apps/recorder/src/graphql/schema_root.rs | 26 ++++++- apps/recorder/src/lib.rs | 3 +- .../src/migrations/m20220101_000001_init.rs | 22 ++++++ .../m20250520_021135_subscriber_tasks.rs | 64 +++++++++++++++ apps/recorder/src/migrations/mod.rs | 2 + apps/recorder/src/models/subscriber_tasks.rs | 35 ++++++++- .../src/models/subscription_bangumi.rs | 16 ++++ .../src/models/subscription_episode.rs | 16 ++++ apps/recorder/src/task/service.rs | 8 +- apps/recorder/src/test_utils/app.rs | 20 +++-- apps/recorder/src/test_utils/database.rs | 16 +++- apps/recorder/src/test_utils/mod.rs | 1 + apps/recorder/src/test_utils/task.rs | 15 ++++ 23 files changed, 393 insertions(+), 151 deletions(-) create mode 100644 apps/recorder/src/graphql/infra/filter/subscriber.rs create mode 100644 apps/recorder/src/migrations/m20250520_021135_subscriber_tasks.rs create mode 100644 apps/recorder/src/test_utils/task.rs diff --git a/.vscode/settings.json b/.vscode/settings.json index 1637c31..fcbf6ee 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -29,7 +29,8 @@ "prettier.enable": false, "typescript.tsdk": "node_modules/typescript/lib", "rust-analyzer.cargo.features": [ - "testcontainers" + "testcontainers", + "playground" ], "sqltools.connections": [ { @@ -40,6 +41,16 @@ "name": "konobangu-dev", "database": 
"konobangu", "username": "konobangu" + }, + { + "previewLimit": 50, + "server": "localhost", + "port": 32770, + "askForPassword": true, + "driver": "PostgreSQL", + "name": "docker-pgsql", + "database": "konobangu", + "username": "konobangu" } ] } diff --git a/apps/recorder/Cargo.toml b/apps/recorder/Cargo.toml index 231e732..bad20e1 100644 --- a/apps/recorder/Cargo.toml +++ b/apps/recorder/Cargo.toml @@ -15,6 +15,7 @@ required-features = [] [features] default = [] +playground = ["dep:mockito"] testcontainers = [ "dep:testcontainers", "dep:testcontainers-modules", @@ -110,17 +111,17 @@ apalis = { version = "0.7", features = ["limit", "tracing", "catch-panic"] } apalis-sql = { version = "0.7", features = ["postgres"] } cocoon = { version = "0.4.3", features = ["getrandom", "thiserror"] } rand = "0.9.1" +rust_decimal = "1.37.1" reqwest_cookie_store = "0.8.0" +mockito = { version = "1.6.1", optional = true } downloader = { workspace = true } util = { workspace = true } fetch = { workspace = true } nanoid = "0.4.0" -rust_decimal = "1.37.1" [dev-dependencies] serial_test = "3" insta = { version = "1", features = ["redactions", "yaml", "filters"] } -mockito = "1.6.1" rstest = "0.25" ctor = "0.4.0" diff --git a/apps/recorder/examples/playground.rs b/apps/recorder/examples/playground.rs index 5fb4445..a9ce3fa 100644 --- a/apps/recorder/examples/playground.rs +++ b/apps/recorder/examples/playground.rs @@ -1,56 +1,33 @@ -use recorder::errors::RecorderResult; -// #![allow(unused_imports)] -// use recorder::{ -// app::{AppContext, AppContextTrait}, -// errors::RecorderResult, -// migrations::Migrator, -// models::{ -// subscribers::SEED_SUBSCRIBER, -// subscriptions::{self, SubscriptionCreateFromRssDto}, -// }, -// }; -// use sea_orm::{ColumnTrait, EntityTrait, QueryFilter}; -// use sea_orm_migration::MigratorTrait; +#![feature(duration_constructors_lite)] +use std::{sync::Arc, time::Duration}; -// async fn pull_mikan_bangumi_rss(ctx: &dyn AppContextTrait) -> RecorderResult<()> 
{ -// let rss_link = "https://mikanani.me/RSS/Bangumi?bangumiId=3416&subgroupid=370"; - -// // let rss_link = -// // "https://mikanani.me/RSS/MyBangumi?token=FE9tccsML2nBPUUqpCuJW2uJZydAXCntHJ7RpD9LDP8%3d"; -// let subscription = if let Some(subscription) = -// subscriptions::Entity::find() -// .filter(subscriptions::Column::SourceUrl.eq(String::from(rss_link))) -// .one(ctx.db()) -// .await? -// { -// subscription -// } else { -// subscriptions::Model::add_subscription( -// ctx, -// -// subscriptions::SubscriptionCreateDto::Mikan(SubscriptionCreateFromRssDto { -// rss_link: rss_link.to_string(), -// display_name: String::from("Mikan Project - 我的番组"), -// enabled: Some(true), -// }), -// 1, -// ) -// .await? -// }; - -// subscription.pull_subscription(ctx).await?; - -// Ok(()) -// } - -// #[tokio::main] -// async fn main() -> RecorderResult<()> { -// pull_mikan_bangumi_rss(&ctx).await?; - -// Ok(()) -// } +use apalis_sql::postgres::PostgresStorage; +use recorder::{ + app::AppContextTrait, + errors::RecorderResult, + test_utils::{ + app::TestingAppContext, + database::{TestingDatabaseServiceConfig, build_testing_database_service}, + }, +}; #[tokio::main] async fn main() -> RecorderResult<()> { + let app_ctx = { + let db_service = build_testing_database_service(TestingDatabaseServiceConfig { + auto_migrate: false, + }) + .await?; + Arc::new(TestingAppContext::builder().db(db_service).build()) + }; + + let db = app_ctx.db(); + + PostgresStorage::setup(db.get_postgres_connection_pool()).await?; + + dbg!(db.get_postgres_connection_pool().connect_options()); + + tokio::time::sleep(Duration::from_hours(1)).await; + Ok(()) } diff --git a/apps/recorder/src/database/service.rs b/apps/recorder/src/database/service.rs index 519449e..dcf71f4 100644 --- a/apps/recorder/src/database/service.rs +++ b/apps/recorder/src/database/service.rs @@ -16,7 +16,7 @@ pub trait DatabaseServiceConnectionTrait { pub struct DatabaseService { connection: DatabaseConnection, - #[cfg(all(test, 
feature = "testcontainers"))] + #[cfg(all(any(test, feature = "playground"), feature = "testcontainers"))] pub container: Option>, } @@ -54,7 +54,7 @@ impl DatabaseService { let me = Self { connection: db, - #[cfg(all(test, feature = "testcontainers"))] + #[cfg(all(any(test, feature = "playground"), feature = "testcontainers"))] container: None, }; @@ -66,18 +66,19 @@ impl DatabaseService { } pub async fn migrate_up(&self) -> RecorderResult<()> { - Migrator::up(&self.connection, None).await?; { let pool = &self.get_postgres_connection_pool(); PostgresStorage::setup(pool).await?; } + Migrator::up(&self.connection, None).await?; Ok(()) } pub async fn migrate_down(&self) -> RecorderResult<()> { Migrator::down(&self.connection, None).await?; { - let _pool = &self.get_postgres_connection_pool(); + self.execute_unprepared(r#"DROP SCHEMA IF EXISTS apalis CASCADE"#) + .await?; } Ok(()) } diff --git a/apps/recorder/src/errors/app_error.rs b/apps/recorder/src/errors/app_error.rs index 117b2bc..487f978 100644 --- a/apps/recorder/src/errors/app_error.rs +++ b/apps/recorder/src/errors/app_error.rs @@ -76,7 +76,7 @@ pub enum RecorderError { }, #[snafu(transparent)] HttpClientError { source: HttpClientError }, - #[cfg(all(feature = "testcontainers", test))] + #[cfg(all(any(test, feature = "playground"), feature = "testcontainers"))] #[snafu(transparent)] TestcontainersError { source: testcontainers::TestcontainersError, diff --git a/apps/recorder/src/extract/mikan/client.rs b/apps/recorder/src/extract/mikan/client.rs index 4ca6d01..cf5b766 100644 --- a/apps/recorder/src/extract/mikan/client.rs +++ b/apps/recorder/src/extract/mikan/client.rs @@ -253,7 +253,7 @@ mod tests { use super::*; use crate::test_utils::{ - app::UnitTestAppContext, + app::TestingAppContext, crypto::build_testing_crypto_service, database::build_testing_database_service, mikan::{MikanMockServer, build_testing_mikan_client, build_testing_mikan_credential_form}, @@ -264,9 +264,9 @@ mod tests { mikan_base_url: 
Url, ) -> RecorderResult> { let mikan_client = build_testing_mikan_client(mikan_base_url.clone()).await?; - let db_service = build_testing_database_service().await?; + let db_service = build_testing_database_service(Default::default()).await?; let crypto_service = build_testing_crypto_service().await?; - let ctx = UnitTestAppContext::builder() + let ctx = TestingAppContext::builder() .db(db_service) .crypto(crypto_service) .mikan(mikan_client) diff --git a/apps/recorder/src/extract/mikan/web.rs b/apps/recorder/src/extract/mikan/web.rs index 99e77ac..4990442 100644 --- a/apps/recorder/src/extract/mikan/web.rs +++ b/apps/recorder/src/extract/mikan/web.rs @@ -967,7 +967,7 @@ mod test { use crate::{ extract::mikan::{MIKAN_BANGUMI_EXPAND_SUBSCRIBED_PAGE_PATH, MIKAN_SEASON_FLOW_PAGE_PATH}, test_utils::{ - app::UnitTestAppContext, + app::TestingAppContext, crypto::build_testing_crypto_service, database::build_testing_database_service, mikan::{ @@ -1195,9 +1195,9 @@ mod test { let app_ctx = { let mikan_client = build_testing_mikan_client(mikan_base_url.clone()).await?; - let db_service = build_testing_database_service().await?; + let db_service = build_testing_database_service(Default::default()).await?; let crypto_service = build_testing_crypto_service().await?; - let app_ctx = UnitTestAppContext::builder() + let app_ctx = TestingAppContext::builder() .mikan(mikan_client) .db(db_service) .crypto(crypto_service) diff --git a/apps/recorder/src/graphql/infra/filter/json.rs b/apps/recorder/src/graphql/infra/filter/json.rs index 2200a6f..b27cf79 100644 --- a/apps/recorder/src/graphql/infra/filter/json.rs +++ b/apps/recorder/src/graphql/infra/filter/json.rs @@ -1,12 +1,17 @@ -use async_graphql::dynamic::SchemaError; +use async_graphql::{ + Error as GraphqlError, InputValueResult, Scalar, ScalarType, dynamic::SchemaError, to_value, +}; use itertools::Itertools; +use once_cell::sync::OnceCell; use rust_decimal::{Decimal, prelude::FromPrimitive}; use sea_orm::{ - Condition, + 
Condition, EntityTrait, sea_query::{ArrayType, Expr, ExprTrait, IntoLikeExpr, SimpleExpr, Value as DbValue}, }; +use seaography::{BuilderContext, FilterInfo, SeaographyError}; use serde_json::Value as JsonValue; +use super::subscriber::FnFilterCondition; use crate::errors::RecorderResult; #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Copy)] @@ -317,7 +322,7 @@ fn json_path_type_assert_expr( typestr: &str, ) -> SimpleExpr { Expr::cust_with_exprs( - format!("jsonb_path_exists($1, $2 || ' ? (@.type = \"{typestr}\")')"), + format!("jsonb_path_exists($1, $2 || ' ? (@.type() = \"{typestr}\")')"), [col_expr.into(), json_path_expr(path)], ) } @@ -767,8 +772,7 @@ where .map(|(i, v)| (JsonIndex::Num(i as u64), v)) .collect(), _ => Err(SchemaError(format!( - "Json filter input node must be an object or array, but got {}", - node.to_string() + "Json filter input node must be an object or array, but got {node}" )))?, }; let mut conditions = Condition::all(); @@ -866,6 +870,46 @@ where Ok(condition) } +#[derive(Clone, Debug)] +pub struct JsonFilterInput(pub serde_json::Value); + +#[Scalar(name = "JsonFilterInput")] +impl ScalarType for JsonFilterInput { + fn parse(value: async_graphql::Value) -> InputValueResult { + Ok(JsonFilterInput(value.into_json()?)) + } + + fn to_value(&self) -> async_graphql::Value { + async_graphql::Value::from_json(self.0.clone()).unwrap() + } +} + +pub static JSONB_FILTER_INFO: OnceCell = OnceCell::new(); + +pub fn jsonb_filter_condition_function( + _context: &BuilderContext, + column: &T::Column, +) -> FnFilterCondition +where + T: EntityTrait, + ::Model: Sync, +{ + let column = *column; + Box::new(move |mut condition, filter| { + let filter_value = to_value(filter.as_index_map()) + .map_err(|e| SeaographyError::AsyncGraphQLError(GraphqlError::new_with_source(e)))?; + + let filter = JsonFilterInput::parse(filter_value) + .map_err(|e| SeaographyError::AsyncGraphQLError(GraphqlError::new(format!("{e:?}"))))?; + + let cond_where = 
prepare_json_filter_input(&Expr::col(column), filter.0) + .map_err(|e| SeaographyError::AsyncGraphQLError(GraphqlError::new_with_source(e)))?; + + condition = condition.add(cond_where); + Ok(condition) + }) +} + #[cfg(test)] mod tests { use std::assert_matches::assert_matches; @@ -965,7 +1009,7 @@ mod tests { assert_eq!( sql, "SELECT \"job\" FROM \"test_table\" WHERE jsonb_path_exists(\"test_table\".\"job\", \ - $1 || ' ? (@.type = \"string\")')" + $1 || ' ? (@.type() = \"string\")')" ); } @@ -981,7 +1025,7 @@ mod tests { assert_eq!( sql, "SELECT \"job\" FROM \"test_table\" WHERE (CASE WHEN \ - ((jsonb_path_exists(\"test_table\".\"job\", $1 || ' ? (@.type = \"array\")')) \ + ((jsonb_path_exists(\"test_table\".\"job\", $1 || ' ? (@.type() = \"array\")')) \ AND (jsonb_path_query_first(\"test_table\".\"job\", $2) @> $3)) THEN true ELSE \ false END) = (true)" ); @@ -1000,9 +1044,9 @@ mod tests { assert_eq!( sql, "SELECT \"job\" FROM \"test_table\" WHERE (CASE WHEN \ - ((jsonb_path_exists(\"test_table\".\"job\", $1 || ' ? (@.type = \"array\")')) \ + ((jsonb_path_exists(\"test_table\".\"job\", $1 || ' ? (@.type() = \"array\")')) \ AND (jsonb_path_query_first(\"test_table\".\"job\", $2) @> $3)) THEN true WHEN \ - ((jsonb_path_exists(\"test_table\".\"job\", $4 || ' ? (@.type = \"string\")')) \ + ((jsonb_path_exists(\"test_table\".\"job\", $4 || ' ? (@.type() = \"string\")')) \ AND CAST((jsonb_path_query_first(\"test_table\".\"job\", $5)) AS text) LIKE $6) \ THEN true ELSE false END) = (true)" ); @@ -1028,7 +1072,7 @@ mod tests { assert_eq!( sql, "SELECT \"job\" FROM \"test_table\" WHERE \ - (jsonb_path_exists(\"test_table\".\"job\", $1 || ' ? (@.type = \"number\")')) \ + (jsonb_path_exists(\"test_table\".\"job\", $1 || ' ? 
(@.type() = \"number\")')) \ AND (CAST((jsonb_path_query_first(\"test_table\".\"job\", $2)) AS numeric) \ BETWEEN $3 AND $4)" ); @@ -1048,7 +1092,7 @@ mod tests { assert_eq!( sql, "SELECT \"job\" FROM \"test_table\" WHERE \ - (jsonb_path_exists(\"test_table\".\"job\", $1 || ' ? (@.type = \"string\")')) \ + (jsonb_path_exists(\"test_table\".\"job\", $1 || ' ? (@.type() = \"string\")')) \ AND (CAST((jsonb_path_query_first(\"test_table\".\"job\", $2)) AS text) BETWEEN \ $3 AND $4)" ); @@ -1068,7 +1112,7 @@ mod tests { assert_eq!( sql, "SELECT \"job\" FROM \"test_table\" WHERE \ - (jsonb_path_exists(\"test_table\".\"job\", $1 || ' ? (@.type = \"boolean\")')) \ + (jsonb_path_exists(\"test_table\".\"job\", $1 || ' ? (@.type() = \"boolean\")')) \ AND (CAST((jsonb_path_query_first(\"test_table\".\"job\", $2)) AS boolean) \ BETWEEN $3 AND $4)" ); @@ -1090,7 +1134,7 @@ mod tests { assert_eq!( sql, "SELECT \"job\" FROM \"test_table\" WHERE (jsonb_path_exists(\"test_table\".\"job\", \ - $1 || ' ? (@.type = \"string\")')) AND \ + $1 || ' ? (@.type() = \"string\")')) AND \ CAST((jsonb_path_query_first(\"test_table\".\"job\", $2)) AS text) LIKE $3" ); assert_eq!(params.len(), 3); @@ -1109,7 +1153,7 @@ mod tests { assert_eq!( sql, "SELECT \"job\" FROM \"test_table\" WHERE (jsonb_path_exists(\"test_table\".\"job\", \ - $1 || ' ? (@.type = \"string\")')) AND \ + $1 || ' ? (@.type() = \"string\")')) AND \ (starts_with(CAST((jsonb_path_query_first(\"test_table\".\"job\", $2)) AS text), $3))" ); assert_eq!(params.len(), 3); @@ -1128,7 +1172,7 @@ mod tests { assert_eq!( sql, "SELECT \"job\" FROM \"test_table\" WHERE (jsonb_path_exists(\"test_table\".\"job\", \ - $1 || ' ? (@.type = \"string\")')) AND \ + $1 || ' ? 
(@.type() = \"string\")')) AND \ CAST((jsonb_path_query_first(\"test_table\".\"job\", $2)) AS text) LIKE $3" ); assert_eq!(params.len(), 3); diff --git a/apps/recorder/src/graphql/infra/filter/mod.rs b/apps/recorder/src/graphql/infra/filter/mod.rs index ec9debd..6d20bc8 100644 --- a/apps/recorder/src/graphql/infra/filter/mod.rs +++ b/apps/recorder/src/graphql/infra/filter/mod.rs @@ -1,18 +1,13 @@ mod json; +mod subscriber; -use async_graphql::{ - InputValueResult, Scalar, ScalarType, - dynamic::{ObjectAccessor, TypeRef}, -}; -pub use json::prepare_json_filter_input; +use std::borrow::Cow; + +use async_graphql::dynamic::TypeRef; +pub use json::{JSONB_FILTER_INFO, jsonb_filter_condition_function}; use maplit::btreeset; -use once_cell::sync::OnceCell; -use sea_orm::{ColumnTrait, Condition, EntityTrait}; -use seaography::{ - BuilderContext, FilterInfo, FilterOperation as SeaographqlFilterOperation, SeaResult, -}; - -pub static SUBSCRIBER_ID_FILTER_INFO: OnceCell = OnceCell::new(); +use seaography::{FilterInfo, FilterOperation as SeaographqlFilterOperation}; +pub use subscriber::{SUBSCRIBER_ID_FILTER_INFO, subscriber_id_condition_function}; pub fn init_custom_filter_info() { SUBSCRIBER_ID_FILTER_INFO.get_or_init(|| FilterInfo { @@ -20,49 +15,9 @@ pub fn init_custom_filter_info() { base_type: TypeRef::INT.into(), supported_operations: btreeset! 
{ SeaographqlFilterOperation::Equals }, }); -} - -pub type FnFilterCondition = - Box SeaResult + Send + Sync>; - -pub fn subscriber_id_condition_function( - _context: &BuilderContext, - column: &T::Column, -) -> FnFilterCondition -where - T: EntityTrait, - ::Model: Sync, -{ - let column = *column; - Box::new(move |mut condition, filter| { - let subscriber_id_filter_info = SUBSCRIBER_ID_FILTER_INFO.get().unwrap(); - let operations = &subscriber_id_filter_info.supported_operations; - for operation in operations { - match operation { - SeaographqlFilterOperation::Equals => { - if let Some(value) = filter.get("eq") { - let value: i32 = value.i64()?.try_into()?; - let value = sea_orm::Value::Int(Some(value)); - condition = condition.add(column.eq(value)); - } - } - _ => unreachable!("unreachable filter operation for subscriber_id"), - } - } - Ok(condition) - }) -} - -#[derive(Clone, Debug)] -pub struct JsonFilterInput(pub serde_json::Value); - -#[Scalar(name = "JsonFilterInput")] -impl ScalarType for JsonFilterInput { - fn parse(value: async_graphql::Value) -> InputValueResult { - Ok(JsonFilterInput(value.into_json()?)) - } - - fn to_value(&self) -> async_graphql::Value { - async_graphql::Value::from_json(self.0.clone()).unwrap() - } + JSONB_FILTER_INFO.get_or_init(|| FilterInfo { + type_name: String::from("JsonbFilterInput"), + base_type: TypeRef::Named(Cow::Borrowed("serde_json::Value")).to_string(), + supported_operations: btreeset! 
{ SeaographqlFilterOperation::Equals }, + }); } diff --git a/apps/recorder/src/graphql/infra/filter/subscriber.rs b/apps/recorder/src/graphql/infra/filter/subscriber.rs new file mode 100644 index 0000000..b8e8a4b --- /dev/null +++ b/apps/recorder/src/graphql/infra/filter/subscriber.rs @@ -0,0 +1,39 @@ +use async_graphql::dynamic::ObjectAccessor; +use once_cell::sync::OnceCell; +use sea_orm::{ColumnTrait, Condition, EntityTrait}; +use seaography::{ + BuilderContext, FilterInfo, FilterOperation as SeaographqlFilterOperation, SeaResult, +}; + +pub static SUBSCRIBER_ID_FILTER_INFO: OnceCell = OnceCell::new(); + +pub type FnFilterCondition = + Box SeaResult + Send + Sync>; + +pub fn subscriber_id_condition_function( + _context: &BuilderContext, + column: &T::Column, +) -> FnFilterCondition +where + T: EntityTrait, + ::Model: Sync, +{ + let column = *column; + Box::new(move |mut condition, filter| { + let subscriber_id_filter_info = SUBSCRIBER_ID_FILTER_INFO.get().unwrap(); + let operations = &subscriber_id_filter_info.supported_operations; + for operation in operations { + match operation { + SeaographqlFilterOperation::Equals => { + if let Some(value) = filter.get("eq") { + let value: i32 = value.i64()?.try_into()?; + let value = sea_orm::Value::Int(Some(value)); + condition = condition.add(column.eq(value)); + } + } + _ => unreachable!("unreachable filter operation for subscriber_id"), + } + } + Ok(condition) + }) +} diff --git a/apps/recorder/src/graphql/schema_root.rs b/apps/recorder/src/graphql/schema_root.rs index 86e3283..bd38c8c 100644 --- a/apps/recorder/src/graphql/schema_root.rs +++ b/apps/recorder/src/graphql/schema_root.rs @@ -5,7 +5,8 @@ use seaography::{Builder, BuilderContext, FilterType, FilterTypesMapHelper}; use crate::graphql::infra::{ filter::{ - SUBSCRIBER_ID_FILTER_INFO, init_custom_filter_info, subscriber_id_condition_function, + JSONB_FILTER_INFO, SUBSCRIBER_ID_FILTER_INFO, init_custom_filter_info, + subscriber_id_condition_function, }, 
guard::{guard_entity_with_subscriber_id, guard_field_with_subscriber_id}, transformer::{filter_condition_transformer, mutation_input_object_transformer}, @@ -26,6 +27,20 @@ fn restrict_filter_input_for_entity( context.filter_types.overwrites.insert(key, filter_type); } +fn restrict_jsonb_filter_input_for_entity(context: &mut BuilderContext, column: &T::Column) +where + T: EntityTrait, + ::Model: Sync, +{ + let entity_column_key = get_entity_column_key::(context, column); + context.filter_types.overwrites.insert( + entity_column_key.clone(), + Some(FilterType::Custom( + JSONB_FILTER_INFO.get().unwrap().type_name.clone(), + )), + ); +} + fn restrict_subscriber_for_entity(context: &mut BuilderContext, column: &T::Column) where T: EntityTrait, @@ -118,6 +133,14 @@ pub fn schema( &mut context, &subscription_episode::Column::SubscriberId, ); + restrict_subscriber_for_entity::( + &mut context, + &subscriber_tasks::Column::SubscriberId, + ); + restrict_jsonb_filter_input_for_entity::( + &mut context, + &subscriber_tasks::Column::Job, + ); for column in subscribers::Column::iter() { if !matches!(column, subscribers::Column::Id) { restrict_filter_input_for_entity::( @@ -159,6 +182,7 @@ pub fn schema( subscription_bangumi, subscription_episode, subscriptions, + subscriber_tasks, ] ); diff --git a/apps/recorder/src/lib.rs b/apps/recorder/src/lib.rs index 4f791dd..18d6ace 100644 --- a/apps/recorder/src/lib.rs +++ b/apps/recorder/src/lib.rs @@ -9,6 +9,7 @@ associated_type_defaults, let_chains )] +#![allow(clippy::enum_variant_names)] pub use downloader; pub mod app; @@ -25,6 +26,6 @@ pub mod migrations; pub mod models; pub mod storage; pub mod task; -#[cfg(test)] +#[cfg(any(test, feature = "playground"))] pub mod test_utils; pub mod web; diff --git a/apps/recorder/src/migrations/m20220101_000001_init.rs b/apps/recorder/src/migrations/m20220101_000001_init.rs index 4f77117..9aeb245 100644 --- a/apps/recorder/src/migrations/m20220101_000001_init.rs +++ 
b/apps/recorder/src/migrations/m20220101_000001_init.rs
@@ -181,6 +181,17 @@ impl MigrationTrait for Migration {
                         .on_update(ForeignKeyAction::Cascade)
                         .on_delete(ForeignKeyAction::Cascade),
                 )
+                .foreign_key(
+                    ForeignKey::create()
+                        .name("fk_subscription_bangumi_subscriber_id")
+                        .from(
+                            SubscriptionBangumi::Table,
+                            SubscriptionBangumi::SubscriberId,
+                        )
+                        .to(Subscribers::Table, Subscribers::Id)
+                        .on_update(ForeignKeyAction::Cascade)
+                        .on_delete(ForeignKeyAction::Cascade),
+                )
                 .index(
                     Index::create()
                         .if_not_exists()
@@ -299,6 +310,17 @@ impl MigrationTrait for Migration {
                         .on_update(ForeignKeyAction::Cascade)
                         .on_delete(ForeignKeyAction::Cascade),
                 )
+                .foreign_key(
+                    ForeignKey::create()
+                        .name("fk_subscription_episode_subscriber_id")
+                        .from(
+                            SubscriptionEpisode::Table,
+                            SubscriptionEpisode::SubscriberId,
+                        )
+                        .to(Subscribers::Table, Subscribers::Id)
+                        .on_update(ForeignKeyAction::Cascade)
+                        .on_delete(ForeignKeyAction::Cascade),
+                )
                 .index(
                     Index::create()
                         .if_not_exists()
diff --git a/apps/recorder/src/migrations/m20250520_021135_subscriber_tasks.rs b/apps/recorder/src/migrations/m20250520_021135_subscriber_tasks.rs
new file mode 100644
index 0000000..cb0fda6
--- /dev/null
+++ b/apps/recorder/src/migrations/m20250520_021135_subscriber_tasks.rs
@@ -0,0 +1,66 @@
+use async_trait::async_trait;
+use sea_orm_migration::prelude::*;
+
+use crate::task::SUBSCRIBER_TASK_APALIS_NAME;
+
+#[derive(DeriveMigrationName)]
+pub struct Migration;
+
+#[async_trait]
+impl MigrationTrait for Migration {
+    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        let db = manager.get_connection();
+
+        // Postgres has no `CREATE VIEW IF NOT EXISTS`; use `CREATE OR REPLACE VIEW`.
+        // The view name must match the entity's table_name ("subscriber_tasks"),
+        // `job_type` must be compared against a quoted string literal, and the
+        // extracted jsonb fields must use `->>` (text) rather than `->` (jsonb).
+        db.execute_unprepared(&format!(
+            r#"CREATE OR REPLACE VIEW subscriber_tasks AS
+SELECT
+    job,
+    status,
+    (job->>'subscriber_id')::integer AS subscriber_id,
+    job->>'task_type' AS task_type,
+    id,
+    attempts,
+    max_attempts,
+    run_at,
+    last_error,
+    lock_at,
+    lock_by,
+    done_at,
+    priority
+FROM apalis.jobs
+WHERE job_type = '{SUBSCRIBER_TASK_APALIS_NAME}'
+AND jsonb_path_exists(job, '$.subscriber_id ? (@.type() == "number")')
+AND jsonb_path_exists(job, '$.task_type ? (@.type() == "string")')"#,
+        ))
+        .await?;
+
+        // Partial expression index so per-subscriber task lookups stay cheap.
+        db.execute_unprepared(&format!(
+            r#"CREATE INDEX IF NOT EXISTS idx_apalis_jobs_subscriber_id
+            ON apalis.jobs ((job -> 'subscriber_id'))
+            WHERE job_type = '{SUBSCRIBER_TASK_APALIS_NAME}'
+AND jsonb_path_exists(job, '$.subscriber_id ? (@.type() == "number")')
+AND jsonb_path_exists(job, '$.task_type ? (@.type() == "string")')"#
+        ))
+        .await?;
+
+        Ok(())
+    }
+
+    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        let db = manager.get_connection();
+
+        // Postgres `DROP INDEX` takes only a (schema-qualified) index name;
+        // the MySQL-style `ON <table>` clause is invalid syntax here. The index
+        // lives in the `apalis` schema because it was created on apalis.jobs.
+        db.execute_unprepared(r#"DROP INDEX IF EXISTS apalis.idx_apalis_jobs_subscriber_id"#)
+            .await?;
+
+        db.execute_unprepared("DROP VIEW IF EXISTS subscriber_tasks")
+            .await?;
+
+        Ok(())
+    }
+}
diff --git a/apps/recorder/src/migrations/mod.rs b/apps/recorder/src/migrations/mod.rs
index 6d31a78..b2bfaae 100644
--- a/apps/recorder/src/migrations/mod.rs
+++ b/apps/recorder/src/migrations/mod.rs
@@ -7,6 +7,7 @@
 pub mod m20220101_000001_init;
 pub mod m20240224_082543_add_downloads;
 pub mod m20241231_000001_auth;
 pub mod m20250501_021523_credential_3rd;
+pub mod m20250520_021135_subscriber_tasks;
 
 pub struct Migrator;
@@ -18,6 +19,7 @@ impl MigratorTrait for Migrator {
             Box::new(m20240224_082543_add_downloads::Migration),
             Box::new(m20241231_000001_auth::Migration),
             Box::new(m20250501_021523_credential_3rd::Migration),
+            Box::new(m20250520_021135_subscriber_tasks::Migration),
         ]
     }
 }
diff --git a/apps/recorder/src/models/subscriber_tasks.rs b/apps/recorder/src/models/subscriber_tasks.rs
index c2cae68..8b7de33 100644
--- a/apps/recorder/src/models/subscriber_tasks.rs
+++ b/apps/recorder/src/models/subscriber_tasks.rs
@@ -6,13 +6,42 @@ use crate::task::SubscriberTask;
 #[sea_orm(table_name = "subscriber_tasks")]
 pub struct Model {
     #[sea_orm(primary_key)]
-    pub id: i32,
+    pub id: String,
     pub subscriber_id: i32,
pub job: SubscriberTask, - pub state: String, + pub status: String, + pub attempts: i32, + pub max_attempts: i32, + pub run_at: DateTimeUtc, + pub last_error: Option, + pub lock_at: Option, + pub lock_by: Option, + pub done_at: Option, + pub priority: i32, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] -pub enum Relation {} +pub enum Relation { + #[sea_orm( + belongs_to = "super::subscribers::Entity", + from = "Column::SubscriberId", + to = "super::subscribers::Column::Id", + on_update = "Cascade", + on_delete = "Cascade" + )] + Subscriber, +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Subscriber.def() + } +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelatedEntity)] +pub enum RelatedEntity { + #[sea_orm(entity = "super::subscribers::Entity")] + Subscriber, +} impl ActiveModelBehavior for ActiveModel {} diff --git a/apps/recorder/src/models/subscription_bangumi.rs b/apps/recorder/src/models/subscription_bangumi.rs index f802790..17d0324 100644 --- a/apps/recorder/src/models/subscription_bangumi.rs +++ b/apps/recorder/src/models/subscription_bangumi.rs @@ -32,6 +32,14 @@ pub enum Relation { on_delete = "Cascade" )] Bangumi, + #[sea_orm( + belongs_to = "super::subscribers::Entity", + from = "Column::SubscriberId", + to = "super::subscribers::Column::Id", + on_update = "Cascade", + on_delete = "Cascade" + )] + Subscriber, } impl Related for Entity { @@ -46,12 +54,20 @@ impl Related for Entity { } } +impl Related for Entity { + fn to() -> RelationDef { + Relation::Subscriber.def() + } +} + #[derive(Copy, Clone, Debug, EnumIter, DeriveRelatedEntity)] pub enum RelatedEntity { #[sea_orm(entity = "super::subscriptions::Entity")] Subscription, #[sea_orm(entity = "super::bangumi::Entity")] Bangumi, + #[sea_orm(entity = "super::subscribers::Entity")] + Subscriber, } #[async_trait] diff --git a/apps/recorder/src/models/subscription_episode.rs b/apps/recorder/src/models/subscription_episode.rs index f32e2c1..945cee1 100644 --- 
a/apps/recorder/src/models/subscription_episode.rs +++ b/apps/recorder/src/models/subscription_episode.rs @@ -32,6 +32,14 @@ pub enum Relation { on_delete = "Cascade" )] Episode, + #[sea_orm( + belongs_to = "super::subscribers::Entity", + from = "Column::SubscriberId", + to = "super::subscribers::Column::Id", + on_update = "Cascade", + on_delete = "Cascade" + )] + Subscriber, } impl Related for Entity { @@ -46,12 +54,20 @@ impl Related for Entity { } } +impl Related for Entity { + fn to() -> RelationDef { + Relation::Subscriber.def() + } +} + #[derive(Copy, Clone, Debug, EnumIter, DeriveRelatedEntity)] pub enum RelatedEntity { #[sea_orm(entity = "super::subscriptions::Entity")] Subscription, #[sea_orm(entity = "super::episodes::Entity")] Episode, + #[sea_orm(entity = "super::subscribers::Entity")] + Subscriber, } #[async_trait] diff --git a/apps/recorder/src/task/service.rs b/apps/recorder/src/task/service.rs index 2ce00b7..9699f98 100644 --- a/apps/recorder/src/task/service.rs +++ b/apps/recorder/src/task/service.rs @@ -1,7 +1,7 @@ use std::{ops::Deref, sync::Arc}; use apalis::prelude::*; -use apalis_sql::postgres::PostgresStorage; +use apalis_sql::{Config, postgres::PostgresStorage}; use tokio::sync::RwLock; use crate::{ @@ -22,7 +22,11 @@ impl TaskService { ctx: Arc, ) -> RecorderResult { let pool = ctx.db().get_postgres_connection_pool().clone(); - let subscriber_task_storage = Arc::new(RwLock::new(PostgresStorage::new(pool))); + let storage_config = Config::new(SUBSCRIBER_TASK_APALIS_NAME); + let subscriber_task_storage = Arc::new(RwLock::new(PostgresStorage::new_with_config( + pool, + storage_config, + ))); Ok(Self { config, diff --git a/apps/recorder/src/test_utils/app.rs b/apps/recorder/src/test_utils/app.rs index 57390ef..fe9622f 100644 --- a/apps/recorder/src/test_utils/app.rs +++ b/apps/recorder/src/test_utils/app.rs @@ -1,12 +1,13 @@ -use std::fmt::Debug; +use std::{fmt::Debug, sync::Arc}; +use once_cell::sync::OnceCell; use typed_builder::TypedBuilder; 
use crate::app::AppContextTrait; #[derive(TypedBuilder)] #[builder(field_defaults(default, setter(strip_option)))] -pub struct UnitTestAppContext { +pub struct TestingAppContext { logger: Option, db: Option, config: Option, @@ -16,7 +17,8 @@ pub struct UnitTestAppContext { graphql: Option, storage: Option, crypto: Option, - task: Option, + #[builder(default = Arc::new(OnceCell::new()), setter(!strip_option))] + task: Arc>, message: Option, #[builder(default = Some(String::from(env!("CARGO_MANIFEST_DIR"))))] working_dir: Option, @@ -24,13 +26,19 @@ pub struct UnitTestAppContext { environment: crate::app::Environment, } -impl Debug for UnitTestAppContext { +impl TestingAppContext { + pub fn set_task(&self, task: crate::task::TaskService) { + self.task.get_or_init(|| task); + } +} + +impl Debug for TestingAppContext { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "UnitTestAppContext") } } -impl AppContextTrait for UnitTestAppContext { +impl AppContextTrait for TestingAppContext { fn logger(&self) -> &crate::logger::LoggerService { self.logger.as_ref().expect("should set logger") } @@ -76,7 +84,7 @@ impl AppContextTrait for UnitTestAppContext { } fn task(&self) -> &crate::task::TaskService { - self.task.as_ref().expect("should set tasks") + self.task.get().expect("should set task") } fn message(&self) -> &crate::message::MessageService { diff --git a/apps/recorder/src/test_utils/database.rs b/apps/recorder/src/test_utils/database.rs index e683e26..b091085 100644 --- a/apps/recorder/src/test_utils/database.rs +++ b/apps/recorder/src/test_utils/database.rs @@ -3,8 +3,20 @@ use crate::{ errors::RecorderResult, }; +pub struct TestingDatabaseServiceConfig { + pub auto_migrate: bool, +} + +impl Default for TestingDatabaseServiceConfig { + fn default() -> Self { + Self { auto_migrate: true } + } +} + #[cfg(feature = "testcontainers")] -pub async fn build_testing_database_service() -> RecorderResult { +pub async fn 
build_testing_database_service( + config: TestingDatabaseServiceConfig, +) -> RecorderResult { use testcontainers::{ImageExt, runners::AsyncRunner}; use testcontainers_ext::{ImageDefaultLogConsumerExt, ImagePruneExistedLabelExt}; use testcontainers_modules::postgres::Postgres; @@ -34,7 +46,7 @@ pub async fn build_testing_database_service() -> RecorderResult connect_timeout: 5000, idle_timeout: 10000, acquire_timeout: None, - auto_migrate: true, + auto_migrate: config.auto_migrate, }) .await?; db_service.container = Some(container); diff --git a/apps/recorder/src/test_utils/mod.rs b/apps/recorder/src/test_utils/mod.rs index 2feadbe..6b72912 100644 --- a/apps/recorder/src/test_utils/mod.rs +++ b/apps/recorder/src/test_utils/mod.rs @@ -3,4 +3,5 @@ pub mod crypto; pub mod database; pub mod mikan; pub mod storage; +pub mod task; pub mod tracing; diff --git a/apps/recorder/src/test_utils/task.rs b/apps/recorder/src/test_utils/task.rs new file mode 100644 index 0000000..5b1e9a0 --- /dev/null +++ b/apps/recorder/src/test_utils/task.rs @@ -0,0 +1,15 @@ +use std::sync::Arc; + +use crate::{ + app::AppContextTrait, + errors::RecorderResult, + task::{TaskConfig, TaskService}, +}; + +pub async fn build_testing_task_service( + ctx: Arc, +) -> RecorderResult { + let config = TaskConfig {}; + let task_service = TaskService::from_config_and_ctx(config, ctx).await?; + Ok(task_service) +}