feat: add tasks manage view
This commit is contained in:
@@ -87,6 +87,7 @@ seaography = { version = "1.1", features = [
|
||||
"with-decimal",
|
||||
"with-bigdecimal",
|
||||
"with-postgres-array",
|
||||
"with-json-as-scalar",
|
||||
] }
|
||||
base64 = "0.22.1"
|
||||
tower = "0.5.2"
|
||||
|
||||
@@ -142,7 +142,7 @@ async fn sync_mikan_feeds_from_rss_item_list(
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub struct MikanSubscriberSubscription {
|
||||
pub id: i32,
|
||||
pub subscription_id: i32,
|
||||
pub mikan_subscription_token: String,
|
||||
pub subscriber_id: i32,
|
||||
}
|
||||
@@ -154,7 +154,7 @@ impl SubscriptionTrait for MikanSubscriberSubscription {
|
||||
}
|
||||
|
||||
fn get_subscription_id(&self) -> i32 {
|
||||
self.id
|
||||
self.subscription_id
|
||||
}
|
||||
|
||||
async fn sync_feeds_incremental(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
|
||||
@@ -204,7 +204,7 @@ impl SubscriptionTrait for MikanSubscriberSubscription {
|
||||
})?;
|
||||
|
||||
Ok(Self {
|
||||
id: model.id,
|
||||
subscription_id: model.id,
|
||||
mikan_subscription_token: meta.mikan_subscription_token,
|
||||
subscriber_id: model.subscriber_id,
|
||||
})
|
||||
@@ -243,7 +243,8 @@ impl MikanSubscriberSubscription {
|
||||
ctx: &dyn AppContextTrait,
|
||||
) -> RecorderResult<Vec<MikanRssEpisodeItem>> {
|
||||
let subscribed_bangumi_list =
|
||||
bangumi::Model::get_subsribed_bangumi_list_from_subscription(ctx, self.id).await?;
|
||||
bangumi::Model::get_subsribed_bangumi_list_from_subscription(ctx, self.subscription_id)
|
||||
.await?;
|
||||
|
||||
let mut rss_item_list = vec![];
|
||||
for subscribed_bangumi in subscribed_bangumi_list {
|
||||
@@ -252,7 +253,7 @@ impl MikanSubscriberSubscription {
|
||||
.with_whatever_context::<_, String, RecorderError>(|| {
|
||||
format!(
|
||||
"rss link is required, subscription_id = {:?}, bangumi_name = {}",
|
||||
self.id, subscribed_bangumi.display_name
|
||||
self.subscription_id, subscribed_bangumi.display_name
|
||||
)
|
||||
})?;
|
||||
let bytes = fetch_bytes(ctx.mikan(), rss_url).await?;
|
||||
@@ -273,7 +274,7 @@ impl MikanSubscriberSubscription {
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, InputObject, SimpleObject)]
|
||||
pub struct MikanSeasonSubscription {
|
||||
pub id: i32,
|
||||
pub subscription_id: i32,
|
||||
pub year: i32,
|
||||
pub season_str: MikanSeasonStr,
|
||||
pub credential_id: i32,
|
||||
@@ -287,7 +288,7 @@ impl SubscriptionTrait for MikanSeasonSubscription {
|
||||
}
|
||||
|
||||
fn get_subscription_id(&self) -> i32 {
|
||||
self.id
|
||||
self.subscription_id
|
||||
}
|
||||
|
||||
async fn sync_feeds_incremental(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
|
||||
@@ -363,7 +364,7 @@ impl SubscriptionTrait for MikanSeasonSubscription {
|
||||
})?;
|
||||
|
||||
Ok(Self {
|
||||
id: model.id,
|
||||
subscription_id: model.id,
|
||||
year: source_url_meta.year,
|
||||
season_str: source_url_meta.season_str,
|
||||
credential_id,
|
||||
@@ -400,7 +401,10 @@ impl MikanSeasonSubscription {
|
||||
let db = ctx.db();
|
||||
|
||||
let subscribed_bangumi_list = bangumi::Entity::find()
|
||||
.filter(Condition::all().add(subscription_bangumi::Column::SubscriptionId.eq(self.id)))
|
||||
.filter(
|
||||
Condition::all()
|
||||
.add(subscription_bangumi::Column::SubscriptionId.eq(self.subscription_id)),
|
||||
)
|
||||
.join_rev(
|
||||
JoinType::InnerJoin,
|
||||
subscription_bangumi::Relation::Bangumi.def(),
|
||||
@@ -415,7 +419,7 @@ impl MikanSeasonSubscription {
|
||||
.with_whatever_context::<_, String, RecorderError>(|| {
|
||||
format!(
|
||||
"rss_link is required, subscription_id = {}, bangumi_name = {}",
|
||||
self.id, subscribed_bangumi.display_name
|
||||
self.subscription_id, subscribed_bangumi.display_name
|
||||
)
|
||||
})?;
|
||||
let bytes = fetch_bytes(ctx.mikan(), rss_url).await?;
|
||||
@@ -436,7 +440,7 @@ impl MikanSeasonSubscription {
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, InputObject, SimpleObject)]
|
||||
pub struct MikanBangumiSubscription {
|
||||
pub id: i32,
|
||||
pub subscription_id: i32,
|
||||
pub mikan_bangumi_id: String,
|
||||
pub mikan_fansub_id: String,
|
||||
pub subscriber_id: i32,
|
||||
@@ -449,7 +453,7 @@ impl SubscriptionTrait for MikanBangumiSubscription {
|
||||
}
|
||||
|
||||
fn get_subscription_id(&self) -> i32 {
|
||||
self.id
|
||||
self.subscription_id
|
||||
}
|
||||
|
||||
async fn sync_feeds_incremental(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
|
||||
@@ -487,7 +491,7 @@ impl SubscriptionTrait for MikanBangumiSubscription {
|
||||
})?;
|
||||
|
||||
Ok(Self {
|
||||
id: model.id,
|
||||
subscription_id: model.id,
|
||||
mikan_bangumi_id: meta.mikan_bangumi_id,
|
||||
mikan_fansub_id: meta.mikan_fansub_id,
|
||||
subscriber_id: model.subscriber_id,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_graphql::dynamic::ValueAccessor;
|
||||
use async_graphql::dynamic::{ResolverContext, ValueAccessor};
|
||||
use sea_orm::{EntityTrait, Value as SeaValue};
|
||||
use seaography::{BuilderContext, SeaResult};
|
||||
|
||||
@@ -25,11 +25,15 @@ fn register_crypto_column_input_conversion_to_schema_context<T>(
|
||||
|
||||
context.types.input_conversions.insert(
|
||||
format!("{entity_name}.{column_name}"),
|
||||
Box::new(move |value: &ValueAccessor| -> SeaResult<sea_orm::Value> {
|
||||
let source = value.string()?;
|
||||
let encrypted = ctx.crypto().encrypt_string(source.into())?;
|
||||
Ok(encrypted.into())
|
||||
}),
|
||||
Box::new(
|
||||
move |_resolve_context: &ResolverContext<'_>,
|
||||
value: &ValueAccessor|
|
||||
-> SeaResult<sea_orm::Value> {
|
||||
let source = value.string()?;
|
||||
let encrypted = ctx.crypto().encrypt_string(source.into())?;
|
||||
Ok(encrypted.into())
|
||||
},
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -1,23 +1,10 @@
|
||||
use async_graphql::dynamic::Scalar;
|
||||
use seaography::{Builder as SeaographyBuilder, BuilderContext, ConvertedType};
|
||||
use seaography::{Builder as SeaographyBuilder, BuilderContext};
|
||||
|
||||
use crate::{
|
||||
graphql::infra::{
|
||||
json::restrict_jsonb_filter_input_for_entity,
|
||||
util::{get_column_key, get_entity_key},
|
||||
},
|
||||
models::subscriber_tasks::{self, SubscriberTask},
|
||||
graphql::infra::json::restrict_jsonb_filter_input_for_entity, models::subscriber_tasks,
|
||||
};
|
||||
|
||||
pub fn register_subscriber_tasks_to_schema_context(context: &mut BuilderContext) {
|
||||
let entity_key = get_entity_key::<subscriber_tasks::Entity>(context);
|
||||
let column_name =
|
||||
get_column_key::<subscriber_tasks::Entity>(context, &subscriber_tasks::Column::Job);
|
||||
let column_name = context.entity_object.column_name.as_ref()(&entity_key, &column_name);
|
||||
context.types.overwrites.insert(
|
||||
column_name,
|
||||
ConvertedType::Custom(String::from("SubscriberTask")),
|
||||
);
|
||||
restrict_jsonb_filter_input_for_entity::<subscriber_tasks::Entity>(
|
||||
context,
|
||||
&subscriber_tasks::Column::Job,
|
||||
@@ -27,16 +14,6 @@ pub fn register_subscriber_tasks_to_schema_context(context: &mut BuilderContext)
|
||||
pub fn register_subscriber_tasks_to_schema_builder(
|
||||
mut builder: SeaographyBuilder,
|
||||
) -> SeaographyBuilder {
|
||||
let subscriber_tasks_scalar = Scalar::new("SubscriberTasks")
|
||||
.description("The subscriber tasks")
|
||||
.validator(|value| -> bool {
|
||||
if let Ok(json) = value.clone().into_json() {
|
||||
serde_json::from_value::<SubscriberTask>(json).is_ok()
|
||||
} else {
|
||||
false
|
||||
}
|
||||
});
|
||||
|
||||
builder.schema = builder.schema.register(subscriber_tasks_scalar);
|
||||
builder.register_enumeration::<subscriber_tasks::SubscriberTaskType>();
|
||||
builder
|
||||
}
|
||||
|
||||
@@ -1,14 +1,29 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_graphql::dynamic::{ResolverContext, ValueAccessor};
|
||||
use sea_orm::EntityTrait;
|
||||
use seaography::{BuilderContext, FnGuard, GuardAction};
|
||||
use async_graphql::dynamic::{ObjectAccessor, ResolverContext, TypeRef, ValueAccessor};
|
||||
use lazy_static::lazy_static;
|
||||
use maplit::btreeset;
|
||||
use sea_orm::{ColumnTrait, Condition, EntityTrait, Iterable, Value as SeaValue};
|
||||
use seaography::{
|
||||
Builder as SeaographyBuilder, BuilderContext, FilterInfo,
|
||||
FilterOperation as SeaographqlFilterOperation, FilterType, FilterTypesMapHelper,
|
||||
FnFilterCondition, FnGuard, FnInputTypeNoneConversion, GuardAction, SeaResult, SeaographyError,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
auth::{AuthError, AuthUserInfo},
|
||||
graphql::infra::util::{get_column_key, get_entity_key},
|
||||
graphql::infra::util::{get_column_key, get_entity_column_key, get_entity_key},
|
||||
models::subscribers,
|
||||
};
|
||||
|
||||
lazy_static! {
|
||||
pub static ref SUBSCRIBER_ID_FILTER_INFO: FilterInfo = FilterInfo {
|
||||
type_name: String::from("SubscriberIdFilterInput"),
|
||||
base_type: TypeRef::INT.into(),
|
||||
supported_operations: btreeset! { SeaographqlFilterOperation::Equals },
|
||||
};
|
||||
}
|
||||
|
||||
fn guard_data_object_accessor_with_subscriber_id(
|
||||
value: ValueAccessor<'_>,
|
||||
column_name: &str,
|
||||
@@ -181,3 +196,161 @@ where
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn generate_subscriber_id_filter_condition<T>(
|
||||
_context: &BuilderContext,
|
||||
column: &T::Column,
|
||||
) -> FnFilterCondition
|
||||
where
|
||||
T: EntityTrait,
|
||||
<T as EntityTrait>::Model: Sync,
|
||||
{
|
||||
let column = *column;
|
||||
Box::new(
|
||||
move |context: &ResolverContext,
|
||||
mut condition: Condition,
|
||||
filter: Option<&ObjectAccessor<'_>>|
|
||||
-> SeaResult<Condition> {
|
||||
match context.ctx.data::<AuthUserInfo>() {
|
||||
Ok(user_info) => {
|
||||
let subscriber_id = user_info.subscriber_auth.subscriber_id;
|
||||
|
||||
if let Some(filter) = filter {
|
||||
for operation in &SUBSCRIBER_ID_FILTER_INFO.supported_operations {
|
||||
match operation {
|
||||
SeaographqlFilterOperation::Equals => {
|
||||
if let Some(value) = filter.get("eq") {
|
||||
let value: i32 = value.i64()?.try_into()?;
|
||||
if value != subscriber_id {
|
||||
return Err(SeaographyError::AsyncGraphQLError(
|
||||
async_graphql::Error::new(
|
||||
"subscriber_id and auth_info does not match",
|
||||
),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => unreachable!("unreachable filter operation for subscriber_id"),
|
||||
}
|
||||
}
|
||||
} else {
|
||||
condition = condition.add(column.eq(subscriber_id));
|
||||
}
|
||||
|
||||
Ok(condition)
|
||||
}
|
||||
Err(err) => unreachable!("auth user info must be guarded: {:?}", err),
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
pub fn generate_default_subscriber_id_input_conversion<T>(
|
||||
context: &BuilderContext,
|
||||
_column: &T::Column,
|
||||
) -> FnInputTypeNoneConversion
|
||||
where
|
||||
T: EntityTrait,
|
||||
<T as EntityTrait>::Model: Sync,
|
||||
{
|
||||
let entity_key = get_entity_key::<T>(context);
|
||||
let entity_name = context.entity_query_field.type_name.as_ref()(&entity_key);
|
||||
let entity_create_one_mutation_field_name = Arc::new(format!(
|
||||
"{}{}",
|
||||
entity_name, context.entity_create_one_mutation.mutation_suffix
|
||||
));
|
||||
let entity_create_batch_mutation_field_name = Arc::new(format!(
|
||||
"{}{}",
|
||||
entity_name,
|
||||
context.entity_create_batch_mutation.mutation_suffix.clone()
|
||||
));
|
||||
Box::new(
|
||||
move |context: &ResolverContext| -> SeaResult<Option<SeaValue>> {
|
||||
let field_name = context.field().name();
|
||||
if field_name == entity_create_one_mutation_field_name.as_str()
|
||||
|| field_name == entity_create_batch_mutation_field_name.as_str()
|
||||
{
|
||||
match context.ctx.data::<AuthUserInfo>() {
|
||||
Ok(user_info) => {
|
||||
let subscriber_id = user_info.subscriber_auth.subscriber_id;
|
||||
Ok(Some(SeaValue::Int(Some(subscriber_id))))
|
||||
}
|
||||
Err(err) => unreachable!("auth user info must be guarded: {:?}", err),
|
||||
}
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
pub fn restrict_subscriber_for_entity<T>(context: &mut BuilderContext, column: &T::Column)
|
||||
where
|
||||
T: EntityTrait,
|
||||
<T as EntityTrait>::Model: Sync,
|
||||
{
|
||||
let entity_key = get_entity_key::<T>(context);
|
||||
let entity_column_key = get_entity_column_key::<T>(context, column);
|
||||
let column_name = context.entity_object.column_name.as_ref()(&entity_key, &entity_column_key);
|
||||
context.guards.entity_guards.insert(
|
||||
entity_key.clone(),
|
||||
guard_entity_with_subscriber_id::<T>(context, column),
|
||||
);
|
||||
context.guards.field_guards.insert(
|
||||
entity_column_key.clone(),
|
||||
guard_field_with_subscriber_id::<T>(context, column),
|
||||
);
|
||||
context.filter_types.overwrites.insert(
|
||||
entity_column_key.clone(),
|
||||
Some(FilterType::Custom(
|
||||
SUBSCRIBER_ID_FILTER_INFO.type_name.clone(),
|
||||
)),
|
||||
);
|
||||
context.filter_types.condition_functions.insert(
|
||||
entity_column_key.clone(),
|
||||
generate_subscriber_id_filter_condition::<T>(context, column),
|
||||
);
|
||||
context.types.input_none_conversions.insert(
|
||||
column_name.clone(),
|
||||
generate_default_subscriber_id_input_conversion::<T>(context, column),
|
||||
);
|
||||
context
|
||||
.entity_input
|
||||
.insert_skips
|
||||
.push(entity_column_key.clone());
|
||||
|
||||
context.entity_input.update_skips.push(entity_column_key);
|
||||
}
|
||||
|
||||
pub fn register_subscribers_to_schema_context(context: &mut BuilderContext) {
|
||||
for column in subscribers::Column::iter() {
|
||||
if !matches!(column, subscribers::Column::Id) {
|
||||
let key = get_entity_column_key::<subscribers::Entity>(context, &column);
|
||||
context.filter_types.overwrites.insert(key, None);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn register_subscribers_to_schema_builder(mut builder: SeaographyBuilder) -> SeaographyBuilder {
|
||||
{
|
||||
let filter_types_map_helper = FilterTypesMapHelper {
|
||||
context: builder.context,
|
||||
};
|
||||
|
||||
builder.schema = builder
|
||||
.schema
|
||||
.register(filter_types_map_helper.generate_filter_input(&SUBSCRIBER_ID_FILTER_INFO));
|
||||
}
|
||||
|
||||
{
|
||||
builder.register_entity::<subscribers::Entity>(
|
||||
<subscribers::RelatedEntity as sea_orm::Iterable>::iter()
|
||||
.map(|rel| seaography::RelationBuilder::get_relation(&rel, builder.context))
|
||||
.collect(),
|
||||
);
|
||||
builder = builder.register_entity_dataloader_one_to_one(subscribers::Entity, tokio::spawn);
|
||||
builder = builder.register_entity_dataloader_one_to_many(subscribers::Entity, tokio::spawn);
|
||||
}
|
||||
|
||||
builder
|
||||
}
|
||||
@@ -1,41 +0,0 @@
|
||||
use async_graphql::dynamic::TypeRef;
|
||||
use lazy_static::lazy_static;
|
||||
use maplit::btreeset;
|
||||
use sea_orm::{ColumnTrait, EntityTrait};
|
||||
use seaography::{BuilderContext, FilterInfo, FilterOperation as SeaographqlFilterOperation};
|
||||
|
||||
use crate::graphql::infra::filter::FnFilterCondition;
|
||||
|
||||
lazy_static! {
|
||||
pub static ref SUBSCRIBER_ID_FILTER_INFO: FilterInfo = FilterInfo {
|
||||
type_name: String::from("SubscriberIdFilterInput"),
|
||||
base_type: TypeRef::INT.into(),
|
||||
supported_operations: btreeset! { SeaographqlFilterOperation::Equals },
|
||||
};
|
||||
}
|
||||
|
||||
pub fn generate_subscriber_id_condition_function<T>(
|
||||
_context: &BuilderContext,
|
||||
column: &T::Column,
|
||||
) -> FnFilterCondition
|
||||
where
|
||||
T: EntityTrait,
|
||||
<T as EntityTrait>::Model: Sync,
|
||||
{
|
||||
let column = *column;
|
||||
Box::new(move |mut condition, filter| {
|
||||
for operation in &SUBSCRIBER_ID_FILTER_INFO.supported_operations {
|
||||
match operation {
|
||||
SeaographqlFilterOperation::Equals => {
|
||||
if let Some(value) = filter.get("eq") {
|
||||
let value: i32 = value.i64()?.try_into()?;
|
||||
let value = sea_orm::Value::Int(Some(value));
|
||||
condition = condition.add(column.eq(value));
|
||||
}
|
||||
}
|
||||
_ => unreachable!("unreachable filter operation for subscriber_id"),
|
||||
}
|
||||
}
|
||||
Ok(condition)
|
||||
})
|
||||
}
|
||||
@@ -1,94 +0,0 @@
|
||||
use sea_orm::{EntityTrait, Iterable};
|
||||
use seaography::{Builder as SeaographyBuilder, BuilderContext, FilterType, FilterTypesMapHelper};
|
||||
|
||||
mod filter;
|
||||
mod guard;
|
||||
mod transformer;
|
||||
|
||||
use filter::{SUBSCRIBER_ID_FILTER_INFO, generate_subscriber_id_condition_function};
|
||||
use guard::{guard_entity_with_subscriber_id, guard_field_with_subscriber_id};
|
||||
use transformer::{
|
||||
generate_subscriber_id_filter_condition_transformer,
|
||||
generate_subscriber_id_mutation_input_object_transformer,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
graphql::infra::util::{get_entity_column_key, get_entity_key},
|
||||
models::subscribers,
|
||||
};
|
||||
|
||||
pub fn restrict_subscriber_for_entity<T>(context: &mut BuilderContext, column: &T::Column)
|
||||
where
|
||||
T: EntityTrait,
|
||||
<T as EntityTrait>::Model: Sync,
|
||||
{
|
||||
let entity_key = get_entity_key::<T>(context);
|
||||
let entity_column_key = get_entity_column_key::<T>(context, column);
|
||||
context.guards.entity_guards.insert(
|
||||
entity_key.clone(),
|
||||
guard_entity_with_subscriber_id::<T>(context, column),
|
||||
);
|
||||
context.guards.field_guards.insert(
|
||||
entity_column_key.clone(),
|
||||
guard_field_with_subscriber_id::<T>(context, column),
|
||||
);
|
||||
context.filter_types.overwrites.insert(
|
||||
entity_column_key.clone(),
|
||||
Some(FilterType::Custom(
|
||||
SUBSCRIBER_ID_FILTER_INFO.type_name.clone(),
|
||||
)),
|
||||
);
|
||||
context.filter_types.condition_functions.insert(
|
||||
entity_column_key.clone(),
|
||||
generate_subscriber_id_condition_function::<T>(context, column),
|
||||
);
|
||||
context.transformers.filter_conditions_transformers.insert(
|
||||
entity_key.clone(),
|
||||
generate_subscriber_id_filter_condition_transformer::<T>(context, column),
|
||||
);
|
||||
context
|
||||
.transformers
|
||||
.mutation_input_object_transformers
|
||||
.insert(
|
||||
entity_key,
|
||||
generate_subscriber_id_mutation_input_object_transformer::<T>(context, column),
|
||||
);
|
||||
context
|
||||
.entity_input
|
||||
.insert_skips
|
||||
.push(entity_column_key.clone());
|
||||
context.entity_input.update_skips.push(entity_column_key);
|
||||
}
|
||||
|
||||
pub fn register_subscribers_to_schema_context(context: &mut BuilderContext) {
|
||||
for column in subscribers::Column::iter() {
|
||||
if !matches!(column, subscribers::Column::Id) {
|
||||
let key = get_entity_column_key::<subscribers::Entity>(context, &column);
|
||||
context.filter_types.overwrites.insert(key, None);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn register_subscribers_to_schema_builder(mut builder: SeaographyBuilder) -> SeaographyBuilder {
|
||||
{
|
||||
let filter_types_map_helper = FilterTypesMapHelper {
|
||||
context: builder.context,
|
||||
};
|
||||
|
||||
builder.schema = builder
|
||||
.schema
|
||||
.register(filter_types_map_helper.generate_filter_input(&SUBSCRIBER_ID_FILTER_INFO));
|
||||
}
|
||||
|
||||
{
|
||||
builder.register_entity::<subscribers::Entity>(
|
||||
<subscribers::RelatedEntity as sea_orm::Iterable>::iter()
|
||||
.map(|rel| seaography::RelationBuilder::get_relation(&rel, builder.context))
|
||||
.collect(),
|
||||
);
|
||||
builder = builder.register_entity_dataloader_one_to_one(subscribers::Entity, tokio::spawn);
|
||||
builder = builder.register_entity_dataloader_one_to_many(subscribers::Entity, tokio::spawn);
|
||||
}
|
||||
|
||||
builder
|
||||
}
|
||||
@@ -1,85 +0,0 @@
|
||||
use std::{collections::BTreeMap, sync::Arc};
|
||||
|
||||
use async_graphql::dynamic::ResolverContext;
|
||||
use sea_orm::{ColumnTrait, Condition, EntityTrait, Value as SeaValue};
|
||||
use seaography::{BuilderContext, FnFilterConditionsTransformer, FnMutationInputObjectTransformer};
|
||||
|
||||
use crate::{
|
||||
auth::AuthUserInfo,
|
||||
graphql::infra::util::{get_column_key, get_entity_key},
|
||||
};
|
||||
|
||||
pub fn generate_subscriber_id_filter_condition_transformer<T>(
|
||||
_context: &BuilderContext,
|
||||
column: &T::Column,
|
||||
) -> FnFilterConditionsTransformer
|
||||
where
|
||||
T: EntityTrait,
|
||||
<T as EntityTrait>::Model: Sync,
|
||||
{
|
||||
let column = *column;
|
||||
Box::new(
|
||||
move |context: &ResolverContext, condition: Condition| -> Condition {
|
||||
match context.ctx.data::<AuthUserInfo>() {
|
||||
Ok(user_info) => {
|
||||
let subscriber_id = user_info.subscriber_auth.subscriber_id;
|
||||
condition.add(column.eq(subscriber_id))
|
||||
}
|
||||
Err(err) => unreachable!("auth user info must be guarded: {:?}", err),
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
pub fn generate_subscriber_id_mutation_input_object_transformer<T>(
|
||||
context: &BuilderContext,
|
||||
column: &T::Column,
|
||||
) -> FnMutationInputObjectTransformer
|
||||
where
|
||||
T: EntityTrait,
|
||||
<T as EntityTrait>::Model: Sync,
|
||||
{
|
||||
let entity_key = get_entity_key::<T>(context);
|
||||
let entity_name = context.entity_query_field.type_name.as_ref()(&entity_key);
|
||||
let column_key = get_column_key::<T>(context, column);
|
||||
let column_name = Arc::new(context.entity_object.column_name.as_ref()(
|
||||
&entity_key,
|
||||
&column_key,
|
||||
));
|
||||
let entity_create_one_mutation_field_name = Arc::new(format!(
|
||||
"{}{}",
|
||||
entity_name, context.entity_create_one_mutation.mutation_suffix
|
||||
));
|
||||
let entity_create_batch_mutation_field_name = Arc::new(format!(
|
||||
"{}{}",
|
||||
entity_name,
|
||||
context.entity_create_batch_mutation.mutation_suffix.clone()
|
||||
));
|
||||
Box::new(
|
||||
move |context: &ResolverContext,
|
||||
mut input: BTreeMap<String, SeaValue>|
|
||||
-> BTreeMap<String, SeaValue> {
|
||||
let field_name = context.field().name();
|
||||
if field_name == entity_create_one_mutation_field_name.as_str()
|
||||
|| field_name == entity_create_batch_mutation_field_name.as_str()
|
||||
{
|
||||
match context.ctx.data::<AuthUserInfo>() {
|
||||
Ok(user_info) => {
|
||||
let subscriber_id = user_info.subscriber_auth.subscriber_id;
|
||||
let value = input.get_mut(column_name.as_str());
|
||||
if value.is_none() {
|
||||
input.insert(
|
||||
column_name.as_str().to_string(),
|
||||
SeaValue::Int(Some(subscriber_id)),
|
||||
);
|
||||
}
|
||||
input
|
||||
}
|
||||
Err(err) => unreachable!("auth user info must be guarded: {:?}", err),
|
||||
}
|
||||
} else {
|
||||
input
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
@@ -1,6 +0,0 @@
|
||||
use async_graphql::dynamic::ObjectAccessor;
|
||||
use sea_orm::Condition;
|
||||
use seaography::SeaResult;
|
||||
|
||||
pub type FnFilterCondition =
|
||||
Box<dyn Fn(Condition, &ObjectAccessor) -> SeaResult<Condition> + Send + Sync>;
|
||||
@@ -1,6 +1,6 @@
|
||||
use async_graphql::{
|
||||
Error as GraphqlError,
|
||||
dynamic::{Scalar, SchemaError},
|
||||
dynamic::{ResolverContext, Scalar, SchemaError},
|
||||
to_value,
|
||||
};
|
||||
use itertools::Itertools;
|
||||
@@ -9,13 +9,12 @@ use sea_orm::{
|
||||
Condition, EntityTrait,
|
||||
sea_query::{ArrayType, Expr, ExprTrait, IntoLikeExpr, SimpleExpr, Value as DbValue},
|
||||
};
|
||||
use seaography::{Builder as SeaographyBuilder, BuilderContext, FilterType, SeaographyError};
|
||||
use seaography::{
|
||||
Builder as SeaographyBuilder, BuilderContext, FilterType, FnFilterCondition, SeaographyError,
|
||||
};
|
||||
use serde_json::Value as JsonValue;
|
||||
|
||||
use crate::{
|
||||
errors::RecorderResult,
|
||||
graphql::infra::{filter::FnFilterCondition, util::get_entity_column_key},
|
||||
};
|
||||
use crate::{errors::RecorderResult, graphql::infra::util::get_entity_column_key};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Copy)]
|
||||
pub enum JsonbFilterOperation {
|
||||
@@ -904,20 +903,29 @@ where
|
||||
<T as EntityTrait>::Model: Sync,
|
||||
{
|
||||
let column = *column;
|
||||
Box::new(move |mut condition, filter| {
|
||||
let filter_value = to_value(filter.as_index_map())
|
||||
.map_err(|e| SeaographyError::AsyncGraphQLError(GraphqlError::new_with_source(e)))?;
|
||||
Box::new(
|
||||
move |_resolve_context: &ResolverContext<'_>, condition, filter| {
|
||||
if let Some(filter) = filter {
|
||||
let filter_value = to_value(filter.as_index_map()).map_err(|e| {
|
||||
SeaographyError::AsyncGraphQLError(GraphqlError::new_with_source(e))
|
||||
})?;
|
||||
|
||||
let filter_json: JsonValue = filter_value
|
||||
.into_json()
|
||||
.map_err(|e| SeaographyError::AsyncGraphQLError(GraphqlError::new(format!("{e:?}"))))?;
|
||||
let filter_json: JsonValue = filter_value.into_json().map_err(|e| {
|
||||
SeaographyError::AsyncGraphQLError(GraphqlError::new(format!("{e:?}")))
|
||||
})?;
|
||||
|
||||
let cond_where = prepare_jsonb_filter_input(&Expr::col(column), filter_json)
|
||||
.map_err(|e| SeaographyError::AsyncGraphQLError(GraphqlError::new_with_source(e)))?;
|
||||
let cond_where = prepare_jsonb_filter_input(&Expr::col(column), filter_json)
|
||||
.map_err(|e| {
|
||||
SeaographyError::AsyncGraphQLError(GraphqlError::new_with_source(e))
|
||||
})?;
|
||||
|
||||
condition = condition.add(cond_where);
|
||||
Ok(condition)
|
||||
})
|
||||
let condition = condition.add(cond_where);
|
||||
Ok(condition)
|
||||
} else {
|
||||
Ok(condition)
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
pub fn register_jsonb_input_filter_to_schema_builder(
|
||||
|
||||
@@ -1,3 +1,2 @@
|
||||
pub mod json;
|
||||
pub mod util;
|
||||
pub mod filter;
|
||||
|
||||
@@ -12,13 +12,13 @@ impl MigrationTrait for Migration {
|
||||
let db = manager.get_connection();
|
||||
|
||||
db.execute_unprepared(&format!(
|
||||
r#"CREATE OR REPLACE VIEW subscriber_task AS
|
||||
r#"CREATE OR REPLACE VIEW subscriber_tasks AS
|
||||
SELECT
|
||||
job,
|
||||
job_type,
|
||||
status,
|
||||
(job->'subscriber_id')::integer AS subscriber_id,
|
||||
(job->'task_type')::text AS task_type,
|
||||
(job ->> 'subscriber_id'::text)::integer AS subscriber_id,
|
||||
job ->> 'task_type'::text AS task_type,
|
||||
id,
|
||||
attempts,
|
||||
max_attempts,
|
||||
@@ -56,7 +56,7 @@ AND jsonb_path_exists(job, '$.task_type ? (@.type() == "string")')"#,
|
||||
)
|
||||
.await?;
|
||||
|
||||
db.execute_unprepared("DROP VIEW IF EXISTS subscriber_task")
|
||||
db.execute_unprepared("DROP VIEW IF EXISTS subscriber_tasks")
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use async_trait::async_trait;
|
||||
use sea_orm::entity::prelude::*;
|
||||
|
||||
pub use crate::task::SubscriberTask;
|
||||
pub use crate::task::{SubscriberTask, SubscriberTaskType};
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
|
||||
#[sea_orm(table_name = "subscriber_tasks")]
|
||||
@@ -9,6 +10,7 @@ pub struct Model {
|
||||
pub id: String,
|
||||
pub subscriber_id: i32,
|
||||
pub job: SubscriberTask,
|
||||
pub task_type: SubscriberTaskType,
|
||||
pub status: String,
|
||||
pub attempts: i32,
|
||||
pub max_attempts: i32,
|
||||
@@ -44,4 +46,5 @@ pub enum RelatedEntity {
|
||||
Subscriber,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ActiveModelBehavior for ActiveModel {}
|
||||
|
||||
@@ -7,6 +7,8 @@ pub use core::{SUBSCRIBER_TASK_APALIS_NAME, SubscriberAsyncTaskTrait, Subscriber
|
||||
|
||||
pub use config::TaskConfig;
|
||||
pub use registry::{
|
||||
SubscriberTask, SyncOneSubscriptionFeedsIncrementalTask, SyncOneSubscriptionSourcesTask,
|
||||
SubscriberTask, SubscriberTaskType, SubscriberTaskTypeEnum, SubscriberTaskTypeVariant,
|
||||
SubscriberTaskTypeVariantIter, SyncOneSubscriptionFeedsFullTask,
|
||||
SyncOneSubscriptionFeedsIncrementalTask, SyncOneSubscriptionSourcesTask,
|
||||
};
|
||||
pub use service::TaskService;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
mod subscription;
|
||||
use std::sync::Arc;
|
||||
|
||||
use sea_orm::FromJsonQueryResult;
|
||||
use sea_orm::{DeriveActiveEnum, DeriveDisplay, EnumIter, FromJsonQueryResult};
|
||||
use serde::{Deserialize, Serialize};
|
||||
pub use subscription::{
|
||||
SyncOneSubscriptionFeedsFullTask, SyncOneSubscriptionFeedsIncrementalTask,
|
||||
@@ -15,16 +15,28 @@ use crate::{
|
||||
models::subscriptions::SubscriptionTrait,
|
||||
};
|
||||
|
||||
#[derive(async_graphql::Enum, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Copy)]
|
||||
#[derive(
|
||||
Clone,
|
||||
Debug,
|
||||
Serialize,
|
||||
Deserialize,
|
||||
PartialEq,
|
||||
Eq,
|
||||
Copy,
|
||||
DeriveActiveEnum,
|
||||
DeriveDisplay,
|
||||
EnumIter,
|
||||
)]
|
||||
#[sea_orm(rs_type = "String", db_type = "Text")]
|
||||
pub enum SubscriberTaskType {
|
||||
#[serde(rename = "sync_one_subscription_feeds_incremental")]
|
||||
#[graphql(name = "sync_one_subscription_feeds_incremental")]
|
||||
#[sea_orm(string_value = "sync_one_subscription_feeds_incremental")]
|
||||
SyncOneSubscriptionFeedsIncremental,
|
||||
#[serde(rename = "sync_one_subscription_feeds_full")]
|
||||
#[graphql(name = "sync_one_subscription_feeds_full")]
|
||||
#[sea_orm(string_value = "sync_one_subscription_feeds_full")]
|
||||
SyncOneSubscriptionFeedsFull,
|
||||
#[serde(rename = "sync_one_subscription_sources")]
|
||||
#[graphql(name = "sync_one_subscription_sources")]
|
||||
#[sea_orm(string_value = "sync_one_subscription_sources")]
|
||||
SyncOneSubscriptionSources,
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user