diff --git a/apps/recorder/src/models/cron/mod.rs b/apps/recorder/src/models/cron/mod.rs index 98a4667..2ac57c1 100644 --- a/apps/recorder/src/models/cron/mod.rs +++ b/apps/recorder/src/models/cron/mod.rs @@ -10,8 +10,11 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; use croner::Cron; use sea_orm::{ - ActiveValue::Set, DeriveActiveEnum, DeriveDisplay, DeriveEntityModel, EnumIter, QuerySelect, - Statement, TransactionTrait, entity::prelude::*, sea_query::LockType, + ActiveValue::Set, + Condition, DeriveActiveEnum, DeriveDisplay, DeriveEntityModel, EnumIter, QuerySelect, + Statement, TransactionTrait, + entity::prelude::*, + sea_query::{ExprTrait, LockBehavior, LockType}, sqlx::postgres::PgNotification, }; use serde::{Deserialize, Serialize}; @@ -126,6 +129,7 @@ impl Model { ctx: &dyn AppContextTrait, notification: PgNotification, worker_id: &str, + retry_duration: chrono::Duration, ) -> RecorderResult<()> { let payload: Self = serde_json::from_str(notification.payload())?; let cron_id = payload.id; @@ -140,7 +144,8 @@ impl Model { } Err(e) => { tracing::error!("Error executing cron {cron_id}: {e}"); - cron.mark_cron_failed(ctx, &e.to_string()).await?; + cron.mark_cron_failed(ctx, &e.to_string(), retry_duration) + .await?; } }, None => { @@ -162,7 +167,7 @@ impl Model { let txn = db.begin().await?; let cron = Entity::find_by_id(cron_id) - .lock(LockType::Update) + .lock_with_behavior(LockType::Update, LockBehavior::SkipLocked) .one(&txn) .await?; @@ -235,7 +240,12 @@ impl Model { Ok(()) } - async fn mark_cron_failed(&self, ctx: &dyn AppContextTrait, error: &str) -> RecorderResult<()> { + async fn mark_cron_failed( + &self, + ctx: &dyn AppContextTrait, + error: &str, + retry_duration: chrono::Duration, + ) -> RecorderResult<()> { let db = ctx.db(); let should_retry = self.attempts < self.max_attempts; @@ -247,7 +257,7 @@ impl Model { }; let next_run = if should_retry { - Some(Utc::now() + chrono::Duration::seconds(5)) + Some(Utc::now() + retry_duration) } else { Some(self.calculate_next_run(&self.cron_expr)?) }; @@ -281,6 +291,55 @@ impl Model { Ok(()) } + pub async fn check_and_cleanup_expired_cron_locks( + ctx: &dyn AppContextTrait, + retry_duration: chrono::Duration, + ) -> RecorderResult<()> { + let db = ctx.db(); + + let condition = Condition::all() + .add(Column::Status.eq(CronStatus::Running)) + .add(Column::LastRun.is_not_null()) + .add(Column::TimeoutMs.is_not_null()) + .add( + Expr::col(Column::LastRun) + .add(Expr::col(Column::TimeoutMs).mul(Expr::cust("INTERVAL '1 millisecond'"))) + .lte(Expr::current_timestamp()), + ); + + let cron_ids = Entity::find() + .select_only() + .column(Column::Id) + .filter(condition.clone()) + .lock_with_behavior(LockType::Update, LockBehavior::SkipLocked) + .into_tuple::() + .all(db) + .await?; + + for cron_id in cron_ids { + let txn = db.begin().await?; + let locked_cron = Entity::find_by_id(cron_id) + .filter(condition.clone()) + .lock_with_behavior(LockType::Update, LockBehavior::SkipLocked) + .one(&txn) + .await?; + + if let Some(locked_cron) = locked_cron { + locked_cron + .mark_cron_failed( + ctx, + format!("Cron timeout of {}ms", locked_cron.timeout_ms).as_str(), + retry_duration, + ) + .await?; + txn.commit().await?; + } else { + txn.rollback().await?; + } + } + Ok(()) + } + fn calculate_next_run(&self, cron_expr: &str) -> RecorderResult> { let cron_expr = Cron::new(cron_expr).parse()?; diff --git a/apps/recorder/src/task/config.rs b/apps/recorder/src/task/config.rs index 7d8379e..f01d423 100644 --- a/apps/recorder/src/task/config.rs +++ b/apps/recorder/src/task/config.rs @@ -12,6 +12,8 @@ pub struct TaskConfig { pub subscriber_task_timeout: Duration, #[serde(default = "default_system_task_timeout")] pub system_task_timeout: Duration, + #[serde(default = "default_cron_retry_duration")] + pub cron_retry_duration: Duration, } impl Default for TaskConfig { @@ -21,6 +23,7 @@ impl Default for TaskConfig { system_task_concurrency: default_system_task_workers(), subscriber_task_timeout: default_subscriber_task_timeout(), system_task_timeout: default_system_task_timeout(), + cron_retry_duration: default_cron_retry_duration(), } } } @@ -48,3 +51,7 @@ pub fn default_subscriber_task_timeout() -> Duration { pub fn default_system_task_timeout() -> Duration { Duration::from_secs(3600) } + +pub fn default_cron_retry_duration() -> Duration { + Duration::from_secs(5) +} diff --git a/apps/recorder/src/task/service.rs b/apps/recorder/src/task/service.rs index 5e0b02c..f2492cd 100644 --- a/apps/recorder/src/task/service.rs +++ b/apps/recorder/src/task/service.rs @@ -170,9 +170,14 @@ impl TaskService { let listener = self.setup_cron_due_listening().await?; let ctx = self.ctx.clone(); let cron_worker_id = self.cron_worker_id.clone(); + let retry_duration = chrono::Duration::milliseconds( + self.config.cron_retry_duration.as_millis() as i64, + ); tokio::task::spawn(async move { - if let Err(e) = Self::listen_cron_due(listener, ctx, &cron_worker_id).await { + if let Err(e) = + Self::listen_cron_due(listener, ctx, &cron_worker_id, retry_duration).await + { tracing::error!("Error listening to cron due: {e}"); } }); @@ -180,11 +185,24 @@ impl TaskService { Ok::<_, RecorderError>(()) }, async { - let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60)); let ctx = self.ctx.clone(); + let retry_duration = chrono::Duration::milliseconds( + self.config.cron_retry_duration.as_millis() as i64, + ); tokio::task::spawn(async move { + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60)); loop { interval.tick().await; + if let Err(e) = cron::Model::check_and_cleanup_expired_cron_locks( + ctx.as_ref(), + retry_duration, + ) + .await + { + tracing::error!( + "Error checking and cleaning up expired cron locks: {e}" + ); + } if let Err(e) = cron::Model::check_and_trigger_due_crons(ctx.as_ref()).await { tracing::error!("Error checking and triggering due crons: {e}"); @@ -257,12 +275,19 @@ impl TaskService { mut listener: PgListener, ctx: Arc, worker_id: &str, + retry_duration: chrono::Duration, ) -> RecorderResult<()> { listener.listen(CRON_DUE_EVENT).await?; + loop { let notification = listener.recv().await?; - if let Err(e) = - cron::Model::handle_cron_notification(ctx.as_ref(), notification, worker_id).await + if let Err(e) = cron::Model::handle_cron_notification( + ctx.as_ref(), + notification, + worker_id, + retry_duration, + ) + .await { tracing::error!("Error handling cron notification: {e}"); }