diff --git a/Cargo.lock b/Cargo.lock index 1e51be3..e78b8a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5217,6 +5217,7 @@ dependencies = [ "clap", "cocoon", "color-eyre", + "convert_case 0.8.0", "ctor", "dotenvy", "downloader", diff --git a/apps/recorder/Cargo.toml b/apps/recorder/Cargo.toml index 38ff377..c55301b 100644 --- a/apps/recorder/Cargo.toml +++ b/apps/recorder/Cargo.toml @@ -55,6 +55,9 @@ moka = { workspace = true } chrono = { workspace = true } tracing-subscriber = { workspace = true } mockito = { workspace = true } +color-eyre = { workspace = true, optional = true } +inquire = { workspace = true, optional = true } +convert_case = { workspace = true } sea-orm = { version = "1.1", features = [ "sqlx-sqlite", @@ -124,8 +127,6 @@ rust_decimal = "1.37.1" reqwest_cookie_store = "0.8.0" nanoid = "0.4.0" jwtk = "0.4.0" -color-eyre = { workspace = true, optional = true } -inquire = { workspace = true, optional = true } percent-encoding = "2.3.1" diff --git a/apps/recorder/src/graphql/domains/subscriber_tasks.rs b/apps/recorder/src/graphql/domains/subscriber_tasks.rs index f3a1445..4c81bc5 100644 --- a/apps/recorder/src/graphql/domains/subscriber_tasks.rs +++ b/apps/recorder/src/graphql/domains/subscriber_tasks.rs @@ -1,19 +1,95 @@ -use seaography::{Builder as SeaographyBuilder, BuilderContext}; +use std::{ops::Deref, pin::Pin, sync::Arc}; + +use async_graphql::dynamic::{ResolverContext, ValueAccessor}; +use sea_orm::{ + ConnectionTrait, EntityTrait, QueryFilter, QuerySelect, QueryTrait, prelude::Expr, + sea_query::Query, +}; +use seaography::{Builder as SeaographyBuilder, BuilderContext, get_filter_conditions}; use crate::{ - graphql::infra::json::restrict_jsonb_filter_input_for_entity, models::subscriber_tasks, + app::AppContextTrait, + errors::{RecorderError, RecorderResult}, + graphql::{ + domains::subscribers::restrict_subscriber_for_entity, + infra::{ + custom::generate_custom_entity_delete_mutation_field, + json::{convert_jsonb_output_case_for_entity, restrict_jsonb_filter_input_for_entity}, + }, + }, + models::subscriber_tasks, + task::ApalisJob, }; +pub fn register_subscriber_tasks_entity_mutations(builder: &mut SeaographyBuilder) { + let context = builder.context; + let delete_mutation = generate_custom_entity_delete_mutation_field::( + context, + Arc::new( + |resolver_ctx: &ResolverContext<'_>, + app_ctx: Arc, + filters: Option>| + -> Pin>> + Send>> { + let filters_condition = get_filter_conditions::( + resolver_ctx, + context, + filters, + ); + Box::pin(async move { + let db = app_ctx.db(); + + let select_subquery = subscriber_tasks::Entity::find() + .select_only() + .column(subscriber_tasks::Column::Id) + .filter(filters_condition); + + let delete_query = Query::delete() + .from_table(ApalisJob::Table) + .and_where( + Expr::col(ApalisJob::Id).in_subquery(select_subquery.into_query()), + ) + .to_owned(); + + let db_backend = db.deref().get_database_backend(); + let delete_statement = db_backend.build(&delete_query); + let result = db.execute(delete_statement).await?; + + Ok::, RecorderError>(Some(result.rows_affected() as i32)) + }) + as Pin>> + Send>> + }, + ), + ); + builder.mutations.push(delete_mutation); +} + pub fn register_subscriber_tasks_to_schema_context(context: &mut BuilderContext) { + restrict_subscriber_for_entity::( + context, + &subscriber_tasks::Column::SubscriberId, + ); restrict_jsonb_filter_input_for_entity::( context, &subscriber_tasks::Column::Job, ); + convert_jsonb_output_case_for_entity::( + context, + &subscriber_tasks::Column::Job, + ); } pub fn register_subscriber_tasks_to_schema_builder( mut builder: SeaographyBuilder, ) -> SeaographyBuilder { + builder.register_entity::( + ::iter() + .map(|rel| seaography::RelationBuilder::get_relation(&rel, builder.context)) + .collect(), + ); + builder = builder.register_entity_dataloader_one_to_one(subscriber_tasks::Entity, tokio::spawn); + builder = + builder.register_entity_dataloader_one_to_many(subscriber_tasks::Entity, tokio::spawn); builder.register_enumeration::(); + builder.register_enumeration::(); builder } diff --git a/apps/recorder/src/graphql/infra/custom.rs b/apps/recorder/src/graphql/infra/custom.rs new file mode 100644 index 0000000..403aff2 --- /dev/null +++ b/apps/recorder/src/graphql/infra/custom.rs @@ -0,0 +1,79 @@ +use std::{pin::Pin, sync::Arc}; + +use async_graphql::dynamic::{ + Field, FieldFuture, InputValue, ResolverContext, TypeRef, ValueAccessor, +}; +use sea_orm::EntityTrait; +use seaography::{ + BuilderContext, EntityDeleteMutationBuilder, EntityObjectBuilder, FilterInputBuilder, + GuardAction, +}; + +use crate::{app::AppContextTrait, errors::RecorderResult}; + +pub type DeleteMutationFn = Arc< + dyn Fn( + &ResolverContext<'_>, + Arc, + Option>, + ) -> Pin>> + Send>> + + Send + + Sync, +>; + +pub fn generate_custom_entity_delete_mutation_field( + builder_context: &'static BuilderContext, + mutation_fn: DeleteMutationFn, +) -> Field +where + T: EntityTrait, + ::Model: Sync, +{ + let entity_filter_input_builder = FilterInputBuilder { + context: builder_context, + }; + let entity_object_builder = EntityObjectBuilder { + context: builder_context, + }; + let entity_delete_mutation_builder = EntityDeleteMutationBuilder { + context: builder_context, + }; + let object_name: String = entity_object_builder.type_name::(); + + let context = builder_context; + + let guard = builder_context.guards.entity_guards.get(&object_name); + + Field::new( + entity_delete_mutation_builder.type_name::(), + TypeRef::named_nn(TypeRef::INT), + move |ctx| { + let mutation_fn = mutation_fn.clone(); + FieldFuture::new(async move { + let guard_flag = if let Some(guard) = guard { + (*guard)(&ctx) + } else { + GuardAction::Allow + }; + + if let GuardAction::Block(reason) = guard_flag { + return Err::, async_graphql::Error>(async_graphql::Error::new( + reason.unwrap_or("Entity guard triggered.".into()), + )); + } + + let app_ctx = ctx.data::>()?; + + let filters = ctx.args.get(&context.entity_delete_mutation.filter_field); + + let result = mutation_fn(&ctx, app_ctx.clone(), filters).await?; + + Ok(result.map(async_graphql::Value::from)) + }) + }, + ) + .argument(InputValue::new( + &context.entity_delete_mutation.filter_field, + TypeRef::named(entity_filter_input_builder.type_name(&object_name)), + )) +} diff --git a/apps/recorder/src/graphql/infra/json.rs b/apps/recorder/src/graphql/infra/json.rs index ae8baaf..970153f 100644 --- a/apps/recorder/src/graphql/infra/json.rs +++ b/apps/recorder/src/graphql/infra/json.rs @@ -3,6 +3,7 @@ use async_graphql::{ dynamic::{ResolverContext, Scalar, SchemaError}, to_value, }; +use convert_case::Case; use itertools::Itertools; use rust_decimal::{Decimal, prelude::FromPrimitive}; use sea_orm::{ @@ -12,9 +13,13 @@ use sea_orm::{ use seaography::{ Builder as SeaographyBuilder, BuilderContext, FilterType, FnFilterCondition, SeaographyError, }; +use serde::{Serialize, de::DeserializeOwned}; use serde_json::Value as JsonValue; -use crate::{errors::RecorderResult, graphql::infra::util::get_entity_column_key}; +use crate::{ + errors::RecorderResult, graphql::infra::util::get_entity_column_key, + infra::json::convert_json_keys, +}; #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Copy)] pub enum JsonbFilterOperation { @@ -948,6 +953,64 @@ where ); } +pub fn validate_jsonb_input_for_entity(context: &mut BuilderContext, column: &T::Column) +where + T: EntityTrait, + ::Model: Sync, + S: DeserializeOwned + Serialize, +{ + let entity_column_key = get_entity_column_key::(context, column); + context.types.input_conversions.insert( + entity_column_key.clone(), + Box::new(move |_resolve_context, accessor| { + let deserialized = accessor.deserialize::().map_err(|err| { + SeaographyError::TypeConversionError( + err.message, + format!("Json - {entity_column_key}"), + ) + })?; + let json_value = serde_json::to_value(deserialized).map_err(|err| { + SeaographyError::TypeConversionError( + err.to_string(), + format!("Json - {entity_column_key}"), + ) + })?; + Ok(sea_orm::Value::Json(Some(Box::new(json_value)))) + }), + ); +} + +pub fn convert_jsonb_output_case_for_entity(context: &mut BuilderContext, column: &T::Column) +where + T: EntityTrait, + ::Model: Sync, +{ + let entity_column_key = get_entity_column_key::(context, column); + context.types.output_conversions.insert( + entity_column_key.clone(), + Box::new(move |value| { + if let sea_orm::Value::Json(Some(json)) = value { + let result = async_graphql::Value::from_json(convert_json_keys( + json.as_ref().clone(), + Case::Camel, + )) + .map_err(|err| { + SeaographyError::TypeConversionError( + err.to_string(), + format!("Json - {entity_column_key}"), + ) + })?; + Ok(result) + } else { + Err(SeaographyError::TypeConversionError( + "value should be json".to_string(), + format!("Json - {entity_column_key}"), + )) + } + }), + ); +} + #[cfg(test)] mod tests { use std::assert_matches::assert_matches; diff --git a/apps/recorder/src/graphql/infra/mod.rs b/apps/recorder/src/graphql/infra/mod.rs index 421cdfb..af8f548 100644 --- a/apps/recorder/src/graphql/infra/mod.rs +++ b/apps/recorder/src/graphql/infra/mod.rs @@ -1,2 +1,3 @@ +pub mod custom; pub mod json; pub mod util; diff --git a/apps/recorder/src/graphql/schema.rs b/apps/recorder/src/graphql/schema.rs index 229a99e..3d93c6d 100644 --- a/apps/recorder/src/graphql/schema.rs +++ b/apps/recorder/src/graphql/schema.rs @@ -42,10 +42,6 @@ pub fn build_schema( register_subscribers_to_schema_context(&mut context); { - restrict_subscriber_for_entity::( - &mut context, - &bangumi::Column::SubscriberId, - ); restrict_subscriber_for_entity::( &mut context, &downloaders::Column::SubscriberId, @@ -74,10 +70,6 @@ pub fn build_schema( &mut context, &subscription_episode::Column::SubscriberId, ); - restrict_subscriber_for_entity::( - &mut context, - &subscriber_tasks::Column::SubscriberId, - ); restrict_subscriber_for_entity::( &mut context, &credential_3rd::Column::SubscriberId, @@ -110,7 +102,6 @@ pub fn build_schema( subscription_bangumi, subscription_episode, subscriptions, - subscriber_tasks, credential_3rd ] ); @@ -121,7 +112,6 @@ pub fn build_schema( builder.register_enumeration::(); builder.register_enumeration::(); builder.register_enumeration::(); - builder.register_enumeration::(); } builder = register_subscriptions_to_schema_builder(builder); diff --git a/apps/recorder/src/infra/json.rs b/apps/recorder/src/infra/json.rs new file mode 100644 index 0000000..d623700 --- /dev/null +++ b/apps/recorder/src/infra/json.rs @@ -0,0 +1,20 @@ +use convert_case::{Case, Casing}; +use serde_json::Value; + +pub fn convert_json_keys(json: Value, case: Case) -> Value { + match json { + Value::Object(object) => Value::Object( + object + .into_iter() + .map(|(key, value)| (key.to_case(case), convert_json_keys(value, case))) + .collect(), + ), + Value::Array(array) => Value::Array( + array + .into_iter() + .map(|item| convert_json_keys(item, case)) + .collect(), + ), + _ => json, + } +} diff --git a/apps/recorder/src/infra/mod.rs b/apps/recorder/src/infra/mod.rs new file mode 100644 index 0000000..22fdbb3 --- /dev/null +++ b/apps/recorder/src/infra/mod.rs @@ -0,0 +1 @@ +pub mod json; diff --git a/apps/recorder/src/lib.rs b/apps/recorder/src/lib.rs index fb05d5e..a580778 100644 --- a/apps/recorder/src/lib.rs +++ b/apps/recorder/src/lib.rs @@ -20,6 +20,7 @@ pub mod database; pub mod errors; pub mod extract; pub mod graphql; +pub mod infra; pub mod logger; pub mod message; pub mod migrations; diff --git a/apps/recorder/src/task/db.rs b/apps/recorder/src/task/db.rs new file mode 100644 index 0000000..2b097a4 --- /dev/null +++ b/apps/recorder/src/task/db.rs @@ -0,0 +1,7 @@ +use sea_orm::sea_query; + +#[derive(sea_query::Iden)] +pub enum ApalisJob { + Table, + Id, +} diff --git a/apps/recorder/src/task/mod.rs b/apps/recorder/src/task/mod.rs index 4b57a50..2a7b9d9 100644 --- a/apps/recorder/src/task/mod.rs +++ b/apps/recorder/src/task/mod.rs @@ -1,11 +1,13 @@ mod config; mod core; +mod db; mod registry; mod service; pub use core::{SUBSCRIBER_TASK_APALIS_NAME, SubscriberAsyncTaskTrait, SubscriberStreamTaskTrait}; pub use config::TaskConfig; +pub use db::ApalisJob; pub use registry::{ SubscriberTask, SubscriberTaskType, SubscriberTaskTypeEnum, SubscriberTaskTypeVariant, SubscriberTaskTypeVariantIter, SyncOneSubscriptionFeedsFullTask,