From 07955286f168471695ce57af825ff0da23af9a97 Mon Sep 17 00:00:00 2001 From: lonelyhentxi Date: Thu, 12 Jun 2025 03:32:18 +0800 Subject: [PATCH] feat: add tasks manage view --- .vscode/settings.json | 10 +- Cargo.lock | 2 +- Cargo.toml | 2 +- apps/recorder/Cargo.toml | 1 + .../src/extract/mikan/subscription.rs | 30 +- apps/recorder/src/graphql/domains/crypto.rs | 16 +- .../src/graphql/domains/subscriber_tasks.rs | 29 +- .../{subscribers/guard.rs => subscribers.rs} | 181 ++++++++- .../src/graphql/domains/subscribers/filter.rs | 41 -- .../src/graphql/domains/subscribers/mod.rs | 94 ----- .../domains/subscribers/transformer.rs | 85 ---- apps/recorder/src/graphql/infra/filter.rs | 6 - apps/recorder/src/graphql/infra/json.rs | 42 +- apps/recorder/src/graphql/infra/mod.rs | 1 - .../m20250520_021135_subscriber_tasks.rs | 8 +- apps/recorder/src/models/subscriber_tasks.rs | 5 +- apps/recorder/src/task/mod.rs | 4 +- apps/recorder/src/task/registry/mod.rs | 22 +- apps/webui/src/app/config/nav.ts | 2 +- .../components/ui/data-table-row-actions.tsx | 11 +- .../domains/recorder/schema/subscriptions.ts | 41 +- .../src/domains/recorder/schema/tasks.ts | 33 ++ .../recorder/services/subscription.service.ts | 6 +- apps/webui/src/infra/auth/oidc/config.ts | 2 +- .../src/infra/auth/oidc/oidc-auth.provider.ts | 14 +- apps/webui/src/infra/graphql/gql/gql.ts | 6 +- apps/webui/src/infra/graphql/gql/graphql.ts | 28 +- .../src/infra/graphql/graphql.service.ts | 3 +- .../routes/_app/subscriptions/create.tsx | 14 +- .../routes/_app/subscriptions/edit.$id.tsx | 10 +- .../routes/_app/tasks/-actions.tsx | 5 + .../presentation/routes/_app/tasks/manage.tsx | 368 +++++++++++++++++- 32 files changed, 774 insertions(+), 348 deletions(-) rename apps/recorder/src/graphql/domains/{subscribers/guard.rs => subscribers.rs} (50%) delete mode 100644 apps/recorder/src/graphql/domains/subscribers/filter.rs delete mode 100644 apps/recorder/src/graphql/domains/subscribers/mod.rs delete mode 100644 apps/recorder/src/graphql/domains/subscribers/transformer.rs delete mode 100644 apps/recorder/src/graphql/infra/filter.rs create mode 100644 apps/webui/src/presentation/routes/_app/tasks/-actions.tsx diff --git a/.vscode/settings.json b/.vscode/settings.json index 44d8160..e060323 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -39,5 +39,13 @@ "username": "konobangu" } ], - "rust-analyzer.cargo.features": "all" + "rust-analyzer.cargo.features": "all", + // https://github.com/rust-lang/rust/issues/141540 + "rust-analyzer.cargo.targetDir": "target/rust-analyzer", + "rust-analyzer.check.extraEnv": { + "CARGO_TARGET_DIR": "target/rust-analyzer" + }, + "rust-analyzer.cargo.extraEnv": { + "CARGO_TARGET_DIR": "target/analyzer" + } } \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 63fb972..61115e5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5995,7 +5995,7 @@ checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" [[package]] name = "seaography" version = "1.1.4" -source = "git+https://github.com/dumtruck/seaography.git?rev=10ba248#10ba2487fb356a0385c598290668a01e0ef21734" +source = "git+https://github.com/dumtruck/seaography.git?rev=01d3f99#01d3f99aebd476860b9c061676b2b22083188b03" dependencies = [ "async-graphql", "fnv", diff --git a/Cargo.toml b/Cargo.toml index 9e33509..04a5b82 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,4 +67,4 @@ color-eyre = "0.6.5" inquire = "0.7.5" [patch.crates-io] -seaography = { git = "https://github.com/dumtruck/seaography.git", rev = "10ba248" } +seaography = { git = "https://github.com/dumtruck/seaography.git", rev = "01d3f99" } diff --git a/apps/recorder/Cargo.toml b/apps/recorder/Cargo.toml index 58032e4..e890dd0 100644 --- a/apps/recorder/Cargo.toml +++ b/apps/recorder/Cargo.toml @@ -87,6 +87,7 @@ seaography = { version = "1.1", features = [ "with-decimal", "with-bigdecimal", "with-postgres-array", + "with-json-as-scalar", ] } base64 = "0.22.1" tower = "0.5.2" diff --git a/apps/recorder/src/extract/mikan/subscription.rs b/apps/recorder/src/extract/mikan/subscription.rs index 9bf9f65..1351f3f 100644 --- a/apps/recorder/src/extract/mikan/subscription.rs +++ b/apps/recorder/src/extract/mikan/subscription.rs @@ -142,7 +142,7 @@ async fn sync_mikan_feeds_from_rss_item_list( #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct MikanSubscriberSubscription { - pub id: i32, + pub subscription_id: i32, pub mikan_subscription_token: String, pub subscriber_id: i32, } @@ -154,7 +154,7 @@ impl SubscriptionTrait for MikanSubscriberSubscription { } fn get_subscription_id(&self) -> i32 { - self.id + self.subscription_id } async fn sync_feeds_incremental(&self, ctx: Arc) -> RecorderResult<()> { @@ -204,7 +204,7 @@ impl SubscriptionTrait for MikanSubscriberSubscription { })?; Ok(Self { - id: model.id, + subscription_id: model.id, mikan_subscription_token: meta.mikan_subscription_token, subscriber_id: model.subscriber_id, }) @@ -243,7 +243,8 @@ impl MikanSubscriberSubscription { ctx: &dyn AppContextTrait, ) -> RecorderResult> { let subscribed_bangumi_list = - bangumi::Model::get_subsribed_bangumi_list_from_subscription(ctx, self.id).await?; + bangumi::Model::get_subsribed_bangumi_list_from_subscription(ctx, self.subscription_id) + .await?; let mut rss_item_list = vec![]; for subscribed_bangumi in subscribed_bangumi_list { @@ -252,7 +253,7 @@ impl MikanSubscriberSubscription { .with_whatever_context::<_, String, RecorderError>(|| { format!( "rss link is required, subscription_id = {:?}, bangumi_name = {}", - self.id, subscribed_bangumi.display_name + self.subscription_id, subscribed_bangumi.display_name ) })?; let bytes = fetch_bytes(ctx.mikan(), rss_url).await?; @@ -273,7 +274,7 @@ impl MikanSubscriberSubscription { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, InputObject, SimpleObject)] pub struct MikanSeasonSubscription { - pub id: i32, + pub subscription_id: i32, pub year: i32, pub season_str: MikanSeasonStr, pub credential_id: i32, @@ -287,7 +288,7 @@ impl SubscriptionTrait for MikanSeasonSubscription { } fn get_subscription_id(&self) -> i32 { - self.id + self.subscription_id } async fn sync_feeds_incremental(&self, ctx: Arc) -> RecorderResult<()> { @@ -363,7 +364,7 @@ impl SubscriptionTrait for MikanSeasonSubscription { })?; Ok(Self { - id: model.id, + subscription_id: model.id, year: source_url_meta.year, season_str: source_url_meta.season_str, credential_id, @@ -400,7 +401,10 @@ impl MikanSeasonSubscription { let db = ctx.db(); let subscribed_bangumi_list = bangumi::Entity::find() - .filter(Condition::all().add(subscription_bangumi::Column::SubscriptionId.eq(self.id))) + .filter( + Condition::all() + .add(subscription_bangumi::Column::SubscriptionId.eq(self.subscription_id)), + ) .join_rev( JoinType::InnerJoin, subscription_bangumi::Relation::Bangumi.def(), @@ -415,7 +419,7 @@ impl MikanSeasonSubscription { .with_whatever_context::<_, String, RecorderError>(|| { format!( "rss_link is required, subscription_id = {}, bangumi_name = {}", - self.id, subscribed_bangumi.display_name + self.subscription_id, subscribed_bangumi.display_name ) })?; let bytes = fetch_bytes(ctx.mikan(), rss_url).await?; @@ -436,7 +440,7 @@ impl MikanSeasonSubscription { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, InputObject, SimpleObject)] pub struct MikanBangumiSubscription { - pub id: i32, + pub subscription_id: i32, pub mikan_bangumi_id: String, pub mikan_fansub_id: String, pub subscriber_id: i32, @@ -449,7 +453,7 @@ impl SubscriptionTrait for MikanBangumiSubscription { } fn get_subscription_id(&self) -> i32 { - self.id + self.subscription_id } async fn sync_feeds_incremental(&self, ctx: Arc) -> RecorderResult<()> { @@ -487,7 +491,7 @@ impl SubscriptionTrait for MikanBangumiSubscription { })?; Ok(Self { - id: model.id, + subscription_id: model.id, mikan_bangumi_id: meta.mikan_bangumi_id, mikan_fansub_id: meta.mikan_fansub_id, subscriber_id: model.subscriber_id, diff --git a/apps/recorder/src/graphql/domains/crypto.rs b/apps/recorder/src/graphql/domains/crypto.rs index 6806cff..36cac8b 100644 --- a/apps/recorder/src/graphql/domains/crypto.rs +++ b/apps/recorder/src/graphql/domains/crypto.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use async_graphql::dynamic::ValueAccessor; +use async_graphql::dynamic::{ResolverContext, ValueAccessor}; use sea_orm::{EntityTrait, Value as SeaValue}; use seaography::{BuilderContext, SeaResult}; @@ -25,11 +25,15 @@ fn register_crypto_column_input_conversion_to_schema_context( context.types.input_conversions.insert( format!("{entity_name}.{column_name}"), - Box::new(move |value: &ValueAccessor| -> SeaResult { - let source = value.string()?; - let encrypted = ctx.crypto().encrypt_string(source.into())?; - Ok(encrypted.into()) - }), + Box::new( + move |_resolve_context: &ResolverContext<'_>, + value: &ValueAccessor| + -> SeaResult { + let source = value.string()?; + let encrypted = ctx.crypto().encrypt_string(source.into())?; + Ok(encrypted.into()) + }, + ), ); } diff --git a/apps/recorder/src/graphql/domains/subscriber_tasks.rs b/apps/recorder/src/graphql/domains/subscriber_tasks.rs index 006afd0..f3a1445 100644 --- a/apps/recorder/src/graphql/domains/subscriber_tasks.rs +++ b/apps/recorder/src/graphql/domains/subscriber_tasks.rs @@ -1,23 +1,10 @@ -use async_graphql::dynamic::Scalar; -use seaography::{Builder as SeaographyBuilder, BuilderContext, ConvertedType}; +use seaography::{Builder as SeaographyBuilder, BuilderContext}; use crate::{ - graphql::infra::{ - json::restrict_jsonb_filter_input_for_entity, - util::{get_column_key, get_entity_key}, - }, - models::subscriber_tasks::{self, SubscriberTask}, + graphql::infra::json::restrict_jsonb_filter_input_for_entity, models::subscriber_tasks, }; pub fn register_subscriber_tasks_to_schema_context(context: &mut BuilderContext) { - let entity_key = get_entity_key::(context); - let column_name = - get_column_key::(context, &subscriber_tasks::Column::Job); - let column_name = context.entity_object.column_name.as_ref()(&entity_key, &column_name); - context.types.overwrites.insert( - column_name, - ConvertedType::Custom(String::from("SubscriberTask")), - ); restrict_jsonb_filter_input_for_entity::( context, &subscriber_tasks::Column::Job, @@ -27,16 +14,6 @@ pub fn register_subscriber_tasks_to_schema_context(context: &mut BuilderContext) pub fn register_subscriber_tasks_to_schema_builder( mut builder: SeaographyBuilder, ) -> SeaographyBuilder { - let subscriber_tasks_scalar = Scalar::new("SubscriberTasks") - .description("The subscriber tasks") - .validator(|value| -> bool { - if let Ok(json) = value.clone().into_json() { - serde_json::from_value::(json).is_ok() - } else { - false - } - }); - - builder.schema = builder.schema.register(subscriber_tasks_scalar); + builder.register_enumeration::(); builder } diff --git a/apps/recorder/src/graphql/domains/subscribers/guard.rs b/apps/recorder/src/graphql/domains/subscribers.rs similarity index 50% rename from apps/recorder/src/graphql/domains/subscribers/guard.rs rename to apps/recorder/src/graphql/domains/subscribers.rs index a9a4529..663e984 100644 --- a/apps/recorder/src/graphql/domains/subscribers/guard.rs +++ b/apps/recorder/src/graphql/domains/subscribers.rs @@ -1,14 +1,29 @@ use std::sync::Arc; -use async_graphql::dynamic::{ResolverContext, ValueAccessor}; -use sea_orm::EntityTrait; -use seaography::{BuilderContext, FnGuard, GuardAction}; +use async_graphql::dynamic::{ObjectAccessor, ResolverContext, TypeRef, ValueAccessor}; +use lazy_static::lazy_static; +use maplit::btreeset; +use sea_orm::{ColumnTrait, Condition, EntityTrait, Iterable, Value as SeaValue}; +use seaography::{ + Builder as SeaographyBuilder, BuilderContext, FilterInfo, + FilterOperation as SeaographqlFilterOperation, FilterType, FilterTypesMapHelper, + FnFilterCondition, FnGuard, FnInputTypeNoneConversion, GuardAction, SeaResult, SeaographyError, +}; use crate::{ auth::{AuthError, AuthUserInfo}, - graphql::infra::util::{get_column_key, get_entity_key}, + graphql::infra::util::{get_column_key, get_entity_column_key, get_entity_key}, + models::subscribers, }; +lazy_static! { + pub static ref SUBSCRIBER_ID_FILTER_INFO: FilterInfo = FilterInfo { + type_name: String::from("SubscriberIdFilterInput"), + base_type: TypeRef::INT.into(), + supported_operations: btreeset! { SeaographqlFilterOperation::Equals }, + }; +} + fn guard_data_object_accessor_with_subscriber_id( value: ValueAccessor<'_>, column_name: &str, @@ -181,3 +196,161 @@ where } }) } + +pub fn generate_subscriber_id_filter_condition( + _context: &BuilderContext, + column: &T::Column, +) -> FnFilterCondition +where + T: EntityTrait, + ::Model: Sync, +{ + let column = *column; + Box::new( + move |context: &ResolverContext, + mut condition: Condition, + filter: Option<&ObjectAccessor<'_>>| + -> SeaResult { + match context.ctx.data::() { + Ok(user_info) => { + let subscriber_id = user_info.subscriber_auth.subscriber_id; + + if let Some(filter) = filter { + for operation in &SUBSCRIBER_ID_FILTER_INFO.supported_operations { + match operation { + SeaographqlFilterOperation::Equals => { + if let Some(value) = filter.get("eq") { + let value: i32 = value.i64()?.try_into()?; + if value != subscriber_id { + return Err(SeaographyError::AsyncGraphQLError( + async_graphql::Error::new( + "subscriber_id and auth_info does not match", + ), + )); + } + } + } + _ => unreachable!("unreachable filter operation for subscriber_id"), + } + } + } else { + condition = condition.add(column.eq(subscriber_id)); + } + + Ok(condition) + } + Err(err) => unreachable!("auth user info must be guarded: {:?}", err), + } + }, + ) +} + +pub fn generate_default_subscriber_id_input_conversion( + context: &BuilderContext, + _column: &T::Column, +) -> FnInputTypeNoneConversion +where + T: EntityTrait, + ::Model: Sync, +{ + let entity_key = get_entity_key::(context); + let entity_name = context.entity_query_field.type_name.as_ref()(&entity_key); + let entity_create_one_mutation_field_name = Arc::new(format!( + "{}{}", + entity_name, context.entity_create_one_mutation.mutation_suffix + )); + let entity_create_batch_mutation_field_name = Arc::new(format!( + "{}{}", + entity_name, + context.entity_create_batch_mutation.mutation_suffix.clone() + )); + Box::new( + move |context: &ResolverContext| -> SeaResult> { + let field_name = context.field().name(); + if field_name == entity_create_one_mutation_field_name.as_str() + || field_name == entity_create_batch_mutation_field_name.as_str() + { + match context.ctx.data::() { + Ok(user_info) => { + let subscriber_id = user_info.subscriber_auth.subscriber_id; + Ok(Some(SeaValue::Int(Some(subscriber_id)))) + } + Err(err) => unreachable!("auth user info must be guarded: {:?}", err), + } + } else { + Ok(None) + } + }, + ) +} + +pub fn restrict_subscriber_for_entity(context: &mut BuilderContext, column: &T::Column) +where + T: EntityTrait, + ::Model: Sync, +{ + let entity_key = get_entity_key::(context); + let entity_column_key = get_entity_column_key::(context, column); + let column_name = context.entity_object.column_name.as_ref()(&entity_key, &entity_column_key); + context.guards.entity_guards.insert( + entity_key.clone(), + guard_entity_with_subscriber_id::(context, column), + ); + context.guards.field_guards.insert( + entity_column_key.clone(), + guard_field_with_subscriber_id::(context, column), + ); + context.filter_types.overwrites.insert( + entity_column_key.clone(), + Some(FilterType::Custom( + SUBSCRIBER_ID_FILTER_INFO.type_name.clone(), + )), + ); + context.filter_types.condition_functions.insert( + entity_column_key.clone(), + generate_subscriber_id_filter_condition::(context, column), + ); + context.types.input_none_conversions.insert( + column_name.clone(), + generate_default_subscriber_id_input_conversion::(context, column), + ); + context + .entity_input + .insert_skips + .push(entity_column_key.clone()); + + context.entity_input.update_skips.push(entity_column_key); +} + +pub fn register_subscribers_to_schema_context(context: &mut BuilderContext) { + for column in subscribers::Column::iter() { + if !matches!(column, subscribers::Column::Id) { + let key = get_entity_column_key::(context, &column); + context.filter_types.overwrites.insert(key, None); + } + } +} + +pub fn register_subscribers_to_schema_builder(mut builder: SeaographyBuilder) -> SeaographyBuilder { + { + let filter_types_map_helper = FilterTypesMapHelper { + context: builder.context, + }; + + builder.schema = builder + .schema + .register(filter_types_map_helper.generate_filter_input(&SUBSCRIBER_ID_FILTER_INFO)); + } + + { + builder.register_entity::( + ::iter() + .map(|rel| seaography::RelationBuilder::get_relation(&rel, builder.context)) + .collect(), + ); + builder = builder.register_entity_dataloader_one_to_one(subscribers::Entity, tokio::spawn); + builder = builder.register_entity_dataloader_one_to_many(subscribers::Entity, tokio::spawn); + } + + builder +} diff --git a/apps/recorder/src/graphql/domains/subscribers/filter.rs b/apps/recorder/src/graphql/domains/subscribers/filter.rs deleted file mode 100644 index 3e3ffcc..0000000 --- a/apps/recorder/src/graphql/domains/subscribers/filter.rs +++ /dev/null @@ -1,41 +0,0 @@ -use async_graphql::dynamic::TypeRef; -use lazy_static::lazy_static; -use maplit::btreeset; -use sea_orm::{ColumnTrait, EntityTrait}; -use seaography::{BuilderContext, FilterInfo, FilterOperation as SeaographqlFilterOperation}; - -use crate::graphql::infra::filter::FnFilterCondition; - -lazy_static! { - pub static ref SUBSCRIBER_ID_FILTER_INFO: FilterInfo = FilterInfo { - type_name: String::from("SubscriberIdFilterInput"), - base_type: TypeRef::INT.into(), - supported_operations: btreeset! { SeaographqlFilterOperation::Equals }, - }; -} - -pub fn generate_subscriber_id_condition_function( - _context: &BuilderContext, - column: &T::Column, -) -> FnFilterCondition -where - T: EntityTrait, - ::Model: Sync, -{ - let column = *column; - Box::new(move |mut condition, filter| { - for operation in &SUBSCRIBER_ID_FILTER_INFO.supported_operations { - match operation { - SeaographqlFilterOperation::Equals => { - if let Some(value) = filter.get("eq") { - let value: i32 = value.i64()?.try_into()?; - let value = sea_orm::Value::Int(Some(value)); - condition = condition.add(column.eq(value)); - } - } - _ => unreachable!("unreachable filter operation for subscriber_id"), - } - } - Ok(condition) - }) -} diff --git a/apps/recorder/src/graphql/domains/subscribers/mod.rs b/apps/recorder/src/graphql/domains/subscribers/mod.rs deleted file mode 100644 index 0c9a18f..0000000 --- a/apps/recorder/src/graphql/domains/subscribers/mod.rs +++ /dev/null @@ -1,94 +0,0 @@ -use sea_orm::{EntityTrait, Iterable}; -use seaography::{Builder as SeaographyBuilder, BuilderContext, FilterType, FilterTypesMapHelper}; - -mod filter; -mod guard; -mod transformer; - -use filter::{SUBSCRIBER_ID_FILTER_INFO, generate_subscriber_id_condition_function}; -use guard::{guard_entity_with_subscriber_id, guard_field_with_subscriber_id}; -use transformer::{ - generate_subscriber_id_filter_condition_transformer, - generate_subscriber_id_mutation_input_object_transformer, -}; - -use crate::{ - graphql::infra::util::{get_entity_column_key, get_entity_key}, - models::subscribers, -}; - -pub fn restrict_subscriber_for_entity(context: &mut BuilderContext, column: &T::Column) -where - T: EntityTrait, - ::Model: Sync, -{ - let entity_key = get_entity_key::(context); - let entity_column_key = get_entity_column_key::(context, column); - context.guards.entity_guards.insert( - entity_key.clone(), - guard_entity_with_subscriber_id::(context, column), - ); - context.guards.field_guards.insert( - entity_column_key.clone(), - guard_field_with_subscriber_id::(context, column), - ); - context.filter_types.overwrites.insert( - entity_column_key.clone(), - Some(FilterType::Custom( - SUBSCRIBER_ID_FILTER_INFO.type_name.clone(), - )), - ); - context.filter_types.condition_functions.insert( - entity_column_key.clone(), - generate_subscriber_id_condition_function::(context, column), - ); - context.transformers.filter_conditions_transformers.insert( - entity_key.clone(), - generate_subscriber_id_filter_condition_transformer::(context, column), - ); - context - .transformers - .mutation_input_object_transformers - .insert( - entity_key, - generate_subscriber_id_mutation_input_object_transformer::(context, column), - ); - context - .entity_input - .insert_skips - .push(entity_column_key.clone()); - context.entity_input.update_skips.push(entity_column_key); -} - -pub fn register_subscribers_to_schema_context(context: &mut BuilderContext) { - for column in subscribers::Column::iter() { - if !matches!(column, subscribers::Column::Id) { - let key = get_entity_column_key::(context, &column); - context.filter_types.overwrites.insert(key, None); - } - } -} - -pub fn register_subscribers_to_schema_builder(mut builder: SeaographyBuilder) -> SeaographyBuilder { - { - let filter_types_map_helper = FilterTypesMapHelper { - context: builder.context, - }; - - builder.schema = builder - .schema - .register(filter_types_map_helper.generate_filter_input(&SUBSCRIBER_ID_FILTER_INFO)); - } - - { - builder.register_entity::( - ::iter() - .map(|rel| seaography::RelationBuilder::get_relation(&rel, builder.context)) - .collect(), - ); - builder = builder.register_entity_dataloader_one_to_one(subscribers::Entity, tokio::spawn); - builder = builder.register_entity_dataloader_one_to_many(subscribers::Entity, tokio::spawn); - } - - builder -} diff --git a/apps/recorder/src/graphql/domains/subscribers/transformer.rs b/apps/recorder/src/graphql/domains/subscribers/transformer.rs deleted file mode 100644 index 1c25ff2..0000000 --- a/apps/recorder/src/graphql/domains/subscribers/transformer.rs +++ /dev/null @@ -1,85 +0,0 @@ -use std::{collections::BTreeMap, sync::Arc}; - -use async_graphql::dynamic::ResolverContext; -use sea_orm::{ColumnTrait, Condition, EntityTrait, Value as SeaValue}; -use seaography::{BuilderContext, FnFilterConditionsTransformer, FnMutationInputObjectTransformer}; - -use crate::{ - auth::AuthUserInfo, - graphql::infra::util::{get_column_key, get_entity_key}, -}; - -pub fn generate_subscriber_id_filter_condition_transformer( - _context: &BuilderContext, - column: &T::Column, -) -> FnFilterConditionsTransformer -where - T: EntityTrait, - ::Model: Sync, -{ - let column = *column; - Box::new( - move |context: &ResolverContext, condition: Condition| -> Condition { - match context.ctx.data::() { - Ok(user_info) => { - let subscriber_id = user_info.subscriber_auth.subscriber_id; - condition.add(column.eq(subscriber_id)) - } - Err(err) => unreachable!("auth user info must be guarded: {:?}", err), - } - }, - ) -} - -pub fn generate_subscriber_id_mutation_input_object_transformer( - context: &BuilderContext, - column: &T::Column, -) -> FnMutationInputObjectTransformer -where - T: EntityTrait, - ::Model: Sync, -{ - let entity_key = get_entity_key::(context); - let entity_name = context.entity_query_field.type_name.as_ref()(&entity_key); - let column_key = get_column_key::(context, column); - let column_name = Arc::new(context.entity_object.column_name.as_ref()( - &entity_key, - &column_key, - )); - let entity_create_one_mutation_field_name = Arc::new(format!( - "{}{}", - entity_name, context.entity_create_one_mutation.mutation_suffix - )); - let entity_create_batch_mutation_field_name = Arc::new(format!( - "{}{}", - entity_name, - context.entity_create_batch_mutation.mutation_suffix.clone() - )); - Box::new( - move |context: &ResolverContext, - mut input: BTreeMap| - -> BTreeMap { - let field_name = context.field().name(); - if field_name == entity_create_one_mutation_field_name.as_str() - || field_name == entity_create_batch_mutation_field_name.as_str() - { - match context.ctx.data::() { - Ok(user_info) => { - let subscriber_id = user_info.subscriber_auth.subscriber_id; - let value = input.get_mut(column_name.as_str()); - if value.is_none() { - input.insert( - column_name.as_str().to_string(), - SeaValue::Int(Some(subscriber_id)), - ); - } - input - } - Err(err) => unreachable!("auth user info must be guarded: {:?}", err), - } - } else { - input - } - }, - ) -} diff --git a/apps/recorder/src/graphql/infra/filter.rs b/apps/recorder/src/graphql/infra/filter.rs deleted file mode 100644 index 7ea9e06..0000000 --- a/apps/recorder/src/graphql/infra/filter.rs +++ /dev/null @@ -1,6 +0,0 @@ -use async_graphql::dynamic::ObjectAccessor; -use sea_orm::Condition; -use seaography::SeaResult; - -pub type FnFilterCondition = - Box SeaResult + Send + Sync>; diff --git a/apps/recorder/src/graphql/infra/json.rs b/apps/recorder/src/graphql/infra/json.rs index bebe97d..ae8baaf 100644 --- a/apps/recorder/src/graphql/infra/json.rs +++ b/apps/recorder/src/graphql/infra/json.rs @@ -1,6 +1,6 @@ use async_graphql::{ Error as GraphqlError, - dynamic::{Scalar, SchemaError}, + dynamic::{ResolverContext, Scalar, SchemaError}, to_value, }; use itertools::Itertools; @@ -9,13 +9,12 @@ use sea_orm::{ Condition, EntityTrait, sea_query::{ArrayType, Expr, ExprTrait, IntoLikeExpr, SimpleExpr, Value as DbValue}, }; -use seaography::{Builder as SeaographyBuilder, BuilderContext, FilterType, SeaographyError}; +use seaography::{ + Builder as SeaographyBuilder, BuilderContext, FilterType, FnFilterCondition, SeaographyError, +}; use serde_json::Value as JsonValue; -use crate::{ - errors::RecorderResult, - graphql::infra::{filter::FnFilterCondition, util::get_entity_column_key}, -}; +use crate::{errors::RecorderResult, graphql::infra::util::get_entity_column_key}; #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Copy)] pub enum JsonbFilterOperation { @@ -904,20 +903,29 @@ where ::Model: Sync, { let column = *column; - Box::new(move |mut condition, filter| { - let filter_value = to_value(filter.as_index_map()) - .map_err(|e| SeaographyError::AsyncGraphQLError(GraphqlError::new_with_source(e)))?; + Box::new( + move |_resolve_context: &ResolverContext<'_>, condition, filter| { + if let Some(filter) = filter { + let filter_value = to_value(filter.as_index_map()).map_err(|e| { + SeaographyError::AsyncGraphQLError(GraphqlError::new_with_source(e)) + })?; - let filter_json: JsonValue = filter_value - .into_json() - .map_err(|e| SeaographyError::AsyncGraphQLError(GraphqlError::new(format!("{e:?}"))))?; + let filter_json: JsonValue = filter_value.into_json().map_err(|e| { + SeaographyError::AsyncGraphQLError(GraphqlError::new(format!("{e:?}"))) + })?; - let cond_where = prepare_jsonb_filter_input(&Expr::col(column), filter_json) - .map_err(|e| SeaographyError::AsyncGraphQLError(GraphqlError::new_with_source(e)))?; + let cond_where = prepare_jsonb_filter_input(&Expr::col(column), filter_json) + .map_err(|e| { + SeaographyError::AsyncGraphQLError(GraphqlError::new_with_source(e)) + })?; - condition = condition.add(cond_where); - Ok(condition) - }) + let condition = condition.add(cond_where); + Ok(condition) + } else { + Ok(condition) + } + }, + ) } pub fn register_jsonb_input_filter_to_schema_builder( diff --git a/apps/recorder/src/graphql/infra/mod.rs b/apps/recorder/src/graphql/infra/mod.rs index 921c535..421cdfb 100644 --- a/apps/recorder/src/graphql/infra/mod.rs +++ b/apps/recorder/src/graphql/infra/mod.rs @@ -1,3 +1,2 @@ pub mod json; pub mod util; -pub mod filter; diff --git a/apps/recorder/src/migrations/m20250520_021135_subscriber_tasks.rs b/apps/recorder/src/migrations/m20250520_021135_subscriber_tasks.rs index 354bc2a..7dd086b 100644 --- a/apps/recorder/src/migrations/m20250520_021135_subscriber_tasks.rs +++ b/apps/recorder/src/migrations/m20250520_021135_subscriber_tasks.rs @@ -12,13 +12,13 @@ impl MigrationTrait for Migration { let db = manager.get_connection(); db.execute_unprepared(&format!( - r#"CREATE OR REPLACE VIEW subscriber_task AS + r#"CREATE OR REPLACE VIEW subscriber_tasks AS SELECT job, job_type, status, - (job->'subscriber_id')::integer AS subscriber_id, - (job->'task_type')::text AS task_type, + (job ->> 'subscriber_id'::text)::integer AS subscriber_id, + job ->> 'task_type'::text AS task_type, id, attempts, max_attempts, @@ -56,7 +56,7 @@ AND jsonb_path_exists(job, '$.task_type ? (@.type() == "string")')"#, ) .await?; - db.execute_unprepared("DROP VIEW IF EXISTS subscriber_task") + db.execute_unprepared("DROP VIEW IF EXISTS subscriber_tasks") .await?; Ok(()) diff --git a/apps/recorder/src/models/subscriber_tasks.rs b/apps/recorder/src/models/subscriber_tasks.rs index bf20822..2eb0fb0 100644 --- a/apps/recorder/src/models/subscriber_tasks.rs +++ b/apps/recorder/src/models/subscriber_tasks.rs @@ -1,6 +1,7 @@ +use async_trait::async_trait; use sea_orm::entity::prelude::*; -pub use crate::task::SubscriberTask; +pub use crate::task::{SubscriberTask, SubscriberTaskType}; #[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)] #[sea_orm(table_name = "subscriber_tasks")] @@ -9,6 +10,7 @@ pub struct Model { pub id: String, pub subscriber_id: i32, pub job: SubscriberTask, + pub task_type: SubscriberTaskType, pub status: String, pub attempts: i32, pub max_attempts: i32, @@ -44,4 +46,5 @@ pub enum RelatedEntity { Subscriber, } +#[async_trait] impl ActiveModelBehavior for ActiveModel {} diff --git a/apps/recorder/src/task/mod.rs b/apps/recorder/src/task/mod.rs index 51fea1c..4b57a50 100644 --- a/apps/recorder/src/task/mod.rs +++ b/apps/recorder/src/task/mod.rs @@ -7,6 +7,8 @@ pub use core::{SUBSCRIBER_TASK_APALIS_NAME, SubscriberAsyncTaskTrait, Subscriber pub use config::TaskConfig; pub use registry::{ - SubscriberTask, SyncOneSubscriptionFeedsIncrementalTask, SyncOneSubscriptionSourcesTask, + SubscriberTask, SubscriberTaskType, SubscriberTaskTypeEnum, SubscriberTaskTypeVariant, + SubscriberTaskTypeVariantIter, SyncOneSubscriptionFeedsFullTask, + SyncOneSubscriptionFeedsIncrementalTask, SyncOneSubscriptionSourcesTask, }; pub use service::TaskService; diff --git a/apps/recorder/src/task/registry/mod.rs b/apps/recorder/src/task/registry/mod.rs index ca21f4a..76469f8 100644 --- a/apps/recorder/src/task/registry/mod.rs +++ b/apps/recorder/src/task/registry/mod.rs @@ -1,7 +1,7 @@ mod subscription; use std::sync::Arc; -use sea_orm::FromJsonQueryResult; +use sea_orm::{DeriveActiveEnum, DeriveDisplay, EnumIter, FromJsonQueryResult}; use serde::{Deserialize, Serialize}; pub use subscription::{ SyncOneSubscriptionFeedsFullTask, SyncOneSubscriptionFeedsIncrementalTask, @@ -15,16 +15,28 @@ use crate::{ models::subscriptions::SubscriptionTrait, }; -#[derive(async_graphql::Enum, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Copy)] +#[derive( + Clone, + Debug, + Serialize, + Deserialize, + PartialEq, + Eq, + Copy, + DeriveActiveEnum, + DeriveDisplay, + EnumIter, +)] +#[sea_orm(rs_type = "String", db_type = "Text")] pub enum SubscriberTaskType { #[serde(rename = "sync_one_subscription_feeds_incremental")] - #[graphql(name = "sync_one_subscription_feeds_incremental")] + #[sea_orm(string_value = "sync_one_subscription_feeds_incremental")] SyncOneSubscriptionFeedsIncremental, #[serde(rename = "sync_one_subscription_feeds_full")] - #[graphql(name = "sync_one_subscription_feeds_full")] + #[sea_orm(string_value = "sync_one_subscription_feeds_full")] SyncOneSubscriptionFeedsFull, #[serde(rename = "sync_one_subscription_sources")] - #[graphql(name = "sync_one_subscription_sources")] + #[sea_orm(string_value = "sync_one_subscription_sources")] SyncOneSubscriptionSources, } diff --git a/apps/webui/src/app/config/nav.ts b/apps/webui/src/app/config/nav.ts index 2935de4..9d19e0e 100644 --- a/apps/webui/src/app/config/nav.ts +++ b/apps/webui/src/app/config/nav.ts @@ -67,7 +67,7 @@ export const AppNavMainData: NavMainGroup[] = [ { title: 'Manage', link: { - to: '/task/manage', + to: '/tasks/manage', }, }, ], diff --git a/apps/webui/src/components/ui/data-table-row-actions.tsx b/apps/webui/src/components/ui/data-table-row-actions.tsx index 933fcfc..cf630d2 100644 --- a/apps/webui/src/components/ui/data-table-row-actions.tsx +++ b/apps/webui/src/components/ui/data-table-row-actions.tsx @@ -12,10 +12,12 @@ import { DropdownMenuShortcut, DropdownMenuTrigger, } from "@/components/ui/dropdown-menu"; +import type * as DropdownMenuPrimitive from "@radix-ui/react-dropdown-menu"; -import { PropsWithChildren, useMemo } from "react"; +import { ComponentProps, PropsWithChildren, useMemo } from "react"; -interface DataTableRowActionsProps { +interface DataTableRowActionsProps + extends ComponentProps { row: Row; getId: (row: Row) => Id; showDetail?: boolean; @@ -24,7 +26,6 @@ interface DataTableRowActionsProps { onDetail?: (id: Id) => void; onDelete?: (id: Id) => void; onEdit?: (id: Id) => void; - modal?: boolean; } export function DataTableRowActions({ @@ -37,11 +38,11 @@ export function DataTableRowActions({ onDelete, onEdit, children, - modal, + ...rest }: PropsWithChildren>) { const id = useMemo(() => getId(row), [getId, row]); return ( - +