From 3a8eb88e1aebc97463df476cd8db0ad04d0190d2 Mon Sep 17 00:00:00 2001 From: lonelyhentxi Date: Thu, 26 Jun 2025 02:56:55 +0800 Subject: [PATCH] feat: add cron --- Cargo.lock | 10 + apps/recorder/Cargo.toml | 1 + apps/recorder/src/app/core.rs | 26 +- apps/recorder/src/auth/oidc.rs | 7 +- apps/recorder/src/errors/app_error.rs | 32 +- apps/recorder/src/extract/mikan/client.rs | 7 +- .../src/graphql/domains/subscriber_tasks.rs | 8 +- .../src/graphql/domains/subscriptions.rs | 24 +- apps/recorder/src/migrations/defs.rs | 21 ++ .../migrations/m20250629_065628_add_cron.rs | 278 ++++++++++++++++ apps/recorder/src/migrations/mod.rs | 2 + apps/recorder/src/models/auth.rs | 4 +- apps/recorder/src/models/cron/core.rs | 9 + apps/recorder/src/models/cron/mod.rs | 305 ++++++++++++++++++ apps/recorder/src/models/cron/registry.rs | 1 + apps/recorder/src/models/feeds/mod.rs | 4 +- apps/recorder/src/models/feeds/registry.rs | 4 +- apps/recorder/src/models/mod.rs | 1 + .../mod.rs} | 0 apps/recorder/src/models/subscribers.rs | 7 +- apps/recorder/src/models/subscriptions/mod.rs | 12 +- apps/recorder/src/task/registry/mod.rs | 134 +------- apps/recorder/src/task/registry/subscriber.rs | 100 ++++++ apps/recorder/src/task/registry/system.rs | 43 +++ apps/recorder/src/task/service.rs | 122 ++++++- 25 files changed, 947 insertions(+), 215 deletions(-) create mode 100644 apps/recorder/src/migrations/m20250629_065628_add_cron.rs create mode 100644 apps/recorder/src/models/cron/core.rs create mode 100644 apps/recorder/src/models/cron/mod.rs create mode 100644 apps/recorder/src/models/cron/registry.rs rename apps/recorder/src/models/{subscriber_tasks.rs => subscriber_tasks/mod.rs} (100%) create mode 100644 apps/recorder/src/task/registry/subscriber.rs create mode 100644 apps/recorder/src/task/registry/system.rs diff --git a/Cargo.lock b/Cargo.lock index 3c2f747..6c8031e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1580,6 +1580,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "croner" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c344b0690c1ad1c7176fe18eb173e0c927008fdaaa256e40dfd43ddd149c0843" +dependencies = [ + "chrono", +] + [[package]] name = "crossbeam-channel" version = "0.5.15" @@ -6750,6 +6759,7 @@ dependencies = [ "cocoon", "color-eyre", "convert_case 0.8.0", + "croner", "ctor", "dotenvy", "downloader", diff --git a/apps/recorder/Cargo.toml b/apps/recorder/Cargo.toml index 1486d5b..2c30a70 100644 --- a/apps/recorder/Cargo.toml +++ b/apps/recorder/Cargo.toml @@ -164,6 +164,7 @@ quick-xml = { version = "0.37.5", features = [ "serde-types", "serde", ] } +croner = "2.2.0" [dev-dependencies] inquire = { workspace = true } diff --git a/apps/recorder/src/app/core.rs b/apps/recorder/src/app/core.rs index 046cc60..d1a96a9 100644 --- a/apps/recorder/src/app/core.rs +++ b/apps/recorder/src/app/core.rs @@ -107,26 +107,12 @@ impl App { Ok::<(), RecorderError>(()) }, async { - { - let monitor = task.setup_monitor().await?; - if graceful_shutdown { - monitor - .run_with_signal(async move { - Self::shutdown_signal().await; - tracing::info!("apalis shutting down..."); - Ok(()) - }) - .await?; - } else { - monitor.run().await?; - } - } - - Ok::<(), RecorderError>(()) - }, - async { - let listener = task.setup_listener().await?; - listener.listen().await?; + task.run(if graceful_shutdown { + Some(Self::shutdown_signal) + } else { + None + }) + .await?; Ok::<(), RecorderError>(()) } diff --git a/apps/recorder/src/auth/oidc.rs b/apps/recorder/src/auth/oidc.rs index 18a83c3..211bf83 100644 --- a/apps/recorder/src/auth/oidc.rs +++ b/apps/recorder/src/auth/oidc.rs @@ -21,7 +21,6 @@ use openidconnect::{ OAuth2TokenResponse, PkceCodeChallenge, PkceCodeVerifier, RedirectUrl, TokenResponse, core::{CoreAuthenticationFlow, CoreClient, CoreProviderMetadata}, }; -use sea_orm::DbErr; use serde::{Deserialize, Serialize}; use serde_json::Value; use snafu::ResultExt; @@ -338,9 +337,9 @@ impl AuthServiceTrait for OidcAuthService { } } let subscriber_auth = match crate::models::auth::Model::find_by_pid(ctx, sub).await { - Err(RecorderError::DbError { - source: DbErr::RecordNotFound(..), - }) => crate::models::auth::Model::create_from_oidc(ctx, sub.to_string()).await, + Err(RecorderError::ModelEntityNotFound { .. }) => { + crate::models::auth::Model::create_from_oidc(ctx, sub.to_string()).await + } r => r, } .map_err(|e| { diff --git a/apps/recorder/src/errors/app_error.rs b/apps/recorder/src/errors/app_error.rs index 9b4dc34..271296f 100644 --- a/apps/recorder/src/errors/app_error.rs +++ b/apps/recorder/src/errors/app_error.rs @@ -18,6 +18,8 @@ use crate::{ #[derive(Snafu, Debug)] #[snafu(visibility(pub(crate)))] pub enum RecorderError { + #[snafu(transparent)] + CronError { source: croner::errors::CronError }, #[snafu(display( "HTTP {status} {reason}, source = {source:?}", status = status, @@ -120,8 +122,13 @@ pub enum RecorderError { #[snafu(source(from(Box, OptDynErr::some)))] source: OptDynErr, }, - #[snafu(display("Model Entity {entity} not found or not belong to subscriber"))] - ModelEntityNotFound { entity: Cow<'static, str> }, + #[snafu(display("Model Entity {entity} not found or not belong to subscriber{}", ( + detail.as_ref().map(|detail| format!(" : {detail}"))).unwrap_or_default() + ))] + ModelEntityNotFound { + entity: Cow<'static, str>, + detail: Option, + }, #[snafu(transparent)] FetchError { source: FetchError }, #[snafu(display("Credential3rdError: {message}, source = {source}"))] @@ -185,9 +192,20 @@ impl RecorderError { } } - pub fn from_db_record_not_found(detail: T) -> Self { - Self::DbError { - source: sea_orm::DbErr::RecordNotFound(detail.to_string()), + pub fn from_model_not_found_detail>, T: ToString>( + model: C, + detail: T, + ) -> Self { + Self::ModelEntityNotFound { + entity: model.into(), + detail: Some(detail.to_string()), + } + } + + pub fn from_model_not_found>>(model: C) -> Self { + Self::ModelEntityNotFound { + entity: model.into(), + detail: None, } } } @@ -252,9 +270,9 @@ impl IntoResponse for RecorderError { ) .into_response() } - Self::ModelEntityNotFound { entity } => ( + merr @ Self::ModelEntityNotFound { .. } => ( StatusCode::NOT_FOUND, - Json::(StandardErrorResponse::from(entity.to_string())), + Json::(StandardErrorResponse::from(merr.to_string())), ) .into_response(), err => ( diff --git a/apps/recorder/src/extract/mikan/client.rs b/apps/recorder/src/extract/mikan/client.rs index cfdcece..51e9234 100644 --- a/apps/recorder/src/extract/mikan/client.rs +++ b/apps/recorder/src/extract/mikan/client.rs @@ -4,7 +4,7 @@ use fetch::{HttpClient, HttpClientTrait}; use maplit::hashmap; use scraper::{Html, Selector}; use sea_orm::{ - ActiveModelTrait, ActiveValue::Set, ColumnTrait, DbErr, EntityTrait, QueryFilter, TryIntoModel, + ActiveModelTrait, ActiveValue::Set, ColumnTrait, EntityTrait, QueryFilter, TryIntoModel, }; use url::Url; use util::OptDynErr; @@ -227,8 +227,9 @@ impl MikanClient { self.fork_with_userpass_credential(userpass_credential) .await } else { - Err(RecorderError::from_db_record_not_found( - DbErr::RecordNotFound(format!("credential={credential_id} not found")), + Err(RecorderError::from_model_not_found_detail( + "credential", + format!("credential id {credential_id} not found"), )) } } diff --git a/apps/recorder/src/graphql/domains/subscriber_tasks.rs b/apps/recorder/src/graphql/domains/subscriber_tasks.rs index 0ada662..4887b91 100644 --- a/apps/recorder/src/graphql/domains/subscriber_tasks.rs +++ b/apps/recorder/src/graphql/domains/subscriber_tasks.rs @@ -93,9 +93,7 @@ pub fn register_subscriber_tasks_entity_mutations( .into_tuple::() .one(db) .await? - .ok_or_else(|| RecorderError::ModelEntityNotFound { - entity: "SubscriberTask".into(), - })?; + .ok_or_else(|| RecorderError::from_model_not_found("SubscriberTask"))?; let task = app_ctx.task(); task.retry_subscriber_task(job_id.clone()).await?; @@ -104,9 +102,7 @@ pub fn register_subscriber_tasks_entity_mutations( .filter(subscriber_tasks::Column::Id.eq(&job_id)) .one(db) .await? - .ok_or_else(|| RecorderError::ModelEntityNotFound { - entity: "SubscriberTask".into(), - })?; + .ok_or_else(|| RecorderError::from_model_not_found("SubscriberTask"))?; Ok::<_, RecorderError>(Some(FieldValue::owned_any(task_model))) }) diff --git a/apps/recorder/src/graphql/domains/subscriptions.rs b/apps/recorder/src/graphql/domains/subscriptions.rs index 1dd2e66..99f8cb8 100644 --- a/apps/recorder/src/graphql/domains/subscriptions.rs +++ b/apps/recorder/src/graphql/domains/subscriptions.rs @@ -63,9 +63,7 @@ pub fn register_subscriptions_to_schema_builder( .filter(filters_condition) .one(db) .await? - .ok_or_else(|| RecorderError::ModelEntityNotFound { - entity: "Subscription".into(), - })?; + .ok_or_else(|| RecorderError::from_model_not_found("Subscription"))?; let subscription = subscriptions::Subscription::try_from_model(&subscription_model)?; @@ -85,9 +83,7 @@ pub fn register_subscriptions_to_schema_builder( .filter(subscriber_tasks::Column::Id.eq(task_id.to_string())) .one(db) .await? - .ok_or_else(|| RecorderError::ModelEntityNotFound { - entity: "SubscriberTask".into(), - })?; + .ok_or_else(|| RecorderError::from_model_not_found("SubscriberTask"))?; Ok(Some(FieldValue::owned_any(task_model))) }) @@ -121,9 +117,7 @@ pub fn register_subscriptions_to_schema_builder( .filter(filters_condition) .one(db) .await? - .ok_or_else(|| RecorderError::ModelEntityNotFound { - entity: "Subscription".into(), - })?; + .ok_or_else(|| RecorderError::from_model_not_found("Subscription"))?; let subscription = subscriptions::Subscription::try_from_model(&subscription_model)?; @@ -141,9 +135,7 @@ pub fn register_subscriptions_to_schema_builder( .filter(subscriber_tasks::Column::Id.eq(task_id.to_string())) .one(db) .await? - .ok_or_else(|| RecorderError::ModelEntityNotFound { - entity: "SubscriberTask".into(), - })?; + .ok_or_else(|| RecorderError::from_model_not_found("SubscriberTask"))?; Ok(Some(FieldValue::owned_any(task_model))) }) @@ -178,9 +170,7 @@ pub fn register_subscriptions_to_schema_builder( .filter(filters_condition) .one(db) .await? - .ok_or_else(|| RecorderError::ModelEntityNotFound { - entity: "Subscription".into(), - })?; + .ok_or_else(|| RecorderError::from_model_not_found("Subscription"))?; let subscription = subscriptions::Subscription::try_from_model(&subscription_model)?; @@ -198,9 +188,7 @@ pub fn register_subscriptions_to_schema_builder( .filter(subscriber_tasks::Column::Id.eq(task_id.to_string())) .one(db) .await? - .ok_or_else(|| RecorderError::ModelEntityNotFound { - entity: "SubscriberTask".into(), - })?; + .ok_or_else(|| RecorderError::from_model_not_found("SubscriberTask"))?; Ok(Some(FieldValue::owned_any(task_model))) }) diff --git a/apps/recorder/src/migrations/defs.rs b/apps/recorder/src/migrations/defs.rs index ac07220..2427f33 100644 --- a/apps/recorder/src/migrations/defs.rs +++ b/apps/recorder/src/migrations/defs.rs @@ -171,6 +171,27 @@ pub enum Feeds { SubscriptionId, } +#[derive(DeriveIden)] +pub enum Cron { + Table, + Id, + CronSource, + SubscriberId, + SubscriptionId, + CronExpr, + NextRun, + LastRun, + LastError, + Enabled, + LockedBy, + LockedAt, + TimeoutMs, + Attempts, + MaxAttempts, + Priority, + Status, +} + macro_rules! create_postgres_enum_for_active_enum { ($manager: expr, $active_enum: expr, $($enum_value:expr),+) => { { diff --git a/apps/recorder/src/migrations/m20250629_065628_add_cron.rs b/apps/recorder/src/migrations/m20250629_065628_add_cron.rs new file mode 100644 index 0000000..f725077 --- /dev/null +++ b/apps/recorder/src/migrations/m20250629_065628_add_cron.rs @@ -0,0 +1,278 @@ +use async_trait::async_trait; +use sea_orm_migration::{prelude::*, schema::*}; + +use crate::{ + migrations::defs::{ + Cron, CustomSchemaManagerExt, GeneralIds, Subscribers, Subscriptions, table_auto_z, + }, + models::cron::{ + CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME, + CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME, CRON_DUE_EVENT, CronSource, CronSourceEnum, + CronStatus, CronStatusEnum, NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME, + NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME, + }, +}; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + create_postgres_enum_for_active_enum!(manager, CronSourceEnum, CronSource::Subscription) + .await?; + + create_postgres_enum_for_active_enum!( + manager, + CronStatusEnum, + CronStatus::Pending, + CronStatus::Running, + CronStatus::Completed, + CronStatus::Failed + ) + .await?; + + manager + .create_table( + table_auto_z(Cron::Table) + .col(pk_auto(Cron::Id)) + .col(string(Cron::CronExpr)) + .col(enumeration( + Cron::CronSource, + CronSourceEnum, + CronSource::iden_values(), + )) + .col(integer_null(Cron::SubscriberId)) + .col(integer_null(Cron::SubscriptionId)) + .col(timestamp_with_time_zone_null(Cron::NextRun)) + .col(timestamp_with_time_zone_null(Cron::LastRun)) + .col(string_null(Cron::LastError)) + .col(boolean(Cron::Enabled).default(true)) + .col(string_null(Cron::LockedBy)) + .col(timestamp_with_time_zone_null(Cron::LockedAt)) + .col(integer_null(Cron::TimeoutMs)) + .col(integer(Cron::Attempts)) + .col(integer(Cron::MaxAttempts)) + .col(integer(Cron::Priority)) + .col(enumeration( + Cron::Status, + CronStatusEnum, + CronStatus::iden_values(), + )) + .foreign_key( + ForeignKey::create() + .name("fk_cron_subscriber_id") + .from(Cron::Table, Cron::SubscriberId) + .to(Subscribers::Table, Subscribers::Id) + .on_delete(ForeignKeyAction::Cascade) + .on_update(ForeignKeyAction::Cascade), + ) + .foreign_key( + ForeignKey::create() + .name("fk_cron_subscription_id") + .from(Cron::Table, Cron::SubscriptionId) + .to(Subscriptions::Table, Subscriptions::Id) + .on_delete(ForeignKeyAction::Cascade) + .on_update(ForeignKeyAction::Cascade), + ) + .to_owned(), + ) + .await?; + + manager + .create_postgres_auto_update_ts_trigger_for_col(Cron::Table, GeneralIds::UpdatedAt) + .await?; + + manager + .create_index( + IndexCreateStatement::new() + .if_not_exists() + .name("idx_cron_cron_source") + .table(Cron::Table) + .col(Cron::CronSource) + .to_owned(), + ) + .await?; + + manager + .create_index( + IndexCreateStatement::new() + .if_not_exists() + .name("idx_cron_next_run") + .table(Cron::Table) + .col(Cron::NextRun) + .to_owned(), + ) + .await?; + + let db = manager.get_connection(); + + db.execute_unprepared(&format!( + r#"CREATE OR REPLACE FUNCTION {NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME}() RETURNS trigger AS $$ + BEGIN + -- Check if the cron is due to run + IF NEW.{next_run} IS NOT NULL + AND NEW.{next_run} <= CURRENT_TIMESTAMP + AND NEW.{enabled} = true + AND NEW.{status} = '{pending}' + AND NEW.{attempts} < NEW.{max_attempts} + -- Check if not locked or lock timeout + AND ( + NEW.{locked_at} IS NULL + OR ( + NEW.{timeout_ms} IS NOT NULL + AND (NEW.{locked_at} + NEW.{timeout_ms} * INTERVAL '1 millisecond') <= CURRENT_TIMESTAMP + ) + ) + -- Make sure the cron is a new due event, not a repeat event + AND ( + OLD.{next_run} IS NULL + OR OLD.{next_run} > CURRENT_TIMESTAMP + OR OLD.{enabled} = false + OR OLD.{status} != '{pending}' + OR OLD.{attempts} != NEW.{attempts} + ) + THEN + PERFORM pg_notify('{CRON_DUE_EVENT}', row_to_json(NEW)::text); + END IF; + RETURN NEW; + END; + $$ LANGUAGE plpgsql;"#, + next_run = &Cron::NextRun.to_string(), + enabled = &Cron::Enabled.to_string(), + locked_at = &Cron::LockedAt.to_string(), + timeout_ms = &Cron::TimeoutMs.to_string(), + status = &Cron::Status.to_string(), + pending = &CronStatus::Pending.to_string(), + attempts = &Cron::Attempts.to_string(), + max_attempts = &Cron::MaxAttempts.to_string(), + )) + .await?; + + db.execute_unprepared(&format!( + r#"CREATE TRIGGER {NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME} + AFTER INSERT OR UPDATE ON {table} + FOR EACH ROW + EXECUTE FUNCTION {NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME}();"#, + table = &Cron::Table.to_string(), + )) + .await?; + + db.execute_unprepared(&format!( + r#"CREATE OR REPLACE FUNCTION {CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME}() RETURNS INTEGER AS $$ + DECLARE + affected_count INTEGER; + BEGIN + UPDATE {table} + SET + {locked_by} = NULL, + {locked_at} = NULL, + {status} = '{pending}' + WHERE + {locked_by} IS NOT NULL + AND {timeout_ms} IS NOT NULL + AND {locked_at} + {timeout_ms} * INTERVAL '1 millisecond' <= CURRENT_TIMESTAMP + AND {status} = '{running}'; + GET DIAGNOSTICS affected_count = ROW_COUNT; + RETURN affected_count; + END; + $$ LANGUAGE plpgsql;"#, + table = &Cron::Table.to_string(), + locked_by = &Cron::LockedBy.to_string(), + locked_at = &Cron::LockedAt.to_string(), + status = &Cron::Status.to_string(), + running = &CronStatus::Running.to_string(), + pending = &CronStatus::Pending.to_string(), + timeout_ms = &Cron::TimeoutMs.to_string(), + )) + .await?; + + db.execute_unprepared(&format!( + r#"CREATE OR REPLACE FUNCTION {CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME}() RETURNS INTEGER AS $$ + DECLARE + cron_record RECORD; + notification_count INTEGER := 0; + BEGIN + FOR cron_record IN + SELECT * FROM {table} + WHERE {next_run} IS NOT NULL + AND {next_run} <= CURRENT_TIMESTAMP + AND {enabled} = true + AND {status} = '{pending}' + AND {attempts} < {max_attempts} + AND ( + {locked_at} IS NULL + OR ( + {timeout_ms} IS NOT NULL + AND {locked_at} + {timeout_ms} * INTERVAL '1 millisecond' <= CURRENT_TIMESTAMP + ) + ) + ORDER BY {priority} ASC, {next_run} ASC + FOR UPDATE SKIP LOCKED + LOOP + PERFORM pg_notify('{CRON_DUE_EVENT}', row_to_json(cron_record)::text); + notification_count := notification_count + 1; + END LOOP; + RETURN notification_count; + END; + $$ LANGUAGE plpgsql;"#, + table = &Cron::Table.to_string(), + next_run = &Cron::NextRun.to_string(), + enabled = &Cron::Enabled.to_string(), + status = &Cron::Status.to_string(), + pending = &CronStatus::Pending.to_string(), + locked_at = &Cron::LockedAt.to_string(), + timeout_ms = &Cron::TimeoutMs.to_string(), + priority = &Cron::Priority.to_string(), + attempts = &Cron::Attempts.to_string(), + max_attempts = &Cron::MaxAttempts.to_string(), + )) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + let db = manager.get_connection(); + + db.execute_unprepared(&format!( + r#"DROP TRIGGER IF EXISTS {NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME} ON {table};"#, + table = &Cron::Table.to_string(), + )) + .await?; + + db.execute_unprepared(&format!( + r#"DROP FUNCTION IF EXISTS {NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME}();"#, + )) + .await?; + + db.execute_unprepared(&format!( + r#"DROP FUNCTION IF EXISTS {CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME}();"#, + )) + .await?; + + db.execute_unprepared(&format!( + r#"DROP FUNCTION IF EXISTS {CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME}();"#, + )) + .await?; + + manager + .drop_table( + TableDropStatement::new() + .if_exists() + .table(Cron::Table) + .to_owned(), + ) + .await?; + + manager + .drop_postgres_enum_for_active_enum(CronSourceEnum) + .await?; + + manager + .drop_postgres_enum_for_active_enum(CronStatusEnum) + .await?; + + Ok(()) + } +} diff --git a/apps/recorder/src/migrations/mod.rs b/apps/recorder/src/migrations/mod.rs index fe19430..8c0a9b4 100644 --- a/apps/recorder/src/migrations/mod.rs +++ b/apps/recorder/src/migrations/mod.rs @@ -11,6 +11,7 @@ pub mod m20250520_021135_subscriber_tasks; pub mod m20250622_015618_feeds; pub mod m20250622_020819_bangumi_and_episode_type; pub mod m20250625_060701_add_subscription_id_to_subscriber_tasks; +pub mod m20250629_065628_add_cron; pub struct Migrator; @@ -26,6 +27,7 @@ impl MigratorTrait for Migrator { Box::new(m20250622_015618_feeds::Migration), Box::new(m20250622_020819_bangumi_and_episode_type::Migration), Box::new(m20250625_060701_add_subscription_id_to_subscriber_tasks::Migration), + Box::new(m20250629_065628_add_cron::Migration), ] } } diff --git a/apps/recorder/src/models/auth.rs b/apps/recorder/src/models/auth.rs index 992513b..a916145 100644 --- a/apps/recorder/src/models/auth.rs +++ b/apps/recorder/src/models/auth.rs @@ -63,7 +63,9 @@ impl Model { .filter(Column::Pid.eq(pid)) .one(db) .await? - .ok_or_else(|| RecorderError::from_db_record_not_found("auth::find_by_pid"))?; + .ok_or_else(|| { + RecorderError::from_model_not_found_detail("auth", format!("pid {pid} not found")) + })?; Ok(subscriber_auth) } diff --git a/apps/recorder/src/models/cron/core.rs b/apps/recorder/src/models/cron/core.rs new file mode 100644 index 0000000..2324876 --- /dev/null +++ b/apps/recorder/src/models/cron/core.rs @@ -0,0 +1,9 @@ +pub const CRON_DUE_EVENT: &str = "cron_due"; + +pub const CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME: &str = + "check_and_cleanup_expired_cron_locks"; +pub const CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME: &str = "check_and_trigger_due_crons"; + +pub const NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME: &str = "notify_due_cron_when_mutating"; +pub const NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME: &str = + "notify_due_cron_when_mutating_trigger"; diff --git a/apps/recorder/src/models/cron/mod.rs b/apps/recorder/src/models/cron/mod.rs new file mode 100644 index 0000000..dc9b3a5 --- /dev/null +++ b/apps/recorder/src/models/cron/mod.rs @@ -0,0 +1,305 @@ +mod core; +mod registry; + +pub use core::{ + CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME, CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME, + CRON_DUE_EVENT, NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME, + NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME, +}; + +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use croner::Cron; +use sea_orm::{ + ActiveValue::Set, DeriveActiveEnum, DeriveDisplay, DeriveEntityModel, EnumIter, QuerySelect, + Statement, TransactionTrait, entity::prelude::*, sea_query::LockType, + sqlx::postgres::PgNotification, +}; +use serde::{Deserialize, Serialize}; + +use crate::{ + app::AppContextTrait, + errors::{RecorderError, RecorderResult}, + models::subscriptions::{self}, +}; + +#[derive( + Debug, Clone, PartialEq, Eq, DeriveActiveEnum, EnumIter, DeriveDisplay, Serialize, Deserialize, +)] +#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "cron_source")] +#[serde(rename_all = "snake_case")] +pub enum CronSource { + #[sea_orm(string_value = "subscription")] + Subscription, +} + +#[derive( + Debug, Clone, PartialEq, Eq, DeriveActiveEnum, EnumIter, DeriveDisplay, Serialize, Deserialize, +)] +#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "cron_status")] +#[serde(rename_all = "snake_case")] +pub enum CronStatus { + #[sea_orm(string_value = "pending")] + Pending, + #[sea_orm(string_value = "running")] + Running, + #[sea_orm(string_value = "completed")] + Completed, + #[sea_orm(string_value = "failed")] + Failed, +} + +#[derive(Debug, Clone, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)] +#[sea_orm(table_name = "cron")] +pub struct Model { + #[sea_orm(default_expr = "Expr::current_timestamp()")] + pub created_at: DateTimeUtc, + #[sea_orm(default_expr = "Expr::current_timestamp()")] + pub updated_at: DateTimeUtc, + #[sea_orm(primary_key)] + pub id: i32, + pub cron_source: CronSource, + pub subscriber_id: Option, + pub subscription_id: Option, + pub cron_expr: String, + pub next_run: Option, + pub last_run: Option, + pub last_error: Option, + pub locked_by: Option, + pub locked_at: Option, + pub timeout_ms: i32, + #[sea_orm(default_expr = "0")] + pub attempts: i32, + #[sea_orm(default_expr = "1")] + pub max_attempts: i32, + #[sea_orm(default_expr = "0")] + pub priority: i32, + pub status: CronStatus, + #[sea_orm(default_expr = "true")] + pub enabled: bool, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + #[sea_orm( + belongs_to = "super::subscribers::Entity", + from = "Column::SubscriberId", + to = "super::subscribers::Column::Id", + on_update = "Cascade", + on_delete = "Cascade" + )] + Subscriber, + #[sea_orm( + belongs_to = "super::subscriptions::Entity", + from = "Column::SubscriptionId", + to = "super::subscriptions::Column::Id", + on_update = "Cascade", + on_delete = "Cascade" + )] + Subscription, +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Subscriber.def() + } +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Subscription.def() + } +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelatedEntity)] +pub enum RelatedEntity { + #[sea_orm(entity = "super::subscribers::Entity")] + Subscriber, + #[sea_orm(entity = "super::subscriptions::Entity")] + Subscription, +} + +#[async_trait] +impl ActiveModelBehavior for ActiveModel {} + +impl Model { + pub async fn handle_cron_notification( + ctx: &dyn AppContextTrait, + notification: PgNotification, + worker_id: &str, + ) -> RecorderResult<()> { + let payload: Self = serde_json::from_str(notification.payload())?; + let cron_id = payload.id; + + tracing::debug!("Cron notification received for cron {cron_id} and worker {worker_id}"); + + match Self::try_acquire_lock_with_cron_id(ctx, cron_id, worker_id).await? { + Some(cron) => match cron.exec_cron(ctx).await { + Ok(()) => { + tracing::debug!("Cron {cron_id} executed successfully"); + cron.mark_cron_completed(ctx).await?; + } + Err(e) => { + tracing::error!("Error executing cron {cron_id}: {e}"); + cron.mark_cron_failed(ctx, &e.to_string()).await?; + } + }, + None => { + tracing::debug!( + "Cron lock not acquired for cron {cron_id} and worker {worker_id}, skipping..." + ); + } + } + + Ok(()) + } + + async fn try_acquire_lock_with_cron_id( + ctx: &dyn AppContextTrait, + cron_id: i32, + worker_id: &str, + ) -> RecorderResult> { + let db = ctx.db(); + let txn = db.begin().await?; + + let cron = Entity::find_by_id(cron_id) + .lock(LockType::Update) + .one(&txn) + .await?; + + if let Some(cron) = cron { + if cron.enabled + && cron.attempts < cron.max_attempts + && cron.status == CronStatus::Pending + && (cron.locked_at.is_none_or(|locked_at| { + locked_at + chrono::Duration::milliseconds(cron.timeout_ms as i64) <= Utc::now() + })) + && cron.next_run.is_some_and(|next_run| next_run <= Utc::now()) + { + let cron_active_model = ActiveModel { + id: Set(cron.id), + locked_by: Set(Some(worker_id.to_string())), + locked_at: Set(Some(Utc::now())), + status: Set(CronStatus::Running), + attempts: Set(cron.attempts + 1), + ..Default::default() + }; + let cron_model = cron_active_model.update(&txn).await?; + txn.commit().await?; + return Ok(Some(cron_model)); + } + txn.commit().await?; + return Ok(Some(cron)); + } + txn.rollback().await?; + Ok(None) + } + + async fn exec_cron(&self, ctx: &dyn AppContextTrait) -> RecorderResult<()> { + match self.cron_source { + CronSource::Subscription => { + let subscription_id = self.subscription_id.unwrap_or_else(|| { + unreachable!("Subscription cron must have a subscription id") + }); + + let subscription = subscriptions::Entity::find_by_id(subscription_id) + .one(ctx.db()) + .await? + .ok_or_else(|| RecorderError::from_model_not_found("Subscription"))?; + + subscription.exec_cron(ctx).await?; + } + } + + Ok(()) + } + + async fn mark_cron_completed(&self, ctx: &dyn AppContextTrait) -> RecorderResult<()> { + let db = ctx.db(); + + let next_run = self.calculate_next_run(&self.cron_expr)?; + + ActiveModel { + id: Set(self.id), + next_run: Set(Some(next_run)), + last_run: Set(Some(Utc::now())), + status: Set(CronStatus::Pending), + locked_by: Set(None), + locked_at: Set(None), + attempts: Set(0), + last_error: Set(None), + ..Default::default() + } + .update(db) + .await?; + + Ok(()) + } + + async fn mark_cron_failed(&self, ctx: &dyn AppContextTrait, error: &str) -> RecorderResult<()> { + let db = ctx.db(); + + let should_retry = self.attempts < self.max_attempts; + + let status = if should_retry { + CronStatus::Pending + } else { + CronStatus::Failed + }; + + let next_run = if should_retry { + Some(Utc::now() + chrono::Duration::seconds(5)) + } else { + Some(self.calculate_next_run(&self.cron_expr)?) + }; + + ActiveModel { + id: Set(self.id), + next_run: Set(next_run), + status: Set(status), + locked_by: Set(None), + locked_at: Set(None), + last_run: Set(Some(Utc::now())), + last_error: Set(Some(error.to_string())), + attempts: Set(if should_retry { self.attempts + 1 } else { 0 }), + ..Default::default() + } + .update(db) + .await?; + + Ok(()) + } + + pub async fn cleanup_expired_locks(ctx: &dyn AppContextTrait) -> RecorderResult { + let db = ctx.db(); + + let result = db + .execute(Statement::from_string( + db.get_database_backend(), + format!("SELECT {CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME}()"), + )) + .await?; + + Ok(result.rows_affected() as i32) + } + + pub async fn check_and_trigger_due_crons(ctx: &dyn AppContextTrait) -> RecorderResult<()> { + let db = ctx.db(); + + db.execute(Statement::from_string( + db.get_database_backend(), + format!("SELECT {CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME}()"), + )) + .await?; + + Ok(()) + } + + fn calculate_next_run(&self, cron_expr: &str) -> RecorderResult> { + let cron_expr = Cron::new(cron_expr).parse()?; + + let next = cron_expr.find_next_occurrence(&Utc::now(), false)?; + + Ok(next) + } +} diff --git a/apps/recorder/src/models/cron/registry.rs b/apps/recorder/src/models/cron/registry.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/apps/recorder/src/models/cron/registry.rs @@ -0,0 +1 @@ + diff --git a/apps/recorder/src/models/feeds/mod.rs b/apps/recorder/src/models/feeds/mod.rs index 7c4c8fd..150af3a 100644 --- a/apps/recorder/src/models/feeds/mod.rs +++ b/apps/recorder/src/models/feeds/mod.rs @@ -122,9 +122,7 @@ impl Model { .filter(Column::FeedType.eq(FeedType::Rss)) .one(db) .await? - .ok_or(RecorderError::ModelEntityNotFound { - entity: "Feed".into(), - })?; + .ok_or(RecorderError::from_model_not_found("Feed"))?; let feed = Feed::from_model(ctx, feed_model).await?; diff --git a/apps/recorder/src/models/feeds/registry.rs b/apps/recorder/src/models/feeds/registry.rs index 75d1254..73acf60 100644 --- a/apps/recorder/src/models/feeds/registry.rs +++ b/apps/recorder/src/models/feeds/registry.rs @@ -44,9 +44,7 @@ impl Feed { .await?; (subscription, episodes) } else { - return Err(RecorderError::ModelEntityNotFound { - entity: "Subscription".into(), - }); + return Err(RecorderError::from_model_not_found("Subscription")); }; Ok(Feed::SubscritpionEpisodes( diff --git a/apps/recorder/src/models/mod.rs b/apps/recorder/src/models/mod.rs index adbd234..c24796e 100644 --- a/apps/recorder/src/models/mod.rs +++ b/apps/recorder/src/models/mod.rs @@ -11,3 +11,4 @@ pub mod subscribers; pub mod subscription_bangumi; pub mod subscription_episode; pub mod subscriptions; +pub mod cron; diff --git a/apps/recorder/src/models/subscriber_tasks.rs b/apps/recorder/src/models/subscriber_tasks/mod.rs similarity index 100% rename from apps/recorder/src/models/subscriber_tasks.rs rename to apps/recorder/src/models/subscriber_tasks/mod.rs diff --git a/apps/recorder/src/models/subscribers.rs b/apps/recorder/src/models/subscribers.rs index 5c13d23..981522f 100644 --- a/apps/recorder/src/models/subscribers.rs +++ b/apps/recorder/src/models/subscribers.rs @@ -130,10 +130,9 @@ impl Model { pub async fn find_by_id(ctx: &dyn AppContextTrait, id: i32) -> RecorderResult { let db = ctx.db(); - let subscriber = Entity::find_by_id(id) - .one(db) - .await? - .ok_or_else(|| RecorderError::from_db_record_not_found("subscribers::find_by_id"))?; + let subscriber = Entity::find_by_id(id).one(db).await?.ok_or_else(|| { + RecorderError::from_model_not_found_detail("subscribers", format!("id {id} not found")) + })?; Ok(subscriber) } diff --git a/apps/recorder/src/models/subscriptions/mod.rs b/apps/recorder/src/models/subscriptions/mod.rs index 2e4e5ee..ce385e1 100644 --- a/apps/recorder/src/models/subscriptions/mod.rs +++ b/apps/recorder/src/models/subscriptions/mod.rs @@ -190,16 +190,16 @@ impl Model { let subscription_model = Entity::find_by_id(subscription_id) .one(db) .await? - .ok_or_else(|| RecorderError::ModelEntityNotFound { - entity: "Subscription".into(), - })?; + .ok_or_else(|| RecorderError::from_model_not_found("Subscription"))?; if subscription_model.subscriber_id != subscriber_id { - Err(RecorderError::ModelEntityNotFound { - entity: "Subscription".into(), - })?; + Err(RecorderError::from_model_not_found("Subscription"))?; } Ok(subscription_model) } + + pub async fn exec_cron(&self, _ctx: &dyn AppContextTrait) -> RecorderResult<()> { + todo!() + } } diff --git a/apps/recorder/src/task/registry/mod.rs b/apps/recorder/src/task/registry/mod.rs index cca15ae..e3c5750 100644 --- a/apps/recorder/src/task/registry/mod.rs +++ b/apps/recorder/src/task/registry/mod.rs @@ -1,134 +1,18 @@ mod media; +mod subscriber; mod subscription; -use std::sync::Arc; +mod system; pub use media::OptimizeImageTask; -use sea_orm::{DeriveActiveEnum, DeriveDisplay, EnumIter, FromJsonQueryResult}; -use serde::{Deserialize, Serialize}; +pub use subscriber::{ + SubscriberTask, SubscriberTaskType, SubscriberTaskTypeEnum, SubscriberTaskTypeVariant, + SubscriberTaskTypeVariantIter, +}; pub use subscription::{ SyncOneSubscriptionFeedsFullTask, SyncOneSubscriptionFeedsIncrementalTask, SyncOneSubscriptionSourcesTask, }; - -use crate::{ - app::AppContextTrait, - errors::{RecorderError, RecorderResult}, - models::subscriptions::SubscriptionTrait, - task::AsyncTaskTrait, +pub use system::{ + SystemTask, SystemTaskType, SystemTaskTypeEnum, SystemTaskTypeVariant, + SystemTaskTypeVariantIter, }; - -#[derive( - Clone, - Debug, - Serialize, - Deserialize, - PartialEq, - Eq, - Copy, - DeriveActiveEnum, - DeriveDisplay, - EnumIter, -)] -#[sea_orm(rs_type = "String", db_type = "Text")] -pub enum SubscriberTaskType { - #[serde(rename = "sync_one_subscription_feeds_incremental")] - #[sea_orm(string_value = "sync_one_subscription_feeds_incremental")] - SyncOneSubscriptionFeedsIncremental, - #[serde(rename = "sync_one_subscription_feeds_full")] - #[sea_orm(string_value = "sync_one_subscription_feeds_full")] - SyncOneSubscriptionFeedsFull, - #[serde(rename = "sync_one_subscription_sources")] - #[sea_orm(string_value = "sync_one_subscription_sources")] - SyncOneSubscriptionSources, -} - -impl TryFrom<&SubscriberTask> for serde_json::Value { - type Error = RecorderError; - - fn try_from(value: &SubscriberTask) -> Result { - let json_value = serde_json::to_value(value)?; - Ok(match json_value { - serde_json::Value::Object(mut map) => { - map.remove("task_type"); - serde_json::Value::Object(map) - } - _ => { - unreachable!("subscriber task must be an json object"); - } - }) - } -} - -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, FromJsonQueryResult)] -#[serde(tag = "task_type")] -pub enum SubscriberTask { - #[serde(rename = "sync_one_subscription_feeds_incremental")] - SyncOneSubscriptionFeedsIncremental(SyncOneSubscriptionFeedsIncrementalTask), - #[serde(rename = "sync_one_subscription_feeds_full")] - SyncOneSubscriptionFeedsFull(SyncOneSubscriptionFeedsFullTask), - #[serde(rename = "sync_one_subscription_sources")] - SyncOneSubscriptionSources(SyncOneSubscriptionSourcesTask), -} - -impl SubscriberTask { - pub fn get_subscriber_id(&self) -> i32 { - match self { - Self::SyncOneSubscriptionFeedsIncremental(task) => task.0.get_subscriber_id(), - Self::SyncOneSubscriptionFeedsFull(task) => task.0.get_subscriber_id(), - Self::SyncOneSubscriptionSources(task) => task.0.get_subscriber_id(), - } - } - - pub async fn run(self, ctx: Arc) -> RecorderResult<()> { - match self { - Self::SyncOneSubscriptionFeedsIncremental(task) => task.run(ctx).await, - Self::SyncOneSubscriptionFeedsFull(task) => task.run(ctx).await, - Self::SyncOneSubscriptionSources(task) => task.run(ctx).await, - } - } - - pub fn task_type(&self) -> SubscriberTaskType { - match self { - Self::SyncOneSubscriptionFeedsIncremental(_) => { - SubscriberTaskType::SyncOneSubscriptionFeedsIncremental - } - Self::SyncOneSubscriptionFeedsFull(_) => { - SubscriberTaskType::SyncOneSubscriptionFeedsFull - } - Self::SyncOneSubscriptionSources(_) => SubscriberTaskType::SyncOneSubscriptionSources, - } - } -} - -#[derive( - Clone, - Debug, - Serialize, - Deserialize, - PartialEq, - Eq, - Copy, - DeriveActiveEnum, - DeriveDisplay, - EnumIter, -)] -#[sea_orm(rs_type = "String", db_type = "Text")] -pub enum SystemTaskType { - #[serde(rename = "optimize_image")] - #[sea_orm(string_value = "optimize_image")] - OptimizeImage, -} - -#[derive(Clone, Debug, Serialize, Deserialize, FromJsonQueryResult)] -pub enum SystemTask { - #[serde(rename = "optimize_image")] - OptimizeImage(OptimizeImageTask), -} - -impl SystemTask { - pub async fn run(self, ctx: Arc) -> RecorderResult<()> { - match self { - Self::OptimizeImage(task) => task.run(ctx).await, - } - } -} diff --git a/apps/recorder/src/task/registry/subscriber.rs b/apps/recorder/src/task/registry/subscriber.rs new file mode 100644 index 0000000..b1be78b --- /dev/null +++ b/apps/recorder/src/task/registry/subscriber.rs @@ -0,0 +1,100 @@ +use std::sync::Arc; + +use sea_orm::{DeriveActiveEnum, DeriveDisplay, EnumIter, FromJsonQueryResult}; +use serde::{Deserialize, Serialize}; + +use crate::{ + app::AppContextTrait, + errors::{RecorderError, RecorderResult}, + models::subscriptions::SubscriptionTrait, + task::{ + AsyncTaskTrait, + registry::{ + SyncOneSubscriptionFeedsFullTask, SyncOneSubscriptionFeedsIncrementalTask, + SyncOneSubscriptionSourcesTask, + }, + }, +}; + +#[derive( + Clone, + Debug, + Serialize, + Deserialize, + PartialEq, + Eq, + Copy, + DeriveActiveEnum, + DeriveDisplay, + EnumIter, +)] +#[sea_orm(rs_type = "String", db_type = "Text")] +pub enum SubscriberTaskType { + #[serde(rename = "sync_one_subscription_feeds_incremental")] + #[sea_orm(string_value = "sync_one_subscription_feeds_incremental")] + SyncOneSubscriptionFeedsIncremental, + #[serde(rename = "sync_one_subscription_feeds_full")] + #[sea_orm(string_value = "sync_one_subscription_feeds_full")] + SyncOneSubscriptionFeedsFull, + #[serde(rename = "sync_one_subscription_sources")] + #[sea_orm(string_value = "sync_one_subscription_sources")] + SyncOneSubscriptionSources, +} + +impl TryFrom<&SubscriberTask> for serde_json::Value { + type Error = RecorderError; + + fn try_from(value: &SubscriberTask) -> Result { + let json_value = serde_json::to_value(value)?; + Ok(match json_value { + serde_json::Value::Object(mut map) => { + map.remove("task_type"); + serde_json::Value::Object(map) + } + _ => { + unreachable!("subscriber task must be an json object"); + } + }) + } +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, FromJsonQueryResult)] +#[serde(tag = "task_type")] +pub enum SubscriberTask { + #[serde(rename = "sync_one_subscription_feeds_incremental")] + SyncOneSubscriptionFeedsIncremental(SyncOneSubscriptionFeedsIncrementalTask), + #[serde(rename = "sync_one_subscription_feeds_full")] + SyncOneSubscriptionFeedsFull(SyncOneSubscriptionFeedsFullTask), + #[serde(rename = "sync_one_subscription_sources")] + SyncOneSubscriptionSources(SyncOneSubscriptionSourcesTask), +} + +impl SubscriberTask { + pub fn get_subscriber_id(&self) -> i32 { + match self { + Self::SyncOneSubscriptionFeedsIncremental(task) => task.0.get_subscriber_id(), + Self::SyncOneSubscriptionFeedsFull(task) => task.0.get_subscriber_id(), + Self::SyncOneSubscriptionSources(task) => task.0.get_subscriber_id(), + } + } + + pub async fn run(self, ctx: Arc) -> RecorderResult<()> { + match self { + Self::SyncOneSubscriptionFeedsIncremental(task) => task.run(ctx).await, + Self::SyncOneSubscriptionFeedsFull(task) => task.run(ctx).await, + Self::SyncOneSubscriptionSources(task) => task.run(ctx).await, + } + } + + pub fn task_type(&self) -> SubscriberTaskType { + match self { + Self::SyncOneSubscriptionFeedsIncremental(_) => { + SubscriberTaskType::SyncOneSubscriptionFeedsIncremental + } + Self::SyncOneSubscriptionFeedsFull(_) => { + SubscriberTaskType::SyncOneSubscriptionFeedsFull + } + Self::SyncOneSubscriptionSources(_) => SubscriberTaskType::SyncOneSubscriptionSources, + } + } +} diff --git a/apps/recorder/src/task/registry/system.rs b/apps/recorder/src/task/registry/system.rs new file mode 100644 index 0000000..88d7b86 --- /dev/null +++ b/apps/recorder/src/task/registry/system.rs @@ -0,0 +1,43 @@ +use std::sync::Arc; + +use sea_orm::{DeriveActiveEnum, DeriveDisplay, EnumIter, FromJsonQueryResult}; +use serde::{Deserialize, Serialize}; + +use crate::{ + app::AppContextTrait, + errors::RecorderResult, + task::{AsyncTaskTrait, registry::media::OptimizeImageTask}, +}; + +#[derive( + Clone, + Debug, + Serialize, + Deserialize, + PartialEq, + Eq, + Copy, + DeriveActiveEnum, + DeriveDisplay, + EnumIter, +)] +#[sea_orm(rs_type = "String", db_type = "Text")] +pub enum SystemTaskType { + #[serde(rename = "optimize_image")] + #[sea_orm(string_value = "optimize_image")] + OptimizeImage, +} + +#[derive(Clone, Debug, Serialize, Deserialize, FromJsonQueryResult)] +pub enum SystemTask { + #[serde(rename = "optimize_image")] + OptimizeImage(OptimizeImageTask), +} + +impl SystemTask { + pub async fn run(self, ctx: Arc) -> RecorderResult<()> { + match self { + Self::OptimizeImage(task) => task.run(ctx).await, + } + } +} diff --git a/apps/recorder/src/task/service.rs b/apps/recorder/src/task/service.rs index 6210f45..da68bbc 100644 --- a/apps/recorder/src/task/service.rs +++ b/apps/recorder/src/task/service.rs @@ -1,16 +1,18 @@ -use std::{ops::Deref, str::FromStr, sync::Arc}; +use std::{future::Future, ops::Deref, str::FromStr, sync::Arc}; use apalis::prelude::*; use apalis_sql::{ Config, context::SqlContext, - postgres::{PgListen, PostgresStorage}, + postgres::{PgListen as ApalisPgListen, PostgresStorage as ApalisPostgresStorage}, }; +use sea_orm::sqlx::postgres::PgListener; use tokio::sync::RwLock; use crate::{ app::AppContextTrait, errors::{RecorderError, RecorderResult}, + models::cron::{self, CRON_DUE_EVENT}, task::{ SUBSCRIBER_TASK_APALIS_NAME, SYSTEM_TASK_APALIS_NAME, SubscriberTask, TaskConfig, config::{default_subscriber_task_workers, default_system_task_workers}, @@ -21,8 +23,9 @@ use crate::{ pub struct TaskService { pub config: TaskConfig, ctx: Arc, - subscriber_task_storage: Arc>>, - system_task_storage: Arc>>, + subscriber_task_storage: Arc>>, + system_task_storage: Arc>>, + cron_worker_id: String, } impl TaskService { @@ -43,12 +46,13 @@ impl TaskService { let system_task_storage_config = Config::new(SYSTEM_TASK_APALIS_NAME).set_keep_alive(config.system_task_timeout); let subscriber_task_storage = - PostgresStorage::new_with_config(pool.clone(), subscriber_task_storage_config); + ApalisPostgresStorage::new_with_config(pool.clone(), subscriber_task_storage_config); let system_task_storage = - PostgresStorage::new_with_config(pool, system_task_storage_config); + ApalisPostgresStorage::new_with_config(pool, system_task_storage_config); Ok(Self { config, + cron_worker_id: nanoid::nanoid!(), ctx, subscriber_task_storage: Arc::new(RwLock::new(subscriber_task_storage)), system_task_storage: Arc::new(RwLock::new(system_task_storage)), @@ -132,8 +136,73 @@ impl TaskService { Ok(task_id) } - pub async fn setup_monitor(&self) -> RecorderResult { - let mut monitor = Monitor::new(); + pub async fn run(&self, shutdown_signal: Option) -> RecorderResult<()> + where + F: Fn() -> Fut + Send + 'static, + Fut: Future + Send, + { + tokio::try_join!( + async { + let monitor = self.setup_apalis_monitor().await?; + if let Some(shutdown_signal) = shutdown_signal { + monitor + .run_with_signal(async move { + shutdown_signal().await; + tracing::info!("apalis shutting down..."); + Ok(()) + }) + .await?; + } else { + monitor.run().await?; + } + Ok::<_, RecorderError>(()) + }, + async { + let listener = self.setup_apalis_listener().await?; + tokio::task::spawn(async move { + if let Err(e) = listener.listen().await { + tracing::error!("Error listening to apalis: {e}"); + } + }); + Ok::<_, RecorderError>(()) + }, + async { + let listener = self.setup_cron_due_listening().await?; + let ctx = self.ctx.clone(); + let cron_worker_id = self.cron_worker_id.clone(); + + tokio::task::spawn(async move { + if let Err(e) = Self::listen_cron_due(listener, ctx, &cron_worker_id).await { + tracing::error!("Error listening to cron due: {e}"); + } + }); + + Ok::<_, RecorderError>(()) + }, + async { + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60)); + let ctx = self.ctx.clone(); + tokio::task::spawn(async move { + loop { + interval.tick().await; + if let Err(e) = cron::Model::cleanup_expired_locks(ctx.as_ref()).await { + tracing::error!("Error cleaning up expired locks: {e}"); + } + if let Err(e) = cron::Model::check_and_trigger_due_crons(ctx.as_ref()).await + { + tracing::error!("Error checking and triggering due crons: {e}"); + } + } + }); + + Ok::<_, RecorderError>(()) + } + )?; + Ok(()) + } + + async fn setup_apalis_monitor(&self) -> RecorderResult { + let mut apalis_monitor = Monitor::new(); { let subscriber_task_worker = WorkerBuilder::new(SUBSCRIBER_TASK_APALIS_NAME) @@ -155,28 +224,51 @@ impl TaskService { .backend(self.system_task_storage.read().await.clone()) .build_fn(Self::run_system_task); - monitor = monitor + apalis_monitor = apalis_monitor .register(subscriber_task_worker) .register(system_task_worker); } - Ok(monitor) + Ok(apalis_monitor) } - pub async fn setup_listener(&self) -> RecorderResult { + async fn setup_apalis_listener(&self) -> RecorderResult { let pool = self.ctx.db().get_postgres_connection_pool().clone(); - let mut task_listener = PgListen::new(pool).await?; + let mut apalis_pg_listener = ApalisPgListen::new(pool).await?; { let mut subscriber_task_storage = self.subscriber_task_storage.write().await; - task_listener.subscribe_with(&mut subscriber_task_storage); + apalis_pg_listener.subscribe_with(&mut subscriber_task_storage); } { let mut system_task_storage = self.system_task_storage.write().await; - task_listener.subscribe_with(&mut system_task_storage); + apalis_pg_listener.subscribe_with(&mut system_task_storage); } - Ok(task_listener) + Ok(apalis_pg_listener) + } + + async fn setup_cron_due_listening(&self) -> RecorderResult { + let pool = self.ctx.db().get_postgres_connection_pool().clone(); + let listener = PgListener::connect_with(&pool).await?; + + Ok(listener) + } + + async fn listen_cron_due( + mut listener: PgListener, + ctx: Arc, + worker_id: &str, + ) -> RecorderResult<()> { + listener.listen(CRON_DUE_EVENT).await?; + loop { + let notification = listener.recv().await?; + if let Err(e) = + cron::Model::handle_cron_notification(ctx.as_ref(), notification, worker_id).await + { + tracing::error!("Error handling cron notification: {e}"); + } + } } }