fix: fix issues

This commit is contained in:
2025-06-30 02:05:23 +08:00
parent b4090e74c0
commit bacfe99ef2
19 changed files with 403 additions and 224 deletions

View File

@@ -166,6 +166,9 @@ quick-xml = { version = "0.37.5", features = [
] }
croner = "2.2.0"
ts-rs = "11.0.1"
secrecy = { version = "0.10.3", features = ["serde"] }
schemars = "1.0.3"
jsonschema = "0.30.0"
[dev-dependencies]
inquire = { workspace = true }

View File

@@ -313,4 +313,10 @@ impl From<http::method::InvalidMethod> for RecorderError {
}
}
impl From<async_graphql::Error> for RecorderError {
fn from(error: async_graphql::Error) -> Self {
seaography::SeaographyError::AsyncGraphQLError(error).into()
}
}
pub type RecorderResult<T> = Result<T, RecorderError>;

View File

@@ -8,8 +8,8 @@ use crate::{
infra::{
custom::register_entity_default_writable,
json::{
convert_jsonb_output_case_for_entity, restrict_jsonb_filter_input_for_entity,
validate_jsonb_input_for_entity,
convert_jsonb_output_for_entity, restrict_jsonb_filter_input_for_entity,
try_convert_jsonb_input_for_entity,
},
name::get_entity_and_column_name,
},
@@ -50,14 +50,16 @@ pub fn register_cron_to_schema_context(context: &mut BuilderContext) {
restrict_subscriber_for_entity::<cron::Entity>(context, &cron::Column::SubscriberId);
restrict_jsonb_filter_input_for_entity::<cron::Entity>(context, &cron::Column::SubscriberTask);
convert_jsonb_output_case_for_entity::<cron::Entity>(
convert_jsonb_output_for_entity::<cron::Entity>(
context,
&cron::Column::SubscriberTask,
Case::Camel,
Some(Case::Camel),
);
validate_jsonb_input_for_entity::<cron::Entity, Option<subscriber_tasks::SubscriberTask>>(
try_convert_jsonb_input_for_entity::<cron::Entity, Option<subscriber_tasks::SubscriberTask>>(
context,
&cron::Column::SubscriberTask,
subscriber_tasks::subscriber_task_schema(),
Some(Case::Snake),
);
skip_columns_for_entity_input(context);
}

View File

@@ -1,12 +1,15 @@
use std::{ops::Deref, sync::Arc};
use async_graphql::dynamic::{FieldValue, TypeRef, ValueAccessor};
use async_graphql::dynamic::{FieldValue, TypeRef};
use convert_case::Case;
use sea_orm::{
ColumnTrait, ConnectionTrait, EntityTrait, Iterable, QueryFilter, QuerySelect, QueryTrait,
prelude::Expr, sea_query::Query,
};
use seaography::{Builder as SeaographyBuilder, BuilderContext, GuardAction};
use seaography::{
Builder as SeaographyBuilder, BuilderContext, EntityInputBuilder, EntityObjectBuilder,
SeaographyError, prepare_active_model,
};
use crate::{
auth::AuthUserInfo,
@@ -20,59 +23,23 @@ use crate::{
generate_entity_default_insert_input_object, generate_entity_delete_mutation_field,
generate_entity_filtered_mutation_field, register_entity_default_readonly,
},
json::{
convert_jsonb_output_case_for_entity, restrict_jsonb_filter_input_for_entity,
validate_jsonb_input_for_entity,
},
json::{convert_jsonb_output_for_entity, restrict_jsonb_filter_input_for_entity},
name::{
get_column_name, get_entity_and_column_name, get_entity_basic_type_name,
get_entity_create_batch_mutation_data_field_name,
get_entity_create_batch_mutation_field_name,
get_entity_create_one_mutation_data_field_name,
get_entity_create_one_mutation_field_name, get_entity_custom_mutation_field_name,
get_entity_update_mutation_field_name,
get_entity_and_column_name, get_entity_basic_type_name,
get_entity_custom_mutation_field_name,
},
},
},
models::subscriber_tasks,
task::{ApalisJobs, ApalisSchema},
task::{ApalisJobs, ApalisSchema, SubscriberTaskTrait},
utils::json::convert_json_keys,
};
pub fn check_entity_and_task_subscriber_id_matches(
value_accessor: &ValueAccessor<'_>,
subscriber_id: i32,
subscriber_id_column_name: &str,
subscriber_task_column_name: &str,
) -> bool {
value_accessor.object().is_ok_and(|input_object| {
input_object
.get(subscriber_task_column_name)
.and_then(|subscriber_task_value| subscriber_task_value.object().ok())
.and_then(|subscriber_task_object| {
subscriber_task_object
.get("subscriber_id")
.and_then(|job_subscriber_id| job_subscriber_id.i64().ok())
})
.is_some_and(|subscriber_task_subscriber_id| {
subscriber_task_subscriber_id as i32
== input_object
.get(subscriber_id_column_name)
.and_then(|subscriber_id_object| subscriber_id_object.i64().ok())
.map(|subscriber_id| subscriber_id as i32)
.unwrap_or(subscriber_id)
})
})
}
fn skip_columns_for_entity_input(context: &mut BuilderContext) {
for column in subscriber_tasks::Column::iter() {
if matches!(
column,
subscriber_tasks::Column::Job
| subscriber_tasks::Column::Id
| subscriber_tasks::Column::SubscriberId
| subscriber_tasks::Column::Priority
| subscriber_tasks::Column::MaxAttempts
subscriber_tasks::Column::Job | subscriber_tasks::Column::SubscriberId
) {
continue;
}
@@ -82,104 +49,66 @@ fn skip_columns_for_entity_input(context: &mut BuilderContext) {
}
}
pub fn restrict_subscriber_tasks_for_entity<T>(
context: &mut BuilderContext,
column: &T::Column,
case: Option<Case<'static>>,
) where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
let entity_and_column = get_entity_and_column_name::<T>(context, column);
restrict_jsonb_filter_input_for_entity::<T>(context, column);
convert_jsonb_output_for_entity::<T>(context, column, Some(Case::Camel));
let entity_column_name = get_entity_and_column_name::<T>(context, column);
context.types.input_conversions.insert(
entity_column_name.clone(),
Box::new(move |resolve_context, accessor| {
let mut json_value = accessor.as_value().clone().into_json().map_err(|err| {
SeaographyError::TypeConversionError(
err.to_string(),
format!("Json - {entity_column_name}"),
)
})?;
if let Some(case) = case {
json_value = convert_json_keys(json_value, case);
}
let subscriber_id = resolve_context
.data::<AuthUserInfo>()?
.subscriber_auth
.subscriber_id;
if let Some(obj) = json_value.as_object_mut() {
obj.entry("subscriber_id")
.or_insert_with(|| serde_json::Value::from(subscriber_id));
}
subscriber_tasks::subscriber_task_schema()
.validate(&json_value)
.map_err(|err| {
SeaographyError::TypeConversionError(
err.to_string(),
format!("Json - {entity_column_name}"),
)
})?;
Ok(sea_orm::Value::Json(Some(Box::new(json_value))))
}),
);
context.entity_input.update_skips.push(entity_and_column);
}
pub fn register_subscriber_tasks_to_schema_context(context: &mut BuilderContext) {
restrict_subscriber_for_entity::<subscriber_tasks::Entity>(
context,
&subscriber_tasks::Column::SubscriberId,
);
restrict_jsonb_filter_input_for_entity::<subscriber_tasks::Entity>(
context,
&subscriber_tasks::Column::Job,
);
convert_jsonb_output_case_for_entity::<subscriber_tasks::Entity>(
context,
&subscriber_tasks::Column::Job,
Case::Camel,
);
validate_jsonb_input_for_entity::<subscriber_tasks::Entity, subscriber_tasks::SubscriberTask>(
context,
&subscriber_tasks::Column::Job,
);
skip_columns_for_entity_input(context);
context.guards.field_guards.insert(
get_entity_and_column_name::<subscriber_tasks::Entity>(
context,
&subscriber_tasks::Column::Job,
),
{
let create_one_mutation_field_name =
Arc::new(get_entity_create_one_mutation_field_name::<
subscriber_tasks::Entity,
>(context));
let create_one_mutation_data_field_name =
Arc::new(get_entity_create_one_mutation_data_field_name(context).to_string());
let create_batch_mutation_field_name =
Arc::new(get_entity_create_batch_mutation_field_name::<
subscriber_tasks::Entity,
>(context));
let create_batch_mutation_data_field_name =
Arc::new(get_entity_create_batch_mutation_data_field_name(context).to_string());
let update_mutation_field_name = Arc::new(get_entity_update_mutation_field_name::<
subscriber_tasks::Entity,
>(context));
let job_column_name = Arc::new(get_column_name::<subscriber_tasks::Entity>(
context,
&subscriber_tasks::Column::Job,
));
let subscriber_id_column_name = Arc::new(get_column_name::<subscriber_tasks::Entity>(
context,
&subscriber_tasks::Column::SubscriberId,
));
Box::new(move |resolve_context| {
let field_name = resolve_context.field().name();
let subscriber_id = resolve_context
.data_opt::<AuthUserInfo>()
.unwrap()
.subscriber_auth
.subscriber_id;
let matched_subscriber_id = match field_name {
field if field == create_one_mutation_field_name.as_str() => resolve_context
.args
.get(create_one_mutation_data_field_name.as_str())
.is_some_and(|value_accessor| {
check_entity_and_task_subscriber_id_matches(
&value_accessor,
subscriber_id,
subscriber_id_column_name.as_str(),
job_column_name.as_str(),
)
}),
field if field == create_batch_mutation_field_name.as_str() => resolve_context
.args
.get(create_batch_mutation_data_field_name.as_str())
.and_then(|value| value.list().ok())
.is_some_and(|list| {
list.iter().all(|value| {
check_entity_and_task_subscriber_id_matches(
&value,
subscriber_id,
subscriber_id_column_name.as_str(),
job_column_name.as_str(),
)
})
}),
field if field == update_mutation_field_name.as_str() => {
unreachable!("subscriberTask entity do not support update job")
}
_ => true,
};
if matched_subscriber_id {
GuardAction::Allow
} else {
GuardAction::Block(Some(
"subscriber_id mismatch between entity and job".to_string(),
))
}
})
},
);
}
pub fn register_subscriber_tasks_to_schema_builder(
@@ -282,19 +211,35 @@ pub fn register_subscriber_tasks_to_schema_builder(
builder_context,
None,
Arc::new(|_resolver_ctx, app_ctx, input_object| {
let job_column_name = get_column_name::<subscriber_tasks::Entity>(
builder_context,
&subscriber_tasks::Column::Job,
);
let task = input_object
.get(job_column_name.as_str())
.unwrap()
.deserialize::<subscriber_tasks::SubscriberTask>()
.unwrap();
let entity_input_builder = EntityInputBuilder {
context: builder_context,
};
let entity_object_builder = EntityObjectBuilder {
context: builder_context,
};
let active_model: Result<subscriber_tasks::ActiveModel, _> =
prepare_active_model(
&entity_input_builder,
&entity_object_builder,
&input_object,
_resolver_ctx,
);
Box::pin(async move {
let task_service = app_ctx.task();
let active_model = active_model?;
let task = active_model.job.unwrap();
let subscriber_id = active_model.subscriber_id.unwrap();
if task.get_subscriber_id() != subscriber_id {
Err(async_graphql::Error::new(
"subscriber_id does not match with job.subscriber_id",
))?;
}
let task_id = task_service.add_subscriber_task(task).await?.to_string();
let db = app_ctx.db();

View File

@@ -7,7 +7,7 @@ use sea_orm::{ColumnTrait, Condition, EntityTrait, Iterable, Value as SeaValue};
use seaography::{
Builder as SeaographyBuilder, BuilderContext, FilterInfo,
FilterOperation as SeaographqlFilterOperation, FilterType, FilterTypesMapHelper,
FnFilterCondition, FnGuard, FnInputTypeNoneConversion, GuardAction, SeaResult, SeaographyError,
FnFilterCondition, FnGuard, FnInputTypeNoneConversion, GuardAction, SeaResult,
};
use crate::{
@@ -219,11 +219,10 @@ where
if let Some(value) = filter.get("eq") {
let value: i32 = value.i64()?.try_into()?;
if value != subscriber_id {
return Err(SeaographyError::AsyncGraphQLError(
async_graphql::Error::new(
"subscriber_id and auth_info does not match",
),
));
return Err(async_graphql::Error::new(
"subscriber_id and auth_info does not match",
)
.into());
}
}
}

View File

@@ -8,7 +8,7 @@ use sea_orm::{
ActiveModelTrait, Condition, EntityTrait, IntoActiveModel, QueryFilter, TransactionTrait,
};
use seaography::{
Builder as SeaographyBuilder, BuilderContext, GuardAction, RelationBuilder, SeaographyError,
Builder as SeaographyBuilder, BuilderContext, GuardAction, RelationBuilder,
get_filter_conditions, prepare_active_model,
};
@@ -274,8 +274,7 @@ where
&entity_object_builder,
&input_object,
resolve_context,
)
.map_err(SeaographyError::AsyncGraphQLError);
);
Box::pin(async move {
if active_model_hooks {
@@ -442,8 +441,7 @@ where
resolve_context,
)
})
.collect::<Result<Vec<_>, _>>()
.map_err(SeaographyError::AsyncGraphQLError);
.collect::<Result<Vec<_>, _>>();
Box::pin(async move {
if active_model_hooks {
@@ -620,8 +618,7 @@ where
&entity_object_builder,
&input_object,
resolve_context,
)
.map_err(SeaographyError::AsyncGraphQLError);
);
Box::pin(async move {
if active_model_hooks {

View File

@@ -5,6 +5,7 @@ use async_graphql::{
};
use convert_case::Case;
use itertools::Itertools;
use jsonschema::Validator;
use rust_decimal::{Decimal, prelude::FromPrimitive};
use sea_orm::{
Condition, EntityTrait,
@@ -911,18 +912,15 @@ where
Box::new(
move |_resolve_context: &ResolverContext<'_>, condition, filter| {
if let Some(filter) = filter {
let filter_value = to_value(filter.as_index_map()).map_err(|e| {
SeaographyError::AsyncGraphQLError(GraphqlError::new_with_source(e))
})?;
let filter_value =
to_value(filter.as_index_map()).map_err(GraphqlError::new_with_source)?;
let filter_json: JsonValue = filter_value.into_json().map_err(|e| {
SeaographyError::AsyncGraphQLError(GraphqlError::new(format!("{e:?}")))
})?;
let filter_json: JsonValue = filter_value
.into_json()
.map_err(GraphqlError::new_with_source)?;
let cond_where = prepare_jsonb_filter_input(&Expr::col(column), filter_json)
.map_err(|e| {
SeaographyError::AsyncGraphQLError(GraphqlError::new_with_source(e))
})?;
.map_err(GraphqlError::new_with_source)?;
let condition = condition.add(cond_where);
Ok(condition)
@@ -952,8 +950,12 @@ where
);
}
pub fn validate_jsonb_input_for_entity<T, S>(context: &mut BuilderContext, column: &T::Column)
where
pub fn try_convert_jsonb_input_for_entity<T, S>(
context: &mut BuilderContext,
column: &T::Column,
validator: &'static Validator,
case: Option<Case<'static>>,
) where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
S: DeserializeOwned + Serialize,
@@ -962,27 +964,33 @@ where
context.types.input_conversions.insert(
entity_column_name.clone(),
Box::new(move |_resolve_context, accessor| {
let deserialized = accessor.deserialize::<S>().map_err(|err| {
SeaographyError::TypeConversionError(
err.message,
format!("Json - {entity_column_name}"),
)
})?;
let json_value = serde_json::to_value(deserialized).map_err(|err| {
let mut json_value = accessor.as_value().clone().into_json().map_err(|err| {
SeaographyError::TypeConversionError(
err.to_string(),
format!("Json - {entity_column_name}"),
)
})?;
if let Some(case) = case {
json_value = convert_json_keys(json_value, case);
}
validator.validate(&json_value).map_err(|err| {
SeaographyError::TypeConversionError(
err.to_string(),
format!("Json - {entity_column_name}"),
)
})?;
Ok(sea_orm::Value::Json(Some(Box::new(json_value))))
}),
);
}
pub fn convert_jsonb_output_case_for_entity<T>(
pub fn convert_jsonb_output_for_entity<T>(
context: &mut BuilderContext,
column: &T::Column,
case: Case<'static>,
case: Option<Case<'static>>,
) where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
@@ -992,14 +1000,16 @@ pub fn convert_jsonb_output_case_for_entity<T>(
entity_column_key.clone(),
Box::new(move |value| {
if let sea_orm::Value::Json(Some(json)) = value {
let result =
async_graphql::Value::from_json(convert_json_keys(json.as_ref().clone(), case))
.map_err(|err| {
SeaographyError::TypeConversionError(
err.to_string(),
format!("Json - {entity_column_key}"),
)
})?;
let mut json_value = json.as_ref().clone();
if let Some(case) = case {
json_value = convert_json_keys(json_value, case);
}
let result = async_graphql::Value::from_json(json_value).map_err(|err| {
SeaographyError::TypeConversionError(
err.to_string(),
format!("Json - {entity_column_key}"),
)
})?;
Ok(result)
} else {
Err(SeaographyError::TypeConversionError(

View File

@@ -3,7 +3,7 @@ use sea_orm::entity::prelude::*;
pub use crate::task::{
SubscriberTask, SubscriberTaskType, SubscriberTaskTypeEnum, SubscriberTaskTypeVariant,
SubscriberTaskTypeVariantIter,
SubscriberTaskTypeVariantIter, subscriber_task_schema,
};
#[derive(Clone, Debug, PartialEq, Eq, DeriveActiveEnum, EnumIter, DeriveDisplay)]

View File

@@ -16,5 +16,6 @@ pub use registry::{
SubscriberTaskTypeVariant, SubscriberTaskTypeVariantIter, SyncOneSubscriptionFeedsFullTask,
SyncOneSubscriptionFeedsIncrementalTask, SyncOneSubscriptionSourcesTask, SystemTask,
SystemTaskType, SystemTaskTypeEnum, SystemTaskTypeVariant, SystemTaskTypeVariantIter,
subscriber_task_schema,
};
pub use service::TaskService;

View File

@@ -5,6 +5,7 @@ pub use subscriber::{
SubscriberTask, SubscriberTaskType, SubscriberTaskTypeEnum, SubscriberTaskTypeVariant,
SubscriberTaskTypeVariantIter, SyncOneSubscriptionFeedsFullTask,
SyncOneSubscriptionFeedsIncrementalTask, SyncOneSubscriptionSourcesTask,
subscriber_task_schema,
};
pub use system::{
OptimizeImageTask, SystemTask, SystemTaskType, SystemTaskTypeEnum, SystemTaskTypeVariant,

View File

@@ -6,7 +6,7 @@ macro_rules! register_subscriber_task_type {
}
) => {
$(#[$type_meta])*
#[derive(typed_builder::TypedBuilder)]
#[derive(typed_builder::TypedBuilder, schemars::JsonSchema)]
$task_vis struct $task_name {
$($(#[$field_meta])* pub $field_name: $field_type,)*
pub subscriber_id: i32,

View File

@@ -1,6 +1,9 @@
mod base;
mod subscription;
use jsonschema::Validator;
use once_cell::sync::OnceCell;
use schemars::JsonSchema;
use sea_orm::{DeriveActiveEnum, DeriveDisplay, EnumIter, FromJsonQueryResult};
use serde::{Deserialize, Serialize};
pub use subscription::{
@@ -133,7 +136,7 @@ register_subscriber_task_types!(
}
},
task_enum: {
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, FromJsonQueryResult)]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, FromJsonQueryResult, JsonSchema)]
pub enum SubscriberTask {
SyncOneSubscriptionFeedsIncremental(SyncOneSubscriptionFeedsIncrementalTask),
SyncOneSubscriptionFeedsFull(SyncOneSubscriptionFeedsFullTask),
@@ -141,3 +144,15 @@ register_subscriber_task_types!(
}
}
);
static SUBSCRIBER_TASK_SCHEMA: OnceCell<Validator> = OnceCell::new();
pub fn subscriber_task_schema() -> &'static Validator {
SUBSCRIBER_TASK_SCHEMA.get_or_init(|| {
let schema = schemars::schema_for!(SubscriberTask);
jsonschema::options()
.with_draft(jsonschema::Draft::Draft7)
.build(&serde_json::to_value(&schema).unwrap())
.unwrap()
})
}