refactor: refactor graphql

This commit is contained in:
master 2025-06-27 04:06:58 +08:00
parent 3a8eb88e1a
commit 65505f91b2
43 changed files with 2199 additions and 818 deletions

32
Cargo.lock generated
View File

@ -6824,6 +6824,7 @@ dependencies = [
"tracing-appender", "tracing-appender",
"tracing-subscriber", "tracing-subscriber",
"tracing-tree", "tracing-tree",
"ts-rs",
"typed-builder 0.21.0", "typed-builder 0.21.0",
"url", "url",
"util", "util",
@ -8690,6 +8691,15 @@ dependencies = [
"unic-segment", "unic-segment",
] ]
[[package]]
name = "termcolor"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755"
dependencies = [
"winapi-util",
]
[[package]] [[package]]
name = "testcontainers" name = "testcontainers"
version = "0.24.0" version = "0.24.0"
@ -9214,6 +9224,28 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
[[package]]
name = "ts-rs"
version = "11.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ef1b7a6d914a34127ed8e1fa927eb7088903787bcded4fa3eef8f85ee1568be"
dependencies = [
"thiserror 2.0.12",
"ts-rs-macros",
]
[[package]]
name = "ts-rs-macros"
version = "11.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9d4ed7b4c18cc150a6a0a1e9ea1ecfa688791220781af6e119f9599a8502a0a"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.104",
"termcolor",
]
[[package]] [[package]]
name = "tungstenite" name = "tungstenite"
version = "0.26.2" version = "0.26.2"

View File

@ -165,6 +165,7 @@ quick-xml = { version = "0.37.5", features = [
"serde", "serde",
] } ] }
croner = "2.2.0" croner = "2.2.0"
ts-rs = "11.0.1"
[dev-dependencies] [dev-dependencies]
inquire = { workspace = true } inquire = { workspace = true }

View File

@ -18,6 +18,8 @@ use crate::{
#[derive(Snafu, Debug)] #[derive(Snafu, Debug)]
#[snafu(visibility(pub(crate)))] #[snafu(visibility(pub(crate)))]
pub enum RecorderError { pub enum RecorderError {
#[snafu(transparent)]
SeaographyError { source: seaography::SeaographyError },
#[snafu(transparent)] #[snafu(transparent)]
CronError { source: croner::errors::CronError }, CronError { source: croner::errors::CronError },
#[snafu(display( #[snafu(display(
@ -192,20 +194,17 @@ impl RecorderError {
} }
} }
pub fn from_model_not_found_detail<C: Into<Cow<'static, str>>, T: ToString>( pub fn from_entity_not_found<E: sea_orm::EntityTrait>() -> Self {
model: C,
detail: T,
) -> Self {
Self::ModelEntityNotFound { Self::ModelEntityNotFound {
entity: model.into(), entity: std::any::type_name::<E::Model>().into(),
detail: Some(detail.to_string()), detail: None,
} }
} }
pub fn from_model_not_found<C: Into<Cow<'static, str>>>(model: C) -> Self { pub fn from_entity_not_found_detail<E: sea_orm::EntityTrait, T: ToString>(detail: T) -> Self {
Self::ModelEntityNotFound { Self::ModelEntityNotFound {
entity: model.into(), entity: std::any::type_name::<E::Model>().into(),
detail: None, detail: Some(detail.to_string()),
} }
} }
} }

View File

@ -227,10 +227,12 @@ impl MikanClient {
self.fork_with_userpass_credential(userpass_credential) self.fork_with_userpass_credential(userpass_credential)
.await .await
} else { } else {
Err(RecorderError::from_model_not_found_detail( Err(RecorderError::from_entity_not_found_detail::<
"credential", credential_3rd::Entity,
format!("credential id {credential_id} not found"), _,
)) >(format!(
"credential id {credential_id} not found"
)))
} }
} }

View File

@ -1,50 +1,28 @@
use std::sync::Arc; use std::sync::Arc;
use async_graphql::dynamic::{ use async_graphql::dynamic::{Field, FieldFuture, FieldValue, Object, TypeRef};
Field, FieldFuture, FieldValue, InputObject, InputValue, Object, TypeRef, use sea_orm::{EntityTrait, QueryFilter};
}; use seaography::{Builder as SeaographyBuilder, BuilderContext, get_filter_conditions};
use seaography::{Builder as SeaographyBuilder, BuilderContext};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use util_derive::DynamicGraphql; use util_derive::DynamicGraphql;
use crate::{ use crate::{
app::AppContextTrait, app::AppContextTrait,
auth::AuthUserInfo,
errors::RecorderError, errors::RecorderError,
graphql::{ graphql::{
domains::subscribers::restrict_subscriber_for_entity, domains::subscribers::restrict_subscriber_for_entity,
infra::crypto::{ infra::{
crypto::{
register_crypto_column_input_conversion_to_schema_context, register_crypto_column_input_conversion_to_schema_context,
register_crypto_column_output_conversion_to_schema_context, register_crypto_column_output_conversion_to_schema_context,
}, },
custom::generate_entity_filtered_mutation_field,
name::get_entity_custom_mutation_field_name,
},
}, },
models::credential_3rd, models::credential_3rd,
}; };
#[derive(DynamicGraphql, Serialize, Deserialize, Clone, Debug)]
struct Credential3rdCheckAvailableInput {
pub id: i32,
}
impl Credential3rdCheckAvailableInput {
fn input_type_name() -> &'static str {
"Credential3rdCheckAvailableInput"
}
fn arg_name() -> &'static str {
"filter"
}
fn generate_input_object() -> InputObject {
InputObject::new(Self::input_type_name())
.description("The input of the credential3rdCheckAvailable query")
.field(InputValue::new(
Credential3rdCheckAvailableInputFieldEnum::Id.as_str(),
TypeRef::named_nn(TypeRef::INT),
))
}
}
#[derive(DynamicGraphql, Serialize, Deserialize, Clone, Debug)] #[derive(DynamicGraphql, Serialize, Deserialize, Clone, Debug)]
pub struct Credential3rdCheckAvailableInfo { pub struct Credential3rdCheckAvailableInfo {
pub available: bool, pub available: bool,
@ -119,36 +97,36 @@ pub fn register_credential3rd_to_schema_builder(
builder.register_enumeration::<credential_3rd::Credential3rdType>(); builder.register_enumeration::<credential_3rd::Credential3rdType>();
seaography::register_entity!(builder, credential_3rd); seaography::register_entity!(builder, credential_3rd);
builder.schema = builder
.schema
.register(Credential3rdCheckAvailableInput::generate_input_object());
builder.schema = builder builder.schema = builder
.schema .schema
.register(Credential3rdCheckAvailableInfo::generate_output_object()); .register(Credential3rdCheckAvailableInfo::generate_output_object());
builder.queries.push( let builder_context = builder.context;
Field::new( {
"credential3rdCheckAvailable", let check_available_mutation_name = get_entity_custom_mutation_field_name::<
credential_3rd::Entity,
>(builder_context, "CheckAvailable");
let check_available_mutation =
generate_entity_filtered_mutation_field::<credential_3rd::Entity, _, _>(
builder_context,
check_available_mutation_name,
TypeRef::named_nn(Credential3rdCheckAvailableInfo::object_type_name()), TypeRef::named_nn(Credential3rdCheckAvailableInfo::object_type_name()),
move |ctx| { Arc::new(|resolver_ctx, app_ctx, filters| {
FieldFuture::new(async move { let filters_condition = get_filter_conditions::<credential_3rd::Entity>(
let auth_user_info = ctx.data::<AuthUserInfo>()?; resolver_ctx,
let input: Credential3rdCheckAvailableInput = ctx builder_context,
.args filters,
.get(Credential3rdCheckAvailableInput::arg_name()) );
.unwrap()
.deserialize()?;
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
let credential_model = credential_3rd::Model::find_by_id_and_subscriber_id( Box::pin(async move {
app_ctx.as_ref(), let db = app_ctx.db();
input.id,
auth_user_info.subscriber_auth.subscriber_id, let credential_model = credential_3rd::Entity::find()
) .filter(filters_condition)
.one(db)
.await? .await?
.ok_or_else(|| RecorderError::Credential3rdError { .ok_or_else(|| {
message: format!("credential = {} not found", input.id), RecorderError::from_entity_not_found::<credential_3rd::Entity>()
source: None.into(),
})?; })?;
let available = credential_model.check_available(app_ctx.as_ref()).await?; let available = credential_model.check_available(app_ctx.as_ref()).await?;
@ -156,13 +134,10 @@ pub fn register_credential3rd_to_schema_builder(
Credential3rdCheckAvailableInfo { available }, Credential3rdCheckAvailableInfo { available },
))) )))
}) })
}, }),
)
.argument(InputValue::new(
Credential3rdCheckAvailableInput::arg_name(),
TypeRef::named_nn(Credential3rdCheckAvailableInput::input_type_name()),
)),
); );
builder.mutations.push(check_available_mutation);
}
builder builder
} }

View File

@ -0,0 +1,127 @@
use convert_case::Case;
use sea_orm::Iterable;
use seaography::{Builder as SeaographyBuilder, BuilderContext};
use crate::{
graphql::{
domains::subscribers::restrict_subscriber_for_entity,
infra::{
custom::{
generate_entity_default_create_batch_mutation_field,
generate_entity_default_create_one_mutation_field,
generate_entity_default_delete_mutation_field,
generate_entity_default_insert_input_object,
generate_entity_default_update_input_object,
generate_entity_default_update_mutation_field,
},
json::{
convert_jsonb_output_case_for_entity, restrict_jsonb_filter_input_for_entity,
validate_jsonb_input_for_entity,
},
name::get_entity_and_column_name,
},
},
models::{cron, subscriber_tasks},
};
fn skip_columns_for_entity_input(context: &mut BuilderContext) {
for column in cron::Column::iter() {
if matches!(
column,
cron::Column::SubscriberTask
| cron::Column::Id
| cron::Column::CronExpr
| cron::Column::Enabled
| cron::Column::TimeoutMs
| cron::Column::MaxAttempts
) {
continue;
}
let entity_column_key = get_entity_and_column_name::<cron::Entity>(context, &column);
context.entity_input.insert_skips.push(entity_column_key);
}
for column in cron::Column::iter() {
if matches!(column, |cron::Column::CronExpr| cron::Column::Enabled
| cron::Column::TimeoutMs
| cron::Column::Priority
| cron::Column::MaxAttempts)
{
continue;
}
let entity_column_key = get_entity_and_column_name::<cron::Entity>(context, &column);
context.entity_input.update_skips.push(entity_column_key);
}
}
pub fn register_cron_to_schema_context(context: &mut BuilderContext) {
restrict_subscriber_for_entity::<cron::Entity>(context, &cron::Column::SubscriberId);
restrict_jsonb_filter_input_for_entity::<cron::Entity>(context, &cron::Column::SubscriberTask);
convert_jsonb_output_case_for_entity::<cron::Entity>(
context,
&cron::Column::SubscriberTask,
Case::Camel,
);
validate_jsonb_input_for_entity::<cron::Entity, Option<subscriber_tasks::SubscriberTask>>(
context,
&cron::Column::SubscriberTask,
);
skip_columns_for_entity_input(context);
}
pub fn register_cron_to_schema_builder(mut builder: SeaographyBuilder) -> SeaographyBuilder {
builder.register_entity::<cron::Entity>(
<cron::RelatedEntity as sea_orm::Iterable>::iter()
.map(|rel| seaography::RelationBuilder::get_relation(&rel, builder.context))
.collect(),
);
builder = builder.register_entity_dataloader_one_to_one(cron::Entity, tokio::spawn);
builder = builder.register_entity_dataloader_one_to_many(cron::Entity, tokio::spawn);
builder.register_enumeration::<cron::CronStatus>();
let builder_context = builder.context;
{
builder
.inputs
.push(generate_entity_default_insert_input_object::<cron::Entity>(
builder_context,
));
builder
.mutations
.push(generate_entity_default_create_one_mutation_field::<
cron::Entity,
_,
>(builder_context, true));
builder
.mutations
.push(generate_entity_default_create_batch_mutation_field::<
cron::Entity,
_,
>(builder_context, true));
}
{
builder
.inputs
.push(generate_entity_default_update_input_object::<cron::Entity>(
builder_context,
));
builder
.mutations
.push(generate_entity_default_update_mutation_field::<
cron::Entity,
_,
>(builder_context, true));
}
{
builder
.mutations
.push(generate_entity_default_delete_mutation_field::<
cron::Entity,
_,
>(builder_context, false));
}
builder
}

View File

@ -7,7 +7,10 @@ use seaography::{Builder as SeaographyBuilder, BuilderContext, SeaResult};
use crate::{ use crate::{
graphql::{ graphql::{
domains::subscribers::restrict_subscriber_for_entity, domains::subscribers::restrict_subscriber_for_entity,
infra::util::{get_entity_column_key, get_entity_key}, infra::name::{
get_entity_and_column_name, get_entity_create_batch_mutation_field_name,
get_entity_create_one_mutation_field_name,
},
}, },
models::feeds, models::feeds,
}; };
@ -15,22 +18,14 @@ use crate::{
pub fn register_feeds_to_schema_context(context: &mut BuilderContext) { pub fn register_feeds_to_schema_context(context: &mut BuilderContext) {
restrict_subscriber_for_entity::<feeds::Entity>(context, &feeds::Column::SubscriberId); restrict_subscriber_for_entity::<feeds::Entity>(context, &feeds::Column::SubscriberId);
{ {
let entity_column_key = let entity_create_one_mutation_field_name = Arc::new(
get_entity_column_key::<feeds::Entity>(context, &feeds::Column::Token); get_entity_create_one_mutation_field_name::<feeds::Entity>(context),
let entity_key = get_entity_key::<feeds::Entity>(context); );
let entity_name = context.entity_query_field.type_name.as_ref()(&entity_key); let entity_create_batch_mutation_field_name =
let entity_create_one_mutation_field_name = Arc::new(format!( Arc::new(get_entity_create_batch_mutation_field_name::<feeds::Entity>(context));
"{}{}",
entity_name, context.entity_create_one_mutation.mutation_suffix
));
let entity_create_batch_mutation_field_name = Arc::new(format!(
"{}{}",
entity_name,
context.entity_create_batch_mutation.mutation_suffix.clone()
));
context.types.input_none_conversions.insert( context.types.input_none_conversions.insert(
entity_column_key, get_entity_and_column_name::<feeds::Entity>(context, &feeds::Column::Token),
Box::new( Box::new(
move |context: &ResolverContext| -> SeaResult<Option<SeaValue>> { move |context: &ResolverContext| -> SeaResult<Option<SeaValue>> {
let field_name = context.field().name(); let field_name = context.field().name();

View File

@ -10,3 +10,4 @@ pub mod subscribers;
pub mod subscription_bangumi; pub mod subscription_bangumi;
pub mod subscription_episode; pub mod subscription_episode;
pub mod subscriptions; pub mod subscriptions;
pub mod cron;

View File

@ -1,37 +1,208 @@
use std::{ops::Deref, sync::Arc}; use std::{ops::Deref, sync::Arc};
use async_graphql::dynamic::{FieldValue, TypeRef}; use async_graphql::dynamic::{FieldValue, TypeRef, ValueAccessor};
use convert_case::Case;
use sea_orm::{ use sea_orm::{
ColumnTrait, ConnectionTrait, EntityTrait, QueryFilter, QuerySelect, QueryTrait, prelude::Expr, ColumnTrait, ConnectionTrait, EntityTrait, Iterable, QueryFilter, QuerySelect, QueryTrait,
sea_query::Query, prelude::Expr, sea_query::Query,
}; };
use seaography::{ use seaography::{
Builder as SeaographyBuilder, BuilderContext, EntityDeleteMutationBuilder, EntityObjectBuilder, Builder as SeaographyBuilder, BuilderContext, GuardAction, get_filter_conditions,
EntityQueryFieldBuilder, get_filter_conditions,
}; };
use crate::{ use crate::{
auth::AuthUserInfo,
errors::RecorderError, errors::RecorderError,
graphql::{ graphql::{
domains::subscribers::restrict_subscriber_for_entity, domains::subscribers::restrict_subscriber_for_entity,
infra::{ infra::{
custom::generate_entity_filter_mutation_field, custom::{
json::{convert_jsonb_output_case_for_entity, restrict_jsonb_filter_input_for_entity}, generate_entity_create_one_mutation_field,
generate_entity_default_insert_input_object,
generate_entity_filtered_mutation_field,
},
json::{
convert_jsonb_output_case_for_entity, restrict_jsonb_filter_input_for_entity,
validate_jsonb_input_for_entity,
},
name::{
get_column_name, get_entity_and_column_name, get_entity_basic_type_name,
get_entity_create_batch_mutation_data_field_name,
get_entity_create_batch_mutation_field_name,
get_entity_create_one_mutation_data_field_name,
get_entity_create_one_mutation_field_name, get_entity_custom_mutation_field_name,
get_entity_delete_mutation_field_name, get_entity_update_mutation_field_name,
},
}, },
}, },
models::subscriber_tasks, models::subscriber_tasks,
task::{ApalisJobs, ApalisSchema}, task::{ApalisJobs, ApalisSchema},
}; };
pub fn register_subscriber_tasks_entity_mutations( pub fn check_entity_and_task_subscriber_id_matches(
value_accessor: &ValueAccessor<'_>,
subscriber_id: i32,
subscriber_id_column_name: &str,
subscriber_task_column_name: &str,
) -> bool {
value_accessor.object().is_ok_and(|input_object| {
input_object
.get(subscriber_task_column_name)
.and_then(|subscriber_task_value| subscriber_task_value.object().ok())
.and_then(|subscriber_task_object| {
subscriber_task_object
.get("subscriber_id")
.and_then(|job_subscriber_id| job_subscriber_id.i64().ok())
})
.is_some_and(|subscriber_task_subscriber_id| {
subscriber_task_subscriber_id as i32
== input_object
.get(subscriber_id_column_name)
.and_then(|subscriber_id_object| subscriber_id_object.i64().ok())
.map(|subscriber_id| subscriber_id as i32)
.unwrap_or(subscriber_id)
})
})
}
fn skip_columns_for_entity_input(context: &mut BuilderContext) {
for column in subscriber_tasks::Column::iter() {
if matches!(
column,
subscriber_tasks::Column::Job
| subscriber_tasks::Column::Id
| subscriber_tasks::Column::SubscriberId
| subscriber_tasks::Column::Priority
| subscriber_tasks::Column::MaxAttempts
) {
continue;
}
let entity_column_key =
get_entity_and_column_name::<subscriber_tasks::Entity>(context, &column);
context.entity_input.insert_skips.push(entity_column_key);
}
}
pub fn register_subscriber_tasks_to_schema_context(context: &mut BuilderContext) {
restrict_subscriber_for_entity::<subscriber_tasks::Entity>(
context,
&subscriber_tasks::Column::SubscriberId,
);
restrict_jsonb_filter_input_for_entity::<subscriber_tasks::Entity>(
context,
&subscriber_tasks::Column::Job,
);
convert_jsonb_output_case_for_entity::<subscriber_tasks::Entity>(
context,
&subscriber_tasks::Column::Job,
Case::Camel,
);
validate_jsonb_input_for_entity::<subscriber_tasks::Entity, subscriber_tasks::SubscriberTask>(
context,
&subscriber_tasks::Column::Job,
);
skip_columns_for_entity_input(context);
context.guards.field_guards.insert(
get_entity_and_column_name::<subscriber_tasks::Entity>(
context,
&subscriber_tasks::Column::Job,
),
{
let create_one_mutation_field_name =
Arc::new(get_entity_create_one_mutation_field_name::<
subscriber_tasks::Entity,
>(context));
let create_one_mutation_data_field_name =
Arc::new(get_entity_create_one_mutation_data_field_name(context).to_string());
let create_batch_mutation_field_name =
Arc::new(get_entity_create_batch_mutation_field_name::<
subscriber_tasks::Entity,
>(context));
let create_batch_mutation_data_field_name =
Arc::new(get_entity_create_batch_mutation_data_field_name(context).to_string());
let update_mutation_field_name = Arc::new(get_entity_update_mutation_field_name::<
subscriber_tasks::Entity,
>(context));
let job_column_name = Arc::new(get_column_name::<subscriber_tasks::Entity>(
context,
&subscriber_tasks::Column::Job,
));
let subscriber_id_column_name = Arc::new(get_column_name::<subscriber_tasks::Entity>(
context,
&subscriber_tasks::Column::SubscriberId,
));
Box::new(move |resolve_context| {
let field_name = resolve_context.field().name();
let subscriber_id = resolve_context
.data_opt::<AuthUserInfo>()
.unwrap()
.subscriber_auth
.subscriber_id;
let matched_subscriber_id = match field_name {
field if field == create_one_mutation_field_name.as_str() => resolve_context
.args
.get(create_one_mutation_data_field_name.as_str())
.is_some_and(|value_accessor| {
check_entity_and_task_subscriber_id_matches(
&value_accessor,
subscriber_id,
subscriber_id_column_name.as_str(),
job_column_name.as_str(),
)
}),
field if field == create_batch_mutation_field_name.as_str() => resolve_context
.args
.get(create_batch_mutation_data_field_name.as_str())
.and_then(|value| value.list().ok())
.is_some_and(|list| {
list.iter().all(|value| {
check_entity_and_task_subscriber_id_matches(
&value,
subscriber_id,
subscriber_id_column_name.as_str(),
job_column_name.as_str(),
)
})
}),
field if field == update_mutation_field_name.as_str() => {
unreachable!("subscriberTask entity do not support update job")
}
_ => true,
};
if matched_subscriber_id {
GuardAction::Allow
} else {
GuardAction::Block(Some(
"subscriber_id mismatch between entity and job".to_string(),
))
}
})
},
);
}
pub fn register_subscriber_tasks_to_schema_builder(
mut builder: SeaographyBuilder, mut builder: SeaographyBuilder,
) -> SeaographyBuilder { ) -> SeaographyBuilder {
builder.register_entity::<subscriber_tasks::Entity>(
<subscriber_tasks::RelatedEntity as sea_orm::Iterable>::iter()
.map(|rel| seaography::RelationBuilder::get_relation(&rel, builder.context))
.collect(),
);
builder = builder.register_entity_dataloader_one_to_one(subscriber_tasks::Entity, tokio::spawn);
builder =
builder.register_entity_dataloader_one_to_many(subscriber_tasks::Entity, tokio::spawn);
builder.register_enumeration::<subscriber_tasks::SubscriberTaskType>();
builder.register_enumeration::<subscriber_tasks::SubscriberTaskStatus>();
let context = builder.context; let context = builder.context;
{ {
let entitity_delete_mutation_builder = EntityDeleteMutationBuilder { context }; let delete_mutation =
let delete_mutation = generate_entity_filter_mutation_field::<subscriber_tasks::Entity, _, _>( generate_entity_filtered_mutation_field::<subscriber_tasks::Entity, _, _>(
context, context,
entitity_delete_mutation_builder.type_name::<subscriber_tasks::Entity>(), get_entity_delete_mutation_field_name::<subscriber_tasks::Entity>(context),
TypeRef::named_nn(TypeRef::INT), TypeRef::named_nn(TypeRef::INT),
Arc::new(|resolver_ctx, app_ctx, filters| { Arc::new(|resolver_ctx, app_ctx, filters| {
let filters_condition = get_filter_conditions::<subscriber_tasks::Entity>( let filters_condition = get_filter_conditions::<subscriber_tasks::Entity>(
@ -59,24 +230,24 @@ pub fn register_subscriber_tasks_entity_mutations(
let result = db.execute(delete_statement).await?; let result = db.execute(delete_statement).await?;
Ok::<_, RecorderError>(Some(FieldValue::value(result.rows_affected() as i32))) Ok::<_, RecorderError>(Some(FieldValue::value(
result.rows_affected() as i32
)))
}) })
}), }),
); );
builder.mutations.push(delete_mutation); builder.mutations.push(delete_mutation);
} }
{ {
let entity_object_builder = EntityObjectBuilder { context }; let entity_retry_one_mutation_name =
let entity_query_field = EntityQueryFieldBuilder { context }; get_entity_custom_mutation_field_name::<subscriber_tasks::Entity>(context, "RetryOne");
let entity_retry_one_mutation_name = format!(
"{}RetryOne",
entity_query_field.type_name::<subscriber_tasks::Entity>()
);
let retry_one_mutation = let retry_one_mutation =
generate_entity_filter_mutation_field::<subscriber_tasks::Entity, _, _>( generate_entity_filtered_mutation_field::<subscriber_tasks::Entity, _, _>(
context, context,
entity_retry_one_mutation_name, entity_retry_one_mutation_name,
TypeRef::named_nn(entity_object_builder.type_name::<subscriber_tasks::Entity>()), TypeRef::named_nn(get_entity_basic_type_name::<subscriber_tasks::Entity>(
context,
)),
Arc::new(|resolver_ctx, app_ctx, filters| { Arc::new(|resolver_ctx, app_ctx, filters| {
let filters_condition = get_filter_conditions::<subscriber_tasks::Entity>( let filters_condition = get_filter_conditions::<subscriber_tasks::Entity>(
resolver_ctx, resolver_ctx,
@ -93,7 +264,9 @@ pub fn register_subscriber_tasks_entity_mutations(
.into_tuple::<String>() .into_tuple::<String>()
.one(db) .one(db)
.await? .await?
.ok_or_else(|| RecorderError::from_model_not_found("SubscriberTask"))?; .ok_or_else(|| {
RecorderError::from_entity_not_found::<subscriber_tasks::Entity>()
})?;
let task = app_ctx.task(); let task = app_ctx.task();
task.retry_subscriber_task(job_id.clone()).await?; task.retry_subscriber_task(job_id.clone()).await?;
@ -102,7 +275,9 @@ pub fn register_subscriber_tasks_entity_mutations(
.filter(subscriber_tasks::Column::Id.eq(&job_id)) .filter(subscriber_tasks::Column::Id.eq(&job_id))
.one(db) .one(db)
.await? .await?
.ok_or_else(|| RecorderError::from_model_not_found("SubscriberTask"))?; .ok_or_else(|| {
RecorderError::from_entity_not_found::<subscriber_tasks::Entity>()
})?;
Ok::<_, RecorderError>(Some(FieldValue::owned_any(task_model))) Ok::<_, RecorderError>(Some(FieldValue::owned_any(task_model)))
}) })
@ -110,38 +285,47 @@ pub fn register_subscriber_tasks_entity_mutations(
); );
builder.mutations.push(retry_one_mutation); builder.mutations.push(retry_one_mutation);
} }
{
builder builder
} .inputs
.push(generate_entity_default_insert_input_object::<
pub fn register_subscriber_tasks_to_schema_context(context: &mut BuilderContext) { subscriber_tasks::Entity,
restrict_subscriber_for_entity::<subscriber_tasks::Entity>( >(context));
let create_one_mutation =
generate_entity_create_one_mutation_field::<subscriber_tasks::Entity, TypeRef>(
context, context,
&subscriber_tasks::Column::SubscriberId, None,
); Arc::new(|_resolver_ctx, app_ctx, input_object| {
restrict_jsonb_filter_input_for_entity::<subscriber_tasks::Entity>( let job_column_name = get_column_name::<subscriber_tasks::Entity>(
context, context,
&subscriber_tasks::Column::Job, &subscriber_tasks::Column::Job,
); );
convert_jsonb_output_case_for_entity::<subscriber_tasks::Entity>( let task = input_object
context, .get(job_column_name.as_str())
&subscriber_tasks::Column::Job, .unwrap()
); .deserialize::<subscriber_tasks::SubscriberTask>()
} .unwrap();
pub fn register_subscriber_tasks_to_schema_builder( Box::pin(async move {
mut builder: SeaographyBuilder, let task_service = app_ctx.task();
) -> SeaographyBuilder {
builder.register_entity::<subscriber_tasks::Entity>( let task_id = task_service.add_subscriber_task(task).await?.to_string();
<subscriber_tasks::RelatedEntity as sea_orm::Iterable>::iter()
.map(|rel| seaography::RelationBuilder::get_relation(&rel, builder.context)) let db = app_ctx.db();
.collect(),
let task = subscriber_tasks::Entity::find()
.filter(subscriber_tasks::Column::Id.eq(&task_id))
.one(db)
.await?
.ok_or_else(|| {
RecorderError::from_entity_not_found::<subscriber_tasks::Entity>()
})?;
Ok::<_, RecorderError>(task)
})
}),
); );
builder = builder.register_entity_dataloader_one_to_one(subscriber_tasks::Entity, tokio::spawn); builder.mutations.push(create_one_mutation);
builder = }
builder.register_entity_dataloader_one_to_many(subscriber_tasks::Entity, tokio::spawn);
builder = register_subscriber_tasks_entity_mutations(builder);
builder.register_enumeration::<subscriber_tasks::SubscriberTaskType>();
builder.register_enumeration::<subscriber_tasks::SubscriberTaskStatus>();
builder builder
} }

View File

@ -12,7 +12,14 @@ use seaography::{
use crate::{ use crate::{
auth::{AuthError, AuthUserInfo}, auth::{AuthError, AuthUserInfo},
graphql::infra::util::{get_column_key, get_entity_column_key, get_entity_key}, graphql::infra::name::{
get_column_name, get_entity_and_column_name,
get_entity_create_batch_mutation_data_field_name,
get_entity_create_batch_mutation_field_name,
get_entity_create_one_mutation_data_field_name, get_entity_create_one_mutation_field_name,
get_entity_name, get_entity_update_mutation_data_field_name,
get_entity_update_mutation_field_name,
},
models::subscribers, models::subscribers,
}; };
@ -82,32 +89,19 @@ where
T: EntityTrait, T: EntityTrait,
<T as EntityTrait>::Model: Sync, <T as EntityTrait>::Model: Sync,
{ {
let entity_key = get_entity_key::<T>(context); let column_name = Arc::new(get_column_name::<T>(context, column));
let entity_name = context.entity_query_field.type_name.as_ref()(&entity_key); let entity_create_one_mutation_field_name =
let column_key = get_column_key::<T>(context, column); Arc::new(get_entity_create_one_mutation_field_name::<T>(context));
let column_name = Arc::new(context.entity_object.column_name.as_ref()(
&entity_key,
&column_key,
));
let entity_create_one_mutation_field_name = Arc::new(format!(
"{}{}",
entity_name, context.entity_create_one_mutation.mutation_suffix
));
let entity_create_one_mutation_data_field_name = let entity_create_one_mutation_data_field_name =
Arc::new(context.entity_create_one_mutation.data_field.clone()); Arc::new(get_entity_create_one_mutation_data_field_name(context).to_string());
let entity_create_batch_mutation_field_name = Arc::new(format!( let entity_create_batch_mutation_field_name =
"{}{}", Arc::new(get_entity_create_batch_mutation_field_name::<T>(context));
entity_name,
context.entity_create_batch_mutation.mutation_suffix.clone()
));
let entity_create_batch_mutation_data_field_name = let entity_create_batch_mutation_data_field_name =
Arc::new(context.entity_create_batch_mutation.data_field.clone()); Arc::new(get_entity_create_batch_mutation_data_field_name(context).to_string());
let entity_update_mutation_field_name = Arc::new(format!( let entity_update_mutation_field_name =
"{}{}", Arc::new(get_entity_update_mutation_field_name::<T>(context));
entity_name, context.entity_update_mutation.mutation_suffix
));
let entity_update_mutation_data_field_name = let entity_update_mutation_data_field_name =
Arc::new(context.entity_update_mutation.data_field.clone()); Arc::new(get_entity_update_mutation_data_field_name(context).to_string());
Box::new(move |context: &ResolverContext| -> GuardAction { Box::new(move |context: &ResolverContext| -> GuardAction {
match context.ctx.data::<AuthUserInfo>() { match context.ctx.data::<AuthUserInfo>() {
@ -253,17 +247,10 @@ where
T: EntityTrait, T: EntityTrait,
<T as EntityTrait>::Model: Sync, <T as EntityTrait>::Model: Sync,
{ {
let entity_key = get_entity_key::<T>(context); let entity_create_one_mutation_field_name =
let entity_name = context.entity_query_field.type_name.as_ref()(&entity_key); Arc::new(get_entity_create_one_mutation_field_name::<T>(context));
let entity_create_one_mutation_field_name = Arc::new(format!( let entity_create_batch_mutation_field_name =
"{}{}", Arc::new(get_entity_create_batch_mutation_field_name::<T>(context));
entity_name, context.entity_create_one_mutation.mutation_suffix
));
let entity_create_batch_mutation_field_name = Arc::new(format!(
"{}{}",
entity_name,
context.entity_create_batch_mutation.mutation_suffix.clone()
));
Box::new( Box::new(
move |context: &ResolverContext| -> SeaResult<Option<SeaValue>> { move |context: &ResolverContext| -> SeaResult<Option<SeaValue>> {
let field_name = context.field().name(); let field_name = context.field().name();
@ -289,40 +276,39 @@ where
T: EntityTrait, T: EntityTrait,
<T as EntityTrait>::Model: Sync, <T as EntityTrait>::Model: Sync,
{ {
let entity_key = get_entity_key::<T>(context); let entity_and_column = get_entity_and_column_name::<T>(context, column);
let entity_column_key = get_entity_column_key::<T>(context, column);
context.guards.entity_guards.insert( context.guards.entity_guards.insert(
entity_key.clone(), get_entity_name::<T>(context),
guard_entity_with_subscriber_id::<T>(context, column), guard_entity_with_subscriber_id::<T>(context, column),
); );
context.guards.field_guards.insert( context.guards.field_guards.insert(
entity_column_key.clone(), get_entity_and_column_name::<T>(context, column),
guard_field_with_subscriber_id::<T>(context, column), guard_field_with_subscriber_id::<T>(context, column),
); );
context.filter_types.overwrites.insert( context.filter_types.overwrites.insert(
entity_column_key.clone(), get_entity_and_column_name::<T>(context, column),
Some(FilterType::Custom( Some(FilterType::Custom(
SUBSCRIBER_ID_FILTER_INFO.type_name.clone(), SUBSCRIBER_ID_FILTER_INFO.type_name.clone(),
)), )),
); );
context.filter_types.condition_functions.insert( context.filter_types.condition_functions.insert(
entity_column_key.clone(), entity_and_column.clone(),
generate_subscriber_id_filter_condition::<T>(context, column), generate_subscriber_id_filter_condition::<T>(context, column),
); );
context.types.input_none_conversions.insert( context.types.input_none_conversions.insert(
entity_column_key.clone(), entity_and_column.clone(),
generate_default_subscriber_id_input_conversion::<T>(context, column), generate_default_subscriber_id_input_conversion::<T>(context, column),
); );
context.entity_input.update_skips.push(entity_column_key); context.entity_input.update_skips.push(entity_and_column);
} }
pub fn register_subscribers_to_schema_context(context: &mut BuilderContext) { pub fn register_subscribers_to_schema_context(context: &mut BuilderContext) {
restrict_subscriber_for_entity::<subscribers::Entity>(context, &subscribers::Column::Id); restrict_subscriber_for_entity::<subscribers::Entity>(context, &subscribers::Column::Id);
for column in subscribers::Column::iter() { for column in subscribers::Column::iter() {
if !matches!(column, subscribers::Column::Id) { if !matches!(column, subscribers::Column::Id) {
let key = get_entity_column_key::<subscribers::Entity>(context, &column); let key = get_entity_and_column_name::<subscribers::Entity>(context, &column);
context.filter_types.overwrites.insert(key, None); context.filter_types.overwrites.insert(key, None);
} }
} }

View File

@ -2,22 +2,22 @@ use std::sync::Arc;
use async_graphql::dynamic::{FieldValue, TypeRef}; use async_graphql::dynamic::{FieldValue, TypeRef};
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter}; use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
use seaography::{ use seaography::{Builder as SeaographyBuilder, BuilderContext, get_filter_conditions};
Builder as SeaographyBuilder, BuilderContext, EntityObjectBuilder, EntityQueryFieldBuilder,
get_filter_conditions,
};
use crate::{ use crate::{
errors::RecorderError, errors::RecorderError,
graphql::{ graphql::{
domains::subscribers::restrict_subscriber_for_entity, domains::subscribers::restrict_subscriber_for_entity,
infra::custom::generate_entity_filter_mutation_field, infra::{
custom::generate_entity_filtered_mutation_field,
name::{get_entity_basic_type_name, get_entity_custom_mutation_field_name},
}, },
models::{
subscriber_tasks,
subscriptions::{self, SubscriptionTrait},
}, },
task::SubscriberTask, models::{subscriber_tasks, subscriptions},
task::{
SyncOneSubscriptionFeedsFullTask, SyncOneSubscriptionFeedsIncrementalTask,
SyncOneSubscriptionSourcesTask,
},
}; };
pub fn register_subscriptions_to_schema_context(context: &mut BuilderContext) { pub fn register_subscriptions_to_schema_context(context: &mut BuilderContext) {
@ -35,23 +35,21 @@ pub fn register_subscriptions_to_schema_builder(
let context = builder.context; let context = builder.context;
let entity_object_builder = EntityObjectBuilder { context };
let entity_query_field = EntityQueryFieldBuilder { context };
{ {
let sync_one_feeds_incremental_mutation_name = format!( let sync_one_feeds_incremental_mutation_name = get_entity_custom_mutation_field_name::<
"{}SyncOneFeedsIncremental", subscriptions::Entity,
entity_query_field.type_name::<subscriptions::Entity>() >(context, "SyncOneFeedsIncremental");
);
let sync_one_feeds_incremental_mutation = generate_entity_filter_mutation_field::< let sync_one_feeds_incremental_mutation = generate_entity_filtered_mutation_field::<
subscriptions::Entity, subscriptions::Entity,
_, _,
_, _,
>( >(
builder.context, builder.context,
sync_one_feeds_incremental_mutation_name, sync_one_feeds_incremental_mutation_name,
TypeRef::named_nn(entity_object_builder.type_name::<subscriber_tasks::Entity>()), TypeRef::named_nn(get_entity_basic_type_name::<subscriber_tasks::Entity>(
context,
)),
Arc::new(|resolver_ctx, app_ctx, filters| { Arc::new(|resolver_ctx, app_ctx, filters| {
let filters_condition = let filters_condition =
get_filter_conditions::<subscriptions::Entity>(resolver_ctx, context, filters); get_filter_conditions::<subscriptions::Entity>(resolver_ctx, context, filters);
@ -63,19 +61,19 @@ pub fn register_subscriptions_to_schema_builder(
.filter(filters_condition) .filter(filters_condition)
.one(db) .one(db)
.await? .await?
.ok_or_else(|| RecorderError::from_model_not_found("Subscription"))?; .ok_or_else(|| {
RecorderError::from_entity_not_found::<subscriptions::Entity>()
let subscription = })?;
subscriptions::Subscription::try_from_model(&subscription_model)?;
let task_service = app_ctx.task(); let task_service = app_ctx.task();
let task_id = task_service let task_id = task_service
.add_subscriber_task( .add_subscriber_task(
subscription_model.subscriber_id, SyncOneSubscriptionFeedsIncrementalTask::builder()
SubscriberTask::SyncOneSubscriptionFeedsIncremental( .subscriber_id(subscription_model.subscriber_id)
subscription.into(), .subscription_id(subscription_model.id)
), .build()
.into(),
) )
.await?; .await?;
@ -83,7 +81,9 @@ pub fn register_subscriptions_to_schema_builder(
.filter(subscriber_tasks::Column::Id.eq(task_id.to_string())) .filter(subscriber_tasks::Column::Id.eq(task_id.to_string()))
.one(db) .one(db)
.await? .await?
.ok_or_else(|| RecorderError::from_model_not_found("SubscriberTask"))?; .ok_or_else(|| {
RecorderError::from_entity_not_found::<subscriber_tasks::Entity>()
})?;
Ok(Some(FieldValue::owned_any(task_model))) Ok(Some(FieldValue::owned_any(task_model)))
}) })
@ -93,19 +93,19 @@ pub fn register_subscriptions_to_schema_builder(
builder.mutations.push(sync_one_feeds_incremental_mutation); builder.mutations.push(sync_one_feeds_incremental_mutation);
} }
{ {
let sync_one_feeds_full_mutation_name = format!( let sync_one_feeds_full_mutation_name = get_entity_custom_mutation_field_name::<
"{}SyncOneFeedsFull", subscriptions::Entity,
entity_query_field.type_name::<subscriptions::Entity>() >(builder.context, "SyncOneFeedsFull");
); let sync_one_feeds_full_mutation = generate_entity_filtered_mutation_field::<
let sync_one_feeds_full_mutation = generate_entity_filter_mutation_field::<
subscriptions::Entity, subscriptions::Entity,
_, _,
_, _,
>( >(
builder.context, builder.context,
sync_one_feeds_full_mutation_name, sync_one_feeds_full_mutation_name,
TypeRef::named_nn(entity_object_builder.type_name::<subscriber_tasks::Entity>()), TypeRef::named_nn(get_entity_basic_type_name::<subscriber_tasks::Entity>(
context,
)),
Arc::new(|resolver_ctx, app_ctx, filters| { Arc::new(|resolver_ctx, app_ctx, filters| {
let filters_condition = let filters_condition =
get_filter_conditions::<subscriptions::Entity>(resolver_ctx, context, filters); get_filter_conditions::<subscriptions::Entity>(resolver_ctx, context, filters);
@ -117,17 +117,19 @@ pub fn register_subscriptions_to_schema_builder(
.filter(filters_condition) .filter(filters_condition)
.one(db) .one(db)
.await? .await?
.ok_or_else(|| RecorderError::from_model_not_found("Subscription"))?; .ok_or_else(|| {
RecorderError::from_entity_not_found::<subscriptions::Entity>()
let subscription = })?;
subscriptions::Subscription::try_from_model(&subscription_model)?;
let task_service = app_ctx.task(); let task_service = app_ctx.task();
let task_id = task_service let task_id = task_service
.add_subscriber_task( .add_subscriber_task(
subscription_model.subscriber_id, SyncOneSubscriptionFeedsFullTask::builder()
SubscriberTask::SyncOneSubscriptionFeedsFull(subscription.into()), .subscriber_id(subscription_model.subscriber_id)
.subscription_id(subscription_model.id)
.build()
.into(),
) )
.await?; .await?;
@ -135,7 +137,9 @@ pub fn register_subscriptions_to_schema_builder(
.filter(subscriber_tasks::Column::Id.eq(task_id.to_string())) .filter(subscriber_tasks::Column::Id.eq(task_id.to_string()))
.one(db) .one(db)
.await? .await?
.ok_or_else(|| RecorderError::from_model_not_found("SubscriberTask"))?; .ok_or_else(|| {
RecorderError::from_entity_not_found::<subscriber_tasks::Entity>()
})?;
Ok(Some(FieldValue::owned_any(task_model))) Ok(Some(FieldValue::owned_any(task_model)))
}) })
@ -146,19 +150,20 @@ pub fn register_subscriptions_to_schema_builder(
} }
{ {
let sync_one_sources_mutation_name = format!( let sync_one_sources_mutation_name = get_entity_custom_mutation_field_name::<
"{}SyncOneSources", subscriptions::Entity,
entity_query_field.type_name::<subscriptions::Entity>() >(context, "SyncOneSources");
);
let sync_one_sources_mutation = generate_entity_filter_mutation_field::< let sync_one_sources_mutation = generate_entity_filtered_mutation_field::<
subscriptions::Entity, subscriptions::Entity,
_, _,
_, _,
>( >(
builder.context, builder.context,
sync_one_sources_mutation_name, sync_one_sources_mutation_name,
TypeRef::named_nn(entity_object_builder.type_name::<subscriber_tasks::Entity>()), TypeRef::named_nn(get_entity_basic_type_name::<subscriber_tasks::Entity>(
context,
)),
Arc::new(|resolver_ctx, app_ctx, filters| { Arc::new(|resolver_ctx, app_ctx, filters| {
let filters_condition = let filters_condition =
get_filter_conditions::<subscriptions::Entity>(resolver_ctx, context, filters); get_filter_conditions::<subscriptions::Entity>(resolver_ctx, context, filters);
@ -170,17 +175,19 @@ pub fn register_subscriptions_to_schema_builder(
.filter(filters_condition) .filter(filters_condition)
.one(db) .one(db)
.await? .await?
.ok_or_else(|| RecorderError::from_model_not_found("Subscription"))?; .ok_or_else(|| {
RecorderError::from_entity_not_found::<subscriptions::Entity>()
let subscription = })?;
subscriptions::Subscription::try_from_model(&subscription_model)?;
let task_service = app_ctx.task(); let task_service = app_ctx.task();
let task_id = task_service let task_id = task_service
.add_subscriber_task( .add_subscriber_task(
subscription_model.subscriber_id, SyncOneSubscriptionSourcesTask::builder()
SubscriberTask::SyncOneSubscriptionSources(subscription.into()), .subscriber_id(subscription_model.subscriber_id)
.subscription_id(subscription_model.id)
.build()
.into(),
) )
.await?; .await?;
@ -188,7 +195,9 @@ pub fn register_subscriptions_to_schema_builder(
.filter(subscriber_tasks::Column::Id.eq(task_id.to_string())) .filter(subscriber_tasks::Column::Id.eq(task_id.to_string()))
.one(db) .one(db)
.await? .await?
.ok_or_else(|| RecorderError::from_model_not_found("SubscriberTask"))?; .ok_or_else(|| {
RecorderError::from_entity_not_found::<subscriber_tasks::Entity>()
})?;
Ok(Some(FieldValue::owned_any(task_model))) Ok(Some(FieldValue::owned_any(task_model)))
}) })

View File

@ -6,7 +6,7 @@ use seaography::{BuilderContext, SeaResult};
use crate::{ use crate::{
app::AppContextTrait, app::AppContextTrait,
graphql::infra::util::{get_column_key, get_entity_key}, graphql::infra::name::{get_column_name, get_entity_name},
}; };
pub fn register_crypto_column_input_conversion_to_schema_context<T>( pub fn register_crypto_column_input_conversion_to_schema_context<T>(
@ -17,8 +17,8 @@ pub fn register_crypto_column_input_conversion_to_schema_context<T>(
T: EntityTrait, T: EntityTrait,
<T as EntityTrait>::Model: Sync, <T as EntityTrait>::Model: Sync,
{ {
let entity_key = get_entity_key::<T>(context); let entity_key = get_entity_name::<T>(context);
let column_name = get_column_key::<T>(context, column); let column_name = get_column_name::<T>(context, column);
let entity_name = context.entity_object.type_name.as_ref()(&entity_key); let entity_name = context.entity_object.type_name.as_ref()(&entity_key);
let column_name = context.entity_object.column_name.as_ref()(&entity_key, &column_name); let column_name = context.entity_object.column_name.as_ref()(&entity_key, &column_name);
@ -44,8 +44,8 @@ pub fn register_crypto_column_output_conversion_to_schema_context<T>(
T: EntityTrait, T: EntityTrait,
<T as EntityTrait>::Model: Sync, <T as EntityTrait>::Model: Sync,
{ {
let entity_key = get_entity_key::<T>(context); let entity_key = get_entity_name::<T>(context);
let column_name = get_column_key::<T>(context, column); let column_name = get_column_name::<T>(context, column);
let entity_name = context.entity_object.type_name.as_ref()(&entity_key); let entity_name = context.entity_object.type_name.as_ref()(&entity_key);
let column_name = context.entity_object.column_name.as_ref()(&entity_key, &column_name); let column_name = context.entity_object.column_name.as_ref()(&entity_key, &column_name);

View File

@ -1,12 +1,31 @@
use std::{pin::Pin, sync::Arc}; use std::{pin::Pin, sync::Arc};
use async_graphql::dynamic::{ use async_graphql::dynamic::{
Field, FieldFuture, FieldValue, InputValue, ResolverContext, TypeRef, ValueAccessor, Field, FieldFuture, FieldValue, InputObject, InputValue, Object, ObjectAccessor,
ResolverContext, TypeRef, ValueAccessor,
};
use sea_orm::{
ActiveModelTrait, Condition, EntityTrait, IntoActiveModel, QueryFilter, TransactionTrait,
};
use seaography::{
BuilderContext, GuardAction, SeaographyError, get_filter_conditions, prepare_active_model,
}; };
use sea_orm::EntityTrait;
use seaography::{BuilderContext, EntityObjectBuilder, FilterInputBuilder, GuardAction};
use crate::{app::AppContextTrait, errors::RecorderResult}; use crate::{
app::AppContextTrait,
errors::RecorderResult,
graphql::infra::name::{
get_entity_and_column_name_from_column_str, get_entity_basic_type_name,
get_entity_create_batch_mutation_data_field_name,
get_entity_create_batch_mutation_field_name,
get_entity_create_one_mutation_data_field_name, get_entity_create_one_mutation_field_name,
get_entity_delete_mutation_field_name, get_entity_delete_mutation_filter_field_name,
get_entity_filter_input_type_name, get_entity_insert_data_input_type_name, get_entity_name,
get_entity_renormalized_filter_field_name, get_entity_update_data_input_type_name,
get_entity_update_mutation_data_field_name, get_entity_update_mutation_field_name,
get_entity_update_mutation_filter_field_name,
},
};
pub type FilterMutationFn = Arc< pub type FilterMutationFn = Arc<
dyn for<'a> Fn( dyn for<'a> Fn(
@ -19,27 +38,102 @@ pub type FilterMutationFn = Arc<
+ Sync, + Sync,
>; >;
pub fn generate_entity_filter_mutation_field<T, N, R>( pub type CreateOneMutationFn<M> = Arc<
dyn for<'a> Fn(
&ResolverContext<'a>,
Arc<dyn AppContextTrait>,
ObjectAccessor<'_>,
) -> Pin<Box<dyn Future<Output = RecorderResult<M>> + Send + 'a>>
+ Send
+ Sync,
>;
pub type CreateBatchMutationFn<M> = Arc<
dyn for<'a> Fn(
&ResolverContext<'a>,
Arc<dyn AppContextTrait>,
Vec<ObjectAccessor<'_>>,
) -> Pin<Box<dyn Future<Output = RecorderResult<Vec<M>>> + Send + 'a>>
+ Send
+ Sync,
>;
pub type UpdateMutationFn<M> = Arc<
dyn for<'a> Fn(
&ResolverContext<'a>,
Arc<dyn AppContextTrait>,
Condition,
ObjectAccessor<'_>,
) -> Pin<Box<dyn Future<Output = RecorderResult<Vec<M>>> + Send + 'a>>
+ Send
+ Sync,
>;
pub type DeleteMutationFn = Arc<
dyn for<'a> Fn(
&ResolverContext<'a>,
Arc<dyn AppContextTrait>,
Condition,
) -> Pin<Box<dyn Future<Output = RecorderResult<u64>> + Send + 'a>>
+ Send
+ Sync,
>;
pub fn generate_entity_default_insert_input_object<T>(
builder_context: &'static BuilderContext,
) -> InputObject
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
let entity_input_builder = seaography::EntityInputBuilder {
context: builder_context,
};
entity_input_builder.insert_input_object::<T>()
}
pub fn generate_entity_default_update_input_object<T>(
builder_context: &'static BuilderContext,
) -> InputObject
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
let entity_input_builder = seaography::EntityInputBuilder {
context: builder_context,
};
entity_input_builder.update_input_object::<T>()
}
pub fn generate_entity_default_basic_entity_object<T>(
builder_context: &'static BuilderContext,
) -> Object
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
let entity_object_builder = seaography::EntityObjectBuilder {
context: builder_context,
};
entity_object_builder.basic_to_object::<T>()
}
pub fn generate_entity_filtered_mutation_field<E, N, R>(
builder_context: &'static BuilderContext, builder_context: &'static BuilderContext,
field_name: N, field_name: N,
type_ref: R, type_ref: R,
mutation_fn: FilterMutationFn, mutation_fn: FilterMutationFn,
) -> Field ) -> Field
where where
T: EntityTrait, E: EntityTrait,
<T as EntityTrait>::Model: Sync, <E as EntityTrait>::Model: Sync,
N: Into<String>, N: Into<String>,
R: Into<TypeRef>, R: Into<TypeRef>,
{ {
let entity_filter_input_builder = FilterInputBuilder { let object_name: String = get_entity_name::<E>(builder_context);
context: builder_context,
};
let entity_object_builder = EntityObjectBuilder {
context: builder_context,
};
let object_name: String = entity_object_builder.type_name::<T>();
let context = builder_context;
let guard = builder_context.guards.entity_guards.get(&object_name); let guard = builder_context.guards.entity_guards.get(&object_name);
@ -60,7 +154,7 @@ where
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?; let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
let filters = ctx.args.get(&context.entity_delete_mutation.filter_field); let filters = ctx.args.get(get_entity_renormalized_filter_field_name());
let result = mutation_fn(&ctx, app_ctx.clone(), filters) let result = mutation_fn(&ctx, app_ctx.clone(), filters)
.await .await
@ -70,7 +164,630 @@ where
}) })
}) })
.argument(InputValue::new( .argument(InputValue::new(
&context.entity_delete_mutation.filter_field, get_entity_renormalized_filter_field_name(),
TypeRef::named(entity_filter_input_builder.type_name(&object_name)), TypeRef::named(get_entity_filter_input_type_name::<E>(builder_context)),
)) ))
} }
pub fn generate_entity_create_one_mutation_field<E, ID>(
builder_context: &'static BuilderContext,
input_data_type_ref: Option<ID>,
mutation_fn: CreateOneMutationFn<E::Model>,
) -> Field
where
E: EntityTrait,
<E as EntityTrait>::Model: Sync,
ID: Into<TypeRef>,
{
let guard = builder_context
.guards
.entity_guards
.get(&get_entity_name::<E>(builder_context));
let field_guards = &builder_context.guards.field_guards;
Field::new(
get_entity_create_one_mutation_field_name::<E>(builder_context),
TypeRef::named_nn(get_entity_basic_type_name::<E>(builder_context)),
move |ctx| {
let mutation_fn = mutation_fn.clone();
FieldFuture::new(async move {
let guard_flag = if let Some(guard) = guard {
(*guard)(&ctx)
} else {
GuardAction::Allow
};
if let GuardAction::Block(reason) = guard_flag {
return Err::<Option<_>, async_graphql::Error>(async_graphql::Error::new(
reason.unwrap_or("Entity guard triggered.".into()),
));
}
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
let value_accessor = ctx
.args
.get(get_entity_create_one_mutation_data_field_name(
builder_context,
))
.unwrap();
let input_object = value_accessor.object()?;
for (column, _) in input_object.iter() {
let field_guard = field_guards.get(
&get_entity_and_column_name_from_column_str::<E>(builder_context, column),
);
let field_guard_flag = if let Some(field_guard) = field_guard {
(*field_guard)(&ctx)
} else {
GuardAction::Allow
};
if let GuardAction::Block(reason) = field_guard_flag {
return match reason {
Some(reason) => Err::<Option<_>, async_graphql::Error>(
async_graphql::Error::new(reason),
),
None => Err::<Option<_>, async_graphql::Error>(
async_graphql::Error::new("Field guard triggered."),
),
};
}
}
let result = mutation_fn(&ctx, app_ctx.clone(), input_object)
.await
.map_err(async_graphql::Error::new_with_source)?;
Ok(Some(FieldValue::owned_any(result)))
})
},
)
.argument(InputValue::new(
get_entity_create_one_mutation_data_field_name(builder_context),
input_data_type_ref.map(|t| t.into()).unwrap_or_else(|| {
TypeRef::named_nn(get_entity_insert_data_input_type_name::<E>(builder_context))
}),
))
}
pub fn generate_entity_default_create_one_mutation_fn<T, A>(
builder_context: &'static BuilderContext,
active_model_hooks: bool,
) -> CreateOneMutationFn<T::Model>
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync + IntoActiveModel<A>,
A: ActiveModelTrait<Entity = T> + sea_orm::ActiveModelBehavior + std::marker::Send + 'static,
{
Arc::new(move |resolve_context, app_ctx, input_object| {
let entity_input_builder = seaography::EntityInputBuilder {
context: builder_context,
};
let entity_object_builder = seaography::EntityObjectBuilder {
context: builder_context,
};
let active_model = prepare_active_model::<T, A>(
&entity_input_builder,
&entity_object_builder,
&input_object,
resolve_context,
)
.map_err(SeaographyError::AsyncGraphQLError);
Box::pin(async move {
if active_model_hooks {
let transaction = app_ctx.db().begin().await?;
let active_model = active_model?;
let active_model = active_model.before_save(&transaction, true).await?;
let result: T::Model = active_model.insert(&transaction).await?;
let result = A::after_save(result, &transaction, true).await?;
transaction.commit().await?;
Ok(result)
} else {
let db = app_ctx.db();
let active_model = active_model?;
let result: T::Model = active_model.insert(db).await?;
Ok(result)
}
})
})
}
pub fn generate_entity_default_create_one_mutation_field<E, A>(
builder_context: &'static BuilderContext,
active_model_hooks: bool,
) -> Field
where
E: EntityTrait,
<E as EntityTrait>::Model: Sync,
<E as EntityTrait>::Model: IntoActiveModel<A>,
A: ActiveModelTrait<Entity = E> + sea_orm::ActiveModelBehavior + std::marker::Send + 'static,
{
generate_entity_create_one_mutation_field::<E, TypeRef>(
builder_context,
None,
generate_entity_default_create_one_mutation_fn::<E, A>(builder_context, active_model_hooks),
)
}
pub fn generate_entity_create_batch_mutation_field<E, ID>(
builder_context: &'static BuilderContext,
input_data_type_ref: Option<ID>,
mutation_fn: CreateBatchMutationFn<E::Model>,
) -> Field
where
E: EntityTrait,
<E as EntityTrait>::Model: Sync,
ID: Into<TypeRef>,
{
let object_name: String = get_entity_name::<E>(builder_context);
let guard = builder_context.guards.entity_guards.get(&object_name);
let field_guards = &builder_context.guards.field_guards;
Field::new(
get_entity_create_batch_mutation_field_name::<E>(builder_context),
TypeRef::named_nn_list_nn(get_entity_basic_type_name::<E>(builder_context)),
move |ctx| {
let mutation_fn = mutation_fn.clone();
FieldFuture::new(async move {
let guard_flag = if let Some(guard) = guard {
(*guard)(&ctx)
} else {
GuardAction::Allow
};
if let GuardAction::Block(reason) = guard_flag {
return match reason {
Some(reason) => Err::<Option<_>, async_graphql::Error>(
async_graphql::Error::new(reason),
),
None => Err::<Option<_>, async_graphql::Error>(async_graphql::Error::new(
"Entity guard triggered.",
)),
};
}
let mut input_objects: Vec<ObjectAccessor<'_>> = vec![];
let list = ctx
.args
.get(get_entity_create_batch_mutation_data_field_name(
builder_context,
))
.unwrap()
.list()?;
for input in list.iter() {
let input_object = input.object()?;
for (column, _) in input_object.iter() {
let field_guard =
field_guards.get(&get_entity_and_column_name_from_column_str::<E>(
builder_context,
column,
));
let field_guard_flag = if let Some(field_guard) = field_guard {
(*field_guard)(&ctx)
} else {
GuardAction::Allow
};
if let GuardAction::Block(reason) = field_guard_flag {
return match reason {
Some(reason) => Err::<Option<_>, async_graphql::Error>(
async_graphql::Error::new(reason),
),
None => Err::<Option<_>, async_graphql::Error>(
async_graphql::Error::new("Field guard triggered."),
),
};
}
}
input_objects.push(input_object);
}
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
let results = mutation_fn(&ctx, app_ctx.clone(), input_objects)
.await
.map_err(async_graphql::Error::new_with_source)?;
Ok(Some(FieldValue::list(
results.into_iter().map(FieldValue::owned_any),
)))
})
},
)
.argument(InputValue::new(
get_entity_create_batch_mutation_data_field_name(builder_context),
input_data_type_ref.map(|t| t.into()).unwrap_or_else(|| {
TypeRef::named_nn_list_nn(get_entity_insert_data_input_type_name::<E>(builder_context))
}),
))
}
pub fn generate_entity_default_create_batch_mutation_fn<E, A>(
builder_context: &'static BuilderContext,
active_model_hooks: bool,
) -> CreateBatchMutationFn<E::Model>
where
E: EntityTrait,
<E as EntityTrait>::Model: Sync,
<E as EntityTrait>::Model: IntoActiveModel<A>,
A: ActiveModelTrait<Entity = E> + sea_orm::ActiveModelBehavior + std::marker::Send + 'static,
{
Arc::new(move |resolve_context, app_ctx, input_objects| {
let entity_input_builder = seaography::EntityInputBuilder {
context: builder_context,
};
let entity_object_builder = seaography::EntityObjectBuilder {
context: builder_context,
};
let active_models = input_objects
.into_iter()
.map(|input_object| {
prepare_active_model::<E, A>(
&entity_input_builder,
&entity_object_builder,
&input_object,
resolve_context,
)
})
.collect::<Result<Vec<_>, _>>()
.map_err(SeaographyError::AsyncGraphQLError);
Box::pin(async move {
if active_model_hooks {
let transaction = app_ctx.db().begin().await?;
let mut before_save_models = vec![];
for active_model in active_models? {
let before_save_model = active_model.before_save(&transaction, false).await?;
before_save_models.push(before_save_model);
}
let models: Vec<E::Model> = E::insert_many(before_save_models)
.exec_with_returning_many(&transaction)
.await?;
let mut result = vec![];
for model in models {
let after_save_model = A::after_save(model, &transaction, false).await?;
result.push(after_save_model);
}
transaction.commit().await?;
Ok(result)
} else {
let db = app_ctx.db();
let active_models = active_models?;
let results: Vec<E::Model> = E::insert_many(active_models)
.exec_with_returning_many(db)
.await?;
Ok(results)
}
})
})
}
pub fn generate_entity_default_create_batch_mutation_field<E, A>(
builder_context: &'static BuilderContext,
active_model_hooks: bool,
) -> Field
where
E: EntityTrait,
<E as EntityTrait>::Model: Sync,
<E as EntityTrait>::Model: IntoActiveModel<A>,
A: ActiveModelTrait<Entity = E> + sea_orm::ActiveModelBehavior + std::marker::Send + 'static,
{
generate_entity_create_batch_mutation_field::<E, TypeRef>(
builder_context,
None,
generate_entity_default_create_batch_mutation_fn::<E, A>(
builder_context,
active_model_hooks,
),
)
}
pub fn generate_entity_update_mutation_field<E, I>(
builder_context: &'static BuilderContext,
input_data_type_ref: Option<I>,
mutation_fn: UpdateMutationFn<E::Model>,
) -> Field
where
E: EntityTrait,
<E as EntityTrait>::Model: Sync,
I: Into<TypeRef>,
{
let guard = builder_context
.guards
.entity_guards
.get(&get_entity_name::<E>(builder_context));
let field_guards = &builder_context.guards.field_guards;
Field::new(
get_entity_update_mutation_field_name::<E>(builder_context),
TypeRef::named_nn_list_nn(get_entity_basic_type_name::<E>(builder_context)),
move |ctx| {
let mutation_fn = mutation_fn.clone();
FieldFuture::new(async move {
let guard_flag = if let Some(guard) = guard {
(*guard)(&ctx)
} else {
GuardAction::Allow
};
if let GuardAction::Block(reason) = guard_flag {
return match reason {
Some(reason) => Err::<Option<_>, async_graphql::Error>(
async_graphql::Error::new(reason),
),
None => Err::<Option<_>, async_graphql::Error>(async_graphql::Error::new(
"Entity guard triggered.",
)),
};
}
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
let filters = ctx.args.get(get_entity_update_mutation_filter_field_name(
builder_context,
));
let filter_condition = get_filter_conditions::<E>(&ctx, builder_context, filters);
let value_accessor = ctx
.args
.get(get_entity_update_mutation_data_field_name(builder_context))
.unwrap();
let input_object = value_accessor.object()?;
for (column, _) in input_object.iter() {
let field_guard = field_guards.get(
&get_entity_and_column_name_from_column_str::<E>(builder_context, column),
);
let field_guard_flag = if let Some(field_guard) = field_guard {
(*field_guard)(&ctx)
} else {
GuardAction::Allow
};
if let GuardAction::Block(reason) = field_guard_flag {
return match reason {
Some(reason) => Err::<Option<_>, async_graphql::Error>(
async_graphql::Error::new(reason),
),
None => Err::<Option<_>, async_graphql::Error>(
async_graphql::Error::new("Field guard triggered."),
),
};
}
}
let result = mutation_fn(&ctx, app_ctx.clone(), filter_condition, input_object)
.await
.map_err(async_graphql::Error::new_with_source)?;
Ok(Some(FieldValue::list(
result.into_iter().map(FieldValue::owned_any),
)))
})
},
)
.argument(InputValue::new(
get_entity_update_mutation_data_field_name(builder_context),
input_data_type_ref.map(|t| t.into()).unwrap_or_else(|| {
TypeRef::named_nn(get_entity_update_data_input_type_name::<E>(builder_context))
}),
))
.argument(InputValue::new(
get_entity_update_mutation_filter_field_name(builder_context),
TypeRef::named(get_entity_filter_input_type_name::<E>(builder_context)),
))
}
pub fn generate_entity_default_update_mutation_fn<T, A>(
builder_context: &'static BuilderContext,
active_model_hooks: bool,
) -> UpdateMutationFn<T::Model>
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync + IntoActiveModel<A>,
A: ActiveModelTrait<Entity = T> + sea_orm::ActiveModelBehavior + std::marker::Send + 'static,
{
Arc::new(
move |resolve_context, app_ctx, filter_condition, input_object| {
let entity_input_builder = seaography::EntityInputBuilder {
context: builder_context,
};
let entity_object_builder = seaography::EntityObjectBuilder {
context: builder_context,
};
let active_model = prepare_active_model::<T, A>(
&entity_input_builder,
&entity_object_builder,
&input_object,
resolve_context,
)
.map_err(SeaographyError::AsyncGraphQLError);
Box::pin(async move {
if active_model_hooks {
let transaction = app_ctx.db().begin().await?;
let active_model = active_model?;
let active_model = active_model.before_save(&transaction, false).await?;
let models = T::update_many()
.set(active_model)
.filter(filter_condition.clone())
.exec_with_returning(&transaction)
.await?;
let mut result = vec![];
for model in models {
result.push(A::after_save(model, &transaction, false).await?);
}
transaction.commit().await?;
Ok(result)
} else {
let db = app_ctx.db();
let active_model = active_model?;
let result = T::update_many()
.set(active_model)
.filter(filter_condition.clone())
.exec_with_returning(db)
.await?;
Ok(result)
}
})
},
)
}
pub fn generate_entity_default_update_mutation_field<E, A>(
builder_context: &'static BuilderContext,
active_model_hooks: bool,
) -> Field
where
E: EntityTrait,
<E as EntityTrait>::Model: Sync,
<E as EntityTrait>::Model: IntoActiveModel<A>,
A: ActiveModelTrait<Entity = E> + sea_orm::ActiveModelBehavior + std::marker::Send + 'static,
{
generate_entity_update_mutation_field::<E, TypeRef>(
builder_context,
None,
generate_entity_default_update_mutation_fn::<E, A>(builder_context, active_model_hooks),
)
}
pub fn generate_entity_delete_mutation_field<E>(
builder_context: &'static BuilderContext,
mutation_fn: DeleteMutationFn,
) -> Field
where
E: EntityTrait,
<E as EntityTrait>::Model: Sync,
{
let object_name: String = get_entity_name::<E>(builder_context);
let guard = builder_context.guards.entity_guards.get(&object_name);
Field::new(
get_entity_delete_mutation_field_name::<E>(builder_context),
TypeRef::named_nn(TypeRef::INT),
move |ctx| {
let mutation_fn = mutation_fn.clone();
FieldFuture::new(async move {
let guard_flag = if let Some(guard) = guard {
(*guard)(&ctx)
} else {
GuardAction::Allow
};
if let GuardAction::Block(reason) = guard_flag {
return Err::<Option<_>, async_graphql::Error>(async_graphql::Error::new(
reason.unwrap_or("Entity guard triggered.".into()),
));
}
let filters = ctx.args.get(get_entity_delete_mutation_filter_field_name(
builder_context,
));
let filter_condition = get_filter_conditions::<E>(&ctx, builder_context, filters);
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
let res = mutation_fn(&ctx, app_ctx.clone(), filter_condition)
.await
.map_err(async_graphql::Error::new_with_source)?;
Ok(Some(async_graphql::Value::from(res)))
})
},
)
.argument(InputValue::new(
get_entity_delete_mutation_filter_field_name(builder_context),
TypeRef::named(get_entity_filter_input_type_name::<E>(builder_context)),
))
}
pub fn generate_entity_default_delete_mutation_fn<E, A>(
_builder_context: &'static BuilderContext,
active_model_hooks: bool,
) -> DeleteMutationFn
where
E: EntityTrait,
<E as EntityTrait>::Model: Sync,
<E as EntityTrait>::Model: IntoActiveModel<A>,
A: ActiveModelTrait<Entity = E> + sea_orm::ActiveModelBehavior + std::marker::Send + 'static,
{
Arc::new(move |_resolve_context, app_ctx, filter_condition| {
Box::pin(async move {
if active_model_hooks {
let transaction = app_ctx.db().begin().await?;
let models: Vec<E::Model> = E::find()
.filter(filter_condition.clone())
.all(&transaction)
.await?;
let mut active_models: Vec<A> = vec![];
for model in models {
let active_model = model.into_active_model();
active_models.push(active_model.before_delete(&transaction).await?);
}
let result = E::delete_many()
.filter(filter_condition)
.exec(&transaction)
.await?;
for active_model in active_models {
active_model.after_delete(&transaction).await?;
}
transaction.commit().await?;
Ok(result.rows_affected)
} else {
let db = app_ctx.db();
let result = E::delete_many().filter(filter_condition).exec(db).await?;
Ok(result.rows_affected)
}
})
})
}
pub fn generate_entity_default_delete_mutation_field<E, A>(
builder_context: &'static BuilderContext,
active_model_hooks: bool,
) -> Field
where
E: EntityTrait,
<E as EntityTrait>::Model: Sync,
<E as EntityTrait>::Model: IntoActiveModel<A>,
A: ActiveModelTrait<Entity = E> + sea_orm::ActiveModelBehavior + std::marker::Send + 'static,
{
generate_entity_delete_mutation_field::<E>(
builder_context,
generate_entity_default_delete_mutation_fn::<E, A>(builder_context, active_model_hooks),
)
}

View File

@ -17,7 +17,7 @@ use serde::{Serialize, de::DeserializeOwned};
use serde_json::Value as JsonValue; use serde_json::Value as JsonValue;
use crate::{ use crate::{
errors::RecorderResult, graphql::infra::util::get_entity_column_key, errors::RecorderResult, graphql::infra::name::get_entity_and_column_name,
utils::json::convert_json_keys, utils::json::convert_json_keys,
}; };
@ -946,9 +946,8 @@ where
T: EntityTrait, T: EntityTrait,
<T as EntityTrait>::Model: Sync, <T as EntityTrait>::Model: Sync,
{ {
let entity_column_key = get_entity_column_key::<T>(context, column);
context.filter_types.overwrites.insert( context.filter_types.overwrites.insert(
entity_column_key.clone(), get_entity_and_column_name::<T>(context, column),
Some(FilterType::Custom(JSONB_FILTER_NAME.to_string())), Some(FilterType::Custom(JSONB_FILTER_NAME.to_string())),
); );
} }
@ -959,20 +958,20 @@ where
<T as EntityTrait>::Model: Sync, <T as EntityTrait>::Model: Sync,
S: DeserializeOwned + Serialize, S: DeserializeOwned + Serialize,
{ {
let entity_column_key = get_entity_column_key::<T>(context, column); let entity_column_name = get_entity_and_column_name::<T>(context, column);
context.types.input_conversions.insert( context.types.input_conversions.insert(
entity_column_key.clone(), entity_column_name.clone(),
Box::new(move |_resolve_context, accessor| { Box::new(move |_resolve_context, accessor| {
let deserialized = accessor.deserialize::<S>().map_err(|err| { let deserialized = accessor.deserialize::<S>().map_err(|err| {
SeaographyError::TypeConversionError( SeaographyError::TypeConversionError(
err.message, err.message,
format!("Json - {entity_column_key}"), format!("Json - {entity_column_name}"),
) )
})?; })?;
let json_value = serde_json::to_value(deserialized).map_err(|err| { let json_value = serde_json::to_value(deserialized).map_err(|err| {
SeaographyError::TypeConversionError( SeaographyError::TypeConversionError(
err.to_string(), err.to_string(),
format!("Json - {entity_column_key}"), format!("Json - {entity_column_name}"),
) )
})?; })?;
Ok(sea_orm::Value::Json(Some(Box::new(json_value)))) Ok(sea_orm::Value::Json(Some(Box::new(json_value))))
@ -980,20 +979,21 @@ where
); );
} }
pub fn convert_jsonb_output_case_for_entity<T>(context: &mut BuilderContext, column: &T::Column) pub fn convert_jsonb_output_case_for_entity<T>(
where context: &mut BuilderContext,
column: &T::Column,
case: Case<'static>,
) where
T: EntityTrait, T: EntityTrait,
<T as EntityTrait>::Model: Sync, <T as EntityTrait>::Model: Sync,
{ {
let entity_column_key = get_entity_column_key::<T>(context, column); let entity_column_key = get_entity_and_column_name::<T>(context, column);
context.types.output_conversions.insert( context.types.output_conversions.insert(
entity_column_key.clone(), entity_column_key.clone(),
Box::new(move |value| { Box::new(move |value| {
if let sea_orm::Value::Json(Some(json)) = value { if let sea_orm::Value::Json(Some(json)) = value {
let result = async_graphql::Value::from_json(convert_json_keys( let result =
json.as_ref().clone(), async_graphql::Value::from_json(convert_json_keys(json.as_ref().clone(), case))
Case::Camel,
))
.map_err(|err| { .map_err(|err| {
SeaographyError::TypeConversionError( SeaographyError::TypeConversionError(
err.to_string(), err.to_string(),

View File

@ -1,4 +1,4 @@
pub mod crypto; pub mod crypto;
pub mod custom; pub mod custom;
pub mod json; pub mod json;
pub mod util; pub mod name;

View File

@ -0,0 +1,203 @@
use std::fmt::Display;
use sea_orm::{EntityName, EntityTrait, IdenStatic};
use seaography::BuilderContext;
pub fn get_entity_name<T>(context: &BuilderContext) -> String
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
let t = T::default();
let name = <T as EntityName>::table_name(&t);
context.entity_object.type_name.as_ref()(name)
}
pub fn get_column_name<T>(context: &BuilderContext, column: &T::Column) -> String
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
let entity_name = get_entity_name::<T>(context);
context.entity_object.column_name.as_ref()(&entity_name, column.as_str())
}
pub fn get_entity_and_column_name<T>(context: &BuilderContext, column: &T::Column) -> String
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
let entity_name = get_entity_name::<T>(context);
let column_name = get_column_name::<T>(context, column);
format!("{entity_name}.{column_name}")
}
pub fn get_entity_and_column_name_from_column_str<T>(
context: &BuilderContext,
column_str: &str,
) -> String
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
let entity_name = get_entity_name::<T>(context);
format!("{entity_name}.{column_str}")
}
pub fn get_entity_basic_type_name<T>(context: &BuilderContext) -> String
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
let t = T::default();
let name = <T as EntityName>::table_name(&t);
format!(
"{}{}",
context.entity_object.type_name.as_ref()(name),
context.entity_object.basic_type_suffix
)
}
pub fn get_entity_query_field_name<T>(context: &BuilderContext) -> String
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
let entity_name = get_entity_name::<T>(context);
context.entity_query_field.type_name.as_ref()(&entity_name)
}
pub fn get_entity_filter_input_type_name<T>(context: &BuilderContext) -> String
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
let entity_name = get_entity_name::<T>(context);
context.filter_input.type_name.as_ref()(&entity_name)
}
pub fn get_entity_insert_data_input_type_name<T>(context: &BuilderContext) -> String
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
let entity_name = get_entity_name::<T>(context);
format!("{entity_name}{}", context.entity_input.insert_suffix)
}
pub fn get_entity_update_data_input_type_name<T>(context: &BuilderContext) -> String
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
let entity_name = get_entity_name::<T>(context);
format!("{entity_name}{}", context.entity_input.update_suffix)
}
pub fn get_entity_create_one_mutation_field_name<T>(context: &BuilderContext) -> String
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
let query_field_name = get_entity_query_field_name::<T>(context);
format!(
"{}{}",
query_field_name, context.entity_create_one_mutation.mutation_suffix
)
}
pub fn get_entity_create_batch_mutation_field_name<T>(context: &BuilderContext) -> String
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
let query_field_name = get_entity_query_field_name::<T>(context);
format!(
"{}{}",
query_field_name, context.entity_create_batch_mutation.mutation_suffix
)
}
pub fn get_entity_delete_mutation_field_name<T>(context: &BuilderContext) -> String
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
let query_field_name = get_entity_query_field_name::<T>(context);
format!(
"{}{}",
query_field_name, context.entity_delete_mutation.mutation_suffix
)
}
pub fn get_entity_update_mutation_field_name<T>(context: &BuilderContext) -> String
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
let query_field_name = get_entity_query_field_name::<T>(context);
format!(
"{}{}",
query_field_name, context.entity_update_mutation.mutation_suffix
)
}
pub fn get_entity_custom_mutation_field_name<T>(
context: &BuilderContext,
mutation_suffix: impl Display,
) -> String
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
let query_field_name = get_entity_query_field_name::<T>(context);
format!("{query_field_name}{mutation_suffix}")
}
pub fn get_entity_renormalized_filter_field_name() -> &'static str {
"filter"
}
pub fn get_entity_query_filter_field_name(context: &BuilderContext) -> &str {
&context.entity_query_field.filters
}
pub fn get_entity_update_mutation_filter_field_name(context: &BuilderContext) -> &str {
&context.entity_update_mutation.filter_field
}
pub fn get_entity_delete_mutation_filter_field_name(context: &BuilderContext) -> &str {
&context.entity_delete_mutation.filter_field
}
pub fn renormalize_filter_field_names_to_schema_context(context: &mut BuilderContext) {
let renormalized_filter_field_name = get_entity_renormalized_filter_field_name();
context.entity_query_field.filters = renormalized_filter_field_name.to_string();
context.entity_update_mutation.filter_field = renormalized_filter_field_name.to_string();
context.entity_delete_mutation.filter_field = renormalized_filter_field_name.to_string();
}
pub fn get_entity_renormalized_data_field_name() -> &'static str {
"data"
}
pub fn get_entity_create_one_mutation_data_field_name(context: &BuilderContext) -> &str {
&context.entity_create_one_mutation.data_field
}
pub fn get_entity_create_batch_mutation_data_field_name(context: &BuilderContext) -> &str {
&context.entity_create_batch_mutation.data_field
}
pub fn get_entity_update_mutation_data_field_name(context: &BuilderContext) -> &str {
&context.entity_update_mutation.data_field
}
pub fn renormalize_data_field_names_to_schema_context(context: &mut BuilderContext) {
let renormalized_data_field_name = get_entity_renormalized_data_field_name();
context.entity_create_one_mutation.data_field = renormalized_data_field_name.to_string();
context.entity_create_batch_mutation.data_field = renormalized_data_field_name.to_string();
context.entity_update_mutation.data_field = renormalized_data_field_name.to_string();
}

View File

@ -1,30 +0,0 @@
use sea_orm::{EntityName, EntityTrait, IdenStatic};
use seaography::BuilderContext;
pub fn get_entity_key<T>(context: &BuilderContext) -> String
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
context.entity_object.type_name.as_ref()(<T as EntityName>::table_name(&T::default()))
}
pub fn get_column_key<T>(context: &BuilderContext, column: &T::Column) -> String
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
let entity_name = get_entity_key::<T>(context);
context.entity_object.column_name.as_ref()(&entity_name, column.as_str())
}
pub fn get_entity_column_key<T>(context: &BuilderContext, column: &T::Column) -> String
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
let entity_name = get_entity_key::<T>(context);
let column_name = get_column_key::<T>(context, column);
format!("{}.{}", &entity_name, &column_name)
}

View File

@ -39,7 +39,13 @@ use crate::{
register_subscriptions_to_schema_builder, register_subscriptions_to_schema_context, register_subscriptions_to_schema_builder, register_subscriptions_to_schema_context,
}, },
}, },
infra::json::register_jsonb_input_filter_to_schema_builder, infra::{
json::register_jsonb_input_filter_to_schema_builder,
name::{
renormalize_data_field_names_to_schema_context,
renormalize_filter_field_names_to_schema_context,
},
},
}, },
}; };
@ -55,6 +61,9 @@ pub fn build_schema(
let context = CONTEXT.get_or_init(|| { let context = CONTEXT.get_or_init(|| {
let mut context = BuilderContext::default(); let mut context = BuilderContext::default();
renormalize_filter_field_names_to_schema_context(&mut context);
renormalize_data_field_names_to_schema_context(&mut context);
{ {
// domains // domains
register_feeds_to_schema_context(&mut context); register_feeds_to_schema_context(&mut context);

View File

@ -12,7 +12,6 @@
)] )]
#![allow(clippy::enum_variant_names)] #![allow(clippy::enum_variant_names)]
pub use downloader; pub use downloader;
pub mod app; pub mod app;
pub mod auth; pub mod auth;
pub mod cache; pub mod cache;

View File

@ -175,7 +175,6 @@ pub enum Feeds {
pub enum Cron { pub enum Cron {
Table, Table,
Id, Id,
CronSource,
SubscriberId, SubscriberId,
SubscriptionId, SubscriptionId,
CronExpr, CronExpr,
@ -190,6 +189,7 @@ pub enum Cron {
MaxAttempts, MaxAttempts,
Priority, Priority,
Status, Status,
SubscriberTask,
} }
macro_rules! create_postgres_enum_for_active_enum { macro_rules! create_postgres_enum_for_active_enum {

View File

@ -52,8 +52,7 @@ impl MigrationTrait for Migration {
subscriptions::SubscriptionCategoryEnum, subscriptions::SubscriptionCategoryEnum,
subscriptions::SubscriptionCategory::MikanSubscriber, subscriptions::SubscriptionCategory::MikanSubscriber,
subscriptions::SubscriptionCategory::MikanBangumi, subscriptions::SubscriptionCategory::MikanBangumi,
subscriptions::SubscriptionCategory::MikanSeason, subscriptions::SubscriptionCategory::MikanSeason
subscriptions::SubscriptionCategory::Manual
) )
.await?; .await?;

View File

@ -17,8 +17,8 @@ SELECT
job, job,
job_type, job_type,
status, status,
(job ->> 'subscriber_id'::text)::integer AS subscriber_id, (job ->> 'subscriber_id')::integer AS subscriber_id,
job ->> 'task_type'::text AS task_type, job ->> 'task_type' AS task_type,
id, id,
attempts, attempts,
max_attempts, max_attempts,
@ -28,7 +28,7 @@ SELECT
lock_by, lock_by,
done_at, done_at,
priority, priority,
(job ->> 'subscription_id'::text)::integer AS subscription_id (job ->> 'subscription_id')::integer AS subscription_id
FROM apalis.jobs FROM apalis.jobs
WHERE job_type = '{SUBSCRIBER_TASK_APALIS_NAME}' WHERE job_type = '{SUBSCRIBER_TASK_APALIS_NAME}'
AND jsonb_path_exists(job, '$.subscriber_id ? (@.type() == "number")') AND jsonb_path_exists(job, '$.subscriber_id ? (@.type() == "number")')

View File

@ -7,9 +7,9 @@ use crate::{
}, },
models::cron::{ models::cron::{
CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME, CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME,
CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME, CRON_DUE_EVENT, CronSource, CronSourceEnum, CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME, CRON_DUE_EVENT, CronStatus, CronStatusEnum,
CronStatus, CronStatusEnum, NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME, NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME, NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME,
NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME, SETUP_CRON_EXTRA_FOREIGN_KEYS_FUNCTION_NAME, SETUP_CRON_EXTRA_FOREIGN_KEYS_TRIGGER_NAME,
}, },
}; };
@ -19,9 +19,6 @@ pub struct Migration;
#[async_trait] #[async_trait]
impl MigrationTrait for Migration { impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
create_postgres_enum_for_active_enum!(manager, CronSourceEnum, CronSource::Subscription)
.await?;
create_postgres_enum_for_active_enum!( create_postgres_enum_for_active_enum!(
manager, manager,
CronStatusEnum, CronStatusEnum,
@ -37,11 +34,6 @@ impl MigrationTrait for Migration {
table_auto_z(Cron::Table) table_auto_z(Cron::Table)
.col(pk_auto(Cron::Id)) .col(pk_auto(Cron::Id))
.col(string(Cron::CronExpr)) .col(string(Cron::CronExpr))
.col(enumeration(
Cron::CronSource,
CronSourceEnum,
CronSource::iden_values(),
))
.col(integer_null(Cron::SubscriberId)) .col(integer_null(Cron::SubscriberId))
.col(integer_null(Cron::SubscriptionId)) .col(integer_null(Cron::SubscriptionId))
.col(timestamp_with_time_zone_null(Cron::NextRun)) .col(timestamp_with_time_zone_null(Cron::NextRun))
@ -59,13 +51,14 @@ impl MigrationTrait for Migration {
CronStatusEnum, CronStatusEnum,
CronStatus::iden_values(), CronStatus::iden_values(),
)) ))
.col(json_binary_null(Cron::SubscriberTask))
.foreign_key( .foreign_key(
ForeignKey::create() ForeignKey::create()
.name("fk_cron_subscriber_id") .name("fk_cron_subscriber_id")
.from(Cron::Table, Cron::SubscriberId) .from(Cron::Table, Cron::SubscriberId)
.to(Subscribers::Table, Subscribers::Id) .to(Subscribers::Table, Subscribers::Id)
.on_delete(ForeignKeyAction::Cascade) .on_delete(ForeignKeyAction::Cascade)
.on_update(ForeignKeyAction::Cascade), .on_update(ForeignKeyAction::Restrict),
) )
.foreign_key( .foreign_key(
ForeignKey::create() ForeignKey::create()
@ -73,7 +66,7 @@ impl MigrationTrait for Migration {
.from(Cron::Table, Cron::SubscriptionId) .from(Cron::Table, Cron::SubscriptionId)
.to(Subscriptions::Table, Subscriptions::Id) .to(Subscriptions::Table, Subscriptions::Id)
.on_delete(ForeignKeyAction::Cascade) .on_delete(ForeignKeyAction::Cascade)
.on_update(ForeignKeyAction::Cascade), .on_update(ForeignKeyAction::Restrict),
) )
.to_owned(), .to_owned(),
) )
@ -83,17 +76,6 @@ impl MigrationTrait for Migration {
.create_postgres_auto_update_ts_trigger_for_col(Cron::Table, GeneralIds::UpdatedAt) .create_postgres_auto_update_ts_trigger_for_col(Cron::Table, GeneralIds::UpdatedAt)
.await?; .await?;
manager
.create_index(
IndexCreateStatement::new()
.if_not_exists()
.name("idx_cron_cron_source")
.table(Cron::Table)
.col(Cron::CronSource)
.to_owned(),
)
.await?;
manager manager
.create_index( .create_index(
IndexCreateStatement::new() IndexCreateStatement::new()
@ -107,6 +89,32 @@ impl MigrationTrait for Migration {
let db = manager.get_connection(); let db = manager.get_connection();
db.execute_unprepared(&format!(
r#"CREATE OR REPLACE FUNCTION {SETUP_CRON_EXTRA_FOREIGN_KEYS_FUNCTION_NAME}() RETURNS trigger AS $$
BEGIN
IF jsonb_path_exists(NEW.{subscriber_task}, '$.subscriber_id ? (@.type() == "number")') THEN
NEW.{subscriber_id} = (NEW.{subscriber_task} ->> 'subscriber_id')::integer;
END IF;
IF jsonb_path_exists(NEW.{subscriber_task}, '$.subscription_id ? (@.type() == "number")') THEN
NEW.{subscription_id} = (NEW.{subscriber_task} ->> 'subscription_id')::integer;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;"#,
subscriber_task = &Cron::SubscriberTask.to_string(),
subscriber_id = &Cron::SubscriberId.to_string(),
subscription_id = &Cron::SubscriptionId.to_string(),
)).await?;
db.execute_unprepared(&format!(
r#"CREATE OR REPLACE TRIGGER {SETUP_CRON_EXTRA_FOREIGN_KEYS_TRIGGER_NAME}
BEFORE INSERT OR UPDATE ON {table}
FOR EACH ROW
EXECUTE FUNCTION {SETUP_CRON_EXTRA_FOREIGN_KEYS_FUNCTION_NAME}();"#,
table = &Cron::Table.to_string(),
))
.await?;
db.execute_unprepared(&format!( db.execute_unprepared(&format!(
r#"CREATE OR REPLACE FUNCTION {NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME}() RETURNS trigger AS $$ r#"CREATE OR REPLACE FUNCTION {NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME}() RETURNS trigger AS $$
BEGIN BEGIN
@ -150,7 +158,7 @@ impl MigrationTrait for Migration {
.await?; .await?;
db.execute_unprepared(&format!( db.execute_unprepared(&format!(
r#"CREATE TRIGGER {NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME} r#"CREATE OR REPLACE TRIGGER {NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME}
AFTER INSERT OR UPDATE ON {table} AFTER INSERT OR UPDATE ON {table}
FOR EACH ROW FOR EACH ROW
EXECUTE FUNCTION {NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME}();"#, EXECUTE FUNCTION {NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME}();"#,
@ -265,10 +273,6 @@ impl MigrationTrait for Migration {
) )
.await?; .await?;
manager
.drop_postgres_enum_for_active_enum(CronSourceEnum)
.await?;
manager manager
.drop_postgres_enum_for_active_enum(CronStatusEnum) .drop_postgres_enum_for_active_enum(CronStatusEnum)
.await?; .await?;

View File

@ -64,7 +64,9 @@ impl Model {
.one(db) .one(db)
.await? .await?
.ok_or_else(|| { .ok_or_else(|| {
RecorderError::from_model_not_found_detail("auth", format!("pid {pid} not found")) RecorderError::from_entity_not_found_detail::<Entity, _>(format!(
"pid {pid} not found"
))
})?; })?;
Ok(subscriber_auth) Ok(subscriber_auth)
} }

View File

@ -1,3 +1,5 @@
use serde::{Deserialize, Serialize};
pub const CRON_DUE_EVENT: &str = "cron_due"; pub const CRON_DUE_EVENT: &str = "cron_due";
pub const CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME: &str = pub const CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME: &str =
@ -7,3 +9,15 @@ pub const CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME: &str = "check_and_trigger_d
pub const NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME: &str = "notify_due_cron_when_mutating"; pub const NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME: &str = "notify_due_cron_when_mutating";
pub const NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME: &str = pub const NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME: &str =
"notify_due_cron_when_mutating_trigger"; "notify_due_cron_when_mutating_trigger";
pub const SETUP_CRON_EXTRA_FOREIGN_KEYS_FUNCTION_NAME: &str = "setup_cron_extra_foreign_keys";
pub const SETUP_CRON_EXTRA_FOREIGN_KEYS_TRIGGER_NAME: &str =
"setup_cron_extra_foreign_keys_trigger";
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CronCreateOptions {
pub cron_expr: String,
pub priority: Option<i32>,
pub timeout_ms: Option<i32>,
pub max_attempts: Option<i32>,
pub enabled: Option<bool>,
}

View File

@ -3,8 +3,9 @@ mod registry;
pub use core::{ pub use core::{
CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME, CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME, CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME, CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME,
CRON_DUE_EVENT, NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME, CRON_DUE_EVENT, CronCreateOptions, NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME,
NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME, NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME, SETUP_CRON_EXTRA_FOREIGN_KEYS_FUNCTION_NAME,
SETUP_CRON_EXTRA_FOREIGN_KEYS_TRIGGER_NAME,
}; };
use async_trait::async_trait; use async_trait::async_trait;
@ -17,21 +18,7 @@ use sea_orm::{
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::{ use crate::{app::AppContextTrait, errors::RecorderResult, models::subscriber_tasks};
app::AppContextTrait,
errors::{RecorderError, RecorderResult},
models::subscriptions::{self},
};
#[derive(
Debug, Clone, PartialEq, Eq, DeriveActiveEnum, EnumIter, DeriveDisplay, Serialize, Deserialize,
)]
#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "cron_source")]
#[serde(rename_all = "snake_case")]
pub enum CronSource {
#[sea_orm(string_value = "subscription")]
Subscription,
}
#[derive( #[derive(
Debug, Clone, PartialEq, Eq, DeriveActiveEnum, EnumIter, DeriveDisplay, Serialize, Deserialize, Debug, Clone, PartialEq, Eq, DeriveActiveEnum, EnumIter, DeriveDisplay, Serialize, Deserialize,
@ -58,7 +45,6 @@ pub struct Model {
pub updated_at: DateTimeUtc, pub updated_at: DateTimeUtc,
#[sea_orm(primary_key)] #[sea_orm(primary_key)]
pub id: i32, pub id: i32,
pub cron_source: CronSource,
pub subscriber_id: Option<i32>, pub subscriber_id: Option<i32>,
pub subscription_id: Option<i32>, pub subscription_id: Option<i32>,
pub cron_expr: String, pub cron_expr: String,
@ -67,6 +53,7 @@ pub struct Model {
pub last_error: Option<String>, pub last_error: Option<String>,
pub locked_by: Option<String>, pub locked_by: Option<String>,
pub locked_at: Option<DateTimeUtc>, pub locked_at: Option<DateTimeUtc>,
#[sea_orm(default_expr = "5000")]
pub timeout_ms: i32, pub timeout_ms: i32,
#[sea_orm(default_expr = "0")] #[sea_orm(default_expr = "0")]
pub attempts: i32, pub attempts: i32,
@ -77,6 +64,7 @@ pub struct Model {
pub status: CronStatus, pub status: CronStatus,
#[sea_orm(default_expr = "true")] #[sea_orm(default_expr = "true")]
pub enabled: bool, pub enabled: bool,
pub subscriber_task: Option<subscriber_tasks::SubscriberTask>,
} }
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
@ -119,6 +107,38 @@ pub enum RelatedEntity {
Subscription, Subscription,
} }
impl ActiveModel {
pub fn from_subscriber_task(
subscriber_task: subscriber_tasks::SubscriberTask,
cron_options: CronCreateOptions,
) -> RecorderResult<Self> {
let mut active_model = Self {
next_run: Set(Some(Model::calculate_next_run(&cron_options.cron_expr)?)),
cron_expr: Set(cron_options.cron_expr),
subscriber_task: Set(Some(subscriber_task)),
..Default::default()
};
if let Some(priority) = cron_options.priority {
active_model.priority = Set(priority);
}
if let Some(timeout_ms) = cron_options.timeout_ms {
active_model.timeout_ms = Set(timeout_ms);
}
if let Some(max_attempts) = cron_options.max_attempts {
active_model.max_attempts = Set(max_attempts);
}
if let Some(enabled) = cron_options.enabled {
active_model.enabled = Set(enabled);
}
Ok(active_model)
}
}
#[async_trait] #[async_trait]
impl ActiveModelBehavior for ActiveModel {} impl ActiveModelBehavior for ActiveModel {}
@ -196,19 +216,13 @@ impl Model {
} }
async fn exec_cron(&self, ctx: &dyn AppContextTrait) -> RecorderResult<()> { async fn exec_cron(&self, ctx: &dyn AppContextTrait) -> RecorderResult<()> {
match self.cron_source { if let Some(subscriber_task) = self.subscriber_task.as_ref() {
CronSource::Subscription => { let task_service = ctx.task();
let subscription_id = self.subscription_id.unwrap_or_else(|| { task_service
unreachable!("Subscription cron must have a subscription id") .add_subscriber_task(subscriber_task.clone())
}); .await?;
} else {
let subscription = subscriptions::Entity::find_by_id(subscription_id) unimplemented!("Cron without subscriber task is not supported now");
.one(ctx.db())
.await?
.ok_or_else(|| RecorderError::from_model_not_found("Subscription"))?;
subscription.exec_cron(ctx).await?;
}
} }
Ok(()) Ok(())
@ -217,7 +231,7 @@ impl Model {
async fn mark_cron_completed(&self, ctx: &dyn AppContextTrait) -> RecorderResult<()> { async fn mark_cron_completed(&self, ctx: &dyn AppContextTrait) -> RecorderResult<()> {
let db = ctx.db(); let db = ctx.db();
let next_run = self.calculate_next_run(&self.cron_expr)?; let next_run = Self::calculate_next_run(&self.cron_expr)?;
ActiveModel { ActiveModel {
id: Set(self.id), id: Set(self.id),
@ -250,7 +264,7 @@ impl Model {
let next_run = if should_retry { let next_run = if should_retry {
Some(Utc::now() + chrono::Duration::seconds(5)) Some(Utc::now() + chrono::Duration::seconds(5))
} else { } else {
Some(self.calculate_next_run(&self.cron_expr)?) Some(Self::calculate_next_run(&self.cron_expr)?)
}; };
ActiveModel { ActiveModel {
@ -295,7 +309,7 @@ impl Model {
Ok(()) Ok(())
} }
fn calculate_next_run(&self, cron_expr: &str) -> RecorderResult<DateTime<Utc>> { pub fn calculate_next_run(cron_expr: &str) -> RecorderResult<DateTime<Utc>> {
let cron_expr = Cron::new(cron_expr).parse()?; let cron_expr = Cron::new(cron_expr).parse()?;
let next = cron_expr.find_next_occurrence(&Utc::now(), false)?; let next = cron_expr.find_next_occurrence(&Utc::now(), false)?;

View File

@ -122,7 +122,7 @@ impl Model {
.filter(Column::FeedType.eq(FeedType::Rss)) .filter(Column::FeedType.eq(FeedType::Rss))
.one(db) .one(db)
.await? .await?
.ok_or(RecorderError::from_model_not_found("Feed"))?; .ok_or(RecorderError::from_entity_not_found::<Entity>())?;
let feed = Feed::from_model(ctx, feed_model).await?; let feed = Feed::from_model(ctx, feed_model).await?;

View File

@ -44,7 +44,7 @@ impl Feed {
.await?; .await?;
(subscription, episodes) (subscription, episodes)
} else { } else {
return Err(RecorderError::from_model_not_found("Subscription")); return Err(RecorderError::from_entity_not_found::<subscriptions::Entity>());
}; };
Ok(Feed::SubscritpionEpisodes( Ok(Feed::SubscritpionEpisodes(

View File

@ -131,7 +131,7 @@ impl Model {
let db = ctx.db(); let db = ctx.db();
let subscriber = Entity::find_by_id(id).one(db).await?.ok_or_else(|| { let subscriber = Entity::find_by_id(id).one(db).await?.ok_or_else(|| {
RecorderError::from_model_not_found_detail("subscribers", format!("id {id} not found")) RecorderError::from_entity_not_found_detail::<Entity, _>(format!("id {id} not found"))
})?; })?;
Ok(subscriber) Ok(subscriber)
} }

View File

@ -11,10 +11,7 @@ pub use registry::{
use sea_orm::entity::prelude::*; use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::{ use crate::{app::AppContextTrait, errors::RecorderResult};
app::AppContextTrait,
errors::{RecorderError, RecorderResult},
};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "subscriptions")] #[sea_orm(table_name = "subscriptions")]
@ -155,50 +152,6 @@ impl ActiveModelBehavior for ActiveModel {}
impl ActiveModel {} impl ActiveModel {}
impl Model { impl Model {
pub async fn toggle_with_ids(
ctx: &dyn AppContextTrait,
ids: impl Iterator<Item = i32>,
enabled: bool,
) -> RecorderResult<()> {
let db = ctx.db();
Entity::update_many()
.col_expr(Column::Enabled, Expr::value(enabled))
.filter(Column::Id.is_in(ids))
.exec(db)
.await?;
Ok(())
}
pub async fn delete_with_ids(
ctx: &dyn AppContextTrait,
ids: impl Iterator<Item = i32>,
) -> RecorderResult<()> {
let db = ctx.db();
Entity::delete_many()
.filter(Column::Id.is_in(ids))
.exec(db)
.await?;
Ok(())
}
pub async fn find_by_id_and_subscriber_id(
ctx: &dyn AppContextTrait,
subscriber_id: i32,
subscription_id: i32,
) -> RecorderResult<Self> {
let db = ctx.db();
let subscription_model = Entity::find_by_id(subscription_id)
.one(db)
.await?
.ok_or_else(|| RecorderError::from_model_not_found("Subscription"))?;
if subscription_model.subscriber_id != subscriber_id {
Err(RecorderError::from_model_not_found("Subscription"))?;
}
Ok(subscription_model)
}
pub async fn exec_cron(&self, _ctx: &dyn AppContextTrait) -> RecorderResult<()> { pub async fn exec_cron(&self, _ctx: &dyn AppContextTrait) -> RecorderResult<()> {
todo!() todo!()
} }

View File

@ -1,129 +1,147 @@
use std::{fmt::Debug, sync::Arc}; use std::{fmt::Debug, sync::Arc};
use async_trait::async_trait;
use sea_orm::{DeriveActiveEnum, DeriveDisplay, EnumIter}; use sea_orm::{DeriveActiveEnum, DeriveDisplay, EnumIter};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::{ use crate::{
app::AppContextTrait, errors::RecorderResult,
errors::{RecorderError, RecorderResult},
extract::mikan::{ extract::mikan::{
MikanBangumiSubscription, MikanSeasonSubscription, MikanSubscriberSubscription, MikanBangumiSubscription, MikanSeasonSubscription, MikanSubscriberSubscription,
}, },
models::subscriptions::{self, SubscriptionTrait}, models::subscriptions::{self, SubscriptionTrait},
}; };
#[derive( macro_rules! register_subscription_type {
Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize, DeriveDisplay, (
)] subscription_category_enum: {
#[sea_orm( $(#[$subscription_category_enum_meta:meta])*
pub enum $type_enum_name:ident {
$(
$(#[$variant_meta:meta])*
$variant:ident => $string_value:literal
),* $(,)?
}
}$(,)?
subscription_enum: {
$(#[$subscription_enum_meta:meta])*
pub enum $subscription_enum_name:ident {
$(
$subscription_variant:ident($subscription_type:ty)
),* $(,)?
}
}
) => {
$(#[$subscription_category_enum_meta])*
#[sea_orm(
rs_type = "String", rs_type = "String",
db_type = "Enum", db_type = "Enum",
enum_name = "subscription_category" enum_name = "subscription_category"
)] )]
#[serde(rename_all = "snake_case")] pub enum $type_enum_name {
pub enum SubscriptionCategory { $(
#[sea_orm(string_value = "mikan_subscriber")] $(#[$variant_meta])*
MikanSubscriber, #[serde(rename = $string_value)]
#[sea_orm(string_value = "mikan_season")] #[sea_orm(string_value = $string_value)]
MikanSeason, $variant,
#[sea_orm(string_value = "mikan_bangumi")] )*
MikanBangumi, }
#[sea_orm(string_value = "manual")]
Manual,
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "category")]
pub enum Subscription {
#[serde(rename = "mikan_subscriber")]
MikanSubscriber(MikanSubscriberSubscription),
#[serde(rename = "mikan_season")]
MikanSeason(MikanSeasonSubscription),
#[serde(rename = "mikan_bangumi")]
MikanBangumi(MikanBangumiSubscription),
#[serde(rename = "manual")]
Manual,
}
impl Subscription { $(#[$subscription_enum_meta])*
pub fn category(&self) -> SubscriptionCategory { #[serde(tag = "category")]
pub enum $subscription_enum_name {
$(
#[serde(rename = $string_value)]
$subscription_variant($subscription_type),
)*
}
impl $subscription_enum_name {
pub fn category(&self) -> $type_enum_name {
match self { match self {
Self::MikanSubscriber(_) => SubscriptionCategory::MikanSubscriber, $(Self::$subscription_variant(_) => $type_enum_name::$variant,)*
Self::MikanSeason(_) => SubscriptionCategory::MikanSeason, }
Self::MikanBangumi(_) => SubscriptionCategory::MikanBangumi,
Self::Manual => SubscriptionCategory::Manual,
} }
} }
}
#[async_trait] #[async_trait::async_trait]
impl SubscriptionTrait for Subscription { impl $crate::models::subscriptions::SubscriptionTrait for $subscription_enum_name {
fn get_subscriber_id(&self) -> i32 { fn get_subscriber_id(&self) -> i32 {
match self { match self {
Self::MikanSubscriber(subscription) => subscription.get_subscriber_id(), $(Self::$subscription_variant(subscription) => subscription.get_subscriber_id(),)*
Self::MikanSeason(subscription) => subscription.get_subscriber_id(),
Self::MikanBangumi(subscription) => subscription.get_subscriber_id(),
Self::Manual => unreachable!(),
} }
} }
fn get_subscription_id(&self) -> i32 { fn get_subscription_id(&self) -> i32 {
match self { match self {
Self::MikanSubscriber(subscription) => subscription.get_subscription_id(), $(Self::$subscription_variant(subscription) => subscription.get_subscription_id(),)*
Self::MikanSeason(subscription) => subscription.get_subscription_id(),
Self::MikanBangumi(subscription) => subscription.get_subscription_id(),
Self::Manual => unreachable!(),
} }
} }
async fn sync_feeds_incremental(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> { async fn sync_feeds_incremental(&self, ctx: Arc<dyn $crate::app::AppContextTrait>) -> $crate::errors::RecorderResult<()> {
match self { match self {
Self::MikanSubscriber(subscription) => subscription.sync_feeds_incremental(ctx).await, $(Self::$subscription_variant(subscription) => subscription.sync_feeds_incremental(ctx).await,)*
Self::MikanSeason(subscription) => subscription.sync_feeds_incremental(ctx).await,
Self::MikanBangumi(subscription) => subscription.sync_feeds_incremental(ctx).await,
Self::Manual => Ok(()),
} }
} }
async fn sync_feeds_full(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> { async fn sync_feeds_full(&self, ctx: Arc<dyn $crate::app::AppContextTrait>) -> $crate::errors::RecorderResult<()> {
match self { match self {
Self::MikanSubscriber(subscription) => subscription.sync_feeds_full(ctx).await, $(Self::$subscription_variant(subscription) => subscription.sync_feeds_full(ctx).await,)*
Self::MikanSeason(subscription) => subscription.sync_feeds_full(ctx).await,
Self::MikanBangumi(subscription) => subscription.sync_feeds_full(ctx).await,
Self::Manual => Ok(()),
} }
} }
async fn sync_sources(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> { async fn sync_sources(&self, ctx: Arc<dyn $crate::app::AppContextTrait>) -> $crate::errors::RecorderResult<()> {
match self { match self {
Self::MikanSubscriber(subscription) => subscription.sync_sources(ctx).await, $(Self::$subscription_variant(subscription) => subscription.sync_sources(ctx).await,)*
Self::MikanSeason(subscription) => subscription.sync_sources(ctx).await,
Self::MikanBangumi(subscription) => subscription.sync_sources(ctx).await,
Self::Manual => Ok(()),
} }
} }
fn try_from_model(model: &subscriptions::Model) -> RecorderResult<Self> { fn try_from_model(model: &subscriptions::Model) -> RecorderResult<Self> {
match model.category { match model.category {
SubscriptionCategory::MikanSubscriber => { $($type_enum_name::$variant => {
MikanSubscriberSubscription::try_from_model(model).map(Self::MikanSubscriber) <$subscription_type as $crate::models::subscriptions::SubscriptionTrait>::try_from_model(model).map(Self::$subscription_variant)
} })*
SubscriptionCategory::MikanSeason => { }
MikanSeasonSubscription::try_from_model(model).map(Self::MikanSeason)
}
SubscriptionCategory::MikanBangumi => {
MikanBangumiSubscription::try_from_model(model).map(Self::MikanBangumi)
}
SubscriptionCategory::Manual => Ok(Self::Manual),
} }
} }
}
impl TryFrom<&subscriptions::Model> for Subscription { impl TryFrom<&$crate::models::subscriptions::Model> for $subscription_enum_name {
type Error = RecorderError; type Error = $crate::errors::RecorderError;
fn try_from(model: &subscriptions::Model) -> Result<Self, Self::Error> { fn try_from(model: &$crate::models::subscriptions::Model) -> Result<Self, Self::Error> {
Self::try_from_model(model) Self::try_from_model(model)
} }
}
};
}
register_subscription_type! {
subscription_category_enum: {
#[derive(
Clone,
Debug,
Serialize,
Deserialize,
PartialEq,
Eq,
Copy,
DeriveActiveEnum,
DeriveDisplay,
EnumIter,
)]
pub enum SubscriptionCategory {
MikanSubscriber => "mikan_subscriber",
MikanSeason => "mikan_season",
MikanBangumi => "mikan_bangumi",
}
}
subscription_enum: {
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum Subscription {
MikanSubscriber(MikanSubscriberSubscription),
MikanSeason(MikanSeasonSubscription),
MikanBangumi(MikanBangumiSubscription)
}
}
} }

View File

@ -1,34 +1,56 @@
use std::sync::Arc; use std::sync::Arc;
use futures::Stream; use async_trait::async_trait;
use serde::{Serialize, de::DeserializeOwned}; use futures::{Stream, StreamExt, pin_mut};
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use crate::{app::AppContextTrait, errors::RecorderResult}; use crate::{app::AppContextTrait, errors::RecorderResult};
pub const SYSTEM_TASK_APALIS_NAME: &str = "system_task"; pub const SYSTEM_TASK_APALIS_NAME: &str = "system_task";
pub const SUBSCRIBER_TASK_APALIS_NAME: &str = "subscriber_task"; pub const SUBSCRIBER_TASK_APALIS_NAME: &str = "subscriber_task";
#[async_trait::async_trait] #[async_trait]
pub trait AsyncTaskTrait: Serialize + DeserializeOwned + Sized { pub trait AsyncTaskTrait: Serialize + DeserializeOwned + Sized {
async fn run_async(self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()>; async fn run_async(self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()>;
async fn run(self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
self.run_async(ctx).await?;
Ok(())
}
} }
#[async_trait::async_trait] pub trait StreamTaskTrait {
pub trait StreamTaskTrait: Serialize + DeserializeOwned + Sized {
type Yield: Serialize + DeserializeOwned + Send; type Yield: Serialize + DeserializeOwned + Send;
fn run_stream( fn run_stream(
self, self,
ctx: Arc<dyn AppContextTrait>, ctx: Arc<dyn AppContextTrait>,
) -> impl Stream<Item = RecorderResult<Self::Yield>> + Send; ) -> impl Stream<Item = RecorderResult<Self::Yield>> + Send;
}
async fn run(self, _ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> { #[async_trait]
unimplemented!() impl<T> AsyncTaskTrait for T
where
T: StreamTaskTrait + Serialize + DeserializeOwned + Sized + Send,
{
async fn run_async(self, _ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
let s = self.run_stream(_ctx);
pin_mut!(s);
while let Some(item) = s.next().await {
item?;
}
Ok(())
} }
} }
pub trait SubscriberTaskTrait: AsyncTaskTrait {
fn get_subscriber_id(&self) -> i32;
fn get_cron_id(&self) -> Option<i32>;
}
pub trait SystemTaskTrait: AsyncTaskTrait {}
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
pub struct SubscriberTaskBase {
pub subscriber_id: i32,
pub cron_id: Option<i32>,
}

View File

@ -6,6 +6,7 @@ mod service;
pub use core::{ pub use core::{
AsyncTaskTrait, SUBSCRIBER_TASK_APALIS_NAME, SYSTEM_TASK_APALIS_NAME, StreamTaskTrait, AsyncTaskTrait, SUBSCRIBER_TASK_APALIS_NAME, SYSTEM_TASK_APALIS_NAME, StreamTaskTrait,
SubscriberTaskBase, SubscriberTaskTrait, SystemTaskTrait,
}; };
pub use config::TaskConfig; pub use config::TaskConfig;

View File

@ -1,18 +1,12 @@
mod media;
mod subscriber; mod subscriber;
mod subscription;
mod system; mod system;
pub use media::OptimizeImageTask;
pub use subscriber::{ pub use subscriber::{
SubscriberTask, SubscriberTaskType, SubscriberTaskTypeEnum, SubscriberTaskTypeVariant, SubscriberTask, SubscriberTaskType, SubscriberTaskTypeEnum, SubscriberTaskTypeVariant,
SubscriberTaskTypeVariantIter, SubscriberTaskTypeVariantIter, SyncOneSubscriptionFeedsFullTask,
}; SyncOneSubscriptionFeedsIncrementalTask, SyncOneSubscriptionSourcesTask,
pub use subscription::{
SyncOneSubscriptionFeedsFullTask, SyncOneSubscriptionFeedsIncrementalTask,
SyncOneSubscriptionSourcesTask,
}; };
pub use system::{ pub use system::{
SystemTask, SystemTaskType, SystemTaskTypeEnum, SystemTaskTypeVariant, OptimizeImageTask, SystemTask, SystemTaskType, SystemTaskTypeEnum, SystemTaskTypeVariant,
SystemTaskTypeVariantIter, SystemTaskTypeVariantIter,
}; };

View File

@ -1,100 +0,0 @@
use std::sync::Arc;
use sea_orm::{DeriveActiveEnum, DeriveDisplay, EnumIter, FromJsonQueryResult};
use serde::{Deserialize, Serialize};
use crate::{
app::AppContextTrait,
errors::{RecorderError, RecorderResult},
models::subscriptions::SubscriptionTrait,
task::{
AsyncTaskTrait,
registry::{
SyncOneSubscriptionFeedsFullTask, SyncOneSubscriptionFeedsIncrementalTask,
SyncOneSubscriptionSourcesTask,
},
},
};
#[derive(
Clone,
Debug,
Serialize,
Deserialize,
PartialEq,
Eq,
Copy,
DeriveActiveEnum,
DeriveDisplay,
EnumIter,
)]
#[sea_orm(rs_type = "String", db_type = "Text")]
pub enum SubscriberTaskType {
#[serde(rename = "sync_one_subscription_feeds_incremental")]
#[sea_orm(string_value = "sync_one_subscription_feeds_incremental")]
SyncOneSubscriptionFeedsIncremental,
#[serde(rename = "sync_one_subscription_feeds_full")]
#[sea_orm(string_value = "sync_one_subscription_feeds_full")]
SyncOneSubscriptionFeedsFull,
#[serde(rename = "sync_one_subscription_sources")]
#[sea_orm(string_value = "sync_one_subscription_sources")]
SyncOneSubscriptionSources,
}
impl TryFrom<&SubscriberTask> for serde_json::Value {
type Error = RecorderError;
fn try_from(value: &SubscriberTask) -> Result<Self, Self::Error> {
let json_value = serde_json::to_value(value)?;
Ok(match json_value {
serde_json::Value::Object(mut map) => {
map.remove("task_type");
serde_json::Value::Object(map)
}
_ => {
unreachable!("subscriber task must be an json object");
}
})
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, FromJsonQueryResult)]
#[serde(tag = "task_type")]
pub enum SubscriberTask {
#[serde(rename = "sync_one_subscription_feeds_incremental")]
SyncOneSubscriptionFeedsIncremental(SyncOneSubscriptionFeedsIncrementalTask),
#[serde(rename = "sync_one_subscription_feeds_full")]
SyncOneSubscriptionFeedsFull(SyncOneSubscriptionFeedsFullTask),
#[serde(rename = "sync_one_subscription_sources")]
SyncOneSubscriptionSources(SyncOneSubscriptionSourcesTask),
}
impl SubscriberTask {
pub fn get_subscriber_id(&self) -> i32 {
match self {
Self::SyncOneSubscriptionFeedsIncremental(task) => task.0.get_subscriber_id(),
Self::SyncOneSubscriptionFeedsFull(task) => task.0.get_subscriber_id(),
Self::SyncOneSubscriptionSources(task) => task.0.get_subscriber_id(),
}
}
pub async fn run(self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
match self {
Self::SyncOneSubscriptionFeedsIncremental(task) => task.run(ctx).await,
Self::SyncOneSubscriptionFeedsFull(task) => task.run(ctx).await,
Self::SyncOneSubscriptionSources(task) => task.run(ctx).await,
}
}
pub fn task_type(&self) -> SubscriberTaskType {
match self {
Self::SyncOneSubscriptionFeedsIncremental(_) => {
SubscriberTaskType::SyncOneSubscriptionFeedsIncremental
}
Self::SyncOneSubscriptionFeedsFull(_) => {
SubscriberTaskType::SyncOneSubscriptionFeedsFull
}
Self::SyncOneSubscriptionSources(_) => SubscriberTaskType::SyncOneSubscriptionSources,
}
}
}

View File

@ -0,0 +1,29 @@
macro_rules! register_subscriber_task_type {
(
$(#[$type_meta:meta])*
$task_vis:vis struct $task_name:ident {
$($(#[$field_meta:meta])* pub $field_name:ident: $field_type:ty),* $(,)?
}
) => {
$(#[$type_meta])*
#[derive(typed_builder::TypedBuilder)]
$task_vis struct $task_name {
$($(#[$field_meta])* pub $field_name: $field_type,)*
pub subscriber_id: i32,
#[builder(default = None)]
pub cron_id: Option<i32>,
}
impl $crate::task::SubscriberTaskTrait for $task_name {
fn get_subscriber_id(&self) -> i32 {
self.subscriber_id
}
fn get_cron_id(&self) -> Option<i32> {
self.cron_id
}
}
}
}
pub(crate) use register_subscriber_task_type;

View File

@ -0,0 +1,140 @@
mod base;
mod subscription;
use sea_orm::{DeriveActiveEnum, DeriveDisplay, EnumIter, FromJsonQueryResult};
use serde::{Deserialize, Serialize};
pub use subscription::{
SyncOneSubscriptionFeedsFullTask, SyncOneSubscriptionFeedsIncrementalTask,
SyncOneSubscriptionSourcesTask,
};
macro_rules! register_subscriber_task_types {
(
task_type_enum: {
$(#[$type_enum_meta:meta])*
pub enum $type_enum_name:ident {
$(
$(#[$variant_meta:meta])*
$variant:ident => $string_value:literal
),* $(,)?
}
},
task_enum: {
$(#[$task_enum_meta:meta])*
pub enum $task_enum_name:ident {
$(
$task_variant:ident($task_type:ty)
),* $(,)?
}
}
) => {
$(#[$type_enum_meta])*
#[sea_orm(rs_type = "String", db_type = "Text")]
pub enum $type_enum_name {
$(
$(#[$variant_meta])*
#[serde(rename = $string_value)]
#[sea_orm(string_value = $string_value)]
$variant,
)*
}
$(#[$task_enum_meta])*
#[serde(tag = "task_type")]
pub enum $task_enum_name {
$(
$task_variant($task_type),
)*
}
impl TryFrom<$task_enum_name> for serde_json::Value {
type Error = $crate::errors::RecorderError;
fn try_from(value: $task_enum_name) -> Result<Self, Self::Error> {
let json_value = serde_json::to_value(value)?;
Ok(match json_value {
serde_json::Value::Object(mut map) => {
map.remove("task_type");
serde_json::Value::Object(map)
}
_ => {
unreachable!("subscriber task must be an json object");
}
})
}
}
impl $task_enum_name {
pub fn task_type(&self) -> $type_enum_name {
match self {
$(Self::$task_variant(_) => $type_enum_name::$variant,)*
}
}
}
#[async_trait::async_trait]
impl $crate::task::AsyncTaskTrait for $task_enum_name {
async fn run_async(self, ctx: std::sync::Arc<dyn $crate::app::AppContextTrait>) -> $crate::errors::RecorderResult<()> {
match self {
$(Self::$task_variant(t) =>
<$task_type as $crate::task::AsyncTaskTrait>::run_async(t, ctx).await,)*
}
}
}
impl $crate::task::SubscriberTaskTrait for $task_enum_name {
fn get_subscriber_id(&self) -> i32 {
match self {
$(Self::$task_variant(t) =>
<$task_type as $crate::task::SubscriberTaskTrait>::get_subscriber_id(t),)*
}
}
fn get_cron_id(&self) -> Option<i32> {
match self {
$(Self::$task_variant(t) =>
<$task_type as $crate::task::SubscriberTaskTrait>::get_cron_id(t),)*
}
}
}
$(
impl From<$task_type> for $task_enum_name {
fn from(task: $task_type) -> Self {
Self::$task_variant(task)
}
}
)*
};
}
register_subscriber_task_types!(
task_type_enum: {
#[derive(
Clone,
Debug,
Serialize,
Deserialize,
PartialEq,
Eq,
Copy,
DeriveActiveEnum,
DeriveDisplay,
EnumIter,
)]
pub enum SubscriberTaskType {
SyncOneSubscriptionFeedsIncremental => "sync_one_subscription_feeds_incremental",
SyncOneSubscriptionFeedsFull => "sync_one_subscription_feeds_full",
SyncOneSubscriptionSources => "sync_one_subscription_sources"
}
},
task_enum: {
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, FromJsonQueryResult)]
pub enum SubscriberTask {
SyncOneSubscriptionFeedsIncremental(SyncOneSubscriptionFeedsIncrementalTask),
SyncOneSubscriptionFeedsFull(SyncOneSubscriptionFeedsFullTask),
SyncOneSubscriptionSources(SyncOneSubscriptionSourcesTask),
}
}
);

View File

@ -0,0 +1,67 @@
use sea_orm::prelude::*;
use serde::{Deserialize, Serialize};
use super::base::register_subscriber_task_type;
use crate::{errors::RecorderResult, models::subscriptions::SubscriptionTrait};
macro_rules! register_subscription_task_type {
(
$(#[$type_meta:meta])* pub struct $task_name:ident {
$($(#[$field_meta:meta])* pub $field_name:ident: $field_type:ty),* $(,)?
} => async |$subscription_param:ident, $ctx_param:ident| -> $task_return_type:ty $method_body:block
) => {
register_subscriber_task_type! {
$(#[$type_meta])*
pub struct $task_name {
$($(#[$field_meta])* pub $field_name: $field_type,)*
pub subscription_id: i32,
}
}
#[async_trait::async_trait]
impl $crate::task::AsyncTaskTrait for $task_name {
async fn run_async(self, ctx: std::sync::Arc<dyn $crate::app::AppContextTrait>) -> $task_return_type {
use $crate::models::subscriptions::{
Entity, Column, Subscription,
};
let subscription_model = Entity::find()
.filter(Column::Id.eq(self.subscription_id))
.filter(Column::SubscriberId.eq(self.subscriber_id))
.one(ctx.db())
.await?
.ok_or_else(|| $crate::errors::RecorderError::from_entity_not_found::<Entity>())?;
let $subscription_param = Subscription::try_from_model(&subscription_model)?;
let $ctx_param = ctx;
$method_body
}
}
}
}
register_subscription_task_type! {
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct SyncOneSubscriptionFeedsIncrementalTask {
} => async |subscription, ctx| -> RecorderResult<()> {
subscription.sync_feeds_incremental(ctx).await?;
Ok(())
}
}
register_subscription_task_type! {
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct SyncOneSubscriptionFeedsFullTask {
} => async |subscription, ctx| -> RecorderResult<()> {
subscription.sync_feeds_full(ctx).await?;
Ok(())
}
}
register_subscription_task_type! {
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct SyncOneSubscriptionSourcesTask {
} => async |subscription, ctx| -> RecorderResult<()> {
subscription.sync_sources(ctx).await?;
Ok(())
}
}

View File

@ -1,62 +0,0 @@
use std::sync::Arc;
use sea_orm::prelude::*;
use serde::{Deserialize, Serialize};
use crate::{
app::AppContextTrait,
errors::RecorderResult,
models::subscriptions::{self, SubscriptionTrait},
task::AsyncTaskTrait,
};
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct SyncOneSubscriptionFeedsIncrementalTask(pub subscriptions::Subscription);
impl From<subscriptions::Subscription> for SyncOneSubscriptionFeedsIncrementalTask {
fn from(subscription: subscriptions::Subscription) -> Self {
Self(subscription)
}
}
#[async_trait::async_trait]
impl AsyncTaskTrait for SyncOneSubscriptionFeedsIncrementalTask {
async fn run_async(self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
self.0.sync_feeds_incremental(ctx).await?;
Ok(())
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct SyncOneSubscriptionFeedsFullTask(pub subscriptions::Subscription);
impl From<subscriptions::Subscription> for SyncOneSubscriptionFeedsFullTask {
fn from(subscription: subscriptions::Subscription) -> Self {
Self(subscription)
}
}
#[async_trait::async_trait]
impl AsyncTaskTrait for SyncOneSubscriptionFeedsFullTask {
async fn run_async(self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
self.0.sync_feeds_full(ctx).await?;
Ok(())
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct SyncOneSubscriptionSourcesTask(pub subscriptions::Subscription);
#[async_trait::async_trait]
impl AsyncTaskTrait for SyncOneSubscriptionSourcesTask {
async fn run_async(self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
self.0.sync_sources(ctx).await?;
Ok(())
}
}
impl From<subscriptions::Subscription> for SyncOneSubscriptionSourcesTask {
fn from(subscription: subscriptions::Subscription) -> Self {
Self(subscription)
}
}

View File

@ -1,43 +0,0 @@
use std::sync::Arc;
use sea_orm::{DeriveActiveEnum, DeriveDisplay, EnumIter, FromJsonQueryResult};
use serde::{Deserialize, Serialize};
use crate::{
app::AppContextTrait,
errors::RecorderResult,
task::{AsyncTaskTrait, registry::media::OptimizeImageTask},
};
#[derive(
Clone,
Debug,
Serialize,
Deserialize,
PartialEq,
Eq,
Copy,
DeriveActiveEnum,
DeriveDisplay,
EnumIter,
)]
#[sea_orm(rs_type = "String", db_type = "Text")]
pub enum SystemTaskType {
#[serde(rename = "optimize_image")]
#[sea_orm(string_value = "optimize_image")]
OptimizeImage,
}
#[derive(Clone, Debug, Serialize, Deserialize, FromJsonQueryResult)]
pub enum SystemTask {
#[serde(rename = "optimize_image")]
OptimizeImage(OptimizeImageTask),
}
impl SystemTask {
pub async fn run(self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
match self {
Self::OptimizeImage(task) => task.run(ctx).await,
}
}
}

View File

@ -0,0 +1,108 @@
mod media;
pub use media::OptimizeImageTask;
use sea_orm::{DeriveActiveEnum, DeriveDisplay, EnumIter, FromJsonQueryResult};
use serde::{Deserialize, Serialize};
macro_rules! register_system_task_types {
(
task_type_enum: {
$(#[$type_enum_meta:meta])*
pub enum $type_enum_name:ident {
$(
$(#[$variant_meta:meta])*
$variant:ident => $string_value:literal
),* $(,)?
}
},
task_enum: {
$(#[$task_enum_meta:meta])*
pub enum $task_enum_name:ident {
$(
$task_variant:ident($task_type:ty)
),* $(,)?
}
}
) => {
$(#[$type_enum_meta])*
#[sea_orm(rs_type = "String", db_type = "Text")]
pub enum $type_enum_name {
$(
$(#[$variant_meta])*
#[serde(rename = $string_value)]
#[sea_orm(string_value = $string_value)]
$variant,
)*
}
$(#[$task_enum_meta])*
#[serde(tag = "task_type")]
pub enum $task_enum_name {
$(
$task_variant($task_type),
)*
}
impl TryFrom<$task_enum_name> for serde_json::Value {
type Error = $crate::errors::RecorderError;
fn try_from(value: $task_enum_name) -> Result<Self, Self::Error> {
let json_value = serde_json::to_value(value)?;
Ok(match json_value {
serde_json::Value::Object(mut map) => {
map.remove("task_type");
serde_json::Value::Object(map)
}
_ => {
unreachable!("subscriber task must be an json object");
}
})
}
}
impl $task_enum_name {
pub fn task_type(&self) -> $type_enum_name {
match self {
$(Self::$task_variant(_) => $type_enum_name::$variant,)*
}
}
}
#[async_trait::async_trait]
impl $crate::task::AsyncTaskTrait for $task_enum_name {
async fn run_async(self, ctx: std::sync::Arc<dyn $crate::app::AppContextTrait>) -> $crate::errors::RecorderResult<()> {
match self {
$(Self::$task_variant(t) =>
<$task_type as $crate::task::AsyncTaskTrait>::run_async(t, ctx).await,)*
}
}
}
};
}
register_system_task_types! {
task_type_enum: {
#[derive(
Clone,
Debug,
Serialize,
Deserialize,
PartialEq,
Eq,
Copy,
DeriveActiveEnum,
DeriveDisplay,
EnumIter,
)]
pub enum SystemTaskType {
OptimizeImage => "optimize_image"
}
},
task_enum: {
#[derive(Clone, Debug, Serialize, Deserialize, FromJsonQueryResult)]
pub enum SystemTask {
OptimizeImage(OptimizeImageTask),
}
}
}

View File

@ -6,15 +6,16 @@ use apalis_sql::{
context::SqlContext, context::SqlContext,
postgres::{PgListen as ApalisPgListen, PostgresStorage as ApalisPostgresStorage}, postgres::{PgListen as ApalisPgListen, PostgresStorage as ApalisPostgresStorage},
}; };
use sea_orm::sqlx::postgres::PgListener; use sea_orm::{ActiveModelTrait, sqlx::postgres::PgListener};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use crate::{ use crate::{
app::AppContextTrait, app::AppContextTrait,
errors::{RecorderError, RecorderResult}, errors::{RecorderError, RecorderResult},
models::cron::{self, CRON_DUE_EVENT}, models::cron::{self, CRON_DUE_EVENT, CronCreateOptions},
task::{ task::{
SUBSCRIBER_TASK_APALIS_NAME, SYSTEM_TASK_APALIS_NAME, SubscriberTask, TaskConfig, AsyncTaskTrait, SUBSCRIBER_TASK_APALIS_NAME, SYSTEM_TASK_APALIS_NAME, SubscriberTask,
TaskConfig,
config::{default_subscriber_task_workers, default_system_task_workers}, config::{default_subscriber_task_workers, default_system_task_workers},
registry::SystemTask, registry::SystemTask,
}, },
@ -65,7 +66,7 @@ impl TaskService {
) -> RecorderResult<()> { ) -> RecorderResult<()> {
let ctx = data.deref().clone(); let ctx = data.deref().clone();
job.run(ctx).await job.run_async(ctx).await
} }
async fn run_system_task( async fn run_system_task(
@ -73,7 +74,7 @@ impl TaskService {
data: Data<Arc<dyn AppContextTrait>>, data: Data<Arc<dyn AppContextTrait>>,
) -> RecorderResult<()> { ) -> RecorderResult<()> {
let ctx = data.deref().clone(); let ctx = data.deref().clone();
job.run(ctx).await job.run_async(ctx).await
} }
pub async fn retry_subscriber_task(&self, job_id: String) -> RecorderResult<()> { pub async fn retry_subscriber_task(&self, job_id: String) -> RecorderResult<()> {
@ -104,7 +105,6 @@ impl TaskService {
pub async fn add_subscriber_task( pub async fn add_subscriber_task(
&self, &self,
_subscriber_id: i32,
subscriber_task: SubscriberTask, subscriber_task: SubscriberTask,
) -> RecorderResult<TaskId> { ) -> RecorderResult<TaskId> {
let task_id = { let task_id = {
@ -121,6 +121,18 @@ impl TaskService {
Ok(task_id) Ok(task_id)
} }
pub async fn add_subscriber_task_cron(
&self,
subscriber_task: SubscriberTask,
cron_options: CronCreateOptions,
) -> RecorderResult<cron::Model> {
let c = cron::ActiveModel::from_subscriber_task(subscriber_task, cron_options)?;
let c = c.insert(self.ctx.db()).await?;
Ok(c)
}
pub async fn add_system_task(&self, system_task: SystemTask) -> RecorderResult<TaskId> { pub async fn add_system_task(&self, system_task: SystemTask) -> RecorderResult<TaskId> {
let task_id = { let task_id = {
let mut storage = self.system_task_storage.write().await; let mut storage = self.system_task_storage.write().await;