From 65505f91b2c7d081251ba87ba16e42a08d9c2086 Mon Sep 17 00:00:00 2001 From: lonelyhentxi Date: Fri, 27 Jun 2025 04:06:58 +0800 Subject: [PATCH] refactor: refactor graphql --- Cargo.lock | 32 + apps/recorder/Cargo.toml | 1 + apps/recorder/src/errors/app_error.rs | 17 +- apps/recorder/src/extract/mikan/client.rs | 10 +- .../src/graphql/domains/credential_3rd.rs | 115 ++- apps/recorder/src/graphql/domains/cron.rs | 127 +++ apps/recorder/src/graphql/domains/feeds.rs | 25 +- apps/recorder/src/graphql/domains/mod.rs | 1 + .../src/graphql/domains/subscriber_tasks.rs | 344 ++++++-- .../src/graphql/domains/subscribers.rs | 74 +- .../src/graphql/domains/subscriptions.rs | 117 +-- apps/recorder/src/graphql/infra/crypto.rs | 10 +- apps/recorder/src/graphql/infra/custom.rs | 755 +++++++++++++++++- apps/recorder/src/graphql/infra/json.rs | 40 +- apps/recorder/src/graphql/infra/mod.rs | 2 +- apps/recorder/src/graphql/infra/name.rs | 203 +++++ apps/recorder/src/graphql/infra/util.rs | 30 - apps/recorder/src/graphql/schema.rs | 11 +- apps/recorder/src/lib.rs | 1 - apps/recorder/src/migrations/defs.rs | 2 +- .../src/migrations/m20220101_000001_init.rs | 3 +- ...add_subscription_id_to_subscriber_tasks.rs | 6 +- .../migrations/m20250629_065628_add_cron.rs | 62 +- apps/recorder/src/models/auth.rs | 4 +- apps/recorder/src/models/cron/core.rs | 14 + apps/recorder/src/models/cron/mod.rs | 82 +- apps/recorder/src/models/feeds/mod.rs | 2 +- apps/recorder/src/models/feeds/registry.rs | 2 +- apps/recorder/src/models/subscribers.rs | 2 +- apps/recorder/src/models/subscriptions/mod.rs | 49 +- .../src/models/subscriptions/registry.rs | 240 +++--- apps/recorder/src/task/core.rs | 48 +- apps/recorder/src/task/mod.rs | 1 + apps/recorder/src/task/registry/mod.rs | 12 +- apps/recorder/src/task/registry/subscriber.rs | 100 --- .../src/task/registry/subscriber/base.rs | 29 + .../src/task/registry/subscriber/mod.rs | 140 ++++ .../task/registry/subscriber/subscription.rs | 67 ++ .../src/task/registry/subscription.rs | 62 -- apps/recorder/src/task/registry/system.rs | 43 - .../src/task/registry/{ => system}/media.rs | 0 apps/recorder/src/task/registry/system/mod.rs | 108 +++ apps/recorder/src/task/service.rs | 24 +- 43 files changed, 2199 insertions(+), 818 deletions(-) create mode 100644 apps/recorder/src/graphql/domains/cron.rs create mode 100644 apps/recorder/src/graphql/infra/name.rs delete mode 100644 apps/recorder/src/graphql/infra/util.rs delete mode 100644 apps/recorder/src/task/registry/subscriber.rs create mode 100644 apps/recorder/src/task/registry/subscriber/base.rs create mode 100644 apps/recorder/src/task/registry/subscriber/mod.rs create mode 100644 apps/recorder/src/task/registry/subscriber/subscription.rs delete mode 100644 apps/recorder/src/task/registry/subscription.rs delete mode 100644 apps/recorder/src/task/registry/system.rs rename apps/recorder/src/task/registry/{ => system}/media.rs (100%) create mode 100644 apps/recorder/src/task/registry/system/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 6c8031e..b44ce59 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6824,6 +6824,7 @@ dependencies = [ "tracing-appender", "tracing-subscriber", "tracing-tree", + "ts-rs", "typed-builder 0.21.0", "url", "util", @@ -8690,6 +8691,15 @@ dependencies = [ "unic-segment", ] +[[package]] +name = "termcolor" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755" +dependencies = [ + "winapi-util", +] + [[package]] name = "testcontainers" version = "0.24.0" @@ -9214,6 +9224,28 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "ts-rs" +version = "11.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ef1b7a6d914a34127ed8e1fa927eb7088903787bcded4fa3eef8f85ee1568be" +dependencies = [ + "thiserror 2.0.12", + "ts-rs-macros", +] + +[[package]] +name = "ts-rs-macros" +version = "11.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9d4ed7b4c18cc150a6a0a1e9ea1ecfa688791220781af6e119f9599a8502a0a" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", + "termcolor", +] + [[package]] name = "tungstenite" version = "0.26.2" diff --git a/apps/recorder/Cargo.toml b/apps/recorder/Cargo.toml index 2c30a70..913c345 100644 --- a/apps/recorder/Cargo.toml +++ b/apps/recorder/Cargo.toml @@ -165,6 +165,7 @@ quick-xml = { version = "0.37.5", features = [ "serde", ] } croner = "2.2.0" +ts-rs = "11.0.1" [dev-dependencies] inquire = { workspace = true } diff --git a/apps/recorder/src/errors/app_error.rs b/apps/recorder/src/errors/app_error.rs index 271296f..c968917 100644 --- a/apps/recorder/src/errors/app_error.rs +++ b/apps/recorder/src/errors/app_error.rs @@ -18,6 +18,8 @@ use crate::{ #[derive(Snafu, Debug)] #[snafu(visibility(pub(crate)))] pub enum RecorderError { + #[snafu(transparent)] + SeaographyError { source: seaography::SeaographyError }, #[snafu(transparent)] CronError { source: croner::errors::CronError }, #[snafu(display( @@ -192,20 +194,17 @@ impl RecorderError { } } - pub fn from_model_not_found_detail>, T: ToString>( - model: C, - detail: T, - ) -> Self { + pub fn from_entity_not_found() -> Self { Self::ModelEntityNotFound { - entity: model.into(), - detail: Some(detail.to_string()), + entity: std::any::type_name::().into(), + detail: None, } } - pub fn from_model_not_found>>(model: C) -> Self { + pub fn from_entity_not_found_detail(detail: T) -> Self { Self::ModelEntityNotFound { - entity: model.into(), - detail: None, + entity: std::any::type_name::().into(), + detail: Some(detail.to_string()), } } } diff --git a/apps/recorder/src/extract/mikan/client.rs b/apps/recorder/src/extract/mikan/client.rs index 51e9234..4034b37 100644 --- a/apps/recorder/src/extract/mikan/client.rs +++ b/apps/recorder/src/extract/mikan/client.rs @@ -227,10 +227,12 @@ impl MikanClient { self.fork_with_userpass_credential(userpass_credential) .await } else { - Err(RecorderError::from_model_not_found_detail( - "credential", - format!("credential id {credential_id} not found"), - )) + Err(RecorderError::from_entity_not_found_detail::< + credential_3rd::Entity, + _, + >(format!( + "credential id {credential_id} not found" + ))) } } diff --git a/apps/recorder/src/graphql/domains/credential_3rd.rs b/apps/recorder/src/graphql/domains/credential_3rd.rs index 42ec864..745b732 100644 --- a/apps/recorder/src/graphql/domains/credential_3rd.rs +++ b/apps/recorder/src/graphql/domains/credential_3rd.rs @@ -1,50 +1,28 @@ use std::sync::Arc; -use async_graphql::dynamic::{ - Field, FieldFuture, FieldValue, InputObject, InputValue, Object, TypeRef, -}; -use seaography::{Builder as SeaographyBuilder, BuilderContext}; +use async_graphql::dynamic::{Field, FieldFuture, FieldValue, Object, TypeRef}; +use sea_orm::{EntityTrait, QueryFilter}; +use seaography::{Builder as SeaographyBuilder, BuilderContext, get_filter_conditions}; use serde::{Deserialize, Serialize}; use util_derive::DynamicGraphql; use crate::{ app::AppContextTrait, - auth::AuthUserInfo, errors::RecorderError, graphql::{ domains::subscribers::restrict_subscriber_for_entity, - infra::crypto::{ - register_crypto_column_input_conversion_to_schema_context, - register_crypto_column_output_conversion_to_schema_context, + infra::{ + crypto::{ + register_crypto_column_input_conversion_to_schema_context, + register_crypto_column_output_conversion_to_schema_context, + }, + custom::generate_entity_filtered_mutation_field, + name::get_entity_custom_mutation_field_name, }, }, models::credential_3rd, }; -#[derive(DynamicGraphql, Serialize, Deserialize, Clone, Debug)] -struct Credential3rdCheckAvailableInput { - pub id: i32, -} - -impl Credential3rdCheckAvailableInput { - fn input_type_name() -> &'static str { - "Credential3rdCheckAvailableInput" - } - - fn arg_name() -> &'static str { - "filter" - } - - fn generate_input_object() -> InputObject { - InputObject::new(Self::input_type_name()) - .description("The input of the credential3rdCheckAvailable query") - .field(InputValue::new( - Credential3rdCheckAvailableInputFieldEnum::Id.as_str(), - TypeRef::named_nn(TypeRef::INT), - )) - } -} - #[derive(DynamicGraphql, Serialize, Deserialize, Clone, Debug)] pub struct Credential3rdCheckAvailableInfo { pub available: bool, @@ -119,50 +97,47 @@ pub fn register_credential3rd_to_schema_builder( builder.register_enumeration::(); seaography::register_entity!(builder, credential_3rd); - builder.schema = builder - .schema - .register(Credential3rdCheckAvailableInput::generate_input_object()); builder.schema = builder .schema .register(Credential3rdCheckAvailableInfo::generate_output_object()); - builder.queries.push( - Field::new( - "credential3rdCheckAvailable", - TypeRef::named_nn(Credential3rdCheckAvailableInfo::object_type_name()), - move |ctx| { - FieldFuture::new(async move { - let auth_user_info = ctx.data::()?; - let input: Credential3rdCheckAvailableInput = ctx - .args - .get(Credential3rdCheckAvailableInput::arg_name()) - .unwrap() - .deserialize()?; - let app_ctx = ctx.data::>()?; + let builder_context = builder.context; + { + let check_available_mutation_name = get_entity_custom_mutation_field_name::< + credential_3rd::Entity, + >(builder_context, "CheckAvailable"); + let check_available_mutation = + generate_entity_filtered_mutation_field::( + builder_context, + check_available_mutation_name, + TypeRef::named_nn(Credential3rdCheckAvailableInfo::object_type_name()), + Arc::new(|resolver_ctx, app_ctx, filters| { + let filters_condition = get_filter_conditions::( + resolver_ctx, + builder_context, + filters, + ); - let credential_model = credential_3rd::Model::find_by_id_and_subscriber_id( - app_ctx.as_ref(), - input.id, - auth_user_info.subscriber_auth.subscriber_id, - ) - .await? - .ok_or_else(|| RecorderError::Credential3rdError { - message: format!("credential = {} not found", input.id), - source: None.into(), - })?; + Box::pin(async move { + let db = app_ctx.db(); - let available = credential_model.check_available(app_ctx.as_ref()).await?; - Ok(Some(FieldValue::owned_any( - Credential3rdCheckAvailableInfo { available }, - ))) - }) - }, - ) - .argument(InputValue::new( - Credential3rdCheckAvailableInput::arg_name(), - TypeRef::named_nn(Credential3rdCheckAvailableInput::input_type_name()), - )), - ); + let credential_model = credential_3rd::Entity::find() + .filter(filters_condition) + .one(db) + .await? + .ok_or_else(|| { + RecorderError::from_entity_not_found::() + })?; + + let available = credential_model.check_available(app_ctx.as_ref()).await?; + Ok(Some(FieldValue::owned_any( + Credential3rdCheckAvailableInfo { available }, + ))) + }) + }), + ); + builder.mutations.push(check_available_mutation); + } builder } diff --git a/apps/recorder/src/graphql/domains/cron.rs b/apps/recorder/src/graphql/domains/cron.rs new file mode 100644 index 0000000..68f661b --- /dev/null +++ b/apps/recorder/src/graphql/domains/cron.rs @@ -0,0 +1,127 @@ +use convert_case::Case; +use sea_orm::Iterable; +use seaography::{Builder as SeaographyBuilder, BuilderContext}; + +use crate::{ + graphql::{ + domains::subscribers::restrict_subscriber_for_entity, + infra::{ + custom::{ + generate_entity_default_create_batch_mutation_field, + generate_entity_default_create_one_mutation_field, + generate_entity_default_delete_mutation_field, + generate_entity_default_insert_input_object, + generate_entity_default_update_input_object, + generate_entity_default_update_mutation_field, + }, + json::{ + convert_jsonb_output_case_for_entity, restrict_jsonb_filter_input_for_entity, + validate_jsonb_input_for_entity, + }, + name::get_entity_and_column_name, + }, + }, + models::{cron, subscriber_tasks}, +}; + +fn skip_columns_for_entity_input(context: &mut BuilderContext) { + for column in cron::Column::iter() { + if matches!( + column, + cron::Column::SubscriberTask + | cron::Column::Id + | cron::Column::CronExpr + | cron::Column::Enabled + | cron::Column::TimeoutMs + | cron::Column::MaxAttempts + ) { + continue; + } + let entity_column_key = get_entity_and_column_name::(context, &column); + context.entity_input.insert_skips.push(entity_column_key); + } + for column in cron::Column::iter() { + if matches!(column, |cron::Column::CronExpr| cron::Column::Enabled + | cron::Column::TimeoutMs + | cron::Column::Priority + | cron::Column::MaxAttempts) + { + continue; + } + let entity_column_key = get_entity_and_column_name::(context, &column); + context.entity_input.update_skips.push(entity_column_key); + } +} + +pub fn register_cron_to_schema_context(context: &mut BuilderContext) { + restrict_subscriber_for_entity::(context, &cron::Column::SubscriberId); + + restrict_jsonb_filter_input_for_entity::(context, &cron::Column::SubscriberTask); + convert_jsonb_output_case_for_entity::( + context, + &cron::Column::SubscriberTask, + Case::Camel, + ); + validate_jsonb_input_for_entity::>( + context, + &cron::Column::SubscriberTask, + ); + skip_columns_for_entity_input(context); +} + +pub fn register_cron_to_schema_builder(mut builder: SeaographyBuilder) -> SeaographyBuilder { + builder.register_entity::( + ::iter() + .map(|rel| seaography::RelationBuilder::get_relation(&rel, builder.context)) + .collect(), + ); + builder = builder.register_entity_dataloader_one_to_one(cron::Entity, tokio::spawn); + builder = builder.register_entity_dataloader_one_to_many(cron::Entity, tokio::spawn); + + builder.register_enumeration::(); + + let builder_context = builder.context; + + { + builder + .inputs + .push(generate_entity_default_insert_input_object::( + builder_context, + )); + builder + .mutations + .push(generate_entity_default_create_one_mutation_field::< + cron::Entity, + _, + >(builder_context, true)); + builder + .mutations + .push(generate_entity_default_create_batch_mutation_field::< + cron::Entity, + _, + >(builder_context, true)); + } + { + builder + .inputs + .push(generate_entity_default_update_input_object::( + builder_context, + )); + builder + .mutations + .push(generate_entity_default_update_mutation_field::< + cron::Entity, + _, + >(builder_context, true)); + } + { + builder + .mutations + .push(generate_entity_default_delete_mutation_field::< + cron::Entity, + _, + >(builder_context, false)); + } + + builder +} diff --git a/apps/recorder/src/graphql/domains/feeds.rs b/apps/recorder/src/graphql/domains/feeds.rs index 095a795..fe1ea20 100644 --- a/apps/recorder/src/graphql/domains/feeds.rs +++ b/apps/recorder/src/graphql/domains/feeds.rs @@ -7,7 +7,10 @@ use seaography::{Builder as SeaographyBuilder, BuilderContext, SeaResult}; use crate::{ graphql::{ domains::subscribers::restrict_subscriber_for_entity, - infra::util::{get_entity_column_key, get_entity_key}, + infra::name::{ + get_entity_and_column_name, get_entity_create_batch_mutation_field_name, + get_entity_create_one_mutation_field_name, + }, }, models::feeds, }; @@ -15,22 +18,14 @@ use crate::{ pub fn register_feeds_to_schema_context(context: &mut BuilderContext) { restrict_subscriber_for_entity::(context, &feeds::Column::SubscriberId); { - let entity_column_key = - get_entity_column_key::(context, &feeds::Column::Token); - let entity_key = get_entity_key::(context); - let entity_name = context.entity_query_field.type_name.as_ref()(&entity_key); - let entity_create_one_mutation_field_name = Arc::new(format!( - "{}{}", - entity_name, context.entity_create_one_mutation.mutation_suffix - )); - let entity_create_batch_mutation_field_name = Arc::new(format!( - "{}{}", - entity_name, - context.entity_create_batch_mutation.mutation_suffix.clone() - )); + let entity_create_one_mutation_field_name = Arc::new( + get_entity_create_one_mutation_field_name::(context), + ); + let entity_create_batch_mutation_field_name = + Arc::new(get_entity_create_batch_mutation_field_name::(context)); context.types.input_none_conversions.insert( - entity_column_key, + get_entity_and_column_name::(context, &feeds::Column::Token), Box::new( move |context: &ResolverContext| -> SeaResult> { let field_name = context.field().name(); diff --git a/apps/recorder/src/graphql/domains/mod.rs b/apps/recorder/src/graphql/domains/mod.rs index eb07557..de8239b 100644 --- a/apps/recorder/src/graphql/domains/mod.rs +++ b/apps/recorder/src/graphql/domains/mod.rs @@ -10,3 +10,4 @@ pub mod subscribers; pub mod subscription_bangumi; pub mod subscription_episode; pub mod subscriptions; +pub mod cron; diff --git a/apps/recorder/src/graphql/domains/subscriber_tasks.rs b/apps/recorder/src/graphql/domains/subscriber_tasks.rs index 4887b91..b05a255 100644 --- a/apps/recorder/src/graphql/domains/subscriber_tasks.rs +++ b/apps/recorder/src/graphql/domains/subscriber_tasks.rs @@ -1,82 +1,253 @@ use std::{ops::Deref, sync::Arc}; -use async_graphql::dynamic::{FieldValue, TypeRef}; +use async_graphql::dynamic::{FieldValue, TypeRef, ValueAccessor}; +use convert_case::Case; use sea_orm::{ - ColumnTrait, ConnectionTrait, EntityTrait, QueryFilter, QuerySelect, QueryTrait, prelude::Expr, - sea_query::Query, + ColumnTrait, ConnectionTrait, EntityTrait, Iterable, QueryFilter, QuerySelect, QueryTrait, + prelude::Expr, sea_query::Query, }; use seaography::{ - Builder as SeaographyBuilder, BuilderContext, EntityDeleteMutationBuilder, EntityObjectBuilder, - EntityQueryFieldBuilder, get_filter_conditions, + Builder as SeaographyBuilder, BuilderContext, GuardAction, get_filter_conditions, }; use crate::{ + auth::AuthUserInfo, errors::RecorderError, graphql::{ domains::subscribers::restrict_subscriber_for_entity, infra::{ - custom::generate_entity_filter_mutation_field, - json::{convert_jsonb_output_case_for_entity, restrict_jsonb_filter_input_for_entity}, + custom::{ + generate_entity_create_one_mutation_field, + generate_entity_default_insert_input_object, + generate_entity_filtered_mutation_field, + }, + json::{ + convert_jsonb_output_case_for_entity, restrict_jsonb_filter_input_for_entity, + validate_jsonb_input_for_entity, + }, + name::{ + get_column_name, get_entity_and_column_name, get_entity_basic_type_name, + get_entity_create_batch_mutation_data_field_name, + get_entity_create_batch_mutation_field_name, + get_entity_create_one_mutation_data_field_name, + get_entity_create_one_mutation_field_name, get_entity_custom_mutation_field_name, + get_entity_delete_mutation_field_name, get_entity_update_mutation_field_name, + }, }, }, models::subscriber_tasks, task::{ApalisJobs, ApalisSchema}, }; -pub fn register_subscriber_tasks_entity_mutations( +pub fn check_entity_and_task_subscriber_id_matches( + value_accessor: &ValueAccessor<'_>, + subscriber_id: i32, + subscriber_id_column_name: &str, + subscriber_task_column_name: &str, +) -> bool { + value_accessor.object().is_ok_and(|input_object| { + input_object + .get(subscriber_task_column_name) + .and_then(|subscriber_task_value| subscriber_task_value.object().ok()) + .and_then(|subscriber_task_object| { + subscriber_task_object + .get("subscriber_id") + .and_then(|job_subscriber_id| job_subscriber_id.i64().ok()) + }) + .is_some_and(|subscriber_task_subscriber_id| { + subscriber_task_subscriber_id as i32 + == input_object + .get(subscriber_id_column_name) + .and_then(|subscriber_id_object| subscriber_id_object.i64().ok()) + .map(|subscriber_id| subscriber_id as i32) + .unwrap_or(subscriber_id) + }) + }) +} + +fn skip_columns_for_entity_input(context: &mut BuilderContext) { + for column in subscriber_tasks::Column::iter() { + if matches!( + column, + subscriber_tasks::Column::Job + | subscriber_tasks::Column::Id + | subscriber_tasks::Column::SubscriberId + | subscriber_tasks::Column::Priority + | subscriber_tasks::Column::MaxAttempts + ) { + continue; + } + let entity_column_key = + get_entity_and_column_name::(context, &column); + context.entity_input.insert_skips.push(entity_column_key); + } +} + +pub fn register_subscriber_tasks_to_schema_context(context: &mut BuilderContext) { + restrict_subscriber_for_entity::( + context, + &subscriber_tasks::Column::SubscriberId, + ); + restrict_jsonb_filter_input_for_entity::( + context, + &subscriber_tasks::Column::Job, + ); + convert_jsonb_output_case_for_entity::( + context, + &subscriber_tasks::Column::Job, + Case::Camel, + ); + validate_jsonb_input_for_entity::( + context, + &subscriber_tasks::Column::Job, + ); + skip_columns_for_entity_input(context); + + context.guards.field_guards.insert( + get_entity_and_column_name::( + context, + &subscriber_tasks::Column::Job, + ), + { + let create_one_mutation_field_name = + Arc::new(get_entity_create_one_mutation_field_name::< + subscriber_tasks::Entity, + >(context)); + let create_one_mutation_data_field_name = + Arc::new(get_entity_create_one_mutation_data_field_name(context).to_string()); + let create_batch_mutation_field_name = + Arc::new(get_entity_create_batch_mutation_field_name::< + subscriber_tasks::Entity, + >(context)); + let create_batch_mutation_data_field_name = + Arc::new(get_entity_create_batch_mutation_data_field_name(context).to_string()); + let update_mutation_field_name = Arc::new(get_entity_update_mutation_field_name::< + subscriber_tasks::Entity, + >(context)); + let job_column_name = Arc::new(get_column_name::( + context, + &subscriber_tasks::Column::Job, + )); + let subscriber_id_column_name = Arc::new(get_column_name::( + context, + &subscriber_tasks::Column::SubscriberId, + )); + + Box::new(move |resolve_context| { + let field_name = resolve_context.field().name(); + let subscriber_id = resolve_context + .data_opt::() + .unwrap() + .subscriber_auth + .subscriber_id; + let matched_subscriber_id = match field_name { + field if field == create_one_mutation_field_name.as_str() => resolve_context + .args + .get(create_one_mutation_data_field_name.as_str()) + .is_some_and(|value_accessor| { + check_entity_and_task_subscriber_id_matches( + &value_accessor, + subscriber_id, + subscriber_id_column_name.as_str(), + job_column_name.as_str(), + ) + }), + field if field == create_batch_mutation_field_name.as_str() => resolve_context + .args + .get(create_batch_mutation_data_field_name.as_str()) + .and_then(|value| value.list().ok()) + .is_some_and(|list| { + list.iter().all(|value| { + check_entity_and_task_subscriber_id_matches( + &value, + subscriber_id, + subscriber_id_column_name.as_str(), + job_column_name.as_str(), + ) + }) + }), + field if field == update_mutation_field_name.as_str() => { + unreachable!("subscriberTask entity do not support update job") + } + _ => true, + }; + if matched_subscriber_id { + GuardAction::Allow + } else { + GuardAction::Block(Some( + "subscriber_id mismatch between entity and job".to_string(), + )) + } + }) + }, + ); +} + +pub fn register_subscriber_tasks_to_schema_builder( mut builder: SeaographyBuilder, ) -> SeaographyBuilder { + builder.register_entity::( + ::iter() + .map(|rel| seaography::RelationBuilder::get_relation(&rel, builder.context)) + .collect(), + ); + builder = builder.register_entity_dataloader_one_to_one(subscriber_tasks::Entity, tokio::spawn); + builder = + builder.register_entity_dataloader_one_to_many(subscriber_tasks::Entity, tokio::spawn); + builder.register_enumeration::(); + builder.register_enumeration::(); + let context = builder.context; { - let entitity_delete_mutation_builder = EntityDeleteMutationBuilder { context }; - let delete_mutation = generate_entity_filter_mutation_field::( - context, - entitity_delete_mutation_builder.type_name::(), - TypeRef::named_nn(TypeRef::INT), - Arc::new(|resolver_ctx, app_ctx, filters| { - let filters_condition = get_filter_conditions::( - resolver_ctx, - context, - filters, - ); - Box::pin(async move { - let db = app_ctx.db(); + let delete_mutation = + generate_entity_filtered_mutation_field::( + context, + get_entity_delete_mutation_field_name::(context), + TypeRef::named_nn(TypeRef::INT), + Arc::new(|resolver_ctx, app_ctx, filters| { + let filters_condition = get_filter_conditions::( + resolver_ctx, + context, + filters, + ); + Box::pin(async move { + let db = app_ctx.db(); - let select_subquery = subscriber_tasks::Entity::find() - .select_only() - .column(subscriber_tasks::Column::Id) - .filter(filters_condition); + let select_subquery = subscriber_tasks::Entity::find() + .select_only() + .column(subscriber_tasks::Column::Id) + .filter(filters_condition); - let delete_query = Query::delete() - .from_table((ApalisSchema::Schema, ApalisJobs::Table)) - .and_where( - Expr::col(ApalisJobs::Id).in_subquery(select_subquery.into_query()), - ) - .to_owned(); + let delete_query = Query::delete() + .from_table((ApalisSchema::Schema, ApalisJobs::Table)) + .and_where( + Expr::col(ApalisJobs::Id).in_subquery(select_subquery.into_query()), + ) + .to_owned(); - let db_backend = db.deref().get_database_backend(); - let delete_statement = db_backend.build(&delete_query); + let db_backend = db.deref().get_database_backend(); + let delete_statement = db_backend.build(&delete_query); - let result = db.execute(delete_statement).await?; + let result = db.execute(delete_statement).await?; - Ok::<_, RecorderError>(Some(FieldValue::value(result.rows_affected() as i32))) - }) - }), - ); + Ok::<_, RecorderError>(Some(FieldValue::value( + result.rows_affected() as i32 + ))) + }) + }), + ); builder.mutations.push(delete_mutation); } { - let entity_object_builder = EntityObjectBuilder { context }; - let entity_query_field = EntityQueryFieldBuilder { context }; - let entity_retry_one_mutation_name = format!( - "{}RetryOne", - entity_query_field.type_name::() - ); + let entity_retry_one_mutation_name = + get_entity_custom_mutation_field_name::(context, "RetryOne"); let retry_one_mutation = - generate_entity_filter_mutation_field::( + generate_entity_filtered_mutation_field::( context, entity_retry_one_mutation_name, - TypeRef::named_nn(entity_object_builder.type_name::()), + TypeRef::named_nn(get_entity_basic_type_name::( + context, + )), Arc::new(|resolver_ctx, app_ctx, filters| { let filters_condition = get_filter_conditions::( resolver_ctx, @@ -93,7 +264,9 @@ pub fn register_subscriber_tasks_entity_mutations( .into_tuple::() .one(db) .await? - .ok_or_else(|| RecorderError::from_model_not_found("SubscriberTask"))?; + .ok_or_else(|| { + RecorderError::from_entity_not_found::() + })?; let task = app_ctx.task(); task.retry_subscriber_task(job_id.clone()).await?; @@ -102,7 +275,9 @@ pub fn register_subscriber_tasks_entity_mutations( .filter(subscriber_tasks::Column::Id.eq(&job_id)) .one(db) .await? - .ok_or_else(|| RecorderError::from_model_not_found("SubscriberTask"))?; + .ok_or_else(|| { + RecorderError::from_entity_not_found::() + })?; Ok::<_, RecorderError>(Some(FieldValue::owned_any(task_model))) }) @@ -110,38 +285,47 @@ pub fn register_subscriber_tasks_entity_mutations( ); builder.mutations.push(retry_one_mutation); } + { + builder + .inputs + .push(generate_entity_default_insert_input_object::< + subscriber_tasks::Entity, + >(context)); + let create_one_mutation = + generate_entity_create_one_mutation_field::( + context, + None, + Arc::new(|_resolver_ctx, app_ctx, input_object| { + let job_column_name = get_column_name::( + context, + &subscriber_tasks::Column::Job, + ); + let task = input_object + .get(job_column_name.as_str()) + .unwrap() + .deserialize::() + .unwrap(); - builder -} - -pub fn register_subscriber_tasks_to_schema_context(context: &mut BuilderContext) { - restrict_subscriber_for_entity::( - context, - &subscriber_tasks::Column::SubscriberId, - ); - restrict_jsonb_filter_input_for_entity::( - context, - &subscriber_tasks::Column::Job, - ); - convert_jsonb_output_case_for_entity::( - context, - &subscriber_tasks::Column::Job, - ); -} - -pub fn register_subscriber_tasks_to_schema_builder( - mut builder: SeaographyBuilder, -) -> SeaographyBuilder { - builder.register_entity::( - ::iter() - .map(|rel| seaography::RelationBuilder::get_relation(&rel, builder.context)) - .collect(), - ); - builder = builder.register_entity_dataloader_one_to_one(subscriber_tasks::Entity, tokio::spawn); - builder = - builder.register_entity_dataloader_one_to_many(subscriber_tasks::Entity, tokio::spawn); - builder = register_subscriber_tasks_entity_mutations(builder); - builder.register_enumeration::(); - builder.register_enumeration::(); + Box::pin(async move { + let task_service = app_ctx.task(); + + let task_id = task_service.add_subscriber_task(task).await?.to_string(); + + let db = app_ctx.db(); + + let task = subscriber_tasks::Entity::find() + .filter(subscriber_tasks::Column::Id.eq(&task_id)) + .one(db) + .await? + .ok_or_else(|| { + RecorderError::from_entity_not_found::() + })?; + + Ok::<_, RecorderError>(task) + }) + }), + ); + builder.mutations.push(create_one_mutation); + } builder } diff --git a/apps/recorder/src/graphql/domains/subscribers.rs b/apps/recorder/src/graphql/domains/subscribers.rs index 00a2b1f..be20012 100644 --- a/apps/recorder/src/graphql/domains/subscribers.rs +++ b/apps/recorder/src/graphql/domains/subscribers.rs @@ -12,7 +12,14 @@ use seaography::{ use crate::{ auth::{AuthError, AuthUserInfo}, - graphql::infra::util::{get_column_key, get_entity_column_key, get_entity_key}, + graphql::infra::name::{ + get_column_name, get_entity_and_column_name, + get_entity_create_batch_mutation_data_field_name, + get_entity_create_batch_mutation_field_name, + get_entity_create_one_mutation_data_field_name, get_entity_create_one_mutation_field_name, + get_entity_name, get_entity_update_mutation_data_field_name, + get_entity_update_mutation_field_name, + }, models::subscribers, }; @@ -82,32 +89,19 @@ where T: EntityTrait, ::Model: Sync, { - let entity_key = get_entity_key::(context); - let entity_name = context.entity_query_field.type_name.as_ref()(&entity_key); - let column_key = get_column_key::(context, column); - let column_name = Arc::new(context.entity_object.column_name.as_ref()( - &entity_key, - &column_key, - )); - let entity_create_one_mutation_field_name = Arc::new(format!( - "{}{}", - entity_name, context.entity_create_one_mutation.mutation_suffix - )); + let column_name = Arc::new(get_column_name::(context, column)); + let entity_create_one_mutation_field_name = + Arc::new(get_entity_create_one_mutation_field_name::(context)); let entity_create_one_mutation_data_field_name = - Arc::new(context.entity_create_one_mutation.data_field.clone()); - let entity_create_batch_mutation_field_name = Arc::new(format!( - "{}{}", - entity_name, - context.entity_create_batch_mutation.mutation_suffix.clone() - )); + Arc::new(get_entity_create_one_mutation_data_field_name(context).to_string()); + let entity_create_batch_mutation_field_name = + Arc::new(get_entity_create_batch_mutation_field_name::(context)); let entity_create_batch_mutation_data_field_name = - Arc::new(context.entity_create_batch_mutation.data_field.clone()); - let entity_update_mutation_field_name = Arc::new(format!( - "{}{}", - entity_name, context.entity_update_mutation.mutation_suffix - )); + Arc::new(get_entity_create_batch_mutation_data_field_name(context).to_string()); + let entity_update_mutation_field_name = + Arc::new(get_entity_update_mutation_field_name::(context)); let entity_update_mutation_data_field_name = - Arc::new(context.entity_update_mutation.data_field.clone()); + Arc::new(get_entity_update_mutation_data_field_name(context).to_string()); Box::new(move |context: &ResolverContext| -> GuardAction { match context.ctx.data::() { @@ -253,17 +247,10 @@ where T: EntityTrait, ::Model: Sync, { - let entity_key = get_entity_key::(context); - let entity_name = context.entity_query_field.type_name.as_ref()(&entity_key); - let entity_create_one_mutation_field_name = Arc::new(format!( - "{}{}", - entity_name, context.entity_create_one_mutation.mutation_suffix - )); - let entity_create_batch_mutation_field_name = Arc::new(format!( - "{}{}", - entity_name, - context.entity_create_batch_mutation.mutation_suffix.clone() - )); + let entity_create_one_mutation_field_name = + Arc::new(get_entity_create_one_mutation_field_name::(context)); + let entity_create_batch_mutation_field_name = + Arc::new(get_entity_create_batch_mutation_field_name::(context)); Box::new( move |context: &ResolverContext| -> SeaResult> { let field_name = context.field().name(); @@ -289,40 +276,39 @@ where T: EntityTrait, ::Model: Sync, { - let entity_key = get_entity_key::(context); - let entity_column_key = get_entity_column_key::(context, column); + let entity_and_column = get_entity_and_column_name::(context, column); context.guards.entity_guards.insert( - entity_key.clone(), + get_entity_name::(context), guard_entity_with_subscriber_id::(context, column), ); context.guards.field_guards.insert( - entity_column_key.clone(), + get_entity_and_column_name::(context, column), guard_field_with_subscriber_id::(context, column), ); context.filter_types.overwrites.insert( - entity_column_key.clone(), + get_entity_and_column_name::(context, column), Some(FilterType::Custom( SUBSCRIBER_ID_FILTER_INFO.type_name.clone(), )), ); context.filter_types.condition_functions.insert( - entity_column_key.clone(), + entity_and_column.clone(), generate_subscriber_id_filter_condition::(context, column), ); context.types.input_none_conversions.insert( - entity_column_key.clone(), + entity_and_column.clone(), generate_default_subscriber_id_input_conversion::(context, column), ); - context.entity_input.update_skips.push(entity_column_key); + context.entity_input.update_skips.push(entity_and_column); } pub fn register_subscribers_to_schema_context(context: &mut BuilderContext) { restrict_subscriber_for_entity::(context, &subscribers::Column::Id); for column in subscribers::Column::iter() { if !matches!(column, subscribers::Column::Id) { - let key = get_entity_column_key::(context, &column); + let key = get_entity_and_column_name::(context, &column); context.filter_types.overwrites.insert(key, None); } } diff --git a/apps/recorder/src/graphql/domains/subscriptions.rs b/apps/recorder/src/graphql/domains/subscriptions.rs index 99f8cb8..2718762 100644 --- a/apps/recorder/src/graphql/domains/subscriptions.rs +++ b/apps/recorder/src/graphql/domains/subscriptions.rs @@ -2,22 +2,22 @@ use std::sync::Arc; use async_graphql::dynamic::{FieldValue, TypeRef}; use sea_orm::{ColumnTrait, EntityTrait, QueryFilter}; -use seaography::{ - Builder as SeaographyBuilder, BuilderContext, EntityObjectBuilder, EntityQueryFieldBuilder, - get_filter_conditions, -}; +use seaography::{Builder as SeaographyBuilder, BuilderContext, get_filter_conditions}; use crate::{ errors::RecorderError, graphql::{ domains::subscribers::restrict_subscriber_for_entity, - infra::custom::generate_entity_filter_mutation_field, + infra::{ + custom::generate_entity_filtered_mutation_field, + name::{get_entity_basic_type_name, get_entity_custom_mutation_field_name}, + }, }, - models::{ - subscriber_tasks, - subscriptions::{self, SubscriptionTrait}, + models::{subscriber_tasks, subscriptions}, + task::{ + SyncOneSubscriptionFeedsFullTask, SyncOneSubscriptionFeedsIncrementalTask, + SyncOneSubscriptionSourcesTask, }, - task::SubscriberTask, }; pub fn register_subscriptions_to_schema_context(context: &mut BuilderContext) { @@ -35,23 +35,21 @@ pub fn register_subscriptions_to_schema_builder( let context = builder.context; - let entity_object_builder = EntityObjectBuilder { context }; - let entity_query_field = EntityQueryFieldBuilder { context }; - { - let sync_one_feeds_incremental_mutation_name = format!( - "{}SyncOneFeedsIncremental", - entity_query_field.type_name::() - ); + let sync_one_feeds_incremental_mutation_name = get_entity_custom_mutation_field_name::< + subscriptions::Entity, + >(context, "SyncOneFeedsIncremental"); - let sync_one_feeds_incremental_mutation = generate_entity_filter_mutation_field::< + let sync_one_feeds_incremental_mutation = generate_entity_filtered_mutation_field::< subscriptions::Entity, _, _, >( builder.context, sync_one_feeds_incremental_mutation_name, - TypeRef::named_nn(entity_object_builder.type_name::()), + TypeRef::named_nn(get_entity_basic_type_name::( + context, + )), Arc::new(|resolver_ctx, app_ctx, filters| { let filters_condition = get_filter_conditions::(resolver_ctx, context, filters); @@ -63,19 +61,19 @@ pub fn register_subscriptions_to_schema_builder( .filter(filters_condition) .one(db) .await? - .ok_or_else(|| RecorderError::from_model_not_found("Subscription"))?; - - let subscription = - subscriptions::Subscription::try_from_model(&subscription_model)?; + .ok_or_else(|| { + RecorderError::from_entity_not_found::() + })?; let task_service = app_ctx.task(); let task_id = task_service .add_subscriber_task( - subscription_model.subscriber_id, - SubscriberTask::SyncOneSubscriptionFeedsIncremental( - subscription.into(), - ), + SyncOneSubscriptionFeedsIncrementalTask::builder() + .subscriber_id(subscription_model.subscriber_id) + .subscription_id(subscription_model.id) + .build() + .into(), ) .await?; @@ -83,7 +81,9 @@ pub fn register_subscriptions_to_schema_builder( .filter(subscriber_tasks::Column::Id.eq(task_id.to_string())) .one(db) .await? - .ok_or_else(|| RecorderError::from_model_not_found("SubscriberTask"))?; + .ok_or_else(|| { + RecorderError::from_entity_not_found::() + })?; Ok(Some(FieldValue::owned_any(task_model))) }) @@ -93,19 +93,19 @@ pub fn register_subscriptions_to_schema_builder( builder.mutations.push(sync_one_feeds_incremental_mutation); } { - let sync_one_feeds_full_mutation_name = format!( - "{}SyncOneFeedsFull", - entity_query_field.type_name::() - ); - - let sync_one_feeds_full_mutation = generate_entity_filter_mutation_field::< + let sync_one_feeds_full_mutation_name = get_entity_custom_mutation_field_name::< + subscriptions::Entity, + >(builder.context, "SyncOneFeedsFull"); + let sync_one_feeds_full_mutation = generate_entity_filtered_mutation_field::< subscriptions::Entity, _, _, >( builder.context, sync_one_feeds_full_mutation_name, - TypeRef::named_nn(entity_object_builder.type_name::()), + TypeRef::named_nn(get_entity_basic_type_name::( + context, + )), Arc::new(|resolver_ctx, app_ctx, filters| { let filters_condition = get_filter_conditions::(resolver_ctx, context, filters); @@ -117,17 +117,19 @@ pub fn register_subscriptions_to_schema_builder( .filter(filters_condition) .one(db) .await? - .ok_or_else(|| RecorderError::from_model_not_found("Subscription"))?; - - let subscription = - subscriptions::Subscription::try_from_model(&subscription_model)?; + .ok_or_else(|| { + RecorderError::from_entity_not_found::() + })?; let task_service = app_ctx.task(); let task_id = task_service .add_subscriber_task( - subscription_model.subscriber_id, - SubscriberTask::SyncOneSubscriptionFeedsFull(subscription.into()), + SyncOneSubscriptionFeedsFullTask::builder() + .subscriber_id(subscription_model.subscriber_id) + .subscription_id(subscription_model.id) + .build() + .into(), ) .await?; @@ -135,7 +137,9 @@ pub fn register_subscriptions_to_schema_builder( .filter(subscriber_tasks::Column::Id.eq(task_id.to_string())) .one(db) .await? - .ok_or_else(|| RecorderError::from_model_not_found("SubscriberTask"))?; + .ok_or_else(|| { + RecorderError::from_entity_not_found::() + })?; Ok(Some(FieldValue::owned_any(task_model))) }) @@ -146,19 +150,20 @@ pub fn register_subscriptions_to_schema_builder( } { - let sync_one_sources_mutation_name = format!( - "{}SyncOneSources", - entity_query_field.type_name::() - ); + let sync_one_sources_mutation_name = get_entity_custom_mutation_field_name::< + subscriptions::Entity, + >(context, "SyncOneSources"); - let sync_one_sources_mutation = generate_entity_filter_mutation_field::< + let sync_one_sources_mutation = generate_entity_filtered_mutation_field::< subscriptions::Entity, _, _, >( builder.context, sync_one_sources_mutation_name, - TypeRef::named_nn(entity_object_builder.type_name::()), + TypeRef::named_nn(get_entity_basic_type_name::( + context, + )), Arc::new(|resolver_ctx, app_ctx, filters| { let filters_condition = get_filter_conditions::(resolver_ctx, context, filters); @@ -170,17 +175,19 @@ pub fn register_subscriptions_to_schema_builder( .filter(filters_condition) .one(db) .await? - .ok_or_else(|| RecorderError::from_model_not_found("Subscription"))?; - - let subscription = - subscriptions::Subscription::try_from_model(&subscription_model)?; + .ok_or_else(|| { + RecorderError::from_entity_not_found::() + })?; let task_service = app_ctx.task(); let task_id = task_service .add_subscriber_task( - subscription_model.subscriber_id, - SubscriberTask::SyncOneSubscriptionSources(subscription.into()), + SyncOneSubscriptionSourcesTask::builder() + .subscriber_id(subscription_model.subscriber_id) + .subscription_id(subscription_model.id) + .build() + .into(), ) .await?; @@ -188,7 +195,9 @@ pub fn register_subscriptions_to_schema_builder( .filter(subscriber_tasks::Column::Id.eq(task_id.to_string())) .one(db) .await? - .ok_or_else(|| RecorderError::from_model_not_found("SubscriberTask"))?; + .ok_or_else(|| { + RecorderError::from_entity_not_found::() + })?; Ok(Some(FieldValue::owned_any(task_model))) }) diff --git a/apps/recorder/src/graphql/infra/crypto.rs b/apps/recorder/src/graphql/infra/crypto.rs index a978962..4b1fd0a 100644 --- a/apps/recorder/src/graphql/infra/crypto.rs +++ b/apps/recorder/src/graphql/infra/crypto.rs @@ -6,7 +6,7 @@ use seaography::{BuilderContext, SeaResult}; use crate::{ app::AppContextTrait, - graphql::infra::util::{get_column_key, get_entity_key}, + graphql::infra::name::{get_column_name, get_entity_name}, }; pub fn register_crypto_column_input_conversion_to_schema_context( @@ -17,8 +17,8 @@ pub fn register_crypto_column_input_conversion_to_schema_context( T: EntityTrait, ::Model: Sync, { - let entity_key = get_entity_key::(context); - let column_name = get_column_key::(context, column); + let entity_key = get_entity_name::(context); + let column_name = get_column_name::(context, column); let entity_name = context.entity_object.type_name.as_ref()(&entity_key); let column_name = context.entity_object.column_name.as_ref()(&entity_key, &column_name); @@ -44,8 +44,8 @@ pub fn register_crypto_column_output_conversion_to_schema_context( T: EntityTrait, ::Model: Sync, { - let entity_key = get_entity_key::(context); - let column_name = get_column_key::(context, column); + let entity_key = get_entity_name::(context); + let column_name = get_column_name::(context, column); let entity_name = context.entity_object.type_name.as_ref()(&entity_key); let column_name = context.entity_object.column_name.as_ref()(&entity_key, &column_name); diff --git a/apps/recorder/src/graphql/infra/custom.rs b/apps/recorder/src/graphql/infra/custom.rs index fab2678..36d35ed 100644 --- a/apps/recorder/src/graphql/infra/custom.rs +++ b/apps/recorder/src/graphql/infra/custom.rs @@ -1,12 +1,31 @@ use std::{pin::Pin, sync::Arc}; use async_graphql::dynamic::{ - Field, FieldFuture, FieldValue, InputValue, ResolverContext, TypeRef, ValueAccessor, + Field, FieldFuture, FieldValue, InputObject, InputValue, Object, ObjectAccessor, + ResolverContext, TypeRef, ValueAccessor, +}; +use sea_orm::{ + ActiveModelTrait, Condition, EntityTrait, IntoActiveModel, QueryFilter, TransactionTrait, +}; +use seaography::{ + BuilderContext, GuardAction, SeaographyError, get_filter_conditions, prepare_active_model, }; -use sea_orm::EntityTrait; -use seaography::{BuilderContext, EntityObjectBuilder, FilterInputBuilder, GuardAction}; -use crate::{app::AppContextTrait, errors::RecorderResult}; +use crate::{ + app::AppContextTrait, + errors::RecorderResult, + graphql::infra::name::{ + get_entity_and_column_name_from_column_str, get_entity_basic_type_name, + get_entity_create_batch_mutation_data_field_name, + get_entity_create_batch_mutation_field_name, + get_entity_create_one_mutation_data_field_name, get_entity_create_one_mutation_field_name, + get_entity_delete_mutation_field_name, get_entity_delete_mutation_filter_field_name, + get_entity_filter_input_type_name, get_entity_insert_data_input_type_name, get_entity_name, + get_entity_renormalized_filter_field_name, get_entity_update_data_input_type_name, + get_entity_update_mutation_data_field_name, get_entity_update_mutation_field_name, + get_entity_update_mutation_filter_field_name, + }, +}; pub type FilterMutationFn = Arc< dyn for<'a> Fn( @@ -19,27 +38,102 @@ pub type FilterMutationFn = Arc< + Sync, >; -pub fn generate_entity_filter_mutation_field( +pub type CreateOneMutationFn = Arc< + dyn for<'a> Fn( + &ResolverContext<'a>, + Arc, + ObjectAccessor<'_>, + ) -> Pin> + Send + 'a>> + + Send + + Sync, +>; + +pub type CreateBatchMutationFn = Arc< + dyn for<'a> Fn( + &ResolverContext<'a>, + Arc, + Vec>, + ) -> Pin>> + Send + 'a>> + + Send + + Sync, +>; + +pub type UpdateMutationFn = Arc< + dyn for<'a> Fn( + &ResolverContext<'a>, + Arc, + Condition, + ObjectAccessor<'_>, + ) -> Pin>> + Send + 'a>> + + Send + + Sync, +>; + +pub type DeleteMutationFn = Arc< + dyn for<'a> Fn( + &ResolverContext<'a>, + Arc, + Condition, + ) -> Pin> + Send + 'a>> + + Send + + Sync, +>; + +pub fn generate_entity_default_insert_input_object( + builder_context: &'static BuilderContext, +) -> InputObject +where + T: EntityTrait, + ::Model: Sync, +{ + let entity_input_builder = seaography::EntityInputBuilder { + context: builder_context, + }; + + entity_input_builder.insert_input_object::() +} + +pub fn generate_entity_default_update_input_object( + builder_context: &'static BuilderContext, +) -> InputObject +where + T: EntityTrait, + ::Model: Sync, +{ + let entity_input_builder = seaography::EntityInputBuilder { + context: builder_context, + }; + + entity_input_builder.update_input_object::() +} + +pub fn generate_entity_default_basic_entity_object( + builder_context: &'static BuilderContext, +) -> Object +where + T: EntityTrait, + ::Model: Sync, +{ + let entity_object_builder = seaography::EntityObjectBuilder { + context: builder_context, + }; + + entity_object_builder.basic_to_object::() +} + +pub fn generate_entity_filtered_mutation_field( builder_context: &'static BuilderContext, field_name: N, type_ref: R, mutation_fn: FilterMutationFn, ) -> Field where - T: EntityTrait, - ::Model: Sync, + E: EntityTrait, + ::Model: Sync, N: Into, R: Into, { - let entity_filter_input_builder = FilterInputBuilder { - context: builder_context, - }; - let entity_object_builder = EntityObjectBuilder { - context: builder_context, - }; - let object_name: String = entity_object_builder.type_name::(); - - let context = builder_context; + let object_name: String = get_entity_name::(builder_context); let guard = builder_context.guards.entity_guards.get(&object_name); @@ -60,7 +154,7 @@ where let app_ctx = ctx.data::>()?; - let filters = ctx.args.get(&context.entity_delete_mutation.filter_field); + let filters = ctx.args.get(get_entity_renormalized_filter_field_name()); let result = mutation_fn(&ctx, app_ctx.clone(), filters) .await @@ -70,7 +164,630 @@ where }) }) .argument(InputValue::new( - &context.entity_delete_mutation.filter_field, - TypeRef::named(entity_filter_input_builder.type_name(&object_name)), + get_entity_renormalized_filter_field_name(), + TypeRef::named(get_entity_filter_input_type_name::(builder_context)), )) } + +pub fn generate_entity_create_one_mutation_field( + builder_context: &'static BuilderContext, + input_data_type_ref: Option, + mutation_fn: CreateOneMutationFn, +) -> Field +where + E: EntityTrait, + ::Model: Sync, + ID: Into, +{ + let guard = builder_context + .guards + .entity_guards + .get(&get_entity_name::(builder_context)); + let field_guards = &builder_context.guards.field_guards; + + Field::new( + get_entity_create_one_mutation_field_name::(builder_context), + TypeRef::named_nn(get_entity_basic_type_name::(builder_context)), + move |ctx| { + let mutation_fn = mutation_fn.clone(); + FieldFuture::new(async move { + let guard_flag = if let Some(guard) = guard { + (*guard)(&ctx) + } else { + GuardAction::Allow + }; + + if let GuardAction::Block(reason) = guard_flag { + return Err::, async_graphql::Error>(async_graphql::Error::new( + reason.unwrap_or("Entity guard triggered.".into()), + )); + } + + let app_ctx = ctx.data::>()?; + + let value_accessor = ctx + .args + .get(get_entity_create_one_mutation_data_field_name( + builder_context, + )) + .unwrap(); + let input_object = value_accessor.object()?; + + for (column, _) in input_object.iter() { + let field_guard = field_guards.get( + &get_entity_and_column_name_from_column_str::(builder_context, column), + ); + let field_guard_flag = if let Some(field_guard) = field_guard { + (*field_guard)(&ctx) + } else { + GuardAction::Allow + }; + if let GuardAction::Block(reason) = field_guard_flag { + return match reason { + Some(reason) => Err::, async_graphql::Error>( + async_graphql::Error::new(reason), + ), + None => Err::, async_graphql::Error>( + async_graphql::Error::new("Field guard triggered."), + ), + }; + } + } + + let result = mutation_fn(&ctx, app_ctx.clone(), input_object) + .await + .map_err(async_graphql::Error::new_with_source)?; + + Ok(Some(FieldValue::owned_any(result))) + }) + }, + ) + .argument(InputValue::new( + get_entity_create_one_mutation_data_field_name(builder_context), + input_data_type_ref.map(|t| t.into()).unwrap_or_else(|| { + TypeRef::named_nn(get_entity_insert_data_input_type_name::(builder_context)) + }), + )) +} + +pub fn generate_entity_default_create_one_mutation_fn( + builder_context: &'static BuilderContext, + active_model_hooks: bool, +) -> CreateOneMutationFn +where + T: EntityTrait, + ::Model: Sync + IntoActiveModel, + A: ActiveModelTrait + sea_orm::ActiveModelBehavior + std::marker::Send + 'static, +{ + Arc::new(move |resolve_context, app_ctx, input_object| { + let entity_input_builder = seaography::EntityInputBuilder { + context: builder_context, + }; + let entity_object_builder = seaography::EntityObjectBuilder { + context: builder_context, + }; + let active_model = prepare_active_model::( + &entity_input_builder, + &entity_object_builder, + &input_object, + resolve_context, + ) + .map_err(SeaographyError::AsyncGraphQLError); + + Box::pin(async move { + if active_model_hooks { + let transaction = app_ctx.db().begin().await?; + + let active_model = active_model?; + + let active_model = active_model.before_save(&transaction, true).await?; + + let result: T::Model = active_model.insert(&transaction).await?; + + let result = A::after_save(result, &transaction, true).await?; + + transaction.commit().await?; + + Ok(result) + } else { + let db = app_ctx.db(); + + let active_model = active_model?; + + let result: T::Model = active_model.insert(db).await?; + + Ok(result) + } + }) + }) +} + +pub fn generate_entity_default_create_one_mutation_field( + builder_context: &'static BuilderContext, + active_model_hooks: bool, +) -> Field +where + E: EntityTrait, + ::Model: Sync, + ::Model: IntoActiveModel, + A: ActiveModelTrait + sea_orm::ActiveModelBehavior + std::marker::Send + 'static, +{ + generate_entity_create_one_mutation_field::( + builder_context, + None, + generate_entity_default_create_one_mutation_fn::(builder_context, active_model_hooks), + ) +} + +pub fn generate_entity_create_batch_mutation_field( + builder_context: &'static BuilderContext, + input_data_type_ref: Option, + mutation_fn: CreateBatchMutationFn, +) -> Field +where + E: EntityTrait, + ::Model: Sync, + ID: Into, +{ + let object_name: String = get_entity_name::(builder_context); + let guard = builder_context.guards.entity_guards.get(&object_name); + let field_guards = &builder_context.guards.field_guards; + + Field::new( + get_entity_create_batch_mutation_field_name::(builder_context), + TypeRef::named_nn_list_nn(get_entity_basic_type_name::(builder_context)), + move |ctx| { + let mutation_fn = mutation_fn.clone(); + FieldFuture::new(async move { + let guard_flag = if let Some(guard) = guard { + (*guard)(&ctx) + } else { + GuardAction::Allow + }; + + if let GuardAction::Block(reason) = guard_flag { + return match reason { + Some(reason) => Err::, async_graphql::Error>( + async_graphql::Error::new(reason), + ), + None => Err::, async_graphql::Error>(async_graphql::Error::new( + "Entity guard triggered.", + )), + }; + } + + let mut input_objects: Vec> = vec![]; + let list = ctx + .args + .get(get_entity_create_batch_mutation_data_field_name( + builder_context, + )) + .unwrap() + .list()?; + for input in list.iter() { + let input_object = input.object()?; + for (column, _) in input_object.iter() { + let field_guard = + field_guards.get(&get_entity_and_column_name_from_column_str::( + builder_context, + column, + )); + let field_guard_flag = if let Some(field_guard) = field_guard { + (*field_guard)(&ctx) + } else { + GuardAction::Allow + }; + if let GuardAction::Block(reason) = field_guard_flag { + return match reason { + Some(reason) => Err::, async_graphql::Error>( + async_graphql::Error::new(reason), + ), + None => Err::, async_graphql::Error>( + async_graphql::Error::new("Field guard triggered."), + ), + }; + } + } + + input_objects.push(input_object); + } + + let app_ctx = ctx.data::>()?; + + let results = mutation_fn(&ctx, app_ctx.clone(), input_objects) + .await + .map_err(async_graphql::Error::new_with_source)?; + + Ok(Some(FieldValue::list( + results.into_iter().map(FieldValue::owned_any), + ))) + }) + }, + ) + .argument(InputValue::new( + get_entity_create_batch_mutation_data_field_name(builder_context), + input_data_type_ref.map(|t| t.into()).unwrap_or_else(|| { + TypeRef::named_nn_list_nn(get_entity_insert_data_input_type_name::(builder_context)) + }), + )) +} + +pub fn generate_entity_default_create_batch_mutation_fn( + builder_context: &'static BuilderContext, + active_model_hooks: bool, +) -> CreateBatchMutationFn +where + E: EntityTrait, + ::Model: Sync, + ::Model: IntoActiveModel, + A: ActiveModelTrait + sea_orm::ActiveModelBehavior + std::marker::Send + 'static, +{ + Arc::new(move |resolve_context, app_ctx, input_objects| { + let entity_input_builder = seaography::EntityInputBuilder { + context: builder_context, + }; + let entity_object_builder = seaography::EntityObjectBuilder { + context: builder_context, + }; + let active_models = input_objects + .into_iter() + .map(|input_object| { + prepare_active_model::( + &entity_input_builder, + &entity_object_builder, + &input_object, + resolve_context, + ) + }) + .collect::, _>>() + .map_err(SeaographyError::AsyncGraphQLError); + + Box::pin(async move { + if active_model_hooks { + let transaction = app_ctx.db().begin().await?; + + let mut before_save_models = vec![]; + + for active_model in active_models? { + let before_save_model = active_model.before_save(&transaction, false).await?; + before_save_models.push(before_save_model); + } + + let models: Vec = E::insert_many(before_save_models) + .exec_with_returning_many(&transaction) + .await?; + + let mut result = vec![]; + for model in models { + let after_save_model = A::after_save(model, &transaction, false).await?; + result.push(after_save_model); + } + + transaction.commit().await?; + + Ok(result) + } else { + let db = app_ctx.db(); + let active_models = active_models?; + let results: Vec = E::insert_many(active_models) + .exec_with_returning_many(db) + .await?; + + Ok(results) + } + }) + }) +} + +pub fn generate_entity_default_create_batch_mutation_field( + builder_context: &'static BuilderContext, + active_model_hooks: bool, +) -> Field +where + E: EntityTrait, + ::Model: Sync, + ::Model: IntoActiveModel, + A: ActiveModelTrait + sea_orm::ActiveModelBehavior + std::marker::Send + 'static, +{ + generate_entity_create_batch_mutation_field::( + builder_context, + None, + generate_entity_default_create_batch_mutation_fn::( + builder_context, + active_model_hooks, + ), + ) +} + +pub fn generate_entity_update_mutation_field( + builder_context: &'static BuilderContext, + input_data_type_ref: Option, + mutation_fn: UpdateMutationFn, +) -> Field +where + E: EntityTrait, + ::Model: Sync, + I: Into, +{ + let guard = builder_context + .guards + .entity_guards + .get(&get_entity_name::(builder_context)); + let field_guards = &builder_context.guards.field_guards; + + Field::new( + get_entity_update_mutation_field_name::(builder_context), + TypeRef::named_nn_list_nn(get_entity_basic_type_name::(builder_context)), + move |ctx| { + let mutation_fn = mutation_fn.clone(); + FieldFuture::new(async move { + let guard_flag = if let Some(guard) = guard { + (*guard)(&ctx) + } else { + GuardAction::Allow + }; + + if let GuardAction::Block(reason) = guard_flag { + return match reason { + Some(reason) => Err::, async_graphql::Error>( + async_graphql::Error::new(reason), + ), + None => Err::, async_graphql::Error>(async_graphql::Error::new( + "Entity guard triggered.", + )), + }; + } + + let app_ctx = ctx.data::>()?; + + let filters = ctx.args.get(get_entity_update_mutation_filter_field_name( + builder_context, + )); + let filter_condition = get_filter_conditions::(&ctx, builder_context, filters); + + let value_accessor = ctx + .args + .get(get_entity_update_mutation_data_field_name(builder_context)) + .unwrap(); + let input_object = value_accessor.object()?; + + for (column, _) in input_object.iter() { + let field_guard = field_guards.get( + &get_entity_and_column_name_from_column_str::(builder_context, column), + ); + let field_guard_flag = if let Some(field_guard) = field_guard { + (*field_guard)(&ctx) + } else { + GuardAction::Allow + }; + if let GuardAction::Block(reason) = field_guard_flag { + return match reason { + Some(reason) => Err::, async_graphql::Error>( + async_graphql::Error::new(reason), + ), + None => Err::, async_graphql::Error>( + async_graphql::Error::new("Field guard triggered."), + ), + }; + } + } + + let result = mutation_fn(&ctx, app_ctx.clone(), filter_condition, input_object) + .await + .map_err(async_graphql::Error::new_with_source)?; + + Ok(Some(FieldValue::list( + result.into_iter().map(FieldValue::owned_any), + ))) + }) + }, + ) + .argument(InputValue::new( + get_entity_update_mutation_data_field_name(builder_context), + input_data_type_ref.map(|t| t.into()).unwrap_or_else(|| { + TypeRef::named_nn(get_entity_update_data_input_type_name::(builder_context)) + }), + )) + .argument(InputValue::new( + get_entity_update_mutation_filter_field_name(builder_context), + TypeRef::named(get_entity_filter_input_type_name::(builder_context)), + )) +} + +pub fn generate_entity_default_update_mutation_fn( + builder_context: &'static BuilderContext, + active_model_hooks: bool, +) -> UpdateMutationFn +where + T: EntityTrait, + ::Model: Sync + IntoActiveModel, + A: ActiveModelTrait + sea_orm::ActiveModelBehavior + std::marker::Send + 'static, +{ + Arc::new( + move |resolve_context, app_ctx, filter_condition, input_object| { + let entity_input_builder = seaography::EntityInputBuilder { + context: builder_context, + }; + let entity_object_builder = seaography::EntityObjectBuilder { + context: builder_context, + }; + + let active_model = prepare_active_model::( + &entity_input_builder, + &entity_object_builder, + &input_object, + resolve_context, + ) + .map_err(SeaographyError::AsyncGraphQLError); + + Box::pin(async move { + if active_model_hooks { + let transaction = app_ctx.db().begin().await?; + + let active_model = active_model?; + + let active_model = active_model.before_save(&transaction, false).await?; + + let models = T::update_many() + .set(active_model) + .filter(filter_condition.clone()) + .exec_with_returning(&transaction) + .await?; + let mut result = vec![]; + + for model in models { + result.push(A::after_save(model, &transaction, false).await?); + } + + transaction.commit().await?; + + Ok(result) + } else { + let db = app_ctx.db(); + + let active_model = active_model?; + + let result = T::update_many() + .set(active_model) + .filter(filter_condition.clone()) + .exec_with_returning(db) + .await?; + + Ok(result) + } + }) + }, + ) +} + +pub fn generate_entity_default_update_mutation_field( + builder_context: &'static BuilderContext, + active_model_hooks: bool, +) -> Field +where + E: EntityTrait, + ::Model: Sync, + ::Model: IntoActiveModel, + A: ActiveModelTrait + sea_orm::ActiveModelBehavior + std::marker::Send + 'static, +{ + generate_entity_update_mutation_field::( + builder_context, + None, + generate_entity_default_update_mutation_fn::(builder_context, active_model_hooks), + ) +} + +pub fn generate_entity_delete_mutation_field( + builder_context: &'static BuilderContext, + mutation_fn: DeleteMutationFn, +) -> Field +where + E: EntityTrait, + ::Model: Sync, +{ + let object_name: String = get_entity_name::(builder_context); + let guard = builder_context.guards.entity_guards.get(&object_name); + + Field::new( + get_entity_delete_mutation_field_name::(builder_context), + TypeRef::named_nn(TypeRef::INT), + move |ctx| { + let mutation_fn = mutation_fn.clone(); + FieldFuture::new(async move { + let guard_flag = if let Some(guard) = guard { + (*guard)(&ctx) + } else { + GuardAction::Allow + }; + + if let GuardAction::Block(reason) = guard_flag { + return Err::, async_graphql::Error>(async_graphql::Error::new( + reason.unwrap_or("Entity guard triggered.".into()), + )); + } + + let filters = ctx.args.get(get_entity_delete_mutation_filter_field_name( + builder_context, + )); + let filter_condition = get_filter_conditions::(&ctx, builder_context, filters); + + let app_ctx = ctx.data::>()?; + + let res = mutation_fn(&ctx, app_ctx.clone(), filter_condition) + .await + .map_err(async_graphql::Error::new_with_source)?; + + Ok(Some(async_graphql::Value::from(res))) + }) + }, + ) + .argument(InputValue::new( + get_entity_delete_mutation_filter_field_name(builder_context), + TypeRef::named(get_entity_filter_input_type_name::(builder_context)), + )) +} + +pub fn generate_entity_default_delete_mutation_fn( + _builder_context: &'static BuilderContext, + active_model_hooks: bool, +) -> DeleteMutationFn +where + E: EntityTrait, + ::Model: Sync, + ::Model: IntoActiveModel, + A: ActiveModelTrait + sea_orm::ActiveModelBehavior + std::marker::Send + 'static, +{ + Arc::new(move |_resolve_context, app_ctx, filter_condition| { + Box::pin(async move { + if active_model_hooks { + let transaction = app_ctx.db().begin().await?; + + let models: Vec = E::find() + .filter(filter_condition.clone()) + .all(&transaction) + .await?; + + let mut active_models: Vec = vec![]; + for model in models { + let active_model = model.into_active_model(); + active_models.push(active_model.before_delete(&transaction).await?); + } + + let result = E::delete_many() + .filter(filter_condition) + .exec(&transaction) + .await?; + + for active_model in active_models { + active_model.after_delete(&transaction).await?; + } + + transaction.commit().await?; + + Ok(result.rows_affected) + } else { + let db = app_ctx.db(); + + let result = E::delete_many().filter(filter_condition).exec(db).await?; + + Ok(result.rows_affected) + } + }) + }) +} + +pub fn generate_entity_default_delete_mutation_field( + builder_context: &'static BuilderContext, + active_model_hooks: bool, +) -> Field +where + E: EntityTrait, + ::Model: Sync, + ::Model: IntoActiveModel, + A: ActiveModelTrait + sea_orm::ActiveModelBehavior + std::marker::Send + 'static, +{ + generate_entity_delete_mutation_field::( + builder_context, + generate_entity_default_delete_mutation_fn::(builder_context, active_model_hooks), + ) +} diff --git a/apps/recorder/src/graphql/infra/json.rs b/apps/recorder/src/graphql/infra/json.rs index 12ff758..b5dff4e 100644 --- a/apps/recorder/src/graphql/infra/json.rs +++ b/apps/recorder/src/graphql/infra/json.rs @@ -17,7 +17,7 @@ use serde::{Serialize, de::DeserializeOwned}; use serde_json::Value as JsonValue; use crate::{ - errors::RecorderResult, graphql::infra::util::get_entity_column_key, + errors::RecorderResult, graphql::infra::name::get_entity_and_column_name, utils::json::convert_json_keys, }; @@ -946,9 +946,8 @@ where T: EntityTrait, ::Model: Sync, { - let entity_column_key = get_entity_column_key::(context, column); context.filter_types.overwrites.insert( - entity_column_key.clone(), + get_entity_and_column_name::(context, column), Some(FilterType::Custom(JSONB_FILTER_NAME.to_string())), ); } @@ -959,20 +958,20 @@ where ::Model: Sync, S: DeserializeOwned + Serialize, { - let entity_column_key = get_entity_column_key::(context, column); + let entity_column_name = get_entity_and_column_name::(context, column); context.types.input_conversions.insert( - entity_column_key.clone(), + entity_column_name.clone(), Box::new(move |_resolve_context, accessor| { let deserialized = accessor.deserialize::().map_err(|err| { SeaographyError::TypeConversionError( err.message, - format!("Json - {entity_column_key}"), + format!("Json - {entity_column_name}"), ) })?; let json_value = serde_json::to_value(deserialized).map_err(|err| { SeaographyError::TypeConversionError( err.to_string(), - format!("Json - {entity_column_key}"), + format!("Json - {entity_column_name}"), ) })?; Ok(sea_orm::Value::Json(Some(Box::new(json_value)))) @@ -980,26 +979,27 @@ where ); } -pub fn convert_jsonb_output_case_for_entity(context: &mut BuilderContext, column: &T::Column) -where +pub fn convert_jsonb_output_case_for_entity( + context: &mut BuilderContext, + column: &T::Column, + case: Case<'static>, +) where T: EntityTrait, ::Model: Sync, { - let entity_column_key = get_entity_column_key::(context, column); + let entity_column_key = get_entity_and_column_name::(context, column); context.types.output_conversions.insert( entity_column_key.clone(), Box::new(move |value| { if let sea_orm::Value::Json(Some(json)) = value { - let result = async_graphql::Value::from_json(convert_json_keys( - json.as_ref().clone(), - Case::Camel, - )) - .map_err(|err| { - SeaographyError::TypeConversionError( - err.to_string(), - format!("Json - {entity_column_key}"), - ) - })?; + let result = + async_graphql::Value::from_json(convert_json_keys(json.as_ref().clone(), case)) + .map_err(|err| { + SeaographyError::TypeConversionError( + err.to_string(), + format!("Json - {entity_column_key}"), + ) + })?; Ok(result) } else { Err(SeaographyError::TypeConversionError( diff --git a/apps/recorder/src/graphql/infra/mod.rs b/apps/recorder/src/graphql/infra/mod.rs index 4079fd7..2ca158b 100644 --- a/apps/recorder/src/graphql/infra/mod.rs +++ b/apps/recorder/src/graphql/infra/mod.rs @@ -1,4 +1,4 @@ pub mod crypto; pub mod custom; pub mod json; -pub mod util; +pub mod name; diff --git a/apps/recorder/src/graphql/infra/name.rs b/apps/recorder/src/graphql/infra/name.rs new file mode 100644 index 0000000..42fe8cf --- /dev/null +++ b/apps/recorder/src/graphql/infra/name.rs @@ -0,0 +1,203 @@ +use std::fmt::Display; + +use sea_orm::{EntityName, EntityTrait, IdenStatic}; +use seaography::BuilderContext; + +pub fn get_entity_name(context: &BuilderContext) -> String +where + T: EntityTrait, + ::Model: Sync, +{ + let t = T::default(); + let name = ::table_name(&t); + context.entity_object.type_name.as_ref()(name) +} + +pub fn get_column_name(context: &BuilderContext, column: &T::Column) -> String +where + T: EntityTrait, + ::Model: Sync, +{ + let entity_name = get_entity_name::(context); + context.entity_object.column_name.as_ref()(&entity_name, column.as_str()) +} + +pub fn get_entity_and_column_name(context: &BuilderContext, column: &T::Column) -> String +where + T: EntityTrait, + ::Model: Sync, +{ + let entity_name = get_entity_name::(context); + let column_name = get_column_name::(context, column); + + format!("{entity_name}.{column_name}") +} + +pub fn get_entity_and_column_name_from_column_str( + context: &BuilderContext, + column_str: &str, +) -> String +where + T: EntityTrait, + ::Model: Sync, +{ + let entity_name = get_entity_name::(context); + + format!("{entity_name}.{column_str}") +} + +pub fn get_entity_basic_type_name(context: &BuilderContext) -> String +where + T: EntityTrait, + ::Model: Sync, +{ + let t = T::default(); + let name = ::table_name(&t); + format!( + "{}{}", + context.entity_object.type_name.as_ref()(name), + context.entity_object.basic_type_suffix + ) +} + +pub fn get_entity_query_field_name(context: &BuilderContext) -> String +where + T: EntityTrait, + ::Model: Sync, +{ + let entity_name = get_entity_name::(context); + context.entity_query_field.type_name.as_ref()(&entity_name) +} + +pub fn get_entity_filter_input_type_name(context: &BuilderContext) -> String +where + T: EntityTrait, + ::Model: Sync, +{ + let entity_name = get_entity_name::(context); + context.filter_input.type_name.as_ref()(&entity_name) +} + +pub fn get_entity_insert_data_input_type_name(context: &BuilderContext) -> String +where + T: EntityTrait, + ::Model: Sync, +{ + let entity_name = get_entity_name::(context); + format!("{entity_name}{}", context.entity_input.insert_suffix) +} + +pub fn get_entity_update_data_input_type_name(context: &BuilderContext) -> String +where + T: EntityTrait, + ::Model: Sync, +{ + let entity_name = get_entity_name::(context); + format!("{entity_name}{}", context.entity_input.update_suffix) +} + +pub fn get_entity_create_one_mutation_field_name(context: &BuilderContext) -> String +where + T: EntityTrait, + ::Model: Sync, +{ + let query_field_name = get_entity_query_field_name::(context); + format!( + "{}{}", + query_field_name, context.entity_create_one_mutation.mutation_suffix + ) +} + +pub fn get_entity_create_batch_mutation_field_name(context: &BuilderContext) -> String +where + T: EntityTrait, + ::Model: Sync, +{ + let query_field_name = get_entity_query_field_name::(context); + format!( + "{}{}", + query_field_name, context.entity_create_batch_mutation.mutation_suffix + ) +} + +pub fn get_entity_delete_mutation_field_name(context: &BuilderContext) -> String +where + T: EntityTrait, + ::Model: Sync, +{ + let query_field_name = get_entity_query_field_name::(context); + format!( + "{}{}", + query_field_name, context.entity_delete_mutation.mutation_suffix + ) +} + +pub fn get_entity_update_mutation_field_name(context: &BuilderContext) -> String +where + T: EntityTrait, + ::Model: Sync, +{ + let query_field_name = get_entity_query_field_name::(context); + format!( + "{}{}", + query_field_name, context.entity_update_mutation.mutation_suffix + ) +} + +pub fn get_entity_custom_mutation_field_name( + context: &BuilderContext, + mutation_suffix: impl Display, +) -> String +where + T: EntityTrait, + ::Model: Sync, +{ + let query_field_name = get_entity_query_field_name::(context); + format!("{query_field_name}{mutation_suffix}") +} + +pub fn get_entity_renormalized_filter_field_name() -> &'static str { + "filter" +} + +pub fn get_entity_query_filter_field_name(context: &BuilderContext) -> &str { + &context.entity_query_field.filters +} + +pub fn get_entity_update_mutation_filter_field_name(context: &BuilderContext) -> &str { + &context.entity_update_mutation.filter_field +} + +pub fn get_entity_delete_mutation_filter_field_name(context: &BuilderContext) -> &str { + &context.entity_delete_mutation.filter_field +} + +pub fn renormalize_filter_field_names_to_schema_context(context: &mut BuilderContext) { + let renormalized_filter_field_name = get_entity_renormalized_filter_field_name(); + context.entity_query_field.filters = renormalized_filter_field_name.to_string(); + context.entity_update_mutation.filter_field = renormalized_filter_field_name.to_string(); + context.entity_delete_mutation.filter_field = renormalized_filter_field_name.to_string(); +} + +pub fn get_entity_renormalized_data_field_name() -> &'static str { + "data" +} + +pub fn get_entity_create_one_mutation_data_field_name(context: &BuilderContext) -> &str { + &context.entity_create_one_mutation.data_field +} + +pub fn get_entity_create_batch_mutation_data_field_name(context: &BuilderContext) -> &str { + &context.entity_create_batch_mutation.data_field +} + +pub fn get_entity_update_mutation_data_field_name(context: &BuilderContext) -> &str { + &context.entity_update_mutation.data_field +} + +pub fn renormalize_data_field_names_to_schema_context(context: &mut BuilderContext) { + let renormalized_data_field_name = get_entity_renormalized_data_field_name(); + context.entity_create_one_mutation.data_field = renormalized_data_field_name.to_string(); + context.entity_create_batch_mutation.data_field = renormalized_data_field_name.to_string(); + context.entity_update_mutation.data_field = renormalized_data_field_name.to_string(); +} diff --git a/apps/recorder/src/graphql/infra/util.rs b/apps/recorder/src/graphql/infra/util.rs deleted file mode 100644 index 50d77e2..0000000 --- a/apps/recorder/src/graphql/infra/util.rs +++ /dev/null @@ -1,30 +0,0 @@ -use sea_orm::{EntityName, EntityTrait, IdenStatic}; -use seaography::BuilderContext; - -pub fn get_entity_key(context: &BuilderContext) -> String -where - T: EntityTrait, - ::Model: Sync, -{ - context.entity_object.type_name.as_ref()(::table_name(&T::default())) -} - -pub fn get_column_key(context: &BuilderContext, column: &T::Column) -> String -where - T: EntityTrait, - ::Model: Sync, -{ - let entity_name = get_entity_key::(context); - context.entity_object.column_name.as_ref()(&entity_name, column.as_str()) -} - -pub fn get_entity_column_key(context: &BuilderContext, column: &T::Column) -> String -where - T: EntityTrait, - ::Model: Sync, -{ - let entity_name = get_entity_key::(context); - let column_name = get_column_key::(context, column); - - format!("{}.{}", &entity_name, &column_name) -} diff --git a/apps/recorder/src/graphql/schema.rs b/apps/recorder/src/graphql/schema.rs index 2a3a9bc..d458f3e 100644 --- a/apps/recorder/src/graphql/schema.rs +++ b/apps/recorder/src/graphql/schema.rs @@ -39,7 +39,13 @@ use crate::{ register_subscriptions_to_schema_builder, register_subscriptions_to_schema_context, }, }, - infra::json::register_jsonb_input_filter_to_schema_builder, + infra::{ + json::register_jsonb_input_filter_to_schema_builder, + name::{ + renormalize_data_field_names_to_schema_context, + renormalize_filter_field_names_to_schema_context, + }, + }, }, }; @@ -55,6 +61,9 @@ pub fn build_schema( let context = CONTEXT.get_or_init(|| { let mut context = BuilderContext::default(); + renormalize_filter_field_names_to_schema_context(&mut context); + renormalize_data_field_names_to_schema_context(&mut context); + { // domains register_feeds_to_schema_context(&mut context); diff --git a/apps/recorder/src/lib.rs b/apps/recorder/src/lib.rs index a7c02d7..da3c3aa 100644 --- a/apps/recorder/src/lib.rs +++ b/apps/recorder/src/lib.rs @@ -12,7 +12,6 @@ )] #![allow(clippy::enum_variant_names)] pub use downloader; - pub mod app; pub mod auth; pub mod cache; diff --git a/apps/recorder/src/migrations/defs.rs b/apps/recorder/src/migrations/defs.rs index 2427f33..310f068 100644 --- a/apps/recorder/src/migrations/defs.rs +++ b/apps/recorder/src/migrations/defs.rs @@ -175,7 +175,6 @@ pub enum Feeds { pub enum Cron { Table, Id, - CronSource, SubscriberId, SubscriptionId, CronExpr, @@ -190,6 +189,7 @@ pub enum Cron { MaxAttempts, Priority, Status, + SubscriberTask, } macro_rules! create_postgres_enum_for_active_enum { diff --git a/apps/recorder/src/migrations/m20220101_000001_init.rs b/apps/recorder/src/migrations/m20220101_000001_init.rs index 4429e23..0a589aa 100644 --- a/apps/recorder/src/migrations/m20220101_000001_init.rs +++ b/apps/recorder/src/migrations/m20220101_000001_init.rs @@ -52,8 +52,7 @@ impl MigrationTrait for Migration { subscriptions::SubscriptionCategoryEnum, subscriptions::SubscriptionCategory::MikanSubscriber, subscriptions::SubscriptionCategory::MikanBangumi, - subscriptions::SubscriptionCategory::MikanSeason, - subscriptions::SubscriptionCategory::Manual + subscriptions::SubscriptionCategory::MikanSeason ) .await?; diff --git a/apps/recorder/src/migrations/m20250625_060701_add_subscription_id_to_subscriber_tasks.rs b/apps/recorder/src/migrations/m20250625_060701_add_subscription_id_to_subscriber_tasks.rs index 44347c4..79dae78 100644 --- a/apps/recorder/src/migrations/m20250625_060701_add_subscription_id_to_subscriber_tasks.rs +++ b/apps/recorder/src/migrations/m20250625_060701_add_subscription_id_to_subscriber_tasks.rs @@ -17,8 +17,8 @@ SELECT job, job_type, status, - (job ->> 'subscriber_id'::text)::integer AS subscriber_id, - job ->> 'task_type'::text AS task_type, + (job ->> 'subscriber_id')::integer AS subscriber_id, + job ->> 'task_type' AS task_type, id, attempts, max_attempts, @@ -28,7 +28,7 @@ SELECT lock_by, done_at, priority, - (job ->> 'subscription_id'::text)::integer AS subscription_id + (job ->> 'subscription_id')::integer AS subscription_id FROM apalis.jobs WHERE job_type = '{SUBSCRIBER_TASK_APALIS_NAME}' AND jsonb_path_exists(job, '$.subscriber_id ? (@.type() == "number")') diff --git a/apps/recorder/src/migrations/m20250629_065628_add_cron.rs b/apps/recorder/src/migrations/m20250629_065628_add_cron.rs index f725077..1695bce 100644 --- a/apps/recorder/src/migrations/m20250629_065628_add_cron.rs +++ b/apps/recorder/src/migrations/m20250629_065628_add_cron.rs @@ -7,9 +7,9 @@ use crate::{ }, models::cron::{ CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME, - CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME, CRON_DUE_EVENT, CronSource, CronSourceEnum, - CronStatus, CronStatusEnum, NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME, - NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME, + CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME, CRON_DUE_EVENT, CronStatus, CronStatusEnum, + NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME, NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME, + SETUP_CRON_EXTRA_FOREIGN_KEYS_FUNCTION_NAME, SETUP_CRON_EXTRA_FOREIGN_KEYS_TRIGGER_NAME, }, }; @@ -19,9 +19,6 @@ pub struct Migration; #[async_trait] impl MigrationTrait for Migration { async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { - create_postgres_enum_for_active_enum!(manager, CronSourceEnum, CronSource::Subscription) - .await?; - create_postgres_enum_for_active_enum!( manager, CronStatusEnum, @@ -37,11 +34,6 @@ impl MigrationTrait for Migration { table_auto_z(Cron::Table) .col(pk_auto(Cron::Id)) .col(string(Cron::CronExpr)) - .col(enumeration( - Cron::CronSource, - CronSourceEnum, - CronSource::iden_values(), - )) .col(integer_null(Cron::SubscriberId)) .col(integer_null(Cron::SubscriptionId)) .col(timestamp_with_time_zone_null(Cron::NextRun)) @@ -59,13 +51,14 @@ impl MigrationTrait for Migration { CronStatusEnum, CronStatus::iden_values(), )) + .col(json_binary_null(Cron::SubscriberTask)) .foreign_key( ForeignKey::create() .name("fk_cron_subscriber_id") .from(Cron::Table, Cron::SubscriberId) .to(Subscribers::Table, Subscribers::Id) .on_delete(ForeignKeyAction::Cascade) - .on_update(ForeignKeyAction::Cascade), + .on_update(ForeignKeyAction::Restrict), ) .foreign_key( ForeignKey::create() @@ -73,7 +66,7 @@ impl MigrationTrait for Migration { .from(Cron::Table, Cron::SubscriptionId) .to(Subscriptions::Table, Subscriptions::Id) .on_delete(ForeignKeyAction::Cascade) - .on_update(ForeignKeyAction::Cascade), + .on_update(ForeignKeyAction::Restrict), ) .to_owned(), ) @@ -83,17 +76,6 @@ impl MigrationTrait for Migration { .create_postgres_auto_update_ts_trigger_for_col(Cron::Table, GeneralIds::UpdatedAt) .await?; - manager - .create_index( - IndexCreateStatement::new() - .if_not_exists() - .name("idx_cron_cron_source") - .table(Cron::Table) - .col(Cron::CronSource) - .to_owned(), - ) - .await?; - manager .create_index( IndexCreateStatement::new() @@ -107,6 +89,32 @@ impl MigrationTrait for Migration { let db = manager.get_connection(); + db.execute_unprepared(&format!( + r#"CREATE OR REPLACE FUNCTION {SETUP_CRON_EXTRA_FOREIGN_KEYS_FUNCTION_NAME}() RETURNS trigger AS $$ + BEGIN + IF jsonb_path_exists(NEW.{subscriber_task}, '$.subscriber_id ? (@.type() == "number")') THEN + NEW.{subscriber_id} = (NEW.{subscriber_task} ->> 'subscriber_id')::integer; + END IF; + IF jsonb_path_exists(NEW.{subscriber_task}, '$.subscription_id ? (@.type() == "number")') THEN + NEW.{subscription_id} = (NEW.{subscriber_task} ->> 'subscription_id')::integer; + END IF; + RETURN NEW; + END; + $$ LANGUAGE plpgsql;"#, + subscriber_task = &Cron::SubscriberTask.to_string(), + subscriber_id = &Cron::SubscriberId.to_string(), + subscription_id = &Cron::SubscriptionId.to_string(), + )).await?; + + db.execute_unprepared(&format!( + r#"CREATE OR REPLACE TRIGGER {SETUP_CRON_EXTRA_FOREIGN_KEYS_TRIGGER_NAME} + BEFORE INSERT OR UPDATE ON {table} + FOR EACH ROW + EXECUTE FUNCTION {SETUP_CRON_EXTRA_FOREIGN_KEYS_FUNCTION_NAME}();"#, + table = &Cron::Table.to_string(), + )) + .await?; + db.execute_unprepared(&format!( r#"CREATE OR REPLACE FUNCTION {NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME}() RETURNS trigger AS $$ BEGIN @@ -150,7 +158,7 @@ impl MigrationTrait for Migration { .await?; db.execute_unprepared(&format!( - r#"CREATE TRIGGER {NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME} + r#"CREATE OR REPLACE TRIGGER {NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME} AFTER INSERT OR UPDATE ON {table} FOR EACH ROW EXECUTE FUNCTION {NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME}();"#, @@ -265,10 +273,6 @@ impl MigrationTrait for Migration { ) .await?; - manager - .drop_postgres_enum_for_active_enum(CronSourceEnum) - .await?; - manager .drop_postgres_enum_for_active_enum(CronStatusEnum) .await?; diff --git a/apps/recorder/src/models/auth.rs b/apps/recorder/src/models/auth.rs index a916145..63b77b5 100644 --- a/apps/recorder/src/models/auth.rs +++ b/apps/recorder/src/models/auth.rs @@ -64,7 +64,9 @@ impl Model { .one(db) .await? .ok_or_else(|| { - RecorderError::from_model_not_found_detail("auth", format!("pid {pid} not found")) + RecorderError::from_entity_not_found_detail::(format!( + "pid {pid} not found" + )) })?; Ok(subscriber_auth) } diff --git a/apps/recorder/src/models/cron/core.rs b/apps/recorder/src/models/cron/core.rs index 2324876..bbca05a 100644 --- a/apps/recorder/src/models/cron/core.rs +++ b/apps/recorder/src/models/cron/core.rs @@ -1,3 +1,5 @@ +use serde::{Deserialize, Serialize}; + pub const CRON_DUE_EVENT: &str = "cron_due"; pub const CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME: &str = @@ -7,3 +9,15 @@ pub const CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME: &str = "check_and_trigger_d pub const NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME: &str = "notify_due_cron_when_mutating"; pub const NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME: &str = "notify_due_cron_when_mutating_trigger"; +pub const SETUP_CRON_EXTRA_FOREIGN_KEYS_FUNCTION_NAME: &str = "setup_cron_extra_foreign_keys"; +pub const SETUP_CRON_EXTRA_FOREIGN_KEYS_TRIGGER_NAME: &str = + "setup_cron_extra_foreign_keys_trigger"; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct CronCreateOptions { + pub cron_expr: String, + pub priority: Option, + pub timeout_ms: Option, + pub max_attempts: Option, + pub enabled: Option, +} diff --git a/apps/recorder/src/models/cron/mod.rs b/apps/recorder/src/models/cron/mod.rs index dc9b3a5..8588d29 100644 --- a/apps/recorder/src/models/cron/mod.rs +++ b/apps/recorder/src/models/cron/mod.rs @@ -3,8 +3,9 @@ mod registry; pub use core::{ CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME, CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME, - CRON_DUE_EVENT, NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME, - NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME, + CRON_DUE_EVENT, CronCreateOptions, NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME, + NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME, SETUP_CRON_EXTRA_FOREIGN_KEYS_FUNCTION_NAME, + SETUP_CRON_EXTRA_FOREIGN_KEYS_TRIGGER_NAME, }; use async_trait::async_trait; @@ -17,21 +18,7 @@ use sea_orm::{ }; use serde::{Deserialize, Serialize}; -use crate::{ - app::AppContextTrait, - errors::{RecorderError, RecorderResult}, - models::subscriptions::{self}, -}; - -#[derive( - Debug, Clone, PartialEq, Eq, DeriveActiveEnum, EnumIter, DeriveDisplay, Serialize, Deserialize, -)] -#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "cron_source")] -#[serde(rename_all = "snake_case")] -pub enum CronSource { - #[sea_orm(string_value = "subscription")] - Subscription, -} +use crate::{app::AppContextTrait, errors::RecorderResult, models::subscriber_tasks}; #[derive( Debug, Clone, PartialEq, Eq, DeriveActiveEnum, EnumIter, DeriveDisplay, Serialize, Deserialize, @@ -58,7 +45,6 @@ pub struct Model { pub updated_at: DateTimeUtc, #[sea_orm(primary_key)] pub id: i32, - pub cron_source: CronSource, pub subscriber_id: Option, pub subscription_id: Option, pub cron_expr: String, @@ -67,6 +53,7 @@ pub struct Model { pub last_error: Option, pub locked_by: Option, pub locked_at: Option, + #[sea_orm(default_expr = "5000")] pub timeout_ms: i32, #[sea_orm(default_expr = "0")] pub attempts: i32, @@ -77,6 +64,7 @@ pub struct Model { pub status: CronStatus, #[sea_orm(default_expr = "true")] pub enabled: bool, + pub subscriber_task: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] @@ -119,6 +107,38 @@ pub enum RelatedEntity { Subscription, } +impl ActiveModel { + pub fn from_subscriber_task( + subscriber_task: subscriber_tasks::SubscriberTask, + cron_options: CronCreateOptions, + ) -> RecorderResult { + let mut active_model = Self { + next_run: Set(Some(Model::calculate_next_run(&cron_options.cron_expr)?)), + cron_expr: Set(cron_options.cron_expr), + subscriber_task: Set(Some(subscriber_task)), + ..Default::default() + }; + + if let Some(priority) = cron_options.priority { + active_model.priority = Set(priority); + } + + if let Some(timeout_ms) = cron_options.timeout_ms { + active_model.timeout_ms = Set(timeout_ms); + } + + if let Some(max_attempts) = cron_options.max_attempts { + active_model.max_attempts = Set(max_attempts); + } + + if let Some(enabled) = cron_options.enabled { + active_model.enabled = Set(enabled); + } + + Ok(active_model) + } +} + #[async_trait] impl ActiveModelBehavior for ActiveModel {} @@ -196,19 +216,13 @@ impl Model { } async fn exec_cron(&self, ctx: &dyn AppContextTrait) -> RecorderResult<()> { - match self.cron_source { - CronSource::Subscription => { - let subscription_id = self.subscription_id.unwrap_or_else(|| { - unreachable!("Subscription cron must have a subscription id") - }); - - let subscription = subscriptions::Entity::find_by_id(subscription_id) - .one(ctx.db()) - .await? - .ok_or_else(|| RecorderError::from_model_not_found("Subscription"))?; - - subscription.exec_cron(ctx).await?; - } + if let Some(subscriber_task) = self.subscriber_task.as_ref() { + let task_service = ctx.task(); + task_service + .add_subscriber_task(subscriber_task.clone()) + .await?; + } else { + unimplemented!("Cron without subscriber task is not supported now"); } Ok(()) @@ -217,7 +231,7 @@ impl Model { async fn mark_cron_completed(&self, ctx: &dyn AppContextTrait) -> RecorderResult<()> { let db = ctx.db(); - let next_run = self.calculate_next_run(&self.cron_expr)?; + let next_run = Self::calculate_next_run(&self.cron_expr)?; ActiveModel { id: Set(self.id), @@ -250,7 +264,7 @@ impl Model { let next_run = if should_retry { Some(Utc::now() + chrono::Duration::seconds(5)) } else { - Some(self.calculate_next_run(&self.cron_expr)?) + Some(Self::calculate_next_run(&self.cron_expr)?) }; ActiveModel { @@ -295,7 +309,7 @@ impl Model { Ok(()) } - fn calculate_next_run(&self, cron_expr: &str) -> RecorderResult> { + pub fn calculate_next_run(cron_expr: &str) -> RecorderResult> { let cron_expr = Cron::new(cron_expr).parse()?; let next = cron_expr.find_next_occurrence(&Utc::now(), false)?; diff --git a/apps/recorder/src/models/feeds/mod.rs b/apps/recorder/src/models/feeds/mod.rs index 150af3a..a3cd329 100644 --- a/apps/recorder/src/models/feeds/mod.rs +++ b/apps/recorder/src/models/feeds/mod.rs @@ -122,7 +122,7 @@ impl Model { .filter(Column::FeedType.eq(FeedType::Rss)) .one(db) .await? - .ok_or(RecorderError::from_model_not_found("Feed"))?; + .ok_or(RecorderError::from_entity_not_found::())?; let feed = Feed::from_model(ctx, feed_model).await?; diff --git a/apps/recorder/src/models/feeds/registry.rs b/apps/recorder/src/models/feeds/registry.rs index 73acf60..05086a4 100644 --- a/apps/recorder/src/models/feeds/registry.rs +++ b/apps/recorder/src/models/feeds/registry.rs @@ -44,7 +44,7 @@ impl Feed { .await?; (subscription, episodes) } else { - return Err(RecorderError::from_model_not_found("Subscription")); + return Err(RecorderError::from_entity_not_found::()); }; Ok(Feed::SubscritpionEpisodes( diff --git a/apps/recorder/src/models/subscribers.rs b/apps/recorder/src/models/subscribers.rs index 981522f..f344f61 100644 --- a/apps/recorder/src/models/subscribers.rs +++ b/apps/recorder/src/models/subscribers.rs @@ -131,7 +131,7 @@ impl Model { let db = ctx.db(); let subscriber = Entity::find_by_id(id).one(db).await?.ok_or_else(|| { - RecorderError::from_model_not_found_detail("subscribers", format!("id {id} not found")) + RecorderError::from_entity_not_found_detail::(format!("id {id} not found")) })?; Ok(subscriber) } diff --git a/apps/recorder/src/models/subscriptions/mod.rs b/apps/recorder/src/models/subscriptions/mod.rs index ce385e1..53f7b99 100644 --- a/apps/recorder/src/models/subscriptions/mod.rs +++ b/apps/recorder/src/models/subscriptions/mod.rs @@ -11,10 +11,7 @@ pub use registry::{ use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; -use crate::{ - app::AppContextTrait, - errors::{RecorderError, RecorderResult}, -}; +use crate::{app::AppContextTrait, errors::RecorderResult}; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "subscriptions")] @@ -155,50 +152,6 @@ impl ActiveModelBehavior for ActiveModel {} impl ActiveModel {} impl Model { - pub async fn toggle_with_ids( - ctx: &dyn AppContextTrait, - ids: impl Iterator, - enabled: bool, - ) -> RecorderResult<()> { - let db = ctx.db(); - Entity::update_many() - .col_expr(Column::Enabled, Expr::value(enabled)) - .filter(Column::Id.is_in(ids)) - .exec(db) - .await?; - Ok(()) - } - - pub async fn delete_with_ids( - ctx: &dyn AppContextTrait, - ids: impl Iterator, - ) -> RecorderResult<()> { - let db = ctx.db(); - Entity::delete_many() - .filter(Column::Id.is_in(ids)) - .exec(db) - .await?; - Ok(()) - } - - pub async fn find_by_id_and_subscriber_id( - ctx: &dyn AppContextTrait, - subscriber_id: i32, - subscription_id: i32, - ) -> RecorderResult { - let db = ctx.db(); - let subscription_model = Entity::find_by_id(subscription_id) - .one(db) - .await? - .ok_or_else(|| RecorderError::from_model_not_found("Subscription"))?; - - if subscription_model.subscriber_id != subscriber_id { - Err(RecorderError::from_model_not_found("Subscription"))?; - } - - Ok(subscription_model) - } - pub async fn exec_cron(&self, _ctx: &dyn AppContextTrait) -> RecorderResult<()> { todo!() } diff --git a/apps/recorder/src/models/subscriptions/registry.rs b/apps/recorder/src/models/subscriptions/registry.rs index 432225a..66ec2ec 100644 --- a/apps/recorder/src/models/subscriptions/registry.rs +++ b/apps/recorder/src/models/subscriptions/registry.rs @@ -1,129 +1,147 @@ use std::{fmt::Debug, sync::Arc}; -use async_trait::async_trait; use sea_orm::{DeriveActiveEnum, DeriveDisplay, EnumIter}; use serde::{Deserialize, Serialize}; use crate::{ - app::AppContextTrait, - errors::{RecorderError, RecorderResult}, + errors::RecorderResult, extract::mikan::{ MikanBangumiSubscription, MikanSeasonSubscription, MikanSubscriberSubscription, }, models::subscriptions::{self, SubscriptionTrait}, }; -#[derive( - Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize, DeriveDisplay, -)] -#[sea_orm( - rs_type = "String", - db_type = "Enum", - enum_name = "subscription_category" -)] -#[serde(rename_all = "snake_case")] -pub enum SubscriptionCategory { - #[sea_orm(string_value = "mikan_subscriber")] - MikanSubscriber, - #[sea_orm(string_value = "mikan_season")] - MikanSeason, - #[sea_orm(string_value = "mikan_bangumi")] - MikanBangumi, - #[sea_orm(string_value = "manual")] - Manual, -} - -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -#[serde(tag = "category")] -pub enum Subscription { - #[serde(rename = "mikan_subscriber")] - MikanSubscriber(MikanSubscriberSubscription), - #[serde(rename = "mikan_season")] - MikanSeason(MikanSeasonSubscription), - #[serde(rename = "mikan_bangumi")] - MikanBangumi(MikanBangumiSubscription), - #[serde(rename = "manual")] - Manual, -} - -impl Subscription { - pub fn category(&self) -> SubscriptionCategory { - match self { - Self::MikanSubscriber(_) => SubscriptionCategory::MikanSubscriber, - Self::MikanSeason(_) => SubscriptionCategory::MikanSeason, - Self::MikanBangumi(_) => SubscriptionCategory::MikanBangumi, - Self::Manual => SubscriptionCategory::Manual, - } - } -} - -#[async_trait] -impl SubscriptionTrait for Subscription { - fn get_subscriber_id(&self) -> i32 { - match self { - Self::MikanSubscriber(subscription) => subscription.get_subscriber_id(), - Self::MikanSeason(subscription) => subscription.get_subscriber_id(), - Self::MikanBangumi(subscription) => subscription.get_subscriber_id(), - Self::Manual => unreachable!(), - } - } - - fn get_subscription_id(&self) -> i32 { - match self { - Self::MikanSubscriber(subscription) => subscription.get_subscription_id(), - Self::MikanSeason(subscription) => subscription.get_subscription_id(), - Self::MikanBangumi(subscription) => subscription.get_subscription_id(), - Self::Manual => unreachable!(), - } - } - - async fn sync_feeds_incremental(&self, ctx: Arc) -> RecorderResult<()> { - match self { - Self::MikanSubscriber(subscription) => subscription.sync_feeds_incremental(ctx).await, - Self::MikanSeason(subscription) => subscription.sync_feeds_incremental(ctx).await, - Self::MikanBangumi(subscription) => subscription.sync_feeds_incremental(ctx).await, - Self::Manual => Ok(()), - } - } - - async fn sync_feeds_full(&self, ctx: Arc) -> RecorderResult<()> { - match self { - Self::MikanSubscriber(subscription) => subscription.sync_feeds_full(ctx).await, - Self::MikanSeason(subscription) => subscription.sync_feeds_full(ctx).await, - Self::MikanBangumi(subscription) => subscription.sync_feeds_full(ctx).await, - Self::Manual => Ok(()), - } - } - - async fn sync_sources(&self, ctx: Arc) -> RecorderResult<()> { - match self { - Self::MikanSubscriber(subscription) => subscription.sync_sources(ctx).await, - Self::MikanSeason(subscription) => subscription.sync_sources(ctx).await, - Self::MikanBangumi(subscription) => subscription.sync_sources(ctx).await, - Self::Manual => Ok(()), - } - } - - fn try_from_model(model: &subscriptions::Model) -> RecorderResult { - match model.category { - SubscriptionCategory::MikanSubscriber => { - MikanSubscriberSubscription::try_from_model(model).map(Self::MikanSubscriber) +macro_rules! register_subscription_type { + ( + subscription_category_enum: { + $(#[$subscription_category_enum_meta:meta])* + pub enum $type_enum_name:ident { + $( + $(#[$variant_meta:meta])* + $variant:ident => $string_value:literal + ),* $(,)? } - SubscriptionCategory::MikanSeason => { - MikanSeasonSubscription::try_from_model(model).map(Self::MikanSeason) + }$(,)? + subscription_enum: { + $(#[$subscription_enum_meta:meta])* + pub enum $subscription_enum_name:ident { + $( + $subscription_variant:ident($subscription_type:ty) + ),* $(,)? } - SubscriptionCategory::MikanBangumi => { - MikanBangumiSubscription::try_from_model(model).map(Self::MikanBangumi) + } + ) => { + $(#[$subscription_category_enum_meta])* + #[sea_orm( + rs_type = "String", + db_type = "Enum", + enum_name = "subscription_category" + )] + pub enum $type_enum_name { + $( + $(#[$variant_meta])* + #[serde(rename = $string_value)] + #[sea_orm(string_value = $string_value)] + $variant, + )* + } + + + $(#[$subscription_enum_meta])* + #[serde(tag = "category")] + pub enum $subscription_enum_name { + $( + #[serde(rename = $string_value)] + $subscription_variant($subscription_type), + )* + } + + impl $subscription_enum_name { + pub fn category(&self) -> $type_enum_name { + match self { + $(Self::$subscription_variant(_) => $type_enum_name::$variant,)* + } } - SubscriptionCategory::Manual => Ok(Self::Manual), + } + + #[async_trait::async_trait] + impl $crate::models::subscriptions::SubscriptionTrait for $subscription_enum_name { + fn get_subscriber_id(&self) -> i32 { + match self { + $(Self::$subscription_variant(subscription) => subscription.get_subscriber_id(),)* + } + } + + fn get_subscription_id(&self) -> i32 { + match self { + $(Self::$subscription_variant(subscription) => subscription.get_subscription_id(),)* + } + } + + async fn sync_feeds_incremental(&self, ctx: Arc) -> $crate::errors::RecorderResult<()> { + match self { + $(Self::$subscription_variant(subscription) => subscription.sync_feeds_incremental(ctx).await,)* + } + } + + async fn sync_feeds_full(&self, ctx: Arc) -> $crate::errors::RecorderResult<()> { + match self { + $(Self::$subscription_variant(subscription) => subscription.sync_feeds_full(ctx).await,)* + } + } + + async fn sync_sources(&self, ctx: Arc) -> $crate::errors::RecorderResult<()> { + match self { + $(Self::$subscription_variant(subscription) => subscription.sync_sources(ctx).await,)* + } + } + + fn try_from_model(model: &subscriptions::Model) -> RecorderResult { + + match model.category { + $($type_enum_name::$variant => { + <$subscription_type as $crate::models::subscriptions::SubscriptionTrait>::try_from_model(model).map(Self::$subscription_variant) + })* + } + } + } + + impl TryFrom<&$crate::models::subscriptions::Model> for $subscription_enum_name { + type Error = $crate::errors::RecorderError; + + fn try_from(model: &$crate::models::subscriptions::Model) -> Result { + Self::try_from_model(model) + } + } + }; +} + +register_subscription_type! { + subscription_category_enum: { + #[derive( + Clone, + Debug, + Serialize, + Deserialize, + PartialEq, + Eq, + Copy, + DeriveActiveEnum, + DeriveDisplay, + EnumIter, + )] + pub enum SubscriptionCategory { + MikanSubscriber => "mikan_subscriber", + MikanSeason => "mikan_season", + MikanBangumi => "mikan_bangumi", + } + } + subscription_enum: { + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] + pub enum Subscription { + MikanSubscriber(MikanSubscriberSubscription), + MikanSeason(MikanSeasonSubscription), + MikanBangumi(MikanBangumiSubscription) } } } - -impl TryFrom<&subscriptions::Model> for Subscription { - type Error = RecorderError; - - fn try_from(model: &subscriptions::Model) -> Result { - Self::try_from_model(model) - } -} diff --git a/apps/recorder/src/task/core.rs b/apps/recorder/src/task/core.rs index e568bb0..4da9e44 100644 --- a/apps/recorder/src/task/core.rs +++ b/apps/recorder/src/task/core.rs @@ -1,34 +1,56 @@ use std::sync::Arc; -use futures::Stream; -use serde::{Serialize, de::DeserializeOwned}; +use async_trait::async_trait; +use futures::{Stream, StreamExt, pin_mut}; +use serde::{Deserialize, Serialize, de::DeserializeOwned}; use crate::{app::AppContextTrait, errors::RecorderResult}; pub const SYSTEM_TASK_APALIS_NAME: &str = "system_task"; pub const SUBSCRIBER_TASK_APALIS_NAME: &str = "subscriber_task"; -#[async_trait::async_trait] +#[async_trait] pub trait AsyncTaskTrait: Serialize + DeserializeOwned + Sized { async fn run_async(self, ctx: Arc) -> RecorderResult<()>; - - async fn run(self, ctx: Arc) -> RecorderResult<()> { - self.run_async(ctx).await?; - - Ok(()) - } } -#[async_trait::async_trait] -pub trait StreamTaskTrait: Serialize + DeserializeOwned + Sized { +pub trait StreamTaskTrait { type Yield: Serialize + DeserializeOwned + Send; fn run_stream( self, ctx: Arc, ) -> impl Stream> + Send; +} - async fn run(self, _ctx: Arc) -> RecorderResult<()> { - unimplemented!() +#[async_trait] +impl AsyncTaskTrait for T +where + T: StreamTaskTrait + Serialize + DeserializeOwned + Sized + Send, +{ + async fn run_async(self, _ctx: Arc) -> RecorderResult<()> { + let s = self.run_stream(_ctx); + + pin_mut!(s); + + while let Some(item) = s.next().await { + item?; + } + + Ok(()) } } + +pub trait SubscriberTaskTrait: AsyncTaskTrait { + fn get_subscriber_id(&self) -> i32; + + fn get_cron_id(&self) -> Option; +} + +pub trait SystemTaskTrait: AsyncTaskTrait {} + +#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)] +pub struct SubscriberTaskBase { + pub subscriber_id: i32, + pub cron_id: Option, +} diff --git a/apps/recorder/src/task/mod.rs b/apps/recorder/src/task/mod.rs index f530518..3d593f2 100644 --- a/apps/recorder/src/task/mod.rs +++ b/apps/recorder/src/task/mod.rs @@ -6,6 +6,7 @@ mod service; pub use core::{ AsyncTaskTrait, SUBSCRIBER_TASK_APALIS_NAME, SYSTEM_TASK_APALIS_NAME, StreamTaskTrait, + SubscriberTaskBase, SubscriberTaskTrait, SystemTaskTrait, }; pub use config::TaskConfig; diff --git a/apps/recorder/src/task/registry/mod.rs b/apps/recorder/src/task/registry/mod.rs index e3c5750..adc9622 100644 --- a/apps/recorder/src/task/registry/mod.rs +++ b/apps/recorder/src/task/registry/mod.rs @@ -1,18 +1,12 @@ -mod media; mod subscriber; -mod subscription; mod system; -pub use media::OptimizeImageTask; pub use subscriber::{ SubscriberTask, SubscriberTaskType, SubscriberTaskTypeEnum, SubscriberTaskTypeVariant, - SubscriberTaskTypeVariantIter, -}; -pub use subscription::{ - SyncOneSubscriptionFeedsFullTask, SyncOneSubscriptionFeedsIncrementalTask, - SyncOneSubscriptionSourcesTask, + SubscriberTaskTypeVariantIter, SyncOneSubscriptionFeedsFullTask, + SyncOneSubscriptionFeedsIncrementalTask, SyncOneSubscriptionSourcesTask, }; pub use system::{ - SystemTask, SystemTaskType, SystemTaskTypeEnum, SystemTaskTypeVariant, + OptimizeImageTask, SystemTask, SystemTaskType, SystemTaskTypeEnum, SystemTaskTypeVariant, SystemTaskTypeVariantIter, }; diff --git a/apps/recorder/src/task/registry/subscriber.rs b/apps/recorder/src/task/registry/subscriber.rs deleted file mode 100644 index b1be78b..0000000 --- a/apps/recorder/src/task/registry/subscriber.rs +++ /dev/null @@ -1,100 +0,0 @@ -use std::sync::Arc; - -use sea_orm::{DeriveActiveEnum, DeriveDisplay, EnumIter, FromJsonQueryResult}; -use serde::{Deserialize, Serialize}; - -use crate::{ - app::AppContextTrait, - errors::{RecorderError, RecorderResult}, - models::subscriptions::SubscriptionTrait, - task::{ - AsyncTaskTrait, - registry::{ - SyncOneSubscriptionFeedsFullTask, SyncOneSubscriptionFeedsIncrementalTask, - SyncOneSubscriptionSourcesTask, - }, - }, -}; - -#[derive( - Clone, - Debug, - Serialize, - Deserialize, - PartialEq, - Eq, - Copy, - DeriveActiveEnum, - DeriveDisplay, - EnumIter, -)] -#[sea_orm(rs_type = "String", db_type = "Text")] -pub enum SubscriberTaskType { - #[serde(rename = "sync_one_subscription_feeds_incremental")] - #[sea_orm(string_value = "sync_one_subscription_feeds_incremental")] - SyncOneSubscriptionFeedsIncremental, - #[serde(rename = "sync_one_subscription_feeds_full")] - #[sea_orm(string_value = "sync_one_subscription_feeds_full")] - SyncOneSubscriptionFeedsFull, - #[serde(rename = "sync_one_subscription_sources")] - #[sea_orm(string_value = "sync_one_subscription_sources")] - SyncOneSubscriptionSources, -} - -impl TryFrom<&SubscriberTask> for serde_json::Value { - type Error = RecorderError; - - fn try_from(value: &SubscriberTask) -> Result { - let json_value = serde_json::to_value(value)?; - Ok(match json_value { - serde_json::Value::Object(mut map) => { - map.remove("task_type"); - serde_json::Value::Object(map) - } - _ => { - unreachable!("subscriber task must be an json object"); - } - }) - } -} - -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, FromJsonQueryResult)] -#[serde(tag = "task_type")] -pub enum SubscriberTask { - #[serde(rename = "sync_one_subscription_feeds_incremental")] - SyncOneSubscriptionFeedsIncremental(SyncOneSubscriptionFeedsIncrementalTask), - #[serde(rename = "sync_one_subscription_feeds_full")] - SyncOneSubscriptionFeedsFull(SyncOneSubscriptionFeedsFullTask), - #[serde(rename = "sync_one_subscription_sources")] - SyncOneSubscriptionSources(SyncOneSubscriptionSourcesTask), -} - -impl SubscriberTask { - pub fn get_subscriber_id(&self) -> i32 { - match self { - Self::SyncOneSubscriptionFeedsIncremental(task) => task.0.get_subscriber_id(), - Self::SyncOneSubscriptionFeedsFull(task) => task.0.get_subscriber_id(), - Self::SyncOneSubscriptionSources(task) => task.0.get_subscriber_id(), - } - } - - pub async fn run(self, ctx: Arc) -> RecorderResult<()> { - match self { - Self::SyncOneSubscriptionFeedsIncremental(task) => task.run(ctx).await, - Self::SyncOneSubscriptionFeedsFull(task) => task.run(ctx).await, - Self::SyncOneSubscriptionSources(task) => task.run(ctx).await, - } - } - - pub fn task_type(&self) -> SubscriberTaskType { - match self { - Self::SyncOneSubscriptionFeedsIncremental(_) => { - SubscriberTaskType::SyncOneSubscriptionFeedsIncremental - } - Self::SyncOneSubscriptionFeedsFull(_) => { - SubscriberTaskType::SyncOneSubscriptionFeedsFull - } - Self::SyncOneSubscriptionSources(_) => SubscriberTaskType::SyncOneSubscriptionSources, - } - } -} diff --git a/apps/recorder/src/task/registry/subscriber/base.rs b/apps/recorder/src/task/registry/subscriber/base.rs new file mode 100644 index 0000000..b3ab4a6 --- /dev/null +++ b/apps/recorder/src/task/registry/subscriber/base.rs @@ -0,0 +1,29 @@ +macro_rules! register_subscriber_task_type { + ( + $(#[$type_meta:meta])* + $task_vis:vis struct $task_name:ident { + $($(#[$field_meta:meta])* pub $field_name:ident: $field_type:ty),* $(,)? + } + ) => { + $(#[$type_meta])* + #[derive(typed_builder::TypedBuilder)] + $task_vis struct $task_name { + $($(#[$field_meta])* pub $field_name: $field_type,)* + pub subscriber_id: i32, + #[builder(default = None)] + pub cron_id: Option, + } + + impl $crate::task::SubscriberTaskTrait for $task_name { + fn get_subscriber_id(&self) -> i32 { + self.subscriber_id + } + + fn get_cron_id(&self) -> Option { + self.cron_id + } + } + } +} + +pub(crate) use register_subscriber_task_type; diff --git a/apps/recorder/src/task/registry/subscriber/mod.rs b/apps/recorder/src/task/registry/subscriber/mod.rs new file mode 100644 index 0000000..186de04 --- /dev/null +++ b/apps/recorder/src/task/registry/subscriber/mod.rs @@ -0,0 +1,140 @@ +mod base; +mod subscription; + +use sea_orm::{DeriveActiveEnum, DeriveDisplay, EnumIter, FromJsonQueryResult}; +use serde::{Deserialize, Serialize}; +pub use subscription::{ + SyncOneSubscriptionFeedsFullTask, SyncOneSubscriptionFeedsIncrementalTask, + SyncOneSubscriptionSourcesTask, +}; + +macro_rules! register_subscriber_task_types { + ( + task_type_enum: { + $(#[$type_enum_meta:meta])* + pub enum $type_enum_name:ident { + $( + $(#[$variant_meta:meta])* + $variant:ident => $string_value:literal + ),* $(,)? + } + }, + task_enum: { + $(#[$task_enum_meta:meta])* + pub enum $task_enum_name:ident { + $( + $task_variant:ident($task_type:ty) + ),* $(,)? + } + } + ) => { + $(#[$type_enum_meta])* + #[sea_orm(rs_type = "String", db_type = "Text")] + pub enum $type_enum_name { + $( + $(#[$variant_meta])* + #[serde(rename = $string_value)] + #[sea_orm(string_value = $string_value)] + $variant, + )* + } + + + $(#[$task_enum_meta])* + #[serde(tag = "task_type")] + pub enum $task_enum_name { + $( + $task_variant($task_type), + )* + } + + impl TryFrom<$task_enum_name> for serde_json::Value { + type Error = $crate::errors::RecorderError; + + fn try_from(value: $task_enum_name) -> Result { + let json_value = serde_json::to_value(value)?; + Ok(match json_value { + serde_json::Value::Object(mut map) => { + map.remove("task_type"); + serde_json::Value::Object(map) + } + _ => { + unreachable!("subscriber task must be an json object"); + } + }) + } + } + + impl $task_enum_name { + pub fn task_type(&self) -> $type_enum_name { + match self { + $(Self::$task_variant(_) => $type_enum_name::$variant,)* + } + } + } + + #[async_trait::async_trait] + impl $crate::task::AsyncTaskTrait for $task_enum_name { + async fn run_async(self, ctx: std::sync::Arc) -> $crate::errors::RecorderResult<()> { + match self { + $(Self::$task_variant(t) => + <$task_type as $crate::task::AsyncTaskTrait>::run_async(t, ctx).await,)* + } + } + } + + impl $crate::task::SubscriberTaskTrait for $task_enum_name { + fn get_subscriber_id(&self) -> i32 { + match self { + $(Self::$task_variant(t) => + <$task_type as $crate::task::SubscriberTaskTrait>::get_subscriber_id(t),)* + } + } + + fn get_cron_id(&self) -> Option { + match self { + $(Self::$task_variant(t) => + <$task_type as $crate::task::SubscriberTaskTrait>::get_cron_id(t),)* + } + } + } + + $( + impl From<$task_type> for $task_enum_name { + fn from(task: $task_type) -> Self { + Self::$task_variant(task) + } + } + )* + }; +} + +register_subscriber_task_types!( + task_type_enum: { + #[derive( + Clone, + Debug, + Serialize, + Deserialize, + PartialEq, + Eq, + Copy, + DeriveActiveEnum, + DeriveDisplay, + EnumIter, + )] + pub enum SubscriberTaskType { + SyncOneSubscriptionFeedsIncremental => "sync_one_subscription_feeds_incremental", + SyncOneSubscriptionFeedsFull => "sync_one_subscription_feeds_full", + SyncOneSubscriptionSources => "sync_one_subscription_sources" + } + }, + task_enum: { + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, FromJsonQueryResult)] + pub enum SubscriberTask { + SyncOneSubscriptionFeedsIncremental(SyncOneSubscriptionFeedsIncrementalTask), + SyncOneSubscriptionFeedsFull(SyncOneSubscriptionFeedsFullTask), + SyncOneSubscriptionSources(SyncOneSubscriptionSourcesTask), + } + } +); diff --git a/apps/recorder/src/task/registry/subscriber/subscription.rs b/apps/recorder/src/task/registry/subscriber/subscription.rs new file mode 100644 index 0000000..8f9ef44 --- /dev/null +++ b/apps/recorder/src/task/registry/subscriber/subscription.rs @@ -0,0 +1,67 @@ +use sea_orm::prelude::*; +use serde::{Deserialize, Serialize}; + +use super::base::register_subscriber_task_type; +use crate::{errors::RecorderResult, models::subscriptions::SubscriptionTrait}; + +macro_rules! register_subscription_task_type { + ( + $(#[$type_meta:meta])* pub struct $task_name:ident { + $($(#[$field_meta:meta])* pub $field_name:ident: $field_type:ty),* $(,)? + } => async |$subscription_param:ident, $ctx_param:ident| -> $task_return_type:ty $method_body:block + ) => { + register_subscriber_task_type! { + $(#[$type_meta])* + pub struct $task_name { + $($(#[$field_meta])* pub $field_name: $field_type,)* + pub subscription_id: i32, + } + } + + #[async_trait::async_trait] + impl $crate::task::AsyncTaskTrait for $task_name { + async fn run_async(self, ctx: std::sync::Arc) -> $task_return_type { + use $crate::models::subscriptions::{ + Entity, Column, Subscription, + }; + let subscription_model = Entity::find() + .filter(Column::Id.eq(self.subscription_id)) + .filter(Column::SubscriberId.eq(self.subscriber_id)) + .one(ctx.db()) + .await? + .ok_or_else(|| $crate::errors::RecorderError::from_entity_not_found::())?; + + let $subscription_param = Subscription::try_from_model(&subscription_model)?; + let $ctx_param = ctx; + $method_body + } + } + } +} + +register_subscription_task_type! { + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] + pub struct SyncOneSubscriptionFeedsIncrementalTask { + } => async |subscription, ctx| -> RecorderResult<()> { + subscription.sync_feeds_incremental(ctx).await?; + Ok(()) + } +} + +register_subscription_task_type! { + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] + pub struct SyncOneSubscriptionFeedsFullTask { + } => async |subscription, ctx| -> RecorderResult<()> { + subscription.sync_feeds_full(ctx).await?; + Ok(()) + } +} + +register_subscription_task_type! { + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] + pub struct SyncOneSubscriptionSourcesTask { + } => async |subscription, ctx| -> RecorderResult<()> { + subscription.sync_sources(ctx).await?; + Ok(()) + } +} diff --git a/apps/recorder/src/task/registry/subscription.rs b/apps/recorder/src/task/registry/subscription.rs deleted file mode 100644 index eae87cc..0000000 --- a/apps/recorder/src/task/registry/subscription.rs +++ /dev/null @@ -1,62 +0,0 @@ -use std::sync::Arc; - -use sea_orm::prelude::*; -use serde::{Deserialize, Serialize}; - -use crate::{ - app::AppContextTrait, - errors::RecorderResult, - models::subscriptions::{self, SubscriptionTrait}, - task::AsyncTaskTrait, -}; - -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] -pub struct SyncOneSubscriptionFeedsIncrementalTask(pub subscriptions::Subscription); - -impl From for SyncOneSubscriptionFeedsIncrementalTask { - fn from(subscription: subscriptions::Subscription) -> Self { - Self(subscription) - } -} - -#[async_trait::async_trait] -impl AsyncTaskTrait for SyncOneSubscriptionFeedsIncrementalTask { - async fn run_async(self, ctx: Arc) -> RecorderResult<()> { - self.0.sync_feeds_incremental(ctx).await?; - Ok(()) - } -} - -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] -pub struct SyncOneSubscriptionFeedsFullTask(pub subscriptions::Subscription); - -impl From for SyncOneSubscriptionFeedsFullTask { - fn from(subscription: subscriptions::Subscription) -> Self { - Self(subscription) - } -} - -#[async_trait::async_trait] -impl AsyncTaskTrait for SyncOneSubscriptionFeedsFullTask { - async fn run_async(self, ctx: Arc) -> RecorderResult<()> { - self.0.sync_feeds_full(ctx).await?; - Ok(()) - } -} - -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] -pub struct SyncOneSubscriptionSourcesTask(pub subscriptions::Subscription); - -#[async_trait::async_trait] -impl AsyncTaskTrait for SyncOneSubscriptionSourcesTask { - async fn run_async(self, ctx: Arc) -> RecorderResult<()> { - self.0.sync_sources(ctx).await?; - Ok(()) - } -} - -impl From for SyncOneSubscriptionSourcesTask { - fn from(subscription: subscriptions::Subscription) -> Self { - Self(subscription) - } -} diff --git a/apps/recorder/src/task/registry/system.rs b/apps/recorder/src/task/registry/system.rs deleted file mode 100644 index 88d7b86..0000000 --- a/apps/recorder/src/task/registry/system.rs +++ /dev/null @@ -1,43 +0,0 @@ -use std::sync::Arc; - -use sea_orm::{DeriveActiveEnum, DeriveDisplay, EnumIter, FromJsonQueryResult}; -use serde::{Deserialize, Serialize}; - -use crate::{ - app::AppContextTrait, - errors::RecorderResult, - task::{AsyncTaskTrait, registry::media::OptimizeImageTask}, -}; - -#[derive( - Clone, - Debug, - Serialize, - Deserialize, - PartialEq, - Eq, - Copy, - DeriveActiveEnum, - DeriveDisplay, - EnumIter, -)] -#[sea_orm(rs_type = "String", db_type = "Text")] -pub enum SystemTaskType { - #[serde(rename = "optimize_image")] - #[sea_orm(string_value = "optimize_image")] - OptimizeImage, -} - -#[derive(Clone, Debug, Serialize, Deserialize, FromJsonQueryResult)] -pub enum SystemTask { - #[serde(rename = "optimize_image")] - OptimizeImage(OptimizeImageTask), -} - -impl SystemTask { - pub async fn run(self, ctx: Arc) -> RecorderResult<()> { - match self { - Self::OptimizeImage(task) => task.run(ctx).await, - } - } -} diff --git a/apps/recorder/src/task/registry/media.rs b/apps/recorder/src/task/registry/system/media.rs similarity index 100% rename from apps/recorder/src/task/registry/media.rs rename to apps/recorder/src/task/registry/system/media.rs diff --git a/apps/recorder/src/task/registry/system/mod.rs b/apps/recorder/src/task/registry/system/mod.rs new file mode 100644 index 0000000..f248602 --- /dev/null +++ b/apps/recorder/src/task/registry/system/mod.rs @@ -0,0 +1,108 @@ +mod media; + +pub use media::OptimizeImageTask; +use sea_orm::{DeriveActiveEnum, DeriveDisplay, EnumIter, FromJsonQueryResult}; +use serde::{Deserialize, Serialize}; + +macro_rules! register_system_task_types { + ( + task_type_enum: { + $(#[$type_enum_meta:meta])* + pub enum $type_enum_name:ident { + $( + $(#[$variant_meta:meta])* + $variant:ident => $string_value:literal + ),* $(,)? + } + }, + task_enum: { + $(#[$task_enum_meta:meta])* + pub enum $task_enum_name:ident { + $( + $task_variant:ident($task_type:ty) + ),* $(,)? + } + } + ) => { + $(#[$type_enum_meta])* + #[sea_orm(rs_type = "String", db_type = "Text")] + pub enum $type_enum_name { + $( + $(#[$variant_meta])* + #[serde(rename = $string_value)] + #[sea_orm(string_value = $string_value)] + $variant, + )* + } + + + $(#[$task_enum_meta])* + #[serde(tag = "task_type")] + pub enum $task_enum_name { + $( + $task_variant($task_type), + )* + } + + impl TryFrom<$task_enum_name> for serde_json::Value { + type Error = $crate::errors::RecorderError; + + fn try_from(value: $task_enum_name) -> Result { + let json_value = serde_json::to_value(value)?; + Ok(match json_value { + serde_json::Value::Object(mut map) => { + map.remove("task_type"); + serde_json::Value::Object(map) + } + _ => { + unreachable!("subscriber task must be an json object"); + } + }) + } + } + + impl $task_enum_name { + pub fn task_type(&self) -> $type_enum_name { + match self { + $(Self::$task_variant(_) => $type_enum_name::$variant,)* + } + } + } + + #[async_trait::async_trait] + impl $crate::task::AsyncTaskTrait for $task_enum_name { + async fn run_async(self, ctx: std::sync::Arc) -> $crate::errors::RecorderResult<()> { + match self { + $(Self::$task_variant(t) => + <$task_type as $crate::task::AsyncTaskTrait>::run_async(t, ctx).await,)* + } + } + } + }; +} + +register_system_task_types! { + task_type_enum: { + #[derive( + Clone, + Debug, + Serialize, + Deserialize, + PartialEq, + Eq, + Copy, + DeriveActiveEnum, + DeriveDisplay, + EnumIter, + )] + pub enum SystemTaskType { + OptimizeImage => "optimize_image" + } + }, + task_enum: { + #[derive(Clone, Debug, Serialize, Deserialize, FromJsonQueryResult)] + pub enum SystemTask { + OptimizeImage(OptimizeImageTask), + } + } +} diff --git a/apps/recorder/src/task/service.rs b/apps/recorder/src/task/service.rs index da68bbc..1acccf1 100644 --- a/apps/recorder/src/task/service.rs +++ b/apps/recorder/src/task/service.rs @@ -6,15 +6,16 @@ use apalis_sql::{ context::SqlContext, postgres::{PgListen as ApalisPgListen, PostgresStorage as ApalisPostgresStorage}, }; -use sea_orm::sqlx::postgres::PgListener; +use sea_orm::{ActiveModelTrait, sqlx::postgres::PgListener}; use tokio::sync::RwLock; use crate::{ app::AppContextTrait, errors::{RecorderError, RecorderResult}, - models::cron::{self, CRON_DUE_EVENT}, + models::cron::{self, CRON_DUE_EVENT, CronCreateOptions}, task::{ - SUBSCRIBER_TASK_APALIS_NAME, SYSTEM_TASK_APALIS_NAME, SubscriberTask, TaskConfig, + AsyncTaskTrait, SUBSCRIBER_TASK_APALIS_NAME, SYSTEM_TASK_APALIS_NAME, SubscriberTask, + TaskConfig, config::{default_subscriber_task_workers, default_system_task_workers}, registry::SystemTask, }, @@ -65,7 +66,7 @@ impl TaskService { ) -> RecorderResult<()> { let ctx = data.deref().clone(); - job.run(ctx).await + job.run_async(ctx).await } async fn run_system_task( @@ -73,7 +74,7 @@ impl TaskService { data: Data>, ) -> RecorderResult<()> { let ctx = data.deref().clone(); - job.run(ctx).await + job.run_async(ctx).await } pub async fn retry_subscriber_task(&self, job_id: String) -> RecorderResult<()> { @@ -104,7 +105,6 @@ impl TaskService { pub async fn add_subscriber_task( &self, - _subscriber_id: i32, subscriber_task: SubscriberTask, ) -> RecorderResult { let task_id = { @@ -121,6 +121,18 @@ impl TaskService { Ok(task_id) } + pub async fn add_subscriber_task_cron( + &self, + subscriber_task: SubscriberTask, + cron_options: CronCreateOptions, + ) -> RecorderResult { + let c = cron::ActiveModel::from_subscriber_task(subscriber_task, cron_options)?; + + let c = c.insert(self.ctx.db()).await?; + + Ok(c) + } + pub async fn add_system_task(&self, system_task: SystemTask) -> RecorderResult { let task_id = { let mut storage = self.system_task_storage.write().await;