diff --git a/apps/recorder/src/migrations/m20250629_065628_add_cron.rs b/apps/recorder/src/migrations/m20250629_065628_add_cron.rs index f725077..7dee4f8 100644 --- a/apps/recorder/src/migrations/m20250629_065628_add_cron.rs +++ b/apps/recorder/src/migrations/m20250629_065628_add_cron.rs @@ -1,4 +1,5 @@ use async_trait::async_trait; +use sea_orm::ActiveEnum; use sea_orm_migration::{prelude::*, schema::*}; use crate::{ @@ -6,7 +7,6 @@ use crate::{ Cron, CustomSchemaManagerExt, GeneralIds, Subscribers, Subscriptions, table_auto_z, }, models::cron::{ - CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME, CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME, CRON_DUE_EVENT, CronSource, CronSourceEnum, CronStatus, CronStatusEnum, NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME, NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME, @@ -143,14 +143,14 @@ impl MigrationTrait for Migration { locked_at = &Cron::LockedAt.to_string(), timeout_ms = &Cron::TimeoutMs.to_string(), status = &Cron::Status.to_string(), - pending = &CronStatus::Pending.to_string(), + pending = &CronStatus::Pending.to_value(), attempts = &Cron::Attempts.to_string(), max_attempts = &Cron::MaxAttempts.to_string(), )) .await?; db.execute_unprepared(&format!( - r#"CREATE TRIGGER {NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME} + r#"CREATE OR REPLACE TRIGGER {NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME} AFTER INSERT OR UPDATE ON {table} FOR EACH ROW EXECUTE FUNCTION {NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME}();"#, @@ -158,35 +158,6 @@ impl MigrationTrait for Migration { )) .await?; - db.execute_unprepared(&format!( - r#"CREATE OR REPLACE FUNCTION {CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME}() RETURNS INTEGER AS $$ - DECLARE - affected_count INTEGER; - BEGIN - UPDATE {table} - SET - {locked_by} = NULL, - {locked_at} = NULL, - {status} = '{pending}' - WHERE - {locked_by} IS NOT NULL - AND {timeout_ms} IS NOT NULL - AND {locked_at} + {timeout_ms} * INTERVAL '1 millisecond' <= CURRENT_TIMESTAMP - AND {status} = '{running}'; - GET DIAGNOSTICS affected_count = ROW_COUNT; - RETURN affected_count; - END; - $$ LANGUAGE plpgsql;"#, - table = &Cron::Table.to_string(), - locked_by = &Cron::LockedBy.to_string(), - locked_at = &Cron::LockedAt.to_string(), - status = &Cron::Status.to_string(), - running = &CronStatus::Running.to_string(), - pending = &CronStatus::Pending.to_string(), - timeout_ms = &Cron::TimeoutMs.to_string(), - )) - .await?; - db.execute_unprepared(&format!( r#"CREATE OR REPLACE FUNCTION {CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME}() RETURNS INTEGER AS $$ DECLARE @@ -220,7 +191,7 @@ impl MigrationTrait for Migration { next_run = &Cron::NextRun.to_string(), enabled = &Cron::Enabled.to_string(), status = &Cron::Status.to_string(), - pending = &CronStatus::Pending.to_string(), + pending = &CronStatus::Pending.to_value(), locked_at = &Cron::LockedAt.to_string(), timeout_ms = &Cron::TimeoutMs.to_string(), priority = &Cron::Priority.to_string(), @@ -246,11 +217,6 @@ impl MigrationTrait for Migration { )) .await?; - db.execute_unprepared(&format!( - r#"DROP FUNCTION IF EXISTS {CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME}();"#, - )) - .await?; - db.execute_unprepared(&format!( r#"DROP FUNCTION IF EXISTS {CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME}();"#, )) diff --git a/apps/recorder/src/models/cron/core.rs b/apps/recorder/src/models/cron/core.rs index 2324876..cf21096 100644 --- a/apps/recorder/src/models/cron/core.rs +++ b/apps/recorder/src/models/cron/core.rs @@ -1,7 +1,5 @@ pub const CRON_DUE_EVENT: &str = "cron_due"; -pub const CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME: &str = - "check_and_cleanup_expired_cron_locks"; pub const CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME: &str = "check_and_trigger_due_crons"; pub const NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME: &str = "notify_due_cron_when_mutating"; diff --git a/apps/recorder/src/models/cron/mod.rs b/apps/recorder/src/models/cron/mod.rs index dc9b3a5..98a4667 100644 --- a/apps/recorder/src/models/cron/mod.rs +++ b/apps/recorder/src/models/cron/mod.rs @@ -2,9 +2,8 @@ mod core; mod registry; pub use core::{ - CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME, CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME, - CRON_DUE_EVENT, NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME, - NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME, + CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME, CRON_DUE_EVENT, + NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME, NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME, }; use async_trait::async_trait; @@ -270,19 +269,6 @@ impl Model { Ok(()) } - pub async fn cleanup_expired_locks(ctx: &dyn AppContextTrait) -> RecorderResult { - let db = ctx.db(); - - let result = db - .execute(Statement::from_string( - db.get_database_backend(), - format!("SELECT {CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME}()"), - )) - .await?; - - Ok(result.rows_affected() as i32) - } - pub async fn check_and_trigger_due_crons(ctx: &dyn AppContextTrait) -> RecorderResult<()> { let db = ctx.db(); diff --git a/apps/recorder/src/task/service.rs b/apps/recorder/src/task/service.rs index da68bbc..5e0b02c 100644 --- a/apps/recorder/src/task/service.rs +++ b/apps/recorder/src/task/service.rs @@ -185,9 +185,6 @@ impl TaskService { tokio::task::spawn(async move { loop { interval.tick().await; - if let Err(e) = cron::Model::cleanup_expired_locks(ctx.as_ref()).await { - tracing::error!("Error cleaning up expired locks: {e}"); - } if let Err(e) = cron::Model::check_and_trigger_due_crons(ctx.as_ref()).await { tracing::error!("Error checking and triggering due crons: {e}");