fix: temp save

This commit is contained in:
2025-07-01 03:45:56 +08:00
parent bacfe99ef2
commit d06acde882
35 changed files with 493 additions and 1049 deletions

View File

@@ -0,0 +1,6 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { SyncOneSubscriptionFeedsFullTask } from "./SyncOneSubscriptionFeedsFullTask";
import type { SyncOneSubscriptionFeedsIncrementalTask } from "./SyncOneSubscriptionFeedsIncrementalTask";
import type { SyncOneSubscriptionSourcesTask } from "./SyncOneSubscriptionSourcesTask";
export type SubscriberTask = { "taskType": "sync_one_subscription_feeds_incremental" } & SyncOneSubscriptionFeedsIncrementalTask | { "taskType": "sync_one_subscription_feeds_full" } & SyncOneSubscriptionFeedsFullTask | { "taskType": "sync_one_subscription_sources" } & SyncOneSubscriptionSourcesTask;

View File

@@ -0,0 +1,3 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type SyncOneSubscriptionFeedsFullTask = { subscriptionId: number, subscriberId: number, cronId: number | null, };

View File

@@ -0,0 +1,3 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type SyncOneSubscriptionFeedsIncrementalTask = { subscriptionId: number, subscriberId: number, cronId: number | null, };

View File

@@ -0,0 +1,3 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type SyncOneSubscriptionSourcesTask = { subscriptionId: number, subscriberId: number, cronId: number | null, };

View File

@@ -0,0 +1,6 @@
{
"name": "recorder",
"version": "0.0.1",
"private": true,
"type": "module"
}

View File

@@ -101,14 +101,14 @@ pub fn register_credential3rd_to_schema_builder(
.schema
.register(Credential3rdCheckAvailableInfo::generate_output_object());
let builder_context = builder.context;
let builder_context = &builder.context;
{
let check_available_mutation_name = get_entity_custom_mutation_field_name::<
credential_3rd::Entity,
>(builder_context, "CheckAvailable");
>(&builder_context, "CheckAvailable");
let check_available_mutation =
generate_entity_filtered_mutation_field::<credential_3rd::Entity, _, _>(
builder_context,
builder_context.clone(),
check_available_mutation_name,
TypeRef::named_nn(Credential3rdCheckAvailableInfo::object_type_name()),
Arc::new(|_resolver_ctx, app_ctx, filters| {

View File

@@ -4,17 +4,13 @@ use seaography::{Builder as SeaographyBuilder, BuilderContext};
use crate::{
graphql::{
domains::subscribers::restrict_subscriber_for_entity,
infra::{
custom::register_entity_default_writable,
json::{
convert_jsonb_output_for_entity, restrict_jsonb_filter_input_for_entity,
try_convert_jsonb_input_for_entity,
},
name::get_entity_and_column_name,
domains::{
subscriber_tasks::restrict_subscriber_tasks_for_entity,
subscribers::restrict_subscriber_for_entity,
},
infra::{custom::register_entity_default_writable, name::get_entity_and_column_name},
},
models::{cron, subscriber_tasks},
models::cron,
};
fn skip_columns_for_entity_input(context: &mut BuilderContext) {
@@ -22,7 +18,6 @@ fn skip_columns_for_entity_input(context: &mut BuilderContext) {
if matches!(
column,
cron::Column::SubscriberTask
| cron::Column::Id
| cron::Column::CronExpr
| cron::Column::Enabled
| cron::Column::TimeoutMs
@@ -49,16 +44,9 @@ fn skip_columns_for_entity_input(context: &mut BuilderContext) {
pub fn register_cron_to_schema_context(context: &mut BuilderContext) {
restrict_subscriber_for_entity::<cron::Entity>(context, &cron::Column::SubscriberId);
restrict_jsonb_filter_input_for_entity::<cron::Entity>(context, &cron::Column::SubscriberTask);
convert_jsonb_output_for_entity::<cron::Entity>(
restrict_subscriber_tasks_for_entity::<cron::Entity>(
context,
&cron::Column::SubscriberTask,
Some(Case::Camel),
);
try_convert_jsonb_input_for_entity::<cron::Entity, Option<subscriber_tasks::SubscriberTask>>(
context,
&cron::Column::SubscriberTask,
subscriber_tasks::subscriber_task_schema(),
Some(Case::Snake),
);
skip_columns_for_entity_input(context);

View File

@@ -29,7 +29,7 @@ pub fn register_feeds_to_schema_context(context: &mut BuilderContext) {
context.types.input_none_conversions.insert(
get_entity_and_column_name::<feeds::Entity>(context, &feeds::Column::Token),
Box::new(
Arc::new(
move |context: &ResolverContext| -> SeaResult<Option<SeaValue>> {
let field_name = context.field().name();
if field_name == entity_create_one_mutation_field_name.as_str()

View File

@@ -3,15 +3,15 @@ use std::{ops::Deref, sync::Arc};
use async_graphql::dynamic::{FieldValue, TypeRef};
use convert_case::Case;
use sea_orm::{
ColumnTrait, ConnectionTrait, EntityTrait, Iterable, QueryFilter, QuerySelect, QueryTrait,
prelude::Expr, sea_query::Query,
ActiveModelBehavior, ColumnTrait, ConnectionTrait, EntityTrait, Iterable, QueryFilter,
QuerySelect, QueryTrait, prelude::Expr, sea_query::Query,
};
use seaography::{
Builder as SeaographyBuilder, BuilderContext, EntityInputBuilder, EntityObjectBuilder,
SeaographyError, prepare_active_model,
Builder as SeaographyBuilder, BuilderContext, SeaographyError, prepare_active_model,
};
use crate::{
app::AppContextTrait,
auth::AuthUserInfo,
errors::RecorderError,
graphql::{
@@ -64,13 +64,17 @@ pub fn restrict_subscriber_tasks_for_entity<T>(
let entity_column_name = get_entity_and_column_name::<T>(context, column);
context.types.input_conversions.insert(
entity_column_name.clone(),
Box::new(move |resolve_context, accessor| {
let mut json_value = accessor.as_value().clone().into_json().map_err(|err| {
SeaographyError::TypeConversionError(
err.to_string(),
format!("Json - {entity_column_name}"),
)
})?;
Arc::new(move |resolve_context, value_accessor| {
let mut json_value = value_accessor
.as_value()
.clone()
.into_json()
.map_err(|err| {
SeaographyError::TypeConversionError(
err.to_string(),
format!("Json - {entity_column_name}"),
)
})?;
if let Some(case) = case {
json_value = convert_json_keys(json_value, case);
@@ -107,6 +111,11 @@ pub fn register_subscriber_tasks_to_schema_context(context: &mut BuilderContext)
context,
&subscriber_tasks::Column::SubscriberId,
);
restrict_subscriber_tasks_for_entity::<subscriber_tasks::Entity>(
context,
&subscriber_tasks::Column::Job,
Some(Case::Snake),
);
skip_columns_for_entity_input(context);
}
@@ -118,18 +127,18 @@ pub fn register_subscriber_tasks_to_schema_builder(
builder.register_enumeration::<subscriber_tasks::SubscriberTaskStatus>();
builder = register_entity_default_readonly!(builder, subscriber_tasks);
let builder_context = builder.context.clone();
let builder_context = builder.context;
{
builder
.outputs
.push(generate_entity_default_basic_entity_object::<
subscriber_tasks::Entity,
>(builder_context));
>(builder_context.clone()));
}
{
let delete_mutation = generate_entity_delete_mutation_field::<subscriber_tasks::Entity>(
builder_context,
builder_context.clone(),
Arc::new(|_resolver_ctx, app_ctx, filters| {
Box::pin(async move {
let db = app_ctx.db();
@@ -160,13 +169,13 @@ pub fn register_subscriber_tasks_to_schema_builder(
{
let entity_retry_one_mutation_name = get_entity_custom_mutation_field_name::<
subscriber_tasks::Entity,
>(builder_context, "RetryOne");
>(&builder_context, "RetryOne");
let retry_one_mutation =
generate_entity_filtered_mutation_field::<subscriber_tasks::Entity, _, _>(
builder_context,
builder_context.clone(),
entity_retry_one_mutation_name,
TypeRef::named_nn(get_entity_basic_type_name::<subscriber_tasks::Entity>(
builder_context,
&builder_context,
)),
Arc::new(|_resolver_ctx, app_ctx, filters| {
Box::pin(async move {
@@ -205,32 +214,23 @@ pub fn register_subscriber_tasks_to_schema_builder(
.inputs
.push(generate_entity_default_insert_input_object::<
subscriber_tasks::Entity,
>(builder_context));
>(&builder.context));
let create_one_mutation =
generate_entity_create_one_mutation_field::<subscriber_tasks::Entity, TypeRef>(
builder_context,
None,
Arc::new(|_resolver_ctx, app_ctx, input_object| {
let entity_input_builder = EntityInputBuilder {
context: builder_context,
};
let entity_object_builder = EntityObjectBuilder {
context: builder_context,
};
generate_entity_create_one_mutation_field::<subscriber_tasks::Entity>(
builder.context.clone(),
Arc::new(move |resolver_ctx, app_ctx, input_object| {
let active_model: Result<subscriber_tasks::ActiveModel, _> =
prepare_active_model(
&entity_input_builder,
&entity_object_builder,
&input_object,
_resolver_ctx,
);
prepare_active_model(&builder_context.clone(), &input_object, resolver_ctx);
Box::pin(async move {
let task_service = app_ctx.task();
let active_model = active_model?;
let db = app_ctx.db();
let active_model = active_model.before_save(db, true).await?;
let task = active_model.job.unwrap();
let subscriber_id = active_model.subscriber_id.unwrap();

View File

@@ -79,7 +79,7 @@ where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
Box::new(move |context: &ResolverContext| -> GuardAction {
Arc::new(move |context: &ResolverContext| -> GuardAction {
match context.ctx.data::<AuthUserInfo>() {
Ok(_) => GuardAction::Allow,
Err(err) => GuardAction::Block(Some(err.message)),
@@ -106,7 +106,7 @@ where
let entity_update_mutation_data_field_name =
Arc::new(get_entity_update_mutation_data_field_name(context).to_string());
Box::new(move |context: &ResolverContext| -> GuardAction {
Arc::new(move |context: &ResolverContext| -> GuardAction {
match context.ctx.data::<AuthUserInfo>() {
Ok(user_info) => {
let subscriber_id = user_info.subscriber_auth.subscriber_id;
@@ -253,7 +253,7 @@ where
Arc::new(get_entity_create_one_mutation_field_name::<T>(context));
let entity_create_batch_mutation_field_name =
Arc::new(get_entity_create_batch_mutation_field_name::<T>(context));
Box::new(
Arc::new(
move |context: &ResolverContext| -> SeaResult<Option<SeaValue>> {
let field_name = context.field().name();
if field_name == entity_create_one_mutation_field_name.as_str()
@@ -318,13 +318,11 @@ pub fn register_subscribers_to_schema_context(context: &mut BuilderContext) {
pub fn register_subscribers_to_schema_builder(mut builder: SeaographyBuilder) -> SeaographyBuilder {
{
let filter_types_map_helper = FilterTypesMapHelper {
context: builder.context,
};
builder.schema = builder
.schema
.register(filter_types_map_helper.generate_filter_input(&SUBSCRIBER_ID_FILTER_INFO));
.register(FilterTypesMapHelper::generate_filter_input(
&SUBSCRIBER_ID_FILTER_INFO,
));
}
builder = register_entity_default_readonly!(builder, subscribers);

View File

@@ -1,23 +1,11 @@
use std::sync::Arc;
use async_graphql::dynamic::{FieldValue, TypeRef};
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
use seaography::{Builder as SeaographyBuilder, BuilderContext};
use crate::{
errors::RecorderError,
graphql::{
domains::subscribers::restrict_subscriber_for_entity,
infra::{
custom::{generate_entity_filtered_mutation_field, register_entity_default_writable},
name::{get_entity_basic_type_name, get_entity_custom_mutation_field_name},
},
},
models::{subscriber_tasks, subscriptions},
task::{
SyncOneSubscriptionFeedsFullTask, SyncOneSubscriptionFeedsIncrementalTask,
SyncOneSubscriptionSourcesTask,
domains::subscribers::restrict_subscriber_for_entity, infra,
infra::custom::register_entity_default_writable,
},
models::subscriptions,
};
pub fn register_subscriptions_to_schema_context(context: &mut BuilderContext) {
@@ -32,162 +20,5 @@ pub fn register_subscriptions_to_schema_builder(
) -> SeaographyBuilder {
builder.register_enumeration::<subscriptions::SubscriptionCategory>();
builder = register_entity_default_writable!(builder, subscriptions, false);
let context = builder.context;
{
let sync_one_feeds_incremental_mutation_name = get_entity_custom_mutation_field_name::<
subscriptions::Entity,
>(context, "SyncOneFeedsIncremental");
let sync_one_feeds_incremental_mutation =
generate_entity_filtered_mutation_field::<subscriptions::Entity, _, _>(
builder.context,
sync_one_feeds_incremental_mutation_name,
TypeRef::named_nn(get_entity_basic_type_name::<subscriber_tasks::Entity>(
context,
)),
Arc::new(|_resolver_ctx, app_ctx, filters| {
Box::pin(async move {
let db = app_ctx.db();
let subscription_model = subscriptions::Entity::find()
.filter(filters)
.one(db)
.await?
.ok_or_else(|| {
RecorderError::from_entity_not_found::<subscriptions::Entity>()
})?;
let task_service = app_ctx.task();
let task_id = task_service
.add_subscriber_task(
SyncOneSubscriptionFeedsIncrementalTask::builder()
.subscriber_id(subscription_model.subscriber_id)
.subscription_id(subscription_model.id)
.build()
.into(),
)
.await?;
let task_model = subscriber_tasks::Entity::find()
.filter(subscriber_tasks::Column::Id.eq(task_id.to_string()))
.one(db)
.await?
.ok_or_else(|| {
RecorderError::from_entity_not_found::<subscriber_tasks::Entity>()
})?;
Ok(Some(FieldValue::owned_any(task_model)))
})
}),
);
builder.mutations.push(sync_one_feeds_incremental_mutation);
}
{
let sync_one_feeds_full_mutation_name = get_entity_custom_mutation_field_name::<
subscriptions::Entity,
>(builder.context, "SyncOneFeedsFull");
let sync_one_feeds_full_mutation =
generate_entity_filtered_mutation_field::<subscriptions::Entity, _, _>(
builder.context,
sync_one_feeds_full_mutation_name,
TypeRef::named_nn(get_entity_basic_type_name::<subscriber_tasks::Entity>(
context,
)),
Arc::new(|_resolver_ctx, app_ctx, filters| {
Box::pin(async move {
let db = app_ctx.db();
let subscription_model = subscriptions::Entity::find()
.filter(filters)
.one(db)
.await?
.ok_or_else(|| {
RecorderError::from_entity_not_found::<subscriptions::Entity>()
})?;
let task_service = app_ctx.task();
let task_id = task_service
.add_subscriber_task(
SyncOneSubscriptionFeedsFullTask::builder()
.subscriber_id(subscription_model.subscriber_id)
.subscription_id(subscription_model.id)
.build()
.into(),
)
.await?;
let task_model = subscriber_tasks::Entity::find()
.filter(subscriber_tasks::Column::Id.eq(task_id.to_string()))
.one(db)
.await?
.ok_or_else(|| {
RecorderError::from_entity_not_found::<subscriber_tasks::Entity>()
})?;
Ok(Some(FieldValue::owned_any(task_model)))
})
}),
);
builder.mutations.push(sync_one_feeds_full_mutation);
}
{
let sync_one_sources_mutation_name = get_entity_custom_mutation_field_name::<
subscriptions::Entity,
>(context, "SyncOneSources");
let sync_one_sources_mutation =
generate_entity_filtered_mutation_field::<subscriptions::Entity, _, _>(
builder.context,
sync_one_sources_mutation_name,
TypeRef::named_nn(get_entity_basic_type_name::<subscriber_tasks::Entity>(
context,
)),
Arc::new(|_resolver_ctx, app_ctx, filters| {
Box::pin(async move {
let db = app_ctx.db();
let subscription_model = subscriptions::Entity::find()
.filter(filters)
.one(db)
.await?
.ok_or_else(|| {
RecorderError::from_entity_not_found::<subscriptions::Entity>()
})?;
let task_service = app_ctx.task();
let task_id = task_service
.add_subscriber_task(
SyncOneSubscriptionSourcesTask::builder()
.subscriber_id(subscription_model.subscriber_id)
.subscription_id(subscription_model.id)
.build()
.into(),
)
.await?;
let task_model = subscriber_tasks::Entity::find()
.filter(subscriber_tasks::Column::Id.eq(task_id.to_string()))
.one(db)
.await?
.ok_or_else(|| {
RecorderError::from_entity_not_found::<subscriber_tasks::Entity>()
})?;
Ok(Some(FieldValue::owned_any(task_model)))
})
}),
);
builder.mutations.push(sync_one_sources_mutation);
}
builder
}

View File

@@ -16,7 +16,7 @@ pub fn register_crypto_column_input_conversion_to_schema_context<T>(
{
context.types.input_conversions.insert(
get_entity_and_column_name::<T>(context, column),
Box::new(
Arc::new(
move |_resolve_context: &ResolverContext<'_>,
value: &ValueAccessor|
-> SeaResult<sea_orm::Value> {
@@ -38,7 +38,7 @@ pub fn register_crypto_column_output_conversion_to_schema_context<T>(
{
context.types.output_conversions.insert(
get_entity_and_column_name::<T>(context, column),
Box::new(
Arc::new(
move |value: &sea_orm::Value| -> SeaResult<async_graphql::Value> {
if let SeaValue::String(s) = value {
if let Some(s) = s {

View File

@@ -4,27 +4,20 @@ use async_graphql::dynamic::{
Field, FieldFuture, FieldValue, InputObject, InputValue, Object, ObjectAccessor,
ResolverContext, TypeRef,
};
use sea_orm::{
ActiveModelTrait, Condition, EntityTrait, IntoActiveModel, QueryFilter, TransactionTrait,
};
use sea_orm::{ActiveModelTrait, Condition, EntityTrait, IntoActiveModel};
use seaography::{
Builder as SeaographyBuilder, BuilderContext, GuardAction, RelationBuilder,
get_filter_conditions, prepare_active_model,
Builder as SeaographyBuilder, BuilderContext, EntityCreateBatchMutationBuilder,
EntityCreateOneMutationBuilder, EntityDeleteMutationBuilder, EntityInputBuilder,
EntityObjectBuilder, EntityUpdateMutationBuilder, GuardAction, RelationBuilder,
get_filter_conditions,
};
use crate::{
app::AppContextTrait,
errors::RecorderResult,
graphql::infra::name::{
get_entity_and_column_name_from_column_str, get_entity_basic_type_name,
get_entity_create_batch_mutation_data_field_name,
get_entity_create_batch_mutation_field_name,
get_entity_create_one_mutation_data_field_name, get_entity_create_one_mutation_field_name,
get_entity_delete_mutation_field_name, get_entity_delete_mutation_filter_field_name,
get_entity_filter_input_type_name, get_entity_insert_data_input_type_name, get_entity_name,
get_entity_renormalized_filter_field_name, get_entity_update_data_input_type_name,
get_entity_update_mutation_data_field_name, get_entity_update_mutation_field_name,
get_entity_update_mutation_filter_field_name,
get_entity_filter_input_type_name, get_entity_name,
get_entity_renormalized_filter_field_name,
},
};
@@ -80,50 +73,47 @@ pub type DeleteMutationFn = Arc<
+ Sync,
>;
pub fn generate_entity_default_insert_input_object<T>(
builder_context: &'static BuilderContext,
pub fn generate_entity_default_insert_input_object<T>(context: &BuilderContext) -> InputObject
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
EntityInputBuilder::insert_input_object::<T>(context)
}
pub fn generate_entity_default_update_input_object<T>(context: &BuilderContext) -> InputObject
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
EntityInputBuilder::update_input_object::<T>(context)
}
pub fn generate_entity_default_basic_entity_object<T>(context: Arc<BuilderContext>) -> Object
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
EntityObjectBuilder::basic_to_object::<T>(context)
}
pub fn generate_entity_input_object<T>(
context: &'static BuilderContext,
is_insert: bool,
) -> InputObject
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
let entity_input_builder = seaography::EntityInputBuilder {
context: builder_context,
};
entity_input_builder.insert_input_object::<T>()
}
pub fn generate_entity_default_update_input_object<T>(
builder_context: &'static BuilderContext,
) -> InputObject
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
let entity_input_builder = seaography::EntityInputBuilder {
context: builder_context,
};
entity_input_builder.update_input_object::<T>()
}
pub fn generate_entity_default_basic_entity_object<T>(
builder_context: &'static BuilderContext,
) -> Object
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
let entity_object_builder = seaography::EntityObjectBuilder {
context: builder_context,
};
entity_object_builder.basic_to_object::<T>()
if is_insert {
EntityInputBuilder::insert_input_object::<T>(context)
} else {
EntityInputBuilder::update_input_object::<T>(context)
}
}
pub fn generate_entity_filtered_mutation_field<E, N, R>(
builder_context: &'static BuilderContext,
builder_context: Arc<BuilderContext>,
field_name: N,
type_ref: R,
mutation_fn: FilterMutationFn,
@@ -134,19 +124,28 @@ where
N: Into<String>,
R: Into<TypeRef>,
{
let object_name: String = get_entity_name::<E>(builder_context);
let object_name: String = get_entity_name::<E>(&builder_context);
let guard = builder_context.guards.entity_guards.get(&object_name);
let guard = builder_context
.guards
.entity_guards
.get(&object_name)
.cloned();
let filter_input_value = InputValue::new(
get_entity_renormalized_filter_field_name(),
TypeRef::named(get_entity_filter_input_type_name::<E>(&builder_context)),
);
Field::new(field_name, type_ref, move |ctx| {
let mutation_fn = mutation_fn.clone();
let builder_context = builder_context.clone();
let guard_flag = if let Some(guard) = guard.as_ref() {
(*guard)(&ctx)
} else {
GuardAction::Allow
};
FieldFuture::new(async move {
let guard_flag = if let Some(guard) = guard {
(*guard)(&ctx)
} else {
GuardAction::Allow
};
if let GuardAction::Block(reason) = guard_flag {
return Err::<Option<_>, async_graphql::Error>(async_graphql::Error::new(
reason.unwrap_or("Entity guard triggered.".into()),
@@ -157,7 +156,7 @@ where
let filters = ctx.args.get(get_entity_renormalized_filter_field_name());
let filters = get_filter_conditions::<E>(&ctx, builder_context, filters);
let filters = get_filter_conditions::<E>(&ctx, &builder_context, filters);
let result = mutation_fn(&ctx, app_ctx.clone(), filters)
.await
@@ -166,146 +165,30 @@ where
Ok(result)
})
})
.argument(InputValue::new(
get_entity_renormalized_filter_field_name(),
TypeRef::named(get_entity_filter_input_type_name::<E>(builder_context)),
))
.argument(filter_input_value)
}
pub fn generate_entity_create_one_mutation_field<E, ID>(
builder_context: &'static BuilderContext,
input_data_type_ref: Option<ID>,
pub fn generate_entity_create_one_mutation_field<E>(
builder_context: Arc<BuilderContext>,
mutation_fn: CreateOneMutationFn<E::Model>,
) -> Field
where
E: EntityTrait,
<E as EntityTrait>::Model: Sync,
ID: Into<TypeRef>,
{
let guard = builder_context
.guards
.entity_guards
.get(&get_entity_name::<E>(builder_context));
let field_guards = &builder_context.guards.field_guards;
Field::new(
get_entity_create_one_mutation_field_name::<E>(builder_context),
TypeRef::named_nn(get_entity_basic_type_name::<E>(builder_context)),
move |ctx| {
let mutation_fn = mutation_fn.clone();
FieldFuture::new(async move {
let guard_flag = if let Some(guard) = guard {
(*guard)(&ctx)
} else {
GuardAction::Allow
};
if let GuardAction::Block(reason) = guard_flag {
return Err::<Option<_>, async_graphql::Error>(async_graphql::Error::new(
reason.unwrap_or("Entity guard triggered.".into()),
));
}
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
let value_accessor = ctx
.args
.get(get_entity_create_one_mutation_data_field_name(
builder_context,
))
.unwrap();
let input_object = value_accessor.object()?;
for (column, _) in input_object.iter() {
let field_guard = field_guards.get(
&get_entity_and_column_name_from_column_str::<E>(builder_context, column),
);
let field_guard_flag = if let Some(field_guard) = field_guard {
(*field_guard)(&ctx)
} else {
GuardAction::Allow
};
if let GuardAction::Block(reason) = field_guard_flag {
return match reason {
Some(reason) => Err::<Option<_>, async_graphql::Error>(
async_graphql::Error::new(reason),
),
None => Err::<Option<_>, async_graphql::Error>(
async_graphql::Error::new("Field guard triggered."),
),
};
}
}
let result = mutation_fn(&ctx, app_ctx.clone(), input_object)
.await
.map_err(async_graphql::Error::new_with_source)?;
Ok(Some(FieldValue::owned_any(result)))
})
},
)
.argument(InputValue::new(
get_entity_create_one_mutation_data_field_name(builder_context),
input_data_type_ref.map(|t| t.into()).unwrap_or_else(|| {
TypeRef::named_nn(get_entity_insert_data_input_type_name::<E>(builder_context))
EntityCreateOneMutationBuilder::to_field_with_mutation_fn::<E>(
builder_context.clone(),
Arc::new(move |resolver_ctx, input_object| {
let result = resolver_ctx
.data::<Arc<dyn AppContextTrait>>()
.map(|app_ctx| mutation_fn(&resolver_ctx, app_ctx.clone(), input_object));
Box::pin(async move { result?.await.map_err(async_graphql::Error::new_with_source) })
}),
))
}
pub fn generate_entity_default_create_one_mutation_fn<T, A>(
builder_context: &'static BuilderContext,
active_model_hooks: bool,
) -> CreateOneMutationFn<T::Model>
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync + IntoActiveModel<A>,
A: ActiveModelTrait<Entity = T> + sea_orm::ActiveModelBehavior + std::marker::Send + 'static,
{
Arc::new(move |resolve_context, app_ctx, input_object| {
let entity_input_builder = seaography::EntityInputBuilder {
context: builder_context,
};
let entity_object_builder = seaography::EntityObjectBuilder {
context: builder_context,
};
let active_model = prepare_active_model::<T, A>(
&entity_input_builder,
&entity_object_builder,
&input_object,
resolve_context,
);
Box::pin(async move {
if active_model_hooks {
let transaction = app_ctx.db().begin().await?;
let active_model = active_model?;
let active_model = active_model.before_save(&transaction, true).await?;
let result: T::Model = active_model.insert(&transaction).await?;
let result = A::after_save(result, &transaction, true).await?;
transaction.commit().await?;
Ok(result)
} else {
let db = app_ctx.db();
let active_model = active_model?;
let result: T::Model = active_model.insert(db).await?;
Ok(result)
}
})
})
)
}
pub fn generate_entity_default_create_one_mutation_field<E, A>(
builder_context: &'static BuilderContext,
builder_context: Arc<BuilderContext>,
active_model_hooks: bool,
) -> Field
where
@@ -314,174 +197,30 @@ where
<E as EntityTrait>::Model: IntoActiveModel<A>,
A: ActiveModelTrait<Entity = E> + sea_orm::ActiveModelBehavior + std::marker::Send + 'static,
{
generate_entity_create_one_mutation_field::<E, TypeRef>(
builder_context,
None,
generate_entity_default_create_one_mutation_fn::<E, A>(builder_context, active_model_hooks),
)
EntityCreateOneMutationBuilder::to_field::<E, A>(builder_context, active_model_hooks)
}
pub fn generate_entity_create_batch_mutation_field<E, ID>(
builder_context: &'static BuilderContext,
input_data_type_ref: Option<ID>,
builder_context: Arc<BuilderContext>,
mutation_fn: CreateBatchMutationFn<E::Model>,
) -> Field
where
E: EntityTrait,
<E as EntityTrait>::Model: Sync,
ID: Into<TypeRef>,
{
let object_name: String = get_entity_name::<E>(builder_context);
let guard = builder_context.guards.entity_guards.get(&object_name);
let field_guards = &builder_context.guards.field_guards;
Field::new(
get_entity_create_batch_mutation_field_name::<E>(builder_context),
TypeRef::named_nn_list_nn(get_entity_basic_type_name::<E>(builder_context)),
move |ctx| {
let mutation_fn = mutation_fn.clone();
FieldFuture::new(async move {
let guard_flag = if let Some(guard) = guard {
(*guard)(&ctx)
} else {
GuardAction::Allow
};
if let GuardAction::Block(reason) = guard_flag {
return match reason {
Some(reason) => Err::<Option<_>, async_graphql::Error>(
async_graphql::Error::new(reason),
),
None => Err::<Option<_>, async_graphql::Error>(async_graphql::Error::new(
"Entity guard triggered.",
)),
};
}
let mut input_objects: Vec<ObjectAccessor<'_>> = vec![];
let list = ctx
.args
.get(get_entity_create_batch_mutation_data_field_name(
builder_context,
))
.unwrap()
.list()?;
for input in list.iter() {
let input_object = input.object()?;
for (column, _) in input_object.iter() {
let field_guard =
field_guards.get(&get_entity_and_column_name_from_column_str::<E>(
builder_context,
column,
));
let field_guard_flag = if let Some(field_guard) = field_guard {
(*field_guard)(&ctx)
} else {
GuardAction::Allow
};
if let GuardAction::Block(reason) = field_guard_flag {
return match reason {
Some(reason) => Err::<Option<_>, async_graphql::Error>(
async_graphql::Error::new(reason),
),
None => Err::<Option<_>, async_graphql::Error>(
async_graphql::Error::new("Field guard triggered."),
),
};
}
}
input_objects.push(input_object);
}
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
let results = mutation_fn(&ctx, app_ctx.clone(), input_objects)
.await
.map_err(async_graphql::Error::new_with_source)?;
Ok(Some(FieldValue::list(
results.into_iter().map(FieldValue::owned_any),
)))
})
},
)
.argument(InputValue::new(
get_entity_create_batch_mutation_data_field_name(builder_context),
input_data_type_ref.map(|t| t.into()).unwrap_or_else(|| {
TypeRef::named_nn_list_nn(get_entity_insert_data_input_type_name::<E>(builder_context))
EntityCreateBatchMutationBuilder::to_field_with_mutation_fn::<E>(
builder_context,
Arc::new(move |resolver_ctx, input_objects| {
let result = resolver_ctx
.data::<Arc<dyn AppContextTrait>>()
.map(|app_ctx| mutation_fn(&resolver_ctx, app_ctx.clone(), input_objects));
Box::pin(async move { result?.await.map_err(async_graphql::Error::new_with_source) })
}),
))
}
pub fn generate_entity_default_create_batch_mutation_fn<E, A>(
builder_context: &'static BuilderContext,
active_model_hooks: bool,
) -> CreateBatchMutationFn<E::Model>
where
E: EntityTrait,
<E as EntityTrait>::Model: Sync,
<E as EntityTrait>::Model: IntoActiveModel<A>,
A: ActiveModelTrait<Entity = E> + sea_orm::ActiveModelBehavior + std::marker::Send + 'static,
{
Arc::new(move |resolve_context, app_ctx, input_objects| {
let entity_input_builder = seaography::EntityInputBuilder {
context: builder_context,
};
let entity_object_builder = seaography::EntityObjectBuilder {
context: builder_context,
};
let active_models = input_objects
.into_iter()
.map(|input_object| {
prepare_active_model::<E, A>(
&entity_input_builder,
&entity_object_builder,
&input_object,
resolve_context,
)
})
.collect::<Result<Vec<_>, _>>();
Box::pin(async move {
if active_model_hooks {
let transaction = app_ctx.db().begin().await?;
let mut before_save_models = vec![];
for active_model in active_models? {
let before_save_model = active_model.before_save(&transaction, false).await?;
before_save_models.push(before_save_model);
}
let models: Vec<E::Model> = E::insert_many(before_save_models)
.exec_with_returning_many(&transaction)
.await?;
let mut result = vec![];
for model in models {
let after_save_model = A::after_save(model, &transaction, false).await?;
result.push(after_save_model);
}
transaction.commit().await?;
Ok(result)
} else {
let db = app_ctx.db();
let active_models = active_models?;
let results: Vec<E::Model> = E::insert_many(active_models)
.exec_with_returning_many(db)
.await?;
Ok(results)
}
})
})
)
}
pub fn generate_entity_default_create_batch_mutation_field<E, A>(
builder_context: &'static BuilderContext,
builder_context: Arc<BuilderContext>,
active_model_hooks: bool,
) -> Field
where
@@ -490,178 +229,37 @@ where
<E as EntityTrait>::Model: IntoActiveModel<A>,
A: ActiveModelTrait<Entity = E> + sea_orm::ActiveModelBehavior + std::marker::Send + 'static,
{
generate_entity_create_batch_mutation_field::<E, TypeRef>(
builder_context,
None,
generate_entity_default_create_batch_mutation_fn::<E, A>(
builder_context,
active_model_hooks,
),
)
EntityCreateBatchMutationBuilder::to_field::<E, A>(builder_context, active_model_hooks)
}
pub fn generate_entity_update_mutation_field<E, I>(
builder_context: &'static BuilderContext,
input_data_type_ref: Option<I>,
pub fn generate_entity_update_mutation_field<E>(
builder_context: Arc<BuilderContext>,
mutation_fn: UpdateMutationFn<E::Model>,
) -> Field
where
E: EntityTrait,
<E as EntityTrait>::Model: Sync,
I: Into<TypeRef>,
{
let guard = builder_context
.guards
.entity_guards
.get(&get_entity_name::<E>(builder_context));
let field_guards = &builder_context.guards.field_guards;
Field::new(
get_entity_update_mutation_field_name::<E>(builder_context),
TypeRef::named_nn_list_nn(get_entity_basic_type_name::<E>(builder_context)),
move |ctx| {
let mutation_fn = mutation_fn.clone();
FieldFuture::new(async move {
let guard_flag = if let Some(guard) = guard {
(*guard)(&ctx)
} else {
GuardAction::Allow
};
if let GuardAction::Block(reason) = guard_flag {
return match reason {
Some(reason) => Err::<Option<_>, async_graphql::Error>(
async_graphql::Error::new(reason),
),
None => Err::<Option<_>, async_graphql::Error>(async_graphql::Error::new(
"Entity guard triggered.",
)),
};
}
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
let filters = ctx.args.get(get_entity_update_mutation_filter_field_name(
builder_context,
));
let filter_condition = get_filter_conditions::<E>(&ctx, builder_context, filters);
let value_accessor = ctx
.args
.get(get_entity_update_mutation_data_field_name(builder_context))
.unwrap();
let input_object = value_accessor.object()?;
for (column, _) in input_object.iter() {
let field_guard = field_guards.get(
&get_entity_and_column_name_from_column_str::<E>(builder_context, column),
);
let field_guard_flag = if let Some(field_guard) = field_guard {
(*field_guard)(&ctx)
} else {
GuardAction::Allow
};
if let GuardAction::Block(reason) = field_guard_flag {
return match reason {
Some(reason) => Err::<Option<_>, async_graphql::Error>(
async_graphql::Error::new(reason),
),
None => Err::<Option<_>, async_graphql::Error>(
async_graphql::Error::new("Field guard triggered."),
),
};
}
}
let result = mutation_fn(&ctx, app_ctx.clone(), filter_condition, input_object)
.await
.map_err(async_graphql::Error::new_with_source)?;
Ok(Some(FieldValue::list(
result.into_iter().map(FieldValue::owned_any),
)))
})
},
)
.argument(InputValue::new(
get_entity_update_mutation_data_field_name(builder_context),
input_data_type_ref.map(|t| t.into()).unwrap_or_else(|| {
TypeRef::named_nn(get_entity_update_data_input_type_name::<E>(builder_context))
EntityUpdateMutationBuilder::to_field_with_mutation_fn::<E>(
builder_context.clone(),
Arc::new(move |resolver_ctx, filters, input_object| {
let result = resolver_ctx
.data::<Arc<dyn AppContextTrait>>()
.map(|app_ctx| {
mutation_fn(
&resolver_ctx,
app_ctx.clone(),
get_filter_conditions::<E>(&resolver_ctx, &builder_context, filters),
input_object,
)
});
Box::pin(async move { result?.await.map_err(async_graphql::Error::new_with_source) })
}),
))
.argument(InputValue::new(
get_entity_update_mutation_filter_field_name(builder_context),
TypeRef::named(get_entity_filter_input_type_name::<E>(builder_context)),
))
}
pub fn generate_entity_default_update_mutation_fn<T, A>(
builder_context: &'static BuilderContext,
active_model_hooks: bool,
) -> UpdateMutationFn<T::Model>
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync + IntoActiveModel<A>,
A: ActiveModelTrait<Entity = T> + sea_orm::ActiveModelBehavior + std::marker::Send + 'static,
{
Arc::new(
move |resolve_context, app_ctx, filter_condition, input_object| {
let entity_input_builder = seaography::EntityInputBuilder {
context: builder_context,
};
let entity_object_builder = seaography::EntityObjectBuilder {
context: builder_context,
};
let active_model = prepare_active_model::<T, A>(
&entity_input_builder,
&entity_object_builder,
&input_object,
resolve_context,
);
Box::pin(async move {
if active_model_hooks {
let transaction = app_ctx.db().begin().await?;
let active_model = active_model?;
let active_model = active_model.before_save(&transaction, false).await?;
let models = T::update_many()
.set(active_model)
.filter(filter_condition.clone())
.exec_with_returning(&transaction)
.await?;
let mut result = vec![];
for model in models {
result.push(A::after_save(model, &transaction, false).await?);
}
transaction.commit().await?;
Ok(result)
} else {
let db = app_ctx.db();
let active_model = active_model?;
let result = T::update_many()
.set(active_model)
.filter(filter_condition.clone())
.exec_with_returning(db)
.await?;
Ok(result)
}
})
},
)
}
pub fn generate_entity_default_update_mutation_field<E, A>(
builder_context: &'static BuilderContext,
builder_context: Arc<BuilderContext>,
active_model_hooks: bool,
) -> Field
where
@@ -670,114 +268,36 @@ where
<E as EntityTrait>::Model: IntoActiveModel<A>,
A: ActiveModelTrait<Entity = E> + sea_orm::ActiveModelBehavior + std::marker::Send + 'static,
{
generate_entity_update_mutation_field::<E, TypeRef>(
builder_context,
None,
generate_entity_default_update_mutation_fn::<E, A>(builder_context, active_model_hooks),
)
EntityUpdateMutationBuilder::to_field::<E, A>(builder_context, active_model_hooks)
}
pub fn generate_entity_delete_mutation_field<E>(
builder_context: &'static BuilderContext,
builder_context: Arc<BuilderContext>,
mutation_fn: DeleteMutationFn,
) -> Field
where
E: EntityTrait,
<E as EntityTrait>::Model: Sync,
{
let object_name: String = get_entity_name::<E>(builder_context);
let guard = builder_context.guards.entity_guards.get(&object_name);
Field::new(
get_entity_delete_mutation_field_name::<E>(builder_context),
TypeRef::named_nn(TypeRef::INT),
move |ctx| {
let mutation_fn = mutation_fn.clone();
FieldFuture::new(async move {
let guard_flag = if let Some(guard) = guard {
(*guard)(&ctx)
} else {
GuardAction::Allow
};
if let GuardAction::Block(reason) = guard_flag {
return Err::<Option<_>, async_graphql::Error>(async_graphql::Error::new(
reason.unwrap_or("Entity guard triggered.".into()),
));
}
let filters = ctx.args.get(get_entity_delete_mutation_filter_field_name(
builder_context,
));
let filter_condition = get_filter_conditions::<E>(&ctx, builder_context, filters);
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
let res = mutation_fn(&ctx, app_ctx.clone(), filter_condition)
.await
.map_err(async_graphql::Error::new_with_source)?;
Ok(Some(async_graphql::Value::from(res)))
})
},
EntityDeleteMutationBuilder::to_field_with_mutation_fn::<E>(
builder_context.clone(),
Arc::new(move |resolver_ctx, filters| {
let result = resolver_ctx
.data::<Arc<dyn AppContextTrait>>()
.map(|app_ctx| {
mutation_fn(
&resolver_ctx,
app_ctx.clone(),
get_filter_conditions::<E>(&resolver_ctx, &builder_context, filters),
)
});
Box::pin(async move { result?.await.map_err(async_graphql::Error::new_with_source) })
}),
)
.argument(InputValue::new(
get_entity_delete_mutation_filter_field_name(builder_context),
TypeRef::named(get_entity_filter_input_type_name::<E>(builder_context)),
))
}
pub fn generate_entity_default_delete_mutation_fn<E, A>(
_builder_context: &'static BuilderContext,
active_model_hooks: bool,
) -> DeleteMutationFn
where
E: EntityTrait,
<E as EntityTrait>::Model: Sync,
<E as EntityTrait>::Model: IntoActiveModel<A>,
A: ActiveModelTrait<Entity = E> + sea_orm::ActiveModelBehavior + std::marker::Send + 'static,
{
Arc::new(move |_resolve_context, app_ctx, filter_condition| {
Box::pin(async move {
if active_model_hooks {
let transaction = app_ctx.db().begin().await?;
let models: Vec<E::Model> = E::find()
.filter(filter_condition.clone())
.all(&transaction)
.await?;
let mut active_models: Vec<A> = vec![];
for model in models {
let active_model = model.into_active_model();
active_models.push(active_model.before_delete(&transaction).await?);
}
let result = E::delete_many()
.filter(filter_condition)
.exec(&transaction)
.await?;
for active_model in active_models {
active_model.after_delete(&transaction).await?;
}
transaction.commit().await?;
Ok(result.rows_affected)
} else {
let db = app_ctx.db();
let result = E::delete_many().filter(filter_condition).exec(db).await?;
Ok(result.rows_affected)
}
})
})
}
pub fn generate_entity_default_delete_mutation_field<E, A>(
builder_context: &'static BuilderContext,
builder_context: Arc<BuilderContext>,
active_model_hooks: bool,
) -> Field
where
@@ -786,10 +306,7 @@ where
<E as EntityTrait>::Model: IntoActiveModel<A>,
A: ActiveModelTrait<Entity = E> + sea_orm::ActiveModelBehavior + std::marker::Send + 'static,
{
generate_entity_delete_mutation_field::<E>(
builder_context,
generate_entity_default_delete_mutation_fn::<E, A>(builder_context, active_model_hooks),
)
EntityDeleteMutationBuilder::to_field::<E, A>(builder_context, active_model_hooks)
}
pub fn register_entity_default_mutations<E, A>(
@@ -801,28 +318,35 @@ where
<E as EntityTrait>::Model: Sync + IntoActiveModel<A>,
A: ActiveModelTrait<Entity = E> + sea_orm::ActiveModelBehavior + std::marker::Send + 'static,
{
let builder_context = &builder.context;
builder
.outputs
.push(generate_entity_default_basic_entity_object::<E>(
builder.context,
builder_context.clone(),
));
builder.inputs.extend([
generate_entity_default_insert_input_object::<E>(builder.context),
generate_entity_default_update_input_object::<E>(builder.context),
generate_entity_default_insert_input_object::<E>(&builder.context),
generate_entity_default_update_input_object::<E>(&builder.context),
]);
builder.mutations.extend([
generate_entity_default_create_one_mutation_field::<E, A>(
builder.context,
builder_context.clone(),
active_model_hooks,
),
generate_entity_default_create_batch_mutation_field::<E, A>(
builder.context,
builder_context.clone(),
active_model_hooks,
),
generate_entity_default_update_mutation_field::<E, A>(
builder_context.clone(),
active_model_hooks,
),
generate_entity_default_delete_mutation_field::<E, A>(
builder_context.clone(),
active_model_hooks,
),
generate_entity_default_update_mutation_field::<E, A>(builder.context, active_model_hooks),
generate_entity_default_delete_mutation_field::<E, A>(builder.context, active_model_hooks),
]);
builder
@@ -840,7 +364,7 @@ where
{
builder.register_entity::<T>(
<RE as sea_orm::Iterable>::iter()
.map(|rel| RelationBuilder::get_relation(&rel, builder.context))
.map(|rel| RelationBuilder::get_relation(&rel, builder.context.clone()))
.collect(),
);
builder = builder.register_entity_dataloader_one_to_one(entity, tokio::spawn);

View File

@@ -1,3 +1,5 @@
use std::sync::Arc;
use async_graphql::{
Error as GraphqlError,
dynamic::{ResolverContext, Scalar, SchemaError},
@@ -963,7 +965,7 @@ pub fn try_convert_jsonb_input_for_entity<T, S>(
let entity_column_name = get_entity_and_column_name::<T>(context, column);
context.types.input_conversions.insert(
entity_column_name.clone(),
Box::new(move |_resolve_context, accessor| {
Arc::new(move |_resolve_context, accessor| {
let mut json_value = accessor.as_value().clone().into_json().map_err(|err| {
SeaographyError::TypeConversionError(
err.to_string(),
@@ -998,7 +1000,7 @@ pub fn convert_jsonb_output_for_entity<T>(
let entity_column_key = get_entity_and_column_name::<T>(context, column);
context.types.output_conversions.insert(
entity_column_key.clone(),
Box::new(move |value| {
Arc::new(move |value| {
if let sea_orm::Value::Json(Some(json)) = value {
let mut json_value = json.as_ref().clone();
if let Some(case) = case {

View File

@@ -78,7 +78,7 @@ where
context.filter_input.type_name.as_ref()(&entity_name)
}
pub fn get_entity_insert_data_input_type_name<T>(context: &BuilderContext) -> String
pub fn get_entity_insert_input_type_name<T>(context: &BuilderContext) -> String
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
@@ -87,7 +87,7 @@ where
format!("{entity_name}{}", context.entity_input.insert_suffix)
}
pub fn get_entity_update_data_input_type_name<T>(context: &BuilderContext) -> String
pub fn get_entity_update_input_type_name<T>(context: &BuilderContext) -> String
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,

View File

@@ -59,27 +59,24 @@ pub fn build_schema(
) -> Result<Schema, SchemaError> {
let database = app_ctx.db().as_ref().clone();
let context = CONTEXT.get_or_init(|| {
let context = Arc::new({
let mut context = BuilderContext::default();
// basic
renormalize_filter_field_names_to_schema_context(&mut context);
renormalize_data_field_names_to_schema_context(&mut context);
{
// domains
register_feeds_to_schema_context(&mut context);
register_subscribers_to_schema_context(&mut context);
register_subscriptions_to_schema_context(&mut context);
register_subscriber_tasks_to_schema_context(&mut context);
register_credential3rd_to_schema_context(&mut context, app_ctx.clone());
register_downloaders_to_schema_context(&mut context);
register_downloads_to_schema_context(&mut context);
register_episodes_to_schema_context(&mut context);
register_subscription_bangumi_to_schema_context(&mut context);
register_subscription_episode_to_schema_context(&mut context);
register_bangumi_to_schema_context(&mut context);
register_cron_to_schema_context(&mut context);
}
// domains
register_feeds_to_schema_context(&mut context);
register_subscribers_to_schema_context(&mut context);
register_subscriptions_to_schema_context(&mut context);
register_subscriber_tasks_to_schema_context(&mut context);
register_credential3rd_to_schema_context(&mut context, app_ctx.clone());
register_downloaders_to_schema_context(&mut context);
register_downloads_to_schema_context(&mut context);
register_episodes_to_schema_context(&mut context);
register_subscription_bangumi_to_schema_context(&mut context);
register_subscription_episode_to_schema_context(&mut context);
register_bangumi_to_schema_context(&mut context);
register_cron_to_schema_context(&mut context);
context
});

View File

@@ -128,19 +128,13 @@ impl ActiveModelBehavior for ActiveModel {
Model::calculate_next_run(cron_expr).map_err(|e| DbErr::Custom(e.to_string()))?;
self.next_run = Set(Some(next_run));
}
if let ActiveValue::Set(Some(subscriber_id)) = self.subscriber_id {
if let ActiveValue::Set(Some(ref subscriber_task)) = self.subscriber_task {
if subscriber_task.get_subscriber_id() != subscriber_id {
return Err(DbErr::Custom(
"Subscriber task subscriber_id does not match cron subscriber_id"
.to_string(),
));
}
} else {
return Err(DbErr::Custom(
"Cron subscriber_id is set but subscriber_task is not set".to_string(),
));
}
if let ActiveValue::Set(Some(subscriber_id)) = self.subscriber_id
&& let ActiveValue::Set(Some(ref subscriber_task)) = self.subscriber_task
&& subscriber_task.get_subscriber_id() != subscriber_id
{
return Err(DbErr::Custom(
"Cron subscriber_id does not match subscriber_task.subscriber_id".to_string(),
));
}
Ok(self)

View File

@@ -1,6 +1,7 @@
use async_trait::async_trait;
use sea_orm::entity::prelude::*;
use sea_orm::{ActiveValue, entity::prelude::*};
use crate::task::SubscriberTaskTrait;
pub use crate::task::{
SubscriberTask, SubscriberTaskType, SubscriberTaskTypeEnum, SubscriberTaskTypeVariant,
SubscriberTaskTypeVariantIter, subscriber_task_schema,
@@ -84,4 +85,19 @@ pub enum RelatedEntity {
}
#[async_trait]
impl ActiveModelBehavior for ActiveModel {}
impl ActiveModelBehavior for ActiveModel {
async fn before_save<C>(mut self, _db: &C, _insert: bool) -> Result<Self, DbErr>
where
C: ConnectionTrait,
{
if let ActiveValue::Set(subscriber_id) = self.subscriber_id
&& let ActiveValue::Set(ref job) = self.job
&& job.get_subscriber_id() != subscriber_id
{
return Err(DbErr::Custom(
"SubscriberTask subscriber_id does not match job.subscriber_id".to_string(),
));
}
Ok(self)
}
}

View File

@@ -6,7 +6,8 @@ macro_rules! register_subscriber_task_type {
}
) => {
$(#[$type_meta])*
#[derive(typed_builder::TypedBuilder, schemars::JsonSchema)]
#[derive(typed_builder::TypedBuilder, schemars::JsonSchema, ts_rs::TS)]
#[ts(export, rename_all = "camelCase")]
$task_vis struct $task_name {
$($(#[$field_meta])* pub $field_name: $field_type,)*
pub subscriber_id: i32,

View File

@@ -10,6 +10,7 @@ pub use subscription::{
SyncOneSubscriptionFeedsFullTask, SyncOneSubscriptionFeedsIncrementalTask,
SyncOneSubscriptionSourcesTask,
};
use ts_rs::TS;
macro_rules! register_subscriber_task_types {
(
@@ -46,6 +47,7 @@ macro_rules! register_subscriber_task_types {
$(#[$task_enum_meta])*
#[serde(tag = "task_type")]
#[ts(export, rename_all = "camelCase", tag = "taskType")]
pub enum $task_enum_name {
$(
$(#[$task_variant_meta])*
@@ -136,7 +138,7 @@ register_subscriber_task_types!(
}
},
task_enum: {
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, FromJsonQueryResult, JsonSchema)]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, FromJsonQueryResult, JsonSchema, TS)]
pub enum SubscriberTask {
SyncOneSubscriptionFeedsIncremental(SyncOneSubscriptionFeedsIncrementalTask),
SyncOneSubscriptionFeedsFull(SyncOneSubscriptionFeedsFullTask),

View File

@@ -0,0 +1,10 @@
{
"extends": "../../tsconfig.base.json",
"compilerOptions": {
"rootDir": ".",
"composite": true,
"module": "ESNext",
"moduleResolution": "bundler"
},
"include": ["bindings"]
}