refactor: refactor graphql
This commit is contained in:
115
apps/recorder/src/graphql/domains/credential_3rd.rs
Normal file
115
apps/recorder/src/graphql/domains/credential_3rd.rs
Normal file
@@ -0,0 +1,115 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_graphql::dynamic::{
|
||||
Field, FieldFuture, FieldValue, InputObject, InputValue, Object, TypeRef,
|
||||
};
|
||||
use seaography::Builder as SeaographyBuilder;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use util_derive::DynamicGraphql;
|
||||
|
||||
use crate::{
|
||||
app::AppContextTrait, auth::AuthUserInfo, errors::RecorderError, models::credential_3rd,
|
||||
};
|
||||
|
||||
#[derive(DynamicGraphql, Serialize, Deserialize, Clone, Debug)]
|
||||
struct Credential3rdCheckAvailableInput {
|
||||
pub id: i32,
|
||||
}
|
||||
|
||||
impl Credential3rdCheckAvailableInput {
|
||||
fn input_type_name() -> &'static str {
|
||||
"Credential3rdCheckAvailableInput"
|
||||
}
|
||||
|
||||
fn arg_name() -> &'static str {
|
||||
"filter"
|
||||
}
|
||||
|
||||
fn generate_input_object() -> InputObject {
|
||||
InputObject::new(Self::input_type_name())
|
||||
.description("The input of the credential3rdCheckAvailable query")
|
||||
.field(InputValue::new(
|
||||
Credential3rdCheckAvailableInputFieldEnum::Id.as_str(),
|
||||
TypeRef::named_nn(TypeRef::INT),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(DynamicGraphql, Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct Credential3rdCheckAvailableInfo {
|
||||
pub available: bool,
|
||||
}
|
||||
|
||||
impl Credential3rdCheckAvailableInfo {
|
||||
fn object_type_name() -> &'static str {
|
||||
"Credential3rdCheckAvailableInfo"
|
||||
}
|
||||
|
||||
fn generate_output_object() -> Object {
|
||||
Object::new(Self::object_type_name())
|
||||
.description("The output of the credential3rdCheckAvailable query")
|
||||
.field(Field::new(
|
||||
Credential3rdCheckAvailableInfoFieldEnum::Available,
|
||||
TypeRef::named_nn(TypeRef::BOOLEAN),
|
||||
move |ctx| {
|
||||
FieldFuture::new(async move {
|
||||
let subscription_info = ctx.parent_value.try_downcast_ref::<Self>()?;
|
||||
Ok(Some(async_graphql::Value::from(
|
||||
subscription_info.available,
|
||||
)))
|
||||
})
|
||||
},
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn register_credential3rd_to_schema_builder(
|
||||
mut builder: SeaographyBuilder,
|
||||
) -> SeaographyBuilder {
|
||||
builder.schema = builder
|
||||
.schema
|
||||
.register(Credential3rdCheckAvailableInput::generate_input_object());
|
||||
builder.schema = builder
|
||||
.schema
|
||||
.register(Credential3rdCheckAvailableInfo::generate_output_object());
|
||||
|
||||
builder.queries.push(
|
||||
Field::new(
|
||||
"credential3rdCheckAvailable",
|
||||
TypeRef::named_nn(Credential3rdCheckAvailableInfo::object_type_name()),
|
||||
move |ctx| {
|
||||
FieldFuture::new(async move {
|
||||
let auth_user_info = ctx.data::<AuthUserInfo>()?;
|
||||
let input: Credential3rdCheckAvailableInput = ctx
|
||||
.args
|
||||
.get(Credential3rdCheckAvailableInput::arg_name())
|
||||
.unwrap()
|
||||
.deserialize()?;
|
||||
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
|
||||
|
||||
let credential_model = credential_3rd::Model::find_by_id_and_subscriber_id(
|
||||
app_ctx.as_ref(),
|
||||
input.id,
|
||||
auth_user_info.subscriber_auth.subscriber_id,
|
||||
)
|
||||
.await?
|
||||
.ok_or_else(|| RecorderError::Credential3rdError {
|
||||
message: format!("credential = {} not found", input.id),
|
||||
source: None.into(),
|
||||
})?;
|
||||
|
||||
let available = credential_model.check_available(app_ctx.as_ref()).await?;
|
||||
Ok(Some(FieldValue::owned_any(
|
||||
Credential3rdCheckAvailableInfo { available },
|
||||
)))
|
||||
})
|
||||
},
|
||||
)
|
||||
.argument(InputValue::new(
|
||||
Credential3rdCheckAvailableInput::arg_name(),
|
||||
TypeRef::named_nn(Credential3rdCheckAvailableInput::input_type_name()),
|
||||
)),
|
||||
);
|
||||
|
||||
builder
|
||||
}
|
||||
102
apps/recorder/src/graphql/domains/crypto.rs
Normal file
102
apps/recorder/src/graphql/domains/crypto.rs
Normal file
@@ -0,0 +1,102 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_graphql::dynamic::ValueAccessor;
|
||||
use sea_orm::{EntityTrait, Value as SeaValue};
|
||||
use seaography::{BuilderContext, SeaResult};
|
||||
|
||||
use crate::{
|
||||
app::AppContextTrait,
|
||||
graphql::infra::util::{get_column_key, get_entity_key},
|
||||
models::credential_3rd,
|
||||
};
|
||||
|
||||
fn register_crypto_column_input_conversion_to_schema_context<T>(
|
||||
context: &mut BuilderContext,
|
||||
ctx: Arc<dyn AppContextTrait>,
|
||||
column: &T::Column,
|
||||
) where
|
||||
T: EntityTrait,
|
||||
<T as EntityTrait>::Model: Sync,
|
||||
{
|
||||
let entity_key = get_entity_key::<T>(context);
|
||||
let column_name = get_column_key::<T>(context, column);
|
||||
let entity_name = context.entity_object.type_name.as_ref()(&entity_key);
|
||||
let column_name = context.entity_object.column_name.as_ref()(&entity_key, &column_name);
|
||||
|
||||
context.types.input_conversions.insert(
|
||||
format!("{entity_name}.{column_name}"),
|
||||
Box::new(move |value: &ValueAccessor| -> SeaResult<sea_orm::Value> {
|
||||
let source = value.string()?;
|
||||
let encrypted = ctx.crypto().encrypt_string(source.into())?;
|
||||
Ok(encrypted.into())
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
fn register_crypto_column_output_conversion_to_schema_context<T>(
|
||||
context: &mut BuilderContext,
|
||||
ctx: Arc<dyn AppContextTrait>,
|
||||
column: &T::Column,
|
||||
) where
|
||||
T: EntityTrait,
|
||||
<T as EntityTrait>::Model: Sync,
|
||||
{
|
||||
let entity_key = get_entity_key::<T>(context);
|
||||
let column_name = get_column_key::<T>(context, column);
|
||||
let entity_name = context.entity_object.type_name.as_ref()(&entity_key);
|
||||
let column_name = context.entity_object.column_name.as_ref()(&entity_key, &column_name);
|
||||
|
||||
context.types.output_conversions.insert(
|
||||
format!("{entity_name}.{column_name}"),
|
||||
Box::new(
|
||||
move |value: &sea_orm::Value| -> SeaResult<async_graphql::Value> {
|
||||
if let SeaValue::String(s) = value {
|
||||
if let Some(s) = s {
|
||||
let decrypted = ctx.crypto().decrypt_string(s)?;
|
||||
Ok(async_graphql::Value::String(decrypted))
|
||||
} else {
|
||||
Ok(async_graphql::Value::Null)
|
||||
}
|
||||
} else {
|
||||
Err(async_graphql::Error::new("crypto column must be string column").into())
|
||||
}
|
||||
},
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
pub fn register_crypto_to_schema_context(
|
||||
context: &mut BuilderContext,
|
||||
ctx: Arc<dyn AppContextTrait>,
|
||||
) {
|
||||
register_crypto_column_input_conversion_to_schema_context::<credential_3rd::Entity>(
|
||||
context,
|
||||
ctx.clone(),
|
||||
&credential_3rd::Column::Cookies,
|
||||
);
|
||||
register_crypto_column_input_conversion_to_schema_context::<credential_3rd::Entity>(
|
||||
context,
|
||||
ctx.clone(),
|
||||
&credential_3rd::Column::Username,
|
||||
);
|
||||
register_crypto_column_input_conversion_to_schema_context::<credential_3rd::Entity>(
|
||||
context,
|
||||
ctx.clone(),
|
||||
&credential_3rd::Column::Password,
|
||||
);
|
||||
register_crypto_column_output_conversion_to_schema_context::<credential_3rd::Entity>(
|
||||
context,
|
||||
ctx.clone(),
|
||||
&credential_3rd::Column::Cookies,
|
||||
);
|
||||
register_crypto_column_output_conversion_to_schema_context::<credential_3rd::Entity>(
|
||||
context,
|
||||
ctx.clone(),
|
||||
&credential_3rd::Column::Username,
|
||||
);
|
||||
register_crypto_column_output_conversion_to_schema_context::<credential_3rd::Entity>(
|
||||
context,
|
||||
ctx,
|
||||
&credential_3rd::Column::Password,
|
||||
);
|
||||
}
|
||||
5
apps/recorder/src/graphql/domains/mod.rs
Normal file
5
apps/recorder/src/graphql/domains/mod.rs
Normal file
@@ -0,0 +1,5 @@
|
||||
pub mod credential_3rd;
|
||||
pub mod crypto;
|
||||
pub mod subscriber_tasks;
|
||||
pub mod subscribers;
|
||||
pub mod subscriptions;
|
||||
42
apps/recorder/src/graphql/domains/subscriber_tasks.rs
Normal file
42
apps/recorder/src/graphql/domains/subscriber_tasks.rs
Normal file
@@ -0,0 +1,42 @@
|
||||
use async_graphql::dynamic::Scalar;
|
||||
use seaography::{Builder as SeaographyBuilder, BuilderContext, ConvertedType};
|
||||
|
||||
use crate::{
|
||||
graphql::infra::{
|
||||
json::restrict_jsonb_filter_input_for_entity,
|
||||
util::{get_column_key, get_entity_key},
|
||||
},
|
||||
models::subscriber_tasks::{self, SubscriberTask},
|
||||
};
|
||||
|
||||
pub fn register_subscriber_tasks_to_schema_context(context: &mut BuilderContext) {
|
||||
let entity_key = get_entity_key::<subscriber_tasks::Entity>(context);
|
||||
let column_name =
|
||||
get_column_key::<subscriber_tasks::Entity>(context, &subscriber_tasks::Column::Job);
|
||||
let column_name = context.entity_object.column_name.as_ref()(&entity_key, &column_name);
|
||||
context.types.overwrites.insert(
|
||||
column_name,
|
||||
ConvertedType::Custom(String::from("SubscriberTask")),
|
||||
);
|
||||
restrict_jsonb_filter_input_for_entity::<subscriber_tasks::Entity>(
|
||||
context,
|
||||
&subscriber_tasks::Column::Job,
|
||||
);
|
||||
}
|
||||
|
||||
pub fn register_subscriber_tasks_to_schema_builder(
|
||||
mut builder: SeaographyBuilder,
|
||||
) -> SeaographyBuilder {
|
||||
let subscriber_tasks_scalar = Scalar::new("SubscriberTasks")
|
||||
.description("The subscriber tasks")
|
||||
.validator(|value| -> bool {
|
||||
if let Ok(json) = value.clone().into_json() {
|
||||
serde_json::from_value::<SubscriberTask>(json).is_ok()
|
||||
} else {
|
||||
false
|
||||
}
|
||||
});
|
||||
|
||||
builder.schema = builder.schema.register(subscriber_tasks_scalar);
|
||||
builder
|
||||
}
|
||||
41
apps/recorder/src/graphql/domains/subscribers/filter.rs
Normal file
41
apps/recorder/src/graphql/domains/subscribers/filter.rs
Normal file
@@ -0,0 +1,41 @@
|
||||
use async_graphql::dynamic::TypeRef;
|
||||
use lazy_static::lazy_static;
|
||||
use maplit::btreeset;
|
||||
use sea_orm::{ColumnTrait, EntityTrait};
|
||||
use seaography::{BuilderContext, FilterInfo, FilterOperation as SeaographqlFilterOperation};
|
||||
|
||||
use crate::graphql::infra::filter::FnFilterCondition;
|
||||
|
||||
lazy_static! {
|
||||
pub static ref SUBSCRIBER_ID_FILTER_INFO: FilterInfo = FilterInfo {
|
||||
type_name: String::from("SubscriberIdFilterInput"),
|
||||
base_type: TypeRef::INT.into(),
|
||||
supported_operations: btreeset! { SeaographqlFilterOperation::Equals },
|
||||
};
|
||||
}
|
||||
|
||||
pub fn generate_subscriber_id_condition_function<T>(
|
||||
_context: &BuilderContext,
|
||||
column: &T::Column,
|
||||
) -> FnFilterCondition
|
||||
where
|
||||
T: EntityTrait,
|
||||
<T as EntityTrait>::Model: Sync,
|
||||
{
|
||||
let column = *column;
|
||||
Box::new(move |mut condition, filter| {
|
||||
for operation in &SUBSCRIBER_ID_FILTER_INFO.supported_operations {
|
||||
match operation {
|
||||
SeaographqlFilterOperation::Equals => {
|
||||
if let Some(value) = filter.get("eq") {
|
||||
let value: i32 = value.i64()?.try_into()?;
|
||||
let value = sea_orm::Value::Int(Some(value));
|
||||
condition = condition.add(column.eq(value));
|
||||
}
|
||||
}
|
||||
_ => unreachable!("unreachable filter operation for subscriber_id"),
|
||||
}
|
||||
}
|
||||
Ok(condition)
|
||||
})
|
||||
}
|
||||
183
apps/recorder/src/graphql/domains/subscribers/guard.rs
Normal file
183
apps/recorder/src/graphql/domains/subscribers/guard.rs
Normal file
@@ -0,0 +1,183 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_graphql::dynamic::{ResolverContext, ValueAccessor};
|
||||
use sea_orm::EntityTrait;
|
||||
use seaography::{BuilderContext, FnGuard, GuardAction};
|
||||
|
||||
use crate::{
|
||||
auth::{AuthError, AuthUserInfo},
|
||||
graphql::infra::util::{get_column_key, get_entity_key},
|
||||
};
|
||||
|
||||
fn guard_data_object_accessor_with_subscriber_id(
|
||||
value: ValueAccessor<'_>,
|
||||
column_name: &str,
|
||||
subscriber_id: i32,
|
||||
) -> async_graphql::Result<()> {
|
||||
let obj = value.object()?;
|
||||
|
||||
let subscriber_id_value = obj.try_get(column_name)?;
|
||||
|
||||
let id = subscriber_id_value.i64()?;
|
||||
|
||||
if id == subscriber_id as i64 {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(async_graphql::Error::new("subscriber not match"))
|
||||
}
|
||||
}
|
||||
|
||||
fn guard_data_object_accessor_with_optional_subscriber_id(
|
||||
value: ValueAccessor<'_>,
|
||||
column_name: &str,
|
||||
subscriber_id: i32,
|
||||
) -> async_graphql::Result<()> {
|
||||
if value.is_null() {
|
||||
return Ok(());
|
||||
}
|
||||
let obj = value.object()?;
|
||||
|
||||
if let Some(subscriber_id_value) = obj.get(column_name) {
|
||||
let id = subscriber_id_value.i64()?;
|
||||
if id == subscriber_id as i64 {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(async_graphql::Error::new("subscriber not match"))
|
||||
}
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn guard_entity_with_subscriber_id<T>(_context: &BuilderContext, _column: &T::Column) -> FnGuard
|
||||
where
|
||||
T: EntityTrait,
|
||||
<T as EntityTrait>::Model: Sync,
|
||||
{
|
||||
Box::new(move |context: &ResolverContext| -> GuardAction {
|
||||
match context.ctx.data::<AuthUserInfo>() {
|
||||
Ok(_) => GuardAction::Allow,
|
||||
Err(err) => GuardAction::Block(Some(err.message)),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn guard_field_with_subscriber_id<T>(context: &BuilderContext, column: &T::Column) -> FnGuard
|
||||
where
|
||||
T: EntityTrait,
|
||||
<T as EntityTrait>::Model: Sync,
|
||||
{
|
||||
let entity_key = get_entity_key::<T>(context);
|
||||
let entity_name = context.entity_query_field.type_name.as_ref()(&entity_key);
|
||||
let column_key = get_column_key::<T>(context, column);
|
||||
let column_name = Arc::new(context.entity_object.column_name.as_ref()(
|
||||
&entity_key,
|
||||
&column_key,
|
||||
));
|
||||
let entity_create_one_mutation_field_name = Arc::new(format!(
|
||||
"{}{}",
|
||||
entity_name, context.entity_create_one_mutation.mutation_suffix
|
||||
));
|
||||
let entity_create_one_mutation_data_field_name =
|
||||
Arc::new(context.entity_create_one_mutation.data_field.clone());
|
||||
let entity_create_batch_mutation_field_name = Arc::new(format!(
|
||||
"{}{}",
|
||||
entity_name,
|
||||
context.entity_create_batch_mutation.mutation_suffix.clone()
|
||||
));
|
||||
let entity_create_batch_mutation_data_field_name =
|
||||
Arc::new(context.entity_create_batch_mutation.data_field.clone());
|
||||
let entity_update_mutation_field_name = Arc::new(format!(
|
||||
"{}{}",
|
||||
entity_name, context.entity_update_mutation.mutation_suffix
|
||||
));
|
||||
let entity_update_mutation_data_field_name =
|
||||
Arc::new(context.entity_update_mutation.data_field.clone());
|
||||
|
||||
Box::new(move |context: &ResolverContext| -> GuardAction {
|
||||
match context.ctx.data::<AuthUserInfo>() {
|
||||
Ok(user_info) => {
|
||||
let subscriber_id = user_info.subscriber_auth.subscriber_id;
|
||||
let validation_result = match context.field().name() {
|
||||
field if field == entity_create_one_mutation_field_name.as_str() => {
|
||||
if let Some(data_value) = context
|
||||
.args
|
||||
.get(&entity_create_one_mutation_data_field_name)
|
||||
{
|
||||
guard_data_object_accessor_with_subscriber_id(
|
||||
data_value,
|
||||
&column_name,
|
||||
subscriber_id,
|
||||
)
|
||||
.map_err(|inner_error| {
|
||||
AuthError::from_graphql_dynamic_subscribe_id_guard(
|
||||
inner_error,
|
||||
context,
|
||||
&entity_create_one_mutation_data_field_name,
|
||||
&column_name,
|
||||
)
|
||||
})
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
field if field == entity_create_batch_mutation_field_name.as_str() => {
|
||||
if let Some(data_value) = context
|
||||
.args
|
||||
.get(&entity_create_batch_mutation_data_field_name)
|
||||
{
|
||||
data_value
|
||||
.list()
|
||||
.and_then(|data_list| {
|
||||
data_list.iter().try_for_each(|data_item_value| {
|
||||
guard_data_object_accessor_with_optional_subscriber_id(
|
||||
data_item_value,
|
||||
&column_name,
|
||||
subscriber_id,
|
||||
)
|
||||
})
|
||||
})
|
||||
.map_err(|inner_error| {
|
||||
AuthError::from_graphql_dynamic_subscribe_id_guard(
|
||||
inner_error,
|
||||
context,
|
||||
&entity_create_batch_mutation_data_field_name,
|
||||
&column_name,
|
||||
)
|
||||
})
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
field if field == entity_update_mutation_field_name.as_str() => {
|
||||
if let Some(data_value) =
|
||||
context.args.get(&entity_update_mutation_data_field_name)
|
||||
{
|
||||
guard_data_object_accessor_with_optional_subscriber_id(
|
||||
data_value,
|
||||
&column_name,
|
||||
subscriber_id,
|
||||
)
|
||||
.map_err(|inner_error| {
|
||||
AuthError::from_graphql_dynamic_subscribe_id_guard(
|
||||
inner_error,
|
||||
context,
|
||||
&entity_update_mutation_data_field_name,
|
||||
&column_name,
|
||||
)
|
||||
})
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
_ => Ok(()),
|
||||
};
|
||||
match validation_result {
|
||||
Ok(_) => GuardAction::Allow,
|
||||
Err(err) => GuardAction::Block(Some(err.to_string())),
|
||||
}
|
||||
}
|
||||
Err(err) => GuardAction::Block(Some(err.message)),
|
||||
}
|
||||
})
|
||||
}
|
||||
94
apps/recorder/src/graphql/domains/subscribers/mod.rs
Normal file
94
apps/recorder/src/graphql/domains/subscribers/mod.rs
Normal file
@@ -0,0 +1,94 @@
|
||||
use sea_orm::{EntityTrait, Iterable};
|
||||
use seaography::{Builder as SeaographyBuilder, BuilderContext, FilterType, FilterTypesMapHelper};
|
||||
|
||||
mod filter;
|
||||
mod guard;
|
||||
mod transformer;
|
||||
|
||||
use filter::{SUBSCRIBER_ID_FILTER_INFO, generate_subscriber_id_condition_function};
|
||||
use guard::{guard_entity_with_subscriber_id, guard_field_with_subscriber_id};
|
||||
use transformer::{
|
||||
generate_subscriber_id_filter_condition_transformer,
|
||||
generate_subscriber_id_mutation_input_object_transformer,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
graphql::infra::util::{get_entity_column_key, get_entity_key},
|
||||
models::subscribers,
|
||||
};
|
||||
|
||||
pub fn restrict_subscriber_for_entity<T>(context: &mut BuilderContext, column: &T::Column)
|
||||
where
|
||||
T: EntityTrait,
|
||||
<T as EntityTrait>::Model: Sync,
|
||||
{
|
||||
let entity_key = get_entity_key::<T>(context);
|
||||
let entity_column_key = get_entity_column_key::<T>(context, column);
|
||||
context.guards.entity_guards.insert(
|
||||
entity_key.clone(),
|
||||
guard_entity_with_subscriber_id::<T>(context, column),
|
||||
);
|
||||
context.guards.field_guards.insert(
|
||||
entity_column_key.clone(),
|
||||
guard_field_with_subscriber_id::<T>(context, column),
|
||||
);
|
||||
context.filter_types.overwrites.insert(
|
||||
entity_column_key.clone(),
|
||||
Some(FilterType::Custom(
|
||||
SUBSCRIBER_ID_FILTER_INFO.type_name.clone(),
|
||||
)),
|
||||
);
|
||||
context.filter_types.condition_functions.insert(
|
||||
entity_column_key.clone(),
|
||||
generate_subscriber_id_condition_function::<T>(context, column),
|
||||
);
|
||||
context.transformers.filter_conditions_transformers.insert(
|
||||
entity_key.clone(),
|
||||
generate_subscriber_id_filter_condition_transformer::<T>(context, column),
|
||||
);
|
||||
context
|
||||
.transformers
|
||||
.mutation_input_object_transformers
|
||||
.insert(
|
||||
entity_key,
|
||||
generate_subscriber_id_mutation_input_object_transformer::<T>(context, column),
|
||||
);
|
||||
context
|
||||
.entity_input
|
||||
.insert_skips
|
||||
.push(entity_column_key.clone());
|
||||
context.entity_input.update_skips.push(entity_column_key);
|
||||
}
|
||||
|
||||
pub fn register_subscribers_to_schema_context(context: &mut BuilderContext) {
|
||||
for column in subscribers::Column::iter() {
|
||||
if !matches!(column, subscribers::Column::Id) {
|
||||
let key = get_entity_column_key::<subscribers::Entity>(context, &column);
|
||||
context.filter_types.overwrites.insert(key, None);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn register_subscribers_to_schema_builder(mut builder: SeaographyBuilder) -> SeaographyBuilder {
|
||||
{
|
||||
let filter_types_map_helper = FilterTypesMapHelper {
|
||||
context: builder.context,
|
||||
};
|
||||
|
||||
builder.schema = builder
|
||||
.schema
|
||||
.register(filter_types_map_helper.generate_filter_input(&SUBSCRIBER_ID_FILTER_INFO));
|
||||
}
|
||||
|
||||
{
|
||||
builder.register_entity::<subscribers::Entity>(
|
||||
<subscribers::RelatedEntity as sea_orm::Iterable>::iter()
|
||||
.map(|rel| seaography::RelationBuilder::get_relation(&rel, builder.context))
|
||||
.collect(),
|
||||
);
|
||||
builder = builder.register_entity_dataloader_one_to_one(subscribers::Entity, tokio::spawn);
|
||||
builder = builder.register_entity_dataloader_one_to_many(subscribers::Entity, tokio::spawn);
|
||||
}
|
||||
|
||||
builder
|
||||
}
|
||||
85
apps/recorder/src/graphql/domains/subscribers/transformer.rs
Normal file
85
apps/recorder/src/graphql/domains/subscribers/transformer.rs
Normal file
@@ -0,0 +1,85 @@
|
||||
use std::{collections::BTreeMap, sync::Arc};
|
||||
|
||||
use async_graphql::dynamic::ResolverContext;
|
||||
use sea_orm::{ColumnTrait, Condition, EntityTrait, Value as SeaValue};
|
||||
use seaography::{BuilderContext, FnFilterConditionsTransformer, FnMutationInputObjectTransformer};
|
||||
|
||||
use crate::{
|
||||
auth::AuthUserInfo,
|
||||
graphql::infra::util::{get_column_key, get_entity_key},
|
||||
};
|
||||
|
||||
pub fn generate_subscriber_id_filter_condition_transformer<T>(
|
||||
_context: &BuilderContext,
|
||||
column: &T::Column,
|
||||
) -> FnFilterConditionsTransformer
|
||||
where
|
||||
T: EntityTrait,
|
||||
<T as EntityTrait>::Model: Sync,
|
||||
{
|
||||
let column = *column;
|
||||
Box::new(
|
||||
move |context: &ResolverContext, condition: Condition| -> Condition {
|
||||
match context.ctx.data::<AuthUserInfo>() {
|
||||
Ok(user_info) => {
|
||||
let subscriber_id = user_info.subscriber_auth.subscriber_id;
|
||||
condition.add(column.eq(subscriber_id))
|
||||
}
|
||||
Err(err) => unreachable!("auth user info must be guarded: {:?}", err),
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
pub fn generate_subscriber_id_mutation_input_object_transformer<T>(
|
||||
context: &BuilderContext,
|
||||
column: &T::Column,
|
||||
) -> FnMutationInputObjectTransformer
|
||||
where
|
||||
T: EntityTrait,
|
||||
<T as EntityTrait>::Model: Sync,
|
||||
{
|
||||
let entity_key = get_entity_key::<T>(context);
|
||||
let entity_name = context.entity_query_field.type_name.as_ref()(&entity_key);
|
||||
let column_key = get_column_key::<T>(context, column);
|
||||
let column_name = Arc::new(context.entity_object.column_name.as_ref()(
|
||||
&entity_key,
|
||||
&column_key,
|
||||
));
|
||||
let entity_create_one_mutation_field_name = Arc::new(format!(
|
||||
"{}{}",
|
||||
entity_name, context.entity_create_one_mutation.mutation_suffix
|
||||
));
|
||||
let entity_create_batch_mutation_field_name = Arc::new(format!(
|
||||
"{}{}",
|
||||
entity_name,
|
||||
context.entity_create_batch_mutation.mutation_suffix.clone()
|
||||
));
|
||||
Box::new(
|
||||
move |context: &ResolverContext,
|
||||
mut input: BTreeMap<String, SeaValue>|
|
||||
-> BTreeMap<String, SeaValue> {
|
||||
let field_name = context.field().name();
|
||||
if field_name == entity_create_one_mutation_field_name.as_str()
|
||||
|| field_name == entity_create_batch_mutation_field_name.as_str()
|
||||
{
|
||||
match context.ctx.data::<AuthUserInfo>() {
|
||||
Ok(user_info) => {
|
||||
let subscriber_id = user_info.subscriber_auth.subscriber_id;
|
||||
let value = input.get_mut(column_name.as_str());
|
||||
if value.is_none() {
|
||||
input.insert(
|
||||
column_name.as_str().to_string(),
|
||||
SeaValue::Int(Some(subscriber_id)),
|
||||
);
|
||||
}
|
||||
input
|
||||
}
|
||||
Err(err) => unreachable!("auth user info must be guarded: {:?}", err),
|
||||
}
|
||||
} else {
|
||||
input
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
226
apps/recorder/src/graphql/domains/subscriptions.rs
Normal file
226
apps/recorder/src/graphql/domains/subscriptions.rs
Normal file
@@ -0,0 +1,226 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_graphql::dynamic::{
|
||||
Field, FieldFuture, FieldValue, InputObject, InputValue, Object, TypeRef,
|
||||
};
|
||||
use seaography::Builder as SeaographyBuilder;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use util_derive::DynamicGraphql;
|
||||
|
||||
use crate::{
|
||||
app::AppContextTrait,
|
||||
auth::AuthUserInfo,
|
||||
models::subscriptions::{self, SubscriptionTrait},
|
||||
task::SubscriberTask,
|
||||
};
|
||||
|
||||
#[derive(DynamicGraphql, Serialize, Deserialize, Clone, Debug)]
|
||||
struct SyncOneSubscriptionFilterInput {
|
||||
pub id: i32,
|
||||
}
|
||||
|
||||
impl SyncOneSubscriptionFilterInput {
|
||||
fn input_type_name() -> &'static str {
|
||||
"SyncOneSubscriptionFilterInput"
|
||||
}
|
||||
|
||||
fn arg_name() -> &'static str {
|
||||
"filter"
|
||||
}
|
||||
|
||||
fn generate_input_object() -> InputObject {
|
||||
InputObject::new(Self::input_type_name())
|
||||
.description("The input of the subscriptionSyncOne series of mutations")
|
||||
.field(InputValue::new(
|
||||
SyncOneSubscriptionFilterInputFieldEnum::Id.as_str(),
|
||||
TypeRef::named_nn(TypeRef::INT),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(DynamicGraphql, Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct SyncOneSubscriptionInfo {
|
||||
pub task_id: String,
|
||||
}
|
||||
|
||||
impl SyncOneSubscriptionInfo {
|
||||
fn object_type_name() -> &'static str {
|
||||
"SyncOneSubscriptionInfo"
|
||||
}
|
||||
|
||||
fn generate_output_object() -> Object {
|
||||
Object::new(Self::object_type_name())
|
||||
.description("The output of the subscriptionSyncOne series of mutations")
|
||||
.field(Field::new(
|
||||
SyncOneSubscriptionInfoFieldEnum::TaskId,
|
||||
TypeRef::named_nn(TypeRef::STRING),
|
||||
move |ctx| {
|
||||
FieldFuture::new(async move {
|
||||
let subscription_info = ctx.parent_value.try_downcast_ref::<Self>()?;
|
||||
Ok(Some(async_graphql::Value::from(
|
||||
subscription_info.task_id.as_str(),
|
||||
)))
|
||||
})
|
||||
},
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn register_subscriptions_to_schema_builder(
|
||||
mut builder: SeaographyBuilder,
|
||||
) -> SeaographyBuilder {
|
||||
builder.schema = builder
|
||||
.schema
|
||||
.register(SyncOneSubscriptionFilterInput::generate_input_object());
|
||||
builder.schema = builder
|
||||
.schema
|
||||
.register(SyncOneSubscriptionInfo::generate_output_object());
|
||||
|
||||
builder.mutations.push(
|
||||
Field::new(
|
||||
"subscriptionSyncOneFeedsIncremental",
|
||||
TypeRef::named_nn(SyncOneSubscriptionInfo::object_type_name()),
|
||||
move |ctx| {
|
||||
FieldFuture::new(async move {
|
||||
let auth_user_info = ctx.data::<AuthUserInfo>()?;
|
||||
|
||||
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
|
||||
let subscriber_id = auth_user_info.subscriber_auth.subscriber_id;
|
||||
|
||||
let filter_input: SyncOneSubscriptionFilterInput = ctx
|
||||
.args
|
||||
.get(SyncOneSubscriptionFilterInput::arg_name())
|
||||
.unwrap()
|
||||
.deserialize()?;
|
||||
|
||||
let subscription_model = subscriptions::Model::find_by_id_and_subscriber_id(
|
||||
app_ctx.as_ref(),
|
||||
filter_input.id,
|
||||
subscriber_id,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let subscription =
|
||||
subscriptions::Subscription::try_from_model(&subscription_model)?;
|
||||
|
||||
let task_service = app_ctx.task();
|
||||
|
||||
let task_id = task_service
|
||||
.add_subscriber_task(
|
||||
auth_user_info.subscriber_auth.subscriber_id,
|
||||
SubscriberTask::SyncOneSubscriptionFeedsIncremental(
|
||||
subscription.into(),
|
||||
),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(Some(FieldValue::owned_any(SyncOneSubscriptionInfo {
|
||||
task_id: task_id.to_string(),
|
||||
})))
|
||||
})
|
||||
},
|
||||
)
|
||||
.argument(InputValue::new(
|
||||
SyncOneSubscriptionFilterInput::arg_name(),
|
||||
TypeRef::named_nn(SyncOneSubscriptionFilterInput::input_type_name()),
|
||||
)),
|
||||
);
|
||||
|
||||
builder.mutations.push(
|
||||
Field::new(
|
||||
"subscriptionSyncOneFeedsFull",
|
||||
TypeRef::named_nn(SyncOneSubscriptionInfo::object_type_name()),
|
||||
move |ctx| {
|
||||
FieldFuture::new(async move {
|
||||
let auth_user_info = ctx.data::<AuthUserInfo>()?;
|
||||
|
||||
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
|
||||
let subscriber_id = auth_user_info.subscriber_auth.subscriber_id;
|
||||
|
||||
let filter_input: SyncOneSubscriptionFilterInput = ctx
|
||||
.args
|
||||
.get(SyncOneSubscriptionFilterInput::arg_name())
|
||||
.unwrap()
|
||||
.deserialize()?;
|
||||
|
||||
let subscription_model = subscriptions::Model::find_by_id_and_subscriber_id(
|
||||
app_ctx.as_ref(),
|
||||
filter_input.id,
|
||||
subscriber_id,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let subscription =
|
||||
subscriptions::Subscription::try_from_model(&subscription_model)?;
|
||||
|
||||
let task_service = app_ctx.task();
|
||||
|
||||
let task_id = task_service
|
||||
.add_subscriber_task(
|
||||
auth_user_info.subscriber_auth.subscriber_id,
|
||||
SubscriberTask::SyncOneSubscriptionFeedsFull(subscription.into()),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(Some(FieldValue::owned_any(SyncOneSubscriptionInfo {
|
||||
task_id: task_id.to_string(),
|
||||
})))
|
||||
})
|
||||
},
|
||||
)
|
||||
.argument(InputValue::new(
|
||||
SyncOneSubscriptionFilterInput::arg_name(),
|
||||
TypeRef::named_nn(SyncOneSubscriptionFilterInput::input_type_name()),
|
||||
)),
|
||||
);
|
||||
|
||||
builder.mutations.push(
|
||||
Field::new(
|
||||
"subscriptionSyncOneSources",
|
||||
TypeRef::named_nn(SyncOneSubscriptionInfo::object_type_name()),
|
||||
move |ctx| {
|
||||
FieldFuture::new(async move {
|
||||
let auth_user_info = ctx.data::<AuthUserInfo>()?;
|
||||
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
|
||||
|
||||
let subscriber_id = auth_user_info.subscriber_auth.subscriber_id;
|
||||
|
||||
let filter_input: SyncOneSubscriptionFilterInput = ctx
|
||||
.args
|
||||
.get(SyncOneSubscriptionFilterInput::arg_name())
|
||||
.unwrap()
|
||||
.deserialize()?;
|
||||
|
||||
let subscription_model = subscriptions::Model::find_by_id_and_subscriber_id(
|
||||
app_ctx.as_ref(),
|
||||
filter_input.id,
|
||||
subscriber_id,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let subscription =
|
||||
subscriptions::Subscription::try_from_model(&subscription_model)?;
|
||||
|
||||
let task_service = app_ctx.task();
|
||||
|
||||
let task_id = task_service
|
||||
.add_subscriber_task(
|
||||
auth_user_info.subscriber_auth.subscriber_id,
|
||||
SubscriberTask::SyncOneSubscriptionSources(subscription.into()),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(Some(FieldValue::owned_any(SyncOneSubscriptionInfo {
|
||||
task_id: task_id.to_string(),
|
||||
})))
|
||||
})
|
||||
},
|
||||
)
|
||||
.argument(InputValue::new(
|
||||
SyncOneSubscriptionFilterInput::arg_name(),
|
||||
TypeRef::named_nn(SyncOneSubscriptionFilterInput::input_type_name()),
|
||||
)),
|
||||
);
|
||||
|
||||
builder
|
||||
}
|
||||
Reference in New Issue
Block a user