From 9d58d961bd9ee191153da7b6d1c9b444831aa852 Mon Sep 17 00:00:00 2001 From: lonelyhentxi Date: Fri, 9 May 2025 00:56:26 +0800 Subject: [PATCH] feat: add task system --- .cargo/config.toml | 3 +- apps/recorder/Cargo.toml | 2 +- apps/recorder/src/database/service.rs | 5 + apps/recorder/src/errors/app_error.rs | 2 + apps/recorder/src/extract/mikan/web.rs | 33 ++++++- .../src/graphql/{ => infra}/filter.rs | 0 .../recorder/src/graphql/{ => infra}/guard.rs | 0 apps/recorder/src/graphql/infra/mod.rs | 4 + .../src/graphql/{ => infra}/transformer.rs | 0 apps/recorder/src/graphql/{ => infra}/util.rs | 0 apps/recorder/src/graphql/mikan/mod.rs | 1 + apps/recorder/src/graphql/mod.rs | 7 +- apps/recorder/src/graphql/schema_root.rs | 4 +- .../recorder/src/graphql/subscriptions/mod.rs | 0 apps/recorder/src/models/mod.rs | 2 - apps/recorder/src/models/subscribers.rs | 7 +- apps/recorder/src/models/task_stream_item.rs | 62 ------------ apps/recorder/src/models/tasks.rs | 95 ------------------- apps/recorder/src/tasks/mikan/mod.rs | 7 +- ...ption.rs => scrape_season_subscription.rs} | 16 ++-- apps/recorder/src/tasks/service.rs | 10 +- 21 files changed, 70 insertions(+), 190 deletions(-) rename apps/recorder/src/graphql/{ => infra}/filter.rs (100%) rename apps/recorder/src/graphql/{ => infra}/guard.rs (100%) create mode 100644 apps/recorder/src/graphql/infra/mod.rs rename apps/recorder/src/graphql/{ => infra}/transformer.rs (100%) rename apps/recorder/src/graphql/{ => infra}/util.rs (100%) create mode 100644 apps/recorder/src/graphql/mikan/mod.rs delete mode 100644 apps/recorder/src/graphql/subscriptions/mod.rs delete mode 100644 apps/recorder/src/models/task_stream_item.rs delete mode 100644 apps/recorder/src/models/tasks.rs rename apps/recorder/src/tasks/mikan/{extract_season_subscription.rs => scrape_season_subscription.rs} (82%) diff --git a/.cargo/config.toml b/.cargo/config.toml index 0d82fd2..e4f7cc6 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -2,4 +2,5 @@ recorder-playground = "run -p recorder --example playground -- --environment development" [build] -rustflags = ["-Zthreads=8", "-Zshare-generics=y"] +# rustflags = ["-Zthreads=8", "-Zshare-generics=y"] +rustflags = ["-Zthreads=8"] diff --git a/apps/recorder/Cargo.toml b/apps/recorder/Cargo.toml index 9b4c57d..b2b29a8 100644 --- a/apps/recorder/Cargo.toml +++ b/apps/recorder/Cargo.toml @@ -72,7 +72,7 @@ scraper = "0.23" jwt-authorizer = "0.15.0" log = "0.4" -async-graphql = { version = "7", features = [] } +async-graphql = { version = "7", features = ["dynamic-schema"] } async-graphql-axum = "7" seaography = { version = "1.1", features = [ "with-json", diff --git a/apps/recorder/src/database/service.rs b/apps/recorder/src/database/service.rs index 12b1b9a..b12fb98 100644 --- a/apps/recorder/src/database/service.rs +++ b/apps/recorder/src/database/service.rs @@ -1,5 +1,6 @@ use std::{ops::Deref, time::Duration}; +use apalis_sql::postgres::PostgresStorage; use sea_orm::{ ConnectOptions, ConnectionTrait, Database, DatabaseConnection, DbBackend, DbErr, ExecResult, QueryResult, Statement, @@ -53,6 +54,10 @@ impl DatabaseService { if config.auto_migrate { Migrator::up(&db, None).await?; + { + let pool = db.get_postgres_connection_pool(); + PostgresStorage::setup(pool).await?; + } } Ok(Self { diff --git a/apps/recorder/src/errors/app_error.rs b/apps/recorder/src/errors/app_error.rs index 1f06b25..117b2bc 100644 --- a/apps/recorder/src/errors/app_error.rs +++ b/apps/recorder/src/errors/app_error.rs @@ -54,6 +54,8 @@ pub enum RecorderError { IOError { source: std::io::Error }, #[snafu(transparent)] DbError { source: sea_orm::DbErr }, + #[snafu(transparent)] + DbSqlxError { source: sea_orm::SqlxError }, #[snafu(transparent, context(false))] FigmentError { #[snafu(source(from(figment::Error, Box::new)))] diff --git a/apps/recorder/src/extract/mikan/web.rs b/apps/recorder/src/extract/mikan/web.rs index cfef820..ecf0be5 100644 --- a/apps/recorder/src/extract/mikan/web.rs +++ b/apps/recorder/src/extract/mikan/web.rs @@ -48,6 +48,33 @@ pub struct MikanBangumiMeta { pub fansub: String, } +#[async_graphql::Object] +impl MikanBangumiMeta { + async fn homepage(&self) -> &str { + self.homepage.as_str() + } + + async fn origin_poster_src(&self) -> Option<&str> { + self.origin_poster_src.as_ref().map(|url| url.as_str()) + } + + async fn bangumi_title(&self) -> &str { + &self.bangumi_title + } + + async fn mikan_bangumi_id(&self) -> &str { + &self.mikan_bangumi_id + } + + async fn mikan_fansub_id(&self) -> &str { + &self.mikan_fansub_id + } + + async fn fansub(&self) -> &str { + &self.fansub + } +} + impl MikanBangumiMeta { pub fn from_bangumi_index_and_fansub_meta( bangumi_index_meta: MikanBangumiIndexMeta, @@ -138,15 +165,19 @@ impl MikanEpisodeHomepageUrlMeta { } } -#[derive(Clone, Debug, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[derive(async_graphql::Enum, Clone, Debug, Copy, Serialize, Deserialize, PartialEq, Eq)] pub enum MikanSeasonStr { #[serde(rename = "春")] + #[graphql(name = "spring")] Spring, #[serde(rename = "夏")] + #[graphql(name = "summer")] Summer, #[serde(rename = "秋")] + #[graphql(name = "autumn")] Autumn, #[serde(rename = "冬")] + #[graphql(name = "winter")] Winter, } diff --git a/apps/recorder/src/graphql/filter.rs b/apps/recorder/src/graphql/infra/filter.rs similarity index 100% rename from apps/recorder/src/graphql/filter.rs rename to apps/recorder/src/graphql/infra/filter.rs diff --git a/apps/recorder/src/graphql/guard.rs b/apps/recorder/src/graphql/infra/guard.rs similarity index 100% rename from apps/recorder/src/graphql/guard.rs rename to apps/recorder/src/graphql/infra/guard.rs diff --git a/apps/recorder/src/graphql/infra/mod.rs b/apps/recorder/src/graphql/infra/mod.rs new file mode 100644 index 0000000..c7a4aa3 --- /dev/null +++ b/apps/recorder/src/graphql/infra/mod.rs @@ -0,0 +1,4 @@ +pub mod filter; +pub mod guard; +pub mod transformer; +pub mod util; diff --git a/apps/recorder/src/graphql/transformer.rs b/apps/recorder/src/graphql/infra/transformer.rs similarity index 100% rename from apps/recorder/src/graphql/transformer.rs rename to apps/recorder/src/graphql/infra/transformer.rs diff --git a/apps/recorder/src/graphql/util.rs b/apps/recorder/src/graphql/infra/util.rs similarity index 100% rename from apps/recorder/src/graphql/util.rs rename to apps/recorder/src/graphql/infra/util.rs diff --git a/apps/recorder/src/graphql/mikan/mod.rs b/apps/recorder/src/graphql/mikan/mod.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/apps/recorder/src/graphql/mikan/mod.rs @@ -0,0 +1 @@ + diff --git a/apps/recorder/src/graphql/mod.rs b/apps/recorder/src/graphql/mod.rs index cb0b690..216dda3 100644 --- a/apps/recorder/src/graphql/mod.rs +++ b/apps/recorder/src/graphql/mod.rs @@ -1,11 +1,8 @@ pub mod config; -pub mod filter; -pub mod guard; +pub mod infra; +pub mod mikan; pub mod schema_root; pub mod service; -pub mod subscriptions; -pub mod transformer; -pub mod util; pub use config::GraphQLConfig; pub use schema_root::schema; diff --git a/apps/recorder/src/graphql/schema_root.rs b/apps/recorder/src/graphql/schema_root.rs index 6d81d11..23e1710 100644 --- a/apps/recorder/src/graphql/schema_root.rs +++ b/apps/recorder/src/graphql/schema_root.rs @@ -3,12 +3,12 @@ use once_cell::sync::OnceCell; use sea_orm::{DatabaseConnection, EntityTrait, Iterable}; use seaography::{Builder, BuilderContext, FilterType, FilterTypesMapHelper}; -use super::transformer::{filter_condition_transformer, mutation_input_object_transformer}; -use crate::graphql::{ +use crate::graphql::infra::{ filter::{ SUBSCRIBER_ID_FILTER_INFO, init_custom_filter_info, subscriber_id_condition_function, }, guard::{guard_entity_with_subscriber_id, guard_field_with_subscriber_id}, + transformer::{filter_condition_transformer, mutation_input_object_transformer}, util::{get_entity_column_key, get_entity_key}, }; diff --git a/apps/recorder/src/graphql/subscriptions/mod.rs b/apps/recorder/src/graphql/subscriptions/mod.rs deleted file mode 100644 index e69de29..0000000 diff --git a/apps/recorder/src/models/mod.rs b/apps/recorder/src/models/mod.rs index 5f02cc8..4fd1ce7 100644 --- a/apps/recorder/src/models/mod.rs +++ b/apps/recorder/src/models/mod.rs @@ -9,5 +9,3 @@ pub mod subscribers; pub mod subscription_bangumi; pub mod subscription_episode; pub mod subscriptions; -pub mod task_stream_item; -pub mod tasks; diff --git a/apps/recorder/src/models/subscribers.rs b/apps/recorder/src/models/subscribers.rs index f916674..843c8a9 100644 --- a/apps/recorder/src/models/subscribers.rs +++ b/apps/recorder/src/models/subscribers.rs @@ -1,4 +1,3 @@ -use async_graphql::SimpleObject; use async_trait::async_trait; use sea_orm::{ActiveValue, FromJsonQueryResult, TransactionTrait, entity::prelude::*}; use serde::{Deserialize, Serialize}; @@ -10,14 +9,12 @@ use crate::{ pub const SEED_SUBSCRIBER: &str = "konobangu"; -#[derive( - Clone, Debug, PartialEq, Eq, Serialize, Deserialize, FromJsonQueryResult, SimpleObject, -)] +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, FromJsonQueryResult)] pub struct SubscriberBangumiConfig { pub leading_group_tag: Option, } -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize, SimpleObject)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "subscribers")] pub struct Model { #[sea_orm(default_expr = "Expr::current_timestamp()")] diff --git a/apps/recorder/src/models/task_stream_item.rs b/apps/recorder/src/models/task_stream_item.rs deleted file mode 100644 index 648882d..0000000 --- a/apps/recorder/src/models/task_stream_item.rs +++ /dev/null @@ -1,62 +0,0 @@ -use async_trait::async_trait; -use sea_orm::entity::prelude::*; -use serde::{Deserialize, Serialize}; - -#[derive( - Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, DeriveDisplay, Serialize, Deserialize, -)] -#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "task_status")] -#[serde(rename_all = "snake_case")] -pub enum TaskStatus { - #[sea_orm(string_value = "r")] - Running, - #[sea_orm(string_value = "s")] - Success, - #[sea_orm(string_value = "f")] - Failed, -} - -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] -#[sea_orm(table_name = "tasks")] -pub struct Model { - #[sea_orm(primary_key)] - pub id: i32, - pub task_id: i32, - pub subscriber_id: i32, - pub item: serde_json::Value, -} - -#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] -pub enum Relation { - #[sea_orm( - belongs_to = "super::subscribers::Entity", - from = "Column::SubscriberId", - to = "super::subscribers::Column::Id", - on_update = "Cascade", - on_delete = "Cascade" - )] - Subscriber, - #[sea_orm( - belongs_to = "super::tasks::Entity", - from = "Column::TaskId", - to = "super::tasks::Column::Id", - on_update = "Cascade", - on_delete = "Cascade" - )] - Task, -} - -impl Related for Entity { - fn to() -> RelationDef { - Relation::Subscriber.def() - } -} - -impl Related for Entity { - fn to() -> RelationDef { - Relation::Task.def() - } -} - -#[async_trait] -impl ActiveModelBehavior for ActiveModel {} diff --git a/apps/recorder/src/models/tasks.rs b/apps/recorder/src/models/tasks.rs deleted file mode 100644 index cfd6d7f..0000000 --- a/apps/recorder/src/models/tasks.rs +++ /dev/null @@ -1,95 +0,0 @@ -use async_trait::async_trait; -use sea_orm::{QuerySelect, entity::prelude::*}; -use serde::{Deserialize, Serialize}; - -use crate::{app::AppContextTrait, errors::RecorderResult}; - -#[derive( - Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, DeriveDisplay, Serialize, Deserialize, -)] -#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "task_status")] -#[serde(rename_all = "snake_case")] -pub enum TaskStatus { - #[sea_orm(string_value = "p")] - Pending, - #[sea_orm(string_value = "r")] - Running, - #[sea_orm(string_value = "s")] - Success, - #[sea_orm(string_value = "f")] - Failed, -} - -#[derive( - Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, DeriveDisplay, Serialize, Deserialize, -)] -#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "task_status")] -#[serde(rename_all = "snake_case")] -pub enum TaskMode { - #[sea_orm(string_value = "stream")] - Stream, - #[sea_orm(string_value = "future")] - Future, -} - -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] -#[sea_orm(table_name = "tasks")] -pub struct Model { - #[sea_orm(primary_key)] - pub id: i32, - pub subscriber_id: i32, - pub task_mode: TaskMode, - pub task_status: TaskStatus, - pub task_type: String, - pub state_data: serde_json::Value, - pub request_data: serde_json::Value, - pub error_data: serde_json::Value, -} - -#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] -pub enum Relation { - #[sea_orm(has_many = "super::task_stream_item::Entity")] - StreamItem, - #[sea_orm( - belongs_to = "super::subscribers::Entity", - from = "Column::SubscriberId", - to = "super::subscribers::Column::Id", - on_update = "Cascade", - on_delete = "Cascade" - )] - Subscriber, -} - -impl Related for Entity { - fn to() -> RelationDef { - Relation::Subscriber.def() - } -} - -impl Related for Entity { - fn to() -> RelationDef { - Relation::StreamItem.def() - } -} - -impl Model { - pub async fn find_stream_task_by_id( - ctx: &dyn AppContextTrait, - task_id: i32, - ) -> RecorderResult)>> { - let db = ctx.db(); - let res = Entity::find() - .filter(Column::Id.eq(task_id)) - .filter(Column::TaskMode.eq(TaskMode::Stream)) - .find_with_related(super::task_stream_item::Entity) - .limit(1) - .all(db) - .await? - .pop(); - - Ok(res) - } -} - -#[async_trait] -impl ActiveModelBehavior for ActiveModel {} diff --git a/apps/recorder/src/tasks/mikan/mod.rs b/apps/recorder/src/tasks/mikan/mod.rs index c2f1fde..70dbf35 100644 --- a/apps/recorder/src/tasks/mikan/mod.rs +++ b/apps/recorder/src/tasks/mikan/mod.rs @@ -1,5 +1,6 @@ -mod extract_season_subscription; +mod scrape_season_subscription; -pub use extract_season_subscription::{ - ExtractMikanSeasonSubscriptionTask, register_extract_mikan_season_subscription_task, +pub use scrape_season_subscription::{ + ScrapeMikanSeasonSubscriptionTask, ScrapeMikanSeasonSubscriptionTaskResult, + register_scrape_mikan_season_subscription_task, }; diff --git a/apps/recorder/src/tasks/mikan/extract_season_subscription.rs b/apps/recorder/src/tasks/mikan/scrape_season_subscription.rs similarity index 82% rename from apps/recorder/src/tasks/mikan/extract_season_subscription.rs rename to apps/recorder/src/tasks/mikan/scrape_season_subscription.rs index 9db481b..c71fa21 100644 --- a/apps/recorder/src/tasks/mikan/extract_season_subscription.rs +++ b/apps/recorder/src/tasks/mikan/scrape_season_subscription.rs @@ -16,7 +16,7 @@ use crate::{ const TASK_NAME: &str = "mikan_extract_season_subscription"; #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct ExtractMikanSeasonSubscriptionTask { +pub struct ScrapeMikanSeasonSubscriptionTask { pub task_id: i32, pub year: i32, pub season_str: MikanSeasonStr, @@ -26,7 +26,7 @@ pub struct ExtractMikanSeasonSubscriptionTask { } #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct ExtractMikanSeasonSubscriptionTaskResult { +pub struct ScrapeMikanSeasonSubscriptionTaskResult { pub task_id: i32, pub year: i32, pub season_str: MikanSeasonStr, @@ -36,10 +36,10 @@ pub struct ExtractMikanSeasonSubscriptionTaskResult { pub bangumi_meta_list: Vec, } -pub async fn extract_mikan_season_subscription( - job: ExtractMikanSeasonSubscriptionTask, +pub async fn scrape_mikan_season_subscription( + job: ScrapeMikanSeasonSubscriptionTask, data: Data>, -) -> RecorderResult> { +) -> RecorderResult> { let ctx = data.deref(); let mikan_client = ctx.mikan(); @@ -56,7 +56,7 @@ pub async fn extract_mikan_season_subscription( ) .await?; - Ok(GoTo::Done(ExtractMikanSeasonSubscriptionTaskResult { + Ok(GoTo::Done(ScrapeMikanSeasonSubscriptionTaskResult { bangumi_meta_list, credential_id: job.credential_id, season_str: job.season_str, @@ -67,14 +67,14 @@ pub async fn extract_mikan_season_subscription( })) } -pub fn register_extract_mikan_season_subscription_task( +pub fn register_scrape_mikan_season_subscription_task( monitor: Monitor, ctx: Arc, ) -> RecorderResult<(Monitor, PostgresStorage>)> { let pool = ctx.db().get_postgres_connection_pool().clone(); let storage = PostgresStorage::new(pool); - let steps = StepBuilder::new().step_fn(extract_mikan_season_subscription); + let steps = StepBuilder::new().step_fn(scrape_mikan_season_subscription); let worker = WorkerBuilder::new(TASK_NAME) .catch_panic() diff --git a/apps/recorder/src/tasks/service.rs b/apps/recorder/src/tasks/service.rs index bf327e9..12d4d93 100644 --- a/apps/recorder/src/tasks/service.rs +++ b/apps/recorder/src/tasks/service.rs @@ -4,14 +4,14 @@ use apalis::prelude::*; use apalis_sql::postgres::PostgresStorage; use tokio::sync::Mutex; -use super::{TaskConfig, mikan::register_extract_mikan_season_subscription_task}; +use super::{TaskConfig, mikan::register_scrape_mikan_season_subscription_task}; use crate::{app::AppContextTrait, errors::RecorderResult}; pub struct TaskService { config: TaskConfig, #[allow(dead_code)] monitor: Arc>, - pub extract_mikan_season_subscription_task_storage: + pub scrape_mikan_season_subscription_task_storage: PostgresStorage>, } @@ -21,13 +21,13 @@ impl TaskService { ctx: Arc, ) -> RecorderResult { let monitor = Monitor::new(); - let (monitor, extract_mikan_season_subscription_task_storage) = - register_extract_mikan_season_subscription_task(monitor, ctx.clone())?; + let (monitor, scrape_mikan_season_subscription_task_storage) = + register_scrape_mikan_season_subscription_task(monitor, ctx.clone())?; Ok(Self { config, monitor: Arc::new(Mutex::new(monitor)), - extract_mikan_season_subscription_task_storage, + scrape_mikan_season_subscription_task_storage, }) } }