feat: add task system

This commit is contained in:
master 2025-05-09 00:56:26 +08:00
parent 791b75b3af
commit 9d58d961bd
21 changed files with 70 additions and 190 deletions

View File

@ -2,4 +2,5 @@
recorder-playground = "run -p recorder --example playground -- --environment development"
[build]
rustflags = ["-Zthreads=8", "-Zshare-generics=y"]
# rustflags = ["-Zthreads=8", "-Zshare-generics=y"]
rustflags = ["-Zthreads=8"]

View File

@ -72,7 +72,7 @@ scraper = "0.23"
jwt-authorizer = "0.15.0"
log = "0.4"
async-graphql = { version = "7", features = [] }
async-graphql = { version = "7", features = ["dynamic-schema"] }
async-graphql-axum = "7"
seaography = { version = "1.1", features = [
"with-json",

View File

@ -1,5 +1,6 @@
use std::{ops::Deref, time::Duration};
use apalis_sql::postgres::PostgresStorage;
use sea_orm::{
ConnectOptions, ConnectionTrait, Database, DatabaseConnection, DbBackend, DbErr, ExecResult,
QueryResult, Statement,
@ -53,6 +54,10 @@ impl DatabaseService {
if config.auto_migrate {
Migrator::up(&db, None).await?;
{
let pool = db.get_postgres_connection_pool();
PostgresStorage::setup(pool).await?;
}
}
Ok(Self {

View File

@ -54,6 +54,8 @@ pub enum RecorderError {
IOError { source: std::io::Error },
#[snafu(transparent)]
DbError { source: sea_orm::DbErr },
#[snafu(transparent)]
DbSqlxError { source: sea_orm::SqlxError },
#[snafu(transparent, context(false))]
FigmentError {
#[snafu(source(from(figment::Error, Box::new)))]

View File

@ -48,6 +48,33 @@ pub struct MikanBangumiMeta {
pub fansub: String,
}
#[async_graphql::Object]
impl MikanBangumiMeta {
async fn homepage(&self) -> &str {
self.homepage.as_str()
}
async fn origin_poster_src(&self) -> Option<&str> {
self.origin_poster_src.as_ref().map(|url| url.as_str())
}
async fn bangumi_title(&self) -> &str {
&self.bangumi_title
}
async fn mikan_bangumi_id(&self) -> &str {
&self.mikan_bangumi_id
}
async fn mikan_fansub_id(&self) -> &str {
&self.mikan_fansub_id
}
async fn fansub(&self) -> &str {
&self.fansub
}
}
impl MikanBangumiMeta {
pub fn from_bangumi_index_and_fansub_meta(
bangumi_index_meta: MikanBangumiIndexMeta,
@ -138,15 +165,19 @@ impl MikanEpisodeHomepageUrlMeta {
}
}
#[derive(Clone, Debug, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[derive(async_graphql::Enum, Clone, Debug, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum MikanSeasonStr {
#[serde(rename = "")]
#[graphql(name = "spring")]
Spring,
#[serde(rename = "")]
#[graphql(name = "summer")]
Summer,
#[serde(rename = "")]
#[graphql(name = "autumn")]
Autumn,
#[serde(rename = "")]
#[graphql(name = "winter")]
Winter,
}

View File

@ -0,0 +1,4 @@
pub mod filter;
pub mod guard;
pub mod transformer;
pub mod util;

View File

@ -0,0 +1 @@

View File

@ -1,11 +1,8 @@
pub mod config;
pub mod filter;
pub mod guard;
pub mod infra;
pub mod mikan;
pub mod schema_root;
pub mod service;
pub mod subscriptions;
pub mod transformer;
pub mod util;
pub use config::GraphQLConfig;
pub use schema_root::schema;

View File

@ -3,12 +3,12 @@ use once_cell::sync::OnceCell;
use sea_orm::{DatabaseConnection, EntityTrait, Iterable};
use seaography::{Builder, BuilderContext, FilterType, FilterTypesMapHelper};
use super::transformer::{filter_condition_transformer, mutation_input_object_transformer};
use crate::graphql::{
use crate::graphql::infra::{
filter::{
SUBSCRIBER_ID_FILTER_INFO, init_custom_filter_info, subscriber_id_condition_function,
},
guard::{guard_entity_with_subscriber_id, guard_field_with_subscriber_id},
transformer::{filter_condition_transformer, mutation_input_object_transformer},
util::{get_entity_column_key, get_entity_key},
};

View File

@ -9,5 +9,3 @@ pub mod subscribers;
pub mod subscription_bangumi;
pub mod subscription_episode;
pub mod subscriptions;
pub mod task_stream_item;
pub mod tasks;

View File

@ -1,4 +1,3 @@
use async_graphql::SimpleObject;
use async_trait::async_trait;
use sea_orm::{ActiveValue, FromJsonQueryResult, TransactionTrait, entity::prelude::*};
use serde::{Deserialize, Serialize};
@ -10,14 +9,12 @@ use crate::{
pub const SEED_SUBSCRIBER: &str = "konobangu";
#[derive(
Clone, Debug, PartialEq, Eq, Serialize, Deserialize, FromJsonQueryResult, SimpleObject,
)]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, FromJsonQueryResult)]
pub struct SubscriberBangumiConfig {
pub leading_group_tag: Option<bool>,
}
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize, SimpleObject)]
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "subscribers")]
pub struct Model {
#[sea_orm(default_expr = "Expr::current_timestamp()")]

View File

@ -1,62 +0,0 @@
use async_trait::async_trait;
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(
Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, DeriveDisplay, Serialize, Deserialize,
)]
#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "task_status")]
#[serde(rename_all = "snake_case")]
pub enum TaskStatus {
#[sea_orm(string_value = "r")]
Running,
#[sea_orm(string_value = "s")]
Success,
#[sea_orm(string_value = "f")]
Failed,
}
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "tasks")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,
pub task_id: i32,
pub subscriber_id: i32,
pub item: serde_json::Value,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::subscribers::Entity",
from = "Column::SubscriberId",
to = "super::subscribers::Column::Id",
on_update = "Cascade",
on_delete = "Cascade"
)]
Subscriber,
#[sea_orm(
belongs_to = "super::tasks::Entity",
from = "Column::TaskId",
to = "super::tasks::Column::Id",
on_update = "Cascade",
on_delete = "Cascade"
)]
Task,
}
impl Related<super::subscribers::Entity> for Entity {
fn to() -> RelationDef {
Relation::Subscriber.def()
}
}
impl Related<super::tasks::Entity> for Entity {
fn to() -> RelationDef {
Relation::Task.def()
}
}
#[async_trait]
impl ActiveModelBehavior for ActiveModel {}

View File

@ -1,95 +0,0 @@
use async_trait::async_trait;
use sea_orm::{QuerySelect, entity::prelude::*};
use serde::{Deserialize, Serialize};
use crate::{app::AppContextTrait, errors::RecorderResult};
#[derive(
Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, DeriveDisplay, Serialize, Deserialize,
)]
#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "task_status")]
#[serde(rename_all = "snake_case")]
pub enum TaskStatus {
#[sea_orm(string_value = "p")]
Pending,
#[sea_orm(string_value = "r")]
Running,
#[sea_orm(string_value = "s")]
Success,
#[sea_orm(string_value = "f")]
Failed,
}
#[derive(
Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, DeriveDisplay, Serialize, Deserialize,
)]
#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "task_status")]
#[serde(rename_all = "snake_case")]
pub enum TaskMode {
#[sea_orm(string_value = "stream")]
Stream,
#[sea_orm(string_value = "future")]
Future,
}
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "tasks")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,
pub subscriber_id: i32,
pub task_mode: TaskMode,
pub task_status: TaskStatus,
pub task_type: String,
pub state_data: serde_json::Value,
pub request_data: serde_json::Value,
pub error_data: serde_json::Value,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(has_many = "super::task_stream_item::Entity")]
StreamItem,
#[sea_orm(
belongs_to = "super::subscribers::Entity",
from = "Column::SubscriberId",
to = "super::subscribers::Column::Id",
on_update = "Cascade",
on_delete = "Cascade"
)]
Subscriber,
}
impl Related<super::subscribers::Entity> for Entity {
fn to() -> RelationDef {
Relation::Subscriber.def()
}
}
impl Related<super::task_stream_item::Entity> for Entity {
fn to() -> RelationDef {
Relation::StreamItem.def()
}
}
impl Model {
pub async fn find_stream_task_by_id(
ctx: &dyn AppContextTrait,
task_id: i32,
) -> RecorderResult<Option<(Model, Vec<super::task_stream_item::Model>)>> {
let db = ctx.db();
let res = Entity::find()
.filter(Column::Id.eq(task_id))
.filter(Column::TaskMode.eq(TaskMode::Stream))
.find_with_related(super::task_stream_item::Entity)
.limit(1)
.all(db)
.await?
.pop();
Ok(res)
}
}
#[async_trait]
impl ActiveModelBehavior for ActiveModel {}

View File

@ -1,5 +1,6 @@
mod extract_season_subscription;
mod scrape_season_subscription;
pub use extract_season_subscription::{
ExtractMikanSeasonSubscriptionTask, register_extract_mikan_season_subscription_task,
pub use scrape_season_subscription::{
ScrapeMikanSeasonSubscriptionTask, ScrapeMikanSeasonSubscriptionTaskResult,
register_scrape_mikan_season_subscription_task,
};

View File

@ -16,7 +16,7 @@ use crate::{
const TASK_NAME: &str = "mikan_extract_season_subscription";
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExtractMikanSeasonSubscriptionTask {
pub struct ScrapeMikanSeasonSubscriptionTask {
pub task_id: i32,
pub year: i32,
pub season_str: MikanSeasonStr,
@ -26,7 +26,7 @@ pub struct ExtractMikanSeasonSubscriptionTask {
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExtractMikanSeasonSubscriptionTaskResult {
pub struct ScrapeMikanSeasonSubscriptionTaskResult {
pub task_id: i32,
pub year: i32,
pub season_str: MikanSeasonStr,
@ -36,10 +36,10 @@ pub struct ExtractMikanSeasonSubscriptionTaskResult {
pub bangumi_meta_list: Vec<MikanBangumiMeta>,
}
pub async fn extract_mikan_season_subscription(
job: ExtractMikanSeasonSubscriptionTask,
pub async fn scrape_mikan_season_subscription(
job: ScrapeMikanSeasonSubscriptionTask,
data: Data<Arc<dyn AppContextTrait>>,
) -> RecorderResult<GoTo<ExtractMikanSeasonSubscriptionTaskResult>> {
) -> RecorderResult<GoTo<ScrapeMikanSeasonSubscriptionTaskResult>> {
let ctx = data.deref();
let mikan_client = ctx.mikan();
@ -56,7 +56,7 @@ pub async fn extract_mikan_season_subscription(
)
.await?;
Ok(GoTo::Done(ExtractMikanSeasonSubscriptionTaskResult {
Ok(GoTo::Done(ScrapeMikanSeasonSubscriptionTaskResult {
bangumi_meta_list,
credential_id: job.credential_id,
season_str: job.season_str,
@ -67,14 +67,14 @@ pub async fn extract_mikan_season_subscription(
}))
}
pub fn register_extract_mikan_season_subscription_task(
pub fn register_scrape_mikan_season_subscription_task(
monitor: Monitor,
ctx: Arc<dyn AppContextTrait>,
) -> RecorderResult<(Monitor, PostgresStorage<StepRequest<serde_json::Value>>)> {
let pool = ctx.db().get_postgres_connection_pool().clone();
let storage = PostgresStorage::new(pool);
let steps = StepBuilder::new().step_fn(extract_mikan_season_subscription);
let steps = StepBuilder::new().step_fn(scrape_mikan_season_subscription);
let worker = WorkerBuilder::new(TASK_NAME)
.catch_panic()

View File

@ -4,14 +4,14 @@ use apalis::prelude::*;
use apalis_sql::postgres::PostgresStorage;
use tokio::sync::Mutex;
use super::{TaskConfig, mikan::register_extract_mikan_season_subscription_task};
use super::{TaskConfig, mikan::register_scrape_mikan_season_subscription_task};
use crate::{app::AppContextTrait, errors::RecorderResult};
pub struct TaskService {
config: TaskConfig,
#[allow(dead_code)]
monitor: Arc<Mutex<Monitor>>,
pub extract_mikan_season_subscription_task_storage:
pub scrape_mikan_season_subscription_task_storage:
PostgresStorage<StepRequest<serde_json::Value>>,
}
@ -21,13 +21,13 @@ impl TaskService {
ctx: Arc<dyn AppContextTrait>,
) -> RecorderResult<Self> {
let monitor = Monitor::new();
let (monitor, extract_mikan_season_subscription_task_storage) =
register_extract_mikan_season_subscription_task(monitor, ctx.clone())?;
let (monitor, scrape_mikan_season_subscription_task_storage) =
register_scrape_mikan_season_subscription_task(monitor, ctx.clone())?;
Ok(Self {
config,
monitor: Arc::new(Mutex::new(monitor)),
extract_mikan_season_subscription_task_storage,
scrape_mikan_season_subscription_task_storage,
})
}
}