fix: remove inconsistent cleanup function

This commit is contained in:
master 2025-06-27 02:18:23 +08:00
parent 3a8eb88e1a
commit c8501b1768
4 changed files with 6 additions and 59 deletions

View File

@ -1,4 +1,5 @@
use async_trait::async_trait; use async_trait::async_trait;
use sea_orm::ActiveEnum;
use sea_orm_migration::{prelude::*, schema::*}; use sea_orm_migration::{prelude::*, schema::*};
use crate::{ use crate::{
@ -6,7 +7,6 @@ use crate::{
Cron, CustomSchemaManagerExt, GeneralIds, Subscribers, Subscriptions, table_auto_z, Cron, CustomSchemaManagerExt, GeneralIds, Subscribers, Subscriptions, table_auto_z,
}, },
models::cron::{ models::cron::{
CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME,
CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME, CRON_DUE_EVENT, CronSource, CronSourceEnum, CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME, CRON_DUE_EVENT, CronSource, CronSourceEnum,
CronStatus, CronStatusEnum, NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME, CronStatus, CronStatusEnum, NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME,
NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME, NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME,
@ -143,14 +143,14 @@ impl MigrationTrait for Migration {
locked_at = &Cron::LockedAt.to_string(), locked_at = &Cron::LockedAt.to_string(),
timeout_ms = &Cron::TimeoutMs.to_string(), timeout_ms = &Cron::TimeoutMs.to_string(),
status = &Cron::Status.to_string(), status = &Cron::Status.to_string(),
pending = &CronStatus::Pending.to_string(), pending = &CronStatus::Pending.to_value(),
attempts = &Cron::Attempts.to_string(), attempts = &Cron::Attempts.to_string(),
max_attempts = &Cron::MaxAttempts.to_string(), max_attempts = &Cron::MaxAttempts.to_string(),
)) ))
.await?; .await?;
db.execute_unprepared(&format!( db.execute_unprepared(&format!(
r#"CREATE TRIGGER {NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME} r#"CREATE OR REPLACE TRIGGER {NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME}
AFTER INSERT OR UPDATE ON {table} AFTER INSERT OR UPDATE ON {table}
FOR EACH ROW FOR EACH ROW
EXECUTE FUNCTION {NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME}();"#, EXECUTE FUNCTION {NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME}();"#,
@ -158,35 +158,6 @@ impl MigrationTrait for Migration {
)) ))
.await?; .await?;
db.execute_unprepared(&format!(
r#"CREATE OR REPLACE FUNCTION {CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME}() RETURNS INTEGER AS $$
DECLARE
affected_count INTEGER;
BEGIN
UPDATE {table}
SET
{locked_by} = NULL,
{locked_at} = NULL,
{status} = '{pending}'
WHERE
{locked_by} IS NOT NULL
AND {timeout_ms} IS NOT NULL
AND {locked_at} + {timeout_ms} * INTERVAL '1 millisecond' <= CURRENT_TIMESTAMP
AND {status} = '{running}';
GET DIAGNOSTICS affected_count = ROW_COUNT;
RETURN affected_count;
END;
$$ LANGUAGE plpgsql;"#,
table = &Cron::Table.to_string(),
locked_by = &Cron::LockedBy.to_string(),
locked_at = &Cron::LockedAt.to_string(),
status = &Cron::Status.to_string(),
running = &CronStatus::Running.to_string(),
pending = &CronStatus::Pending.to_string(),
timeout_ms = &Cron::TimeoutMs.to_string(),
))
.await?;
db.execute_unprepared(&format!( db.execute_unprepared(&format!(
r#"CREATE OR REPLACE FUNCTION {CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME}() RETURNS INTEGER AS $$ r#"CREATE OR REPLACE FUNCTION {CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME}() RETURNS INTEGER AS $$
DECLARE DECLARE
@ -220,7 +191,7 @@ impl MigrationTrait for Migration {
next_run = &Cron::NextRun.to_string(), next_run = &Cron::NextRun.to_string(),
enabled = &Cron::Enabled.to_string(), enabled = &Cron::Enabled.to_string(),
status = &Cron::Status.to_string(), status = &Cron::Status.to_string(),
pending = &CronStatus::Pending.to_string(), pending = &CronStatus::Pending.to_value(),
locked_at = &Cron::LockedAt.to_string(), locked_at = &Cron::LockedAt.to_string(),
timeout_ms = &Cron::TimeoutMs.to_string(), timeout_ms = &Cron::TimeoutMs.to_string(),
priority = &Cron::Priority.to_string(), priority = &Cron::Priority.to_string(),
@ -246,11 +217,6 @@ impl MigrationTrait for Migration {
)) ))
.await?; .await?;
db.execute_unprepared(&format!(
r#"DROP FUNCTION IF EXISTS {CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME}();"#,
))
.await?;
db.execute_unprepared(&format!( db.execute_unprepared(&format!(
r#"DROP FUNCTION IF EXISTS {CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME}();"#, r#"DROP FUNCTION IF EXISTS {CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME}();"#,
)) ))

View File

@ -1,7 +1,5 @@
pub const CRON_DUE_EVENT: &str = "cron_due"; pub const CRON_DUE_EVENT: &str = "cron_due";
pub const CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME: &str =
"check_and_cleanup_expired_cron_locks";
pub const CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME: &str = "check_and_trigger_due_crons"; pub const CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME: &str = "check_and_trigger_due_crons";
pub const NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME: &str = "notify_due_cron_when_mutating"; pub const NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME: &str = "notify_due_cron_when_mutating";

View File

@ -2,9 +2,8 @@ mod core;
mod registry; mod registry;
pub use core::{ pub use core::{
CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME, CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME, CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME, CRON_DUE_EVENT,
CRON_DUE_EVENT, NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME, NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME, NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME,
NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME,
}; };
use async_trait::async_trait; use async_trait::async_trait;
@ -270,19 +269,6 @@ impl Model {
Ok(()) Ok(())
} }
pub async fn cleanup_expired_locks(ctx: &dyn AppContextTrait) -> RecorderResult<i32> {
let db = ctx.db();
let result = db
.execute(Statement::from_string(
db.get_database_backend(),
format!("SELECT {CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME}()"),
))
.await?;
Ok(result.rows_affected() as i32)
}
pub async fn check_and_trigger_due_crons(ctx: &dyn AppContextTrait) -> RecorderResult<()> { pub async fn check_and_trigger_due_crons(ctx: &dyn AppContextTrait) -> RecorderResult<()> {
let db = ctx.db(); let db = ctx.db();

View File

@ -185,9 +185,6 @@ impl TaskService {
tokio::task::spawn(async move { tokio::task::spawn(async move {
loop { loop {
interval.tick().await; interval.tick().await;
if let Err(e) = cron::Model::cleanup_expired_locks(ctx.as_ref()).await {
tracing::error!("Error cleaning up expired locks: {e}");
}
if let Err(e) = cron::Model::check_and_trigger_due_crons(ctx.as_ref()).await if let Err(e) = cron::Model::check_and_trigger_due_crons(ctx.as_ref()).await
{ {
tracing::error!("Error checking and triggering due crons: {e}"); tracing::error!("Error checking and triggering due crons: {e}");